新聞中心
Apache Kafka已經(jīng)成為現(xiàn)代分布式系統(tǒng)中廣泛使用的消息傳遞平臺(tái)。然而,Kafka在某些方面缺乏特定功能,如事務(wù)支持和流處理。為了彌補(bǔ)這些功能,Confluent公司開發(fā)了一個(gè)名為librdkafka的C語言庫(kù)。本文將介紹如何在Linux上安裝和配置librdkafka。

之一步:安裝依賴項(xiàng)
在安裝前,我們需要確保安裝了以下依賴項(xiàng):
– gcc編譯器
– openssl
– zlib
– libssl-dev
– libz-dev
在Ubuntu上,可以使用以下命令安裝這些依賴項(xiàng):
sudo apt update
sudo apt install gcc openssl zlib1g-dev libssl-dev libz-dev
在其他Linux操作系統(tǒng)上,可以使用相應(yīng)的軟件包管理器安裝這些依賴項(xiàng)。
第二步:下載和安裝librdkafka
可以從GitHub librdkafka存儲(chǔ)庫(kù)中下載最新版本的librdkafka??梢允褂靡韵旅顝腉itHub上下載:
git clone https://github.com/edenhill/librdkafka.git
如果您沒有安裝Git,請(qǐng)使用以下命令:
sudo apt-get install git
然后切換到下載的目錄,并進(jìn)行編譯和安裝??梢允褂靡韵旅钔瓿桑?/p>
cd librdkafka
./configure
make
sudo make install
此將在您系統(tǒng)中安裝librdkafka。
第三步:編寫和測(cè)試程序
現(xiàn)在安裝已完成,可以編寫并測(cè)試您的程序。以下示例程序使用生產(chǎn)者API將消息發(fā)送到Kafka主題:
#include
int mn(int argc, char **argv) {
rd_kafka_t *rk; /* Producer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
char errstr[512]; /* librdkafka API error reporting buffer */
/* Set up a temporary configuration object */
conf = rd_kafka_conf_new();
/* Create a new Kafka producer – use rd_kafka_conf_dup(conf) to *
* copy the configuration, for multiple producers or consumers */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) {
fprintf(stderr, “%% Fled to create new producer: %s\n”, errstr);
return 1;
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, “l(fā)ocalhost:9092”) == 0) {
fprintf(stderr, “%% No valid brokers specified\n”);
return 1;
}
/* Create a new topic – this topic object is *temporary* and *
* lets us specify topic-specific configuration */
rd_kafka_topic_t *rkt;
if (!(rkt = rd_kafka_topic_new(rk, “test”, NULL))) {
fprintf(stderr, “%% Fled to create topic object: %s\n”, rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
/* Produce a message */
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY /* Copy payload */ ,
“Message payload”, 15 /* Message length */ , NULL, 0, NULL) == -1) {
fprintf(stderr, “%% Fled to produce to topic %s partition %i: %s\n”, rd_kafka_topic_name(rkt), RD_KAFKA_PARTITION_UA, rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
return 1;
}
fprintf(stderr, “%% Enqueued message (%6d bytes) for topic %s\n”, 15, rd_kafka_topic_name(rkt));
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
return 0;
}
可以使用以下命令從命令行編譯和運(yùn)行程序:
gcc -o test_producer test_producer.c -lrdkafka
./test_producer
這將使用生產(chǎn)者API發(fā)送消息并輸出確認(rèn)消息是否已成功發(fā)送。
結(jié)論
相關(guān)問題拓展閱讀:
- linux 怎樣查看kafka的某 topic數(shù)據(jù)
- Linux中的零拷貝技術(shù)
linux 怎樣查看kafka的某 topic數(shù)據(jù)
1、創(chuàng)建一個(gè)需要增加備份因子的topic列表的文件,文件格式是json格式的。
2、使用kafka官方提供的工具拿到上碼拍面topic的partions 分布情況,并重定向到文件中。
3、修改ressgintopic.conf 文件的,手動(dòng)分配新增加的partion 備份因子。
4、通過下面命令執(zhí)行備份因子擴(kuò)容過程,bin/kafka-reassign-partitions.sh –zookeeper localhost:reassignment-json。
5、最后查看kafka的某 topic數(shù)據(jù)如圖。
注意大肆事項(xiàng)滾模轎:
Kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理,也是為了通過集群來提供實(shí)時(shí)的消息。
Linux中的零拷貝技術(shù)
參考文章: 淺析Linux中的零拷貝技術(shù)
內(nèi)核和用戶空間,共享內(nèi)存。數(shù)據(jù)copy到內(nèi)核區(qū)后,只需要把地址共享給應(yīng)用程序即可,無需再copy一次數(shù)據(jù)到用戶空間。
優(yōu)點(diǎn):
缺點(diǎn):
應(yīng)用:
kafka生產(chǎn)者發(fā)送消息到broker的時(shí)候,broker的網(wǎng)絡(luò)接收到數(shù)據(jù)后,copy到broker的內(nèi)核空間。然后通過mmap技術(shù),broker會(huì)修改消息頭,添加一些元數(shù)據(jù)。所以,寫入數(shù)據(jù)很快。當(dāng)然順序IO也是關(guān)鍵技術(shù)
內(nèi)核直接發(fā)送數(shù)據(jù)到socket,無需用戶空間參與。
優(yōu)點(diǎn):
缺點(diǎn):
為了節(jié)省內(nèi)核里面的一次copy,我們可以使用優(yōu)化過的sendfile。該系統(tǒng)方法需要由特定的硬件來支持,并不是所有系統(tǒng)都支持。如下:
sendfile的時(shí)候,直接把內(nèi)核空間的地址傳遞給socket緩存,DMA直接從指定地址讀取數(shù)據(jù)到流里面。
sendfile只適用于將數(shù)據(jù)從文件拷貝到套接字上,限定了它的使用范圍。Linux在2.6.17版本引入splice系統(tǒng)調(diào)用,用于在兩個(gè)文件描述慎襲符中移動(dòng)數(shù)據(jù)。
splice調(diào)用在兩個(gè)文件描述符之間移動(dòng)數(shù)據(jù),而不需要數(shù)據(jù)在內(nèi)核空間和用戶空間來回拷貝。他從fd_in拷貝len長(zhǎng)度的數(shù)據(jù)到fd_out,但是有一方必須是管道設(shè)備,這也是目前splice的一些局限性。flags參數(shù)有以下幾種取值:
splice調(diào)用利用了Linux提出的管道緩沖區(qū)機(jī)制, 所以稿正至少一個(gè)描述符要為管道。
以上幾種零拷貝技術(shù)都是減少數(shù)據(jù)在用戶空間和內(nèi)核空間拷貝技術(shù)實(shí)現(xiàn)的,但是有些時(shí)候,數(shù)據(jù)必須在用戶空間和內(nèi)核空間之間拷貝。這時(shí)候,我們只能針對(duì)數(shù)據(jù)在用戶空間和內(nèi)核空間拷貝的時(shí)機(jī)上下功夫了。Linux通常利用寫時(shí)復(fù)制(copy on write)來減少系統(tǒng)開銷,這個(gè)技術(shù)又時(shí)常稱作COW。
摘錄網(wǎng)上:
傳統(tǒng)的fork()系統(tǒng)調(diào)用直接把所有的資源復(fù)制給新創(chuàng)建的進(jìn)程。這種實(shí)現(xiàn)過于簡(jiǎn)單并且效率低下,因?yàn)樗截惖臄?shù)據(jù)也許并不共享,更糟的情況是,如果新進(jìn)程打算立即執(zhí)行一個(gè)新的映像,那么所有的拷貝都將前功盡棄。Linux的fork()使用寫時(shí)拷貝(copy-on-write)頁實(shí)現(xiàn)。寫時(shí)拷貝是一種可以推遲甚至免除拷貝數(shù)據(jù)的技術(shù)。內(nèi)核此時(shí)并不復(fù)制整個(gè)進(jìn)程地址空間,而是讓父進(jìn)程和子進(jìn)程共享同一個(gè)拷貝。只有在需要寫入的時(shí)候,數(shù)據(jù)才會(huì)被復(fù)制,從而使各個(gè)進(jìn)程擁有各自的拷貝。也就是說,資源的復(fù)制只有在需要寫入的時(shí)候才進(jìn)行,在此之前,只是以只讀方式共享。這種技術(shù)使地址空間上的鍵孝悔頁的拷貝被推遲到實(shí)際發(fā)生寫入的時(shí)候。在頁根本不會(huì)被寫入的情況下—舉例來說,fork()后立即調(diào)用exec()—它們就無需復(fù)制了。fork()的實(shí)際開銷就是復(fù)制父進(jìn)程的頁表以及給子進(jìn)程創(chuàng)建惟一的進(jìn)程描述符。在一般情況下,進(jìn)程創(chuàng)建后都會(huì)馬上運(yùn)行一個(gè)可執(zhí)行的文件,這種優(yōu)化可以避免拷貝大量根本就不會(huì)被使用的數(shù)據(jù)(地址空間里常常包含數(shù)十兆的數(shù)據(jù))。由于Unix強(qiáng)調(diào)進(jìn)程快速執(zhí)行的能力,所以這個(gè)優(yōu)化是很重要的。這里補(bǔ)充一點(diǎn):Linux COW與exec沒有必然聯(lián)系。
我總結(jié)下:
copy-on-write技術(shù)其實(shí)是一種延遲復(fù)制的技術(shù),只有需要用(寫)的時(shí)候,才去復(fù)制數(shù)據(jù)。
linux 安裝librdkafka的介紹就聊到這里吧,感謝你花時(shí)間閱讀本站內(nèi)容,更多關(guān)于linux 安裝librdkafka,如何在Linux上安裝librdkafka,linux 怎樣查看kafka的某 topic數(shù)據(jù),Linux中的零拷貝技術(shù)的信息別忘了在本站進(jìn)行查找喔。
成都服務(wù)器租用選創(chuàng)新互聯(lián),先試用再開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)提供簡(jiǎn)單好用,價(jià)格厚道的香港/美國(guó)云服務(wù)器和獨(dú)立服務(wù)器。物理服務(wù)器托管租用:四川成都、綿陽、重慶、貴陽機(jī)房服務(wù)器托管租用。
網(wǎng)站名稱:如何在Linux上安裝librdkafka (linux 安裝librdkafka)
標(biāo)題路徑:http://www.dlmjj.cn/article/dpischd.html


咨詢
建站咨詢
