新聞中心
2020-08-20:GO語(yǔ)言中的協(xié)程與Python中的協(xié)程的區(qū)別?
福哥答案2020-08-20:
創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的朝陽(yáng)網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
1.golang的協(xié)程是基于gpm機(jī)制,是可以多核多線程的。Python的協(xié)程是eventloop模型(IO多路復(fù)用技術(shù))實(shí)現(xiàn),協(xié)程是嚴(yán)格的 1:N 關(guān)系,也就是一個(gè)線程對(duì)應(yīng)了多個(gè)協(xié)程。雖然可以實(shí)現(xiàn)異步I/O,但是不能有效利用多核(GIL)。
2.golang用go func。python用import asyncio,async/await表達(dá)式。
評(píng)論
Go語(yǔ)言——goroutine并發(fā)模型
參考:
Goroutine并發(fā)調(diào)度模型深度解析手?jǐn)]一個(gè)協(xié)程池
Golang 的 goroutine 是如何實(shí)現(xiàn)的?
Golang - 調(diào)度剖析【第二部分】
OS線程初始棧為2MB。Go語(yǔ)言中,每個(gè)goroutine采用動(dòng)態(tài)擴(kuò)容方式,初始2KB,按需增長(zhǎng),最大1G。此外GC會(huì)收縮棧空間。
BTW,增長(zhǎng)擴(kuò)容都是有代價(jià)的,需要copy數(shù)據(jù)到新的stack,所以初始2KB可能有些性能問(wèn)題。
更多關(guān)于stack的內(nèi)容,可以參見(jiàn)大佬的文章。 聊一聊goroutine stack
用戶線程的調(diào)度以及生命周期管理都是用戶層面,Go語(yǔ)言自己實(shí)現(xiàn)的,不借助OS系統(tǒng)調(diào)用,減少系統(tǒng)資源消耗。
Go語(yǔ)言采用兩級(jí)線程模型,即用戶線程與內(nèi)核線程KSE(kernel scheduling entity)是M:N的。最終goroutine還是會(huì)交給OS線程執(zhí)行,但是需要一個(gè)中介,提供上下文。這就是G-M-P模型
Go調(diào)度器有兩個(gè)不同的運(yùn)行隊(duì)列:
go1.10\src\runtime\runtime2.go
Go調(diào)度器根據(jù)事件進(jìn)行上下文切換。
調(diào)度的目的就是防止M堵塞,空閑,系統(tǒng)進(jìn)程切換。
詳見(jiàn) Golang - 調(diào)度剖析【第二部分】
Linux可以通過(guò)epoll實(shí)現(xiàn)網(wǎng)絡(luò)調(diào)用,統(tǒng)稱網(wǎng)絡(luò)輪詢器N(Net Poller)。
文件IO操作
上面都是防止M堵塞,任務(wù)竊取是防止M空閑
每個(gè)M都有一個(gè)特殊的G,g0。用于執(zhí)行調(diào)度,gc,棧管理等任務(wù),所以g0的棧稱為調(diào)度棧。g0的棧不會(huì)自動(dòng)增長(zhǎng),不會(huì)被gc,來(lái)自os線程的棧。
go1.10\src\runtime\proc.go
G沒(méi)辦法自己運(yùn)行,必須通過(guò)M運(yùn)行
M通過(guò)通過(guò)調(diào)度,執(zhí)行G
從M掛載P的runq中找到G,執(zhí)行G
為什么要使用 Go 語(yǔ)言?Go 語(yǔ)言的優(yōu)勢(shì)在哪里?
1、簡(jiǎn)單易學(xué)。
Go語(yǔ)言的作者本身就很懂C語(yǔ)言,所以同樣Go語(yǔ)言也會(huì)有C語(yǔ)言的基因,所以對(duì)于程序員來(lái)說(shuō),Go語(yǔ)言天生就會(huì)讓人很熟悉,容易上手。
2、并發(fā)性好。
Go語(yǔ)言天生支持并發(fā),可以充分利用多核,輕松地使用并發(fā)。 這是Go語(yǔ)言最大的特點(diǎn)。
描述
Go的語(yǔ)法接近C語(yǔ)言,但對(duì)于變量的聲明有所不同。Go支持垃圾回收功能。Go的并行模型是以東尼·霍爾的通信順序進(jìn)程(CSP)為基礎(chǔ),采取類似模型的其他語(yǔ)言包括Occam和Limbo,但它也具有Pi運(yùn)算的特征,比如通道傳輸。
在1.8版本中開(kāi)放插件(Plugin)的支持,這意味著現(xiàn)在能從Go中動(dòng)態(tài)加載部分函數(shù)。
與C++相比,Go并不包括如枚舉、異常處理、繼承、泛型、斷言、虛函數(shù)等功能,但增加了 切片(Slice) 型、并發(fā)、管道、垃圾回收、接口(Interface)等特性的語(yǔ)言級(jí)支持。
Golang kafka簡(jiǎn)述和操作(sarama同步異步和消費(fèi)組)
一、Kafka簡(jiǎn)述
1. 為什么需要用到消息隊(duì)列
異步:對(duì)比以前的串行同步方式來(lái)說(shuō),可以在同一時(shí)間做更多的事情,提高效率;
解耦:在耦合太高的場(chǎng)景,多個(gè)任務(wù)要對(duì)同一個(gè)數(shù)據(jù)進(jìn)行操作消費(fèi)的時(shí)候,會(huì)導(dǎo)致一個(gè)任務(wù)的處理因?yàn)榱硪粋€(gè)任務(wù)對(duì)數(shù)據(jù)的操作變得及其復(fù)雜。
緩沖:當(dāng)遇到突發(fā)大流量的時(shí)候,消息隊(duì)列可以先把所有消息有序保存起來(lái),避免直接作用于系統(tǒng)主體,系統(tǒng)主題始終以一個(gè)平穩(wěn)的速率去消費(fèi)這些消息。
2.為什么選擇kafka呢?
這沒(méi)有絕對(duì)的好壞,看個(gè)人需求來(lái)選擇,我這里就抄了一段他人總結(jié)的的優(yōu)缺點(diǎn),可見(jiàn)原文
kafka的優(yōu)點(diǎn):
1.支持多個(gè)生產(chǎn)者和消費(fèi)者2.支持broker的橫向拓展3.副本集機(jī)制,實(shí)現(xiàn)數(shù)據(jù)冗余,保證數(shù)據(jù)不丟失4.通過(guò)topic將數(shù)據(jù)進(jìn)行分類5.通過(guò)分批發(fā)送壓縮數(shù)據(jù)的方式,減少數(shù)據(jù)傳輸開(kāi)銷,提高吞高量6.支持多種模式的消息7.基于磁盤實(shí)現(xiàn)數(shù)據(jù)的持久化8.高性能的處理信息,在大數(shù)據(jù)的情況下,可以保證亞秒級(jí)的消息延遲9.一個(gè)消費(fèi)者可以支持多種topic的消息10.對(duì)CPU和內(nèi)存的消耗比較小11.對(duì)網(wǎng)絡(luò)開(kāi)銷也比較小12.支持跨數(shù)據(jù)中心的數(shù)據(jù)復(fù)制13.支持鏡像集群
kafka的缺點(diǎn):
1.由于是批量發(fā)送,所以數(shù)據(jù)達(dá)不到真正的實(shí)時(shí)2.對(duì)于mqtt協(xié)議不支持3.不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入4.只能支持統(tǒng)一分區(qū)內(nèi)消息有序,無(wú)法實(shí)現(xiàn)全局消息有序5.監(jiān)控不完善,需要安裝插件6.需要配合zookeeper進(jìn)行元數(shù)據(jù)管理7.會(huì)丟失數(shù)據(jù),并且不支持事務(wù)8.可能會(huì)重復(fù)消費(fèi)數(shù)據(jù),消息會(huì)亂序,可用保證一個(gè)固定的partition內(nèi)部的消息是有序的,但是一個(gè)topic有多個(gè)partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創(chuàng)建,部署和維護(hù)一般都比mq高
3. Golang 操作kafka
3.1. kafka的環(huán)境
網(wǎng)上有很多搭建kafka環(huán)境教程,這里就不再搭建,就展示一下kafka的環(huán)境,在kubernetes上進(jìn)行的搭建,有需要的私我,可以發(fā)yaml文件
3.2. 第三方庫(kù)
github.com/Shopify/sarama // kafka主要的庫(kù)*github.com/bsm/sarama-cluster // kafka消費(fèi)組
3.3. 消費(fèi)者
單個(gè)消費(fèi)者
funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區(qū)iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個(gè)分區(qū)開(kāi)一個(gè)go協(xié)程去取值formsg :=rangepc.Messages() {//阻塞直到有值發(fā)送過(guò)來(lái),然后再繼續(xù)等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}
消費(fèi)組
funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開(kāi)始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實(shí)時(shí)寫入kafka,有可能在程序crash時(shí)丟掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生產(chǎn)者
同步生產(chǎn)者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個(gè)分區(qū)config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)}
異步生產(chǎn)者
funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個(gè)選項(xiàng)config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個(gè)部分一定要寫,不然通道會(huì)被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}
3.5. 結(jié)果展示-
同步生產(chǎn)打?。?/p>
分區(qū)ID:0,offset:90
消費(fèi)打印:
Partition:0,Offset:90,key:,value:Hello World!
異步生產(chǎn)打?。?/p>
async:7272async:7616async:998
消費(fèi)打?。?/p>
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
go有沒(méi)有開(kāi)源的類似java的mina或者netty的socket框架
mina與netty都是Trustin Lee的作品,所以在很多方面都十分相似,他們線程模型也是基本一致,采用了Reactors in threads模型,即Main Reactor + Sub Reactors的模式。由main reactor處理連接相關(guān)的任務(wù):accept、connect等,當(dāng)連接處理完畢并建立
協(xié)程與異步IO
協(xié)程,又稱微線程,纖程。英文名 Coroutine 。Python對(duì)協(xié)程的支持是通過(guò) generator 實(shí)現(xiàn)的。在generator中,我們不但可以通過(guò)for循環(huán)來(lái)迭代,還可以不斷調(diào)用 next()函數(shù) 獲取由 yield 語(yǔ)句返回的下一個(gè)值。但是Python的yield不但可以返回一個(gè)值,它還可以接收調(diào)用者發(fā)出的參數(shù)。yield其實(shí)是終端當(dāng)前的函數(shù),返回給調(diào)用方。python3中使用yield來(lái)實(shí)現(xiàn)range,節(jié)省內(nèi)存,提高性能,懶加載的模式。
asyncio是Python 3.4 版本引入的 標(biāo)準(zhǔn)庫(kù) ,直接內(nèi)置了對(duì)異步IO的支持。
從Python 3.5 開(kāi)始引入了新的語(yǔ)法 async 和 await ,用來(lái)簡(jiǎn)化yield的語(yǔ)法:
import asyncio
import threading
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
print(threading.current_thread().name)
await asyncio.sleep(x + y)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
print(threading.current_thread().name)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
線程是內(nèi)核進(jìn)行搶占式的調(diào)度的,這樣就確保了每個(gè)線程都有執(zhí)行的機(jī)會(huì)。而 coroutine 運(yùn)行在同一個(gè)線程中,由語(yǔ)言的運(yùn)行時(shí)中的 EventLoop(事件循環(huán)) 來(lái)進(jìn)行調(diào)度。和大多數(shù)語(yǔ)言一樣,在 Python 中,協(xié)程的調(diào)度是非搶占式的,也就是說(shuō)一個(gè)協(xié)程必須主動(dòng)讓出執(zhí)行機(jī)會(huì),其他協(xié)程才有機(jī)會(huì)運(yùn)行。
讓出執(zhí)行的關(guān)鍵字就是 await。也就是說(shuō)一個(gè)協(xié)程如果阻塞了,持續(xù)不讓出 CPU,那么整個(gè)線程就卡住了,沒(méi)有任何并發(fā)。
PS: 作為服務(wù)端,event loop最核心的就是IO多路復(fù)用技術(shù),所有來(lái)自客戶端的請(qǐng)求都由IO多路復(fù)用函數(shù)來(lái)處理;作為客戶端,event loop的核心在于利用Future對(duì)象延遲執(zhí)行,并使用send函數(shù)激發(fā)協(xié)程,掛起,等待服務(wù)端處理完成返回后再調(diào)用CallBack函數(shù)繼續(xù)下面的流程
Go語(yǔ)言的協(xié)程是 語(yǔ)言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的協(xié)程是eventloop模型),但是erlang是基于進(jìn)程的消息通信,go是基于goroutine和channel的通信。
Python和Go都引入了消息調(diào)度系統(tǒng)模型,來(lái)避免鎖的影響和進(jìn)程/線程開(kāi)銷大的問(wèn)題。
協(xié)程從本質(zhì)上來(lái)說(shuō)是一種用戶態(tài)的線程,不需要系統(tǒng)來(lái)執(zhí)行搶占式調(diào)度,而是在語(yǔ)言層面實(shí)現(xiàn)線程的調(diào)度 。因?yàn)閰f(xié)程 不再使用共享內(nèi)存/數(shù)據(jù) ,而是使用 通信 來(lái)共享內(nèi)存/鎖,因?yàn)樵谝粋€(gè)超級(jí)大系統(tǒng)里具有無(wú)數(shù)的鎖,共享變量等等會(huì)使得整個(gè)系統(tǒng)變得無(wú)比的臃腫,而通過(guò)消息機(jī)制來(lái)交流,可以使得每個(gè)并發(fā)的單元都成為一個(gè)獨(dú)立的個(gè)體,擁有自己的變量,單元之間變量并不共享,對(duì)于單元的輸入輸出只有消息。開(kāi)發(fā)者只需要關(guān)心在一個(gè)并發(fā)單元的輸入與輸出的影響,而不需要再考慮類似于修改共享內(nèi)存/數(shù)據(jù)對(duì)其它程序的影響。
網(wǎng)站欄目:go語(yǔ)言異步模型 go異步調(diào)用
本文URL:http://www.dlmjj.cn/article/dodpghp.html