新聞中心
這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Storm如何和Kafka進行整合
這篇文章將為大家詳細講解有關Storm如何和Kafka進行整合,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
創(chuàng)新互聯(lián)專注于企業(yè)成都全網(wǎng)營銷推廣、網(wǎng)站重做改版、義縣網(wǎng)站定制設計、自適應品牌網(wǎng)站建設、H5場景定制、商城網(wǎng)站開發(fā)、集團公司官網(wǎng)建設、成都外貿(mào)網(wǎng)站建設、高端網(wǎng)站制作、響應式網(wǎng)頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為義縣等各大城市提供網(wǎng)站開發(fā)制作服務。
對于Storm 如何和Kafka進行整合
package com.mixbox.storm.kafka; import backtype.storm.Config; import backtype.storm.metric.api.IMetric; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import kafka.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId; import java.util.*; /** * @author Yin Shuai */ public class KafkaSpout extends BaseRichSpout { public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); /** * 內(nèi)部類,Message和Offset的偏移量對象 * * @author Yin Shuai */ public static class MessageAndRealOffset { public Message msg; public long offset; public MessageAndRealOffset(Message msg, long offset) { this.msg = msg; this.offset = offset; } } /** * 發(fā)射的枚舉類 * @author Yin Shuai */ static enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED } String _uuid = UUID.randomUUID().toString(); SpoutConfig _spoutConfig; SpoutOutputCollector _collector; // 分區(qū)的協(xié)調(diào)器,getMyManagedPartitions 拿到我所管理的分區(qū) PartitionCoordinator _coordinator; // 動態(tài)的分區(qū)鏈接:保存到kafka各個節(jié)點的連接,以及負責的topic的partition號碼 DynamicPartitionConnections _connections; // 提供了從zookeeper讀寫kafka 消費者信息的功能 ZkState _state; // 上次更新的毫秒數(shù) long _lastUpdateMs = 0; // 當前的分區(qū) int _currPartitionIndex = 0; public KafkaSpout(SpoutConfig spoutConf) { _spoutConfig = spoutConf; } @SuppressWarnings("unchecked") @Override public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { _collector = collector; ListzkServers = _spoutConfig.zkServers; // 初始化的時候如果zkServers 為空,那么初始化 默認的配置Zookeeper if (zkServers == null) { zkServers = new ArrayList () { { add("192.168.50.144"); add("192.168.50.169"); add("192.168.50.207"); } }; // zkServers = // (List )conf.get(Config.STORM_ZOOKEEPER_SERVERS); System.out.println(" 使用的是Storm默認配置的Zookeeper List : " + zkServers); 
} Integer zkPort = _spoutConfig.zkPort; // 在這里我們也同時 來檢查zookeeper的端口是否為空 if (zkPort == null) { zkPort = 2181; // zkPort = ((Number) // conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); } Map stateConf = new HashMap(conf); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); // 通過保存的配置文件,我們持有了一個zookeeper的state,支持節(jié)點內(nèi)容的創(chuàng)建和刪除 _state = new ZkState(stateConf); // 對于連接的維護 _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is a hack // 拿到總共的任務次數(shù) int totalTasks = context .getComponentTasks(context.getThisComponentId()).size(); // 判斷當前的主機是否是靜態(tài)的statichost if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); // 當你拿到的spoutConfig是zkhost的時候 } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } context.registerMetric("kafkaOffset", new IMetric() { KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric( _spoutConfig.topic, _connections); @Override public Object getValueAndReset() { List pms = _coordinator .getMyManagedPartitions(); Set latestPartitions = new HashSet(); for (PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); } _kafkaOffsetMetric.refreshPartitions(latestPartitions); for (PartitionManager pm : pms) { _kafkaOffsetMetric.setLatestEmittedOffset( pm.getPartition(), pm.lastCompletedOffset()); } return _kafkaOffsetMetric.getValueAndReset(); } }, _spoutConfig.metricsTimeBucketSizeInSecs); context.registerMetric("kafkaPartition", new IMetric() { @Override public Object getValueAndReset() { List pms = _coordinator .getMyManagedPartitions(); Map 
concatMetricsDataMaps = new HashMap(); for (PartitionManager pm : pms) { concatMetricsDataMaps.putAll(pm.getMetricsDataMap()); } return concatMetricsDataMaps; } }, _spoutConfig.metricsTimeBucketSizeInSecs); } @Override public void close() { _state.close(); } @Override public void nextTuple() { // Storm-spout 是從kafka 消費數(shù)據(jù),把 kafka 的 consumer // 當成是一個spout,并且向其他的bolt的發(fā)送數(shù)據(jù) // 拿到當前我管理的這些PartitionsManager List managers = _coordinator.getMyManagedPartitions(); for (int i = 0; i < managers.size(); i++) { // 對于每一個分區(qū)的 PartitionManager // in case the number of managers decreased // 當前的分區(qū) _currPartitionIndex = _currPartitionIndex % managers.size(); // 拿到當前的分區(qū),并且發(fā)送,這里把SpoutOutputCollector傳遞進去了,由他發(fā)射元祖 EmitState state = managers.get(_currPartitionIndex) .next(_collector); // 如果發(fā)送狀態(tài)為:發(fā)送-還有剩余 if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } // 如果發(fā)送的狀態(tài)為: 發(fā)送-沒有剩余 if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { commit(); } } @Override public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); } } @Override public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } @Override public void deactivate() { // 停止工作 commit(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println(_spoutConfig.scheme.getOutputFields()); declarer.declare(_spoutConfig.scheme.getOutputFields()); } private void commit() { _lastUpdateMs = System.currentTimeMillis(); for (PartitionManager manager : _coordinator.getMyManagedPartitions()) { manager.commit(); } } }
在粗淺的代碼閱讀之后,在這里進行詳細的分析:
1 KafkaSpout之中持有了一個 MessageAndRealOffset 的內(nèi)部類
/**
 * Pairs a consumed Kafka Message with the real offset it was read from
 * within its partition.
 */
public static class MessageAndRealOffset {
    public Message msg;   // raw Kafka message payload
    public long offset;   // offset of this message within its partition

    public MessageAndRealOffset(Message msg, long offset) {
        this.msg = msg;
        this.offset = offset;
    }
}
2 在Spout之中我們還持有了一個PartitionCoordinator的分區(qū)協(xié)調(diào)器,默認的情況我們實例化的對象
是 ZkCoordinator(當 SpoutConfig 的 hosts 為 StaticHosts 時,則實例化 StaticCoordinator)
關于Storm如何和Kafka進行整合就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
新聞名稱:Storm如何和Kafka進行整合
標題路徑:http://www.dlmjj.cn/article/ijpjoi.html