百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

深入理解epoll 之 EPOLLONESHOT 事件

liebian365 2024-10-27 13:13 27 浏览 0 评论

epoll 即使使用ET模式,一个socket上的某个事件还是可能被触发多次。

这种情况在并发程序上会引起一个问题。当一个线程(或进程)读完某个socket上的数据后开始处理这些数据,而在处理这些数据的过程中该socket又有新的数据可读,会再次触发EPOLLIN事件,此时epoll_wait返回后有可能会被另一个线程(或进程)读取,这就导致同一个socket的数据被多个线程(或进程)读取的局面。

而我们期望的是同一个socket应该永远只被一个线程(或进程)处理。

对于这种情况,可以使用epoll中的EPOLLONESHOT事件来实现。

对于socket上注册了EPOLLONESHOT事件,则操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次。若想要再次触发则只能使用epoll_ctl重置该描述符上注册的事件,包括EPOLLONESHOT 事件。

所以若socket上注册EPOLLONESHOT 事件,当一个线程处理该socket上的事件时,其他线程不可能有机会操作该socket的。

当然,如果该线程在这个注册了EPOLLONESHOT 事件的socket处理完毕所有数据后,该线程就应该使用epoll_ctl重置该描述符上注册的事件,以确保这个socket下一次可读时,其EPOLLIN事件能触发,使得其他线程也有机会处理该socket上的事件。


使用EPOLLONESHOT事件的例子大致如下

// 线程处理数据逻辑
void* PthreadWorker(void *arg) {
        
    while(1) {
        int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
        if (ret == 0) {
            close(sockfd);
            break;
        }
        else if (ret < 0) {
            if (errno == EAGAIN) {
        
                 //说明该socket上的所有数据都处理完了,则重置epoll事件        


                 epoll_event event;
                 event.data.fd = sockfd;
                 event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        
                 //该socket上的事件彻底处理完毕后,再把加回去,这样再次触发时,epoll_wait就可以把该socket事件给其他线程处理了
                 epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
  
                 break;
             }
         }else {
                       
             //对接收到的数据进行处理
         }
    }


}


int main(int argc, char *argv[]) {
     ...
     
     int epollfd = epoll_create(5);
 
     while(1) {
         int ret = epoll_wait(...);
               
         for (int i=0; i<ret; i++) {
             int sockfd = events[i].data.fd;
             if (sockfd == listenfd) {
                 int connfd = accept(...);
                 
                 /* 第一次添加 时加上EPOOLLONESHOT事件,这样当epoll_wait 第一次触发后有个线程进行处理,
                  后续epoll就不会再触发该socket上的事件了,直到处理该socket事件彻底处理完后把事件再加到上面
                */
                 evnet.events= EPOLLIN | EPOLLT | EPOOLLONESHOT;          
                 epoll_ctl( epollfd,EPOLL_CTL_ADD, connfd, &event );


              }
              else if (events[i].events & EPOLLIN) {


                  // 生成一个线程进行处理 EPOLLIN事                                
                  pthread_create(&thread, NULL, PthreadWorker, ..);


              }
                        
         }
      }


}


接下来从epoll的源码中分析EPOLLONESHOT事件处理逻辑。


socket套接字的epoll事件注册上去,然后进入epoll_wait等待事件的到来。

当该socket有数据到来后,数据经过网卡,驱动,内核协议栈,到达协议栈处理这些数据时,到达传输层后,进行唤醒用户进程(对于epoll来讲调用ep_poll_callback回调)

tcp_v4_rcv

-> tcp_v4_do_rcv

->tcp_rev_state_process

->tcp_data_queue

->sk->sk_data_read 也即是 sock_def_readable

->wake_up_interruptible(sk->sk_sleep)

->__wake_up()

->curr->func //也即是调用ep_insert 添加的ep_poll_callback回调函数。


