新聞中心
SparkStreaming與Kafka整合遇到的問題及解決方案
作者:佚名 2017-08-03 09:37:35
大數(shù)據(jù)
Kafka
Spark 最近工作中是做日志分析的平臺,采用了sparkstreaming+kafka,采用kafka主要是看中了它對大數(shù)據(jù)量處理的高性能,處理日志類應用再好不過了,采用了sparkstreaming的流處理框架 主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務,并且可以提供準實時服務到elasticsearch中,可以實現(xiàn)準實時定位系統(tǒng)日志。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、小程序開發(fā)、集團企業(yè)網(wǎng)站建設(shè)等服務項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了臨滄免費建站歡迎大家使用!
前言
最近工作中是做日志分析的平臺,采用了sparkstreaming+kafka,采用kafka主要是看中了它對大數(shù)據(jù)量處理的高性能,處理日志類應用再好不過了,采用了sparkstreaming的流處理框架 主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務,并且可以提供準實時服務到elasticsearch中,可以實現(xiàn)準實時定位系統(tǒng)日志。
實現(xiàn)
Spark-Streaming獲取kafka數(shù)據(jù)的兩種方式-Receiver與direct的方式。
一. 基于Receiver方式
這種方式使用Receiver來獲取數(shù)據(jù)。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。代碼如下:
- SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- int numThreads = Integer.parseInt("4");
- Map
topicMap = new HashMap (); - topicMap.put("group-45", numThreads);
- //接收的參數(shù)分別是JavaStreamingConetxt,zookeeper連接地址,groupId,kafak的topic
- JavaPairReceiverInputDStream
messages = - KafkaUtils.createStream(jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181", "1", topicMap);
剛開始的時候系統(tǒng)正常運行,沒有發(fā)現(xiàn)問題,但是如果系統(tǒng)異常重新啟動sparkstreaming程序后,發(fā)現(xiàn)程序會重復處理已經(jīng)處理過的數(shù)據(jù),這種基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數(shù)據(jù)的傳統(tǒng)方式。這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。官方現(xiàn)在也已經(jīng)不推薦這種整合方式,官網(wǎng)相關(guān)地址 http://spark.apache.org/docs/latest/streaming-kafka-integration.html ,下面我們使用官網(wǎng)推薦的第二種方式kafkaUtils的createDirectStream()方式。
二.基于Direct的方式
這種新的不基于Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數(shù)據(jù)后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的***的offset,從而定義每個batch的offset的范圍。當處理數(shù)據(jù)的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)。
代碼如下:
- SparkConf sparkConf = new SparkConf().setAppName("log-etl");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
- HashSet
topicsSet = new HashSet (Arrays.asList(topics.split(","))); - HashMap
kafkaParams = new HashMap (); - kafkaParams.put("metadata.broker.list", brokers);
- // Create direct kafka stream with brokers and topics
- JavaPairInputDStream
messages = KafkaUtils.createDirectStream( - jssc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- topicsSet
- );
這種direct方式的優(yōu)點如下:
1.簡化并行讀?。喝绻x取多個partition,不需要創(chuàng)建多個輸入DStream然后對它們進行union操作。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間,有一個一對一的映射關(guān)系。
2.一次且僅一次的事務機制:基于receiver的方式,在spark和zk中通信,很有可能導致數(shù)據(jù)的不一致。
3.高效率:在receiver的情況下,如果要保證數(shù)據(jù)的不丟失,需要開啟wal機制,這種方式下,為、數(shù)據(jù)實際上被復制了兩份,一份在kafka自身的副本中,另外一份要復制到wal中, direct方式下是不需要副本的。
三.基于Direct方式丟失消息的問題
貌似這種方式很***,但是還是有問題的,當業(yè)務需要重啟sparkstreaming程序的時候,業(yè)務日志依然會打入到kafka中,當job重啟后只能從***的offset開始消費消息,造成重啟過程中的消息丟失。kafka中的offset如下圖(使用kafkaManager實時監(jiān)控隊列中的消息):
當停止業(yè)務日志的接受后,先重啟spark程序,但是發(fā)現(xiàn)job并沒有將先前打入到kafka中的數(shù)據(jù)消費掉。這是因為消息沒有經(jīng)過zk,topic的offset也就沒有保存
四.解決消息丟失的處理方案
一般有兩種方式處理這種問題,可以先spark streaming 保存offset,使用spark checkpoint機制,第二種是程序中自己實現(xiàn)保存offset邏輯,我比較喜歡第二種方式,以為這種方式可控,所有主動權(quán)都在自己手中。
先看下大體流程圖,
- SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl");
- Set
topicSet = new HashSet (); - topicSet.add("group-45");
- kafkaParam.put("metadata.broker.list", "172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092");
- kafkaParam.put("group.id", "simple1");
- // transform java Map to scala immutable.map
- scala.collection.mutable.Map
testMap = JavaConversions.mapAsScalaMap(kafkaParam); - scala.collection.immutable.Map
scalaKafkaParam = - testMap.toMap(new Predef.$less$colon$less
, Tuple2 >() { - public Tuple2
apply(Tuple2 v1) { - return v1;
- }
- });
- // init KafkaCluster
- kafkaCluster = new KafkaCluster(scalaKafkaParam);
- scala.collection.mutable.Set
mutableTopics = JavaConversions.asScalaSet(topicSet); - immutableTopics = mutableTopics.toSet();
- scala.collection.immutable.Set
topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get(); - // kafka direct stream 初始化時使用的offset數(shù)據(jù)
- Map
consumerOffsetsLong = new HashMap (); - // 沒有保存offset時(該group***消費時), 各個partition offset 默認為0
- if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) {
- System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get());
- Set
topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); - for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
- consumerOffsetsLong.put(topicAndPartition, 0L);
- }
- }
- // offset已存在, 使用保存的offset
- else {
- scala.collection.immutable.Map
consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("simple1", topicAndPartitionSet2).right().get(); - Map
consumerOffsets = JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp); - Set
topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); - for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
- Long offset = (Long)consumerOffsets.get(topicAndPartition);
- consumerOffsetsLong.put(topicAndPartition, offset);
- }
- }
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
- kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
- // create direct stream
- JavaInputDStream
message = KafkaUtils.createDirectStream( - jssc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- String.class,
- kafkaParam,
- consumerOffsetsLong,
- new Function
, String>() { - public String call(MessageAndMetadata
v1) throws Exception { - System.out.println("接收到的數(shù)據(jù)《《==="+v1.message());
- return v1.message();
- }
- }
- );
- // 得到rdd各個分區(qū)對應的offset, 并保存在offsetRanges中
- final AtomicReference
offsetRanges = new AtomicReference (); - JavaDStream
javaDStream = message.transform(new Function , JavaRDD >() { - public JavaRDD
call(JavaRDD rdd) throws Exception { - OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- offsetRanges.set(offsets);
- return rdd;
- }
- });
- // output
- javaDStream.foreachRDD(new Function
, Void>() { - public Void call(JavaRDD
v1) throws Exception { - if (v1.isEmpty()) return null;
- List
list = v1.collect(); - for(String s:list){
- System.out.println("數(shù)據(jù)==="+s);
- }
- for (OffsetRange o : offsetRanges.get()) {
- // 封裝topic.partition 與 offset對應關(guān)系 java Map
- TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
- Map
topicAndPartitionObjectMap = new HashMap (); - topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());
- // 轉(zhuǎn)換java map to scala immutable.map
- scala.collection.mutable.Map
testMap = - JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap);
- scala.collection.immutable.Map
scalatopicAndPartitionObjectMap = - testMap.toMap(new Predef.$less$colon$less
, Tuple2 >() { - public Tuple2
apply(Tuple2 v1) { - return v1;
- }
- });
- // 更新offset到kafkaCluster
- kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap);
- System.out.println("原數(shù)據(jù)====》"+o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
- );
- }
- return null;
- }
- });
- jssc.start();
- jssc.awaitTermination();
- }
基本使用這種方式就可以解決數(shù)據(jù)丟失的問題。
當前文章:SparkStreaming與Kafka整合遇到的問題及解決方案
本文地址:http://www.dlmjj.cn/article/coddcie.html


咨詢
建站咨詢
