日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Flink代碼這么寫,窗口能觸發(fā)才怪!

1.序篇-先說結(jié)論

博主希望你在看完本文后一定要養(yǎng)成這個編程習(xí)慣:使用 DataStream API 實現(xiàn) Flink 任務(wù)時,Watermark Assigner 能靠近 Source 節(jié)點(diǎn)就靠近 Source 節(jié)點(diǎn),盡量前置。

在鶴山等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、網(wǎng)站設(shè)計 網(wǎng)站設(shè)計制作按需開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計,營銷型網(wǎng)站,外貿(mào)網(wǎng)站制作,鶴山網(wǎng)站建設(shè)費(fèi)用合理。

要想問為啥,接著往下看!!!

我從以下幾個章節(jié)說明上述的問題以及為什么這樣建議,希望能拋磚引玉,帶給大家一些啟發(fā)。

  • ? 踩坑場景篇-這個坑是啥樣的
  • ? 問題排查篇-坑的排查過程
  • ? 問題原理解析篇-導(dǎo)致問題的機(jī)制是什么
  • ? 避坑篇-如何避免這種問題
  • ? 總結(jié)篇

2.踩坑場景篇-這個坑是啥樣的

2.1.需求場景

首先介紹一下這個坑對應(yīng)的一個需求場景以及第一版本的實現(xiàn)代碼。

需求:在電商平臺中,需要根據(jù)網(wǎng)頁在線用戶的心跳日志(每 30s 上報一次用戶心跳日志)計算當(dāng)前這一分鐘在購物車頁面(Shopping-Cart)停留的在線人數(shù)。

數(shù)據(jù)源:每 30s 上報一次的用戶心跳日志(user_id、page、time 三個字段分別對應(yīng) 用戶 id、用戶所在頁面、日志上報時間)

數(shù)據(jù)處理:先過濾出購物車按照時間戳對用戶心跳日志進(jìn)行滾動窗口(Tumble)聚合計算

數(shù)據(jù)匯:每分鐘聚合的結(jié)果數(shù)據(jù)(uv、time兩個字段分別對應(yīng) 購物車頁面的當(dāng)前這一分鐘的同時在線人數(shù)、當(dāng)前這一分鐘的時間戳)

Flink DataStream API 具體實現(xiàn)代碼如下:

public class WatermarkTest {

public static void main(String[] args) throws Exception {

// 獲取到 Flink 環(huán)境,博主自己封裝的接口 FlinkEnv
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

// 設(shè)置并發(fā)度
flinkEnv.env().setParallelism(100);

flinkEnv.env()
// 數(shù)據(jù)源:上報的日志
.addSource(xxx)
// 過濾出 購物車頁面(Shopping-Cart)的數(shù)據(jù)
.filter(new FilterFunction() {
@Override
public boolean filter(SourceModel value) throws Exception {
return value.getPage().equals("Shopping-Cart");
}
})
// 分配 Watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) {
@Override
public long extractTimestamp(SourceModel element) {
return element.getTime();
}
})
// 為了進(jìn)行合并計算,shuffle 到一個算子中,所以此處返回結(jié)果固定為 0
.keyBy(new KeySelector() {
@Override
public Long getKey(SourceModel value) throws Exception {
return 0L;
}
})
// 開一分鐘的滾動時間時間窗口
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 計算 uv 的處理邏輯
.process(new ProcessWindowFunction() {
@Override
public void process(Long aLong, Context context, Iterable elements,
Collector out) throws Exception {

long windowStart = context.window().getStart();

Set s = new HashSet<>();

elements.forEach(new Consumer() {
@Override
public void accept(SourceModel sourceModel) {
s.add(sourceModel.userId);
}
});

out.collect(
SinkModel
.builder()
.uv(s.size())
.time(windowStart)
.build()
);
}
})
// 輸出
.addSink(xxx);
}

// 輸入數(shù)據(jù) Model
@Data
@Builder
private static class SourceModel {
private long userId;
private String page;
private long time;
}

// 輸出數(shù)據(jù) Model
@Data
@Builder
private static class SinkModel {
private long uv;
private long time;
}

}

2.2.問題場景

當(dāng)我們把這個任務(wù)部署到集群環(huán)境運(yùn)行時,卻發(fā)現(xiàn)一直沒有數(shù)據(jù)產(chǎn)出,但是輸入數(shù)據(jù)(用戶心跳日志)是一直有大量數(shù)據(jù)過來的。

3.問題排查篇-坑的排查過程

