新聞中心
玩了分布式這么久,你不會(huì)連Kafka都不清楚吧
作者:cxuan 2019-10-29 09:41:54
開(kāi)源
分布式
Kafka Kafka 現(xiàn)在在企業(yè)和互聯(lián)網(wǎng)項(xiàng)目中的應(yīng)用越來(lái)越多了,本篇文章就從 Kafka 的基礎(chǔ)開(kāi)始帶你一展 Kafka 的宏圖。

公司主營(yíng)業(yè)務(wù):做網(wǎng)站、成都網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。成都創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。成都創(chuàng)新互聯(lián)推出天水免費(fèi)做網(wǎng)站回饋大家。
Kafka 現(xiàn)在在企業(yè)和互聯(lián)網(wǎng)項(xiàng)目中的應(yīng)用越來(lái)越多了,本篇文章就從 Kafka 的基礎(chǔ)開(kāi)始帶你一展 Kafka 的宏圖。
圖片來(lái)自 Pexels
什么是 Kafka
Kafka 是一個(gè)分布式流式平臺(tái),它有三個(gè)關(guān)鍵能力:
- 訂閱發(fā)布記錄流,它類似于企業(yè)中的消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。
- 以容錯(cuò)的方式存儲(chǔ)記錄流。
- 實(shí)時(shí)記錄流。
Kafka 的應(yīng)用:
- 作為消息系統(tǒng)。
- 作為存儲(chǔ)系統(tǒng)。
- 作為流處理器。
Kafka 可以建立流數(shù)據(jù)管道,可靠地在系統(tǒng)或應(yīng)用之間獲取數(shù)據(jù)。建立流式應(yīng)用傳輸和響應(yīng)數(shù)據(jù)。
Kafka 作為消息系統(tǒng)
Kafka 作為消息系統(tǒng),它有三個(gè)基本組件:
- Producer : 發(fā)布消息的客戶端
- Broker:一個(gè)從生產(chǎn)者接受并存儲(chǔ)消息的客戶端
- Consumer : 消費(fèi)者從 Broker 中讀取消息
在大型系統(tǒng)中,會(huì)需要和很多子系統(tǒng)做交互,也需要消息傳遞,在諸如此類系統(tǒng)中,你會(huì)找到源系統(tǒng)(消息發(fā)送方)和目的系統(tǒng)(消息接收方)。
為了在這樣的消息系統(tǒng)中傳輸數(shù)據(jù),你需要有合適的數(shù)據(jù)管道:
這種數(shù)據(jù)的交互看起來(lái)就很混亂,如果我們使用消息傳遞系統(tǒng),那么系統(tǒng)就會(huì)變得更加簡(jiǎn)單和整潔。
Kafka 運(yùn)行在一個(gè)或多個(gè)數(shù)據(jù)中心的服務(wù)器上作為集群運(yùn)行:
- Kafka 集群存儲(chǔ)消息記錄的目錄被稱為 Topics。
- 每一條消息記錄包含三個(gè)要素:鍵(Key)、值(Value)、時(shí)間戳(Timestamp)。
核心 API
Kafka 有四個(gè)核心 API,它們分別是:
- Producer API,它允許應(yīng)用程序向一個(gè)或多個(gè) Topics 上發(fā)送消息記錄。
- Consumer API,允許應(yīng)用程序訂閱一個(gè)或多個(gè) Topics 并處理為其生成的記錄流。
- Streams API,它允許應(yīng)用程序作為流處理器,從一個(gè)或多個(gè)主題中消費(fèi)輸入流并為其生成輸出流,有效的將輸入流轉(zhuǎn)換為輸出流。
- Connector API,它允許構(gòu)建和運(yùn)行將 Kafka 主題連接到現(xiàn)有應(yīng)用程序或數(shù)據(jù)系統(tǒng)的可用生產(chǎn)者和消費(fèi)者。例如,關(guān)系數(shù)據(jù)庫(kù)的連接器可能會(huì)捕獲對(duì)表的所有更改。
Kafka 基本概念
Kafka 作為一個(gè)高度可擴(kuò)展可容錯(cuò)的消息系統(tǒng),它有很多基本概念,下面就來(lái)認(rèn)識(shí)一下這些 Kafka 專屬的概念。
Topic
Topic 被稱為主題,在 Kafka 中,使用一個(gè)類別屬性來(lái)劃分消息的所屬類,劃分消息的這個(gè)類稱為 Topic。
Topic 相當(dāng)于消息的分配標(biāo)簽,是一個(gè)邏輯概念。主題好比是數(shù)據(jù)庫(kù)的表,或者文件系統(tǒng)中的文件夾。
Partition
Partition 譯為分區(qū),Topic 中的消息被分割為一個(gè)或多個(gè)的 Partition,它是一個(gè)物理概念,對(duì)應(yīng)到系統(tǒng)上就是一個(gè)或若干個(gè)目錄,一個(gè)分區(qū)就是一個(gè)提交日志。消息以追加的形式寫(xiě)入分區(qū),先后以順序的方式讀取。
注意:由于一個(gè)主題包含無(wú)數(shù)個(gè)分區(qū),因此無(wú)法保證在整個(gè) Topic 中有序,但是單個(gè) Partition 分區(qū)可以保證有序。消息被迫加寫(xiě)入每個(gè)分區(qū)的尾部。Kafka 通過(guò)分區(qū)來(lái)實(shí)現(xiàn)數(shù)據(jù)冗余和伸縮性。
分區(qū)可以分布在不同的服務(wù)器上,也就是說(shuō),一個(gè)主題可以跨越多個(gè)服務(wù)器,以此來(lái)提供比單個(gè)服務(wù)器更強(qiáng)大的性能。
Segment
Segment 被譯為段,將 Partition 進(jìn)一步細(xì)分為若干個(gè) Segment,每個(gè) Segment 文件的大小相等。
Broker
Kafka 集群包含一個(gè)或多個(gè)服務(wù)器,每個(gè) Kafka 中服務(wù)器被稱為 Broker。Broker 接收來(lái)自生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤(pán)保存。
Broker 為消費(fèi)者提供服務(wù),對(duì)讀取分區(qū)的請(qǐng)求作出響應(yīng),返回已經(jīng)提交到磁盤(pán)上的消息。
Broker 是集群的組成部分,每個(gè)集群中都會(huì)有一個(gè) Broker 同時(shí)充當(dāng)了集群控制器(Leader)的角色,它是由集群中的活躍成員選舉出來(lái)的。
每個(gè)集群中的成員都有可能充當(dāng) Leader,Leader 負(fù)責(zé)管理工作,包括將分區(qū)分配給 Broker 和監(jiān)控 Broker。
集群中,一個(gè)分區(qū)從屬于一個(gè) Leader,但是一個(gè)分區(qū)可以分配給多個(gè) Broker(非 Leader),這時(shí)候會(huì)發(fā)生分區(qū)復(fù)制。
這種復(fù)制的機(jī)制為分區(qū)提供了消息冗余,如果一個(gè) Broker 失效,那么其他活躍用戶會(huì)重新選舉一個(gè) Leader 接管。
Producer
生產(chǎn)者,即消息的發(fā)布者,其會(huì)將某 Topic 的消息發(fā)布到相應(yīng)的 Partition 中。
生產(chǎn)者在默認(rèn)情況下把消息均衡地分布到主題的所有分區(qū)上,而并不關(guān)心特定消息會(huì)被寫(xiě)到哪個(gè)分區(qū)。不過(guò),在某些情況下,生產(chǎn)者會(huì)把消息直接寫(xiě)到指定的分區(qū)。
Consumer
消費(fèi)者,即消息的使用者,一個(gè)消費(fèi)者可以消費(fèi)多個(gè) Topic 的消息,對(duì)于某一個(gè) Topic 的消息,其只會(huì)消費(fèi)同一個(gè) Partition 中的消息。
在了解完 Kafka 的基本概念之后,我們通過(guò)搭建 Kafka 集群來(lái)進(jìn)一步深刻認(rèn)識(shí)一下 Kafka。
確保安裝環(huán)境
安裝 Java 環(huán)境
在安裝 Kafka 之前,先確保 Linux 環(huán)境上是否有 Java 環(huán)境,使用 java -version 命令查看 Java 版本,推薦使用 Jdk 1.8 。
如果沒(méi)有安裝 Java 環(huán)境的話,可以按照這篇文章進(jìn)行安裝:
- https://www.cnblogs.com/zs-notes/p/8535275.html
安裝 Zookeeper 環(huán)境
Kafka 的底層使用 Zookeeper 儲(chǔ)存元數(shù)據(jù),確保一致性,所以安裝 Kafka 前需要先安裝 Zookeeper,Kafka 的發(fā)行版自帶了 Zookeeper ,可以直接使用腳本來(lái)啟動(dòng),不過(guò)安裝一個(gè) Zookeeper 也不費(fèi)勁。
Zookeeper 單機(jī)搭建
Zookeeper 單機(jī)搭建比較簡(jiǎn)單,直接從官網(wǎng)下載一個(gè)穩(wěn)定版本的 Zookeeper:
- https://www.apache.org/dyn/closer.cgi/zookeeper/
這里我使用的是 3.4.10,下載完成后,在 Linux 系統(tǒng)中的 /usr/local 目錄下創(chuàng)建 Zookeeper 文件夾。
然后使用 xftp 工具(xftp 和 xshell 工具都可以在官網(wǎng) https://www.netsarang.com/zh/xshell/ 申請(qǐng)免費(fèi)的家庭版)把下載好的 Zookeeper 壓縮包放到 /usr/local/zookeeper 目錄下。
如果下載的是一個(gè) tar.gz 包的話,直接使用 tar -zxvf zookeeper-3.4.10.tar.gz 解壓即可。
如果下載的是 zip 包的話,還要檢查一下 Linux 中是否有 unzip 工具,如果沒(méi)有的話,使用 yum install unzip 安裝 zip 解壓工具,完成后使用 unzip zookeeper-3.4.10.zip 解壓即可。
解壓完成后,cd 到 /usr/local/zookeeper/zookeeper-3.4.10 ,創(chuàng)建一個(gè) data 文件夾,然后進(jìn)入到 conf 文件夾下,使用 mv zoo_sample.cfg zoo.cfg 進(jìn)行重命名操作。
然后使用 vi 打開(kāi) zoo.cfg ,更改一下dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data ,保存。
進(jìn)入 bin 目錄,啟動(dòng)服務(wù)輸入命令 ./zkServer.sh start 輸出下面內(nèi)容表示搭建成功:
關(guān)閉服務(wù)輸入命令,./zkServer.sh stop:
使用 ./zkServer.sh status 可以查看狀態(tài)信息。
Zookeeper 集群搭建
①準(zhǔn)備條件
需要三個(gè)服務(wù)器,這里我使用了 CentOS7 并安裝了三個(gè)虛擬機(jī),并為各自的虛擬機(jī)分配了 1GB 的內(nèi)存,在每個(gè) /usr/local/ 下面新建 Zookeeper 文件夾。
把 Zookeeper 的壓縮包挪過(guò)來(lái),解壓,完成后會(huì)有 zookeeper-3.4.10 文件夾,進(jìn)入到文件夾,新建兩個(gè)文件夾,分別是 data 和 log 文件夾。
注:上一節(jié)單機(jī)搭建中已經(jīng)創(chuàng)建了一個(gè) data 文件夾,就不需要重新創(chuàng)建了,直接新建一個(gè) log 文件夾,對(duì)另外兩個(gè)新增的服務(wù)需要新建這兩個(gè)文件夾。
②設(shè)置集群
新建完成后,需要編輯 conf/zoo.cfg 文件,三個(gè)文件的內(nèi)容如下:?
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
- dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
- clientPort=12181
- server.1=192.168.1.7:12888:13888
- server.2=192.168.1.8:12888:13888
- server.3=192.168.1.9:12888:13888
server.1 中的這個(gè) 1 表示的是服務(wù)器的標(biāo)識(shí)也可以是其他數(shù)字,表示這是第幾號(hào)服務(wù)器,這個(gè)標(biāo)識(shí)要和下面我們配置的 myid 的標(biāo)識(shí)一致。
192.168.1.7:12888:13888 為集群中的 ip 地址,第一個(gè)端口表示的是 master 與 slave 之間的通信接口,默認(rèn)是 2888。
第二個(gè)端口是 Leader 選舉的端口,集群剛啟動(dòng)的時(shí)候選舉或者 Leader 掛掉之后進(jìn)行新的選舉的端口,默認(rèn)是 3888。
現(xiàn)在對(duì)上面的配置文件進(jìn)行解釋:
- tickTime:這個(gè)時(shí)間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔,也就是每個(gè) tickTime 時(shí)間就會(huì)發(fā)送一個(gè)心跳。
- initLimit:這個(gè)配置項(xiàng)是用來(lái)配置 Zookeeper 接受客戶端(這里所說(shuō)的客戶端不是用戶連接 Zookeeper 服務(wù)器的客戶端,而是 Zookeeper 服務(wù)器集群中連接到 Leader 的 Follower 服務(wù)器)初始化連接時(shí)最長(zhǎng)能忍受多少個(gè)心跳時(shí)間間隔數(shù)。
當(dāng)已經(jīng)超過(guò) 5 個(gè)心跳的時(shí)間(也就是 tickTime)長(zhǎng)度后 Zookeeper 服務(wù)器還沒(méi)有收到客戶端的返回信息,那么表明這個(gè)客戶端連接失敗??偟臅r(shí)間長(zhǎng)度就是 5*2000=10 秒。
- syncLimit:這個(gè)配置項(xiàng)標(biāo)識(shí) Leader 與 Follower 之間發(fā)送消息,請(qǐng)求和應(yīng)答時(shí)間長(zhǎng)度,最長(zhǎng)不能超過(guò)多少個(gè) tickTime 的時(shí)間長(zhǎng)度,總的時(shí)間長(zhǎng)度就是 5*2000=10 秒。
- dataDir:快照日志的存儲(chǔ)路徑。
- dataLogDir:事務(wù)日志的存儲(chǔ)路徑,如果不配置這個(gè)那么事務(wù)日志會(huì)默認(rèn)存儲(chǔ)到 dataDir 指定的目錄,這樣會(huì)嚴(yán)重影響 ZK 的性能,當(dāng) ZK 吞吐量較大的時(shí)候,產(chǎn)生的事務(wù)日志、快照日志太多。
- clientPort:這個(gè)端口就是客戶端連接 Zookeeper 服務(wù)器的端口,Zookeeper 會(huì)監(jiān)聽(tīng)這個(gè)端口,接受客戶端的訪問(wèn)請(qǐng)求。
③創(chuàng)建 myid 文件
在了解完其配置文件后,現(xiàn)在來(lái)創(chuàng)建每個(gè)集群節(jié)點(diǎn)的 myid ,我們上面說(shuō)過(guò),這個(gè) myid 就是 server.1 的這個(gè) 1 ,類似的,需要為集群中的每個(gè)服務(wù)都指定標(biāo)識(shí),使用 echo 命令進(jìn)行創(chuàng)建:
- # server.1
- echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
- # server.2
- echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
- # server.3
- echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
④啟動(dòng)服務(wù)并測(cè)試
配置完成,為每個(gè) ZK 服務(wù)啟動(dòng)并測(cè)試,我在 Windows 電腦的測(cè)試結(jié)果如下。
啟動(dòng)服務(wù)(每臺(tái)都需要執(zhí)行):
- cd /usr/local/zookeeper/zookeeper-3.4.10/bin
- ./zkServer.sh start
使用 ./zkServer.sh status 命令檢查服務(wù)狀態(tài):
192.168.1.7 --- follower:
192.168.1.8 --- leader:
192.168.1.9 --- follower:
ZK 集群一般只有一個(gè) Leader,多個(gè) Follower,主一般是相應(yīng)客戶端的讀寫(xiě)請(qǐng)求,而從主同步數(shù)據(jù),當(dāng)主掛掉之后就會(huì)從 Follower 里投票選舉一個(gè) Leader 出來(lái)。
Kafka 集群搭建
準(zhǔn)備條件
準(zhǔn)備條件如下:
- 搭建好的 Zookeeper 集群
- Kafka 壓縮包
- https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
在 /usr/local 下新建 Kafka 文件夾,然后把下載完成的 tar.gz 包移到 /usr/local/kafka 目錄下,使用 tar -zxvf 壓縮包進(jìn)行解壓。
解壓完成后,進(jìn)入到 kafka_2.12-2.3.0 目錄下,新建 log 文件夾,進(jìn)入到 config 目錄下。
我們可以看到有很多 properties 配置文件,這里主要關(guān)注 server.properties 這個(gè)文件即可。
Kafka 啟動(dòng)方式有兩種:
- 一種是使用 Kafka 自帶的 Zookeeper 配置文件來(lái)啟動(dòng)(可以按照官網(wǎng)來(lái)進(jìn)行啟動(dòng),并使用單個(gè)服務(wù)多個(gè)節(jié)點(diǎn)來(lái)模擬集群http://kafka.apache.org/quickstart#quickstart_multibroker)。
- 一種是通過(guò)使用獨(dú)立的 ZK 集群來(lái)啟動(dòng),這里推薦使用第二種方式,使用 ZK 集群來(lái)啟動(dòng)。
②修改配置項(xiàng)
需要為每個(gè)服務(wù)都修改一下配置項(xiàng),也就是 server.properties, 需要更新和添加的內(nèi)容有:
- broker.id=0 //初始是0,每個(gè) server 的broker.id 都應(yīng)該設(shè)置為不一樣的,就和 myid 一樣 我的三個(gè)服務(wù)分別設(shè)置的是 1,2,3
- log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log
- #在log.retention.hours=168 下面新增下面三項(xiàng)
- message.max.byte=5242880
- default.replication.factor=2
- replica.fetch.max.bytes=5242880
- #設(shè)置zookeeper的連接端口
- zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置項(xiàng)的含義:
- broker.id=0 #當(dāng)前機(jī)器在集群中的唯一標(biāo)識(shí),和zookeeper的myid性質(zhì)一樣
- port=9092 #當(dāng)前kafka對(duì)外提供服務(wù)的端口默認(rèn)是9092
- host.name=192.168.1.7 #這個(gè)參數(shù)默認(rèn)是關(guān)閉的,在0.8.1有個(gè)bug,DNS解析問(wèn)題,失敗率的問(wèn)題。
- num.network.threads=3 #這個(gè)是borker進(jìn)行網(wǎng)絡(luò)處理的線程數(shù)
- num.io.threads=8 #這個(gè)是borker進(jìn)行I/O處理的線程數(shù)
- log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目錄,這個(gè)目錄可以配置為“,”逗號(hào)分割的表達(dá)式,上面的num.io.threads要大于這個(gè)目錄的個(gè)數(shù)這個(gè)目錄,如果配置多個(gè)目錄,新創(chuàng)建的topic他把消息持久化的地方是,當(dāng)前以逗號(hào)分割的目錄中,那個(gè)分區(qū)數(shù)最少就放那一個(gè)
- socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲(chǔ)到緩沖區(qū)了到達(dá)一定的大小后在發(fā)送,能提高性能
- socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當(dāng)數(shù)據(jù)到達(dá)一定大小后在序列化到磁盤(pán)
- socket.request.max.bytes=104857600 #這個(gè)參數(shù)是向kafka請(qǐng)求消息或者向kafka發(fā)送消息的請(qǐng)請(qǐng)求的最大數(shù),這個(gè)值不能超過(guò)java的堆棧大小
- num.partitions=1 #默認(rèn)的分區(qū)數(shù),一個(gè)topic默認(rèn)1個(gè)分區(qū)數(shù)
- log.retention.hours=168 #默認(rèn)消息的最大持久化時(shí)間,168小時(shí),7天
- message.max.byte=5242880 #消息保存的最大值5M
- default.replication.factor=2 #kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)
- replica.fetch.max.bytes=5242880 #取消息的最大直接數(shù)
- log.segment.bytes=1073741824 #這個(gè)參數(shù)是:因?yàn)閗afka的消息是以追加的形式落地到文件,當(dāng)超過(guò)這個(gè)值的時(shí)候,kafka會(huì)新起一個(gè)文件
- log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時(shí)間(log.retention.hours=168 ),到目錄查看是否有過(guò)期的消息如果有,刪除
- log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
- zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #設(shè)置zookeeper的連接端口
③啟動(dòng) Kafka 集群并測(cè)試
啟動(dòng)服務(wù),進(jìn)入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目錄下:
- # 啟動(dòng)后臺(tái)進(jìn)程
- ./kafka-server-start.sh -daemon ../config/server.properties
檢查服務(wù)是否啟動(dòng):
- # 執(zhí)行命令 jps
- 6201 QuorumPeerMain
- 7035 Jps
- 6972 Kafka
Kafka 已經(jīng)啟動(dòng),創(chuàng)建 Topic 來(lái)驗(yàn)證是否創(chuàng)建成功:
- # cd .. 往回退一層 到 /usr/local/kafka/kafka_2.12-2.3.0 目錄下
- bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
對(duì)上面的解釋:
- Replication-factor 2:復(fù)制兩份
- Partitions 1:創(chuàng)建1個(gè)分區(qū)
- Topic:創(chuàng)建主題
查看我們的主題是否創(chuàng)建成功:
- bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
啟動(dòng)一個(gè)服務(wù)就能把集群?jiǎn)?dòng)起來(lái)。
在一臺(tái)機(jī)器上創(chuàng)建一個(gè)發(fā)布者:
- # 創(chuàng)建一個(gè)broker,發(fā)布者
- ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
在一臺(tái)服務(wù)器上創(chuàng)建一個(gè)訂閱者:
- # 創(chuàng)建一個(gè)consumer, 消費(fèi)者
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
注意:這里使用 --zookeeper 的話可能出現(xiàn) zookeeper is not a recognized option 的錯(cuò)誤,這是因?yàn)?Kafka 版本太高,需要使用 --bootstrap-server 指令。
測(cè)試結(jié)果如下:
發(fā)布:
消費(fèi):
④其他命令
顯示 topic:
- bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
- # 顯示
- cxuantopic
查看 topic 狀態(tài):
- bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic
- # 下面是顯示的詳細(xì)信息
- Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
- Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
- # 分區(qū)為為1 復(fù)制因子為2 主題 cxuantopic 的分區(qū)為0
- # Replicas: 0,1 復(fù)制的為1,2
Leader 負(fù)責(zé)給定分區(qū)的所有讀取和寫(xiě)入的節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都會(huì)通過(guò)隨機(jī)選擇成為 Leader。
Replicas 是為該分區(qū)復(fù)制日志的節(jié)點(diǎn)列表,無(wú)論它們是 Leader 還是當(dāng)前處于活動(dòng)狀態(tài)。
Isr 是同步副本的集合。它是副本列表的子集,當(dāng)前仍處于活動(dòng)狀態(tài)并追隨Leader。至此,Kafka 集群搭建完畢。
⑤驗(yàn)證多節(jié)點(diǎn)接收數(shù)據(jù)
剛剛我們都使用的是相同的 ip 服務(wù),下面使用其他集群中的節(jié)點(diǎn),驗(yàn)證是否能夠接受到服務(wù)。
在另外兩個(gè)節(jié)點(diǎn)上使用:
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
然后再使用 Broker 進(jìn)行消息發(fā)送,經(jīng)測(cè)試三個(gè)節(jié)點(diǎn)都可以接受到消息。
配置詳解
在搭建 Kafka 的時(shí)候我們簡(jiǎn)單介紹了一下 server.properties 中配置的含義,現(xiàn)在我們來(lái)詳細(xì)介紹一下參數(shù)的配置和概念。
常規(guī)配置
這些參數(shù)是 Kafka 中最基本的配置:
broker.id:每個(gè) Broker 都需要有一個(gè)標(biāo)識(shí)符,使用 broker.id 來(lái)表示。它的默認(rèn)值是 0,它可以被設(shè)置成其他任意整數(shù),在集群中需要保證每個(gè)節(jié)點(diǎn)的 broker.id 都是唯一的。
port:如果使用配置樣本來(lái)啟動(dòng) Kafka ,它會(huì)監(jiān)聽(tīng) 9092 端口,修改 port 配置參數(shù)可以把它設(shè)置成其他任意可用的端口。
zookeeper.connect:用于保存 Broker 元數(shù)據(jù)的地址是通過(guò) zookeeper.connect 來(lái)指定。
localhost:2181:表示運(yùn)行在本地 2181 端口。該配置參數(shù)是用逗號(hào)分隔的一組 hostname:port/path 列表。
每一部分含義如下:
- hostname 是 Zookeeper 服務(wù)器的服務(wù)名或 IP 地址。
- port 是 Zookeeper 連接的端口。
- /path 是可選的 Zookeeper 路徑,作為 Kafka 集群的 chroot 環(huán)境。如果不指定,默認(rèn)使用跟路徑。
log.dirs:Kafka 把消息都保存在磁盤(pán)上,存放這些日志片段的目錄都是通過(guò) log.dirs 來(lái)指定的。它是一組用逗號(hào)分隔的本地文件系統(tǒng)路徑。
如果指定了多個(gè)路徑,那么 Broker 會(huì)根據(jù) "最少使用" 原則,把同一分區(qū)的日志片段保存到同一路徑下。
要注意,Broker 會(huì)向擁有最少數(shù)目分區(qū)的路徑新增分區(qū),而不是向擁有最小磁盤(pán)空間的路徑新增分區(qū)。
num.recovery.threads.per.data.dir:對(duì)于如下三種情況,Kafka 會(huì)使用可配置的線程池來(lái)處理日志片段:
- 服務(wù)器正常啟動(dòng),用于打開(kāi)每個(gè)分區(qū)的日志片段。
- 服務(wù)器崩潰后啟動(dòng),用于檢查和截?cái)嗝總€(gè)分區(qū)的日志片段。
- 服務(wù)器正常關(guān)閉,用于關(guān)閉日志片段。
默認(rèn)情況下,每個(gè)日志目錄只使用一個(gè)線程。因?yàn)檫@些線程只是在服務(wù)器啟動(dòng)和關(guān)閉時(shí)會(huì)用到,所以完全可以設(shè)置大量的線程來(lái)達(dá)到并行操作的目的。
特別是對(duì)于包含大量分區(qū)的服務(wù)器來(lái)說(shuō),一旦發(fā)生崩潰,在進(jìn)行恢復(fù)時(shí)使用井行操作可能會(huì)省下數(shù)小時(shí)的時(shí)間。
設(shè)置此參數(shù)時(shí)需要注意,所配置的數(shù)字對(duì)應(yīng)的是 log.dirs 指定的單個(gè)日志目錄。
也就是說(shuō),如果 num.recovery.threads.per.data.dir 被設(shè)為 8,并且 log.dir 指定了 3 個(gè)路徑,那么總共需要 24 個(gè)線程。
auto.create.topics.enable:默認(rèn)情況下,Kafka 會(huì)在如下 3 種情況下創(chuàng)建主題:
- 當(dāng)一個(gè)生產(chǎn)者開(kāi)始往主題寫(xiě)入消息時(shí)
- 當(dāng)一個(gè)消費(fèi)者開(kāi)始從主題讀取消息時(shí)
- 當(dāng)任意一個(gè)客戶向主題發(fā)送元數(shù)據(jù)請(qǐng)求時(shí)
delete.topic.enable:如果你想要?jiǎng)h除一個(gè)主題,你可以使用主題管理工具。
默認(rèn)情況下,是不允許刪除主題的,delete.topic.enable 的默認(rèn)值是 false 因此你不能隨意刪除主題。
這是對(duì)生產(chǎn)環(huán)境的合理性保護(hù),但是在開(kāi)發(fā)環(huán)境和測(cè)試環(huán)境,是可以允許你刪除主題的,所以,如果你想要?jiǎng)h除主題,需要把 delete.topic.enable 設(shè)為 true。
主題默認(rèn)配置
Kafka 為新創(chuàng)建的主題提供了很多默認(rèn)配置參數(shù),下面就來(lái)一起認(rèn)識(shí)一下這些參數(shù)。
num.partitions:num.partitions 參數(shù)指定了新創(chuàng)建的主題需要包含多少個(gè)分區(qū)。
如果啟用了主題自動(dòng)創(chuàng)建功能(該功能是默認(rèn)啟用的),主題分區(qū)的個(gè)數(shù)就是該參數(shù)指定的值。該參數(shù)的默認(rèn)值是 1。要注意,我們可以增加主題分區(qū)的個(gè)數(shù),但不能減少分區(qū)的個(gè)數(shù)。
default.replication.factor:這個(gè)參數(shù)比較簡(jiǎn)單,它表示 Kafka 保存消息的副本數(shù)。
如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù),default.replication.factor 的默認(rèn)值為 1,這個(gè)參數(shù)在你啟用了主題自動(dòng)創(chuàng)建功能后有效。
log.retention.ms:Kafka 通常根據(jù)時(shí)間來(lái)決定數(shù)據(jù)可以保留多久。默認(rèn)使用 log.retention.hours 參數(shù)來(lái)配置時(shí)間,默認(rèn)是 168 個(gè)小時(shí),也就是一周。
除此之外,還有兩個(gè)參數(shù) log.retention.minutes 和 log.retentiion.ms 。這三個(gè)參數(shù)作用是一樣的,都是決定消息多久以后被刪除,推薦使用 log.retention.ms。
log.retention.bytes:另一種保留消息的方式是判斷消息是否過(guò)期。它的值通過(guò)參數(shù) log.retention.bytes 來(lái)指定,作用在每一個(gè)分區(qū)上。
也就是說(shuō),如果有一個(gè)包含 8 個(gè)分區(qū)的主題,并且 log.retention.bytes 被設(shè)置為 1GB,那么這個(gè)主題最多可以保留 8GB 數(shù)據(jù)。
所以,當(dāng)主題的分區(qū)個(gè)數(shù)增加時(shí),整個(gè)主題可以保留的數(shù)據(jù)也隨之增加。
log.segment.bytes:上述的日志都是作用在日志片段上,而不是作用在單個(gè)消息上。
當(dāng)消息到達(dá) Broker 時(shí),它們被追加到分區(qū)的當(dāng)前日志片段上,當(dāng)日志片段大小到達(dá) log.segment.bytes 指定上限(默認(rèn)為 1GB)時(shí),當(dāng)前日志片段就會(huì)被關(guān)閉,一個(gè)新的日志片段被打開(kāi)。
如果一個(gè)日志片段被關(guān)閉,就開(kāi)始等待過(guò)期。這個(gè)參數(shù)的值越小,就越會(huì)頻繁的關(guān)閉和分配新文件,從而降低磁盤(pán)寫(xiě)入的整體效率。
log.segment.ms:上面提到日志片段經(jīng)關(guān)閉后需等待過(guò)期,那么 log.segment.ms 這個(gè)參數(shù)就是指定日志多長(zhǎng)時(shí)間被關(guān)閉的參數(shù)。
log.segment.ms 和 log.retention.bytes 也不存在互斥問(wèn)題。日志片段會(huì)在大小或時(shí)間到達(dá)上限時(shí)被關(guān)閉,就看哪個(gè)條件先得到滿足。
message.max.bytes:Broker 通過(guò)設(shè)置 message.max.bytes 參數(shù)來(lái)限制單個(gè)消息的大小,默認(rèn)是 1000 000, 也就是 1MB。
如果生產(chǎn)者嘗試發(fā)送的消息超過(guò)這個(gè)大小,不僅消息不會(huì)被接收,還會(huì)收到 Broker 返回的錯(cuò)誤消息。
跟其他與字節(jié)相關(guān)的配置參數(shù)一樣,該參數(shù)指的是壓縮后的消息大小,也就是說(shuō),只要壓縮后的消息小于 mesage.max.bytes,那么消息的實(shí)際大小可以大于這個(gè)值。
這個(gè)值對(duì)性能有顯著的影響。值越大,那么負(fù)責(zé)處理網(wǎng)絡(luò)連接和請(qǐng)求的線程就需要花越多的時(shí)間來(lái)處理這些請(qǐng)求。它還會(huì)增加磁盤(pán)寫(xiě)入塊的大小,從而影響 IO 吞吐量。
文章參考:
- Kafka【第一篇】Kafka 集群搭建
- https://juejin.im/post/5ba792f5e51d450e9e44184d
- https://blog.csdn.net/k393393/article/details/93099276
- 《Kafka 權(quán)威指南》
- https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/
【編輯推薦】
- 分布式消息系統(tǒng)的設(shè)計(jì)要點(diǎn)
- Kafka源碼分析及圖解原理之Broker端
標(biāo)題名稱:玩了分布式這么久,你不會(huì)連Kafka都不清楚吧
網(wǎng)頁(yè)URL:http://www.dlmjj.cn/article/dheojio.html


咨詢
建站咨詢
