欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 游戏 > 线程互斥与同步

线程互斥与同步

2025/7/5 19:41:46 来源:https://blog.csdn.net/2301_77525727/article/details/144276857  浏览:    关键词:线程互斥与同步

目录

  • 一、Linux线程互斥
    • 1、进程线程间的互斥相关背景概念
    • 2、互斥量mutex
    • 3、互斥量的接口
      • (1)初始化互斥量
        • (i)方法一:动态分配
        • (ii)静态分配
      • (2)销毁互斥量
      • (3)互斥量加锁
      • (4)互斥量解锁
      • (5)使用示例
    • 4、互斥量实现原理探究
  • 二、Linux线程同步
    • 1、同步概念与竞态条件
    • 2、条件变量
    • 3、条件变量函数
      • (1)初始化条件变量
        • (i)方法一:动态分配
        • (i)方法二:静态分配
      • (2)销毁条件变量
      • (3)等待条件变量满足
      • (4)唤醒等待
      • (5)使用示例
    • 4、生产者消费者模型
      • (1)生产者消费者模型的概念
      • (2)为何要使用生产者消费者模型
      • (3)生产者消费者模型的特点
      • (4)生产者消费者模型优点
    • 5、基于BlockingQueue的生产者消费者模型
      • (1)BlockingQueue
      • (2)C++queue模拟阻塞队列的生产消费模型
    • 6、为什么 pthread_cond_wait 需要互斥量?
      • (1)方案一:等待与锁分开写(错误)
      • (2)方案二:等待与锁合并(正确)
    • 7、条件变量使用规范
    • 8、条件变量的封装
    • 9、POSIX信号量
      • (1)信号量函数
        • (i)初始化信号量
        • (ii)销毁信号量
        • (iii)等待信号量(申请信号量)
        • (iv)发布信号量(释放信号量)
      • (2)基于环形队列的生产消费模型

一、Linux线程互斥

1、进程线程间的互斥相关背景概念

  • 临界资源: 多线程执行流共享的资源叫做临界资源。
  • 临界区: 每个线程内部,访问临界资源的代码,就叫做临界区。
  • 互斥: 任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
  • 原子性: 不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。

临界资源 && 临界区
进程之间如果要进行通信我们需要先创建第三方资源,让不同的进程看到同一份资源,由于这份第三方资源可以由操作系统中的不同模块提供,于是进程间通信的方式有很多种。进程间通信中的第三方资源就叫做临界资源,访问第三方资源的代码就叫做临界区。

而多线程的大部分资源都是共享的,线程之间进行通信不需要费那么大的劲去创建第三方资源。

例如,我们只需要在全局区定义一个count变量,让新线程每隔一秒对该变量加一操作,让主线程每隔一秒获取count变量的值进行打印。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>int count = 0;
void* Routine(void* arg)
{while (1){count++;sleep(1);}pthread_exit((void*)0);
}
int main()
{pthread_t tid;pthread_create(&tid, NULL, Routine, NULL);while (1){printf("count: %d\n", count);sleep(1);}pthread_join(tid, NULL);return 0;
}

运行结果如下:
在这里插入图片描述
此时我们相当于实现了主线程和新线程之间的通信,其中全局变量count就叫做临界资源,因为它被多个执行流共享,而主线程中的printf和新线程中count++就叫做临界区,因为这些代码对临界资源进行了访问。

互斥 && 原子性
在多线程情况下,如果这多个执行流都自顾自的对临界资源进行操作,那么此时就可能导致数据不一致的问题。解决该问题的方案就叫做互斥,互斥的作用就是,保证在任何时候有且只有一个执行流进入临界区对临界资源进行访问。

原子性指的是不可被分割的操作,该操作不会被任何调度机制打断,该操作只有两态,要么完成,要么未完成。

例如, 下面我们模拟实现一个抢票系统,我们将记录票的剩余张数的变量定义为全局变量,主线程创建四个新线程,让这四个新线程进行抢票,当票被抢完后这四个线程自动退出。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
void* TicketGrabbing(void* arg)
{const char* name = (char*)arg;while (1){if (tickets > 0){usleep(10000);printf("[%s] get a ticket, left: %d\n", name, --tickets);}else{break;}}printf("%s quit!\n", name);pthread_exit((void*)0);
}
int main()
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, TicketGrabbing, "thread 1");pthread_create(&t2, NULL, TicketGrabbing, "thread 2");pthread_create(&t3, NULL, TicketGrabbing, "thread 3");pthread_create(&t4, NULL, TicketGrabbing, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);return 0;
}

运行结果显然不符合我们的预期,因为其中出现了剩余票数为负数的情况。
在这里插入图片描述
该代码中记录剩余票数的变量tickets就是临界资源,因为它被多个执行流同时访问,而判断tickets是否大于0、打印剩余票数以及- -tickets这些代码就是临界区,因为这些代码对临界资源进行了访问。

剩余票数出现负数的原因:

  • if语句判断条件为真以后,代码可以并发的切换到其他线程。
  • usleep用于模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段。
  • – -ticket操作本身就不是一个原子操作。

为什么- -ticket不是原子操作?
我们对一个变量进行- -,我们实际需要进行以下三个步骤:

  • load:将共享变量tickets从内存加载到寄存器中。
  • update:更新寄存器里面的值,执行-1操作。
  • store:将新值从寄存器写回共享变量tickets的内存地址。
    在这里插入图片描述- -操作对应的汇编代码如下:
    在这里插入图片描述
    既然- -操作需要三个步骤才能完成,那么就有可能当thread1刚把tickets的值读进CPU就被切走了,也就是从CPU上剥离下来,假设此时thread1读取到的值就是1000,而当thread1被切走时,寄存器中的1000叫做thread1的上下文信息,因此需要被保存起来,之后thread1就被挂起了。
    在这里插入图片描述
    假设此时thread2被调度了,由于thread1只进行了- -操作的第一步,因此thread2此时看到tickets的值还是1000,而系统给thread2的时间片可能较多,导致thread2一次性执行了100次- -才被切走,最终tickets由1000减到了900。
    在这里插入图片描述
    此时系统再把thread1恢复上来,恢复的本质就是继续执行thread1的代码,并且要将thread1曾经的硬件上下文信息恢复出来,此时寄存器当中的值是恢复出来的1000,然后thread1继续执行- -操作的第二步和第三步,最终将999写回内存。
    在这里插入图片描述
    在上述过程中,thread1抢了1张票,thread2抢了100张票,而此时剩余的票数却是999,也就相当于多出了100张票。
    因此对一个变量进行- -操作并不是原子的,虽然- -tickets看起来就是一行代码,但这行代码被编译器编译后本质上是三行汇编,相反,对一个变量进行++也需要对应的三个步骤,即++操作也不是原子操作。

所以从- -ticket不是原子操作 回答为什么票数会是负的

假设此时只有1张票了。当第 1 个线程 if 判断成功后,执行到 usleep,陷入内核休眠并执行第 2 个线程,以此类推。后来四个线程都通过了if判断,进入了if语句后,线程1先醒来把票减到0,并打印,此时后续进程再醒来,因为已经进入了if语句,所以不用判断是否大于0,就可以- - 打印,所以会一直减到。

要解决这种问题,所以就引出了线程互斥。

2、互斥量mutex

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量成为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,就会带来一些问题。

要解决上述抢票系统的问题,需要做到三点:

  • 代码必须有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且此时临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁,Linux上提供的这把锁叫互斥量。
在这里插入图片描述

3、互斥量的接口

(1)初始化互斥量

(i)方法一:动态分配

初始化互斥量的函数叫做pthread_mutex_init,该函数的函数原型如下:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

参数说明:

  • mutex:需要初始化的互斥量。
  • attr:初始化互斥量的属性,一般设置为NULL即可。

返回值说明:

  • 互斥量初始化成功返回0,失败返回错误码。
(ii)静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

(2)销毁互斥量

销毁互斥量的函数叫做pthread_mutex_destroy,该函数的函数原型如下:

int pthread_mutex_destroy(pthread_mutex_t *mutex);

参数说明:

  • mutex:需要销毁的互斥量。

返回值说明:

  • 互斥量销毁成功返回0,失败返回错误码。

销毁互斥量需要注意:

  • 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁。
  • 不要销毁一个已经加锁的互斥量。
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。

(3)互斥量加锁

互斥量加锁的函数叫做pthread_mutex_lock,该函数的函数原型如下:

int pthread_mutex_lock(pthread_mutex_t *mutex);

