实现一个通用的生产者消费者队列(c语言版本)
背景:筆者之前一直從事嵌入式音視頻相關(guān)的開(kāi)發(fā)工作,對(duì)于音視頻的數(shù)據(jù)的處理,生產(chǎn)者消費(fèi)者隊(duì)列必不可少,而如何實(shí)現(xiàn)一個(gè)高效穩(wěn)定的生產(chǎn)者消費(fèi)者隊(duì)列則十分重要,不過(guò)按照筆者從業(yè)的經(jīng)驗(yàn),所看到的現(xiàn)象,不容樂(lè)觀,很多知名大廠在這種基礎(chǔ)組件的開(kāi)發(fā)能力上十分堪憂。
音視頻數(shù)據(jù)處理的特點(diǎn):
- 音視頻數(shù)據(jù)量大:音視頻數(shù)據(jù)特別是視頻數(shù)據(jù),占據(jù)了計(jì)算機(jī)數(shù)據(jù)的很大一塊,不信就看看每個(gè)人的硬盤(pán)里,去除電影,照片,mp3是不是很空蕩蕩的。
- 實(shí)時(shí)性要求高:音視頻的延時(shí)如果大于200ms,使用體驗(yàn)會(huì)十分糟糕。
- 處理流程復(fù)雜:一幀數(shù)據(jù)從sensor捕獲到最終網(wǎng)傳輸出或者lcd顯示,都需要經(jīng)過(guò)一系列的模塊進(jìn)行處理。特別是網(wǎng)絡(luò)傳輸,一般要經(jīng)過(guò)原始數(shù)據(jù)捕獲,視頻數(shù)據(jù)格式轉(zhuǎn)換,數(shù)據(jù)編碼壓縮,數(shù)據(jù)封裝打包,網(wǎng)絡(luò)傳輸。
生產(chǎn)者消費(fèi)者隊(duì)列在視頻數(shù)據(jù)處理的必要性:
視頻數(shù)據(jù)的處理為什么需要生產(chǎn)者消費(fèi)者隊(duì)列?其實(shí)上面提到的音視頻數(shù)據(jù)處理的特點(diǎn)就是答案。
- 一般對(duì)視頻數(shù)據(jù)處理的模塊都是運(yùn)行在多線程中,每一個(gè)處理模塊運(yùn)行在一個(gè)線程中(也是軟件工程分模塊的思想),相互之間通過(guò)生產(chǎn)者消費(fèi)者隊(duì)列進(jìn)行數(shù)據(jù)的交互,之所以用線程模型,而沒(méi)有用我一直推崇的進(jìn)程模型是因?yàn)榫€程間的共享內(nèi)存比較方便,而進(jìn)程則要相對(duì)復(fù)雜的多。
- 利用多線程/多進(jìn)程的并行處理能力 ,如果采用單線程單進(jìn)程的單線處理模式,一幀數(shù)據(jù)從采集到輸出線性經(jīng)過(guò)幾個(gè)模塊,時(shí)效性無(wú)法保證。
- 緩存數(shù)據(jù),保持平滑,通過(guò)隊(duì)列緩存視頻數(shù)據(jù),可以有效的去除一些數(shù)據(jù)抖動(dòng),幫助音視頻數(shù)據(jù)的平滑播放,同時(shí)這個(gè)數(shù)據(jù)的緩存又不易過(guò)多,否則加大了延時(shí),損傷實(shí)時(shí)性,所以隊(duì)列大小的設(shè)置是一個(gè)平衡的藝術(shù)。
常見(jiàn)的生產(chǎn)者消費(fèi)者隊(duì)列實(shí)現(xiàn)存在的問(wèn)題:
不注重效率性能:
- 對(duì)于buffer狀態(tài)的檢測(cè)采用loop輪詢方式。
loop輪詢是任何有追求的程序員都要避免的處理方式,而筆者以自己經(jīng)歷經(jīng)常看到以下類似的代碼:
以上代碼loop一個(gè)狀態(tài),如果狀態(tài)成立做有效的處理,不成立則睡眠一定時(shí)間,之后再次調(diào)用該段代碼進(jìn)行下一次的狀態(tài)檢測(cè),而這個(gè)睡眠時(shí)間是一個(gè)隨機(jī)經(jīng)驗(yàn)值,很有可能下次仍然是無(wú)效的檢測(cè),接著睡眠再loop,多余的loop是一種資源的浪費(fèi)。
數(shù)據(jù)的傳遞采用copy方式:
數(shù)據(jù)的傳遞,采用copy的方式,一幀數(shù)據(jù)在一個(gè)完整的處理流程中經(jīng)過(guò)n次copy(筆者見(jiàn)過(guò)一個(gè)系統(tǒng)中一幀數(shù)據(jù)copy了8次之多)
生產(chǎn)者消費(fèi)者隊(duì)列和業(yè)務(wù)代碼混雜在一起,沒(méi)有分離:
對(duì)于開(kāi)發(fā)者,都希望用最簡(jiǎn)單的接口完成某個(gè)功能,而不關(guān)心內(nèi)部實(shí)現(xiàn),在這里就是,只需要生產(chǎn)者生產(chǎn)數(shù)據(jù),消費(fèi)者消費(fèi)數(shù)據(jù),而內(nèi)部的處理(同步,數(shù)據(jù)的處理等)完全不關(guān)心,這樣開(kāi)發(fā)者就不需要去弄很多鎖,降低了開(kāi)發(fā)難度,也可以使代碼組件化,模塊化。
有經(jīng)驗(yàn)的開(kāi)發(fā)者應(yīng)該感覺(jué)以上都是基礎(chǔ)點(diǎn),不會(huì)有人犯這樣的錯(cuò)誤,不過(guò)筆者以自己的經(jīng)歷肯定的說(shuō),以上兩種問(wèn)題在某世界級(jí)大廠的視頻設(shè)備上隨處可見(jiàn)。
讓我悲哀的是,當(dāng)我指出這些問(wèn)題時(shí),某些開(kāi)發(fā)者完全無(wú)動(dòng)于衷。而我更無(wú)力的是,現(xiàn)在的cpu,memory性能實(shí)在是高,在某些不太高端的嵌入式芯片上,優(yōu)化過(guò)的數(shù)據(jù)并沒(méi)有十分明顯,在一個(gè)實(shí)際項(xiàng)目上,經(jīng)過(guò)優(yōu)化后cpu大概降低1%(原總系統(tǒng)cpu占用7%),有經(jīng)驗(yàn)的開(kāi)發(fā)者又會(huì)說(shuō)原7%的cpu占用率說(shuō)明這個(gè)芯片做這個(gè)系統(tǒng)浪費(fèi)了,不過(guò)也沒(méi)辦法其實(shí)已經(jīng)用了比較低端的芯片了。。。
以上的吐槽主要是想說(shuō)明:由于cpu,memory性能的提升,讓很多開(kāi)發(fā)者感覺(jué)軟件的優(yōu)化意義不大了,而我是一個(gè)理想主義者,對(duì)于某些設(shè)計(jì)ugly的代碼真的是零容忍啊。
如何優(yōu)化:
以上說(shuō)了這么多,那如何操作呢?
- 提高效率,去除多余的loop輪詢:
采用線程的同步機(jī)制,當(dāng)狀態(tài)條件符合要求時(shí),通過(guò)通知機(jī)制觸發(fā)后續(xù)處理,這樣去除了無(wú)效的loop輪詢檢測(cè),linux系統(tǒng)下可以采用條件變量,信號(hào)量來(lái)實(shí)現(xiàn),我更傾向于使用條件變量,因?yàn)樗莑inux系統(tǒng)原生支持的接口。
提到條件變量,不得不提一個(gè)概念:同步互斥,這么一個(gè)基本的操作系統(tǒng)概念,我最喜歡作為面試第一題,不過(guò)能用一句話切中要害的說(shuō)出之間區(qū)別和各自特點(diǎn)的人不是很多,答不出這題的,基本上就pass了。 - 減少多余copy:
采用預(yù)分配的方式,將buffer分為free和active兩大類,每一類buffer又切成幾個(gè)小buffer,然后通過(guò)指針將兩類buffer下的小buffer鏈接成兩個(gè)鏈表,使用者獲取buffer通過(guò)free鏈表獲取buffer,再將buffer put到active鏈表上,以上都是指針的操作,沒(méi)有數(shù)據(jù)的copy,極大的減少了copy操作。(再次強(qiáng)調(diào)指針是個(gè)好東西) - 模塊化組件化:
將生產(chǎn)者消費(fèi)者隊(duì)列的處理部分完全剝離成一個(gè)獨(dú)立的模塊組件,對(duì)外只提供幾個(gè)基本的接口,內(nèi)部完成同步通知的處理。
一個(gè)簡(jiǎn)單的實(shí)現(xiàn):
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #include <sys/ioctl.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/time.h> #include <fcntl.h> #include <pthread.h> #include <signal.h> #include <time.h>#include "sfifo.h"//#define CONFIG_COND_FREE 1 #define CONFIG_COND_ACTIVE 1#define MAX_SFIFO_NUM 32struct sfifo_des_s sfifo_des[MAX_SFIFO_NUM]; struct sfifo_des_s *my_sfifo_des;struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p) {static long empty_count = 0;struct sfifo_s *sfifo = NULL;pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex)); #ifdef CONFIG_COND_FREEwhile (sfifo_des_p->free_list.head == NULL) {pthread_cond_wait(&(sfifo_des_p->free_list.cond), &(sfifo_des_p->free_list.lock_mutex));} #elseif (sfifo_des_p->free_list.head == NULL) {if (empty_count++ % 120 == 0) {printf("free list empty\n");}goto EXIT;} #endifsfifo = sfifo_des_p->free_list.head;sfifo_des_p->free_list.head = sfifo->next;EXIT:pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));return sfifo; }int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p) {int send_cond = 0;pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));if (sfifo_des_p->free_list.head == NULL) {sfifo_des_p->free_list.head = sfifo;sfifo_des_p->free_list.tail = sfifo;sfifo_des_p->free_list.tail->next = NULL;send_cond = 1;} else {sfifo_des_p->free_list.tail->next = sfifo;sfifo_des_p->free_list.tail = sfifo;sfifo_des_p->free_list.tail->next = NULL;}pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex)); #ifdef CONFIG_COND_FREEif (send_cond) {pthread_cond_signal(&(sfifo_des_p->free_list.cond));} #endifreturn 0; }struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p) {struct sfifo_s *sfifo = NULL;pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex)); #ifdef CONFIG_COND_ACTIVEwhile (sfifo_des_p->active_list.head == NULL) {//pthread_cond_timedwait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex), &outtime);pthread_cond_wait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex));} #elseif (sfifo_des_p->active_list.head == NULL) {printf("active list empty\n");goto EXIT;} #endifsfifo = sfifo_des_p->active_list.head;sfifo_des_p->active_list.head = sfifo->next;EXIT:pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));return sfifo; }int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p) {int send_cond = 0;pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));if (sfifo_des_p->active_list.head == NULL) {sfifo_des_p->active_list.head = sfifo;sfifo_des_p->active_list.tail = sfifo;sfifo_des_p->active_list.tail->next = NULL;send_cond = 1;} else {sfifo_des_p->active_list.tail->next = sfifo;sfifo_des_p->active_list.tail = sfifo;sfifo_des_p->active_list.tail->next = NULL;}pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex)); #ifdef CONFIG_COND_ACTIVEif (send_cond) {pthread_cond_signal(&(sfifo_des_p->active_list.cond));} #endifreturn 0; }int dump_sfifo_list(struct sfifo_list_des_s *list) {struct sfifo_s *sfifo = NULL;sfifo = list->head;do {printf("dump : %x\n", sfifo->buffer[0]);usleep(500 * 1000);} while (sfifo->next != NULL && (sfifo = sfifo->next));return 0; }struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num) {int i = 0;struct sfifo_s *sfifo;struct sfifo_des_s *sfifo_des_p;sfifo_des_p = (struct sfifo_des_s *)malloc(sizeof(struct sfifo_des_s));sfifo_des_p->sfifos_num = sfifo_num;sfifo_des_p->sfifos_active_max_num = sfifo_active_max_num;sfifo_des_p->free_list.sfifo_num = 0;sfifo_des_p->free_list.head = NULL;sfifo_des_p->free_list.tail = NULL;pthread_mutex_init(&sfifo_des_p->free_list.lock_mutex, NULL);pthread_cond_init(&sfifo_des_p->free_list.cond, NULL);sfifo_des_p->active_list.sfifo_num = 0;sfifo_des_p->active_list.head = NULL;sfifo_des_p->active_list.tail = NULL;pthread_mutex_init(&sfifo_des_p->active_list.lock_mutex, NULL);pthread_cond_init(&sfifo_des_p->active_list.cond, NULL);for (i = 0; i < sfifo_num; i++) {sfifo = (struct sfifo_s *)malloc(sizeof(struct sfifo_s));sfifo->buffer = (unsigned char *)malloc(sfifo_buffer_size);printf("sfifo_init: %x\n", sfifo->buffer);memset(sfifo->buffer, i, sfifo_buffer_size);sfifo->size = sfifo_buffer_size;sfifo->next = NULL;sfifo_put_free_buf(sfifo, sfifo_des_p);}return sfifo_des_p; }void *productor_thread_func(void *arg) {struct sfifo_s *sfifo;while (1) {sfifo = sfifo_get_free_buf(my_sfifo_des);if (sfifo != NULL) {printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);sfifo_put_active_buf(sfifo, my_sfifo_des);}//usleep(20*1000);} }void *comsumer_thread_func(void *arg) {struct sfifo_s *sfifo;int count = 0;while (1) {sfifo = sfifo_get_active_buf(my_sfifo_des);if (sfifo != NULL) {printf("---------------- get %x\n", sfifo->buffer[0]);sfifo_put_free_buf(sfifo, my_sfifo_des);}//usleep(10 * 1000);// if (count++ > 10000) {// exit(-1);// }} }int main() {int ret;static pthread_t productor_thread;static pthread_t consumer_thread;struct sfifo_s *r_sfifo;my_sfifo_des = sfifo_init(10, 4096, 5);ret = pthread_create(&productor_thread, NULL, productor_thread_func, NULL);ret = pthread_create(&consumer_thread, NULL, comsumer_thread_func, NULL);while (1) {sleep(1);}return 0; }以上是一個(gè)簡(jiǎn)單的生產(chǎn)者消費(fèi)者隊(duì)列的c語(yǔ)言的實(shí)現(xiàn),對(duì)應(yīng)的頭文件在本文底部(貼代碼太長(zhǎng)看起來(lái)很崩潰)。
仔細(xì)的同學(xué)可能會(huì)發(fā)現(xiàn),以上代碼sfifo_get_free_buf()中默認(rèn)是loop輪詢檢測(cè)free buffer鏈表的,你前面不是說(shuō)了一大堆不能loop嗎?怎么還用loop呢?
這里其實(shí)有兩個(gè)原因:
- 生產(chǎn)者的loop是可以接受的,當(dāng)發(fā)生多余l(xiāng)oop,無(wú)法命中時(shí),說(shuō)明生產(chǎn)者太快,消費(fèi)者太慢,而其實(shí)對(duì)于一個(gè)生產(chǎn)者消費(fèi)者模型出現(xiàn)以上問(wèn)題時(shí),說(shuō)明整個(gè)業(yè)務(wù)流程要重新考慮,因?yàn)檎5那闆r是消費(fèi)者總是要快于生產(chǎn)者,這個(gè)業(yè)務(wù)模型才能正常的運(yùn)行下去。
- 對(duì)于有些業(yè)務(wù)模型,生產(chǎn)者業(yè)務(wù)模塊部分是不能阻塞的,也就是說(shuō),如果free list沒(méi)有數(shù)據(jù),我們采用pthread_cond_wait()阻塞后,會(huì)導(dǎo)致生產(chǎn)者出現(xiàn)問(wèn)題,這樣最好的處理方式就是生產(chǎn)者模塊接口返回出錯(cuò),生產(chǎn)者業(yè)務(wù)方丟棄數(shù)據(jù)(此時(shí)就是丟幀了,這種情況如果頻繁發(fā)生是不能接受了,不過(guò)也說(shuō)明了消費(fèi)者要有足夠的能力處理生產(chǎn)者生產(chǎn)出的數(shù)據(jù),否則整個(gè)業(yè)務(wù)都是有問(wèn)題)
這個(gè)實(shí)現(xiàn)有那些優(yōu)勢(shì):
走讀和運(yùn)行以上代碼的同學(xué)應(yīng)該可以發(fā)現(xiàn)這里做了一個(gè)簡(jiǎn)單可運(yùn)行的demo模擬了生產(chǎn)者和消費(fèi)者雙方:
void *productor_thread_func(void *arg) {struct sfifo_s *sfifo;while (1) {sfifo = sfifo_get_free_buf(my_sfifo_des);if (sfifo != NULL) {printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]);sfifo_put_active_buf(sfifo, my_sfifo_des);}//usleep(20*1000);} }void *comsumer_thread_func(void *arg) {struct sfifo_s *sfifo;int count = 0;while (1) {sfifo = sfifo_get_active_buf(my_sfifo_des);if (sfifo != NULL) {printf("---------------- get %x\n", sfifo->buffer[0]);sfifo_put_free_buf(sfifo, my_sfifo_des);}//usleep(10 * 1000);// if (count++ > 10000) {// exit(-1);// }} }這里面對(duì)于使用者的優(yōu)點(diǎn)有:
以上描述了一個(gè)生產(chǎn)者消費(fèi)者隊(duì)列c語(yǔ)言的實(shí)現(xiàn),為什么是c語(yǔ)言版本的?因?yàn)槠渌呒?jí)語(yǔ)言,有很多成熟的庫(kù)提供了該功能,完全不用自己寫(xiě),而c就沒(méi)這么完善了,不過(guò)這也說(shuō)明了c的簡(jiǎn)單靈活。但悲哀的是很多人因此進(jìn)行了很ugly的實(shí)現(xiàn)。
多吐槽幾句,嵌入式行業(yè)由于各種技術(shù)原因,導(dǎo)致開(kāi)發(fā)語(yǔ)言還是采用c,這樣對(duì)開(kāi)發(fā)人員有了不小的要求,而如何才能寫(xiě)一些優(yōu)雅的代碼,對(duì)人的素質(zhì)有了要求,但現(xiàn)狀是優(yōu)秀的開(kāi)發(fā)者都被互聯(lián)網(wǎng)行業(yè)搶走了,導(dǎo)致嵌入式行業(yè)開(kāi)發(fā)人員的水平參差不齊,本來(lái)應(yīng)該是一個(gè)對(duì)編碼能力要求很高的行業(yè)被一些水平低下的開(kāi)發(fā)者占據(jù)。so,我離開(kāi)了這個(gè)行業(yè)了。。。
附:模塊頭文件,類linux用戶可通過(guò)gcc xxx.c命令build該demo,然后運(yùn)行測(cè)試。
#ifndef SFIFO_H_ #define SFIFO_H_struct sfifo_list_des_s {int sfifo_num;struct sfifo_s *head;struct sfifo_s *tail;pthread_mutex_t lock_mutex;pthread_cond_t cond; };struct sfifo_des_s {int sfifo_init;unsigned int sfifos_num;unsigned int sfifos_active_max_num;struct sfifo_list_des_s free_list;struct sfifo_list_des_s active_list; };struct sfifo_s {unsigned char *buffer;unsigned int size;struct sfifo_s *next; };extern struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num);/* productor */ extern struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p); extern int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);/* consumer */ extern struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p); extern int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);#endif總結(jié)
以上是生活随笔為你收集整理的实现一个通用的生产者消费者队列(c语言版本)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: sql数据库挖坑
- 下一篇: 利用jquery修改elment的自定义