百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

多进程编程 - 共享内存 多线程共享内存变量

liebian365 2024-10-27 13:13 26 浏览 0 评论

共享内存是最高效的IPC机制,因为它不涉及进程之间的任何数据传输。这种高效率带来的问题是,我们必须用i其他辅助手段来同步进程对共享内存的访问,否则会产生竞态条件。因此,共享内存通常和其他进程间通信方式一起使用。

Linux共享内存的API都定义在sys/shm.h头文件中,包括4个系统调用:shmget、shmat、shmdt和shmctl。

shmget系统调用

shmget系统调用创建一段新的共享内存,或者获取一段已经存在的共享内存。定义如下:

#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);

和semget系统调用一样,key参数是一个键值,用来标识一段全局唯一的共享内存。size参数指定共享内存的大小,单位是字节。如果是创建新的共享内存,则size值必须指定。如果是获取已经存在的共享内存,则可以把size设置为0。

shmflg参数的使用和含义与semget系统调用的sem_flags参数相同。不过shmget支持两个额外的标志:

  • SHM_HUGETLB:类似于mmap的MAP_HUGETLB标志,系统将使用“大页面”来为共享内存分配空间
  • SHM_NORESERVE:类似于mmap的MAP_NORESERVE,不为共享内存保留交换分区(swap空间)。这样,当物理内存不足的时候,对该共享内存执行写操作将触发SIGSEGV信号。

shmget成功时返回一个正整数值,它是共享内存的标识符。shmget失败时返回-1,并设置errno。

如果shmget用于创建共享内存,则这段共享内存的所有自己都被初始化为0,与之关联的内核数据结构shmid_ds将被创建并初始化。shmid_ds结构体的定义如下:

struct shmid_ds
{
    struct ipc_prem shm_prem;   /*共享内存的操作权限*/
    size_t shm_segsz;           /*共享内存大小,单位是字节*/
    __time_t shm_atime;         /*对这段内存最后一次调用shmat的时间*/
    __time_t shm_dtime;         /*对这段内存最后一次调用shmdt的时间*/
    __time_t shm_ctime;         /*对这段内存最后一次调用shmctl的时间*/
    __pid_t shm_cpid;           /*创建者PID*/
    __pid_t shm_lpid;           /*最后一次执行shmat或shmdt操作的进程PID*/
    shmatt_t shm_nattach;       /*目前关联到此共享内存的进程数量*/
    /*省略一下填充字段*/
}

shmget对shmid_ds结构体的初始化包括:

  • 将shm_perm.cuid和shm_perm.uid设置为调用进程的有效用户ID
  • 将shm_perm.cgid和shm_perm.gid设置为调用进程的有效组ID
  • 将shm_perm.mode的最低9位设置为shmflg参数的最低9位
  • 将shm_segsz设置为size
  • 将shm_lpid、shm_nattach、shm_atime、shm_dtime设置为0
  • 将shm_ctime设置为当前的时间

shmat和shmdt调用

共享内存被创建/获取之后,我们不能立即访问它,而是需要先将它关联到进程的地址空间中。使用完共享内存之后,我们也需要将它从进程地址空间中分离。这两项任务分别由两个系统调用实现:

#include <sys/shm.h>

void* shmat(int shm_id, const void *shm_addr, int shmflg);

int shmdt(const void* shm_addr);

其中,shm_id参数是由shmget调用返回的共享内存标识符。

shm_addr参数指定将共享内存关联到进程的哪块地址空间,最终的效果还受到shmflg参数的可选标志SHM_RND影响:

  • 如果shm_addr为NULL,则被关联的地址由操作系统选择。这是推荐的做法,以确保代码的可移植性。
  • 如果shm_addr非空,并且SHM_RND标志未被设置,则共享内存被关联到addr指定的地址处。
  • 如果shm_addr非空,并且设置了SHM_RND标志,则被关联的地址是[shm_addr - (shm_addr % SHMLBA)]。SHMLBA的含义是“段低端边界地址倍数”(Segment Low Boundary Address Multiple),它必须是内存页面大小PAGE_SIZE的整数倍。现在的Linux内核中,它等于一个内存页大小。SHM_RND的含义是圆整(round),即将共享内存被关联的地址向下圆整到离shm_addr最近的SHMLBA的整数被地址处。

除了SHM_RND标志外,shmflg参数还支持如下标志:

  • SHM_RDONLY:进程仅能读取共享内存中的内容。若没有指定该标志,则进程可同时对共享内存进行读写操作(当然,这需要在创建共享内存的时候指定其读写权限)
  • SHM_REMAP:如果地址shmaddr已经被关联到一段共享内存上,则重新关联
  • SHM_EXEC:它指定对共享内存段的执行权限。对共享内存而言,执行权限实际上和读权限是一样的。

shmat成功时返回共享内存被关联到的地址,失败则返回(void*)-1并设置errno。shmat成功时,将修改内核数据结构shmid_ds的部分字段:

  • 将shm_nattach加1
  • 将shm_lpid设置为调用进程PID
  • 将shm_atime设置为当前的时间

shmdt函数将关联到的shm_addr处的共享内存从进程中分离。它成功时返回0,失败则返回-1并设置errno。shmdt在成功调用时将修改内核数据结构shmid_ds的部分字段:

  • 将shm_nattach减1
  • 将shm_lpid设置为调用进程的PID
  • 将shm_dtime设置为当前的时间

shmctl系统调用

shmctl系统调用控制共享内存的某些属性。定义如下:

#include <sys/shm.h>
int shmctl(int shm_id, int command, struct shmid_ds* buf);

其中,shm_id参数是由shmget调用返回的共享内存标识符。command参数指定要执行的命令。shm_ctl支持的所有命令如下表:

shmctl成功时的返回值取决于command参数,如上表,失败是返回-1,并设置errno。

共享内存的POSIX方法

mmap函数利用它的MAP_ANONYMOUS标志可以实现父、子进程之间的匿名内存共享。通过打开同一个文件按,mmap可以实现无关进程之间的内存共享。Linux提供了另外一种利用mmap在无关进程之间共享内存的方式。这种方式无需任何文件的支持,但它需要先使用如下函数创建或打开一个POSIX可移植操作系统接口(Portable Operating System Interface of UNIX,缩写为POSIX)共享内存对象:

#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_open(const char* name, int oflag, mode_t mode);

shm_open的使用方法与open系统调用完全相同。

name参数指定要创建/打开的内存。从可移植性的角度考虑,该参数应该使用“/somename”的格式:

以“/”开始,后接多个字符,且这些字符都不是“/”;

以“\0"结尾,长度不超过NAME_MAX(通常是255)

oflag参数指定创建方式。它可以是下列标志中的一个或者多个按位或:

  • O_RDONLY:以只读方式打开共享内存对象
  • O_RDWR:以可读、可写方式打开内存共享对象
  • O_CREAT:如果共享内存对象不存在,则创建之。此时mode参数的最低9位将指定该共享内存对象的访问权限。共享内存对象被创建的时候,其初始长度为0。
  • O_EXCL:和O_CREAT一起使用,如果由name指定的共享内存已经存在,则shm_open调用返回错误,否则就创建一个新的共享内存对象
  • O_TRUNC:如果共享内存已经存在,则把它截断,使其长度为0

shm_open调用成功时返回一个文件描述符。该文件描述符可用于后续的mmap调用,从而将共享内存关联到调用进程。shm_open失败时返回-1,并设置errno。

和打开的文件最后需要关闭一样,由shm_open创建的共享内存对象使用完之后也需要被删除。这个通过如下函数实现:

#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_unlink(const char* name);

该函数将name参数指定的共享内存对象标记为等待删除。当所有使用该共享内存对象的进程都使用ummap将它从进程中分离之后,系统将销毁这个共享内存对象所占据的资源。

如果代码中使用了上述POSIX共享内存函数,则编译的时候需要指定链接选项-ltr。

共享内存实例

I/O复用高级应用中我们介绍过过一个聊天室程序。下面我们将它修改为一个多进程服务器:一个子进程处理一个客户连接。同时,我们将所有客户socket连接的读缓冲设计为一块共享内存:

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536

/*处理一个客户连接必要的数据*/
struct client_data
{
    sockaddr_in address ; /*客户端socket地址*/
    int connfd;           /*socket文件描述符*/
    pid_t pid;            /*处理这个连接的子进程PID*/
    int pipefd[2];        /*和父进程通信用的管道*/
};

static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;

char * share_mem = 0;
/*客户连接数组。进程使用客户连接的编号来索引这个数据,及可取得相关的客户连接数据*/
client_data* users = 0;
/*子进程与客户连接的映射关系,用进程PID来索引这个数组。即可取得该进程所处理的客户链接的编号*/
int* sub_porcess = 0;
/*当前客户数量*/
int user_count = 0;
bool stop_child = false;


int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void sig_handler(int sig)
{
    int save_errno = errno;
    int msg = sig;
    send(sig_pipefd[1], (char*)&msg, 1, 0);
    errno = save_errno;
}

void addsig(int sig, void(*handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if(restart){
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    assert(sigaction(sig, &sa, NULL) != -1);
}

void del_resource()
{
    close(sig_pipefd[0]);
    close(sig_pipefd[1]);
    close(listenfd);
    close(epollfd);
    shm_unlink(shm_name);
    delete [] users;
    delete [] sub_porcess;
}

/*停止一个子进程*/
void child_term_handler(int sig)
{
    stop_child = true;
}

/*子进程运行的函数,
参数idx指出该子进程处理客户链接的编号
users是保存连接数据的数组
参数share_mem指出共享内存的其实地址*/

int  run_child(int idx, client_data* users, char* share_mem)
{
    epoll_event events[MAX_EVENT_NUMBER];
    /*子进程使用I/O复用技术来同时监听两个文件描述副:
    客户连接socket、
    与父进程通信的管道文件描述符*/
    int child_epollfd = epoll_create(5);
    assert(child_epollfd != -1);
    int connfd = users[idx].connfd;
    addfd(child_epollfd, connfd);
    int pipefd = users[idx].pipefd[1];
    addfd(child_epollfd, pipefd);
    int ret = 0;

    /*子进程需要设置自己的信号处理函数*/
    addsig(SIGTERM, child_term_handler, false);

    while (!stop_child){
        int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
        if(number < 0 && errno != EINTR){
            printf("epoll failure\n");
            break;
        }

        for(int i = 0; i < number; ++i)
        {
            int sockfd = events[i].data.fd;
            /*本子进程负责的客户连接有数据到达*/
            if(sockfd == connfd && events[i].events & EPOLLIN){
                memset(share_mem + idx* BUFFER_SIZE, '\0', BUFFER_SIZE);
                /*将客户数据读取到对应的读缓存中。
                读缓存是共享内存的一段,它开始去idx*BUffER_SIZE处,长度为BUFFER_SIZE
                每个客户连接的读缓存是共享的*/
                ret = recv(connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0);
                if(ret < 0){
                    if(errno != EAGAIN){
                        stop_child = true;
                    }
                }
                else if(ret == 0){
                    stop_child = true;
                }
                else{
                    /*成功读取客户端数据后就通知主进程(通过管道)来处理*/
                    send(pipefd, (char*)&idx, sizeof(idx), 0);
                }
            }
            /*主进程通知本进程(通过管道)将第client个客户的数据发送到本进程负责的客户端*/
            else if(sockfd == pipefd && events[i].events & EPOLLIN){
                int client = 0;
                /*接受主进程发送过来的数据,即有客户端数据到达的连接编号*/
                ret = recv(sockfd, (char*)&client, sizeof(client), 0);
                if(ret < 0){
                    if(errno != EAGAIN){
                        stop_child = true;
                    }
                }
                else if(ret = 0){
                    stop_child = true;
                }
                else{
                    send(connfd, share_mem + client*BUFFER_SIZE, BUFFER_SIZE, 0);
                }
            }
            else{
                continue;
            }
        }
    }

    close(connfd);
    close(pipefd);
    close(child_epollfd);
    return 0;
}

int main(int argc, char const *argv[])
{
    if(argc <= 2){
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }

    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    user_count = 0;
    users = new client_data[USER_LIMIT+1];
    sub_porcess = new int[PROCESS_LIMIT];
    for(int i = 0; i < PROCESS_LIMIT; ++i){
        sub_porcess[i] = -1;
    }

    epoll_event events[MAX_EVENT_NUMBER];
    epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd);

    ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(ret != -1);
    setnonblocking(sig_pipefd[1]);
    addfd(epollfd, sig_pipefd[0]);

    addsig(SIGCHLD, sig_handler);
    addsig(SIGTERM, sig_handler);
    addsig(SIGINT, sig_handler);
    addsig(SIGPIPE, SIG_IGN);

    bool stop_server = false;
    bool terminiate = false;

    /*创建共享内存,作为所有客户socket链接的读缓存*/
    shmfd = shm_open(shm_name, O_CREAT| O_RDWR, 0666);
    assert(shmfd != -1);
    ret = ftruncate(shmfd, USER_LIMIT* BUFFER_SIZE);
    assert(ret != -1);

    share_mem = (char*)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_READ|
                    PROT_WRITE, MAP_SHARED, shmfd, 0);
    assert(share_mem != MAP_FAILED);
    close(shmfd);

    while(!stop_server){
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(number < 0 && errno != EINTR){
            printf("epoll failure\n");
            break;
        }

        for(int i=0; i < number; ++i){
            int sockfd = events[i].data.fd;
            /*新的客户连接*/
            if(sockfd == listenfd){
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address,
                    &client_addrlength);
                if(connfd < 0){
                    printf("errno is %d\n", errno);
                    continue;
                }
                if(user_count >= USER_LIMIT){
                    const char *info = "too many users\n";
                    printf("%s", info);
                    send(connfd, info, strlen(info), 0);
                    close(connfd);
                    continue;
                }
                /*保存第user_count个客户连接的相关数据*/
                users[user_count].address = client_address;
                users[user_count].connfd = connfd;
                /*在主进程和子进程间建立管道,以传递必要的数据*/
                ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
                assert(ret != -1);
                pid_t pid = fork();
                if(pid < 0){
                    close(connfd);
                    continue;
                }
                else if (pid == 0){
                    close(epollfd);
                    close(listenfd);
                    close(users[user_count].pipefd[0]);
                    close(sig_pipefd[0]);
                    close(sig_pipefd[1]);

                    run_child(user_count, users, share_mem);
                    munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE);
                    exit(0);
                }
                else{
                    close(connfd);
                    close(users[user_count].pipefd[1]);
                    addfd(epollfd, users[user_count].pipefd[0]);
                    users[user_count].pid = pid;
                    
                    /*记录新的客户连接在数组users中的索引值,
                    建立进程pid和该索引值之间的映射关系*/
                    sub_porcess[pid] = user_count;
                    user_count++;
                }
            }
            /*处理信号事件*/
            else if(sockfd == sig_pipefd[0] && events[i].events & EPOLLIN){
                int sig;
                char signals[1024];
                ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
                if(ret == -1){
                    continue;
                }
                else if(ret == 0){
                    continue;
                }
                else{
                    for(int i = 0; i < ret; ++i){
                        switch (signals[i])
                        {
                        case SIGCHLD:{
                            /*子进程退出,表示有某个客户端关闭了连接*/
                            pid_t pid;
                            int stat;
                            while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
                                int del_user = sub_porcess[pid];
                                sub_porcess[pid] =  -1;
                                if(del_user < 0 || del_user > USER_LIMIT){
                                    continue;
                                }
                                epoll_ctl(epollfd, EPOLL_CTL_DEL, 
                                    users[del_user].pipefd[0],0);
                                close(users[del_user].pipefd[0]);
                                users[del_user] = users[--user_count];
                                sub_porcess[users[del_user].pid] = del_user;
                            }
                            if(terminiate && user_count == 0){
                                stop_server = true;
                            }
                            break;
                        }
                        case SIGTERM:
                        case SIGINT:{
                            /*结束服务程序*/
                            printf("kill all the child now\n");
                            if(user_count == 0){
                                stop_server = true;
                                break;
                            }
                            for(int i = 0; i < user_count; ++i){
                                int pid = users[i].pid;
                                kill(pid, SIGTERM);
                            }
                            terminiate = true;
                            break;
                        }
                        default:
                            break;
                        }
                    }
                }
            }
            /*某个紫禁城向父进程写入了数据*/
            else if(events[i].events &EPOLLIN){
                int child = 0;
                ret = recv(sockfd, (char*)&child, sizeof(child),0);
                printf("read data from child process accross pipe\n");
                if(ret == -1){
                    continue;
                } 
                else if(ret == 0){
                    continue;
                }
                else{
                    /*向除负责处理第child和客户链接的子进程之外的其他子进程发送消息,
                    通知有客户要写*/
                    for(int j = 0; j < user_count; ++j){
                        if(users[j].pipefd[0] != sockfd){
                            printf("send data to child accross pipe\n");
                            send(users[j].pipefd[0], (char*)&child, sizeof(child), 0);
                        }
                    }
                }
            }
        }
    }

    del_resource();
    return 0;
}

