日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
孔乙己:Kotlin生產(chǎn)者消費(fèi)者問(wèn)題的八種解法

 

目前成都創(chuàng)新互聯(lián)已為數(shù)千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁(yè)空間、綿陽(yáng)服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、焉耆網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。

本文轉(zhuǎn)載自微信公眾號(hào)「AndroidPub」,作者fundroid。轉(zhuǎn)載本文請(qǐng)聯(lián)系A(chǔ)ndroidPub公眾號(hào)。

生產(chǎn)者和消費(fèi)者問(wèn)題是線程模型中的經(jīng)典問(wèn)題:生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)緩沖區(qū)(Buffer),生產(chǎn)者往 Buffer 中添加產(chǎn)品,消費(fèi)者從 Buffer 中取走產(chǎn)品,當(dāng) Buffer 為空時(shí),消費(fèi)者阻塞,當(dāng) Buffer 滿時(shí),生產(chǎn)者阻塞。

Kotlin 中有多種方法可以實(shí)現(xiàn)多線程的生產(chǎn)/消費(fèi)模型(大多也適用于Java)

  1. Synchronized
  2. ReentrantLock
  3. BlockingQueue
  4. Semaphore
  5. PipedXXXStream
  6. RxJava
  7. Coroutine
  8. Flow

1. Synchronized

Synchronized 是最最基本的線程同步工具,配合 wait/notify 可以實(shí)現(xiàn)實(shí)現(xiàn)生產(chǎn)消費(fèi)問(wèn)題。

 
 
 
 
  1. val buffer = LinkedList() 
  2. val MAX = 5 //buffer最大size 
  3.  
  4. val lock = Object() 
  5.  
  6. fun produce(data: Data) { 
  7.     sleep(2000) // mock produce 
  8.     synchronized(lock) { 
  9.         while (buffer.size >= MAX) { 
  10.            // 當(dāng)buffer滿時(shí),停止生產(chǎn) 
  11.            // 注意此處使用while不能使用if,因?yàn)橛锌赡苁潜涣硪粋€(gè)生產(chǎn)線程而非消費(fèi)線程喚醒,所以要再次檢查buffer狀態(tài) 
  12.            // 如果生產(chǎn)消費(fèi)兩把鎖,則不必?fù)?dān)心此問(wèn)題 
  13.            lock.wait() 
  14.         } 
  15.  
  16.       buffer.push(data) 
  17.         // notify方法只喚醒其中一個(gè)線程,選擇哪個(gè)線程取決于操作系統(tǒng)對(duì)多線程管理的實(shí)現(xiàn)。 
  18.         // notifyAll會(huì)喚醒所有等待中線程,哪一個(gè)線程將會(huì)第一個(gè)處理取決于操作系統(tǒng)的實(shí)現(xiàn),但是都有機(jī)會(huì)處理。 
  19.         // 此處使用notify有可能喚醒的是另一個(gè)生產(chǎn)線程從而造成死鎖,所以必須使用notifyAll 
  20.         lock.notifyAll() 
  21.     } 
  22.  
  23. fun consume() { 
  24.     synchronized(lock) { 
  25.         while (buffer.isEmpty()) 
  26.             lock.wait() // 暫停消費(fèi) 
  27.         buffer.removeFirst() 
  28.         lock.notifyAll() 
  29.     } 
  30.     sleep(2000) // mock consume 
  31.  
  32.  
  33.  
  34. @Test 
  35. fun test() { 
  36.     // 同時(shí)啟動(dòng)多個(gè)生產(chǎn)、消費(fèi)線程 
  37.     repeat(10) { 
  38.         Thread { produce(Data()) }.start() 
  39.     } 
  40.     repeat(10) { 
  41.         Thread { consume() }.start() 
  42.     } 

2. ReentrantLock

Lock 相對(duì)于 Synchronized 好處是當(dāng)有多個(gè)生產(chǎn)線/消費(fèi)線程時(shí),我們可以通過(guò)定義多個(gè) condition 精確指定喚醒哪一個(gè)。下面的例子展示 Lock 配合 await/single 替換前面 Synchronized 寫(xiě)法。

 
 
 
 
  1. val buffer = LinkedList() 
  2. val MAX = 5 //buffer最大size 
  3.               
  4. val lock = ReentrantLock()                      
  5. val condition = lock.newCondition()           
  6.                                                 
  7. fun produce(data: Data) {                       
  8.     sleep(2000) // mock produce                 
  9.     lock.lock()                                 
  10.                                                 
  11.     while (buffer.size >= 5)                       
  12.         condition.await()                       
  13.                                                 
  14.     buffer.push(data)                           
  15.     condition.signalAll()                       
  16.     lock.unlock()                               
  17. }                                               
  18.                                                 
  19. fun consume() {                                 
  20.     lock.lock()                                 
  21.     while (buffer.isEmpty())                       
  22.         condition.await()                       
  23.                                                 
  24.     buffer.removeFirst() 
  25.     condition.singleAll()                         
  26.     lock.unlock()                               
  27.     sleep(2000) // mock consume                 
  28. }                                             

3. BlockingQueue (阻塞隊(duì)列)

BlockingQueue在達(dá)到臨界條件時(shí),再進(jìn)行讀寫(xiě)會(huì)自動(dòng)阻塞當(dāng)前線程等待鎖的釋放,天然適合這種生產(chǎn)/消費(fèi)場(chǎng)景。

 
 
 
 
  1. val buffer = LinkedBlockingQueue(5)                
  2.                                                          
  3. fun produce(data: Data) {                                
  4.     sleep(2000) // mock produce                          
  5.     buffer.put(data) //buffer滿時(shí)自動(dòng)阻塞                        
  6.                                                          
  7. fun consume() {                                          
  8.     buffer.take() // buffer空時(shí)自動(dòng)阻塞 
  9.     sleep(2000) // mock consume                          
  10. }                                                        
  11.                                       

注意 BlockingQueue 的有三組讀/寫(xiě)方法,只有一組有阻塞效果,不要用錯(cuò)。

方法 說(shuō)明
add(o)/remove(o)add 方法在添加元素的時(shí)候,若超出了隊(duì)列的長(zhǎng)度會(huì)直接拋出異常
offer(o)/poll(o)offer 在添加元素時(shí),如果發(fā)現(xiàn)隊(duì)列已滿無(wú)法添加的話,會(huì)直接返回false
put(o)/take(o)put 向隊(duì)尾添加元素的時(shí)候發(fā)現(xiàn)隊(duì)列已經(jīng)滿了會(huì)發(fā)生阻塞一直等待空間,以加入元素
 

4. Semaphore(信號(hào)量)

Semaphore 是 JUC 提供的一種共享鎖機(jī)制,可以進(jìn)行擁塞控制,此特性可用來(lái)控制 buffer 的大小。

 
 
 
 
  1. // canProduce: 可以生產(chǎn)數(shù)量(即buffer可用的數(shù)量),生產(chǎn)者調(diào)用acquire,減少permit數(shù)目     
  2. val canProduce = Semaphore(5)                                                                                            
  3. // canConsumer:可以消費(fèi)數(shù)量,生產(chǎn)者調(diào)用release,增加permit數(shù)目                   
  4. val canConsume = Semaphore(5)                                                                                       
  5. // 控制buffer訪問(wèn)互斥                                                 
  6. val mutex = Semaphore(0)                                        
  7.                                                                 
  8. val buffer = LinkedList()                                 
  9.                                                                 
  10. fun produce(data: Data) {                                       
  11.     if (canProduce.tryAcquire()) {                              
  12.         sleep(2000) // mock produce                             
  13.                                                                 
  14.         mutex.acquire()                                         
  15.         buffer.push(data)                                       
  16.         mutex.release()                                         
  17.                                                                 
  18.         canConsume.release() //通知消費(fèi)端新增加了一個(gè)產(chǎn)品                    
  19.     }                                                           
  20. }                                                               
  21.                                                                 
  22. fun consume() {                                                 
  23.     if (canConsume.tryAcquire()) {                              
  24.         sleep(2000) // mock consume                             
  25.                                                                 
  26.         mutex.acquire()                                         
  27.         buffer.removeFirst()                                    
  28.         mutex.release()                                         
  29.                                                                 
  30.         canProduce.release() //通知生產(chǎn)端可以再追加生產(chǎn)                     
  31.     }                                                           
  32.                                                                 
  33. }                                         

5. PipedXXXStream (管道)

Java 里的管道輸入/輸出流 PipedInputStream / PipedOutputStream 實(shí)現(xiàn)了類似管道的功能,用于不同線程之間的相互通信,輸入流中有一個(gè)緩沖數(shù)組,當(dāng)緩沖數(shù)組為空的時(shí)候,輸入流 PipedInputStream 所在的線程將阻塞。

 
 
 
 
  1. val pis: PipedInputStream = PipedInputStream() 
  2. val pos: PipedOutputStream by lazy { 
  3.     PipedOutputStream().apply { 
  4.         pis.connect(this) //輸入輸出流之間建立連接 
  5.     } 
  6.  
  7. fun produce(data: ContactsContract.Data) { 
  8.     while (true) { 
  9.         sleep(2000) 
  10.         pos.use { // Kotlin 使用 use 方便的進(jìn)行資源釋放 
  11.             it.write(data.getBytes()) 
  12.             it.flush() 
  13.         } 
  14.     } 
  15.  
  16. fun consume() { 
  17.     while (true) { 
  18.         sleep(2000) 
  19.         pis.use { 
  20.             val byteArray = ByteArray(1024) 
  21.             it.read(byteArray) 
  22.         } 
  23.     } 
  24.  
  25. @Test 
  26. fun Test() { 
  27.     repeat(10) { 
  28.         Thread { produce(Data()) }.start() 
  29.     } 
  30.  
  31.     repeat(10) { 
  32.         Thread { consume() }.start() 
  33.     } 

6. RxJava

RxJava 從概念上,可以將 Observable/Subject 作為生產(chǎn)者, Subscriber 作為消費(fèi)者, 但是無(wú)論 Subject 或是 Observable 都缺少 Buffer 溢出時(shí)的阻塞機(jī)制,難以獨(dú)立實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者模型。

Flowable 的背壓機(jī)制,可以用來(lái)控制 buffer 數(shù)量,并在上下游之間建立通信, 配合 Atomic 可以變向?qū)崿F(xiàn)單生產(chǎn)者/單消費(fèi)者場(chǎng)景,(不適用于多生產(chǎn)者/多消費(fèi)者場(chǎng)景)。

 
 
 
 
  1. class Producer : Flowable() { 
  2.  
  3.     override fun subscribeActual(subscriber: org.reactivestreams.Subscriber) { 
  4.         subscriber.onSubscribe(object : Subscription { 
  5.             override fun cancel() { 
  6.                 //... 
  7.             } 
  8.  
  9.             private val outStandingRequests = AtomicLong(0) 
  10.  
  11.             override fun request(n: Long) { //收到下游通知,開(kāi)始生產(chǎn) 
  12.                 outStandingRequests.addAndGet(n) 
  13.  
  14.                 while (outStandingRequests.get() > 0) { 
  15.                     sleep(2000) 
  16.                     subscriber.onNext(Data()) 
  17.                     outStandingRequests.decrementAndGet() 
  18.                 } 
  19.             } 
  20.  
  21.         }) 
  22.     } 
  23.  
  24.  
  25.  
  26.  
  27. class Consumer : DefaultSubscriber() { 
  28.  
  29.     override fun onStart() { 
  30.         request(1) 
  31.     } 
  32.  
  33.     override fun onNext(i: Data?) { 
  34.         sleep(2000) //mock consume 
  35.         request(1) //通知上游可以增加生產(chǎn) 
  36.     } 
  37.  
  38.     override fun onError(throwable: Throwable) { 
  39.         //... 
  40.     } 
  41.  
  42.     override fun onComplete() { 
  43.         //... 
  44.     } 
  45.  
  46.  
  47.  
  48. @Test 
  49. fun test_rxjava() { 
  50.     try { 
  51.         val testProducer = Producer) 
  52.         val testConsumer = Consumer() 
  53.  
  54.         testProducer 
  55.             .subscribeOn(Schedulers.computation()) 
  56.             .observeOn(Schedulers.single()) 
  57.             .blockingSubscribe(testConsumer) 
  58.  
  59.     } catch (t: Throwable) { 
  60.         t.printStackTrace() 
  61.     } 
  62.  

7. Coroutine Channel

協(xié)程中的 Channel 具有擁塞控制機(jī)制,可以實(shí)現(xiàn)生產(chǎn)者消費(fèi)者之間的通信??梢园?Channel 理解為一個(gè)協(xié)程版本的阻塞隊(duì)列,capacity 指定隊(duì)列容量。

 
 
 
 
  1. val channel = Channel(capacity = 5) 
  2.  
  3. suspend fun produce(data: ContactsContract.Contacts.Data) = run { 
  4.     delay(2000) //mock produce 
  5.     channel.send(data) 
  6.  
  7.  
  8. suspend fun consume() = run { 
  9.     delay(2000)//mock consume 
  10.     channel.receive() 
  11.  
  12. @Test 
  13. fun test_channel() { 
  14.     repeat(10) { 
  15.         GlobalScope.launch { 
  16.             produce(Data()) 
  17.         } 
  18.     } 
  19.  
  20.     repeat(10) { 
  21.         GlobalScope.launch { 
  22.            consume() 
  23.         } 
  24.     } 

此外,Coroutine 提供了 produce 方法,在聲明 Channel 的同時(shí)生產(chǎn)數(shù)據(jù),寫(xiě)法上更簡(jiǎn)單,適合單消費(fèi)者單生產(chǎn)者的場(chǎng)景:

 
 
 
 
  1. fun CoroutineScope.produce(): ReceiveChannel = produce { 
  2.     repeat(10) { 
  3.         delay(2000) //mock produce 
  4.         send(Data()) 
  5.     } 
  6.  
  7. @Test 
  8. fun test_produce() { 
  9.     GlobalScope.launch { 
  10.         produce.consumeEach { 
  11.             delay(2000) //mock consume 
  12.         } 
  13.     } 

8. Coroutine Flow

Flow 跟 RxJava 一樣,因?yàn)槿鄙?Buffer 溢出時(shí)的阻塞機(jī)制,不適合處理生產(chǎn)消費(fèi)問(wèn)題,其背壓機(jī)制也比較簡(jiǎn)單,無(wú)法像 RxJava 那樣收到下游通知。但是 Flow 后來(lái)發(fā)布了 SharedFlow, 作為帶緩沖的熱流,提供了 Buffer 溢出策略,可以用作生產(chǎn)者/消費(fèi)者之間的同步。

 
 
 
 
  1. val flow : MutableSharedFlow = MutableSharedFlow( 
  2.     extraBufferCapacity = 5  //緩沖大小 
  3.     , onBufferOverflow = BufferOverflow.SUSPEND // 緩沖溢出時(shí)的策略:掛起 
  4.  
  5. @Test 
  6. fun test() { 
  7.  
  8.     GlobalScope.launch { 
  9.         repeat(10) { 
  10.             delay(2000) //mock produce 
  11.             sharedFlow.emit(Data()) 
  12.         } 
  13.     } 
  14.  
  15.     GlobalScope.launch { 
  16.         sharedFlow.collect { 
  17.             delay(2000) //mock consume 
  18.         } 
  19.     } 

注意 SharedFlow 也只能用在單生產(chǎn)者/單消費(fèi)者場(chǎng)景。

總結(jié)

生產(chǎn)者/消費(fèi)者問(wèn)題,其本質(zhì)核心還是多線程讀寫(xiě)共享資源(Buffer)時(shí)的同步問(wèn)題,理論上只要具有同步機(jī)制的多線程框架,例如線程鎖、信號(hào)量、阻塞隊(duì)列、協(xié)程 Channel等,都是可以實(shí)現(xiàn)生產(chǎn)消費(fèi)模型的。 

另外,RxJava 和 Flow 雖然也是多線程框架,但是缺少Buffer溢出時(shí)的阻塞機(jī)制,不適用于生產(chǎn)/消費(fèi)場(chǎng)景,更適合在純響應(yīng)式場(chǎng)景中使用。


分享文章:孔乙己:Kotlin生產(chǎn)者消費(fèi)者問(wèn)題的八種解法
瀏覽路徑:http://www.dlmjj.cn/article/dhospgj.html