参数说明:

  • mutex:需要加锁的互斥量。

返回值说明:

  • 互斥量加锁成功返回0,失败返回错误码。

调用pthread_mutex_lock时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_mutex_lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

(4)互斥量解锁

互斥量解锁的函数叫做pthread_mutex_unlock,该函数的函数原型如下:

int pthread_mutex_unlock(pthread_mutex_t *mutex);

参数说明:

  • mutex:需要解锁的互斥量。

返回值说明:

  • 互斥量解锁成功返回0,失败返回错误码。

(5)使用示例

例如,我们在上述的抢票系统中引入互斥量,每一个线程要进入临界区之前都必须先申请锁,只有申请到锁的线程才可以进入临界区对临界资源进行访问,并且当线程出临界区的时候需要释放锁,这样才能让其余要进入临界区的线程继续竞争锁。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
pthread_mutex_t mutex;
void* TicketGrabbing(void* arg)
{const char* name = (char*)arg;while (1){pthread_mutex_lock(&mutex);if (tickets > 0){usleep(100);printf("[%s] get a ticket, left: %d\n", name, --tickets);pthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}}printf("%s quit!\n", name);pthread_exit((void*)0);
}
int main()
{pthread_mutex_init(&mutex, NULL);pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, TicketGrabbing, "thread 1");pthread_create(&t2, NULL, TicketGrabbing, "thread 2");pthread_create(&t3, NULL, TicketGrabbing, "thread 3");pthread_create(&t4, NULL, TicketGrabbing, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(&mutex);return 0;
}

在这里插入图片描述
注意:

在大部分情况下,加锁本身都是有损于性能的事,它让多执行流由并行执行变为了串行执行,这几乎是不可避免的。
我们应该在合适的位置进行加锁和解锁,这样能尽可能减少加锁带来的性能开销成本。
进行临界资源的保护,是所有执行流都应该遵守的标准,这时程序员在编码时需要注意的。

4、互斥量实现原理探究

加锁后的原子性体现在哪里?
引入互斥量后,当一个线程申请到锁进入临界区时,在其他线程看来该线程只有两种状态,要么没有申请锁,要么锁已经释放了,因为只有这两种状态对其他线程才是有意义的。

例如,图中线程1进入临界区后,在线程2、3、4看来,线程1要么没有申请锁,要么线程1已经将锁释放了,因为只有这两种状态对线程2、3、4才是有意义的,当线程2、3、4检测到其他状态时也就被阻塞了。

在这里插入图片描述
此时对于线程2、3、4而言,它们就认为线程1的整个操作过程是原子的。

临界区内的线程可能进行线程切换吗?

临界区内的线程完全可能进行线程切换,但即便该线程被切走,其他线程也无法进入临界区进行资源访问,因为此时该线程是拿着锁被切走的,锁没有被释放也就意味着其他线程无法申请到锁,也就无法进入临界区进行资源访问了,所以是串行的,所以效率低。
其他想进入该临界区进行资源访问的线程,必须等该线程执行完临界区的代码并释放锁之后,才能申请锁,申请到锁之后才能进入临界区。

锁是否需要被保护?

我们说被多个执行流共享的资源叫做临界资源,访问临界资源的代码叫做临界区。所有的线程在进入临界区之前都必须竞争式的申请锁,因此锁也是被多个执行流共享的资源,也就是说锁本身就是临界资源。
既然锁是临界资源,那么锁就必须被保护起来,但锁本身就是用来保护临界资源的,那锁又由谁来保护的呢?
锁实际上是自己保护自己的,我们只需要保证申请锁的过程是原子的,那么锁就是安全的。

如何保证申请锁的过程是原子的?

上面我们已经说明了–和++操作不是原子操作,可能会导致数据不一致问题。
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用就是把寄存器和内存单元的数据相交换。
由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时,另一个处理器的交换指令只能等待总线周期。

经过上面的例子,我们已经能够意识到单纯的 i++ 或者 ++i 都不是原子的,因为这有可能会出现数据不一致问题。为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令,该指令的作用是把 CPU 寄存器区和内存单元的数据相交换,由于只有一条指令,就保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时,另一个处理器的交换指令只能等待总线周期。也就是说锁的原子性实现是由寄存器级别的,如下是 lock 和 unlock 的伪代码以及理解:
我们来看看lock和unlock的伪代码:

