新聞中心
Apache Kafka是目前應(yīng)用非常廣泛的分布式消息系統(tǒng),它主要由Scala語言編寫,提供了高性能、高吞吐量、低延遲的消息處理能力,廣泛應(yīng)用于日志處理、數(shù)據(jù)分析、實時計算等場景。對于Linux用戶來說,下載和啟動Kafka并不是一件難事,本文將介紹Linux Kafka下載指南,以便用戶快速下載和使用Kafka。

創(chuàng)新互聯(lián)是專業(yè)的天峻網(wǎng)站建設(shè)公司,天峻接單;提供成都做網(wǎng)站、網(wǎng)站制作,網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行天峻網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊,希望更多企業(yè)前來合作!
一、下載Kafka
在開始下載Kafka之前,需要先確認(rèn)下載的版本是否適用于當(dāng)前操作系統(tǒng)的版本。Kafka官網(wǎng)提供了多個版本的下載鏈接,用戶需要根據(jù)自己的需求選擇合適的版本進(jìn)行下載。本文以Kafka 2.8.0版本為例,演示下載的操作。
在終端中執(zhí)行以下命令,下載Kafka二進(jìn)制包:
“`
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
“`
等待下載完成后,使用以下命令解壓文件:
“`
tar -xzf kafka_2.13-2.8.0.tgz
“`
二、啟動Kafka
解壓Kafka之后,進(jìn)入Kafka目錄,執(zhí)行以下命令啟動Kafka:
“`
cd kafka_2.13-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
“`
執(zhí)行以上命令后,Kafka將會啟動,并在終端輸出相關(guān)的日志信息。此時,用戶就可以通過Kafka提供的API對消息進(jìn)行發(fā)送和消費。
三、常見問題解決
在使用Kafka過程中,常常會遇到一些問題,下面將介紹一些解決方法:
1. Kafka啟動后無法連接
有時候,Kafka啟動后可能會遇到無法連接的問題,這時候需要查看Kafka的日志信息。
打開終端,進(jìn)入Kafka目錄,執(zhí)行以下命令:
“`
cd kafka_2.13-2.8.0
tl -f logs/server.log
“`
執(zhí)行以上命令后,會實時輸出Kafka的日志信息,這時候可以根據(jù)輸出的日志信息,快速定位問題,并進(jìn)行解決。
2. Kafka啟動時提示端口被占用
在啟動Kafka時,如果出現(xiàn)“Address already in use”的錯誤,說明Kafka所占用的端口已經(jīng)被占用。這是因為在啟動Kafka之前,可能已經(jīng)有其他進(jìn)程占用了Kafka所需的端口。
解決方法如下:
執(zhí)行以下命令,查看當(dāng)前所有進(jìn)程所占用的端口:
“`
netstat -tlnp
“`
找到所占用的端口,并終止該進(jìn)程:
“`
kill -9 進(jìn)程ID
“`
然后再次啟動Kafka即可。
3. Kafka啟動后,無法發(fā)送和接收消息
在使用Kafka時,有時候可能會遇到無法發(fā)送和接收消息的問題,這時候需要檢查Kafka的配置文件。
打開終端,進(jìn)入Kafka目錄,執(zhí)行以下命令:
“`
cd kafka_2.13-2.8.0
vi config/server.properties
“`
進(jìn)入該文件后,查找以下參數(shù):
“`
listeners=PLNTEXT://:9092
“`
該參數(shù)是Kafka的監(jiān)聽地址和端口號。確認(rèn)該參數(shù)是否正確,并根據(jù)需要進(jìn)行修改。
四、結(jié)論
通過本文的介紹,相信讀者已經(jīng)了解了Linux Kafka下載指南。下載和啟動Kafka非常簡單,只需幾個命令即可完成。但在使用Kafka時,還需要注意Kafka的配置和日志信息,以便快速解決問題。希望本文對于Linux用戶下載和使用Kafka有所幫助。
相關(guān)問題拓展閱讀:
- 深入理解kafka(五)日志存儲
- linux怎么從mq里面讀取報文信息
深入理解kafka(五)日志存儲
5.1文件目錄布局
根目錄下有以下5個checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分區(qū)目錄下有以下目錄: 0000xxx.index(偏移量為64位長整形,長度固定為20位), 0000xxx.log, 0000xxx.timeindex.
還有可能包含.deleted .cleaned .swap等臨時文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日志格式演變
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校驗值
magic(1B):消息版本號0
attributes(1B):消息屬性。低3位表示壓縮類型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的長度。-1代表null
key: 可選
value length(4B):實際消息體的長度。-1代表null
value: 消息體??梢詾榭?,如墓碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)字段,表示消息的時間戳
attributes的第4位也被利用起來,0表示timestamp的類型為CreateTime,1表示timestamp的類型為LogAppendTime
timestamp類型由broker端參數(shù)log.message.timestamp.type來配置,默認(rèn)為CreateTime,即采用生產(chǎn)者創(chuàng)建的時間戳
5.2.3 消息壓縮
保證端到端的壓縮,服務(wù)端配置compression.type,默認(rèn)為”producer”,表示保留生產(chǎn)者使用的壓縮方式,還可以配置為”gzip”,”snappy”,”lz4″
多條消息壓縮至value字段,以提高壓縮率
5.2.4 變長字段
變長整形(Varints):每一個字節(jié)都有一個位于更高位的m位(most significant bit),除了最后一個字節(jié)為1,其余都為0,字節(jié)倒序排列
為了使編碼更加高效,Varints使用ZigZag編碼:sint32對應(yīng) (n>31) sint64對應(yīng) (n>63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定為2
attributes:兩個字節(jié)。低3位表示壓縮格式,第4位表示時間戳類型,第5位表示事務(wù)(0-非事務(wù)1-事務(wù)),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
producer id:
producer epoch:
first sequence:
records count:
v2版本的消息去掉了crc字段,另外增加了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增量)和headers信息,并且棄用了attributes
Record
length:
attributes:棄用,但仍占據(jù)1B
timestamp delta:
offset delta:
headers:
5.3日志索引
稀疏索引(sparse index):每當(dāng)寫入一定量(broker端參數(shù)log.index.interval.bytes指定,默認(rèn)為4096B),偏移量索引文件和時間索引文件分別對應(yīng)一個索引項
日志段切分策略:
1.大小超過broker端參數(shù)log.segment.bytes配置的值,默認(rèn)為(1GB)
2.當(dāng)前日志段消息的更大時間戳與當(dāng)前系統(tǒng)的時間戳差值大于log.roll.ms或者log.roll.hours,ms優(yōu)先級高,默認(rèn)log.roll.hours=168(7天)
3.索引文件或者時間戳索引文件的大小大于log.index.size.max.bytes配置的值,默認(rèn)為(10MB)
4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE
5.3.1 偏移量索引
每個索引項占用8個字節(jié),分為兩個部分:1.relativeOffset相對偏移量(4B) 2.position物理地址(4B)
使用kafka-dump-log.sh腳本來解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端參數(shù)log.index.size.max.bytes不是8的倍數(shù),內(nèi)部會自動轉(zhuǎn)換為8的倍數(shù)
5.3.2 時間戳索引
每個索引項占用12個字節(jié),分為兩個部分:1.timestamp當(dāng)前日志分段的更大時間戳(12B) 2.relativeOffset時間戳對應(yīng)的相對偏移量(4B)
如果broker端參數(shù)log.index.size.max.bytes不是12的倍數(shù),內(nèi)部會自動轉(zhuǎn)換為12的倍數(shù)
5.4日志清理
日志清理策略可以控制到主題級別
5.4.1 日志刪除
broker端參數(shù)log.cleanup.policy設(shè)置為delete(默認(rèn)為delete)
檢測周期broker端參數(shù)log.retention.check.interval.ms=300000(默認(rèn)5分鐘)
1.基于時間
broker端參數(shù)log.retention.hours,log.retention.minutes,log.retention.ms,優(yōu)先級ms>minutes>hours
刪除時先增加.delete后綴,延遲刪除根據(jù)file.delete.delay.ms(默認(rèn)60000)配置
2.基于日志大小
日志總大小為broker端參數(shù)log.retention.bytes(默認(rèn)為-1,表示無窮大)
日志段大小為broker端參數(shù)log.segment.bytes(默認(rèn)為,1GB)
3.基于日志起始偏移量
DeleteRecordRequest請求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh腳本
5.4.2 日志壓縮
broker端參數(shù)log.cleanup.policy設(shè)置為compact,且log.cleaner.enable設(shè)置為true(默認(rèn)為true)
5.5磁盤存儲
相關(guān)測試:一個由6塊7200r/min的RAID-5陣列組成的磁盤簇的線性寫入600MB/s,隨機寫入100KB/s,隨機內(nèi)存寫入400MB/s,線性內(nèi)存3.6GB/s
5.5.1 頁緩存
Linux操作系統(tǒng)的vm.dirty_background_ratio參數(shù)用來指定臟頁數(shù)量達(dá)到系統(tǒng)的百分比之后就觸發(fā)pdflush/flush/kdmflush,一般小于10,不建議為0
vm.dirty_ratio表示臟頁百分比之后刷盤,但是阻塞新IO請求
kafka同樣提供同步刷盤及間斷性強制刷盤(fsync)功能,可以通過log.flush.interval.messages、log.flush.interval.ms等參數(shù)來控制
kafka不建議使用swap分區(qū),vm.swappiness參數(shù)上限為100,下限為0,建議設(shè)置為1
5.5.2 磁盤I/O流程
一般磁盤IO的場景有以下4種:
1.用戶調(diào)用標(biāo)準(zhǔn)C庫進(jìn)行IO操作,數(shù)據(jù)流為:應(yīng)用程序Buffer->C庫標(biāo)準(zhǔn)IOBuffer->文件系統(tǒng)也緩存->通過具體文件系統(tǒng)到磁盤
2.用戶調(diào)用文件IO,數(shù)據(jù)流為:應(yīng)用程序Buffer->文件系統(tǒng)也緩存->通過具體文件系統(tǒng)到磁盤
3.用戶打開文件時使用O_DIRECT,繞過頁緩存直接讀寫磁盤
4.用戶使用類似dd工具,并使用direct參數(shù),繞過系統(tǒng)cache與文件系統(tǒng)直接讀寫磁盤
Linux系統(tǒng)中IO調(diào)度策略有4種:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷貝
指數(shù)據(jù)直接從磁盤文件復(fù)制到網(wǎng)卡設(shè)備中,不需要經(jīng)應(yīng)用程序
對linux而言依賴于底層的sendfile()
對java而言,F(xiàn)ileChannal.transferTo()的底層實現(xiàn)就是sendfile()
linux怎么從mq里面讀取報文信息
在Linux中,可以通過以下方式從消息隊列MQ中讀取報文信息:
1. 使用自帶的PN命令。如果使用的消息隊列系統(tǒng)自帶有消息查詢命令,可以直接使用該命令查詢消息隊列中的報文。如RabbitMQ有rabbitmqctl list_queues等命令。
2. 使用消息隊列的API。大多數(shù)消息隊列系統(tǒng)都提供了客戶端API,可以通過編寫程序使用 API 讀取消息隊列中的消息。例如:
– RabbitMQ提供AMQP客戶端API,可以使用Polyglot AMQP, librabbitmq等庫調(diào)用API讀取消息。
– Kafka提供Kafka客戶端API,可以使用kafka-python, librdkafka等庫調(diào)用API讀取主題中的消息。
– ActiveMQ提供JMS API,可以使用JMS客戶端如NMS讀取消息。
使用消息隊列的API是主流的讀取MQ報文的方式。需要選擇消息隊列對應(yīng)的客戶端API,編寫讀取消息的程序。
3. 使用消息隊列提供的管理工具。一些消息隊列系統(tǒng)提供了并模圖形化的管理控制臺或工具,可以通過該工具查詢和讀取消息隊列中的報文信息。如:
– RabbitMQ提供了RabbitMQ Management插件,可以通過Web UI查詢消息隊列信息。
– Kafka提供了Confluent Control Center等工具可以管理主題和消費消息。
– ActiveMQ提乎肆供了ActiveMQ Console可以管理消息和訂閱者。
使用管理工具可以更直觀簡便地讀取MQ中的報文信息。
4. 解析消息隊列的數(shù)據(jù)存儲。一些消息隊列系統(tǒng)使用數(shù)據(jù)庫等方式存儲消息數(shù)據(jù),通過解析其數(shù)據(jù)存儲也可以讀取報文信息。但這種方式較復(fù)雜,需要深入研究消息隊列的內(nèi)部實現(xiàn),一般不推薦。
所以,讀取Linux下MQ的報文信息,推薦的方式主要是:
1) 使用消息隊列自帶的命令行工具(如果有)
2) 調(diào)用消息隊列提供的API,編寫程序讀取消息
3) 使用消息隊列的管理控制臺或圖形化工具查詢消息絕頃緩
這幾種方式可以比較方便和標(biāo)準(zhǔn)地讀取MQ中的報文信息。選擇具體的方式需要根據(jù)使用的消息隊列系統(tǒng)來決定。
linux kafka 下載的介紹就聊到這里吧,感謝你花時間閱讀本站內(nèi)容,更多關(guān)于linux kafka 下載,簡單易懂的Linux Kafka下載指南,深入理解kafka(五)日志存儲,linux怎么從mq里面讀取報文信息的信息別忘了在本站進(jìn)行查找喔。
成都服務(wù)器租用選創(chuàng)新互聯(lián),先試用再開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)提供簡單好用,價格厚道的香港/美國云服務(wù)器和獨立服務(wù)器。物理服務(wù)器托管租用:四川成都、綿陽、重慶、貴陽機房服務(wù)器托管租用。
當(dāng)前名稱:簡單易懂的Linux Kafka下載指南 (linux kafka 下載)
本文來源:http://www.dlmjj.cn/article/djsipei.html


咨詢
建站咨詢