在ep_poll_callback回调函数中会把触发事件的epitem 加到就绪队列中,然后唤醒阻塞在epoll_wait 上的用户进程

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
 613{
 614        int pwake = 0;
 615        unsigned long flags;
 616        struct epitem *epi = ep_item_from_wait(wait);
 617        struct eventpoll *ep = epi->ep;
 618
 619        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
 620                     current, epi->ffd.file, epi, ep));
 621
 622        spin_lock_irqsave(&ep->lock, flags);
 623
 624        /*
 625         * If the event mask does not contain any poll(2) event, we consider the
 626         * descriptor to be disabled. This condition is likely the effect of the
 627         * EPOLLONESHOT bit that disables the descriptor when an event is received,
 628         * until the next EPOLL_CTL_MOD will be issued.
 629         */
            /*
              #define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET),
              也即是判断是否有非 EPOLLONESHOT 和 EPOLLET之外的事件,没有,则goto out_unlock
               */
 630        if (!(epi->event.events & ~EP_PRIVATE_BITS))
 631                goto out_unlock;
 632
 633        /*
 634         * If we are trasfering events to userspace, we can hold no locks
 635         * (because we're accessing user memory, and because of linux f_op->poll()
 636         * semantics). All the events that happens during that period of time are
 637         * chained in ep->ovflist and requeued later on.
 638         */
 639        if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
 640                if (epi->next == EP_UNACTIVE_PTR) {
 641                        epi->next = ep->ovflist;
 642                        ep->ovflist = epi;
 643                }
 644                goto out_unlock;
 645        }
 646
 647        /* If this file is already in the ready list we exit soon */
 648        if (ep_is_linked(&epi->rdllink))
 649                goto is_linked;
 650        //把触发的事件加到就绪队列中
 651        list_add_tail(&epi->rdllink, &ep->rdllist);
 652
 653is_linked:
 654        /*
 655         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
 656         * wait list.
 657         */
             //唤醒阻塞在epoll_wait 上的用户进程
 658        if (waitqueue_active(&ep->wq))
 659                __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
 660                                 TASK_INTERRUPTIBLE);
 661        if (waitqueue_active(&ep->poll_wait))
 662                pwake++;
 663
 664out_unlock:
 665        spin_unlock_irqrestore(&ep->lock, flags);
 666
 667        /* We have to call this outside the lock */
 668        if (pwake)
 669                ep_poll_safewake(&psw, &ep->poll_wait);
 670
 671        return 1;
 672}


阻塞在epoll_wait上的用户进程被唤醒,处理就绪队列上的event事件,把这些事件发送到用户态给用户进程

sys_epoll_wait

-->ep_poll

--->ep_send_events

在ep_send_events 中把事件发送给用户态后,把该事件

除了EPOLLONESHOT | EPOLLET 事件之外的所有事件都注销掉,所以 该事件由于没有了可触发的事件,所以该事件不会再触发了,直到用户进程把事件重新加上去。

