日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Kafka大廠高頻面試題:在保證高性能、高吞吐的同時保證高可用性

Kafka大廠高頻面試題:在保證高性能、高吞吐的同時保證高可用性

作者:Android開發(fā)Alvin老師 2021-09-09 08:20:14

開發(fā)

前端

Kafka Kafka的消息傳輸保障機(jī)制非常直觀。當(dāng)producer向broker發(fā)送消息時,一旦這條消息被commit,由于副本機(jī)制(replication)的存在,它就不會丟失。但是如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡(luò)問題而造成通信中斷,那producer就無法判斷該條消息是否已經(jīng)提交(commit)。

創(chuàng)新互聯(lián)建站-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比潞州網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式潞州網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋潞州地區(qū)。費(fèi)用合理售后完善,十載實體公司更值得信賴。

Kafka的消息傳輸保障機(jī)制非常直觀。當(dāng)producer向broker發(fā)送消息時,一旦這條消息被commit,由于副本機(jī)制(replication)的存在,它就不會丟失。但是如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡(luò)問題而造成通信中斷,那producer就無法判斷該條消息是否已經(jīng)提交(commit)。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是producer可以retry多次,確保消息已經(jīng)正確傳輸?shù)絙roker中,所以目前Kafka實現(xiàn)的是at least once。

一、冪等性

1.場景

所謂冪等性,就是對接口的多次調(diào)用所產(chǎn)生的結(jié)果和調(diào)用一次是一致的。生產(chǎn)者在進(jìn)行重試的時候有可能會重復(fù)寫入消息,而使用Kafka的冪等性功能就可以避免這種情況。

冪等性是有條件的:

只能保證 Producer 在單個會話內(nèi)不丟不重,如果 Producer 出現(xiàn)意外掛掉再重啟是無法保證的(冪等性情況下,是無法獲取之前的狀態(tài)信息,因此是無法做到跨會話級別的不丟不重)。

冪等性不能跨多個 Topic-Partition,只能保證單個 partition 內(nèi)的冪等性,當(dāng)涉及多個Topic-Partition 時,這中間的狀態(tài)并沒有同步。

Producer 使用冪等性的示例非常簡單,與正常情況下 Producer 使用相比變化不大,只需要把Producer 的配置 enable.idempotence 設(shè)置為 true 即可,如下所示: 

  
 
 
 
  1. Properties props = new Properties();  
  2. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  
  3. props.put("acks", "all"); // 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all  
  4. props.put("bootstrap.servers", "localhost:9092");  
  5. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  6. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  7.  
  8. KafkaProducer producer = new KafkaProducer(props);  
  9.  
  10. producer.send(new ProducerRecord(topic, "test"); 

二、事務(wù)

1.場景

冪等性并不能跨多個分區(qū)運(yùn)作,而事務(wù)可以彌補(bǔ)這個缺憾,事務(wù)可以保證對多個分區(qū)寫入操作的原子性。操作的原子性是指多個操作要么全部成功,要么全部失敗,不存在部分成功部分失敗的可能。

為了實現(xiàn)事務(wù),網(wǎng)絡(luò)故障必須提供唯一的transactionalId,這個參數(shù)通過客戶端程序來進(jìn)行設(shè)定。

見代碼庫:

com.heima.kafka.chapter7.ProducerTransactionSend

  
 
 
 
  1. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId); 

2.前期準(zhǔn)備

事務(wù)要求生產(chǎn)者開啟冪等性特性,因此通過將transactional.id參數(shù)設(shè)置為非空從而開啟事務(wù)特性的同時需要將ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG設(shè)置為true(默認(rèn)值為true),如果顯示設(shè)置為false,則會拋出異常。

KafkaProducer提供了5個與事務(wù)相關(guān)的方法,詳細(xì)如下: 

  
 
 
 
  1. //初始化事務(wù),前提是配置了transactionalId  
  2. public void initTransactions()  
  3. //開啟事務(wù)  
  4. public void beginTransaction()  
  5. //為消費(fèi)者提供事務(wù)內(nèi)的位移提交操作  
  6. public void sendOffsetsToTransaction(Map offsets, String consumerGroupId)  
  7. //提交事務(wù)  
  8. public void commitTransaction()  
  9. //終止事務(wù),類似于回滾  
  10. public void abortTransaction() 

3.案例解析

見代碼庫:

  • com.heima.kafka.chapter7.ProducerTransactionSend

消息發(fā)送端 

  
 
 
 
  1. /** 
  2.     * Kafka Producer事務(wù)的使用  
  3.     */  
  4. public class ProducerTransactionSend {  
  5.     public static final String topic = "topic-transaction";  
  6.     public static final String brokerList = "localhost:9092";  
  7.     public static final String transactionId = "transactionId";  
  8.      
  9.     public static void main(String[] args) {  
  10.         Properties properties = new Properties();  
  11.         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
  12.         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
  13.         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);  
  14.         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);  
  15.          
  16.         KafkaProducer producer = new KafkaProducer<> (properties);  
  17.          
  18.         producer.initTransactions();  
  19.         producer.beginTransaction();  
  20.          
  21.         try { 
  22.             //處理業(yè)務(wù)邏輯并創(chuàng)建ProducerRecord  
  23.             ProducerRecord record1 = new ProducerRecord<>(topic, "msg1");  
  24.             producer.send(record1);  
  25.             ProducerRecord record2 = new ProducerRecord<>(topic, "msg2");  
  26.             producer.send(record2);  
  27.             ProducerRecord record3 = new ProducerRecord<>(topic, "msg3");  
  28.             producer.send(record3);  
  29.             //處理一些其它邏輯  
  30.             producer.commitTransaction();  
  31.         } catch (ProducerFencedException e) {  
  32.             producer.abortTransaction();  
  33.         } 
  34.         producer.close();  
  35.     }  