相关推荐

go语言也可以做gui,go-fltk让你做出c++级别的桌面应用

大家都知道go语言生态并没有什么好的gui开发框架,“能用”的一个手就能数的清,好用的就更是少之又少。今天为大家推荐一个go的gui库go-fltk。它是通过cgo调用了c++的fltk库,性能非常高...

旧电脑的首选系统:TinyCore!体积小+精简+速度极快,你敢安装吗

这几天老毛桃整理了几个微型Linux发行版,准备分享给大家。要知道可供我们日常使用的Linux发行版有很多,但其中的一些发行版经常会被大家忽视。其实这些微型Linux发行版是一种非常强大的创新:在一台...

codeblocks和VS2019下的fltk使用中文

在fltk中用中文有点问题。英文是这样。中文就成这个样子了。我查了查资料,说用UTF-8编码就行了。edit->Fileencoding->UTF-8然后保存文件。看下下边的编码指示确...

FLTK(Fast Light Toolkit)一个轻量级的跨平台Python GUI库

FLTK(FastLightToolkit)是一个轻量级的跨平台GUI库,特别适用于开发需要快速、高效且简单界面的应用程序。本文将介绍Python中的FLTK库,包括其特性、应用场景以及如何通过代...

中科院开源 RISC-V 处理器“香山”流片,已成功运行 Linux

IT之家1月29日消息,去年6月份,中科院大学教授、中科院计算所研究员包云岗,发布了开源高性能RISC-V处理器核心——香山。近日,包云岗在社交平台晒出图片,香山芯片已流片,回片后...

Linux 5.13内核有望合并对苹果M1处理器支持的初步代码

预计Linux5.13将初步支持苹果SiliconM1处理器,不过完整的支持工作可能还需要几年时间才能完全完成。虽然Linux已经可以在苹果SiliconM1上运行,但这需要通过一系列的补丁才能...

Ubuntu系统下COM口测试教程(ubuntu port)

1、在待测试的板上下载minicom,下载minicom有两种方法:方法一:在Ubuntu软件中心里面搜索下载方法二:按“Ctrl+Alt+T”打开终端,打开终端后输入“sudosu”回车;在下...

湖北嵌入式软件工程师培训怎么选,让自己脱颖而出

很多年轻人毕业即失业、面试总是不如意、薪酬不满意、在家躺平。“就业难”该如何应对,参加培训是否能改变自己的职业走向,在湖北,有哪些嵌入式软件工程师培训怎么选值得推荐?粤嵌科技在嵌入式培训领域有十几年经...

新阁上位机开发---10年工程师的Modbus总结

前言我算了一下,今年是我跟Modbus相识的第10年,从最开始的简单应用到协议了解,从协议开发到协议讲解,这个陪伴了10年的协议,它一直没变,变的只是我对它的理解和认识。我一直认为Modbus协议的存...

创建你的第一个可运行的嵌入式Linux系统-5

@ZHangZMo在MicrochipBuildroot中配置QT5选择Graphic配置文件增加QT5的配置修改根文件系统支持QT5修改output/target/etc/profile配置文件...

如何在Linux下给zigbee CC2530实现上位机

0、前言网友提问如下:粉丝提问项目框架汇总下这个网友的问题,其实就是实现一个网关程序,内容分为几块:下位机,通过串口与上位机相连;下位机要能够接收上位机下发的命令,并解析这些命令;下位机能够根据这些命...

Python实现串口助手 - 03串口功能实现

 串口调试助手是最核心的当然是串口数据收发与显示的功能,pzh-py-com借助的是pySerial库实现串口收发功能,今天痞子衡为大家介绍pySerial是如何在pzh-py-com发挥功能的。一、...

为什么选择UART(串口)作为调试接口,而不是I2C、SPI等其他接口

UART(通用异步收发传输器)通常被选作调试接口有以下几个原因:简单性:协议简单:UART的协议非常简单,只需设置波特率、数据位、停止位和校验位就可以进行通信。相比之下,I2C和SPI需要处理更多的通...

同一个类,不同代码,Qt 串口类QSerialPort 与各种外设通讯处理

串口通讯在各种外设通讯中是常见接口,因为各种嵌入式CPU中串口标配,工业控制中如果不够还通过各种串口芯片进行扩展。比如spi接口的W25Q128FV.对于软件而言,因为驱动接口固定,软件也相对好写,因...

嵌入式linux为什么可以通过PC上的串口去执行命令?

1、uboot(负责初始化基本硬bai件,如串口,网卡,usb口等,然du后引导系统zhi运行)2、linux系统(真正的操作系统)3、你的应用程序(基于操作系统的软件应用)当你开发板上电时,u...

取消回复欢迎 发表评论: