C语言线程间通信
发布时间:2020-12-15 17:32:16 所属栏目:百科 来源:网络整理
导读:C11 标准为线程间通信提供了 条件变量(condition variable) 。线程可以使用条件变量,以等待来自另一个线程的通知,通知告知了指定的条件已被满足。例如,这类通知可能代表某些数据已经准备好进行处理。 条件变量由类型为 cnd_t 的对象表示,并配合互斥一起
C11 标准为线程间通信提供了条件变量(condition variable)。线程可以使用条件变量,以等待来自另一个线程的通知,通知告知了指定的条件已被满足。例如,这类通知可能代表某些数据已经准备好进行处理。 条件变量由类型为 cnd_t 的对象表示,并配合互斥一起使用。一般过程如下:线程获得互斥,然后测试条件。如果条件不满足,则线程继续等待条件变量(释放互斥),直到另一个线程再次唤醒它,然后该线程再次获得互斥,并再次测试条件,重复上述过程,直到条件满足。 头文件 threads.h 定义了使用条件变量的函数,它们如下所示: int cnd_init(cnd_t*cond);初始化 cond 引用的条件变量。 void cnd_destroy(cnd_t*cond);释放指定条件变量使用的所有资源。 int cnd_signal(cnd_t*cond);在等待指定条件变量的任意数量的线程中,唤醒其中一个线程。 int cnd_broadcast(cnd_t*cond);唤醒所有等待指定条件变量的线程。 int cnd_wait(cnd_t*cond,mtx_t*mtx);阻塞正在调用的线程,并释放指定的互斥。在调用 cnd_wait()之前,线程必须持有互斥。如果另一线程通过发送一个信号解除当前线程的阻塞(也就是说,通过指定同样的条件变量作为参数调用 cond_signal()或 cnd_broadcast()),那么调用 cnd_wait()的线程在 cnd_wait()返回之前会再次获得互斥。 int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);与 cnd_wait()类似,cnd_timedwait()阻塞调用它们的线程,但仅维持由参数 ts 指定的时间。可以通过调用函数 timespec_get()获得一个 struct timespec 对象,它表示当前时间。 除 cnd_destroy()以外的所有条件变量函数,如果它们引发错误,则返回值 thrd_error,否则返回值 thrd_success。当时间达到限定值时,函数 cnd_timedwait()也会返回值 thrd_timedout。 例 1 与例 2 中的程序展示了在常见的“生产者-消费者”模型中使用条件变量。程序为每个生产者和消费者开启一个新线程。生产者将一个新产品(在我们的示例中,新产品为一个 int 变量)放入一个环形缓冲区中,假设这个缓冲区没有满,然后通知等待的消费者:产品已经准备好。每个消费者从该缓冲区中取出产品,然后将实际情况通知给正在等待的生产者。 在任一特定时间,只有一个线程可以修改环形缓冲器。因此,在函数 bufPut()和 bufGet()间将存在线程同步问题,函数 bufPut()将一个元素插入到缓冲区,函数 buf-Get()将一个元素从缓冲区移除。 有两个条件变量:生产者等待其中一个条件变量,以判断缓冲器是否满了;消费者等待另一个条件变量,以判断缓冲器是否空了。缓冲区的所有必需元素都包括在结构 Buffer 中。函数 bufInit()初始化具有指定大小的 Buffer 对象,而函数 bufDestroy()销毁 Buffer 对象。 【例1】用于“生产者-消费者”模型的环形缓冲区 /* buffer.h * 用于线程安全缓冲区的所有声明 */ #include <stdbool.h> #include <threads.h> typedef struct Buffer { int *data; // 指向数据数组的指针 size_t size,count; // 元素数量的最大值和当前值 size_t tip,tail; // tip = 下一个空点的索引 mtx_t mtx; // 一个互斥 cnd_t cndPut,cndGet; // 两个条件变量 } Buffer; bool bufInit( Buffer *bufPtr,size_t size ); void bufDestroy(Buffer *bufPtr); bool bufPut(Buffer *bufPtr,int data); bool bufGet(Buffer *bufPtr,int *dataPtr,int sec); /* ------------------------------------------------------------- * buffer.c * 定义用于处理Buffer的函数 */ #include "buffer.h" #include <stdlib.h> // 为了使用malloc()和free() bool bufInit( Buffer *bufPtr,size_t size) { if ((bufPtr->data = malloc( size * sizeof(int))) == NULL) return false; bufPtr->size = size; bufPtr->count = 0; bufPtr->tip = bufPtr->tail = 0; return mtx_init( &bufPtr->mtx,mtx_plain) == thrd_success && cnd_init( &bufPtr->cndPut) == thrd_success && cnd_init( &bufPtr->cndGet) == thrd_success; } void bufDestroy(Buffer *bufPtr) { cnd_destroy( &bufPtr->cndGet ); cnd_destroy( &bufPtr->cndPut ); mtx_destroy( &bufPtr->mtx ); free( bufPtr->data ); } // 在缓冲区中插入一个新元素 bool bufPut(Buffer *bufPtr,int data) { mtx_lock( &bufPtr->mtx ); while (bufPtr->count == bufPtr->size) if (cnd_wait( &bufPtr->cndPut,&bufPtr->mtx ) != thrd_success) return false; bufPtr->data[bufPtr->tip] = data; bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size; ++bufPtr->count; mtx_unlock( &bufPtr->mtx ); cnd_signal( &bufPtr->cndGet ); return true; } // 从缓冲区中移除一个元素 // 如果缓冲区是空的,则等待不超过sec秒 bool bufGet(Buffer *bufPtr,int sec) { struct timespec ts; timespec_get( &ts,TIME_UTC ); // 当前时间 ts.tv_sec += sec; // + sec秒延时 mtx_lock( &bufPtr->mtx ); while ( bufPtr->count == 0 ) if (cnd_timedwait(&bufPtr->cndGet,&bufPtr->mtx,&ts) != thrd_success) return false; *dataPtr = bufPtr->data[bufPtr->tail]; bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size; --bufPtr->count; mtx_unlock( &bufPtr->mtx ); cnd_signal( &bufPtr->cndPut ); return true; } 例 2 中的 main()函数创建了一个缓冲区,并启动了若干个生产者和消费者线程,给予每个线程一个识别号码和一个指向缓冲区的指针。每个生产者线程创建一定数量的“产品”,然后用一个 return 语句退出。一个消费者线程如果在给定延时期间无法获得产品以进行消费,则直接返回。 【例2】启动生产者和消费者线程 // producer_consumer.c #include "buffer.h" #include <stdio.h> #include <stdlib.h> #define NP 2 // 生产者的数量 #define NC 3 // 消费者的数量 int producer(void *); // 线程函数 int consumer(void *); struct Arg { int id; Buffer *bufPtr; }; // 线程函数的参数 _Noreturn void errorExit(const char* msg) { fprintf(stderr,"%sn",msg); exit(0xff); } int main(void) { printf("Producer-Consumer Demonn"); Buffer buf; // 为5个产品创建一个缓冲区 bufInit( &buf,5 ); thrd_t prod[NP],cons[NC]; // 线程 struct Arg prodArg[NP],consArg[NC]; // 线程的参数 int i = 0,res = 0; for ( i = 0; i < NP; ++i ) // 启动生产者 { prodArg[i].id = i+1,prodArg[i].bufPtr = &buf; if (thrd_create( &prod[i],producer,&prodArg[i] ) != thrd_success) errorExit("Thread error."); } for ( i = 0; i < NC; ++i ) // 启动消费者 { consArg[i].id = i+1,consArg[i].bufPtr = &buf; if ( thrd_create( &cons[i],consumer,&consArg[i] ) != thrd_success) errorExit("Thread error."); } for ( i = 0; i < NP; ++i ) // 等待线程结束 thrd_join(prod[i],&res),printf("nProducer %d ended with result %d.n",prodArg[i].id,res); for ( i = 0; i < NC; ++i ) thrd_join(cons[i],printf("Consumer %d ended with result %d.n",consArg[i].id,res); bufDestroy( &buf ); return 0; } int producer(void *arg) // 生产者线程函数 { struct Arg *argPtr = (struct Arg *)arg; int id = argPtr->id; Buffer *bufPtr = argPtr->bufPtr; int count = 0; for (int i = 0; i < 10; ++i) { int data = 10*id + i; if (bufPut( bufPtr,data )) printf("Producer %d produced %dn",id,data),++count; else { fprintf( stderr,"Producer %d: error storing %dn",data); return -id; } } return count; } int consumer(void *arg) // 消费者线程函数 { struct Arg *argPtr = (struct Arg *)arg; int id = argPtr->id; Buffer *bufPtr = argPtr->bufPtr; int count = 0; int data = 0; while (bufGet( bufPtr,&data,2 )) { ++count; printf("Consumer %d consumed %dn",data); } return count; } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- DOJO学习笔记(七)-日期控件DropdownDatePicker和DatePic
- 翻转句子的TDD讨论:)
- Oracle 12C ORA-01792: maximum number of columns in a ta
- ruby-on-rails – 在Rails引擎规范中使用正确的url_for方法
- ajax – 即使缓存响应,“Cache-Control:only-if-cached”在
- React做一个数秒表
- [oracle]Ubuntu下ORA-27102及ORA-00845错误处理
- c# – 由于Int32是一个值类型,为什么它继承.ToString()?
- 使用代理在vb.NET中调用Slack API
- sqlite3 中int字段的获取与转换