模擬事務(wù)回滾案例 

  
 
 
 
  1. try {  
  2.     //處理業(yè)務(wù)邏輯并創(chuàng)建ProducerRecord  
  3.     ProducerRecord record1 = new ProducerRecord<>(topic, "msg1");  
  4.     producer.send(record1);  
  5.      
  6.     //模擬事務(wù)回滾案例  
  7.     System.out.println(1/0);  
  8.      
  9.     ProducerRecord record2 = new ProducerRecord<>(topic, "msg2");  
  10.     producer.send(record2);  
  11.     ProducerRecord record3 = new ProducerRecord<>(topic, "msg3");  
  12.     producer.send(record3);  
  13.     //處理一些其它邏輯  
  14.     producer.commitTransaction();  
  15. } catch (ProducerFencedException e) {  
  16.     producer.abortTransaction();  

從上面案例中,msg1發(fā)送成功之后,出現(xiàn)了異常事務(wù)進(jìn)行了回滾,則msg1消費(fèi)端也收不到消息。

三、控制器

在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負(fù)責(zé)管理整個集群中所有分區(qū)和副本的狀態(tài)。當(dāng)某個分區(qū)的leader副本出現(xiàn)故障時,由控制器負(fù)責(zé)為該分區(qū)選舉新的leader副本。當(dāng)檢測到某個分區(qū)的ISR集合發(fā)生變化時,由控制器負(fù)責(zé)通知所有broker更新其元數(shù)據(jù)信息。當(dāng)使用kafka-topics.sh腳本為某個topic增加分區(qū)數(shù)量時,同樣還是由控制器負(fù)責(zé)分區(qū)的重新分配。

Kafka中的控制器選舉的工作依賴于Zookeeper,成功競選為控制器的broker會在Zookeeper中創(chuàng)建/controller這個臨時(EPHEMERAL)節(jié)點(diǎn),此臨時節(jié)點(diǎn)的內(nèi)容參考如下:

1.ZooInspector管理

使用zookeeper圖形化的客戶端工具(ZooInspector)提供的jar來進(jìn)行管理,啟動如下:

  1. 定位到j(luò)ar所在目錄
  2. 運(yùn)行jar文件 java -jar zookeeper-dev-ZooInspector.jar
  3. 連接Zookeeper

  
 
 
 
  1. {"version":1,"brokerid":0,"timestamp":"1529210278988"} 

 

其中version在目前版本中固定為1,brokerid表示稱為控制器的broker的id編號,timestamp表示競選稱為控制器時的時間戳。

在任意時刻,集群中有且僅有一個控制器。每個broker啟動的時候會去嘗試去讀取/controller節(jié)點(diǎn)的brokerid的值,如果讀取到brokerid的值不為-1,則表示已經(jīng)有其它broker節(jié)點(diǎn)成功競選為控制器,所以當(dāng)前broker就會放棄競選;如果Zookeeper中不存在/controller這個節(jié)點(diǎn),或者這個節(jié)點(diǎn)中的數(shù)據(jù)異常,那么就會嘗試去創(chuàng)建/controller這個節(jié)點(diǎn),當(dāng)前broker去創(chuàng)建節(jié)點(diǎn)的時候,也有可能其他broker同時去嘗試創(chuàng)建這個節(jié)點(diǎn),只有創(chuàng)建成功的那個broker才會成為控制器,而創(chuàng)建失敗的broker則表示競選失敗。每個broker都會在內(nèi)存中保存當(dāng)前控制器的brokerid值,這個值可以標(biāo)識為activeControllerId。

Zookeeper中還有一個與控制器有關(guān)的/controller_epoch節(jié)點(diǎn),這個節(jié)點(diǎn)是持久(PERSISTENT)節(jié)點(diǎn),節(jié)點(diǎn)中存放的是一個整型的controller_epoch值。controller_epoch用于記錄控制器發(fā)生變更的次數(shù),即記錄當(dāng)前的控制器是第幾代控制器,我們也可以稱之為“控制器的紀(jì)元”。

controller_epoch的初始值為1,即集群中第一個控制器的紀(jì)元為1,當(dāng)控制器發(fā)生變更時,沒選出一個新的控制器就將該字段值加1。每個和控制器交互的請求都會攜帶上controller_epoch這個字段,如果請求的controller_epoch值小于內(nèi)存中的controller_epoch值,則認(rèn)為這個請求是向已經(jīng)過期的控制器所發(fā)送的請求,那么這個請求會被認(rèn)定為無效的請求。如果請求的controller_epoch值大于內(nèi)存中的controller_epoch值,那么則說明已經(jīng)有新的控制器當(dāng)選了。由此可見,Kafka通過controller_epoch來保證控制器的唯一性,進(jìn)而保證相關(guān)操作的一致性。

具備控制器身份的broker需要比其他普通的broker多一份職責(zé),具體細(xì)節(jié)如下:

  1. 監(jiān)聽partition相關(guān)的變化。
  2. 監(jiān)聽topic相關(guān)的變化。
  3. 監(jiān)聽broker相關(guān)的變化。
  4. 從Zookeeper中讀取獲取當(dāng)前所有與topic、partition以及broker有關(guān)的信息并進(jìn)行相應(yīng)的管理。

四、可靠性保證

  1. 可靠性保證:確保系統(tǒng)在各種不同的環(huán)境下能夠發(fā)生一致的行為
  2. Kafka的保證
  3. 保證分區(qū)消息的順序如果使用同一個生產(chǎn)者往同一個分區(qū)寫入消息,而且消息B在消息A之后寫入那么Kafka可以保證消息B的偏移量比消息A的偏移量大,而且消費(fèi)者會先讀取消息A再讀取消息B
  4. 只有當(dāng)消息被寫入分區(qū)的所有同步副本時(文件系統(tǒng)緩存),它才被認(rèn)為是已提交
  5. 生產(chǎn)者可以選擇接收不同類型的確認(rèn),控制參數(shù) acks
  6. 只要還有一個副本是活躍的,那么已提交的消息就不會丟失
  7. 消費(fèi)者只能讀取已經(jīng)提交的消息

1. 失效副本

怎么樣判定一個分區(qū)是否有副本是處于同步失效狀態(tài)的呢?從Kafka 0.9.x版本開始通過唯一的一個參數(shù)replica.lag.time.max.ms(默認(rèn)大小為10,000)來控制,當(dāng)ISR中的一個follower副本滯后leader副本的時間超過參數(shù)replica.lag.time.max.ms指定的值時即判定為副本失效,需要將此follower副本剔出除ISR之外。具體實現(xiàn)原理很簡單,當(dāng)follower副本將leader副本的LEO(Log End Offset,每個分區(qū)最后一條消息的位置)之前的日志全部同步時,則認(rèn)為該follower副本已經(jīng)追趕上leader副本,此時更新該副本的lastCaughtUpTimeMs標(biāo)識。Kafka的副本管理器(ReplicaManager)啟動時會啟動一個副本過期檢測的定時任務(wù),而這個定時任務(wù)會定時檢查當(dāng)前時間與副本的lastCaughtUpTimeMs差值是否大于參數(shù)replica.lag.time.max.ms指定的值。千萬不要錯誤地認(rèn)為follower副本只要拉取leader副本的數(shù)據(jù)就會更新lastCaughtUpTimeMs,試想當(dāng)leader副本的消息流入速度大于follower副本的拉取速度時,follower副本一直不斷的拉取leader副本的消息也不能與leader副本同步,如果還將此follower副本置于ISR中,那么當(dāng)leader副本失效,而選取此follower副本為新的leader副本,那么就會有嚴(yán)重的消息丟失。

2.副本復(fù)制

Kafka 中的每個主題分區(qū)都被復(fù)制了 n 次,其中的 n 是主題的復(fù)制因子(replication factor)。這允許Kafka 在集群服務(wù)器發(fā)生故障時自動切換到這些副本,以便在出現(xiàn)故障時消息仍然可用。Kafka 的復(fù)制是以分區(qū)為粒度的,分區(qū)的預(yù)寫日志被復(fù)制到 n 個服務(wù)器。 在 n 個副本中,一個副本作為 leader,其他副本成為 followers。顧名思義,producer 只能往 leader 分區(qū)上寫數(shù)據(jù)(讀也只能從 leader 分區(qū)上進(jìn)行),followers 只按順序從 leader 上復(fù)制日志。

一個副本可以不同步Leader有如下幾個原因 慢副本:在一定周期時間內(nèi)follower不能追趕上leader。最常見的原因之一是I / O瓶頸導(dǎo)致follower追加復(fù)制消息速度慢于從leader拉取速度。 卡住副本:在一定周期時間內(nèi)follower停止從leader拉取請求。follower replica卡住了是由于GC暫停或follower失效或死亡。

新啟動副本:當(dāng)用戶給主題增加副本因子時,新的follower不在同步副本列表中,直到他們完全趕上了leader日志。

如何確定副本是滯后的:

  
 
 
 
  1. replica.lag.max.messages=4 

 

在服務(wù)端現(xiàn)在只有一個參數(shù)需要配置replica.lag.time.max.ms。這個參數(shù)解釋replicas響應(yīng)partition leader的最長等待時間。檢測卡住或失敗副本的探測——如果一個replica失敗導(dǎo)致發(fā)送拉取請求時間間隔超過replica.lag.time.max.ms。Kafka會認(rèn)為此replica已經(jīng)死亡會從同步副本列表從移除。檢測慢副本機(jī)制發(fā)生了變化——如果一個replica開始落后leader超過replica.lag.time.max.ms。Kafka會認(rèn)為太緩慢并且會從同步副本列表中移除。除非replica請求leader時間間隔大于replica.lag.time.max.ms,因此即使leader使流量激增和大批量寫消息。Kafka也不會從同步副本列表從移除該副本。

Leader Epoch引用

數(shù)據(jù)丟失場景

數(shù)據(jù)出現(xiàn)不一致場景

Kafka 0.11.0.0.版本解決方案

造成上述兩個問題的根本原因在于HW值被用于衡量副本備份的成功與否以及在出現(xiàn)failture時作為日志截斷的依據(jù),但HW值得更新是異步延遲的,特別是需要額外的FETCH請求處理流程才能更新,故這中間發(fā)生的任何崩潰都可能導(dǎo)致HW值的過期。鑒于這些原因,Kafka 0.11引入了leader epoch來取代HW值。Leader端多開辟一段內(nèi)存區(qū)域?qū)iT保存leader的epoch信息,這樣即使出現(xiàn)上面的兩個場景也能很好地規(guī)避這些問題。

所謂leader epoch實際上是一對值:(epoch,offset)。epoch表示leader的版本號,從0開始,當(dāng)leader變更過1次時epoch就會+1,而offset則對應(yīng)于該epoch版本的leader寫入第一條消息的位移。因此假設(shè)有兩對值:

  • (0, 0)
  • (1, 120)

則表示第一個leader從位移0開始寫入消息;共寫了120條[0, 119];而第二個leader版本號是1,從位移120處開始寫入消息。

leader broker中會保存這樣的一個緩存,并定期地寫入到一個checkpoint文件中。

避免數(shù)據(jù)丟失:

避免數(shù)據(jù)不一致

六、消息重復(fù)的場景及解決方案

1.生產(chǎn)者端重復(fù)

生產(chǎn)發(fā)送的消息沒有收到正確的broke響應(yīng),導(dǎo)致producer重試。

producer發(fā)出一條消息,broke落盤以后因為網(wǎng)絡(luò)等種種原因發(fā)送端得到一個發(fā)送失敗的響應(yīng)或者網(wǎng)絡(luò)中斷,然后producer收到一個可恢復(fù)的Exception重試消息導(dǎo)致消息重復(fù)。

解決方案:

  • 啟動kafka的冪等性

要啟動kafka的冪等性,無需修改代碼,默認(rèn)為關(guān)閉,需要修改配置文件:enable.idempotence=true 同時要求 ack=all 且 retries>1。

  • ack=0,不重試。

