新聞中心
前面我們了解了 Dapr 對(duì)發(fā)布訂閱的支持,本節(jié)我們將來(lái)介紹了 Dapr 中對(duì)消息隊(duì)列的支持。消息隊(duì)列,分為兩種綁定,一種是輸出綁定,一種是輸入綁定。出和入是看數(shù)據(jù)的流向,輸出綁定就是作為生產(chǎn)者的服務(wù)把消息通過(guò) Dapr 傳給消息隊(duì)列,輸入綁定就是作為消費(fèi)者的服務(wù)通過(guò) Dapr 從消息隊(duì)列里得到消息。

成都創(chuàng)新互聯(lián)致力于網(wǎng)站制作、成都網(wǎng)站制作,成都網(wǎng)站設(shè)計(jì),集團(tuán)網(wǎng)站建設(shè)等服務(wù)標(biāo)準(zhǔn)化,推過(guò)標(biāo)準(zhǔn)化降低中小企業(yè)的建站的成本,并持續(xù)提升建站的定制化服務(wù)水平進(jìn)行質(zhì)量交付,讓企業(yè)網(wǎng)站從市場(chǎng)競(jìng)爭(zhēng)中脫穎而出。 選擇成都創(chuàng)新互聯(lián),就選擇了安全、穩(wěn)定、美觀的網(wǎng)站建設(shè)服務(wù)!
這里的消息隊(duì)列和發(fā)布訂閱里的消息總線有什么區(qū)別呢?一個(gè)消息進(jìn)入消息總線的話,所有訂閱者都能得到這個(gè)消息,而一個(gè)消息進(jìn)入消息隊(duì)列的話,由消費(fèi)者來(lái)取,一次只有一個(gè)人能得到。此外,消息總線是不要求處理順序的,兩個(gè)消息進(jìn)入消息總線,誰(shuí)先被拿到順序是不一定的,而消息隊(duì)列可以保證是先入先出的。
本節(jié)我們將創(chuàng)建兩個(gè)微服務(wù),一個(gè)具有輸入綁定,另一個(gè)具有輸出綁定,前面我們都使用的 Redis 這種中間件,這里我們將綁定到 Kafka。
- Node.js 微服務(wù)使用輸入綁定
- Python 微服務(wù)利用輸出綁定
綁定連接到 Kafka,允許我們將消息推送到 Kafka 實(shí)例(從 Python 微服務(wù))中,并從該實(shí)例(從 Node.js 微服務(wù))接收消息,而不必知道實(shí)例的位置。相反,同樣只需要直接使用 Dapr API 通過(guò) sidecars 連接即可。
本地運(yùn)行
首先我們?cè)诒镜貋?lái)運(yùn)行示例應(yīng)用,對(duì)應(yīng)的架構(gòu)圖如下所示:
Bindings 本地模式
同樣使用 quickstarts 這個(gè)代碼倉(cāng)庫(kù):
git clone [-b] https://github.com/dapr/quickstarts.git
由于我們這里是使用 Kafka 來(lái)做消息隊(duì)列的中間件,所以我們首先需要在本地環(huán)境運(yùn)行 Kafka,我們可以直接使用 https://github.com/wurstmeister/kafka-docker 這個(gè)項(xiàng)目以 Docker 方式運(yùn)行。
定位到 quickstarts 的 tutorials/bindings 目錄,下面有一個(gè) docker-compose-single-kafka.yml 文件:
$ cd tutorials/bindings
$ cat docker-compose-single-kafka.yml
version: '2'
services:
zookeeper:
image: ghcr.io/dapr/3rdparty/zookeeper:latest
ports:
- "2181:2181"
kafka:
image: ghcr.io/dapr/3rdparty/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "sample:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
我們可以直接而使用 docker-compose 來(lái)啟動(dòng)一個(gè)單實(shí)例的 Kafka:
$ docker-compose -f ./docker-compose-single-kafka.yml up -d
隔一段時(shí)間鏡像拉取完成后以容器方式啟動(dòng) Kafka:
$ docker-compose -f ./docker-compose-single-kafka.yml ps
NAME COMMAND SERVICE STATUS PORTS
bindings-kafka-1 "start-kafka.sh" kafka running 0.0.0.0:9092->9092/tcp
bindings-zookeeper-1 "/bin/sh -c '/usr/sb…" zookeeper running 0.0.0.0:2181->2181/tcp
在本地運(yùn)行了 Kafka 后,接著我們可以運(yùn)行輸入綁定的 Node.js 微服務(wù):
$ cd nodeapp
同樣先安裝服務(wù)依賴:
$ npm install # 或者執(zhí)行 yarn 命令
然后我們就可以使用 dapr run 命令來(lái)啟動(dòng)該微服務(wù)了,啟動(dòng)方式我們應(yīng)該比較熟悉了,如下所示:
$ dapr run --app-id bindings-nodeapp --app-port 3000 node app.js --components-path ../components
上面的命令和前面有點(diǎn)不一樣的地方是多了一個(gè) --components-path 用來(lái)指定組件路徑,這是因?yàn)楝F(xiàn)在我們要使用 Kafka 這種中間件來(lái)作為我們的消息隊(duì)列組件,那么我們就需要告訴 Dapr,在 ./components 目錄下面就包含一個(gè)對(duì)應(yīng)的 kafka_bindings.yaml 文件,內(nèi)容如下所示:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
前面在本地模式下面我們沒(méi)有主動(dòng)聲明組件,是因?yàn)槲覀兪褂玫木褪悄J(rèn)的 Redis,而 Kafka 并不是內(nèi)置就有的,所以需要我們主動(dòng)聲明,注意上面組件的類型為 type: bindings.kafka,metadata 下面是訪問(wèn) Kafka 相關(guān)的元數(shù)據(jù)。正常情況下上面的啟動(dòng)命令會(huì)輸出如下所示的日志信息:
?? Starting Dapr with id bindings-nodeapp. HTTP Port: 54215. gRPC Port: 54216
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 347.136ms app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
?? Updating metadata for app command: node app.js
You're up and running! Both Dapr and your app logs will appear here.
INFO[0001] placement tables updated, version: 0 app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
接下來(lái),需要運(yùn)行輸出綁定的 Python 微服務(wù),定位到 pythonapp? 目錄,安裝 requests 依賴:
$ cd pythonapp
$ pip3 install requests
然后同樣用 dapr run 命令來(lái)啟動(dòng)該微服務(wù),也要注意帶上后面的 --components-path 參數(shù):
$ dapr run --app-id bindings-pythonapp python3 app.py --components-path ../components
?? Starting Dapr with id bindings-pythonapp. HTTP Port: 54554. gRPC Port: 54555
?? Checking if Dapr sidecar is listening on HTTP port 54554
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=bindings-pythonapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
?? Checking if Dapr sidecar is listening on GRPC port 54555
?? Dapr sidecar is up and running.
?? Updating metadata for app command: python3 app.py
You're up and running! Both Dapr and your app logs will appear here.
啟動(dòng)完成后,觀察 Python 服務(wù)的日志,可以看到不斷輸出如下所示成功輸出綁定到 Kafka 的日志:
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP ==
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP ==
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP ==
# ......同樣這個(gè)時(shí)候 Node.js 微服務(wù)中也不斷有新的日志數(shù)據(jù)產(chǎn)生:
== APP ==
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP ==
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP ==
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP ==
# ......
這是因?yàn)?Python 微服務(wù)每隔 1s 就會(huì)向我們綁定的消息隊(duì)列發(fā)送一條消息,而 Node.js 微服務(wù)作為消費(fèi)者當(dāng)然會(huì)接收到對(duì)應(yīng)的消息數(shù)據(jù)。
在 Kubernetes 中運(yùn)行
上面在本地環(huán)境下可以正常運(yùn)行 Dapr bindings 服務(wù),接下來(lái)我們?cè)俅螌⒃撌纠渴鸬?Kubernetes 集群中來(lái)進(jìn)行觀察。
同樣首先需要提供一個(gè)可用的 Kafka 實(shí)例,這里我們?nèi)匀皇褂?Helm Chart 方式來(lái)進(jìn)行安裝:
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
然后使用如下所示的命令來(lái)安裝 Kafka:
$ helm upgrade --install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml --create-namespace
這里我們指定了一個(gè)無(wú)需持久化數(shù)據(jù)(僅供測(cè)試)的 values 文件 kafka-non-persistence.yaml,內(nèi)容如下所示:
replicas: 1
# Disable persistent storage
persistence:
enabled: false
zookeeper:
persistence:
enabled: false
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
autoCreateTopicsEnable: true
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
安裝完成后可以查看 Pod 的狀態(tài)來(lái)保證 Kafka 啟動(dòng)成功:
$ kubectl -n kafka get pods -w
NAME READY STATUS RESTARTS AGE
dapr-kafka-0 1/1 Running 0 2m7s
dapr-kafka-zookeeper-0 1/1 Running 0 2m57s
接下來(lái)我們首先需要在 Kubernetes 集群中配置使用 Kafka 作為 Binding 消息中間件的 Component 組件:
# kafka_bindings.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: dapr-kafka.kafka:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
注意該對(duì)象上面指定的組件類型為 bindings.kafka,metadata 下面的元信息包括 Kafka brokers地址、生產(chǎn)者和消費(fèi)者的配置等等,直接應(yīng)用上面的資源清單即可:
$ kubectl apply -f kafka_bindings.yaml
$ kubectl get components sample-topic
NAME AGE
sample-topic 13s
創(chuàng)建完成后在 Dapr Dashboard 中也可以看到對(duì)應(yīng)的組件信息:
dapr dashboard components。
接著部署兩個(gè) Node.js 和 Python 微服務(wù)即可:
$ kubectl apply -f deploy/node.yaml
service/bindings-nodeapp created
deployment.apps/bindings-nodeapp created
$ kubectl apply -f deploy/python.yaml
deployment.apps/bindings-pythonapp created
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
bindings-nodeapp-8bcdd744d-pj2j7 2/2 Running 0 3m44s
bindings-pythonapp-7b7fcc579b-kqx6p 2/2 Running 0 3m39s
部署完成后可以同樣分別觀察 Node.js 和 Python 微服務(wù)的日志:
$ kubectl logs --selector app=bindingspythonapp -c python --tail=-1
{'data': {'orderId': 1}, 'operation': 'create'}
HTTPConnectionPool(host='localhost', port=3500): Max retries exceeded with url: /v1.0/bindings/sample-topic (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
{'data': {'orderId': 2}, 'operation': 'create'}
{'data': {'orderId': 3}, 'operation': 'create'}
# ......
$ kubectl logs --selector app=bindingsnodeapp -c node --tail=-1
Node App listening on port 3000!
Hello from Kafka!
{ orderId: 2 }
Hello from Kafka!
{ orderId: 3 }
# ......
可以看到兩個(gè)微服務(wù)的日志也服務(wù)我們的預(yù)期的。
如何工作
前面我們?cè)诒镜鼗?Kubernetes 中都運(yùn)行了示例應(yīng)用,而且沒(méi)有更改任何代碼,應(yīng)用結(jié)果都符合預(yù)期,接下來(lái)我們看看這是如何工作的。
在查看應(yīng)用程序代碼之前,我們先看看 Kafka 綁定組件的資源清單文件,它們?yōu)?Kafka 連接指定 brokers,為消費(fèi)者指定 topics 和 consumerGroup,為生產(chǎn)者指定了 publishTopic。
我們創(chuàng)建了名為 sample-topic 的組件,然后我們通過(guò)該組件配置的 Kafka 中的 sample 主題來(lái)設(shè)置輸入和輸出綁定。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: [kafka broker address]
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
現(xiàn)在我們先導(dǎo)航到 nodeapp? 目錄下面打開(kāi) app.js? 文件,這是 Node.js 輸入綁定示例應(yīng)用的代碼。這里使用 Express? 暴露了一個(gè) API 端點(diǎn),需要注意的是 API 名稱必須與在 Kafka 綁定組件中聲明的組件名稱相同,然后 Dapr 運(yùn)行時(shí)將使用來(lái)自 sample 主題的事件,然后將 POST 請(qǐng)求與事件負(fù)載一起發(fā)送給 Node 應(yīng)用程序。
const express = require("express");
const bodyParser = require("body-parser");
const port = process.env.APP_PORT ?? "3000";
require("isomorphic-fetch");
const app = express();
app.use(bodyParser.json());
// 這里的 api 端點(diǎn)需要與聲明的組件名稱相同
app.post("/sample-topic", (req, res) => {
console.log("Hello from Kafka!");
console.log(req.body);
res.status(200).send();
});
app.listen(port, () => console.log(`Node App listening on port ${port}!`));所以當(dāng) Kafka 中收到消息后就會(huì)打印類似如下所示的日志:
Hello from Kafka!
{ orderId: 3 }
然后我們導(dǎo)航到 pythonapp 目錄下面打開(kāi) app.py 文件,這是輸出綁定示例(生產(chǎn)者)應(yīng)用程序的代碼,該服務(wù)會(huì)每秒發(fā)送一次 POST 請(qǐng)求到 Dapr 的 http 端點(diǎn)的 http://localhost:3500/v1.0/bindings/
import time
import requests
import os
dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)
dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
n += 1
payload = { "data": {"orderId": n}, "operation": "create" }
print(payload, flush=True)
try:
response = requests.post(dapr_url, json=payload)
print(response, flush=True)
except Exception as e:
print(e, flush=True)
time.sleep(1)
上面代碼中最重要的依然是 Dapr API 地址 dapr_url 的拼接 "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port),注意我們依然是面向 localhost 編程,而 v1.0/bindings/
本文題目:Dapr入門(mén)教程之消息隊(duì)列
本文來(lái)源:http://www.dlmjj.cn/article/dpgisih.html


咨詢
建站咨詢
