进程池和线程池 - 半同步/半异步进程池实现
liebian365 2024-10-27 13:13 24 浏览 0 评论
本文我们基于下图所示的半同步/半异步并发模式来实现进程池:
为了避免在父、子进程之间传递文件描述符,我们将接受新链接的操作放到子进程中。对于这种模式而言,一个客户连接上的所有任务始终是由一个子进程来处理的。
代码如下:
#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <wait.h>
#include <sys/stat.h>
/*描述一个子进程的类,m_pid是目标子进程的PID,m_pipefd是父进程和子进程通信用的管道*/
class process
{
public:
process():m_pid(-1){}
public:
pid_t m_pid;
int m_pipefd[2];
};
/*进程池类,将它定义为模板类是为了代码复用。其模板参数是处理逻辑任务的类*/
template <typename T>
class processpool
{
private:
/*将构造函数定义为私有的,因此我们只能呢个通过后面的create静态函数来创建processpoll实例*/
processpool(int listenfd, int process_number = 8);
public:
/*单例模式,以保证程序最多创建一个processpool实例,这是程序正确处理信号的必要条件*/
static processpool<T>* create(int listenfd, int process_number = 8)
{
if(!m_instance){
m_instance = new processpool<T>(listenfd, process_number);
}
return m_instance;
}
~processpool()
{
delete[] m_sub_process;
}
/*启动进程池*/
void run();
private:
void setup_sig_pipe();
void run_parent();
void run_child();
private:
/*进程池允许的最大子进程数量*/
static const int MAX_PROCESS_NUMBER = 16;
/*每个子进程最多能处理的客户数量*/
static const int USER_PER__PROCESS = 65535;
/*epoll 最多能处理的事件数*/
static const int MAX_EVENT_NUMBER = 10000;
/*进程池中的进程总数*/
int m_process_number;
/*子进程在池中的序号,从0开始*/
int m_idx;
/*每个进程都有一个epoll内核时间表,用m_epollfd标识*/
int m_epollfd;
/*监听socket*/
int m_listenfd;
/*子进程通过m_stop来决定是否停止运行*/
int m_stop;
/*保存所有子进程的描述信息*/
process* m_sub_process;
/*进程池静态实例*/
static processpool<T>* m_instance;
};
template<typename T>
processpool<T>* processpool<T>::m_instance = NULL;
/*用于处理信号的管道,以实现统一事件源,后面称之为信号管道*/
static int sig_pipefd[2];
static int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
static void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
/*从epollfd标识的epoll内核事件表中删除fd上的所有注册事件*/
static void removefd(int epollfd, int fd)
{
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}
static void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}
static void addsig(int sig, void (handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if(restart){
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
/*进程池构造函数,参数listenfd是监听socket,
它必须在创建进程池之前被创建,否则子进程
无法直接引用她。
参数process_number指定进程中子进程的数量*/
template<typename T>
processpool<T>::processpool(int listenfd, int process_number)
: m_listenfd(listenfd), m_process_number(process_number),
m_idx(-1), m_stop(false)
{
assert(process_number >0 && process_number <= MAX_PROCESS_NUMBER);
m_sub_process = new process[process_number];
assert(m_sub_process);
/*创建process_number个子进程,并创建它们和父进程之间的管道*/
for(int i = 0; i < process_number; ++i){
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd);
assert(ret == 0);
m_sub_process[i].m_pid = fork();
assert(m_sub_process[i].m_pid >= 0);
if(m_sub_process[i].m_pid > 0){
close(m_sub_process[i].m_pipefd[1]);
continue;
}
else{
close(m_sub_process[i].m_pipefd[0]);
m_idx = i;
break;
}
}
}
/*统一事件源*/
template<typename T>
void processpool<T>::setup_sig_pipe()
{
/*创建epoll事件监听表和信号管道*/
m_epollfd = epoll_create(5);
assert(m_epollfd != -1);
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);
setnonblocking(sig_pipefd[1]);
addfd(m_epollfd, sig_pipefd[0]);
/*设置信号处理函数*/
addsig(SIGCHLD, sig_handler);
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, sig_handler);
}
/*父进程中没m_idx值为-1,子进程中m_idx值大于等于0,
我们据此判断接下来要运行的是父进程代码还是子进程代码*/
template<typename T>
void processpool<T>::run()
{
if(m_idx != -1){
run_child();
return;
}
run_parent();
}
template<typename T>
void processpool<T>::run_child()
{
setup_sig_pipe();
/*每个子进程都通过其在进程池中的序号m_idx找到与父进程通信的管道*/
int pipefd = m_sub_process[m_idx].m_pipefd[1];
/*子进程需要监听管道文件描述符pipefd,因为父进程将通过它来通知子进程accept新连接*/
addfd(m_epollfd, pipefd);
epoll_event events[MAX_EVENT_NUMBER];
T* users = new T[USER_PER__PROCESS];
assert(users);
int number = 0;
int ret = -1;
while(!m_stop){
number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR){
printf("epoll failure");
break;
}
for(int i = 0; i < number; ++i){
int socket = events[i].data.fd;
if(socket == pipefd && events[i].events&EPOLLIN){
int client = 0;
/*从父、子进程之间的管道读取数据,并将结果保存在变量client中
如果读取成功,则表示有新客户连接到来*/
ret = recv(socket, (char*)&client, sizeof(client), 0);
if((ret < 0 && errno != EAGAIN) || ret == 0){
continue;
}
else{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(m_listenfd, (struct sockaddr*)&client_address,
&client_addrlength);
if(connfd<0){
printf("errno is %d - %s\n", errno, strerror(errno));
continue;
}
addfd(m_epollfd, connfd);
/*模板类T必须实现init方法,以初始化一个客户连接。
我们直接使用connfd来索引逻辑处理对象(T类型对象),
以提高程序效率*/
users[connfd].init(m_epollfd, connfd, client_address);
}
}
/*下面处理子进程收到的信号*/
else if(socket == sig_pipefd[0] && events[i].events &EPOLLIN){
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if(ret < 0){
continue;
}
else{
for(int i = 0; i < ret; ++i){
switch (signals[i]){
case SIGCHLD:{
pid_t pid;
int stat;
while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
continue;
}
break;
}
case SIGTERM:
case SIGINT:{
m_stop = true;
break;
}
default:
break;
}
}
}
}
/*如果是其他可读数据,那么必然是客户请求到来。
调用逻辑处理对象的process方法处理之*/
else if(events[i].events & EPOLLIN){
users[socket].process();
}
else{
continue;
}
}
}
delete [] users;
users = NULL;
close(pipefd);
/*提醒:应该由m_listenfd的创建者来关闭这个文件描述符
即“对象由哪个函数创建,就应该由哪个函数销毁”*/
//close(m_listenfd);
close(m_epollfd);
}
template<typename T>
void processpool<T>::run_parent()
{
setup_sig_pipe();
/*父进程监听m_listenfd*/
addfd(m_epollfd, m_listenfd);
epoll_event events[MAX_EVENT_NUMBER];
int sub_process_counter = 0;
int new_conn = 1;
int number = 0;
int ret = -1;
while(!m_stop){
number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR){
printf("epoll failure\n");
break;
}
for(int i = 0; i < number; ++i){
int sockfd = events[i].data.fd;
if(sockfd == m_listenfd){
/*如果有新的连接到来,就采用Round Robin方式
将其分配给一个子进程处理*/
int i = sub_process_counter;
do{
if(m_sub_process[i].m_pid != -1){
break;
}
i = (i+1)%m_process_number;
}
while (i != sub_process_counter);
if(m_sub_process[i].m_pid == -1){
m_stop = true;
break;
}
sub_process_counter = (i+1)%m_process_number;
send(m_sub_process[i].m_pipefd[0], (char*)&new_conn,
sizeof(new_conn), 0);
printf("send request to child %d\n", i);
}
/*下面处理父进程接收到的信号*/
else if(sockfd == sig_pipefd[0] && events[i].events & EPOLLIN){
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if(ret < 0){
continue;
}
else{
for(int i = 0; i < ret; ++i){
switch (signals[i]){
case SIGCHLD:{
pid_t pid;
int stat;
while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
for(int i = 0; i < m_process_number; ++i){
/*
如果进程池中第i个子进程退出了,则主进程关闭相应
的通信管道,并设置相应的m_pid为-1,
以标记该子进程已经退出
*/
if(m_sub_process[i].m_pid == pid){
printf("child %d join\n", i);
close(m_sub_process[i].m_pipefd[0]);
m_sub_process[i].m_pid = -1;
}
}
}
/*如果子进程都已经退出了, 则父进程也退出*/
m_stop = true;
for(int i = 0; i< m_process_number; ++i){
if(m_sub_process[i].m_pid != -1){
m_stop = false;
}
}
break;
}
case SIGTERM:
case SIGINT:{
/*如果父进程接收到终止信号,那么就杀死所有子进程,
并等待它们全部结束。当然,通知子进程结束更好的办法,
是向父、子进程间的管道发送特殊数据*/
printf("kill all the child now\n");
for(int i =0; i < m_process_number; ++i){
int pid = m_sub_process[i].m_pid;
if(pid != -1){
kill(pid, SIGTERM);
}
}
break;
}
default:
break;
}
}
}
}
else{
continue;
}
}
}
//close(m_listenfd); /*由创建者关闭这个文件描述符*/
close(m_epollfd);
}
#endif
相关推荐
- go语言也可以做gui,go-fltk让你做出c++级别的桌面应用
-
大家都知道go语言生态并没有什么好的gui开发框架,“能用”的一个手就能数的清,好用的就更是少之又少。今天为大家推荐一个go的gui库go-fltk。它是通过cgo调用了c++的fltk库,性能非常高...
- 旧电脑的首选系统:TinyCore!体积小+精简+速度极快,你敢安装吗
-
这几天老毛桃整理了几个微型Linux发行版,准备分享给大家。要知道可供我们日常使用的Linux发行版有很多,但其中的一些发行版经常会被大家忽视。其实这些微型Linux发行版是一种非常强大的创新:在一台...
- codeblocks和VS2019下的fltk使用中文
-
在fltk中用中文有点问题。英文是这样。中文就成这个样子了。我查了查资料,说用UTF-8编码就行了。edit->Fileencoding->UTF-8然后保存文件。看下下边的编码指示确...
- FLTK(Fast Light Toolkit)一个轻量级的跨平台Python GUI库
-
FLTK(FastLightToolkit)是一个轻量级的跨平台GUI库,特别适用于开发需要快速、高效且简单界面的应用程序。本文将介绍Python中的FLTK库,包括其特性、应用场景以及如何通过代...
- 中科院开源 RISC-V 处理器“香山”流片,已成功运行 Linux
-
IT之家1月29日消息,去年6月份,中科院大学教授、中科院计算所研究员包云岗,发布了开源高性能RISC-V处理器核心——香山。近日,包云岗在社交平台晒出图片,香山芯片已流片,回片后...
- Linux 5.13内核有望合并对苹果M1处理器支持的初步代码
-
预计Linux5.13将初步支持苹果SiliconM1处理器,不过完整的支持工作可能还需要几年时间才能完全完成。虽然Linux已经可以在苹果SiliconM1上运行,但这需要通过一系列的补丁才能...
- Ubuntu系统下COM口测试教程(ubuntu port)
-
1、在待测试的板上下载minicom,下载minicom有两种方法:方法一:在Ubuntu软件中心里面搜索下载方法二:按“Ctrl+Alt+T”打开终端,打开终端后输入“sudosu”回车;在下...
- 湖北嵌入式软件工程师培训怎么选,让自己脱颖而出
-
很多年轻人毕业即失业、面试总是不如意、薪酬不满意、在家躺平。“就业难”该如何应对,参加培训是否能改变自己的职业走向,在湖北,有哪些嵌入式软件工程师培训怎么选值得推荐?粤嵌科技在嵌入式培训领域有十几年经...
- 新阁上位机开发---10年工程师的Modbus总结
-
前言我算了一下,今年是我跟Modbus相识的第10年,从最开始的简单应用到协议了解,从协议开发到协议讲解,这个陪伴了10年的协议,它一直没变,变的只是我对它的理解和认识。我一直认为Modbus协议的存...
- 创建你的第一个可运行的嵌入式Linux系统-5
-
@ZHangZMo在MicrochipBuildroot中配置QT5选择Graphic配置文件增加QT5的配置修改根文件系统支持QT5修改output/target/etc/profile配置文件...
- 如何在Linux下给zigbee CC2530实现上位机
-
0、前言网友提问如下:粉丝提问项目框架汇总下这个网友的问题,其实就是实现一个网关程序,内容分为几块:下位机,通过串口与上位机相连;下位机要能够接收上位机下发的命令,并解析这些命令;下位机能够根据这些命...
- Python实现串口助手 - 03串口功能实现
-
串口调试助手是最核心的当然是串口数据收发与显示的功能,pzh-py-com借助的是pySerial库实现串口收发功能,今天痞子衡为大家介绍pySerial是如何在pzh-py-com发挥功能的。一、...
- 为什么选择UART(串口)作为调试接口,而不是I2C、SPI等其他接口
-
UART(通用异步收发传输器)通常被选作调试接口有以下几个原因:简单性:协议简单:UART的协议非常简单,只需设置波特率、数据位、停止位和校验位就可以进行通信。相比之下,I2C和SPI需要处理更多的通...
- 同一个类,不同代码,Qt 串口类QSerialPort 与各种外设通讯处理
-
串口通讯在各种外设通讯中是常见接口,因为各种嵌入式CPU中串口标配,工业控制中如果不够还通过各种串口芯片进行扩展。比如spi接口的W25Q128FV.对于软件而言,因为驱动接口固定,软件也相对好写,因...
- 嵌入式linux为什么可以通过PC上的串口去执行命令?
-
1、uboot(负责初始化基本硬bai件,如串口,网卡,usb口等,然du后引导系统zhi运行)2、linux系统(真正的操作系统)3、你的应用程序(基于操作系统的软件应用)当你开发板上电时,u...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- go语言也可以做gui,go-fltk让你做出c++级别的桌面应用
- 旧电脑的首选系统:TinyCore!体积小+精简+速度极快,你敢安装吗
- codeblocks和VS2019下的fltk使用中文
- FLTK(Fast Light Toolkit)一个轻量级的跨平台Python GUI库
- 中科院开源 RISC-V 处理器“香山”流片,已成功运行 Linux
- Linux 5.13内核有望合并对苹果M1处理器支持的初步代码
- Ubuntu系统下COM口测试教程(ubuntu port)
- 湖北嵌入式软件工程师培训怎么选,让自己脱颖而出
- 新阁上位机开发---10年工程师的Modbus总结
- 创建你的第一个可运行的嵌入式Linux系统-5
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)