我们可以认为mutex的初始值为1,al是计算机中的一个寄存器,当线程申请锁时,需要执行以下步骤:

  1. 先将al寄存器中的值清0。该动作可以被多个线程同时执行,因为每个线程都有自己的一组寄存器(上下文信息),执行该动作本质上是将自己的al寄存器清0。
  2. 然后交换al寄存器和mutex中的值。xchgb是体系结构提供的交换指令,该指令可以完成寄存器和内存单元之间数据的交换。
  3. 最后判断al寄存器中的值是否大于0。若大于0则申请锁成功,此时就可以进入临界区访问对应的临界资源;否则申请锁失败需要被挂起等待,直到锁被释放后再次竞争申请锁。

例如, 此时内存中mutex的值为1,线程申请锁时先将al寄存器中的值清0,然后将al寄存器中的值与内存中mutex的值进行交换。
在这里插入图片描述
交换完成后检测该线程的al寄存器中的值为1,则该线程申请锁成功,可以进入临界区对临界资源进行访问。

而此后的线程若是再申请锁,与内存中的mutex交换得到的值就是0了,此时该线程申请锁失败,需要被挂起等待,直到锁被释放后再次竞争申请锁。
在这里插入图片描述
当线程释放锁时,需要执行以下步骤:

  1. 将内存中的mutex置回1。使得下一个申请锁的线程在执行交换指令后能够得到1,形象地说就是“将锁的钥匙放回去”。
  2. 唤醒等待Mutex的线程。唤醒这些因为申请锁失败而被挂起的线程,让它们继续竞争申请锁。

注意:

  1. 在申请锁时本质上就是哪一个线程先执行了交换指令,那么该线程就申请锁成功,因为此时该线程的al寄存器中的值就是1了。而交换指令就只是一条汇编指令,一个线程要么执行了交换指令,要么没有执行交换指令,所以申请锁的过程是原子的。
  2. 在线程释放锁时没有将当前线程al寄存器中的值清0,这不会造成影响,因为每次线程在申请锁时都会先将自己al寄存器中的值清0,再执行交换指令。
  3. CPU内的寄存器不是被所有的线程共享的,每个线程都有自己的一组寄存器,但内存中的数据是各个线程共享的。申请锁实际就是,把内存中的mutex通过交换指令,原子性的交换到自己的al寄存器中。

二、Linux线程同步

1、同步概念与竞态条件

同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,这就叫做同步。
竞态条件: 因为时序问题,而导致程序异常,我们称之为竞态条件。

  • 首先需要明确的是,单纯的加锁是会存在某些问题的,如果个别线程的竞争力特别强,每次都能够申请到锁,但申请到锁之后什么也不做,所以在我们看来这个线程就一直在申请锁和释放锁,这就可能导致其他线程长时间竞争不到锁,引起饥饿问题。
  • 单纯的加锁是没有错的,它能够保证在同一时间只有一个线程进入临界区,但它没有高效的让每一个线程使用这份临界资源。
  • 现在我们增加一个规则,当一个线程释放锁后,这个线程不能立马再次申请锁,该线程必须排到这个锁的资源等待队列的最后
  • 增加这个规则之后,下一个获取到锁的资源的线程就一定是在资源等待队列首部的线程,如果有十个线程,此时我们就能够让这十个线程按照某种次序进行临界资源的访问。

例如,现在有两个线程访问一块临界区,一个线程往临界区写入数据,另一个线程从临界区读取数据,但负责数据写入的线程的竞争力特别强,该线程每次都能竞争到锁,那么此时该线程就一直在执行写入操作,直到临界区被写满,此后该线程就一直在进行申请锁和释放锁。而负责数据读取的线程由于竞争力太弱,每次都申请不到锁,因此无法进行数据的读取,引入同步后该问题就能很好的解决。

2、条件变量

条件变量是利用线程间共享的全局变量进行同步的一种机制,条件变量是用来描述某种资源是否就绪的一种数据化描述。

条件变量主要包括两个动作:

  • 一个线程等待条件变量的条件成立而被挂起。
  • 另一个线程使条件成立后唤醒等待的线程。

条件变量通常需要配合互斥锁一起使用。

3、条件变量函数

(1)初始化条件变量