可能會丟消息,適用于吞吐量指標(biāo)重要性高于數(shù)據(jù)丟失,例如:日志收集。

消費(fèi)者端重復(fù)

根本原因

數(shù)據(jù)消費(fèi)完沒有及時提交offset到broker。

解決方案

取消自動自動提交

每次消費(fèi)完或者程序退出時手動提交。這可能也沒法保證一條重復(fù)。

下游做冪等

一般的解決方案是讓下游做冪等或者盡量每消費(fèi)一條消息都記錄offset,對于少數(shù)嚴(yán)格的場景可能需要把offset或唯一ID,例如訂單ID和下游狀態(tài)更新放在同一個數(shù)據(jù)庫里面做事務(wù)來保證精確的一次更新或者在下游數(shù)據(jù)表里面同時記錄消費(fèi)offset,然后更新下游數(shù)據(jù)的時候用消費(fèi)位點(diǎn)做樂觀鎖拒絕掉舊位點(diǎn)的數(shù)據(jù)更新。

七、__consumer_offsets

_consumer_offsets是一個內(nèi)部topic,對用戶而言是透明的,除了它的數(shù)據(jù)文件以及偶爾在日志中出現(xiàn)這兩點(diǎn)之外,用戶一般是感覺不到這個topic的。不過我們的確知道它保存的是Kafka新版本consumer的位移信息。

1.何時創(chuàng)建

一般情況下,當(dāng)集群中第一有消費(fèi)者消費(fèi)消息時會自動創(chuàng)建主題__consumer_offsets,分區(qū)數(shù)可以通過offsets.topic.num.partitions參數(shù)設(shè)定,默認(rèn)值為50,如下:

2.解析分區(qū)

見代碼庫:

  
 
 
 
  1. com.heima.kafka.chapter7.ConsumerOffsetsAnalysis 

獲取所有分區(qū):

總結(jié)

本章主要講解了Kafka相關(guān)穩(wěn)定性的操作,包括冪等性、事務(wù)的處理,同時對可靠性保證與一致性保證做了講解,講解了消息重復(fù)以及解決方案。

 


分享名稱:Kafka大廠高頻面試題:在保證高性能、高吞吐的同時保證高可用性
網(wǎng)頁路徑:http://www.dlmjj.cn/article/dpeppgc.html