日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
微服務(wù)之服務(wù)注冊(cè)和服務(wù)發(fā)現(xiàn)篇

RPC 配置

Etcd:
Hosts:
- 127.0.0.1:2379
Key: user.rpc

這里分析go-zero 的etcd 部分源碼, 源碼引用https://github.com/zeromicro/go-zero-demo/tree/master/mall

被調(diào)方-服務(wù)注冊(cè)

  • mall/user/rpc/user.go 源碼如下
package main

import (
"flag"
"fmt"

"go-zero-demo-rpc/mall/user/rpc/internal/config"
"go-zero-demo-rpc/mall/user/rpc/internal/server"
"go-zero-demo-rpc/mall/user/rpc/internal/svc"
"go-zero-demo-rpc/mall/user/rpc/types/user"

"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

var configFile = flag.String("f", "etc/user.yaml", "the config file")

func main() {
flag.Parse()

var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
svr := server.NewUserServer(ctx)

s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
user.RegisterUserServer(grpcServer, svr)

if c.Mode == service.DevMode || c.Mode == service.TestMode {
reflection.Register(grpcServer)
}
})
defer s.Stop()

fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start()
}
  • MustNewServer 內(nèi)部實(shí)現(xiàn)調(diào)用了NewServer 方法, 這里我們關(guān)注NewServer 通過internal.NewRpcPubServer 方法實(shí)例化了internal.Server
if c.HasEtcd() {
server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
if err != nil {
return nil, err
}
}
  • internal.NewRpcPubServer 中的registerEtcd 會(huì)調(diào)用Publisher.KeepAlive 方法
// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
// 這里獲取 etcd 的連接
cli, err := internal.GetRegistry().GetConn(p.endpoints)
if err != nil {
return err
}

p.lease, err = p.register(cli)
if err != nil {
return err
}

proc.AddWrapUpListener(func() {
p.Stop()
})

return p.keepAliveAsync(cli)
}
  • p.register 這里把自己注冊(cè)到服務(wù)中
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {

// 這里新建一個(gè)租約
resp, err := client.Grant(client.Ctx(), TimeToLive)
if err != nil {
return clientv3.NoLease, err
}

// 得到租約的 ID
lease := resp.ID

// 這里拼接出實(shí)際存儲(chǔ)的 key
if p.id > 0 {
p.fullKey = makeEtcdKey(p.key, p.id)
} else {
p.fullKey = makeEtcdKey(p.key, int64(lease))
}

// p.value 是前面的 figureOutListenOn 方法獲取到自己的地址
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

return lease, err
}
  • 注冊(cè)完之后,keepAliveAsync 開了一個(gè)協(xié)程保活這個(gè)服務(wù)
  • 當(dāng)這個(gè)服務(wù)意外宕機(jī)時(shí), 就不會(huì)再向etcd ?;?etcd 就會(huì)刪除這個(gè)key
  • 注冊(cè)好的服務(wù)如圖

1.png

調(diào)用方-服務(wù)發(fā)現(xiàn)

  • order/api/order.go 源碼如下
package main

import (
"flag"
"fmt"

"go-zero-demo-rpc/order/api/internal/config"
"go-zero-demo-rpc/order/api/internal/handler"
"go-zero-demo-rpc/order/api/internal/svc"

"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/rest"
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
flag.Parse()

var c config.Config
conf.MustLoad(*configFile, &c)

server := rest.MustNewServer(c.RestConf)
defer server.Stop()

ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx)

fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()
}
  • 在svc.NewServiceContext 方法內(nèi)部又調(diào)用了zrpc.MustNewClient ,zrpc.MustNewClient 主要實(shí)現(xiàn)在zrpc.NewClient
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
UserRpc: user.NewUser(zrpc.MustNewClient(c.UserRpc)),
}
}
  • 最后實(shí)際調(diào)用了internal.NewClient 去實(shí)例化rpc client
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
var opts []ClientOption
if c.HasCredential() {
opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
App: c.App,
Token: c.Token,
})))
}
if c.NonBlock {
opts = append(opts, WithNonBlock())
}
if c.Timeout > 0 {
opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
}

opts = append(opts, options...)

target, err := c.BuildTarget()
if err != nil {
return nil, err
}

client, err := internal.NewClient(target, opts...)
if err != nil {
return nil, err
}

return &RpcClient{
client: client,
}, nil
}
  • 在zrpc/internal/client.go 文件里, 包含一個(gè)init 方法, 這里就是實(shí)際發(fā)現(xiàn)服務(wù)的地方, 在這里注冊(cè)服務(wù)發(fā)現(xiàn)者
func init() {
resolver.Register()
}
  • resolver.Register 方法實(shí)現(xiàn)
package resolver

import (
"github.com/zeromicro/go-zero/zrpc/resolver/internal"
)

// Register registers schemes defined zrpc.
// Keep it in a separated package to let third party register manually.
func Register() {
internal.RegisterResolver()
}
  • 最后又回到interval 包的internal.RegisterResolver 方法, 這里我們關(guān)注etcdResolverBuilder
func RegisterResolver() {
resolver.Register(&directResolverBuilder)
resolver.Register(&discovResolverBuilder)
resolver.Register(&etcdResolverBuilder)
resolver.Register(&k8sResolverBuilder)
}
  • etcdBuilder 的內(nèi)嵌了discovBuilder 結(jié)構(gòu)體,
  • Build 方法調(diào)用過程:
  • 實(shí)例化服務(wù)端:internal.NewClient ->client.dial ->grpc.DialContext
  • 由于etcd 是resolver.BuildDiscovTarget 生成的taget 所以是類似這樣子的:discov://127.0.0.1:2379/user.rpc
  • 解析服務(wù)發(fā)現(xiàn):ClientConn.parseTargetAndFindResolver ->grpc.parseTarget ->ClientConn.getResolver
  • 然后在grpc.newCCResolverWrapper 調(diào)用resolver.Builder.Build 方法去發(fā)現(xiàn)服務(wù)
  • 我們著重關(guān)注discovBuilder.Build 方法
type etcdBuilder struct {
discovBuilder
}


type discovBuilder struct{}

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
return r == EndpointSepChar
})
sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
if err != nil {
return nil, err
}

update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil {
logx.Error(err)
}
}
sub.AddListener(update)
update()

return &nopResolver{cc: cc}, nil
}

func (b *discovBuilder) Scheme() string {
return DiscovScheme
}
  • discov.NewSubscriber 方法調(diào)用internal.GetRegistry().Monitor 最后調(diào)用Registry.monitor 方法進(jìn)行監(jiān)視

cluster.getClient 拿到etcd 連接

cluster.load 作為第一次載入數(shù)據(jù)

cluster.watch 去watch 監(jiān)聽etcd 前綴key 的改動(dòng)

func (c *cluster) monitor(key string, l UpdateListener) error {
c.lock.Lock()
c.listeners[key] = append(c.listeners[key], l)
c.lock.Unlock()

cli, err := c.getClient()
if err != nil {
return err
}

c.load(cli, key)
c.watchGroup.Run(func() {
c.watch(cli, key)
})

return nil
}
  • 如下圖是cluster.load 的實(shí)現(xiàn), 就是根據(jù)前綴拿到user.prc 服務(wù)注冊(cè)的所有地址

2.png

Q

為什么不用Redis 做注冊(cè)中心(反正只是把被調(diào)方的地址存儲(chǔ), 過期Redis 也能勝任), 找了很久找到這個(gè)說法

簡(jiǎn)單從以下幾個(gè)方面說一下瑞迪斯為啥在微服務(wù)中不能取代 etcd:

1、redis 沒有版本的概念,歷史版本數(shù)據(jù)在大規(guī)模微服務(wù)中非常有必要,對(duì)于狀態(tài)回滾和故障排查,甚至定鍋都很重要

2、redis 的注冊(cè)和發(fā)現(xiàn)目前只能通過 pub 和 sub 來實(shí)現(xiàn),這兩個(gè)命令完全不能滿足生產(chǎn)環(huán)境的要求,具體原因可以 gg 或看源碼實(shí)現(xiàn)

3、etcd 在 2.+版本時(shí),watch 到數(shù)據(jù)官方文檔均建議再 get 一次,因?yàn)闀?huì)存在數(shù)據(jù)延遲,3.+版本不再需要,可想 redis 的 pub 和 sub 能否達(dá)到此種低延遲的要求

4、樓主看到的微服務(wù)架構(gòu)應(yīng)該都是將 etcd 直接暴露給 client 和 server 的,etcd 的性能擺在那,能夠承受多少的 c/s 直連呢,更好的做法應(yīng)該是對(duì) etcd 做一層保護(hù),當(dāng)然這種做法會(huì)損失一些功能

5、redis 和 etcd 的集群實(shí)現(xiàn)方案是不一致的,etcd 采用的是 raft 協(xié)議,一主多從,只能寫主,底層采用 boltdb 作為 k/v 存儲(chǔ),直接落盤

6、redis 的持久化方案有 aof 和 rdb,這兩種方案在宕機(jī)的時(shí)候都或多或少的會(huì)丟失數(shù)據(jù)

引用自https://www.v2ex.com/t/520367


標(biāo)題名稱:微服務(wù)之服務(wù)注冊(cè)和服務(wù)發(fā)現(xiàn)篇
文章源于:http://www.dlmjj.cn/article/djieods.html