日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
關于Kafka消費者的這些參數(shù),你應該要知道?

關于Kafka消費者的這些參數(shù),你應該要知道?

作者:丁威 2021-06-28 11:45:28

開發(fā)

架構

Kafka 個人覺得,要想深入了解Kafka Consumer的核心工作機制可以從它的核心參數(shù)切入,為后續(xù)深入了解它的隊列負載機制、消息拉取模型、消費模型、位點提交等機制打下基礎。

成都創(chuàng)新互聯(lián)公司專注于歙縣企業(yè)網(wǎng)站建設,成都響應式網(wǎng)站建設公司,商城網(wǎng)站制作。歙縣網(wǎng)站建設公司,為歙縣等地區(qū)提供建站服務。全流程按需規(guī)劃網(wǎng)站,專業(yè)設計,全程項目跟蹤,成都創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務

 [[407801]]

本文將對Kafka Consumer做一個簡單的介紹,是深入研究Kafka Conumer的一扇窗。主要從如下三個方面展開:

  • 核心參數(shù)
  • 核心組件
  • 核心API

1、Kafka Consumer核心參數(shù)覽

個人覺得,要想深入了解Kafka Consumer的核心工作機制可以從它的核心參數(shù)切入,為后續(xù)深入了解它的隊列負載機制、消息拉取模型、消費模型、位點提交等機制打下基礎。

kafka Consumer的核心屬性定義在ConsumerConfig中。

1.1 基礎功能參數(shù)

  • group.id

消費組名稱。

  • client.id

客戶端標識id,默認為consumer-序號,在實踐中建議包含客戶端IP,在一個消費組中不能重復。

  • bootstrap.servers

broker服務端地址列表。

  • client.dns.lookup

客戶端尋找bootstrap地址的方式,支持如下兩種方式:

  • resolve_canonical_bootstrap_servers_only

這種方式,會依據(jù)bootstrap.servers提供的主機名(hostname),根據(jù)主機上的名稱服務返回其IP地址的數(shù)組(InetAddress.getAllByName),然后依次獲取inetAddress.getCanonicalHostName(),再建立tcp連接。

一個主機可配置多個網(wǎng)卡,如果啟用該功能,應該可以有效利用多網(wǎng)卡的優(yōu)勢,降低Broker的網(wǎng)絡端負載壓力。

  • use_all_dns_ips

這種方式會直接使用bootstrap.servers中提供的hostname、port創(chuàng)建tcp連接,默認選項。

  • enable.auto.commit

是否開啟自動位點提交,默認為true。

  • auto.commit.interval.ms

如果開啟自動位點提交,位點的提交頻率,默認為5s。

  • partition.assignment.strategy

消費端隊列負載算法,默認為按區(qū)間平均分配(RangeAssignor),可選值:輪詢(RoundRobinAssignor)

  • auto.offset.reset

重置位點策略,但kafka提交位點時,對應的消息已被刪除時采取的恢復策略,默認為latest,可選:earliest、none(會拋出異常)。

  • key.deserializer

使用的key序列化類

  • value.deserializer

消息體序列化類

  • interceptor.classes

消費端攔截器,可以有多個。

  • check.crcs

在消費端時是否需要校驗CRC,默認為true。

1.2 網(wǎng)絡相關參數(shù)

  • send.buffer.bytes

網(wǎng)絡通道(TCP)的發(fā)送緩存區(qū)大小,默認為128K。

  • receive.buffer.bytes

網(wǎng)絡通道(TCP)的接收緩存區(qū)大小,默認為32K。

  • reconnect.backoff.ms

重新建立鏈接的等待時長,默認為50ms,屬于底層網(wǎng)絡參數(shù),基本無需關注。

  • reconnect.backoff.max.ms

重新建立鏈接的最大等待時長,默認為1s,連續(xù)兩次對同一個連接建立重連,等待時間會在reconnect.backoff.ms的初始值上成指數(shù)級遞增,但超過max后,將不再指數(shù)級遞增。

  • retry.backoff.ms

重試間隔時間,默認為100ms。

  • connections.max.idle.ms

連接的最大空閑時間,默認為9s。

  • request.timeout.ms

請求的超時時間,與Broker端的網(wǎng)絡通訊的請求超時時間。

1.3 核心工作參數(shù)

  • max.poll.records

每一次poll方法調用拉取的最大消息條數(shù),默認為500。

  • max.poll.interval.ms

兩次poll方法調用的最大間隔時間,單位毫秒,默認為5分鐘。如果消費端在該間隔內沒有發(fā)起poll操作,該消費者將被剔除,觸發(fā)重平衡,將該消費者分配的隊列分配給其他消費者。

  • session.timeout.ms

消費者與broker的心跳超時時間,默認10s,broker在指定時間內沒有收到心跳請求,broker端將會將該消費者移出,并觸發(fā)重平衡。

  • heartbeat.interval.ms

心跳間隔時間,消費者會以該頻率向broker發(fā)送心跳,默認為3s,主要是確保session不會失效。

  • fetch.min.bytes

一次拉取消息最小返回的字節(jié)數(shù)量,默認為1字節(jié)。

  • fetch.max.bytes

一次拉取消息最大返回的字節(jié)數(shù)量,默認為1M,如果一個分區(qū)的第一批消息大小大于該值也會返回。

  • max.partition.fetch.bytes

一次拉取每一個分區(qū)最大拉取字節(jié)數(shù),默認為1M。

  • fetch.max.wait.ms

fetch等待拉取數(shù)據(jù)符合fetch.min.bytes的最大等待時間。

  • metadata.max.age.ms

元數(shù)據(jù)在客戶端的過期時間,過期后客戶端會向broker重新拉取最新的元數(shù)據(jù),默認為5分鐘。

  • internal.leave.group.on.close

消費者關閉后是否立即離開訂閱組,默認為true,即當客戶端斷開后立即觸發(fā)重平衡。如果設置為false,則不會立即觸發(fā)重平衡,而是要等session過期后才會觸發(fā)。

2、KafkaConsumer核心組件與API

通過KafkaConsumer核心參數(shù),我們基本可以窺探Kafka中的核心要點,接下來再介紹一下KafkaConsumer的核心組件,為后續(xù)深入研究Kafka消費者消費模型打下基礎。

2.1 核心組件

KafkaConsumer由如下幾個核心組件構成:

  • ConsumerNetworkClient

消費端網(wǎng)絡客戶端,服務底層網(wǎng)絡通訊,負責客戶端與服務端的RPC通信。

  • ConsumerCoordinator

消費端協(xié)調器,在Kafka的設計中,每一個消費組在集群中會選舉一個broker節(jié)點成為該消費組的協(xié)調器,負責消費組狀態(tài)的狀態(tài)管理,尤其是消費組重平衡(消費者的加入與退出),該類就是消費者與broker協(xié)調器進行交互。

  • Fetcher

消息拉取。

溫馨提示:本文不打算對每一個組件進行詳細解讀,這里建議大家按照本文第一部分關于各個參數(shù)的含義,然后對照這些參數(shù)最終是傳resume遞給哪些組件,進行一個關聯(lián)思考。

2.2 核心API概述

最后我們再來看一下消費者的核心API。

  • Set< TopicPartition> assignment()

獲取該消費者的隊列分配列表。

  • Set< String> subscription()

獲取該消費者的訂閱信息。

  • void subscribe(Collection< String> topics)

訂閱主題。

  • void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)

訂閱主題,并指定隊列重平衡的監(jiān)聽器。

  • void assign(Collection< TopicPartition> partitions)

取代 subscription,手動指定消費哪些隊列。

  • void unsubscribe()

取消訂閱關系。

  • ConsumerRecords

poll(Duration timeout)

拉取消息,是 KafkaConsumer 的核心方法,將在下文詳細介紹。

  • void commitSync()

同步提交消費進度,為本批次的消費提交,將在后續(xù)文章中詳細介紹。

  • void commitSync(Duration timeout)

同步提交消費進度,可設置超時時間。

  • void commitSync(Map

offsets)

顯示同步提交消費進度, offsets 指明需要提交消費進度的信息。

  • void commitSync(final Map

offsets, final Duration timeout)

顯示同步提交消費進度,帶超時間。

  • void seek(TopicPartition partition, long offset)

重置 consumer#poll 方法下一次拉消息的偏移量。

  • void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

seek 方法重載方法。

  • void seekToBeginning(Collection< TopicPartition> partitions)

將 poll 方法下一次的拉取偏移量設置為隊列的初始偏移量。

  • void seekToEnd(Collection< TopicPartition> partitions)

將 poll 方法下一次的拉取偏移量設置為隊列的最大偏移量。

  • long position(TopicPartition partition)

獲取將被拉取的偏移量。

  • long position(TopicPartition partition, final Duration timeout)

同上。

  • OffsetAndMetadata committed(TopicPartition partition)

獲取指定分區(qū)已提交的偏移量。

  • OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)

同上。

  • Map metrics()

統(tǒng)計指標。

  • List< PartitionInfo> partitionsFor(String topic)

獲取主題的路由信息。

  • List< PartitionInfo> partitionsFor(String topic, Duration timeout)

同上。

  • Map listTopics()

獲取所有 topic 的路由信息。

  • Map listTopics(Duration timeout)

同上。

  • Set< TopicPartition> paused()

獲取已掛起的分區(qū)信息。

  • void pause(Collection< TopicPartition> partitions)

掛起分區(qū),下一次 poll 方法將不會返回這些分區(qū)的消息。

  • void resume(Collection< TopicPartition> partitions)

恢復掛起的分區(qū)。

  • Map

offsetsForTimes(MaptimestampsToSearch)

根據(jù)時間戳查找最近的一條消息的偏移量。

  • Map

offsetsForTimes(MaptimestampsToSearch, Duration timeout)

同上。

  • Map

beginningOffsets(Collection< TopicPartition> partitions)

查詢指定分區(qū)當前最小的偏移量。

  • Map

beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)

同上。

  • Map

endOffsets(Collection< TopicPartition> partitions)

查詢指定分區(qū)當前最大的偏移量。

  • Map

endOffsets(Collection< TopicPartition> partitions, Duration timeout)

同上。

  • void close()

關閉消費者。

  • void close(Duration timeout)

關閉消費者。

  • void wakeup()

喚醒消費者。

Kafka提供的消費者并不像RocketMQ提供了Push模式自動拉取消息,需要應用程序自動組織這些API進行消息拉取。

值得注意的kafka消費者也支持位點自動提交機制,kafka的消費者(KafkaConsumer)對象是線程不安全的。

基于KafkaConsumer的pause(暫停某些分區(qū)的消費)與resume(恢復某些分區(qū)的消費),可以輕松實現(xiàn)消費端限流機制。

本文主要是對消費者有一個大概的了解,后續(xù)文章將持續(xù)逐一解開消費者的核心運作機制,請持續(xù)關注。

本文轉載自微信公眾號「中間件興趣圈」,可以通過以下二維碼關注。轉載本文請聯(lián)系中間件興趣圈公眾號。

 


當前文章:關于Kafka消費者的這些參數(shù),你應該要知道?
當前路徑:http://www.dlmjj.cn/article/dhegiss.html