注:博主使用的版本就是:<flink.version>1.16.1</flink.version>
前提环境:
因公司业务需要,使用Flink对数据进行流式处理,具体处理流程就是,从kafka接到数据,然后连续请求十多个接口(算法)对数据进行打标;
主程序:
具体的异步IO代码(随便找一个展示):
package com.wenge.datagroup.storage.process;import com.alibaba.fastjson.JSONObject;
import com.wenge.datagroup.storage.bean.ParamConfig;
import com.wenge.datagroup.storage.common.ArgsConstants;
import com.wenge.datagroup.storage.process.base.BaseETL;
import com.wenge.datagroup.storage.service.YaYiService.YaYiPolarityService;
import com.wenge.datagroup.storage.utils.ConfigUtil;
import com.wenge.datagroup.storage.utils.Funnel;
import com.wenge.datagroup.storage.utils.YaYiUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;@Slf4j
public class AnalyerAsyncIOProcessPolarity {public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {log.error("----------------------------开始异步IO处理----------------");String topic = Funnel.contains(ArgsConstants.TOPIC) ? Funnel.getString(ArgsConstants.TOPIC) : "";String configFile = Funnel.contains(ArgsConstants.CONFIG) ? Funnel.getString(ArgsConstants.CONFIG) : "config.properties";int asyncNum = Funnel.contains(ArgsConstants.ASYNC_NUM) ? Funnel.getInt(ArgsConstants.ASYNC_NUM) : ConfigUtil.getInteger(ArgsConstants.ASYNC_NUM);int mapParallelism = Funnel.contains(ArgsConstants.MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.MAP_PARALLELISM);int filterParallelism = Funnel.contains(ArgsConstants.FILTER_PARALLELISM) ? Funnel.getInt(ArgsConstants.FILTER_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.FILTER_PARALLELISM);int TranslateParallelism = (Funnel.contains(ArgsConstants.Translate_MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.Translate_MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.Translate_MAP_PARALLELISM));// 异步IORichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {private transient ExecutorService executorService;private ParamConfig paramConfig;private YaYiUtil yaYiUtil;@Overridepublic void open(Configuration parameters) {// 重新加载配置文件log.error("重新加载配置文件");ConfigUtil.setConfigFile(configFile);ConfigUtil.setTopic(topic);ConfigUtil.init();this.executorService = Executors.newFixedThreadPool(asyncNum);paramConfig = new ParamConfig(ConfigUtil.getString("YaYiappKey"), ConfigUtil.getString("YaYiappSecret"));yaYiUtil = new YaYiUtil(paramConfig);}@Overridepublic void close() throws Exception {// 关闭线程池if (executorService != null) {executorService.shutdown();}log.error("----------------------------情感分析-线程池关闭----------------------");}@Overridepublic void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {JSONObject data = input;String 
uuid = data.getString("uuid");log.error("-----------------------数据超时----------------------:{}", uuid);//对超时数据进行处理resultFuture.complete(Collections.singleton(data));}@Overridepublic void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {CompletableFuture.supplyAsync(new Supplier<JSONObject>() {@Overridepublic JSONObject get() {String uuid = json.getString("uuid");long start =System.currentTimeMillis();try {//TODO: 根据业务逻辑进行处理String title = json.getString("title");String content = json.getString("content");String translate_title = json.getString("translate_title");String translate_content = json.getString("translate_content");String languageRecognition = json.getJSONObject("analysis").getString("language");String dataSourceType = json.getJSONObject("platform").getString("data_source_type");if (StringUtils.isNotBlank(translate_content)) {String polarity = new String();Integer polaritySum = 0;//具体算法调用YaYiPolarityService yaYiPolarityService = new YaYiPolarityService();polarity = yaYiPolarityService.yaYiPolarity(translate_content);if (StringUtils.isNotBlank(polarity)) {polaritySum = StringUtils.equals(polarity, "A") ? 0 : StringUtils.equals(polarity, "B") ? 
1 : 2;JSONObject analysis = json.getJSONObject("analysis");if (Objects.nonNull(analysis)) {analysis.put("polarity", polaritySum);json.put("analysis", analysis);} else {JSONObject analysisJson = new JSONObject();analysisJson.put("polarity", polaritySum);json.put("analysis", analysisJson);}}log.error("uuid:{},分析后数据:{}", uuid, polarity);long end =System.currentTimeMillis();log.error("uuid:{},分析,耗时:{} ms", uuid,(end-start));}return json;} catch (Exception e) {log.error("--------分析异常:{},数据:{}",uuid, e);return json;}}}, executorService).thenAccept((JSONObject dbResult) -> {resultFuture.complete(Collections.singleton(dbResult));});}};DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(dataStream,richAsyncFunction,50000,TimeUnit.MILLISECONDS,asyncNum).name("qinggan").setParallelism(TranslateParallelism);return downloadStream;}}
问题
1:异步io访问接口直接引发关闭
解决方案:
方法中传递参数不要使用 Set
具体的原因我没深究,只是经过验证在异步IO中使用如下就会导致线程关闭:
改成如下就行:
2:数据格式问题直接引发关闭
解决方案:
整体来说,flink中如果数据格式传输导致错误,就会引发线程关闭,
所以 DataStream<String> 改为 DataStream<JSONObject> 时,一定要通过 map 和 filter 进行转换和筛选
3:flink 消费kafka不提交offset
虽然在new FlinkKafkaConsumer<> 中设置了自动提交间隔,如下:
// Enable automatic offset commits on the Kafka consumer.
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Interval between automatic offset commits, in milliseconds.
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");
但是在实际应用过程中发现,到了设置的20000ms即20s,依然不提交offset,以为flink读取kafka失败。
实际上是因为我们在代码中开启了Checkpoint,就会覆盖kafka的配置,所以是经过规定的Checkpoint时间后才会提交offset
4:flink本地运行,发现默认并行度为6
在本地运行时,可以通过以下方式开启本地webui模式
// Run in local mode with the web UI enabled.
// "rest.port" is set to 8083 before the environment is created.
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8083);
streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
当时我们本地打开web UI发现,默认的并行度是6
这样有时候影响我们异步调用外部接口的qps设置,
解决方案:
在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数(巨坑!)
所以我们需要自己手动指定每个算子的并行度,不要使用默认的,可以通过.setParallelism(2)来指定某一个算子的并行度