通過 Flink web ui 定位了下發(fā)現(xiàn)每個算子的表現(xiàn)如下:

  • ? Source 算子:一直能夠消費(fèi)到數(shù)據(jù),而且從 web ui 的輸入輸出流量看數(shù)據(jù)量非常大
  • ? Filter 算子:Filter 算子有輸入也有輸出,輸入非常大,但是輸出數(shù)據(jù)極少(這里是由于業(yè)務(wù)原因?qū)е碌?,從購物的業(yè)務(wù)上來說只有非常少一部分的人會在購物車頁面停留)
  • ? 滾動窗口算子:有極少的輸入數(shù)據(jù),但是一直沒有輸出,并且從 web ui 查看算子的 Watermark 也是沒有的

從這里開始問題就清晰了。

至少從 Flink web ui 上來看是由于窗口算子沒有 Watermark 導(dǎo)致的窗口數(shù)據(jù)沒有觸發(fā)計算。

這時的第一個猜想就是:窗口算子單并發(fā)上面的 Watermark 沒有對齊導(dǎo)致的!!!

接下看一下這個猜想的整體驗證過程:

  • ? 由于我們的 Watermark Assigner 是寫在 Filter 算子之后的,因此 Watermark 的生成也是基于 Filter 算子之后的數(shù)據(jù)的。所以想要定位是不是由于上述猜想導(dǎo)致的,我們就需要估算 Filter 算子產(chǎn)出的數(shù)據(jù)量來驗證。
  • ? 經(jīng)過驗證,發(fā)現(xiàn) Filter 算子之后產(chǎn)出的數(shù)據(jù),每一分鐘總數(shù)據(jù)量產(chǎn)出到下游算子的不到 60 條。也就是說在我們 100 并發(fā)的任務(wù)上面,每一分鐘最多只有 60 個并發(fā)的 Filter 算子會產(chǎn)出數(shù)據(jù)到下游滾動窗口算子,剩下至少的 40 個并發(fā)的算子沒有發(fā)任何數(shù)據(jù)到下游滾動窗口算子。
  • ? 最終,對于下游的滾動窗口算子來說,就沒法做到 Watermark 對齊!因此窗口無法觸發(fā)。

問題原因找到。

4.問題原理解析篇-導(dǎo)致問題的機(jī)制是什么

想要理解 Watermark 對齊 到底是怎么一回事,我們首先要看一下 Flink 中的 Watermark 傳輸及計算機(jī)制:

  • Watermark 傳輸方式:廣播。這里的廣播是指 上游算子的一個并發(fā) 會往 能夠連接到的下游算子的所有并發(fā) 廣播,這與上下游算子并發(fā)之間的 Shuffle 機(jī)制有關(guān)。這里的廣播不是說 Flink 提供的 BroadCast 編程 API!!!

舉例:如果一個任務(wù) 100 并發(fā),上下游算子之間 Shuffle 策略是 Forward,那么上游算子的一個并發(fā)的 Watermark 會只往下游算子的連接到的那一個并發(fā)發(fā)送 Watermark;如果策略是 Hash\Rebalance,則上游算子的一個并發(fā)的 Watermark 會往下游算子的所有并發(fā)上發(fā)送 Watermark。

  • Watermark 計算方式:下游算子的一個并發(fā)接受到上游算子并發(fā)的 Watermark 之后下游算子當(dāng)前并發(fā)的 Watermark 計算方式(這里的上下游是指有 Channel 連接的),計算公式:

下游算子并發(fā) Watermark = min(上游算子并發(fā) 1 發(fā)送的 Watermark,上游算子并發(fā) 2 發(fā)送的 Watermark......)

下游算子并發(fā) Watermark = 所有上游算子并發(fā)發(fā)到下游算子 Watermark 的最小值。

  • Watermark 對齊:下游算子并發(fā)的 Watermark 依賴上游算子并發(fā)的 Watermark 差異很大時,這就是 Watermark 沒有對齊,舉例:上有算子一個并發(fā)傳輸?shù)?Watermark 是 23:59 分,另一個并發(fā)傳輸?shù)?Watermark 是 23:00 分,中間查了 59 分鐘,這種情況一般都是異常情況,所以叫做沒有對齊。反之如果 Watermark 差異很小,則叫做Watermark 對齊。

再來一張圖看看 Watermark 的傳輸過程,加深理解:

Watermark 傳播

回到上述案例中,一分鐘上游算子只有 60 個并發(fā)有數(shù)據(jù),發(fā)送了 Watermark 到下游窗口算子,其余 40 個毛都沒有。

所以下游窗口算子的 Watermark 就沒有,因此窗口也就不觸發(fā)了。

5.避坑篇-如何避免這種問題

