下一页亚洲11p_国产精品亚洲第一页在线观看_麻豆影院在线观看免费视频_亚洲色播在线观看_激情亚洲欧美另类小说图片

您現(xiàn)在的位置 :

首頁  >  企業(yè)新聞 >  > 正文

環(huán)球時(shí)訊:聊聊Flink必知必會(huì)(四)

時(shí)間 :2023-06-16 21:19:08   來源 : 博客園

概述

Flink Streaming API借鑒了谷歌數(shù)據(jù)流模型(Google Data Flow Model),它的流API支持不同的時(shí)間概念。Flink明確支持以下3個(gè)不同的時(shí)間概念。


(資料圖)

Flink明確支持以下3個(gè)不同的時(shí)間概念。(1)事件時(shí)間:事件發(fā)生的時(shí)間,由產(chǎn)生(或存儲(chǔ))事件的設(shè)備記錄。

(2)接入時(shí)間:Flink在接入事件時(shí)記錄的時(shí)間戳。

(3)處理時(shí)間:管道中特定操作符處理事件的時(shí)間。

支持事件時(shí)間的流處理器需要一種方法來度量事件時(shí)間的進(jìn)度。在Flink中測量事件時(shí)間進(jìn)展的機(jī)制是水印(watermark)。水印是一種特殊類型的事件,是告訴系統(tǒng)事件時(shí)間進(jìn)度的一種方式。水印流是數(shù)據(jù)流的一部分,并帶有時(shí)間戳t。水印(t)聲明事件時(shí)間已經(jīng)到達(dá)該流中的時(shí)間t,這意味著時(shí)間戳t′≤t(時(shí)間戳更早或等于水印的事件)的流中不應(yīng)該有更多的元素。

Flink中水印的處理

水印的時(shí)間戳

時(shí)間t的水印標(biāo)記了數(shù)據(jù)流中的一個(gè)位置,并斷言此時(shí)的流在時(shí)間t之前已經(jīng)完成。為了執(zhí)行基于事件時(shí)間的事件處理,F(xiàn)link需要知道與每個(gè)事件相關(guān)聯(lián)的時(shí)間,它還需要包含水印的流。水印就是系統(tǒng)事件時(shí)間的時(shí)鐘。水印觸發(fā)是基于事件時(shí)間的計(jì)時(shí)器的觸發(fā)。

事件流的類型有兩種,一個(gè)是順序的,一個(gè)是無序的。先看順序場景下,水印的排列。

對(duì)于無序流,水印是至關(guān)重要的,其中事件不是按照它們的時(shí)間戳排序的。

例如,當(dāng)操作符接收到w(11)這條水印時(shí),可以認(rèn)為時(shí)間戳小于或等于11的數(shù)據(jù)已經(jīng)到達(dá),此時(shí)可以觸發(fā)計(jì)算。同樣,當(dāng)接收到w(17)這條水印時(shí),可以認(rèn)為時(shí)間戳小于或等于17的數(shù)據(jù)已經(jīng)到達(dá),此時(shí)可以觸發(fā)計(jì)算。

可以看出,水印的時(shí)間戳是單調(diào)遞增的,時(shí)間戳為t的水印意味著所有后續(xù)記錄的時(shí)間戳將大于t。一般來講,水印是一種聲明,在流中的那個(gè)點(diǎn)之前,即在某個(gè)時(shí)間戳之前的所有事件都應(yīng)該已經(jīng)到達(dá)。

水印是在源函數(shù)處或直接在源函數(shù)之后生成的。源函數(shù)的每個(gè)并行子任務(wù)通常可以獨(dú)立地生成水印。這些水印定義了特定并行源處的事件時(shí)間。

水印的生成

Flink提供了用于處理事件時(shí)間、時(shí)間戳和水印的API。

為了處理事件時(shí)間,F(xiàn)link流程序需要知道事件的時(shí)間戳,這意味著流中的每個(gè)元素都需要分配其事件時(shí)間戳。這通常是通過TimestampAssigner從元素中的某個(gè)字段訪問/提取時(shí)間戳實(shí)現(xiàn)的。

Flink提供了兩種方式創(chuàng)建水印。

1.使用WatermarkStrategy上的靜態(tài)輔助方法實(shí)現(xiàn)公共水印策略:

2.實(shí)現(xiàn)WatermarkStrategy接口,自定義TimestampAssigner與WatermarkGenerator捆綁在一起:

@Publicpublic interface WatermarkStrategy        extends TimestampAssignerSupplier, WatermarkGeneratorSupplier {    @Override    WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);    @Override    default TimestampAssigner createTimestampAssigner(            TimestampAssignerSupplier.Context context) {        return new RecordTimestampAssigner<>();    }    @Experimental    default WatermarkAlignmentParams getAlignmentParameters() {        return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;    }    default WatermarkStrategy withTimestampAssigner(            TimestampAssignerSupplier timestampAssigner) {        checkNotNull(timestampAssigner, "timestampAssigner");        return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);    }    default WatermarkStrategy withTimestampAssigner(            SerializableTimestampAssigner timestampAssigner) {        checkNotNull(timestampAssigner, "timestampAssigner");        return new WatermarkStrategyWithTimestampAssigner<>(                this, TimestampAssignerSupplier.of(timestampAssigner));    }    default WatermarkStrategy withIdleness(Duration idleTimeout) {        checkNotNull(idleTimeout, "idleTimeout");        checkArgument(                !(idleTimeout.isZero() || idleTimeout.isNegative()),                "idleTimeout must be greater than zero");        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);    }    @Experimental    default WatermarkStrategy withWatermarkAlignment(            String watermarkGroup, Duration maxAllowedWatermarkDrift) {        return withWatermarkAlignment(                watermarkGroup,                maxAllowedWatermarkDrift,                WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);    }    @Experimental    default WatermarkStrategy withWatermarkAlignment(            String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {        return new WatermarksWithWatermarkAlignment(                this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);    }    static  WatermarkStrategy forMonotonousTimestamps() {        return (ctx) -> new AscendingTimestampsWatermarks<>();    }    static  WatermarkStrategy forBoundedOutOfOrderness(Duration maxOutOfOrderness) {        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);    }    static  WatermarkStrategy forGenerator(WatermarkGeneratorSupplier generatorSupplier) {        return generatorSupplier::createWatermarkGenerator;    }    static  WatermarkStrategy noWatermarks() {        return (ctx) -> new NoWatermarksGenerator<>();    }}

這里面提供了很多靜態(tài)的方法和帶有缺省實(shí)現(xiàn)的方法,只有一個(gè)方法是非default和沒有缺省實(shí)現(xiàn)的,就是createWatermarkGenerator方法。

所以默認(rèn)情況下,我們只需要實(shí)現(xiàn)這個(gè)方法就行了,這個(gè)方法主要是返回一個(gè) WatermarkGenerator。

@Publicpublic interface WatermarkGenerator {/** * Called for every event, allows the watermark generator to examine and remember the * event timestamps, or to emit a watermark based on the event itself. */void onEvent(T event, long eventTimestamp, WatermarkOutput output);/** * Called periodically, and might emit a new watermark, or not. * * 

The interval in which this method is called and Watermarks are generated * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. */void onPeriodicEmit(WatermarkOutput output);}

這個(gè)方法簡單明了,主要是有兩個(gè)方法:

  • onEvent :每個(gè)元素都會(huì)調(diào)用這個(gè)方法,如果我們想依賴每個(gè)元素生成一個(gè)水印,然后發(fā)射到下游(可選,就是看是否用output來收集水印),我們可以實(shí)現(xiàn)這個(gè)方法.

  • onPeriodicEmit : 如果數(shù)據(jù)量比較大的時(shí)候,我們每條數(shù)據(jù)都生成一個(gè)水印的話,會(huì)影響性能,所以這里還有一個(gè)周期性生成水印的方法。這個(gè)水印的生成周期可以這樣設(shè)置:env.getConfig().setAutoWatermarkInterval(5000L)

標(biāo)簽:

推薦文章

X 關(guān)閉

X 關(guān)閉