百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

深入理解epoll 之 EPOLLONESHOT 事件

liebian365 2024-10-27 13:13 17 浏览 0 评论

epoll 即使使用ET模式,一个socket上的某个事件还是可能被触发多次。

这种情况在并发程序上会引起一个问题。当一个线程(或进程)读完某个socket上的数据后开始处理这些数据,而在处理这些数据的过程中该socket又有新的数据可读,会再次触发EPOLLIN事件,此时epoll_wait返回后有可能会被另一个线程(或进程)读取,这就导致同一个socket的数据被多个线程(或进程)读取的局面。

而我们期望的是同一个socket应该永远只被一个线程(或进程)处理。

对于这种情况,可以使用epoll中的EPOLLONESHOT事件来实现。

对于socket上注册了EPOLLONESHOT事件,则操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次。若想要再次触发则只能使用epoll_ctl重置该描述符上注册的事件,包括EPOLLONESHOT 事件。

所以若socket上注册EPOLLONESHOT 事件,当一个线程处理该socket上的事件时,其他线程不可能有机会操作该socket的。

当然,如果该线程在这个注册了EPOLLONESHOT 事件的socket处理完毕所有数据后,该线程就应该使用epoll_ctl重置该描述符上注册的事件,以确保这个socket下一次可读时,其EPOLLIN事件能触发,使得其他线程也有机会处理该socket上的事件。


使用EPOLLONESHOT事件的例子大致如下

// 线程处理数据逻辑
void* PthreadWorker(void *arg) {
        
    while(1) {
        int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
        if (ret == 0) {
            close(sockfd);
            break;
        }
        else if (ret < 0) {
            if (errno == EAGAIN) {
        
                 //说明该socket上的所有数据都处理完了,则重置epoll事件        


                 epoll_event event;
                 event.data.fd = sockfd;
                 event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        
                 //该socket上的事件彻底处理完毕后,再把加回去,这样再次触发时,epoll_wait就可以把该socket事件给其他线程处理了
                 epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
  
                 break;
             }
         }else {
                       
             //对接收到的数据进行处理
         }
    }


}


int main(int argc, char *argv[]) {
     ...
     
     int epollfd = epoll_create(5);
 
     while(1) {
         int ret = epoll_wait(...);
               
         for (int i=0; i<ret; i++) {
             int sockfd = events[i].data.fd;
             if (sockfd == listenfd) {
                 int connfd = accept(...);
                 
                 /* 第一次添加 时加上EPOOLLONESHOT事件,这样当epoll_wait 第一次触发后有个线程进行处理,
                  后续epoll就不会再触发该socket上的事件了,直到处理该socket事件彻底处理完后把事件再加到上面
                */
                 evnet.events= EPOLLIN | EPOLLT | EPOOLLONESHOT;          
                 epoll_ctl( epollfd,EPOLL_CTL_ADD, connfd, &event );


              }
              else if (events[i].events & EPOLLIN) {


                  // 生成一个线程进行处理 EPOLLIN事                                
                  pthread_create(&thread, NULL, PthreadWorker, ..);


              }
                        
         }
      }


}


接下来从epoll的源码中分析EPOLLONESHOT事件处理逻辑。


socket套接字的epoll事件注册上去,然后进入epoll_wait等待事件的到来。

当该socket有数据到来后,数据经过网卡,驱动,内核协议栈,到达协议栈处理这些数据时,到达传输层后,进行唤醒用户进程(对于epoll来讲调用ep_poll_callback回调)

tcp_v4_rcv

-> tcp_v4_do_rcv

->tcp_rev_state_process

->tcp_data_queue

->sk->sk_data_read 也即是 sock_def_readable

->wake_up_interruptible(sk->sk_sleep)

->__wake_up()

->curr->func //也即是调用ep_insert 添加的ep_poll_callback回调函数。


