日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網營銷解決方案
Golang語言中kafka客戶端庫Sarama

Golang 語言中 Kafka 客戶端庫 Sarama

作者:frank 2021-05-07 15:28:03

開發(fā)

前端

Kafka Apache Kafka 是一款開源的消息引擎系統(tǒng)。它在項目中的作用主要是削峰填谷和解耦。本文我們只介紹 Apache Kafka 的 Golang 客戶端庫 Sarama。Sarama 是 MIT 許可的 Apache Kafka 0.8 及更高版本的 Golang 客戶端庫。

成都創(chuàng)新互聯(lián)2013年開創(chuàng)至今,是專業(yè)互聯(lián)網技術服務公司,擁有項目成都做網站、成都網站設計、成都外貿網站建設網站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元辰溪做網站,已為上家服務,為辰溪各地企業(yè)和個人服務,聯(lián)系電話:028-86922220

01、介紹

Apache Kafka 是一款開源的消息引擎系統(tǒng)。它在項目中的作用主要是削峰填谷和解耦。本文我們只介紹 Apache Kafka 的 Golang 客戶端庫 Sarama。Sarama 是 MIT 許可的 Apache Kafka 0.8 及更高版本的 Golang 客戶端庫。

如果讀者朋友對 Apache Kafka 服務端還不了解,建議先閱讀官方文檔中的入門部分,本文使用的版本是 Apache Kafka 2.8。

02、生產者

我們可以使用 Sarama 庫的 AsyncProducer 或 SyncProducer 生產消息。在大多數(shù)情況下首選使用 AsyncProducer 生產消息。它通過一個 channel 接收消息,并在后臺盡可能高效的異步生產消息。

SyncProducer 發(fā)送 Kafka 消息后阻塞,直到接收到 ACK 確認。SyncProducer 有兩個警告:它通常效率較低,并且實際的耐用性保證取決于 Producer.RequiredAcks 的配置值。在某些配置中,有時仍會丟失由 SyncProducer 確認的消息,但是使用比較簡單。

為了讀者朋友們容易理解,本文我們介紹 SyncProducer 作為生產者的使用方式。如果讀者朋友想了解 AsyncProducer 作為生產者的使用方式,請參考官方文檔。

使用 SyncProducer 作為生產者的示例代碼:

  
 
 
  1. func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) { 
  2.  producer, err := sarama.NewSyncProducer(brokerAddr, config) 
  3.  if err != nil { 
  4.   fmt.Println(err) 
  5.   return 
  6.  } 
  7.  defer func() { 
  8.   if err = producer.Close(); err != nil { 
  9.    fmt.Println(err) 
  10.    return 
  11.   } 
  12.  }() 
  13.  msg := &sarama.ProducerMessage{ 
  14.   Topic: topic, 
  15.   Value: value, 
  16.  } 
  17.  partition, offset, err := producer.SendMessage(msg) 
  18.  if err != nil { 
  19.   fmt.Println(err) 
  20.   return 
  21.  } 
  22.  fmt.Printf("partition:%d offset:%d\n", partition, offset) 

閱讀上面這段代碼,我們調用 NewSyncProducer() 創(chuàng)建一個新的 SyncProducer,給定 broker 地址和配置信息。調用 SendMessage() 生產給定的消息,并且僅在生產成功或失敗時返回。它將返回分區(qū)(Partition)和生產的消息的偏移量(Offset),如果消息生產失敗,則返回錯誤。

需要注意的是,為了避免泄露,必須在生產者上調用 Close(),因為當它超出范圍時,可能不會自動垃圾回收。

03、消費者

我們可以使用 Sarama 庫的消費者 Consumer 或消費者組 ConsumerGroup API 消費消息。為了讀者朋友們容易理解,本文我們介紹使用 Consumer 消費消息。

Consumer 管理 PartitionConsumers,該 PartitionConsumers 處理來自 brokers 的 Kafka 消息。

Consumer 消費消息的示例代碼:

  
 
 
  1. func consumer (brokenAddr []string, topic string, partition int32, offset int64) { 
  2.  consumer, err := sarama.NewConsumer(brokenAddr, nil) 
  3.  if err != nil { 
  4.   fmt.Println(err) 
  5.   return 
  6.  } 
  7.  defer func() { 
  8.   if err = consumer.Close(); err != nil { 
  9.    fmt.Println(err) 
  10.    return 
  11.   } 
  12.  }() 
  13.  partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset) 
  14.  if err != nil { 
  15.   fmt.Println(err) 
  16.   return 
  17.  } 
  18.  defer func() { 
  19.   if err = partitionConsumer.Close(); err != nil { 
  20.    fmt.Println(err) 
  21.    return 
  22.   } 
  23.  }() 
  24.  for msg := range partitionConsumer.Messages() { 
  25.   fmt.Printf("partition:%d offset:%d key:%s val:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value) 
  26.  } 

閱讀上面這段代碼,我們調用 NewConsumer() 創(chuàng)建一個新的 consumer,給定 broker 地址和配置信息。調用 ConsumePartition() 創(chuàng)建 PartitionConsumer,給定 topic、partition 和 offset。PartitionConsumer 處理來自給定 topic 和 partition 的 Kafka 消息。

需要注意的是,為了防止泄露,必須調用 consumer 和 partitionConsumer 的 Close(),因為當它超出范圍時,可能不會自動垃圾回收。

04、總結

本文主要介紹如何使用 Apache Kafka 的 Golang 語言客戶端庫 Sarama 生產和消費 Kafka 消息。關于生產者和消費者,分別列舉了一個簡單示例。除此之外,Sarama 庫還提供了很多其它 Api,感興趣的讀者朋友可以閱讀官方文檔了解更多。


分享文章:Golang語言中kafka客戶端庫Sarama
當前路徑:http://www.dlmjj.cn/article/dhsjiso.html