Qt之线程同步(生产者消费者模式 - QSemaphore)
簡(jiǎn)述
生產(chǎn)者將數(shù)據(jù)寫(xiě)入緩沖區(qū),直到它到達(dá)緩沖區(qū)的末尾,此時(shí),它將從開(kāi)始位置重新啟動(dòng),覆蓋現(xiàn)有數(shù)據(jù)。消費(fèi)者線程讀取數(shù)據(jù)并將其寫(xiě)入標(biāo)準(zhǔn)錯(cuò)誤。
Semaphore(信號(hào)量) 比 mutex(互斥量)有一個(gè)更高級(jí)的并發(fā)性。如果緩沖區(qū)的訪問(wèn)由一個(gè) QMutex 把守,當(dāng)生產(chǎn)者線程訪問(wèn)緩沖區(qū)時(shí),消費(fèi)者線程將無(wú)法訪問(wèn)。然而,有兩個(gè)線程同一時(shí)間訪問(wèn)不同的緩沖區(qū)是沒(méi)有害處的。
示例包括兩個(gè)類(lèi):Producer 和 Consumer,均繼承自 QThread。循環(huán)緩沖區(qū)用于這兩個(gè)類(lèi)之間的溝通,信號(hào)量用于保護(hù)全局變量。
另一種實(shí)現(xiàn)“生產(chǎn)者 - 消費(fèi)者”模式的方案是使用 QWaitCondition 和 QMutex - Qt之線程同步(生產(chǎn)者消費(fèi)者模式 - QWaitCondition)
- 簡(jiǎn)述
- 全局變量
- Producer
- Consumer
- main 函數(shù)
- 更多參考
全局變量
首先,一起來(lái)看循環(huán)緩沖區(qū)和相關(guān)的信號(hào)量:
const int DataSize = 100000;const int BufferSize = 8192; char buffer[BufferSize];QSemaphore freeBytes(BufferSize); QSemaphore usedBytes;DataSize 是生產(chǎn)者將生成的數(shù)據(jù)數(shù)量,為了讓示例盡可能地簡(jiǎn)單,把它定義為一個(gè)常數(shù)。BufferSize 是循環(huán)緩沖區(qū)的大小,小于 DataSize,這意味著在某一時(shí)刻生產(chǎn)者將達(dá)到緩沖區(qū)的末尾,會(huì)從開(kāi)始位置重新啟動(dòng)。
要同步生產(chǎn)者和消費(fèi)者,需要兩個(gè)信號(hào)量。freeBytes 信號(hào)量用于控制緩沖區(qū)的 "free" 區(qū)域(生產(chǎn)者尚未填充數(shù)據(jù),或消費(fèi)者已經(jīng)讀取的區(qū)域)。usedBytes 信號(hào)量用于控制緩沖區(qū)的 "used" 區(qū)域(生產(chǎn)者已經(jīng)填充數(shù)據(jù),但消費(fèi)者尚未讀取的區(qū)域)。
總之,這些信號(hào)量確保生產(chǎn)者不會(huì)先于消費(fèi)者超過(guò) BufferSize 的大小,而消費(fèi)者永遠(yuǎn)不會(huì)讀取生產(chǎn)者尚未生成的數(shù)據(jù)。
freeBytes 信號(hào)量用 BufferSize 來(lái)初始化,因?yàn)樽畛跽麄€(gè)緩沖區(qū)是空的。usedBytes 信號(hào)量初始化為 0(默認(rèn)值,如果未指定)。
Producer
Producer 類(lèi)的代碼如下:
class Producer : public QThread { public:void run() Q_DECL_OVERRIDE{qsrand(QTime(0,0,0).secsTo(QTime::currentTime()));for (int i = 0; i < DataSize; ++i) {freeBytes.acquire();buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];usedBytes.release();}} };生產(chǎn)者生成 DataSize 字節(jié)的數(shù)據(jù)。在往循環(huán)緩沖區(qū)寫(xiě)入一個(gè)字節(jié)之前,它必須使用 freeBytes 信號(hào)量來(lái)獲得一個(gè) "free" 字節(jié)。如果消費(fèi)者沒(méi)有與生產(chǎn)者保持著同樣的速度,QSemaphore::acquire() 調(diào)用可能會(huì)阻塞。
最后,生產(chǎn)者使用 usedBytes 信號(hào)量釋放一個(gè)字節(jié)。該 "free" 字節(jié)已成功轉(zhuǎn)化為一個(gè) "used" 字節(jié),準(zhǔn)備好供消費(fèi)者讀取。
Consumer
現(xiàn)在轉(zhuǎn)向 Consumer 類(lèi):
class Consumer : public QThread {Q_OBJECT public:void run() Q_DECL_OVERRIDE{for (int i = 0; i < DataSize; ++i) {usedBytes.acquire();fprintf(stderr, "%c", buffer[i % BufferSize]);freeBytes.release();}fprintf(stderr, "\n");}signals:void stringConsumed(const QString &text);protected:bool finish; };代碼非常類(lèi)似于生產(chǎn)者,不同的是,獲得 "used" 字節(jié)并釋放一個(gè) "free" 字節(jié),而非相反。
main() 函數(shù)
在 main() 函數(shù)中,我們創(chuàng)建兩個(gè)線程,并調(diào)用 QThread::wait(),以確保在退出之前,這兩個(gè)線程有時(shí)間完成。
int main(int argc, char *argv[]) {QCoreApplication app(argc, argv);Producer producer;Consumer consumer;producer.start();consumer.start();producer.wait();consumer.wait();return 0; }當(dāng)運(yùn)行這個(gè)程序時(shí),會(huì)發(fā)生什么呢?
最初,生產(chǎn)者是唯一一個(gè)可以做任何事情的線程,消費(fèi)者阻塞并等待 usedBytes 信號(hào)量的釋放(available() 初始數(shù)是 0)。一旦生產(chǎn)者把一個(gè)字節(jié)放入緩沖區(qū),freeBytes.available() 就會(huì)變?yōu)?BufferSize - 1,并且 usedBytes.available() 變?yōu)?1。這時(shí),可能發(fā)生兩件事:要么消費(fèi)者線程接管和讀取字節(jié),要么生產(chǎn)者開(kāi)始生成第二個(gè)字節(jié)。
此示例中提出的“生產(chǎn)者 - 消費(fèi)者”模式,適用于編寫(xiě)高并發(fā)多線程應(yīng)用。在多處理器計(jì)算機(jī)中,程序可能比基于 mutex 的方案快達(dá)兩倍之多,因?yàn)閮蓚€(gè)線程可以同一時(shí)間在緩沖區(qū)的不同部分處于激活狀態(tài)。
要知道,這些好處并不總能實(shí)現(xiàn),獲取和釋放一個(gè) QSemaphore 是需要成本的。在實(shí)踐中,可能需要把緩沖區(qū)分為塊,并針對(duì)塊操作而非單個(gè)字節(jié)。緩沖區(qū)的大小也是一個(gè)必須仔細(xì)選擇的參數(shù),需要基于實(shí)驗(yàn)。
總結(jié)
以上是生活随笔為你收集整理的Qt之线程同步(生产者消费者模式 - QSemaphore)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【bzoj3879】SvT 后缀数组+
- 下一篇: Kotlin 文档 .Google 正式