在上述場景中,其實問題的根本原因就是數(shù)據(jù)經(jīng)過(購物車頁)條件過濾之后,數(shù)據(jù)量變得非常少了。

Watermark Assigner 從極少量的數(shù)據(jù)中去生成極少量 Watermark,有 40 個并發(fā)都沒有 Watermark 生成,下游算子就出現(xiàn)了Watermark 對不齊 的情景。

那么解決方案也很簡單,就是多生成一些 Watermark,確保:

雖然 Filter 之后的數(shù)據(jù)很少,F(xiàn)ilter 算子處理過后,每個并發(fā)上面都有足夠的 Watermark 來傳遞到下游窗口算子,來持續(xù)的觸發(fā)窗口的計算和結(jié)果產(chǎn)出。

具體解決方案:將 Watermark Assigner 重寫到 Source 算子之后,F(xiàn)ilter 算子之前。代碼如下:

public class WatermarkTest {

public static void main(String[] args) throws Exception {

// 獲取到 Flink 環(huán)境,博主自己封裝的接口 FlinkEnv
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

flinkEnv.env().setParallelism(100);

flinkEnv.env()
// 數(shù)據(jù)源
.addSource(xxx)
// 分配 Watermark,移到 Filter 之前
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)) {
@Override
public long extractTimestamp(SourceModel element) {
return element.getTime();
}
})
// 過濾出 購物車頁面(Shopping-Cart)的數(shù)據(jù)
.filter(new FilterFunction() {
@Override
public boolean filter(SourceModel value) throws Exception {
return value.getPage().equals("Shopping-Cart");
}
})
// 為了 shuffle 到一個算子中進(jìn)行合并計算,所以返回結(jié)果 key 固定為 0
.keyBy(new KeySelector() {
@Override
public Long getKey(SourceModel value) throws Exception {
return 0L;
}
})
// 開一分鐘的滾動時間時間窗口
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 計算 uv 的處理邏輯
.process(new ProcessWindowFunction() {
@Override
public void process(Long aLong, Context context, Iterable elements,
Collector out) throws Exception {

long windowStart = context.window().getStart();

Set s = new HashSet<>();

elements.forEach(new Consumer() {
@Override
public void accept(SourceModel sourceModel) {
s.add(sourceModel.userId);
}
});

out.collect(
SinkModel
.builder()
.uv(s.size())
.time(windowStart)
.build()
);
}
})
// 輸出
.addSink(xxx);
}

// 輸入數(shù)據(jù) Model
@Data
@Builder
private static class SourceModel {
private long userId;
private String page;
private long time;
}

// 輸出數(shù)據(jù) Model
@Data
@Builder
private static class SinkModel {
private long uv;
private long time;
}

}

解決方案的原理:在上述業(yè)務(wù)場景中,Source 的數(shù)據(jù)是非常多的,我們可以利用大量的 Source 數(shù)據(jù),從而使 Watermark Assigner 能夠持續(xù)不斷的產(chǎn)生 Watermark 傳輸?shù)较掠巍?/p>

雖然經(jīng)過 Filter 算子之后,到下游窗口算子的數(shù)據(jù)量很少,但是 Watermark 不會被 Filter 算子的過濾,大量的 Watermark 依然能夠正常傳輸?shù)酱翱谒阕?,使?Watermark 對齊,從而保障窗口算子的持續(xù)觸發(fā)和結(jié)果輸出。

解決方案雖好,但是有極低幾率會產(chǎn)生亂序丟數(shù)問題::舉例,Watermark 是在 Source 算子之后產(chǎn)生的,有可能一條 23:50:50 的 購物車頁日志的數(shù)據(jù)在 23:52:00 的 網(wǎng)站主頁面 日志數(shù)據(jù)后到達(dá),那么 Watermark 已經(jīng)升高到 23:51:00 秒了,23:50 分的窗口已經(jīng)被觸發(fā)了,從而這條 23:50:50 的 購物車頁 數(shù)據(jù)就被窗口算子丟棄了。

6.總結(jié)篇

本文主要記錄小伙伴萌在使用 DataStream API 由于將 Watermark Assigner 設(shè)置的太靠后,導(dǎo)致的 Watermark 無法對齊,從而事件時間窗口不觸發(fā)的問題。

博主建議的編程習(xí)慣:使用 DataStream API 實現(xiàn) Flink 任務(wù)時,Watermark Assigner 能靠近 Source 節(jié)點(diǎn)就靠近 Source 節(jié)點(diǎn),能前置盡量前置。


網(wǎng)頁標(biāo)題:Flink代碼這么寫,窗口能觸發(fā)才怪!
文章鏈接:http://www.dlmjj.cn/article/ccdjged.html