使用
SingleOutputStreamOperator<String> sream = AsyncDataStream.unorderedWait(stream,
new AsyncMySQLRequest()
, 20000, TimeUnit.MILLISECONDS, 30);
// mysql
public class AsyncMySQLRequest extends RichAsyncFunction<String, String> {
private transient DruidDataSource dataSource;
private transient ExecutorService executorService;
public AsyncMySQLRequest(){
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = Executors.newFixedThreadPool(10);
dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(PropertyUtil.getPropertiesByPrefix("db.ylzslave."));
}
@Override
public void asyncInvoke(String in, ResultFuture<String> resultFuture) throws Exception {
Future<String> future = executorService.submit(() -> {
return queryFromMySql(in);
});
CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (Exception e) {
return null;
}
}).thenAccept((dbResult) -> {
resultFuture.complete(Collections.singleton(dbResult));
});
}
private String queryFromMySql(String in) throws SQLException {
Connection connection = null;
PreparedStatement stmt = null;
ResultSet rs = null;
String result = in;
try {
connection = dataSource.getConnection();
stmt = connection.prepareStatement("select 1");
rs = stmt.executeQuery();
while(rs.next()){
result += rs.getInt(1);
}
} finally {
if (rs != null) {
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}
return result;
}
@Override
public void close() throws Exception {
dataSource.close();
executorService.shutdown();
}
}
// es
public class AsyncEsDataRequest extends RichAsyncFunction<Tuple2<String, String>, Tuple3<String, String, String>> {
private transient RestHighLevelClient restHighLevelClient;
private final ObjectMapper mapper = new ObjectMapper();
@Override
public void open(Configuration parameters) throws Exception {
HttpHost httpHost = new HttpHost("swarm-manager", 9200, "http");
//初始化ElasticSearch-Client
restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHost));
}
@Override
public void close() throws Exception {
restHighLevelClient.close();
}
@Override
public void asyncInvoke(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) throws Exception {
search(input, resultFuture);
}
//异步去读Es表
private void search(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) {
SearchRequest searchRequest = new SearchRequest("renyuanku");
String uid = input.f0;
QueryBuilder builder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("uid", uid));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(builder);
searchRequest.source(sourceBuilder);
ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
String uid = input.f0;
String phone = input.f1;
String username = null;
//成功
@SneakyThrows
@Override
public void onResponse(SearchResponse searchResponse) {
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length > 0) {
JsonNode node = mapper.readValue(searchHits[0].getSourceAsString(), JsonNode.class);
username = node.get("username").toString();
}
resultFuture.complete(Collections.singleton(Tuple3.create(uid, username, phone)));
}
//失败
@Override
public void onFailure(Exception e) {
System.out.println(e.getMessage());
resultFuture.complete(Collections.singleton(Tuple3.create(uid, username, phone)));
}
};
restHighLevelClient.searchAsync(searchRequest, listener);
}
}