1、簡介
Flink 的特點是高吞吐、低延遲。但如果 Flink 中某個環節的數據處理邏輯需要和外部系統交互，而調用耗時又不可控，就會顯著降低集群性能。這時就可能需要使用異步算子：耗時操作無需等待結果返回，就可以繼續發起下一個耗時操作。
2、本章可以了解到什麼
- 異步算子源碼分析
- 異步算子為什麼能夠保證有序性
- 在 Flink SQL 中如何自定義並使用異步 lookup join
import java.io.Serializable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 網(wǎng)上copy的模擬一個(gè)耗時(shí)的異步操作 */public class AsyncIODemo implements Serializable {private final ExecutorService executorService = Executors.newFixedThreadPool(4);public CompletableFuture<String> pullData(final String source) {CompletableFuture<String> completableFuture = new CompletableFuture<>();executorService.submit(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}/*** 前面睡眠幾秒后,調(diào)用一下完成方法,拼接一個(gè)結(jié)果字符串*/completableFuture.complete("Output value: " + source);});return completableFuture;}}import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.async.AsyncFunction;import org.apache.flink.streaming.api.functions.async.ResultFuture;import java.util.Arrays;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;/** * 網(wǎng)上copy的代碼 */public class AsyncTest {public static void main(String[] args) throws Exception {/*** 獲取Flink執(zhí)行環(huán)境并設(shè)置并行度為1,方便后面觀測(cè)*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 構(gòu)造一個(gè)DataStreamSource的序列*/DataStreamSource stream = env.fromElements("11", "22", "33", "44");/*** 使用AsyncDataStream構(gòu)造一個(gè)異步順序流,這里異步順序流從名字就可以看出來(lái)雖然是異步的,但是卻可以保持順序,* 這個(gè)后面源碼分析可以知道原因*/SingleOutputStreamOperator asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() {@Overridepublic void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {/*** 
這里調(diào)用模擬的獲取異步請(qǐng)求結(jié)果,并返回一個(gè)CompletableFuture*/CompletableFuture<String> future = new AsyncIODemo().pullData(input);/*** 注冊(cè)一個(gè)future處理完成的回調(diào),當(dāng)future處理完成拿到結(jié)果后,調(diào)用resultFuture的* complete方法真正吐出數(shù)據(jù)*/future.whenCompleteAsync((d,t) ->{resultFuture.complete(Arrays.asList(d));});}// 設(shè)置最長(zhǎng)異步調(diào)用超時(shí)時(shí)間為10秒}, 10, TimeUnit.SECONDS);asyncStream.print();env.execute();}}4、異步算子源碼分析4.1、AsyncDataStreampackage org.apache.flink.streaming.api.datastream;import org.apache.flink.annotation.PublicEvolving;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.Utils;import org.apache.flink.api.java.typeutils.TypeExtractor;import org.apache.flink.streaming.api.functions.async.AsyncFunction;import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;import java.util.concurrent.TimeUnit;/** * 用于將AsyncFunction應(yīng)用到數(shù)據(jù)流的一個(gè)helper類 * * <pre>{@code * DataStream<String> input = ... * AsyncFunction<String, Tuple<String, String>> asyncFunc = ... * * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100); * }</pre> */@PublicEvolvingpublic class AsyncDataStream {/** 異步操作的輸出模式,有序或者無(wú)序. 
*/public enum OutputMode {ORDERED,UNORDERED}private static final int DEFAULT_QUEUE_CAPACITY = 100;/*** flag_2,添加一個(gè)AsyncWaitOperator.** @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.* @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.* @param timeout for the asynchronous operation to complete* @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.* @param mode Processing mode for {@link AsyncWaitOperator}.* @param <IN> Input type.* @param <OUT> Output type.* @return A new {@link SingleOutputStreamOperator}*/private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(DataStream<IN> in,AsyncFunction<IN, OUT> func,long timeout,int bufSize,OutputMode mode) {TypeInformation<OUT> outTypeInfo =TypeExtractor.getUnaryOperatorReturnType(func,AsyncFunction.class,0,1,new int[] {1, 0},in.getType(),Utils.getCallLocationName(),true);/**這里生成了一個(gè)AsyncWaitOperatorFactory*/AsyncWaitOperatorFactory<IN, OUT> operatorFactory =new AsyncWaitOperatorFactory<>(in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);return in.transform("async wait operator", outTypeInfo, operatorFactory);}/*** 添加一個(gè)AsyncWaitOperator 。輸出流無(wú)順序 。** @param in Input {@link DataStream}* @param func {@link AsyncFunction}* @param timeout for the asynchronous operation to complete* @param timeUnit of the given timeout* @param capacity The max number of async i/o operation that can be triggered* @param <IN> Type of input record* @param <OUT> Type of output record* @return A new {@link SingleOutputStreamOperator}.*/public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(DataStream<IN> in,AsyncFunction<IN, OUT> func,long timeout,TimeUnit timeUnit,int capacity) {return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);}/*** 添加一個(gè)AsyncWaitOperator 。輸出流無(wú)順序 。* @param in Input {@link DataStream}* @param func {@link AsyncFunction}* @param timeout for 
the asynchronous operation to complete* @param timeUnit of the given timeout* @param <IN> Type of input record* @param <OUT> Type of output record* @return A new {@link SingleOutputStreamOperator}.*/public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {return addOperator(in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.UNORDERED);}/*** flag_1,添加一個(gè)AsyncWaitOperator 。處理輸入記錄的順序保證與輸入記錄的順序相同** @param in Input {@link DataStream}* @param func {@link AsyncFunction}* @param timeout for the asynchronous operation to complete* @param timeUnit of the given timeout* @param capacity The max number of async i/o operation that can be triggered* @param <IN> Type of input record* @param <OUT> Type of output record* @return A new {@link SingleOutputStreamOperator}.*/public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(DataStream<IN> in,AsyncFunction<IN, OUT> func,long timeout,TimeUnit timeUnit,int capacity) {return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);}/*** 添加一個(gè)AsyncWaitOperator 。處理輸入記錄的順序保證與輸入記錄的順序相同* @param in Input {@link DataStream}* @param func {@link AsyncFunction}* @param timeout for the asynchronous operation to complete* @param timeUnit of the given timeout* @param <IN> Type of input record* @param <OUT> Type of output record* @return A new {@link SingleOutputStreamOperator}.*/public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {return addOperator(in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);}}
經驗總結擴展閱讀
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 好茶葉的特點
- VLQ & Base64 VLQ 編碼方式的原理及代碼實現
- Spring Boot 配置 jar 包外面的 Properties 配置文件
- 【C++】從零開始的CS:GO逆向分析3——寫出一個透視
- 適合隔夜帶飯的菜譜
- 鴕鳥蛋的營養價值
- 2023年10月13日是提車吉日嗎 2023年10月13日是提車的黃道吉日嗎
- 一斤糖蒜的糖和醋比例
- 2023年10月13日旅游黃道吉日 2023年10月13日旅游行嗎
- 2023年10月13日出差好不好 2023年10月13日是出差的黃道吉日嗎