(i)方法一:动态分配

初始化条件变量的函数叫做pthread_cond_init,该函数的函数原型如下:

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

参数说明:

  • cond:需要初始化的条件变量。
  • attr:初始化条件变量的属性,一般设置为NULL即可。

返回值说明:

  • 条件变量初始化成功返回0,失败返回错误码。

调用pthread_cond_init函数初始化条件变量叫做动态分配,除此之外,我们还可以用下面这种方式初始化条件变量,该方式叫做静态分配。

(i)方法二:静态分配
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

(2)销毁条件变量

销毁条件变量的函数叫做pthread_cond_destroy,该函数的函数原型如下:

int pthread_cond_destroy(pthread_cond_t *cond);

参数说明:

  • cond:需要销毁的条件变量。

返回值说明:

  • 条件变量销毁成功返回0,失败返回错误码。

销毁条件变量需要注意:

  • 使用PTHREAD_COND_INITIALIZER初始化的条件变量不需要销毁。

(3)等待条件变量满足

等待条件变量满足的函数叫做pthread_cond_wait,该函数的函数原型如下:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数说明:

  • cond:需要等待的条件变量。
  • mutex:当前线程所处临界区对应的互斥锁。

返回值说明:

  • 函数调用成功返回0,失败返回错误码。

(4)唤醒等待

唤醒等待的函数有以下两个:

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

区别:

pthread_cond_signal函数用于唤醒等待队列中首个线程。
pthread_cond_broadcast函数用于唤醒等待队列中的全部线程。

参数说明:

  • cond:唤醒在cond条件变量下等待的线程。

返回值说明:

  • 函数调用成功返回0,失败返回错误码。

(5)使用示例

例如,下面我们用主线程创建三个新线程,让主线程控制这三个新线程活动。这三个新线程创建后都在条件变量下进行等待,直到主线程醒来时才发出信号唤醒一个等待线程,如此进行下去。
testCond.cc:

#include <iostream>
#include <cstdio>
#include <string>
#include <pthread.h>
#include <unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *active(void *arg)
{std::string name = static_cast<const char *>(arg);while (true){pthread_mutex_lock(&mutex);// 没有对于资源是否就绪的判定pthread_cond_wait(&cond, &mutex); // mutex??printf("%s is active!\n", name.c_str());pthread_mutex_unlock(&mutex);}
}int main()
{pthread_t tid1, tid2, tid3;pthread_create(&tid1, nullptr, active, (void *)"thread-1");pthread_create(&tid2, nullptr, active, (void *)"thread-2");pthread_create(&tid3, nullptr, active, (void *)"thread-3");sleep(1);printf("Main thread ctrl begin...\n");while (true){printf("main wakeup thread...\n");pthread_cond_signal(&cond);//pthread_cond_broadcast(&cond);sleep(1);}pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);
}

Makfile:

bin=testCond
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)

运行结果:
在这里插入图片描述
如果我们想每次唤醒都将在该条件变量下等待的所有线程进行唤醒,可以将代码中的pthread_cond_signal函数改为pthread_cond_broadcast函数。

此时我们每一次唤醒都会将所有在该条件变量下等待的线程进行唤醒,也就是每次都将这三个线程唤醒。
在这里插入图片描述
小贴士: C++源文件的后缀可以是.cpp、.cc、.cxx。

4、生产者消费者模型

在这里插入图片描述

(1)生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

(2)为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解的。

(3)生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

我们用代码编写生产者消费者模型的时候,本质就是对这三个特点进行维护。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。
其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

(4)生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。

对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。

在这里插入图片描述

5、基于BlockingQueue的生产者消费者模型

(1)BlockingQueue

在多线程编程中阻塞队列(BlockingQueue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述
知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。

(2)C++queue模拟阻塞队列的生产消费模型

BlockQueue.hpp:

#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
//#include "Cond.hpp"
namespace BlockQueueModule
{using namespace LockModule;//using namespace CondModule;// vesion 1static const int gcap = 10;template <typename T>class BlockQueue{private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}void Equeue(const T &in) // 生产者{pthread_mutex_lock(&_mutex);// 你想放数据,就能放数据吗??生产数据是有条件的!// 结论1: 在临界区中等待是必然的(目前)while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!{std::cout << "生产者进入等待..." << std::endl;// 2. 等是,释放_mutex_pwait_num++;pthread_cond_wait(&_productor_cond, &_mutex); // wait的时候,必定是持有锁的!!是有问题的!_pwait_num--;// 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)std::cout << "生产者被唤醒..." << std::endl;}// 4. if(IsFull())不满足 || 线程被唤醒_q.push(in); // 生产// 肯定有数据!if(_cwait_num){std::cout << "叫醒消费者" << std::endl;pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消费者{pthread_mutex_lock(&_mutex);while(IsEmpty()){std::cout << "消费者进入等待..." << std::endl;_cwait_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒_cwait_num--;std::cout << "消费者被唤醒..." << std::endl;}// 4. if(IsEmpty())不满足 || 线程被唤醒*out = _q.front();_q.pop();// 肯定有空间if(_pwait_num){std::cout << "叫醒生产者" << std::endl;pthread_cond_signal(&_productor_cond);}pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _q;               // 保存数据的容器,临界资源int _cap;                       // bq最大容量pthread_mutex_t _mutex;         // 互斥pthread_cond_t _productor_cond; // 生产者条件变量pthread_cond_t _consumer_cond;  // 消费者条件变量int _cwait_num;int _pwait_num;};
}

Mutex.hpp:

#pragma once
#include <iostream>
#include <pthread.h>namespace LockModule
{class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator = (const Mutex&) = delete;Mutex(){int n = ::pthread_mutex_init(&_lock, nullptr);(void)n;}~Mutex(){int n = ::pthread_mutex_destroy(&_lock);(void)n;}void Lock(){int n = ::pthread_mutex_lock(&_lock);(void)n;}pthread_mutex_t *LockPtr(){return &_lock;}void Unlock(){int n = ::pthread_mutex_unlock(&_lock);(void)n;}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &mtx):_mtx(mtx){_mtx.Lock();}~LockGuard(){_mtx.Unlock();}private:Mutex &_mtx;};
}

Main.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 从bq拿到数据bq->Pop(&data);// 2.做处理printf("Consumer, 消费了一个数据: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){sleep(2);// 1. 从外部获取数据// data = 10; // 有数据???// 2. 生产到bq中bq->Equeue(data);printf("producter 生产了一个数据: %d\n", data);data++;}
}int main()
{// 交易场所,不仅仅可以用来进行传递数据// 传递任务!!!v1: 对象 v2BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源// 单生产,单消费pthread_t c1, p1; //,c2, , p2, p3;pthread_create(&c1, nullptr, Consumer, bq);// pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);// pthread_create(&p2, nullptr, Productor, bq);// pthread_create(&p3, nullptr, Productor, bq);pthread_join(c1, nullptr);// pthread_join(c2, nullptr);pthread_join(p1, nullptr);// pthread_join(p2, nullptr);// pthread_join(p3, nullptr);delete bq;return 0;
}

Makefile:

bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)

6、为什么 pthread_cond_wait 需要互斥量?

条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

在这里插入图片描述

(1)方案一:等待与锁分开写(错误)

按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:

// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait。所以解锁和等待必须是一个原子操作。

(2)方案二:等待与锁合并(正确)

如果是这样写:int pthread cond_wait(pthread_cond t *cond,pthread_mutex_t *mutex);进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond wait返回,把条件量改成1,把互斥量恢复成原样。

在这里插入图片描述

7、条件变量使用规范

等待条件代码:

pthread_mutex_lock(&mutex);while(条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);

给条件发送信号代码

pthread _mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex)

8、条件变量的封装

基于上面的基本认识,我们已经知道条件变量如何使用,但这里可以做一下基本的封装。cond里用了Mutex &mutex是我们封装的锁,临时的类。
Cond.hpp:

#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{using namespace LockModule;class Cond{public:Cond(){int n = ::pthread_cond_init(&_cond, nullptr);(void)n;}void Wait(Mutex &mutex) // 让我们的线程释放曾经持有的锁!{int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());}void Notify(){int n = ::pthread_cond_signal(&_cond);(void)n;}void NotifyAll(){int n = ::pthread_cond_broadcast(&_cond);(void)n;}~Cond(){int n = ::pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}

BlockQueu.hpp:

#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"namespace BlockQueueModule
{using namespace LockModule;using namespace CondModule;// version 2static const int gcap = 10;template <typename T>class BlockQueue{private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0){}void Equeue(const T &in) // 生产者{LockGuard lockguard(_mutex);// 你想放数据,就能放数据吗??生产数据是有条件的!// 结论1: 在临界区中等待是必然的(目前)while (IsFull()) // 5. 对条件进行判断,为了防止伪唤醒,我们通常使用while进行判断!{std::cout << "生产者进入等待..." << std::endl;// 2. 等是,释放_mutex_pwait_num++;_productor_cond.Wait(_mutex); // wait的时候,必定是持有锁的!!是有问题的!_pwait_num--;// 3. 返回,线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)std::cout << "生产者被唤醒..." << std::endl;}// 4. if(IsFull())不满足 || 线程被唤醒_q.push(in); // 生产// 肯定有数据!if (_cwait_num){std::cout << "叫醒消费者" << std::endl;_consumer_cond.Notify();}}void Pop(T *out) // 消费者{LockGuard lockguard(_mutex);while (IsEmpty()){std::cout << "消费者进入等待..." << std::endl;_cwait_num++;_consumer_cond.Wait(_mutex); // 伪唤醒_cwait_num--;std::cout << "消费者被唤醒..." << std::endl;}// 4. if(IsEmpty())不满足 || 线程被唤醒*out = _q.front();_q.pop();// 肯定有空间if (_pwait_num){std::cout << "叫醒生产者" << std::endl;_productor_cond.Notify();}}~BlockQueue(){}private:std::queue<T> _q;     // 保存数据的容器,临界资源int _cap;             // bq最大容量Mutex _mutex;         // 互斥Cond _productor_cond; // 生产者条件变量Cond _consumer_cond;  // 消费者条件变量int _cwait_num;int _pwait_num;};
}

Mutex.hpp:

#pragma once
#include <iostream>
#include <pthread.h>namespace LockModule
{class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator = (const Mutex&) = delete;Mutex(){int n = ::pthread_mutex_init(&_lock, nullptr);(void)n;}~Mutex(){int n = ::pthread_mutex_destroy(&_lock);(void)n;}void Lock(){int n = ::pthread_mutex_lock(&_lock);(void)n;}pthread_mutex_t *LockPtr(){return &_lock;}void Unlock(){int n = ::pthread_mutex_unlock(&_lock);(void)n;}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &mtx):_mtx(mtx){_mtx.Lock();}~LockGuard(){_mtx.Unlock();}private:Mutex &_mtx;};
}

Main.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 从bq拿到数据bq->Pop(&data);// 2.做处理printf("Consumer, 消费了一个数据: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){sleep(2);// 1. 从外部获取数据// data = 10; // 有数据???// 2. 生产到bq中bq->Equeue(data);printf("producter 生产了一个数据: %d\n", data);data++;}
}int main()
{// 交易场所,不仅仅可以用来进行传递数据// 传递任务!!!v1: 对象 v2BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源// 单生产,单消费pthread_t c1, p1; //,c2, , p2, p3;pthread_create(&c1, nullptr, Consumer, bq);// pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);// pthread_create(&p2, nullptr, Productor, bq);// pthread_create(&p3, nullptr, Productor, bq);pthread_join(c1, nullptr);// pthread_join(c2, nullptr);pthread_join(p1, nullptr);// pthread_join(p2, nullptr);// pthread_join(p3, nullptr);delete bq;return 0;
}

Makefile:

bin=ringbuffer_cp
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)

9、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步

(1)信号量函数

(i)初始化信号量

初始化信号量的函数叫做sem_init,该函数的函数原型如下:

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数说明:

  • sem:需要初始化的信号量。
  • pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
  • value:信号量的初始值(计数器的初始值)。

注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。

(ii)销毁信号量

销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:

int sem_destroy(sem_t *sem);

参数说明:

sem:需要销毁的信号量。

返回值说明:

销毁信号量成功返回0,失败返回-1。

(iii)等待信号量(申请信号量)

等待信号量的函数叫做sem_wait,该函数的函数原型如下:

int sem_wait(sem_t *sem);

参数说明:

sem:需要等待的信号量。

返回值说明:

等待信号量成功返回0,信号量的值减一。
等待信号量失败返回-1,信号量的值保持不变。

(iv)发布信号量(释放信号量)

发布信号量的函数叫做sem_post,该函数的函数原型如下:

int sem_post(sem_t *sem);

参数说明:

sem:需要发布的信号量。

返回值说明:

发布信号量成功返回0,信号量的值加一。
发布信号量失败返回-1,信号量的值保持不变。

(2)基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性
在这里插入图片描述
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
在这里插入图片描述
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。

RingBuffer.hpp:

#pragma once#include <iostream>
#include <vector>
#include <pthread.h>#include "Sem.hpp"
#include "Mutex.hpp"namespace RingBufferModule
{using namespace SemModule;using namespace LockModule;// RingQueuetemplate <typename T>class RingBuffer{public:RingBuffer(int cap): _ring(cap),_cap(cap),_p_step(0),_c_step(0),_datasem(0),_spacesem(cap){}// 为什么没有判断??信号量,本身就是表示资源数目的,只要成功,就一定有,不需要判断!void Equeue(const T &in){// 生产者_spacesem.P();{LockGuard lockguard(_p_lock);_ring[_p_step] = in; // 生产完毕!_p_step++;_p_step %= _cap; // 维持唤醒特性}_datasem.V();}void Pop(T *out){// 消费者_datasem.P();{LockGuard lockguard(_c_lock);*out = _ring[_c_step];_c_step++;_c_step %= _cap;}_spacesem.V();}~RingBuffer(){}private:std::vector<T> _ring; // 环, 临界资源int _cap;             // 总容量int _p_step;          // 生产者位置int _c_step;          // 消费位置Sem _datasem;  // 数据信号量Sem _spacesem; // 空间信号量Mutex _p_lock;Mutex _c_lock;};
} // namespace RingBufferModule

Sem.hpp:

#pragma once
#include <semaphore.h>namespace SemModule
{int defalutsemval = 1;class Sem{public:Sem(int value = defalutsemval) : _init_value(value){int n = ::sem_init(&_sem, 0, _init_value);(void)n;}void P(){int n = ::sem_wait(&_sem);(void)n;}void V(){int n = ::sem_post(&_sem);(void)n;}~Sem(){int n = ::sem_destroy(&_sem);(void)n;}private:sem_t _sem;int _init_value;};
} // namespace SemModule

Mutex.hpp:

#pragma once
#include <iostream>
#include <pthread.h>namespace LockModule
{class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator = (const Mutex&) = delete;Mutex(){int n = ::pthread_mutex_init(&_lock, nullptr);(void)n;}~Mutex(){int n = ::pthread_mutex_destroy(&_lock);(void)n;}void Lock(){int n = ::pthread_mutex_lock(&_lock);(void)n;}pthread_mutex_t *LockPtr(){return &_lock;}void Unlock(){int n = ::pthread_mutex_unlock(&_lock);(void)n;}private:pthread_mutex_t _lock;};class LockGuard{public:LockGuard(Mutex &mtx):_mtx(mtx){_mtx.Lock();}~LockGuard(){_mtx.Unlock();}private:Mutex &_mtx;};
}

Main.cc:

#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>using namespace RingBufferModule;void *Consumer(void *args)
{RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);while(true){sleep(1);// sleep(1);// 1. 消费数据int data;ring_buffer->Pop(&data);// 2. 处理:花时间std::cout << "消费了一个数据: " << data << std::endl;}
}void *Productor(void *args)
{RingBuffer<int> *ring_buffer = static_cast<RingBuffer<int> *>(args);int data = 0;while (true){// 1. 获取数据:花时间// sleep(1);// 2. 生产数据ring_buffer->Equeue(data);std::cout << "生产了一个数据: " << data << std::endl;data++;}
}int main()
{RingBuffer<int> *ring_buffer = new RingBuffer<int>(5); // 共享资源 -> 临界资源// 单生产,单消费pthread_t c1, p1, c2,c3,p2;pthread_create(&c1, nullptr, Consumer, ring_buffer);pthread_create(&c2, nullptr, Consumer, ring_buffer);pthread_create(&c3, nullptr, Consumer, ring_buffer);pthread_create(&p1, nullptr, Productor, ring_buffer);pthread_create(&p2, nullptr, Productor, ring_buffer);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);delete ring_buffer;return 0;
}

Makefile:

bin=ringbuffer_cp
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词