在ep_poll_callback回调函数中会把触发事件的epitem 加到就绪队列中,然后唤醒阻塞在epoll_wait 上的用户进程

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
 613{
 614        int pwake = 0;
 615        unsigned long flags;
 616        struct epitem *epi = ep_item_from_wait(wait);
 617        struct eventpoll *ep = epi->ep;
 618
 619        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
 620                     current, epi->ffd.file, epi, ep));
 621
 622        spin_lock_irqsave(&ep->lock, flags);
 623
 624        /*
 625         * If the event mask does not contain any poll(2) event, we consider the
 626         * descriptor to be disabled. This condition is likely the effect of the
 627         * EPOLLONESHOT bit that disables the descriptor when an event is received,
 628         * until the next EPOLL_CTL_MOD will be issued.
 629         */
            /*
              #define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET),
              也即是判断是否有非 EPOLLONESHOT 和 EPOLLET之外的事件,没有,则goto out_unlock
               */
 630        if (!(epi->event.events & ~EP_PRIVATE_BITS))
 631                goto out_unlock;
 632
 633        /*
 634         * If we are trasfering events to userspace, we can hold no locks
 635         * (because we're accessing user memory, and because of linux f_op->poll()
 636         * semantics). All the events that happens during that period of time are
 637         * chained in ep->ovflist and requeued later on.
 638         */
 639        if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
 640                if (epi->next == EP_UNACTIVE_PTR) {
 641                        epi->next = ep->ovflist;
 642                        ep->ovflist = epi;
 643                }
 644                goto out_unlock;
 645        }
 646
 647        /* If this file is already in the ready list we exit soon */
 648        if (ep_is_linked(&epi->rdllink))
 649                goto is_linked;
 650        //把触发的事件加到就绪队列中
 651        list_add_tail(&epi->rdllink, &ep->rdllist);
 652
 653is_linked:
 654        /*
 655         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
 656         * wait list.
 657         */
             //唤醒阻塞在epoll_wait 上的用户进程
 658        if (waitqueue_active(&ep->wq))
 659                __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
 660                                 TASK_INTERRUPTIBLE);
 661        if (waitqueue_active(&ep->poll_wait))
 662                pwake++;
 663
 664out_unlock:
 665        spin_unlock_irqrestore(&ep->lock, flags);
 666
 667        /* We have to call this outside the lock */
 668        if (pwake)
 669                ep_poll_safewake(&psw, &ep->poll_wait);
 670
 671        return 1;
 672}


阻塞在epoll_wait上的用户进程被唤醒,处理就绪队列上的event事件,把这些事件发送到用户态给用户进程

sys_epoll_wait

-->ep_poll

--->ep_send_events

在ep_send_events 中把事件发送给用户态后,把该事件

除了EPOLLONESHOT | EPOLLET 事件之外的所有事件都注销掉,所以 该事件由于没有了可触发的事件,所以该事件不会再触发了,直到用户进程把事件重新加上去。

static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events,
 873                          int maxevents)
 874{
 875        int eventcnt, error = -EFAULT, pwake = 0;
 876        unsigned int revents;
 877        unsigned long flags;
 878        struct epitem *epi, *nepi;
 879        struct list_head txlist;
 880
 881        INIT_LIST_HEAD(&txlist);
 882
 883        /*
 884         * We need to lock this because we could be hit by
 885         * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
 886         */
 887        mutex_lock(&ep->mtx);
 888
 889        /*
 890         * Steal the ready list, and re-init the original one to the
 891         * empty list. Also, set ep->ovflist to NULL so that events
 892         * happening while looping w/out locks, are not lost. We cannot
 893         * have the poll callback to queue directly on ep->rdllist,
 894         * because we are doing it in the loop below, in a lockless way.
 895         */
 896        spin_lock_irqsave(&ep->lock, flags);
 897        list_splice(&ep->rdllist, &txlist);
 898        INIT_LIST_HEAD(&ep->rdllist);
 899        ep->ovflist = NULL;
 900        spin_unlock_irqrestore(&ep->lock, flags);
 901
 902        /*
 903         * We can loop without lock because this is a task private list.
 904         * We just splice'd out the ep->rdllist in ep_collect_ready_items().
 905         * Items cannot vanish during the loop because we are holding "mtx".
 906         */
 907        for (eventcnt = 0; !list_empty(&txlist) && eventcnt < maxevents;) {
 908                epi = list_first_entry(&txlist, struct epitem, rdllink);
 909
 910                list_del_init(&epi->rdllink);
 911
 912                /*
 913                 * Get the ready file event set. We can safely use the file
 914                 * because we are holding the "mtx" and this will guarantee
 915                 * that both the file and the item will not vanish.
 916                 */
 917                revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
 918                revents &= epi->event.events;
 919
 920                /*
 921                 * Is the event mask intersect the caller-requested one,
 922                 * deliver the event to userspace. Again, we are holding
 923                 * "mtx", so no operations coming from userspace can change
 924                 * the item.
 925                 */
                    /* 把触发的事件到用户的evnets 事件数组中。*/
 926                if (revents) {
 927                        if (__put_user(revents,
 928                                       &events[eventcnt].events) ||
 929                            __put_user(epi->event.data,
 930                                       &events[eventcnt].data))
 931                                goto errxit;
 
                             //若该事件注册了EPOLLONESHOT,则把除了EPOLLONESHOT | EPOLLET 事件之外的所有事件都注销掉
 932                        if (epi->event.events & EPOLLONESHOT)
 933                                epi->event.events &= EP_PRIVATE_BITS;
 934                        eventcnt++;
 935                }
 936                /*
 937                 * At this point, noone can insert into ep->rdllist besides
 938                 * us. The epoll_ctl() callers are locked out by us holding
 939                 * "mtx" and the poll callback will queue them in ep->ovflist.
 940                 */
 941                if (!(epi->event.events & EPOLLET) &&
 942                    (revents & epi->event.events))
 943                        list_add_tail(&epi->rdllink, &ep->rdllist);
 944        }
 945        error = 0;
 946
 947errxit:
 948
 949        spin_lock_irqsave(&ep->lock, flags);
 950        /*
 951         * During the time we spent in the loop above, some other events
 952         * might have been queued by the poll callback. We re-insert them
 953         * here (in case they are not already queued, or they're one-shot).
 954         */
 955        for (nepi = ep->ovflist; (epi = nepi) != NULL;
 956             nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
 957                if (!ep_is_linked(&epi->rdllink) &&
 958                    (epi->event.events & ~EP_PRIVATE_BITS))
 959                        list_add_tail(&epi->rdllink, &ep->rdllist);
 960        }
 961        /*
 962         * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
 963         * releasing the lock, events will be queued in the normal way inside
 964         * ep->rdllist.
 965         */
 966        ep->ovflist = EP_UNACTIVE_PTR;
 967
 968        /*
 969         * In case of error in the event-send loop, or in case the number of
 970         * ready events exceeds the userspace limit, we need to splice the
 971         * "txlist" back inside ep->rdllist.
 972         */
 973        list_splice(&txlist, &ep->rdllist);
 974
 975        if (!list_empty(&ep->rdllist)) {
 976                /*
 977                 * Wake up (if active) both the eventpoll wait list and the ->poll()
 978                 * wait list (delayed after we release the lock).
 979                 */
 980                if (waitqueue_active(&ep->wq))
 981                        __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
 982                                         TASK_INTERRUPTIBLE);
 983                if (waitqueue_active(&ep->poll_wait))
 984                        pwake++;
 985        }
 986        spin_unlock_irqrestore(&ep->lock, flags);
 987
 988        mutex_unlock(&ep->mtx);
 989
 990        /* We have to call this outside the lock */
 991        if (pwake)
 992                ep_poll_safewake(&psw, &ep->poll_wait);
 993
 994        return eventcnt == 0 ? error: eventcnt;
 995}


对于边缘触发,就绪队列的事件发送给用户进程后还会加到就绪队列中,这样事件下次还会触发。但是由于发送给用户进程时,把事件注销掉了,所以下次触发时进入ep_send_events后,会进行和epi->event.events进行运算,而此时epi->event.events 由于上次注销了事件,所以此时revents为0,所以不会再发送给用户进程。

917                revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
918                revents &= epi->event.events;  //此时  revents 为0
                   if (revents) {
 927                        if (__put_user(revents,
 928                                       &events[eventcnt].events) ||
 929                            __put_user(epi->event.data,
 930                                       &events[eventcnt].data))
 931                                goto errxit;
 
                             //若该事件注册了EPOLLONESHOT,则把除了EPOLLONESHOT | EPOLLET 事件之外的所有事件都注销掉
 932                        if (epi->event.events & EPOLLONESHOT)
 933                                epi->event.events &= EP_PRIVATE_BITS;
 934                        eventcnt++;
 935                }

相关推荐

4万多吨豪华游轮遇险 竟是因为这个原因……

(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...

“菜鸟黑客”必用兵器之“渗透测试篇二”

"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...

科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白

作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...

麦子陪你做作业(二):KEGG通路数据库的正确打开姿势

作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...

知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势

智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...

每日新闻播报(September 14)_每日新闻播报英文

AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...

香港新巴城巴开放实时到站数据 供科技界研发使用

中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...

5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper

本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...

Qt动画效果展示_qt显示图片

今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...

如何从0到1设计实现一门自己的脚本语言

作者:dong...

三年级语文上册 仿写句子 需要的直接下载打印吧

描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...

C++|那些一看就很简洁、优雅、经典的小代码段

目录0等概率随机洗牌:1大小写转换2字符串复制...

二年级上册语文必考句子仿写,家长打印,孩子照着练

二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...

一年级语文上 句子专项练习(可打印)

...

亲自上阵!C++ 大佬深度“剧透”:C++26 将如何在代码生成上对抗 Rust?

...

取消回复欢迎 发表评论: