百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

进程池和线程池 - 半同步/半异步进程池实现

liebian365 2024-10-27 13:13 19 浏览 0 评论

本文我们基于下图所示的半同步/半异步并发模式来实现进程池:

为了避免在父、子进程之间传递文件描述符,我们将接受新链接的操作放到子进程中。对于这种模式而言,一个客户连接上的所有任务始终是由一个子进程来处理的。

代码如下:

#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <wait.h>
#include <sys/stat.h>

/*描述一个子进程的类,m_pid是目标子进程的PID,m_pipefd是父进程和子进程通信用的管道*/
class  process
{
public:
    process():m_pid(-1){}
public:
    pid_t m_pid;
    int m_pipefd[2];
};

/*进程池类,将它定义为模板类是为了代码复用。其模板参数是处理逻辑任务的类*/
template <typename T>
class processpool
{
private:
    /*将构造函数定义为私有的,因此我们只能呢个通过后面的create静态函数来创建processpoll实例*/
    processpool(int listenfd, int process_number = 8);
public:
    /*单例模式,以保证程序最多创建一个processpool实例,这是程序正确处理信号的必要条件*/
    static processpool<T>* create(int listenfd, int process_number = 8)
    {
        if(!m_instance){
            m_instance = new processpool<T>(listenfd, process_number);
        }
        return m_instance;
    }
    ~processpool()
    {
        delete[] m_sub_process;
    }
    /*启动进程池*/
    void run();
private:
    void setup_sig_pipe();
    void run_parent();
    void run_child();

private:
    /*进程池允许的最大子进程数量*/
    static const int MAX_PROCESS_NUMBER = 16;
    /*每个子进程最多能处理的客户数量*/
    static const int USER_PER__PROCESS = 65535;
    /*epoll 最多能处理的事件数*/
    static const int MAX_EVENT_NUMBER = 10000;
    /*进程池中的进程总数*/
    int m_process_number;
    /*子进程在池中的序号,从0开始*/
    int m_idx;
    /*每个进程都有一个epoll内核时间表,用m_epollfd标识*/
    int m_epollfd;
    /*监听socket*/
    int m_listenfd;
    /*子进程通过m_stop来决定是否停止运行*/
    int m_stop;
    /*保存所有子进程的描述信息*/
    process* m_sub_process;
    /*进程池静态实例*/
    static processpool<T>* m_instance;
};
template<typename T>
processpool<T>* processpool<T>::m_instance = NULL;

/*用于处理信号的管道,以实现统一事件源,后面称之为信号管道*/
static int sig_pipefd[2];

static int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

static void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

/*从epollfd标识的epoll内核事件表中删除fd上的所有注册事件*/
static void removefd(int epollfd, int fd)
{
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
    close(fd);
}

static void sig_handler(int sig)
{
    int save_errno = errno;
    int msg = sig;
    send(sig_pipefd[1], (char*)&msg, 1, 0);
    errno = save_errno;
}

static void addsig(int sig, void (handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if(restart){
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig, &sa, NULL) != -1);
}

/*进程池构造函数,参数listenfd是监听socket, 
它必须在创建进程池之前被创建,否则子进程
无法直接引用她。
参数process_number指定进程中子进程的数量*/
template<typename T>
processpool<T>::processpool(int listenfd, int process_number)
: m_listenfd(listenfd), m_process_number(process_number), 
m_idx(-1), m_stop(false)
{
    assert(process_number >0 && process_number <= MAX_PROCESS_NUMBER);
    m_sub_process = new process[process_number];
    assert(m_sub_process);

    /*创建process_number个子进程,并创建它们和父进程之间的管道*/
    for(int i = 0; i < process_number; ++i){
        int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd);
        assert(ret == 0);

        m_sub_process[i].m_pid = fork();
        assert(m_sub_process[i].m_pid >= 0);
        if(m_sub_process[i].m_pid > 0){
            close(m_sub_process[i].m_pipefd[1]);
            continue;
        }
        else{
            close(m_sub_process[i].m_pipefd[0]);
            m_idx = i;
            break;
        }
    } 
}

/*统一事件源*/
template<typename T>
void processpool<T>::setup_sig_pipe()
{
    /*创建epoll事件监听表和信号管道*/
    m_epollfd = epoll_create(5);
    assert(m_epollfd != -1);

    int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(ret != -1);

    setnonblocking(sig_pipefd[1]);
    addfd(m_epollfd, sig_pipefd[0]);

    /*设置信号处理函数*/
    addsig(SIGCHLD, sig_handler);
    addsig(SIGTERM, sig_handler);
    addsig(SIGINT, sig_handler);
    addsig(SIGPIPE, sig_handler);
}

/*父进程中没m_idx值为-1,子进程中m_idx值大于等于0, 
我们据此判断接下来要运行的是父进程代码还是子进程代码*/
template<typename T>
void processpool<T>::run()
{
    if(m_idx != -1){
        run_child();
        return;
    }
    run_parent();
}


template<typename T>
void processpool<T>::run_child()
{
    setup_sig_pipe();

    /*每个子进程都通过其在进程池中的序号m_idx找到与父进程通信的管道*/
    int pipefd = m_sub_process[m_idx].m_pipefd[1];
    /*子进程需要监听管道文件描述符pipefd,因为父进程将通过它来通知子进程accept新连接*/
    addfd(m_epollfd, pipefd);

    epoll_event events[MAX_EVENT_NUMBER];
    T* users = new T[USER_PER__PROCESS];
    assert(users);
    int number = 0;
    int ret = -1;

    while(!m_stop){
        number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
        if (number < 0 && errno != EINTR){
            printf("epoll failure");
            break;
        }

        for(int i = 0; i < number; ++i){
            int socket = events[i].data.fd;
            
            if(socket == pipefd && events[i].events&EPOLLIN){
                int client = 0;
                /*从父、子进程之间的管道读取数据,并将结果保存在变量client中
                如果读取成功,则表示有新客户连接到来*/
                ret = recv(socket, (char*)&client, sizeof(client), 0);
                if((ret < 0 && errno != EAGAIN) || ret == 0){
                    continue;
                }
                else{
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof(client_address);
                    int connfd = accept(m_listenfd, (struct sockaddr*)&client_address,
                            &client_addrlength);
                    if(connfd<0){
                        printf("errno is %d - %s\n", errno, strerror(errno));
                        continue;
                    }
                    addfd(m_epollfd, connfd);
                    /*模板类T必须实现init方法,以初始化一个客户连接。
                    我们直接使用connfd来索引逻辑处理对象(T类型对象),
                    以提高程序效率*/
                    users[connfd].init(m_epollfd, connfd, client_address);
                }
            }
            /*下面处理子进程收到的信号*/
            else if(socket == sig_pipefd[0] && events[i].events &EPOLLIN){
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if(ret < 0){
                    continue;
                }
                else{
                    for(int i = 0; i < ret; ++i){
                        switch (signals[i]){
                            case SIGCHLD:{
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
                                    continue;
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:{
                                m_stop = true;
                                break;
                            }
                            default:
                                break;
                        }
                    }
                }
            }
            /*如果是其他可读数据,那么必然是客户请求到来。
            调用逻辑处理对象的process方法处理之*/
            else if(events[i].events & EPOLLIN){
                users[socket].process();
            }
            else{
                continue;
            }
        }
    }    

    delete [] users;
    users = NULL;
    close(pipefd);
    /*提醒:应该由m_listenfd的创建者来关闭这个文件描述符
    即“对象由哪个函数创建,就应该由哪个函数销毁”*/
    //close(m_listenfd);
    close(m_epollfd);
}


template<typename T>
void processpool<T>::run_parent()
{
    setup_sig_pipe();
    /*父进程监听m_listenfd*/
    addfd(m_epollfd, m_listenfd);

    epoll_event events[MAX_EVENT_NUMBER];
    int sub_process_counter = 0;
    int new_conn = 1;
    int number = 0;
    int ret = -1;

    while(!m_stop){
        number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
        if (number < 0 && errno != EINTR){
            printf("epoll failure\n");
            break;
        }

        for(int i = 0; i < number; ++i){
            int sockfd = events[i].data.fd;
            if(sockfd == m_listenfd){
                /*如果有新的连接到来,就采用Round Robin方式
                将其分配给一个子进程处理*/
                int i = sub_process_counter;
                do{
                    if(m_sub_process[i].m_pid != -1){
                        break;
                    }
                    i = (i+1)%m_process_number;
                }
                while (i != sub_process_counter);

                if(m_sub_process[i].m_pid == -1){
                    m_stop = true;
                    break;
                }

                sub_process_counter = (i+1)%m_process_number;
                send(m_sub_process[i].m_pipefd[0], (char*)&new_conn,
                    sizeof(new_conn), 0);
                printf("send request to child %d\n", i);
            }
            /*下面处理父进程接收到的信号*/
            else if(sockfd == sig_pipefd[0] && events[i].events & EPOLLIN){
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if(ret < 0){
                    continue;
                }
                else{
                    for(int i = 0; i < ret; ++i){
                        switch (signals[i]){
                            case SIGCHLD:{
                                pid_t pid;
                                int stat;
                                while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
                                    for(int i = 0; i < m_process_number; ++i){
                                        /*
                                        如果进程池中第i个子进程退出了,则主进程关闭相应
                                        的通信管道,并设置相应的m_pid为-1, 
                                        以标记该子进程已经退出
                                        */
                                       if(m_sub_process[i].m_pid == pid){
                                           printf("child %d join\n", i);
                                           close(m_sub_process[i].m_pipefd[0]);
                                           m_sub_process[i].m_pid = -1;
                                       }
                                    }
                                }
                                /*如果子进程都已经退出了, 则父进程也退出*/
                                m_stop = true;
                                for(int i = 0; i< m_process_number; ++i){
                                    if(m_sub_process[i].m_pid != -1){
                                        m_stop = false;
                                    }
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:{
                                /*如果父进程接收到终止信号,那么就杀死所有子进程,
                                并等待它们全部结束。当然,通知子进程结束更好的办法,
                                是向父、子进程间的管道发送特殊数据*/
                                printf("kill all the child now\n");
                                for(int i =0; i < m_process_number; ++i){
                                    int pid = m_sub_process[i].m_pid;
                                    if(pid != -1){
                                        kill(pid, SIGTERM);
                                    }
                                }
                                break;
                            }
                            default:
                                break;
                        }
                    }
                }
            }
            else{
                continue;
            }
        }
    }
    //close(m_listenfd); /*由创建者关闭这个文件描述符*/
    close(m_epollfd);
}



#endif

相关推荐

4万多吨豪华游轮遇险 竟是因为这个原因……

(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...

“菜鸟黑客”必用兵器之“渗透测试篇二”

"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...

科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白

作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...

麦子陪你做作业(二):KEGG通路数据库的正确打开姿势

作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...

知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势

智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...

每日新闻播报(September 14)_每日新闻播报英文

AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...

香港新巴城巴开放实时到站数据 供科技界研发使用

中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...

5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper

本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...

Qt动画效果展示_qt显示图片

今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...

如何从0到1设计实现一门自己的脚本语言

作者:dong...

三年级语文上册 仿写句子 需要的直接下载打印吧

描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...

C++|那些一看就很简洁、优雅、经典的小代码段

目录0等概率随机洗牌:1大小写转换2字符串复制...

二年级上册语文必考句子仿写,家长打印,孩子照着练

二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...

一年级语文上 句子专项练习(可打印)

...

亲自上阵!C++ 大佬深度“剧透”:C++26 将如何在代码生成上对抗 Rust?

...

取消回复欢迎 发表评论: