实现一个通用的生产者消费者队列(c语言版本)
音视频数据处理的特点:
生产者消费者队列在视频数据处理的必要性:视频数据的处理为什么需要生产者消费者队列?其实上面提到的音视频数据处理的特点就是答案。
常见的生产者消费者队列实现存在的问题:不注重效率性能:
pthread_mutex_lock();
state = check_some_state();
if (state == xxx) {
do_some_process();
} else {
usleep(x);
}
pthread_mutex_unlock();
以上代码loop一个状态,如果状态成立做有效的处理,不成立则睡眠一定时间,之后再次调用该段代码进行下一次的状态检测,而这个睡眠时间是一个随机经验值,很有可能下次仍然是无效的检测,接着睡眠再loop,多余的loop是一种资源的浪费。 数据的传递采用copy方式: 数据的传递,采用copy的方式,一帧数据在一个完整的处理流程中经过n次copy(笔者见过一个系统中一帧数据copy了8次之多) 生产者消费者队列和业务代码混杂在一起,没有分离: 对于开发者,都希望用最简单的接口完成某个功能,而不关心内部实现,在这里就是,只需要生产者生产数据,消费者消费数据,而内部的处理(同步,数据的处理等)完全不关心,这样开发者就不需要去弄很多锁,降低了开发难度,也可以使代码组件化,模块化。
让我悲哀的是,当我指出这些问题时,某些开发者完全无动于衷。而我更无力的是,现在的cpu,memory性能实在是高,在某些不太高端的嵌入式芯片上,优化过的数据并没有十分明显,在一个实际项目上,经过优化后cpu大概降低1%(原总系统cpu占用7%),有经验的开发者又会说原7%的cpu占用率说明这个芯片做这个系统浪费了,不过也没办法其实已经用了比较低端的芯片了。。。 以上的吐槽主要是想说明:由于cpu,memory性能的提升,让很多开发者感觉软件的优化意义不大了,而我是一个理想主义者,对于某些设计ugly的代码真的是零容忍啊。 如何优化:以上说了这么多,那如何操作呢?
一个简单的实现:#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>
#include "sfifo.h"
//#define CONFIG_COND_FREE 1
#define CONFIG_COND_ACTIVE 1
#define MAX_SFIFO_NUM 32
struct sfifo_des_s sfifo_des[MAX_SFIFO_NUM];
struct sfifo_des_s *my_sfifo_des;
struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p)
{
static long empty_count = 0;
struct sfifo_s *sfifo = NULL;
pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREE
while (sfifo_des_p->free_list.head == NULL) {
pthread_cond_wait(&(sfifo_des_p->free_list.cond),&(sfifo_des_p->free_list.lock_mutex));
}
#else
if (sfifo_des_p->free_list.head == NULL) {
if (empty_count++ % 120 == 0) {
printf("free list emptyn");
}
goto EXIT;
}
#endif
sfifo = sfifo_des_p->free_list.head;
sfifo_des_p->free_list.head = sfifo->next;
EXIT:
pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));
return sfifo;
}
int sfifo_put_free_buf(struct sfifo_s *sfifo,struct sfifo_des_s *sfifo_des_p)
{
int send_cond = 0;
pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));
if (sfifo_des_p->free_list.head == NULL) {
sfifo_des_p->free_list.head = sfifo;
sfifo_des_p->free_list.tail = sfifo;
sfifo_des_p->free_list.tail->next = NULL;
send_cond = 1;
} else {
sfifo_des_p->free_list.tail->next = sfifo;
sfifo_des_p->free_list.tail = sfifo;
sfifo_des_p->free_list.tail->next = NULL;
}
pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));
#ifdef CONFIG_COND_FREE
if (send_cond) {
pthread_cond_signal(&(sfifo_des_p->free_list.cond));
}
#endif
return 0;
}
struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p)
{
struct sfifo_s *sfifo = NULL;
pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVE
while (sfifo_des_p->active_list.head == NULL) {
//pthread_cond_timedwait(&(sfifo_des_p->active_list.cond),&(sfifo_des_p->active_list.lock_mutex),&outtime);
pthread_cond_wait(&(sfifo_des_p->active_list.cond),&(sfifo_des_p->active_list.lock_mutex));
}
#else
if (sfifo_des_p->active_list.head == NULL) {
printf("active list emptyn");
goto EXIT;
}
#endif
sfifo = sfifo_des_p->active_list.head;
sfifo_des_p->active_list.head = sfifo->next;
EXIT:
pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));
return sfifo;
}
int sfifo_put_active_buf(struct sfifo_s *sfifo,struct sfifo_des_s *sfifo_des_p)
{
int send_cond = 0;
pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));
if (sfifo_des_p->active_list.head == NULL) {
sfifo_des_p->active_list.head = sfifo;
sfifo_des_p->active_list.tail = sfifo;
sfifo_des_p->active_list.tail->next = NULL;
send_cond = 1;
} else {
sfifo_des_p->active_list.tail->next = sfifo;
sfifo_des_p->active_list.tail = sfifo;
sfifo_des_p->active_list.tail->next = NULL;
}
pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));
#ifdef CONFIG_COND_ACTIVE
if (send_cond) {
pthread_cond_signal(&(sfifo_des_p->active_list.cond));
}
#endif
return 0;
}
int dump_sfifo_list(struct sfifo_list_des_s *list)
{
struct sfifo_s *sfifo = NULL;
sfifo = list->head;
do {
printf("dump : %xn",sfifo->buffer[0]);
usleep(500 * 1000);
} while (sfifo->next != NULL && (sfifo = sfifo->next));
return 0;
}
struct sfifo_des_s *sfifo_init(int sfifo_num,int sfifo_buffer_size,int sfifo_active_max_num)
{
int i = 0;
struct sfifo_s *sfifo;
struct sfifo_des_s *sfifo_des_p;
sfifo_des_p = (struct sfifo_des_s *)malloc(sizeof(struct sfifo_des_s));
sfifo_des_p->sfifos_num = sfifo_num;
sfifo_des_p->sfifos_active_max_num = sfifo_active_max_num;
sfifo_des_p->free_list.sfifo_num = 0;
sfifo_des_p->free_list.head = NULL;
sfifo_des_p->free_list.tail = NULL;
pthread_mutex_init(&sfifo_des_p->free_list.lock_mutex,NULL);
pthread_cond_init(&sfifo_des_p->free_list.cond,NULL);
sfifo_des_p->active_list.sfifo_num = 0;
sfifo_des_p->active_list.head = NULL;
sfifo_des_p->active_list.tail = NULL;
pthread_mutex_init(&sfifo_des_p->active_list.lock_mutex,NULL);
pthread_cond_init(&sfifo_des_p->active_list.cond,NULL);
for (i = 0; i < sfifo_num; i++) {
sfifo = (struct sfifo_s *)malloc(sizeof(struct sfifo_s));
sfifo->buffer = (unsigned char *)malloc(sfifo_buffer_size);
printf("sfifo_init: %xn",sfifo->buffer);
memset(sfifo->buffer,i,sfifo_buffer_size);
sfifo->size = sfifo_buffer_size;
sfifo->next = NULL;
sfifo_put_free_buf(sfifo,sfifo_des_p);
}
return sfifo_des_p;
}
void *productor_thread_func(void *arg)
{
struct sfifo_s *sfifo;
while (1) {
sfifo = sfifo_get_free_buf(my_sfifo_des);
if (sfifo != NULL) {
printf("+++++++++++++++++ put : %xn",sfifo->buffer[0]);
sfifo_put_active_buf(sfifo,my_sfifo_des);
}
//usleep(20*1000);
}
}
void *comsumer_thread_func(void *arg)
{
struct sfifo_s *sfifo;
int count = 0;
while (1) {
sfifo = sfifo_get_active_buf(my_sfifo_des);
if (sfifo != NULL) {
printf("---------------- get %xn",sfifo->buffer[0]);
sfifo_put_free_buf(sfifo,my_sfifo_des);
}
//usleep(10 * 1000);
// if (count++ > 10000) {
// exit(-1);
// }
}
}
int main()
{
int ret;
static pthread_t productor_thread;
static pthread_t consumer_thread;
struct sfifo_s *r_sfifo;
my_sfifo_des = sfifo_init(10,4096,5);
ret = pthread_create(&productor_thread,NULL,productor_thread_func,NULL);
ret = pthread_create(&consumer_thread,comsumer_thread_func,NULL);
while (1) {
sleep(1);
}
return 0;
}
以上是一个简单的生产者消费者队列的c语言的实现,对应的头文件在本文底部(贴代码太长看起来很崩溃)。 仔细的同学可能会发现,以上代码sfifo_get_free_buf()中默认是loop轮询检测free buffer链表的,你前面不是说了一大堆不能loop吗?怎么还用loop呢?
这个实现有那些优势:走读和运行以上代码的同学应该可以发现这里做了一个简单可运行的demo模拟了生产者和消费者双方: void *productor_thread_func(void *arg)
{
struct sfifo_s *sfifo;
while (1) {
sfifo = sfifo_get_free_buf(my_sfifo_des);
if (sfifo != NULL) {
printf("+++++++++++++++++ put : %xn",my_sfifo_des);
}
//usleep(20*1000);
}
}
void *comsumer_thread_func(void *arg)
{
struct sfifo_s *sfifo;
int count = 0;
while (1) {
sfifo = sfifo_get_active_buf(my_sfifo_des);
if (sfifo != NULL) {
printf("---------------- get %xn",my_sfifo_des);
}
//usleep(10 * 1000);
// if (count++ > 10000) {
// exit(-1);
// }
}
}
这里面对于使用者的优点有:
附:模块头文件,类linux用户可通过gcc xxx.c命令build该demo,然后运行测试。 #ifndef SFIFO_H_
#define SFIFO_H_
struct sfifo_list_des_s {
int sfifo_num;
struct sfifo_s *head;
struct sfifo_s *tail;
pthread_mutex_t lock_mutex;
pthread_cond_t cond;
};
struct sfifo_des_s {
int sfifo_init;
unsigned int sfifos_num;
unsigned int sfifos_active_max_num;
struct sfifo_list_des_s free_list;
struct sfifo_list_des_s active_list;
};
struct sfifo_s {
unsigned char *buffer;
unsigned int size;
struct sfifo_s *next;
};
extern struct sfifo_des_s *sfifo_init(int sfifo_num,int sfifo_active_max_num);
/* productor */
extern struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_free_buf(struct sfifo_s *sfifo,struct sfifo_des_s *sfifo_des_p);
/* consumer */
extern struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p);
extern int sfifo_put_active_buf(struct sfifo_s *sfifo,struct sfifo_des_s *sfifo_des_p);
#endif
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |