前言:
本篇博客所涉及到的代码以同步到本人gitee
:进程池· 迟来的grown/linux - 码云 - 开源中国
一、池化技术
在之前的学习中,多多少少都听说过池,例如内存池,线程池等等。
那这些池到底是干什么的呢?池又指的是什么呢?
简单来说:池化技术是一种通过复用预先创建的资源,来提升系统性能和资源利用率的优化方法。
其核心思想:预先创建并管理一组资源示例,提供给多个任务共享;这样避免频繁的创建和销毁资源的开销。
这种思想就好比在vector
动态数组扩容一样,是按照2
或者1.5
倍进行扩容,这样减少开辟空间的开销从而提高效率。
二、进程池原理
我们也了解了进程间通信——管道,可以实现进程之间的通信技术;那现在我们可不可以利用父进程管理所有的子进程,并且让子进程去完成某些任务呢?
当然是可以的,父进程通过匿名管道和子进程进行通信;通过给子进程传输信息来让子进执行某个任务。
这样,通过匿名管道来实现父子进程间的通信;
父进程传输信息给子进程,子进程根据父进程传输的信息来执行不同的任务。
那进程池又是什么东西呢?
池化技术是预先创建资源,通过复用来提升系统性能和资源利用率。
- 这里子进程要执行任务,如果这里父进程要传输信息给子进程时再去创建进程,子进程执行完任务之后就退出;
- 这势必存储非常多的系统调用,而系统调用也是有成本的。
- 那这里我们就可以预先创建多个子进程,让这些子进程等待父进程传输信息;执行完任务后继续等待父进程传输信息。
这样我们预先创建进程,让这些进程执行任务而不是在要执行任务时再创建进程;并且执行完任务的子进程还可以继续完成下一个任务,这样通过复用进程来通过系统性能和进制资源的利用率。
三、进程池实现
了解了进程池原理,现在来看它应该如何去实现:
- 首先,我们要预先创建一个进程池,并把它管理起来。
- 其次,父进程要通过传输信息来控制子进程完成不同的任务;父进程就要发送信息,子进程就要接受信息并执行任务
- 最后,进程池能够被创建出来,当然也要能够被释放(销毁)。
1. 描述进程池
我们要预先创建一个进程池,并且要将它管理起来;那就要像将这个进程池描述出来;如何描述并管理这个进程池呢?
这里我们要实现的本质还是要进行父子进程间通信,要让父进程发信息来控制子进程;
所以这里我们要实现的实质就是将一个一个的管道文件管理起来,如何管理呢?
先描述、再组织
先描述
那站着父进程的角度:
一个文件描述符对应一个子进程(关闭不用的文件描述符之后);
所以要让父进程将这些子进程管理起来,就只需要将管道文件管理起来。
所以就可以设计一个channel
类用来描述一个管道文件;那这个类具有哪些属性呢?
wfd
:父进程中写端的文件描述符pid
:管道文件对应子进程的pid
,后续用来回收子进程
class channel
{
public:channel(int wfd, int pid): _wfd(wfd), _pid(pid){}~channel() {}
private:int _wfd;int _pid;
};
再组织
一个父进程它要创建多个管道文件也就是多个子进程,就要将这些子进程管理起来;
所以,这里可以设计一个channel_manage
类来讲管道文件channel
管理起来。
class channel_manage
{
public:channel_manage(){}~channel_manage(){}
private:std::vector<channel> _channels;
};
描述进程池
在进程池中,一定是存在上面的组织管道文件channel_manage
,因为我们要对其进行管理;
在进程池中还可能存在其他信息:进程负载情况,进程状态等等。
这里就只记录进程的数量。
class channel_pool
{
public:const int NUM = 5; // 进程池中进程数量channel_pool(): _processnum(NUM){}~channel_pool(){}
private:channel_manage _cm;int _processnum;
};
2. 初始化进程池
描述出了进程池channel_pool
,现在我们能够根据这个channel_pool
创建出一个进程池对象,但是创建出来的这个进程池对象里面什么是都没有,一个进程都没有。
所以,我们就要对进程池进行初始化:那如何初始化呢?(这里暂定进程池中进程个数为5个)
初始化,就要创建子进程,并且让父子进程之间创建联系(就是创建管道文件)。
并且我们要将管道文件的
channel
记录在channel_pool
的成员_cm
中。
简单来说初始化进程池时就要将所有的子进程创建出来,那创建完子进程,子进程应该做什么呢,父进程又该做什么呢?
- 这里创建子进程就是为了让子进程完成某个任务,所以创建子进程之后(记得关闭不用的文件描述符),就让子进程等待父进程发送信息;在子进程接受到信息之后再去完成任务。
- 而创建完子进程之后,父进程关闭不用的文件描述符,然后就要在
_cm
中新增一个管道文件channel
对象。
而channel
在channel_manage
中,所以channel_manage
就要通过新增channel
对象的方法。
//这里只显示新增方法和成员变量
class channel_manage
{
public:void _insert(int wfd, int pid){_channels.emplace_back(wfd,pid);}
private:std::vector<channel> _channels;
};
class channel_pool
{
public:void work(int rfd){};//任务方法void _init(){for(int i = 0;i<_processnum;i++){int fd[2];int n = pipe(fd);if(n < 0){std::cerr<<"pipe failed"<<std::endl;exit(1);}//创建子进程int id = fork();if(id < 0){std::cerr<<"fork failed"<<std::endl;exit(1);}else if(id == 0){//childclose(fd[1]);//关闭写端_work(fd[0]);//等到父进程发送信息close(fd[0]);exit(1);}//parentclose(fd[0]);//在_cm中新增channel对象_cm._insert(fd[1],id);}}
private:channel_manage _cm;int _processnum;
};
3. 子进程接受信息
完成了上述操作,现在进程池被创建出来,也被初始化了。
在创建子进程时,子进程被创建出来,然后就等待父进程发送信息然后完成任务,所以说,子进程就要支持接受信息。
这里使用的是匿名管道来完成父子进程间的通信,所以子进程接受信息的实质就是从匿名管道在读取数据。
这里子进程是直到管道文件的文件描述符的就是fd[0]
;所以子进程就要在fd[0]
文件描述符对应的管道文件中读取数据,然后根据读取到的数据来执行不同的任务。
那父子之间发送什么样的信息呢?
这个就有很多了,可以发送一个整数,一个整数对应一个任务。
也可以按照位图来传递参数,一个二进制位对应一个任务。
这里就采用一个整数对应一个任务,让父进程发送一个整数给子进程。
子进程读取数据:
我们直到写入和读取是独立的,父进程按照整数进行写入,但是子进程读取到的不一定是整数啊。
所以
read
函数返回值:
- 读取数据失败就返回
-1
;- 写端退出就返回
0
;- 返回值大于0 :表示读取到的实际字节数。(当读取到的字节数不等于4,就表示不是按照
int
读取的,就指读取到一个位置信息)。
void work(int rfd){while (true){int massage = 0;int n = read(rfd, &massage, sizeof(massage));if (n < 0){std::cerr << "read failed" << std::endl;exit(1);}else if (n == 0){std::cout << "exit, because write exit" << std::endl;break;}else if (n != 4){std::cout << "unkonw massage : " << massage << std::endl;}// 读取成功,执行任务std::cout << "receive massage : " << massage << std::endl;}}
4. 父进程发送信息
实现子进程接受信息,现在来看父进程发送信息;如何发送呢?
这里父进程发送信息无非就以下三个问题:
- 给哪一个进程发送信息?
- 发送什么信息?
- 如何发送信息?
选择一个进程发送信息
对于这个问题,我们可以按照顺序选择一个进程去完成任务、也可以完全随机的选择一个进程去执行、还可以根据每一个进程的负载情况去挑选一个进程完成任务。
这里就按照顺序选择一个进程去执行任务
而我们的描述进程(管道)的
channel
封装在channel_manage
中,所以这个类就要提供一个方法,按照顺序选择一个channel
。
发送什么信息?
这里并不存在什么任务可以去执行的, 当存在任务时就可以根据实际任务来发送任务码给子进程然后让子进程去完成任务。
如何发送信息
父进程如何给子进程发送信息呢?
很简单父进程在对应的管道文件中写入任务码即可。
而对应管道文件的文件描述符封装在
channel
中,所以发送信息这个方法就要由channel
类提供,在调用时只需传递任务码即可。
class channel
{
public:bool _send(int taskcode){int n = write(_wfd, &taskcode, sizeof(taskcode));if (n < 0){ // 写入失败std::cerr << "write failed" << std::endl;return false;}// 写入成功return true;}
private:int _wfd;int _pid;
};
class channel_manage
{
public:channel &select(){auto &ret = _channels[_next];_next++;_next %= _channels.size();return ret;}
private:std::vector<channel> _channels;int _next = 0;
};
class channel_pool
{
public:void send(int taskcode){// 选择一个进程auto &c = _cm._select();// 任务码由上层调用决定// 发送信息c._send(taskcode);}
private:channel_manage _cm;int _processnum;
};
到这里,进程池就可以大致的运行起来了,这里简单测试一下
//test.cc
#include "channelpool.hpp"
int main()
{srand((int)time(nullptr));channel_pool cp;cp._init();cp.Print(); // 输出进程池中的所有进程信息while (1){int taskcode = rand() % 5;cp.send(taskcode);std::cout << std::endl;sleep(1);}return 0;
}
这里进程池也是能够正常运行,子进程也能够接受到父进程发送的信息。
5. 回收进程池资源
做完上述的内容,这里的进程池就大致可以运行起来;
但是现在还缺少一个步骤,那就是回收进程池的资源。
如何回收进程池的资源呢?
- 首先,要关闭父进程中所有的管道文件的文件描述符。
- 其次就是父进程等待子进程退出,回收子进程。
而_wfd
文件描述符、_pid
子进程pid
都封装在channel
中;
如何关闭管道文件,如何等待子进程退出,这都要channel
来提供。
而我们想要通过进程池对象调用回收函数,那channel_manage
也要提供对应的关闭文件和等待子进程退出的函数。
class channel
{
public:void _close(){close(_wfd);}void _wait(){wait(nullptr);}
private:int _wfd;int _pid;
};
class channel_manage
{
public:void _close(){for (auto &channel : _channels){channel._close();std::cout << "关闭管道文件 : " << channel.getname() << std::endl;}}void _wait(){for (auto &channel : _channels){channel._wait();std::cout << "等待子进程退出 : " << channel.getname() << std::endl;}}
private:std::vector<channel> _channels;int _next = 0;
};
class channel_pool
{
public:void _quit(){//关闭所有w端文件_cm._close();//回收子进程_cm._wait();}
private:channel_manage _cm;int _processnum;
};
这里就发送一次信息然后退出,测试一下:
//test.cc
#include "channelpool.hpp"
int main()
{srand((int)time(nullptr));channel_pool cp;cp._init();cp.Print(); // 输出进程池中的所有进程信息int cnt = 1;while (cnt--){int taskcode = rand() % 5;cp.send(taskcode);std::cout << std::endl;}cp._quit();sleep(10);return 0;
}
可以看到,进程池成功创建了
5
个进程,并且父进程发送了一条信息给子进程;然后成功关闭了所有的管道文件,关闭的同时子进程发现写端退出,子进程节关闭读端然后退出了。
最后也成功回收了所有的子进程。
四、隐藏的问题
在上述的代码中存在一个隐藏的问题:
上面代码在进程池退出回收资源时,是先关闭了所有的写端文件,再等待子进程退出。
这样进程池是可以运行的,但是会忽略一个问题。
现在来看如果这样回收进程池资源:
class channel
{
public:void _close(){close(_wfd);}void _wait(){wait(nullptr);}
private:int _wfd;int _pid;
};
class channel_manage
{
public:void _quit(){for (auto &channel : _channels){channel._close();std::cout << "关闭管道文件 : " << channel.getname() << std::endl;channel._wait();std::cout << "等待子进程退出 : " << channel.getname() << std::endl;}}
private:std::vector<channel> _channels;int _next = 0;
};
class channel_pool
{
public:void _quit(){//关闭所有w端文件_cm._close();//回收子进程_cm._wait();}
private:channel_manage _cm;int _processnum;
};
这样关闭一个写端,等待一个子进程退出。
我们会发现,程序卡到了这里,这是为什么呢?
这是因为,在我们创建子进程时,子进程的文件描述符表来源于父进程;
- 这样父进程在创建子进程时,这个子进程的文件描述符表中是存储前面创建的管道文件的
w
端;这样我们调用channel
类的_close
就只关闭了父进程的w
端,在其他进程中还存在管道文件的w
端。- 这样子进程就会阻塞到
read
出,等待w
端关闭,而父进程就等待子进程退出,这样程序就卡在了这样。
通过上图我们可以发现,子进程的文件描述符
3
始终指向自己的读端管道文件。
- 而第一个创建的子进程,文件描述符
4
指向自己的w
端,然后被关闭了。- 第二个创建的子进程,
4
指向第一个子进程对应管道文件的w
端。- 第三个创建的子进程,
4
指向第一个子进程管道文件的w
端,5
指向第二个子进程对应管道文件的w
端。- …
这样,在回收进程池的资源时,关闭父进程第一个子进程的w
端,此时第二、第三…个子进程的文件描述符中都存在第一个子进程对应管道文件的w
端。
解决方法:
在回收进程池资源时,从最后一个被创建的子进程开始回收。
因为一个子进程中只存在比自己创建的早的子进程的
w
端;所以最后一个被创创建的子进程的w
端只有父进程存在。在创建子进程时,关闭比该子进程创建早的子进程的
w
端。
对于第一种方法这里就不演示了,来看第二种方法
对于第二种方法可能存在疑问:子进程和父进程是同时执行的,那子进程不会把自己的w
段关闭吗?那不就同一个文件描述符关闭2次了吗?
当然是不会的,父进程在为新的子进程创建
channel
对象并插入到channel_masage
中就是对数据的修改,会发生写时拷贝。所以子进程只会拿到比自己创建早的子进程的
w
端
实现第二种方法也很简单,只需在创建子进程之后,让子进程调用一次channel_massage
的_close
函数即可。
这里就不演示了。
到这里,本篇文章内容就结束了
简单总结:
本篇文章主要基于匿名管道实现了父子进程的通信:父进程通过传输信息来控制子进程执行某些任务。