日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
一篇帶給你跨數(shù)據(jù)源實(shí)現(xiàn)數(shù)據(jù)同步

場(chǎng)景

在微服務(wù)拆分的架構(gòu)中,各服務(wù)擁有自己的數(shù)據(jù)庫(kù),所以常常會(huì)遇到服務(wù)之間數(shù)據(jù)通信的問(wèn)題。比如,B服務(wù)數(shù)據(jù)庫(kù)的數(shù)據(jù)來(lái)源于A服務(wù)的數(shù)據(jù)庫(kù);A服務(wù)的數(shù)據(jù)有變更操作時(shí),需要同步到B服務(wù)中。

解決方案

1、 在代碼邏輯中,有相關(guān)A服務(wù)數(shù)據(jù)寫操作時(shí),以調(diào)用接口的方式,調(diào)用B服務(wù)接口,B服務(wù)再將數(shù)據(jù)寫到新的數(shù)據(jù)庫(kù)中。這種方式看似簡(jiǎn)單,但其實(shí)“坑”很多。在A服務(wù)代碼邏輯中會(huì)增加大量這種調(diào)用接口同步的代碼,增加了項(xiàng)目代碼的復(fù)雜度,以后會(huì)越來(lái)越難維護(hù)。并且,接口調(diào)用的方式并不是一個(gè)穩(wěn)定的方式,沒(méi)有重試機(jī)制,沒(méi)有同步位置記錄,接口調(diào)用失敗了怎么處理,突然的大量接口調(diào)用會(huì)產(chǎn)生的問(wèn)題等,這些都要考慮并且在業(yè)務(wù)中處理。這里會(huì)有不少工作量。想到這里,就將這個(gè)方案排除了。

2、通過(guò)數(shù)據(jù)庫(kù)的binlog進(jìn)行同步。這種解決方案,與A服務(wù)是獨(dú)立的,不會(huì)和A服務(wù)有代碼上的耦合??梢灾苯覶CP連接進(jìn)行傳輸數(shù)據(jù),優(yōu)于接口調(diào)用的方式。 這是一套成熟的生產(chǎn)解決方案,也有不少binlog同步的中間件工具,所以我們關(guān)注的就是哪個(gè)工具能夠更好的構(gòu)建穩(wěn)定、性能滿足且易于高可用部署的方案。

經(jīng)過(guò)調(diào)研,我們選擇了canal[

https://github.com/alibaba/canal]。canal是阿里巴巴 MySQL binlog 增量訂閱&消費(fèi)組件,已經(jīng)有在生產(chǎn)上實(shí)踐的例子,并且方便的支持和其他常用的中間件組件組合,比如kafka,elasticsearch等,也有了canal-go go語(yǔ)言的client庫(kù),滿足我們?cè)趃o上的需求,其他具體內(nèi)容參閱canal的github主頁(yè)。

原理簡(jiǎn)圖

工作流程

1.Canal連接到A數(shù)據(jù)庫(kù),模擬slave

2.canal-client與Canal建立連接,并訂閱對(duì)應(yīng)的數(shù)據(jù)庫(kù)表

3.A數(shù)據(jù)庫(kù)發(fā)生變更寫入到binlog,Canal向數(shù)據(jù)庫(kù)發(fā)送dump請(qǐng)求,獲取binlog并解析,發(fā)送解析后的數(shù)據(jù)給canal-client

4.canal-client收到數(shù)據(jù),將數(shù)據(jù)同步到新的數(shù)據(jù)庫(kù)

安裝canal

下載canal

修改配置/conf/canal.properties

# ...
# 可選項(xiàng): tcp(默認(rèn)), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage模式下可以調(diào)大該值, 但不要超過(guò)MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請(qǐng)將該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默認(rèn)50K, 由于kafka最大消息體限制請(qǐng)勿超過(guò)1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數(shù)據(jù)的超時(shí)時(shí)間, 單位: 毫秒, 空為不限超時(shí)
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對(duì)象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務(wù)
canal.mq.transaction = false

# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
#解決消費(fèi)順序問(wèn)題
canal.mq.partitionHash=mydatabase.mytable

然后配置instance,找到

/conf/example/instance.properties配置文件:

## mysql serverId , v1.0.26+ will autoGen(自動(dòng)生成,不需配置)
# canal.instance.mysql.slaveId=0

# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql執(zhí)行 SHOW MASTER STATUS;查看當(dāng)前數(shù)據(jù)庫(kù)的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 賬號(hào)密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ隊(duì)列名稱
canal.mq.topic=canaltopic
#單隊(duì)列模式的分區(qū)下標(biāo)
canal.mq.partition=0

啟動(dòng)zookeeper和kafka

zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties

啟動(dòng) canal

canal/bin/start.bat

編寫讀取消息的相關(guān)代碼

kafka相關(guān)配置

kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
# 發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù)。
retries: 0
#當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
batch-size: 16384
# 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
buffer-memory: 33554432
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)。
# acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器成功響應(yīng)。
# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)。
acks: 1
consumer:
# 自動(dòng)提交的時(shí)間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1
# 該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下該作何處理:
# latest(默認(rèn)值)在偏移量無(wú)效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
# earliest :在偏移量無(wú)效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄
auto-offset-reset: earliest
# 是否自動(dòng)提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在偵聽器容器中運(yùn)行的線程數(shù)。
concurrency: 5
#listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private String retries;

@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;

@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;

/**
* 生產(chǎn)者配置信息
*/
@Bean
public Map producerConfigs() {


Map props = new HashMap();
//重試,0為不啟用重試機(jī)制
props.put(ProducerConfig.ACKS_CONFIG, "all");
//連接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 2);
//控制批處理大小,單位為字節(jié)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//批量發(fā)送,延遲為1毫秒,啟用該功能能有效減少生產(chǎn)者發(fā)送消息次數(shù),從而提高并發(fā)量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//生產(chǎn)者可以使用的總內(nèi)存字節(jié)來(lái)緩沖等待發(fā)送到服務(wù)器的記錄
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//鍵的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return props;
}

/** kafka無(wú)事務(wù)模式
* @return
*/
/* @Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}*/

/**
* 開啟kafka事務(wù)
*
* @return
*/
@Bean
public ProducerFactory producerFactory() {
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());

//在producerFactory中開啟事務(wù)功能
factory.transactionCapable();

//TransactionIdPrefix是用來(lái)生成Transactional.id的前綴
factory.setTransactionIdPrefix("tran-");
return factory;

}

@Bean
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
return manager;
}

@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

讀取消息

/**
* 如何解決topic指定對(duì)應(yīng)的表(一個(gè)topic對(duì)應(yīng)一個(gè)表即可解決此問(wèn)題)
* @param record
* @param ack
* @param topic
*/
@KafkaListener(topics = KafkaConstants.CANAL_TOPIC, groupId = KafkaConstants.DISPATCH_GROUP)
public void canalConsumer(ConsumerRecord record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
String messageStr = (String) message.get();
CanalDto canalDto = JSONObject.parseObject(messageStr, CanalDto.class);
LOGGER.info("canalConsumer 消費(fèi)了: Topic:{},Message:{}", topic, messageStr);
LOGGER.info(canalDto.toString());
boolean isDdl = canalDto.isDdl();
if(!isDdl){
String type = canalDto.getType();
List data = canalDto.getData();
if("INSERT".equals(type)){
mongodbBase.batchSave(data,OrderTbl.class);
}else if ("UPDATE".equals(type)) {

// mongodbBase.updateFirst();
}else {
//刪除語(yǔ)句
for (OrderTbl orderTbl : data) {
mongodbBase.remove(orderTbl);
}
}
}
ack.acknowledge();
}
}

canal實(shí)體信息

public class CanalDto implements Serializable {

private static final long serialVersionUID = 3652575521269639607L;
//數(shù)據(jù)
private List data;
//數(shù)據(jù)庫(kù)名稱
private String database;
private long es;
//遞增,從1開始
private int id;
//是否是DDL語(yǔ)句
private boolean isDdl;
//表結(jié)構(gòu)的字段類型
private MysqlType mysqlType;
//UPDATE語(yǔ)句,舊數(shù)據(jù)
private String old;
//主鍵名稱
private List pkNames;
//sql語(yǔ)句
private String sql;
private SqlTypeDto sqlType;
//表名
private String table;
private long ts;
//(新增)INSERT、(更新)UPDATE、(刪除)DELETE、(刪除表)ERASE等等
private String type;
}

本文標(biāo)題:一篇帶給你跨數(shù)據(jù)源實(shí)現(xiàn)數(shù)據(jù)同步
URL鏈接:http://www.dlmjj.cn/article/dhgeehj.html