新聞中心
SpringBoot整合Kafka stream實時統(tǒng)計數(shù)據(jù)
作者:FastCoder 2021-08-17 06:48:43
開發(fā)
前端
Kafka Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數(shù)據(jù)。它建立在流式處理的一些重要的概念之上:如何區(qū)分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應(yīng)用程序狀態(tài)。

目前成都創(chuàng)新互聯(lián)公司已為千余家的企業(yè)提供了網(wǎng)站建設(shè)、域名、雅安服務(wù)器托管、綿陽服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計、茂名網(wǎng)站維護等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。
[[417927]]
環(huán)境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2
Kafka Stream介紹
Kafka在0.10版本推出了Stream API,提供了對存儲在Kafka內(nèi)的數(shù)據(jù)進行流式處理和分析的能力。
流式計算一般被用來和批量計算做比較。批量計算往往有一個固定的數(shù)據(jù)集作為輸入并計算結(jié)果。而流式計算的輸入往往是“無界”的(Unbounded Data),持續(xù)輸入的,即永遠拿不到全量數(shù)據(jù)去做計算;同時,計算結(jié)果也是持續(xù)輸出的,只能拿到某一個時刻的結(jié)果,而不是最終的結(jié)果。
Kafka Streams是一個客戶端類庫,用于處理和分析存儲在Kafka中的數(shù)據(jù)。它建立在流式處理的一些重要的概念之上:如何區(qū)分事件時間和處理時間、Windowing的支持、簡單高效的管理和實時查詢應(yīng)用程序狀態(tài)。
Kafka Streams的門檻非常低:和編寫一個普通的Kafka消息處理程序沒有太大的差異,可以通過多進程部署來完成擴容、負載均衡、高可用(Kafka Consumer的并行模型)。
Kafka Streams的一些特點:
- 被設(shè)計成一個簡單的、輕量級的客戶端類庫,能夠被集成到任何Java應(yīng)用中
- 除了Kafka之外沒有任何額外的依賴,利用Kafka的分區(qū)模型支持水平擴容和保證順序性
- 通過可容錯的狀態(tài)存儲實現(xiàn)高效的狀態(tài)操作(windowed joins and aggregations)
- 支持exactly-once語義
- 支持紀錄級的處理,實現(xiàn)毫秒級的延遲
- 提供High-Level的Stream DSL和Low-Level的Processor API
Stream Processing Topology流處理拓撲
- 流是Kafka Streams提供的最重要的抽象:它表示一個無限的、不斷更新的數(shù)據(jù)集。流是不可變數(shù)據(jù)記錄的有序、可重放和容錯序列,其中數(shù)據(jù)記錄定義為鍵值對。
- Stream Processing Application是使用了Kafka Streams庫的應(yīng)用程序。它通過processor topologies定義計算邏輯,其中每個processor topology都是多個stream processor(節(jié)點)通過stream組成的圖。
- A stream processor 是處理器拓撲中的節(jié)點;它表示一個處理步驟,通過每次從拓撲中的上游處理器接收一個輸入記錄,將其操作應(yīng)用于該記錄,來轉(zhuǎn)換流中的數(shù)據(jù),并且隨后可以向其下游處理器生成一個或多個輸出記錄。
有兩種特殊的processor:
Source Processor 源處理器是一種特殊類型的流處理器,它沒有任何上游處理器。它通過使用來自一個或多個kafka topic的記錄并將其轉(zhuǎn)發(fā)到其下游處理器,從而從一個或多個kafka topic生成其拓撲的輸入流。
Sink Processor 接收器處理器是一種特殊類型的流處理器,沒有下游處理器。它將從其上游處理器接收到的任何記錄發(fā)送到指定的kafka topic。
相關(guān)的核心概念查看如下鏈接
下面演示Kafka Stream 在Springboot中的應(yīng)用
依賴
org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.apache.kafka kafka-streams
配置
- server:
- port: 9090
- spring:
- application:
- name: kafka-demo
- kafka:
- streams:
- application-id: ${spring.application.name}
- properties:
- spring.json.trusted.packages: '*'
- bootstrap-servers:
- - localhost:9092
- - localhost:9093
- - localhost:9094
- producer:
- acks: 1
- retries: 10
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
- properties:
- spring.json.trusted.packages: '*'
- consumer:
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
- enable-auto-commit: false
- group-id: ConsumerTest
- auto-offset-reset: latest
- properties:
- session.timeout.ms: 12000
- heartbeat.interval.ms: 3000
- max.poll.records: 100
- spring.json.trusted.packages: '*'
- listener:
- ack-mode: manual-immediate
- type: batch
- concurrency: 8
- properties:
- max.poll.interval.ms: 300000
消息發(fā)送
- @Service
- public class MessageSend {
- @Resource
- private KafkaTemplate
kafkaTemplate ; - public void sendMessage2(Message message) {
- kafkaTemplate.send(new ProducerRecord
("test", message)).addCallback(result -> { - System.out.println("執(zhí)行成功..." + Thread.currentThread().getName()) ;
- }, ex -> {
- System.out.println("執(zhí)行失敗") ;
- ex.printStackTrace() ;
- }) ;
- }
- }
消息監(jiān)聽
- @KafkaListener(topics = {"test"})
- public void listener2(List
> records, Acknowledgment ack) { - for (ConsumerRecord
record : records) { - System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ;
- }
- try {
- TimeUnit.SECONDS.sleep(0) ;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- ack.acknowledge() ;
- }
- @KafkaListener(topics = {"demo"})
- public void listenerDemo(List
> records, Acknowledgment ack) { - for (ConsumerRecord
record : records) { - System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ;
- }
- ack.acknowledge() ;
- }
Kafka Stream處理
消息轉(zhuǎn)換并轉(zhuǎn)發(fā)其它Topic
- @Bean
- public KStream
- KStream
- stream.map((key, value) -> {
- System.out.println("原始消息內(nèi)容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ;
- return new KeyValue<>(key, "{\"title\": \"123123\", \"message\": \"重新定義內(nèi)容\"}".getBytes(Charset.forName("UTF-8"))) ;
- }).to("demo") ;
- return stream;
- }
執(zhí)行結(jié)果:
Stream對象處理
- @Bean
- public KStream
kStream4(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.map((key, value) -> {
- value.setTitle("XXXXXXX") ;
- return new KeyValue<>(key, value) ;
- }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ;
- return stream;
- }
執(zhí)行結(jié)果:
分組處理
- @Bean
- public KStream
kStream5(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- return value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .count()
- .toStream().print(Printed.toSysOut());
- return stream;
- }
執(zhí)行結(jié)果:
聚合
- @Bean
- public KStream
kStream6(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- return value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .aggregate(() -> 0L, (key, value ,aggValue) -> {
- System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ;
- return aggValue + 1 ;
- }, Materialized.
>as("kvs").withValueSerde(Serdes.Long())) - .toStream().print(Printed.toSysOut());
- return stream;
- }
執(zhí)行結(jié)果:
Filter過濾數(shù)據(jù)
- @Bean
- public KStream
kStream7(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream.selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- return value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .aggregate(() -> 0L, (key, value ,aggValue) -> {
- System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ;
- return aggValue + 1 ;
- }, Materialized.
>as("kvs").withValueSerde(Serdes.Long())) - .toStream()
- .filter((key, value) -> !"2".equals(key))
- .print(Printed.toSysOut());
- return stream;
- }
執(zhí)行結(jié)果:
過濾Key不等于"2"
分支多流處理
- @Bean
- public KStream
kStream8(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - // 分支,多流處理
- KStream
[] arrStream = stream.branch( - (key, value) -> "男".equals(value.getSex()),
- (key, value) -> "女".equals(value.getSex()));
- Stream.of(arrStream).forEach(as -> {
- as.foreach((key, message) -> {
- System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ;
- });
- });
- return stream;
- }
執(zhí)行結(jié)果:
多字段分組
不能使用多個selectKey,后面的會覆蓋前面的
- @Bean
- public KStream
kStreamM2(StreamsBuilder streamsBuilder) { - JsonSerde
jsonSerde = new JsonSerde<>() ; - JsonDeserializer
descri = (JsonDeserializer ) jsonSerde.deserializer() ; - descri.addTrustedPackages("*") ;
- KStream
stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); - stream
- .selectKey(new KeyValueMapper
() { - @Override
- public String apply(String key, Message value) {
- System.out.println(Thread.currentThread().getName()) ;
- return value.getTime() + " | " + value.getOrgCode() ;
- }
- })
- .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
- .count()
- .toStream().print(Printed.toSysOut());
- return stream;
- }
執(zhí)行結(jié)果:
本文題目:Springboot整合KafkaStream實時統(tǒng)計數(shù)據(jù)
當前鏈接:http://www.dlmjj.cn/article/codpsgh.html


咨詢
建站咨詢
