免费A级毛片无码专区网站-成人国产精品视频一区二区-啊 日出水了 用力乖乖在线-国产黑色丝袜在线观看下-天天操美女夜夜操美女-日韩网站在线观看中文字幕-AV高清hd片XXX国产-亚洲av中文字字幕乱码综合-搬开女人下面使劲插视频

Flink的異步算子的原理及使用

1、簡介

Flink的特點是高吞吐、低延遲。但是，如果Flink中某個環節的數據處理邏輯需要和外部系統交互，而調用耗時又不可控，就會顯著降低集群性能。這時就可以使用異步算子：發起耗時的外部調用後不必等待結果返回，就可以繼續處理後續的數據。
2、本章可以了解到啥

  • 異步算子源碼分析
  • 異步算子為啥能夠保證有序性
  • Flink SQL中怎麼自定義並使用異步lookup join
3、異步算子的測試代碼

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Simulates a slow asynchronous call to an external system (based on an example found online).
 *
 * <p>Each instance owns a fixed pool of 4 worker threads. Create one instance per operator
 * instance and call {@link #close()} when done; otherwise the non-daemon worker threads leak and
 * keep the JVM alive.
 *
 * <p>NOTE: the {@link ExecutorService} field is not serializable, so despite implementing
 * {@link Serializable} an instance should be created on the task side (e.g. in
 * {@code RichAsyncFunction#open}) rather than shipped inside a serialized function.
 */
public class AsyncIODemo implements Serializable, AutoCloseable {

    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    /**
     * Asynchronously "fetches" data for {@code source}.
     *
     * @param source the lookup key
     * @return a future that completes after about 5 seconds with {@code "Output value: " + source},
     *     or completes exceptionally if the worker thread is interrupted while sleeping
     */
    public CompletableFuture<String> pullData(final String source) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        // Simulate the latency of a remote call.
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and fail the future instead of reporting success.
                        Thread.currentThread().interrupt();
                        throw new CompletionException(e);
                    }
                    // After the simulated delay, complete with a result string built from the input.
                    return "Output value: " + source;
                },
                executorService);
    }

    /** Shuts down the worker pool, attempting to interrupt any still-running simulated calls. */
    @Override
    public void close() {
        executorService.shutdownNow();
    }
}

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/** Demo job for Flink's async I/O operator (based on an example found online). */
public class AsyncTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment; parallelism 1 makes the output order easy to observe.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // A small bounded source of elements.
        DataStreamSource<String> stream = env.fromElements("11", "22", "33", "44");

        // orderedWait builds an ordered async stream: as the name says, the calls are asynchronous
        // but the output keeps the input order; the source analysis below shows why.
        SingleOutputStreamOperator<String> asyncStream =
                AsyncDataStream.orderedWait(
                        stream,
                        new RichAsyncFunction<String, String>() {
                            // One client (and thus one thread pool) per parallel operator instance.
                            // Created in open() because its ExecutorService is not serializable.
                            private transient AsyncIODemo client;

                            // NOTE: open(Configuration) is the Flink 1.x lifecycle hook; Flink 2.x
                            // replaces it with open(OpenContext).
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                client = new AsyncIODemo();
                            }

                            @Override
                            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                                // Issue the simulated async request; it returns a CompletableFuture.
                                client.pullData(input)
                                        // When it finishes, hand the result (or the failure) to
                                        // resultFuture, which is what actually emits the record.
                                        .whenComplete(
                                                (value, error) -> {
                                                    if (error != null) {
                                                        resultFuture.completeExceptionally(error);
                                                    } else {
                                                        resultFuture.complete(
                                                                Collections.singleton(value));
                                                    }
                                                });
                            }

                            @Override
                            public void close() throws Exception {
                                if (client != null) {
                                    client.close();
                                }
                                super.close();
                            }
                        },
                        // Maximum time allowed for a single async call: 10 seconds.
                        10,
                        TimeUnit.SECONDS);

        asyncStream.print();
        env.execute();
    }
}

4、異步算子源碼分析

4.1、AsyncDataStream

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

import java.util.concurrent.TimeUnit;

/**
 * A helper class to apply an {@link AsyncFunction} to a data stream.
 *
 * <pre>{@code
 * DataStream<String> input = ...
 * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
 *
 * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
 * }</pre>
 */
@PublicEvolving
public class AsyncDataStream {

    /** Output mode of the asynchronous operation: ordered or unordered. */
    public enum OutputMode {
        ORDERED,
        UNORDERED
    }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /**
     * flag_2: adds an {@link AsyncWaitOperator}. All public factory methods below delegate here.
     *
     * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
     * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
     * @param timeout for the asynchronous operation to complete, in milliseconds (the public
     *     methods convert with {@link TimeUnit#toMillis}).
     * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
     * @param mode Processing mode for {@link AsyncWaitOperator}.
     * @param <IN> Input type.
     * @param <OUT> Output type.
     * @return A new {@link SingleOutputStreamOperator}
     */
    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        // Infer the output type information from func's generic signature.
        TypeInformation<OUT> outTypeInfo =
                TypeExtractor.getUnaryOperatorReturnType(
                        func,
                        AsyncFunction.class,
                        0,
                        1,
                        new int[] {1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),
                        true);

        // This is where the AsyncWaitOperatorFactory is created, wrapping the function after
        // it has passed through the environment's clean() step.
        AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(
                        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

        return in.transform("async wait operator", outTypeInfo, operatorFactory);
    }

    /**
     * Adds an {@link AsyncWaitOperator}. Output records may be emitted in a different order than
     * the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operation that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    /**
     * Adds an {@link AsyncWaitOperator} with the default capacity (DEFAULT_QUEUE_CAPACITY). Output
     * records may be emitted in a different order than the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in,
                func,
                timeUnit.toMillis(timeout),
                DEFAULT_QUEUE_CAPACITY,
                OutputMode.UNORDERED);
    }

    /**
     * flag_1: adds an {@link AsyncWaitOperator}. Records are emitted in the same order as the input
     * records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operation that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    /**
     * Adds an {@link AsyncWaitOperator} with the default capacity (DEFAULT_QUEUE_CAPACITY).
     * Records are emitted in the same order as the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
    }
}

經驗總結擴展閱讀