Linux生产者消费模型
- 互联网
- 2025-08-20 03:00:01

1.生产者消费者模型 1.1 为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。 1.2 生产者消费者模型优点 解耦 支持并发 支持忙闲不均
2.基于BlockingQueue的生产者消费者模型 2.1 BlockingQueue 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出( 以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。 2.2 C++ queue模拟阻塞队列的生产消费模型 #include <iostream> #include <queue> #include <stdlib.h> #include <pthread.h> #define NUM 8 class BlockQueue { private: std::queue<int> q; int cap; pthread_mutex_t lock; pthread_cond_t full; pthread_cond_t empty; private: void LockQueue() { pthread_mutex_lock(&lock); } void UnLockQueue() { pthread_mutex_unlock(&lock); } void ProductWait() { pthread_cond_wait(&full, &lock); } void ConsumeWait() { pthread_cond_wait(&empty, &lock); } void NotifyProduct() { pthread_cond_signal(&full); } void NotifyConsume() { pthread_cond_signal(&empty); } bool IsEmpty() { return (q.size() == 0 ? true : false); } bool IsFull() { return (q.size() == cap ? true : false); } public: BlockQueue(int _cap = NUM) :cap(_cap) { pthread_mutex_init(&lock, NULL); pthread_cond_init(&full, NULL); pthread_cond_init(&empty, NULL); } void PushData(const int& data) { LockQueue(); while (IsFull()) { NotifyConsume(); std::cout << "queue full, notify consume data, product stop." << std::endl; ProductWait(); } q.push(data); // NotifyConsume(); UnLockQueue(); } void PopData(int& data) { LockQueue(); while (IsEmpty()) { NotifyProduct(); std::cout << "queue empty, notify product data, consume stop." << std::endl; ConsumeWait(); } data = q.front(); q.pop(); // NotifyProduct(); UnLockQueue(); } ~BlockQueue() { pthread_mutex_destroy(&lock); pthread_cond_destroy(&full); pthread_cond_destroy(&empty); } }; pthread_t c, p; pthread_create(&c, NULL, consumer, (void*)&bq); pthread_create(&p, NULL, producter, (void*)&bq); pthread_join(c, NULL); pthread_join(p, NULL); return 0; } void* consumer(void* arg) { BlockQueue* bqp = (BlockQueue*)arg; int data; for (; ; ) { bqp->PopData(data); std::cout << "Consume data done : " << data << std::endl; } } //more faster void* producter(void* arg) { BlockQueue* bqp = (BlockQueue*)arg; srand((unsigned long)time(NULL)); for (; ; ) { int data = rand() % 1024; bqp->PushData(data); std::cout << "Prodoct data done: " << data << std::endl; // sleep(1); } } int main() { BlockQueue bq; pthread_t c, p; pthread_create(&c, NULL, consumer, (void*)&bq); pthread_create(&p, NULL, producter, (void*)&bq); pthread_join(c, NULL); pthread_join(p, NULL); return 0; } 3.POSIX信号量 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步。 3.1 初始化信号量 #include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); 参数: pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值 3.2 销毁信号量 int sem_destroy(sem_t *sem); 3.3 等待信号量 功能:等待信号量,会将信号量的值减1 int sem_wait(sem_t *sem); //P() 3.4 发布信号量 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。 int sem_post(sem_t *sem);//V() 4.基于环形队列的生产消费模型 环形队列采用数组模拟,用模运算来模拟环状特性 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程 #include <iostream> #include <vector> #include <stdlib.h> #include <semaphore.h> #include <pthread.h> #define NUM 16 class RingQueue { private: std::vector<int> q; int cap; sem_t data_sem; sem_t space_sem; int consume_step; int product_step; public: RingQueue(int _cap = NUM) :q(_cap), cap(_cap) { sem_init(&data_sem, 0, 0); sem_init(&space_sem, 0, cap); consume_step = 0; product_step = 0; } void PutData(const int& data) { sem_wait(&space_sem); // P q[consume_step] = data; consume_step++; consume_step %= cap; sem_post(&data_sem); //V } void GetData(int& data) { sem_wait(&data_sem); data = q[product_step]; product_step++; product_step %= cap; sem_post(&space_sem); } ~RingQueue() { sem_destroy(&data_sem); sem_destroy(&space_sem); } }; void* consumer(void* arg) { RingQueue* rqp = (RingQueue*)arg; int data; for (; ; ) { rqp->GetData(data); std::cout << "Consume data done : " << data << std::endl; sleep(1); } } //more faster void* producter(void* arg) { RingQueue* rqp = (RingQueue*)arg; srand((unsigned long)time(NULL)); for (; ; ) { int data = rand() % 1024; rqp->PutData(data); std::cout << "Prodoct data done: " << data << std::endl; // sleep(1); } } int main() { RingQueue rq; pthread_t c, p; pthread_create(&c, NULL, consumer, (void*)&rq); pthread_create(&p, NULL, producter, (void*)&rq); pthread_join(c, NULL); pthread_join(p, NULL); } 5.线程池 /*threadpool.h*/ /* 线程池: * 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着 监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利 用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 * 线程池的应用场景: * 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技 术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 * 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。 * 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误. * 线程池的种类: * 线程池示例: * 1. 创建固定数量线程池,循环从任务队列中获取任务对象, * 2. 获取到任务对象后,执行任务对象中的任务接口 */ /*threadpool.hpp*/ #ifndef __M_TP_H__ #define __M_TP_H__ #include <iostream> #include <queue> #include <pthread.h> #define MAX_THREAD 5 typedef bool (*handler_t)(int); class ThreadTask { private: int _data; handler_t _handler; public: ThreadTask() :_data(-1), _handler(NULL) {} ThreadTask(int data, handler_t handler) { _data = data; _handler = handler; } void SetTask(int data, handler_t handler) { _data = data; _handler = handler; } void Run() { _handler(_data); } }; class ThreadPool { private: int _thread_max; int _thread_cur; bool _tp_quit; std::queue<ThreadTask*> _task_queue; pthread_mutex_t _lock; pthread_cond_t _cond; private: void LockQueue() { pthread_mutex_lock(&_lock); } void UnLockQueue() { pthread_mutex_unlock(&_lock); } void WakeUpOne() { pthread_cond_signal(&_cond); } void WakeUpAll() { pthread_cond_broadcast(&_cond); } void ThreadQuit() { _thread_cur--; UnLockQueue(); pthread_exit(NULL); } void ThreadWait() { if (_tp_quit) { ThreadQuit(); } pthread_cond_wait(&_cond, &_lock); } bool IsEmpty() { return _task_queue.empty(); } static void* thr_start(void* arg) { ThreadPool* tp = (ThreadPool*)arg; while (1) { tp->LockQueue(); while (tp->IsEmpty()) { tp->ThreadWait(); } ThreadTask* tt; tp->PopTask(&tt); tp->UnLockQueue(); tt->Run(); delete tt; } return NULL; } public: ThreadPool(int max = MAX_THREAD) :_thread_max(max), _thread_cur(max), _tp_quit(false) { pthread_mutex_init(&_lock, NULL); pthread_cond_init(&_cond, NULL); } ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); } bool PoolInit() { pthread_t tid; for (int i = 0; i < _thread_max; i++) { int ret = pthread_create(&tid, NULL, thr_start, this); if (ret != 0) { std::cout << "create pool thread error\n"; return false; } } return true; } bool PushTask(ThreadTask* tt) { LockQueue(); if (_tp_quit) { UnLockQueue(); return false; } _task_queue.push(tt); WakeUpOne(); UnLockQueue(); return true; } bool PopTask(ThreadTask** tt) { *tt = _task_queue.front(); _task_queue.pop(); return true; } bool PoolQuit() { LockQueue(); _tp_quit = true; UnLockQueue(); while (_thread_cur > 0) { WakeUpAll(); usleep(1000); } return true; } }; #endif /*main.cpp*/ bool handler(int data) { srand(time(NULL)); int n = rand() % 5; printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n); sleep(n); return true; } int main() { int i; ThreadPool pool; pool.PoolInit(); for (i = 0; i < 10; i++) { ThreadTask* tt = new ThreadTask(i, handler); pool.PushTask(tt); } pool.PoolQuit(); return 0; } g++ -std=c++0x test.cpp -o test -pthread -lrt 6.线程安全的单例模式 6.1 什么是单例模式 单例模式是一种 " 经典的 , 常用的 , 常考的 " 设计模式。 6.2 什么是设计模式 IT 行业这么火 , 涌入的人很多 . 俗话说林子大了啥鸟都有 . 大佬和菜鸡们两极分化的越来越严重 . 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景 , 给定了一些对应的解决方案 , 这个就是 设计模式。 6.3 单例模式的特点 某些类 , 只应该具有一个对象 ( 实例 ), 就称之为单例。例如一个男人只能有一个媳妇。在很多服务器开发场景中, 经常需要让服务器加载很多的数据 ( 上百 G) 到内存中。 此时往往要用一个单例的类来管理这些数据。 6.4 饿汉实现方式和懒汉实现方式 吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭. 吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式. 懒汉方式最核心的思想是 " 延时加载 ". 从而能够优化服务器的启动速度。 6.5 饿汉方式实现单例模式 template <typename T> class Singleton { static T data; public: static T* GetInstance() { return &data; } }; 只要通过 Singleton 这个包装类来使用 T 对象 , 则一个进程中只有一个 T 对象的实例。 6.6 懒汉方式实现单例模式 template <typename T> class Singleton { static T* inst; public: static T* GetInstance() { if (inst == NULL) { inst = new T(); } return inst; } }; 存在一个严重的问题 , 线程不安全。 第一次调用 GetInstance 的时候 , 如果两个线程同时调用 , 可能会创建出两份 T 对象的实例。 但是后续再次调用, 就没有问题了。 6.7 懒汉方式实现单例模式(线程安全版本) // 懒汉模式, 线程安全 template <typename T> class Singleton { volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化. static std::mutex lock; public: static T* GetInstance() { if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能. lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new. if (inst == NULL) { inst = new T(); } lock.unlock(); } return inst; } }; 注意事项: 加锁解锁的位置 双重 if 判定 , 避免不必要的锁竞争 volatile关键字防止过度优化 7. STL,智能指针和线程安全 7.1 STL中的容器是否是线程安全的? 不是 . 原因是, STL 的设计初衷是将性能挖掘到极致 , 而一旦涉及到加锁保证线程安全 , 会对性能造成巨大的影响 . 而且对于不同的容器, 加锁方式的不同 , 性能可能也不同 ( 例如 hash 表的锁表和锁桶 ). 因此 STL 默认不是线程安全 . 如果需要在多线程环境下使用 , 往往需要调用者自行保证线程安全 . 7.2 智能指针是否是线程安全的? 对于 unique_ptr, 由于只是在当前代码块范围内生效 , 因此不涉及线程安全问题 . 对于shared_ptr, 多个对象需要共用一个引用计数变量 , 所以会存在线程安全问题 . 但是标准库实现的时候考虑到了这 个问题 , 基于原子操作 (CAS) 的方式保证 shared_ptr 能够高效 , 原子的操作引用计数 . 7.3 其他常见的各种锁 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS 操作。 CAS 操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。 自旋锁,公平锁,非公平锁? 8. 读者写者问题 8.1 读写锁 在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。 注意:写独占,读共享,读锁优先级高 8.2 读写锁接口 8.2.1 设置读写优先 int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref); /* pref 共有 3 种选择 PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况 PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和 PTHREAD_RWLOCK_PREFER_READER_NP 一致 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁 */ 8.2.2 初始化 int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr); 8.2.3 销毁 int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 8.2.4 加锁和解锁 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); 8.2.5 读写锁案例 #include <vector> #include <sstream> #include <cstdio> #include <cstdlib> #include <cstring> #include <unistd.h> #include <pthread.h> volatile int ticket = 1000; pthread_rwlock_t rwlock; void* reader(void* arg) { char* id = (char*)arg; while (1) { pthread_rwlock_rdlock(&rwlock); if (ticket <= 0) { pthread_rwlock_unlock(&rwlock); break; } printf("%s: %d\n", id, ticket); pthread_rwlock_unlock(&rwlock); usleep(1); } return nullptr; } void* writer(void* arg) { char* id = (char*)arg; while (1) { pthread_rwlock_wrlock(&rwlock); if (ticket <= 0) { pthread_rwlock_unlock(&rwlock); break; } printf("%s: %d\n", id, --ticket); pthread_rwlock_unlock(&rwlock); usleep(1); } return nullptr; } struct ThreadAttr { pthread_t tid; std::string id; }; std::string create_reader_id(std::size_t i) { // 利用 ostringstream 进行 string 拼接 std::ostringstream oss("thread reader ", std::ios_base::ate); oss << i; return oss.str(); } std::string create_writer_id(std::size_t i) { // 利用 ostringstream 进行 string 拼接 std::ostringstream oss("thread writer ", std::ios_base::ate); oss << i; return oss.str(); } void init_readers(std::vector<ThreadAttr>& vec) { for (std::size_t i = 0; i < vec.size(); ++i) { vec[i].id = create_reader_id(i); pthread_create(&vec[i].tid, nullptr, reader, (void*)vec[i].id.c_str()); } } void init_writers(std::vector<ThreadAttr>& vec) { for (std::size_t i = 0; i < vec.size(); ++i) { vec[i].id = create_writer_id(i); pthread_create(&vec[i].tid, nullptr, writer, (void*)vec[i].id.c_str()); } } void join_threads(std::vector<ThreadAttr> const& vec) { // 我们按创建的 逆序 来进行线程的回收 for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it != vec.rend(); ++it) { pthread_t const& tid = it->tid; pthread_join(tid, nullptr); } } void init_rwlock() { #if 0 // 写优先 pthread_rwlockattr_t attr; pthread_rwlockattr_init(&attr); pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); pthread_rwlock_init(&rwlock, &attr); pthread_rwlockattr_destroy(&attr); #else // 读优先,会造成写饥饿 pthread_rwlock_init(&rwlock, nullptr); #endif } int main() { // 测试效果不明显的情况下,可以加大 reader_nr // 但也不能太大,超过一定阈值后系统就调度不了主线程了 const std::size_t reader_nr = 1000; const std::size_t writer_nr = 2; std::vector<ThreadAttr> readers(reader_nr); std::vector<ThreadAttr> writers(writer_nr); init_rwlock(); init_readers(readers); init_writers(writers); join_threads(writers); join_threads(readers); pthread_rwlock_destroy(&rwlock); } main: main.cpp g++ -std=c++11 -Wall -Werror $^ -o $@ -lpthread
Linux生产者消费模型由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Linux生产者消费模型”
下一篇
2023美赛F题:绿色经济