使用

 SingleOutputStreamOperator<String> sream = AsyncDataStream.unorderedWait(stream,
               new AsyncMySQLRequest()
                , 20000, TimeUnit.MILLISECONDS, 30);
// mysql
public class AsyncMySQLRequest extends RichAsyncFunction<String, String> {

    private transient DruidDataSource dataSource;

    private transient ExecutorService executorService;

    public AsyncMySQLRequest(){
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = Executors.newFixedThreadPool(10);

        dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(PropertyUtil.getPropertiesByPrefix("db.ylzslave."));
    }

    @Override
    public void asyncInvoke(String in, ResultFuture<String> resultFuture) throws Exception {

        Future<String> future = executorService.submit(() -> {
            return queryFromMySql(in);
        });

        CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (Exception e) {
                return null;
            }
        }).thenAccept((dbResult) -> {
            resultFuture.complete(Collections.singleton(dbResult));
        });
    }

    private String queryFromMySql(String in) throws SQLException {
        Connection connection = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        String result = in;
        try {
            connection = dataSource.getConnection();
            stmt = connection.prepareStatement("select 1");
            rs = stmt.executeQuery();
            while(rs.next()){
                result += rs.getInt(1);
            }
        } finally {
            if (rs != null) {
                rs.close();
            }
            if (stmt != null) {
                stmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
        return result;
    }

    @Override
    public void close() throws Exception {
        dataSource.close();
        executorService.shutdown();
    }
}
// es
public class AsyncEsDataRequest extends RichAsyncFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient RestHighLevelClient restHighLevelClient;

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void open(Configuration parameters) throws Exception {
        HttpHost httpHost = new HttpHost("swarm-manager", 9200, "http");
        //初始化ElasticSearch-Client
        restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHost));
    }

    @Override
    public void close() throws Exception {
        restHighLevelClient.close();
    }

    @Override
    public void asyncInvoke(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) throws Exception {
        search(input, resultFuture);
    }

    //异步去读Es表
    private void search(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) {
        SearchRequest searchRequest = new SearchRequest("renyuanku");
        String uid = input.f0;
        QueryBuilder builder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("uid", uid));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(builder);
        searchRequest.source(sourceBuilder);
        ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
            String uid = input.f0;
            String phone = input.f1;
            String username = null;

            //成功
            @SneakyThrows
            @Override
            public void onResponse(SearchResponse searchResponse) {
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                if (searchHits.length > 0) {
                    JsonNode node = mapper.readValue(searchHits[0].getSourceAsString(), JsonNode.class);
                    username = node.get("username").toString();

                }
                resultFuture.complete(Collections.singleton(Tuple3.create(uid, username, phone)));
            }

            //失败
            @Override
            public void onFailure(Exception e) {
                System.out.println(e.getMessage());
                resultFuture.complete(Collections.singleton(Tuple3.create(uid, username, phone)));
            }
        };
        restHighLevelClient.searchAsync(searchRequest, listener);
    }
}