static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events,
 873                          int maxevents)
 874{
 875        int eventcnt, error = -EFAULT, pwake = 0;
 876        unsigned int revents;
 877        unsigned long flags;
 878        struct epitem *epi, *nepi;
 879        struct list_head txlist;
 880
 881        INIT_LIST_HEAD(&txlist);
 882
 883        /*
 884         * We need to lock this because we could be hit by
 885         * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
 886         */
 887        mutex_lock(&ep->mtx);
 888
 889        /*
 890         * Steal the ready list, and re-init the original one to the
 891         * empty list. Also, set ep->ovflist to NULL so that events
 892         * happening while looping w/out locks, are not lost. We cannot
 893         * have the poll callback to queue directly on ep->rdllist,
 894         * because we are doing it in the loop below, in a lockless way.
 895         */
 896        spin_lock_irqsave(&ep->lock, flags);
 897        list_splice(&ep->rdllist, &txlist);
 898        INIT_LIST_HEAD(&ep->rdllist);
 899        ep->ovflist = NULL;
 900        spin_unlock_irqrestore(&ep->lock, flags);
 901
 902        /*
 903         * We can loop without lock because this is a task private list.
 904         * We just splice'd out the ep->rdllist in ep_collect_ready_items().
 905         * Items cannot vanish during the loop because we are holding "mtx".
 906         */
 907        for (eventcnt = 0; !list_empty(&txlist) && eventcnt < maxevents;) {
 908                epi = list_first_entry(&txlist, struct epitem, rdllink);
 909
 910                list_del_init(&epi->rdllink);
 911
 912                /*
 913                 * Get the ready file event set. We can safely use the file
 914                 * because we are holding the "mtx" and this will guarantee
 915                 * that both the file and the item will not vanish.
 916                 */
 917                revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
 918                revents &= epi->event.events;
 919
 920                /*
 921                 * Is the event mask intersect the caller-requested one,
 922                 * deliver the event to userspace. Again, we are holding
 923                 * "mtx", so no operations coming from userspace can change
 924                 * the item.
 925                 */
                    /* 把触发的事件到用户的evnets 事件数组中。*/
 926                if (revents) {
 927                        if (__put_user(revents,
 928                                       &events[eventcnt].events) ||
 929                            __put_user(epi->event.data,
 930                                       &events[eventcnt].data))
 931                                goto errxit;
 
                             //若该事件注册了EPOLLONESHOT,则把除了EPOLLONESHOT | EPOLLET 事件之外的所有事件都注销掉
 932                        if (epi->event.events & EPOLLONESHOT)
 933                                epi->event.events &= EP_PRIVATE_BITS;
 934                        eventcnt++;
 935                }
 936                /*
 937                 * At this point, noone can insert into ep->rdllist besides
 938                 * us. The epoll_ctl() callers are locked out by us holding
 939                 * "mtx" and the poll callback will queue them in ep->ovflist.
 940                 */
 941                if (!(epi->event.events & EPOLLET) &&
 942                    (revents & epi->event.events))
 943                        list_add_tail(&epi->rdllink, &ep->rdllist);
 944        }
 945        error = 0;
 946
 947errxit:
 948
 949        spin_lock_irqsave(&ep->lock, flags);
 950        /*
 951         * During the time we spent in the loop above, some other events
 952         * might have been queued by the poll callback. We re-insert them
 953         * here (in case they are not already queued, or they're one-shot).
 954         */
 955        for (nepi = ep->ovflist; (epi = nepi) != NULL;
 956             nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
 957                if (!ep_is_linked(&epi->rdllink) &&
 958                    (epi->event.events & ~EP_PRIVATE_BITS))
 959                        list_add_tail(&epi->rdllink, &ep->rdllist);
 960        }
 961        /*
 962         * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
 963         * releasing the lock, events will be queued in the normal way inside
 964         * ep->rdllist.
 965         */
 966        ep->ovflist = EP_UNACTIVE_PTR;
 967
 968        /*
 969         * In case of error in the event-send loop, or in case the number of
 970         * ready events exceeds the userspace limit, we need to splice the
 971         * "txlist" back inside ep->rdllist.
 972         */
 973        list_splice(&txlist, &ep->rdllist);
 974
 975        if (!list_empty(&ep->rdllist)) {
 976                /*
 977                 * Wake up (if active) both the eventpoll wait list and the ->poll()
 978                 * wait list (delayed after we release the lock).
 979                 */
 980                if (waitqueue_active(&ep->wq))
 981                        __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
 982                                         TASK_INTERRUPTIBLE);
 983                if (waitqueue_active(&ep->poll_wait))
 984                        pwake++;
 985        }
 986        spin_unlock_irqrestore(&ep->lock, flags);
 987
 988        mutex_unlock(&ep->mtx);
 989
 990        /* We have to call this outside the lock */
 991        if (pwake)
 992                ep_poll_safewake(&psw, &ep->poll_wait);
 993
 994        return eventcnt == 0 ? error: eventcnt;
 995}


对于边缘触发,就绪队列的事件发送给用户进程后还会加到就绪队列中,这样事件下次还会触发。但是由于发送给用户进程时,把事件注销掉了,所以下次触发时进入ep_send_events后,会进行和epi->event.events进行运算,而此时epi->event.events 由于上次注销了事件,所以此时revents为0,所以不会再发送给用户进程。

917                revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
918                revents &= epi->event.events;  //此时  revents 为0
                   if (revents) {
 927                        if (__put_user(revents,
 928                                       &events[eventcnt].events) ||
 929                            __put_user(epi->event.data,
 930                                       &events[eventcnt].data))
 931                                goto errxit;
 
                             //若该事件注册了EPOLLONESHOT,则把除了EPOLLONESHOT | EPOLLET 事件之外的所有事件都注销掉
 932                        if (epi->event.events & EPOLLONESHOT)
 933                                epi->event.events &= EP_PRIVATE_BITS;
 934                        eventcnt++;
 935                }

相关推荐

go语言也可以做gui,go-fltk让你做出c++级别的桌面应用

大家都知道go语言生态并没有什么好的gui开发框架,“能用”的一个手就能数的清,好用的就更是少之又少。今天为大家推荐一个go的gui库go-fltk。它是通过cgo调用了c++的fltk库,性能非常高...

旧电脑的首选系统:TinyCore!体积小+精简+速度极快,你敢安装吗

这几天老毛桃整理了几个微型Linux发行版,准备分享给大家。要知道可供我们日常使用的Linux发行版有很多,但其中的一些发行版经常会被大家忽视。其实这些微型Linux发行版是一种非常强大的创新:在一台...

codeblocks和VS2019下的fltk使用中文

在fltk中用中文有点问题。英文是这样。中文就成这个样子了。我查了查资料,说用UTF-8编码就行了。edit->Fileencoding->UTF-8然后保存文件。看下下边的编码指示确...

FLTK(Fast Light Toolkit)一个轻量级的跨平台Python GUI库

FLTK(FastLightToolkit)是一个轻量级的跨平台GUI库,特别适用于开发需要快速、高效且简单界面的应用程序。本文将介绍Python中的FLTK库,包括其特性、应用场景以及如何通过代...

中科院开源 RISC-V 处理器“香山”流片,已成功运行 Linux

IT之家1月29日消息,去年6月份,中科院大学教授、中科院计算所研究员包云岗,发布了开源高性能RISC-V处理器核心——香山。近日,包云岗在社交平台晒出图片,香山芯片已流片,回片后...

Linux 5.13内核有望合并对苹果M1处理器支持的初步代码

预计Linux5.13将初步支持苹果SiliconM1处理器,不过完整的支持工作可能还需要几年时间才能完全完成。虽然Linux已经可以在苹果SiliconM1上运行,但这需要通过一系列的补丁才能...

Ubuntu系统下COM口测试教程(ubuntu port)

1、在待测试的板上下载minicom,下载minicom有两种方法:方法一:在Ubuntu软件中心里面搜索下载方法二:按“Ctrl+Alt+T”打开终端,打开终端后输入“sudosu”回车;在下...

湖北嵌入式软件工程师培训怎么选,让自己脱颖而出

很多年轻人毕业即失业、面试总是不如意、薪酬不满意、在家躺平。“就业难”该如何应对,参加培训是否能改变自己的职业走向,在湖北,有哪些嵌入式软件工程师培训怎么选值得推荐?粤嵌科技在嵌入式培训领域有十几年经...

新阁上位机开发---10年工程师的Modbus总结

前言我算了一下,今年是我跟Modbus相识的第10年,从最开始的简单应用到协议了解,从协议开发到协议讲解,这个陪伴了10年的协议,它一直没变,变的只是我对它的理解和认识。我一直认为Modbus协议的存...

创建你的第一个可运行的嵌入式Linux系统-5

@ZHangZMo在MicrochipBuildroot中配置QT5选择Graphic配置文件增加QT5的配置修改根文件系统支持QT5修改output/target/etc/profile配置文件...

如何在Linux下给zigbee CC2530实现上位机

0、前言网友提问如下:粉丝提问项目框架汇总下这个网友的问题,其实就是实现一个网关程序,内容分为几块:下位机,通过串口与上位机相连;下位机要能够接收上位机下发的命令,并解析这些命令;下位机能够根据这些命...

Python实现串口助手 - 03串口功能实现

 串口调试助手是最核心的当然是串口数据收发与显示的功能,pzh-py-com借助的是pySerial库实现串口收发功能,今天痞子衡为大家介绍pySerial是如何在pzh-py-com发挥功能的。一、...

为什么选择UART(串口)作为调试接口,而不是I2C、SPI等其他接口

UART(通用异步收发传输器)通常被选作调试接口有以下几个原因:简单性:协议简单:UART的协议非常简单,只需设置波特率、数据位、停止位和校验位就可以进行通信。相比之下,I2C和SPI需要处理更多的通...

同一个类,不同代码,Qt 串口类QSerialPort 与各种外设通讯处理

串口通讯在各种外设通讯中是常见接口,因为各种嵌入式CPU中串口标配,工业控制中如果不够还通过各种串口芯片进行扩展。比如spi接口的W25Q128FV.对于软件而言,因为驱动接口固定,软件也相对好写,因...

嵌入式linux为什么可以通过PC上的串口去执行命令?

1、uboot(负责初始化基本硬bai件,如串口,网卡,usb口等,然du后引导系统zhi运行)2、linux系统(真正的操作系统)3、你的应用程序(基于操作系统的软件应用)当你开发板上电时,u...

取消回复欢迎 发表评论: