新聞中心
1.序篇-先說結(jié)論
博主希望你在看完本文后一定要養(yǎng)成這個編程習(xí)慣:使用 DataStream API 實現(xiàn) Flink 任務(wù)時,Watermark Assigner 能靠近 Source 節(jié)點(diǎn)就靠近 Source 節(jié)點(diǎn),盡量前置。

在鶴山等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、網(wǎng)站設(shè)計 網(wǎng)站設(shè)計制作按需開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計,營銷型網(wǎng)站,外貿(mào)網(wǎng)站制作,鶴山網(wǎng)站建設(shè)費(fèi)用合理。
要想問為啥,接著往下看!!!
我從以下幾個章節(jié)說明上述的問題以及為什么這樣建議,希望能拋磚引玉,帶給大家一些啟發(fā)。
- ? 踩坑場景篇-這個坑是啥樣的
- ? 問題排查篇-坑的排查過程
- ? 問題原理解析篇-導(dǎo)致問題的機(jī)制是什么
- ? 避坑篇-如何避免這種問題
- ? 總結(jié)篇
2.踩坑場景篇-這個坑是啥樣的
2.1.需求場景
首先介紹一下這個坑對應(yīng)的一個需求場景以及第一版本的實現(xiàn)代碼。
需求:在電商平臺中,需要根據(jù)網(wǎng)頁在線用戶的心跳日志(每 30s 上報一次用戶心跳日志)計算當(dāng)前這一分鐘在購物車頁面(Shopping-Cart)停留的在線人數(shù)。
數(shù)據(jù)源:每 30s 上報一次的用戶心跳日志(user_id、page、time 三個字段分別對應(yīng) 用戶 id、用戶所在頁面、日志上報時間)
數(shù)據(jù)處理:先過濾出購物車按照時間戳對用戶心跳日志進(jìn)行滾動窗口(Tumble)聚合計算
數(shù)據(jù)匯:每分鐘聚合的結(jié)果數(shù)據(jù)(uv、time兩個字段分別對應(yīng) 購物車頁面的當(dāng)前這一分鐘的同時在線人數(shù)、當(dāng)前這一分鐘的時間戳)
Flink DataStream API 具體實現(xiàn)代碼如下:
public class WatermarkTest {
public static void main(String[] args) throws Exception {
// 獲取到 Flink 環(huán)境,博主自己封裝的接口 FlinkEnv
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
// 設(shè)置并發(fā)度
flinkEnv.env().setParallelism(100);
flinkEnv.env()
// 數(shù)據(jù)源:上報的日志
.addSource(xxx)
// 過濾出 購物車頁面(Shopping-Cart)的數(shù)據(jù)
.filter(new FilterFunction() {
@Override
public boolean filter(SourceModel value) throws Exception {
return value.getPage().equals("Shopping-Cart");
}
})
// 分配 Watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) {
@Override
public long extractTimestamp(SourceModel element) {
return element.getTime();
}
})
// 為了進(jìn)行合并計算,shuffle 到一個算子中,所以此處返回結(jié)果固定為 0
.keyBy(new KeySelector() {
@Override
public Long getKey(SourceModel value) throws Exception {
return 0L;
}
})
// 開一分鐘的滾動時間時間窗口
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 計算 uv 的處理邏輯
.process(new ProcessWindowFunction() {
@Override
public void process(Long aLong, Context context, Iterable elements,
Collector out) throws Exception {
long windowStart = context.window().getStart();
Set s = new HashSet<>();
elements.forEach(new Consumer() {
@Override
public void accept(SourceModel sourceModel) {
s.add(sourceModel.userId);
}
});
out.collect(
SinkModel
.builder()
.uv(s.size())
.time(windowStart)
.build()
);
}
})
// 輸出
.addSink(xxx);
}
// 輸入數(shù)據(jù) Model
@Data
@Builder
private static class SourceModel {
private long userId;
private String page;
private long time;
}
// 輸出數(shù)據(jù) Model
@Data
@Builder
private static class SinkModel {
private long uv;
private long time;
}
}
2.2.問題場景
當(dāng)我們把這個任務(wù)部署到集群環(huán)境運(yùn)行時,卻發(fā)現(xiàn)一直沒有數(shù)據(jù)產(chǎn)出,但是輸入數(shù)據(jù)(用戶心跳日志)是一直有大量數(shù)據(jù)過來的。
3.問題排查篇-坑的排查過程
通過 Flink web ui 定位了下發(fā)現(xiàn)每個算子的表現(xiàn)如下:
- ? Source 算子:一直能夠消費(fèi)到數(shù)據(jù),而且從 web ui 的輸入輸出流量看數(shù)據(jù)量非常大
- ? Filter 算子:Filter 算子有輸入也有輸出,輸入非常大,但是輸出數(shù)據(jù)極少(這里是由于業(yè)務(wù)原因?qū)е碌?,從購物的業(yè)務(wù)上來說只有非常少一部分的人會在購物車頁面停留)
- ? 滾動窗口算子:有極少的輸入數(shù)據(jù),但是一直沒有輸出,并且從 web ui 查看算子的 Watermark 也是沒有的
從這里開始問題就清晰了。
至少從 Flink web ui 上來看是由于窗口算子沒有 Watermark 導(dǎo)致的窗口數(shù)據(jù)沒有觸發(fā)計算。
這時的第一個猜想就是:窗口算子單并發(fā)上面的 Watermark 沒有對齊導(dǎo)致的!!!
接下看一下這個猜想的整體驗證過程:
- ? 由于我們的 Watermark Assigner 是寫在 Filter 算子之后的,因此 Watermark 的生成也是基于 Filter 算子之后的數(shù)據(jù)的。所以想要定位是不是由于上述猜想導(dǎo)致的,我們就需要估算 Filter 算子產(chǎn)出的數(shù)據(jù)量來驗證。
- ? 經(jīng)過驗證,發(fā)現(xiàn) Filter 算子之后產(chǎn)出的數(shù)據(jù),每一分鐘總數(shù)據(jù)量產(chǎn)出到下游算子的不到 60 條。也就是說在我們 100 并發(fā)的任務(wù)上面,每一分鐘最多只有 60 個并發(fā)的 Filter 算子會產(chǎn)出數(shù)據(jù)到下游滾動窗口算子,剩下至少的 40 個并發(fā)的算子沒有發(fā)任何數(shù)據(jù)到下游滾動窗口算子。
- ? 最終,對于下游的滾動窗口算子來說,就沒法做到 Watermark 對齊!因此窗口無法觸發(fā)。
問題原因找到。
4.問題原理解析篇-導(dǎo)致問題的機(jī)制是什么
想要理解 Watermark 對齊 到底是怎么一回事,我們首先要看一下 Flink 中的 Watermark 傳輸及計算機(jī)制:
- Watermark 傳輸方式:廣播。這里的廣播是指 上游算子的一個并發(fā) 會往 能夠連接到的下游算子的所有并發(fā) 廣播,這與上下游算子并發(fā)之間的 Shuffle 機(jī)制有關(guān)。這里的廣播不是說 Flink 提供的 BroadCast 編程 API!!!
舉例:如果一個任務(wù) 100 并發(fā),上下游算子之間 Shuffle 策略是 Forward,那么上游算子的一個并發(fā)的 Watermark 會只往下游算子的連接到的那一個并發(fā)發(fā)送 Watermark;如果策略是 Hash\Rebalance,則上游算子的一個并發(fā)的 Watermark 會往下游算子的所有并發(fā)上發(fā)送 Watermark。
- Watermark 計算方式:下游算子的一個并發(fā)接受到上游算子并發(fā)的 Watermark 之后下游算子當(dāng)前并發(fā)的 Watermark 計算方式(這里的上下游是指有 Channel 連接的),計算公式:
下游算子并發(fā) Watermark = min(上游算子并發(fā) 1 發(fā)送的 Watermark,上游算子并發(fā) 2 發(fā)送的 Watermark......)
即
下游算子并發(fā) Watermark = 所有上游算子并發(fā)發(fā)到下游算子 Watermark 的最小值。
- Watermark 對齊:下游算子并發(fā)的 Watermark 依賴上游算子并發(fā)的 Watermark 差異很大時,這就是 Watermark 沒有對齊,舉例:上有算子一個并發(fā)傳輸?shù)?Watermark 是 23:59 分,另一個并發(fā)傳輸?shù)?Watermark 是 23:00 分,中間查了 59 分鐘,這種情況一般都是異常情況,所以叫做沒有對齊。反之如果 Watermark 差異很小,則叫做Watermark 對齊。
再來一張圖看看 Watermark 的傳輸過程,加深理解:
Watermark 傳播
回到上述案例中,一分鐘上游算子只有 60 個并發(fā)有數(shù)據(jù),發(fā)送了 Watermark 到下游窗口算子,其余 40 個毛都沒有。
所以下游窗口算子的 Watermark 就沒有,因此窗口也就不觸發(fā)了。
5.避坑篇-如何避免這種問題
在上述場景中,其實問題的根本原因就是數(shù)據(jù)經(jīng)過(購物車頁)條件過濾之后,數(shù)據(jù)量變得非常少了。
Watermark Assigner 從極少量的數(shù)據(jù)中去生成極少量 Watermark,有 40 個并發(fā)都沒有 Watermark 生成,下游算子就出現(xiàn)了Watermark 對不齊 的情景。
那么解決方案也很簡單,就是多生成一些 Watermark,確保:
雖然 Filter 之后的數(shù)據(jù)很少,F(xiàn)ilter 算子處理過后,每個并發(fā)上面都有足夠的 Watermark 來傳遞到下游窗口算子,來持續(xù)的觸發(fā)窗口的計算和結(jié)果產(chǎn)出。
具體解決方案:將 Watermark Assigner 重寫到 Source 算子之后,F(xiàn)ilter 算子之前。代碼如下:
public class WatermarkTest {
public static void main(String[] args) throws Exception {
// 獲取到 Flink 環(huán)境,博主自己封裝的接口 FlinkEnv
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(100);
flinkEnv.env()
// 數(shù)據(jù)源
.addSource(xxx)
// 分配 Watermark,移到 Filter 之前
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) {
@Override
public long extractTimestamp(SourceModel element) {
return element.getTime();
}
})
// 過濾出 購物車頁面(Shopping-Cart)的數(shù)據(jù)
.filter(new FilterFunction() {
@Override
public boolean filter(SourceModel value) throws Exception {
return value.getPage().equals("Shopping-Cart");
}
})
// 為了 shuffle 到一個算子中進(jìn)行合并計算,所以返回結(jié)果 key 固定為 0
.keyBy(new KeySelector() {
@Override
public Long getKey(SourceModel value) throws Exception {
return 0L;
}
})
// 開一分鐘的滾動時間時間窗口
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 計算 uv 的處理邏輯
.process(new ProcessWindowFunction() {
@Override
public void process(Long aLong, Context context, Iterable elements,
Collector out) throws Exception {
long windowStart = context.window().getStart();
Set s = new HashSet<>();
elements.forEach(new Consumer() {
@Override
public void accept(SourceModel sourceModel) {
s.add(sourceModel.userId);
}
});
out.collect(
SinkModel
.builder()
.uv(s.size())
.time(windowStart)
.build()
);
}
})
// 輸出
.addSink(xxx);
}
// 輸入數(shù)據(jù) Model
@Data
@Builder
private static class SourceModel {
private long userId;
private String page;
private long time;
}
// 輸出數(shù)據(jù) Model
@Data
@Builder
private static class SinkModel {
private long uv;
private long time;
}
} 解決方案的原理:在上述業(yè)務(wù)場景中,Source 的數(shù)據(jù)是非常多的,我們可以利用大量的 Source 數(shù)據(jù),從而使 Watermark Assigner 能夠持續(xù)不斷的產(chǎn)生 Watermark 傳輸?shù)较掠巍?/p>
雖然經(jīng)過 Filter 算子之后,到下游窗口算子的數(shù)據(jù)量很少,但是 Watermark 不會被 Filter 算子的過濾,大量的 Watermark 依然能夠正常傳輸?shù)酱翱谒阕?,使?Watermark 對齊,從而保障窗口算子的持續(xù)觸發(fā)和結(jié)果輸出。
解決方案雖好,但是有極低幾率會產(chǎn)生亂序丟數(shù)問題::舉例,Watermark 是在 Source 算子之后產(chǎn)生的,有可能一條 23:50:50 的 購物車頁日志的數(shù)據(jù)在 23:52:00 的 網(wǎng)站主頁面 日志數(shù)據(jù)后到達(dá),那么 Watermark 已經(jīng)升高到 23:51:00 秒了,23:50 分的窗口已經(jīng)被觸發(fā)了,從而這條 23:50:50 的 購物車頁 數(shù)據(jù)就被窗口算子丟棄了。
6.總結(jié)篇
本文主要記錄小伙伴萌在使用 DataStream API 由于將 Watermark Assigner 設(shè)置的太靠后,導(dǎo)致的 Watermark 無法對齊,從而事件時間窗口不觸發(fā)的問題。
博主建議的編程習(xí)慣:使用 DataStream API 實現(xiàn) Flink 任務(wù)時,Watermark Assigner 能靠近 Source 節(jié)點(diǎn)就靠近 Source 節(jié)點(diǎn),能前置盡量前置。
網(wǎng)頁標(biāo)題:Flink代碼這么寫,窗口能觸發(fā)才怪!
文章鏈接:http://www.dlmjj.cn/article/ccdjged.html


咨詢
建站咨詢
