百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

浅谈Netty源码 netty源码详细解析

liebian365 2024-10-15 13:52 29 浏览 0 评论

前言

Netty框架的原理是Reactor模型中基于多个反应器的多线程模式,本篇文章主要介绍Netty较为重要的几个概念,编写思路借鉴了参考资料中的文章

ChannelFuture

我们先来了解了解Netty中几个较为重要的接口

public interface Future<V> extends java.util.concurrent.Future<V> {

    // I/O操作是否成功,成功返回true
    boolean isSuccess();
?
    // 是否可取消
    boolean isCancellable();
?
    // 抛出I/O操作失败原因
    Throwable cause();
?
    // 此处使用了观察者模式,该Future任务完成,这个Listener会立刻收到通知
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
?
    // 移除监听器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
?
    // 等待Future任务完成,如果任务失败会抛出异常
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
?
    // 等待Future任务完成,如果任务失败不会抛出异常
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
?
    // 立即获取Future任务结果,如果Future任务未完成会返回null,所以在使用这个方法之前最好先判断这个Future任务是否完成
    V getNow();
}
复制代码

Netty的Future接口继承了jdk1.5的Future接口,在本身已经有Future接口的情况下为什么要重复造轮子?

这是因为jdk1.5的Future接口不满足Netty的需求,jdk的Future接口可以获取异步计算的结果,并且提供了多种方法,可查看任务是否取消是否完成,但是使用者无法知道方法什么时候完成,比如某用户提交了一个Future任务,什么时候才能去调用get()方法获取结果,总不能循环调用isDone()方法吧,这样太消耗cpu资源。Netty的Future接口一定程度上弥补了这个缺陷,通过新增监听器,可以得知该任务是否完成以及任务完成后该做的事情,isSuccess可以得知任务是否成功,netty的Future接口更加智能

public interface ChannelFuture extends Future<Void> {
?
    // 返回ChannelFuture关联的Channel
    Channel channel();
?
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
?
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
?
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
?
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
?
    @Override
    ChannelFuture sync() throws InterruptedException;
?
    @Override
    ChannelFuture syncUninterruptibly();
?
    @Override
    ChannelFuture await() throws InterruptedException;
?
    @Override
    ChannelFuture awaitUninterruptibly();
}
复制代码

ChannelFuture继承了Netty的Future接口,由于Netty中所有的I/O操作都是异步了,所以当方法返回时,不代表I/O操作已经完成,所以ChannelFuture封装了异步I/O操作的结果,接口定义的方法与Netty的Future接口相似,并没有什么新鲜的,值得一提的是ChannelFuture关联了Channel

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
?
    // 此operation关联的Future任务完成时,这个方法会被调用
    void operationComplete(F future) throws Exception;
}
复制代码

GenericFutureListener接口中定义了Listener的回调方法,当Future任务完成时,会回调此类中的方法

public interface Promise<V> extends Future<V> {
?
    // 标记该Future任务成功,并且会通知所有的Listener
    // 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果)
    Promise<V> setSuccess(V result);
?
    // 标记该Future任务成功,并且会通知所有的Listener
    // 操作失败不抛出异常,返回false
    boolean trySuccess(V result);
?
    // 标记该Future任务失败,并且会通知所有的Listener
    // 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果)
    Promise<V> setFailure(Throwable cause);
?
    // 标记该Future任务失败,并且会通知所有的Listener
    // 操作失败不抛出异常,返回false
    boolean tryFailure(Throwable cause);
?
    // 标记该Future任务不可取消
    boolean setUncancellable();
?
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
?
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
?
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
?
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
?
    @Override
    Promise<V> await() throws InterruptedException;
?
    @Override
    Promise<V> awaitUninterruptibly();
?
    @Override
    Promise<V> sync() throws InterruptedException;
?
    @Override
    Promise<V> syncUninterruptibly();
}
复制代码

Promise同样也继承了Netty的Future接口,由于Netty的Future接口中没有写操作相关的接口,所以Netty通过Promise进行扩展,用于设置I/O操作的结果,接口中的setSuccess()、setFailure()方法会在任务完成后调用,然后回调Listener中的方法,经过这些操作后,await() 或 sync() 的线程就会从等待中返回。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
?
    @Override
    Channel channel();
?
    @Override
    ChannelPromise setSuccess(Void result);
?
    ChannelPromise setSuccess();
?
    boolean trySuccess();
?
    @Override
    ChannelPromise setFailure(Throwable cause);
?
    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
?
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
?
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
?
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
?
    @Override
    ChannelPromise sync() throws InterruptedException;
?
    @Override
    ChannelPromise syncUninterruptibly();
?
    @Override
    ChannelPromise await() throws InterruptedException;
?
    @Override
    ChannelPromise awaitUninterruptibly();
?
    ChannelPromise unvoid();
}
?
复制代码

ChannelPromise接口同时继承了ChannelFuture, Promise,拥有双方的特性,接口中的方法同样跟之前的接口非常相似,只是返回值变成了ChannelPromise

看完以上的接口后,我们来看看Netty中对于这些接口的实现

观察这张类图,可以发现,DefaultPromise实现了Promise,DefaultChannelPromise实现了ChannelPromise并且继承了DefaultPromise,DefaultPromise由于没有实现ChannelFuture,所以没有ChannelFuture相关的特性,所以要看Netty中关于以上接口的实现,应该去看DefaultChannelPromise这个类

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
?
    private final Channel channel;
    private long checkpoint;
?
    /**
     * Creates a new instance.
     *
     * @param channel
     *        the {@link Channel} associated with this future
     */
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = checkNotNull(channel, "channel");
    }
?
?
    @Override
    public ChannelPromise setSuccess(Void result) {
        super.setSuccess(result);
        return this;
    }
?
    @Override
    public ChannelPromise setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }
?
    @Override
    public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        super.addListener(listener);
        return this;
    }
}
复制代码

观察DefaultChannelPromise的代码,可以发现,很多方法都调用了DefaultPromise父类中的方法,所以我们转移一下战场,去看DefaultPromise的代码

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
      return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
?
@Override
public boolean trySuccess(V result) {
    return setSuccess0(result);
}
?
private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}
?
private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
      if (checkNotifyWaiters()) {
          // 唤醒Listeners
          notifyListeners();
      }
      return true;
    }
    return false;
}
复制代码

可以看到setSuccess的步骤就是设置值,然后唤醒所有的Listeners,如果这个操作失败,会抛出异常,trySuccess也是同样的步骤,但是不会抛出异常

ChannelPipeline

ChannelPipeline本身是一个与Channel关联的容器对象,这个容器中存放了多个ChannelHandlerContext,ChannelHandlerContext中存放的是我们编写的ChannelHandler对象,多个ChannelHandlerContext使用链表串联,I/O事件按照顺序经过ChannelPipeline中的一个个ChannelHandler

如上图

Netty中的事件分为Inbound事件和Outbound事件,Inbound事件通常由I/O线程触发,例如TCP链路建立事件、读事件,Outbound事件通常是用户主动发起的网络I/O事件,例如连接事件、读事件

p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
复制代码

以上是一段添加ChannelHandler对象的代码,以Inbound开头的类意味着它是一个入站Handler,以Outbound开头的类表示它是一个出站Handler,我们猜测一下这些ChannelHandler的执行顺序。

3、4没有实现ChannelnboundHandler,1、2没有实现ChannelOutboundHandler,5既实现了ChannelnboundHandler又实现了ChannelOutboundHandler,按照先执行Inbound事件,再执行Outbound事件的规则的话,执行顺序应该是1->2->5->3->4->5。

实际上不是的,Inbound事件的执行顺序是从前往后,Outbound事件的执行顺序是从后往前,所以执行顺序是1->2->5->5->4->3

ChannelPipeline的创建时机

前面讲过ChannelPipeline和Channel是一一搭配的,所以Channel创建的时候ChannelPipeline也会随之创建

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // 此处会调用下面的方法
    pipeline = newChannelPipeline();
}
?
protected DefaultChannelPipeline newChannelPipeline() {
    // 创建DefaultChannelPipeline
    return new DefaultChannelPipeline(this);
}
复制代码
// 此方法就是创建两个链表节点,并且让头节点和尾节点双向连接
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // 观察TailContext,可以发现TailContext继承了AbstractChannelHandlerContext
    // 说明ChannelPipeline中的节点是ChannelHandlerContext,不是ChannelHandler
    tail = new TailContext(this);
    head = new HeadContext(this);
?
    head.next = tail;
    tail.prev = head;
}
复制代码

NioEventLoopGroup

可以看到NioEventLoopGroup最顶层继承的接口是Executor,说明NioEventLoopGroup就是一个线程池,NioEventLoop是其创建出来的一个线程

public NioEventLoopGroup() {
    this(0);
}
?
public NioEventLoopGroup(int nThreads) {
    // 注意,这里executor赋值为null
    this(nThreads, (Executor) null);
}
?
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
?
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
?
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
?
// 看到这里,我们可以知道Netty默认的线程数是2 * CPU 个
private static final int DEFAULT_EVENT_LOOP_THREADS;
?
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
?
?
?
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
复制代码

构造器的代码一路追,终于找到干正事的方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");
    // 因为构造器中赋值为null,所以此处executor为ThreadPerTaskExecutor()
    if (executor == null) {
      executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
?
    children = new EventExecutor[nThreads];
?
    for (int i = 0; i < nThreads; i ++) {
      boolean success = false;
      try {
        // 这里很关键,下面细说
        children[i] = newChild(executor, args);
        success = true;
      } catch (Exception e) {
        // TODO: Think about if this is a good exception type
        throw new IllegalStateException("failed to create a child event loop", e);
      } finally {
        // 新建线程失败,将线程优雅关闭
        if (!success) {
          for (int j = 0; j < i; j ++) {
            children[j].shutdownGracefully();
          }
?
          for (int j = 0; j < i; j ++) {
            EventExecutor e = children[j];
            try {
              while (!e.isTerminated()) {
                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
              }
            } catch (InterruptedException interrupted) {
              // Let the caller handle the interruption.
              Thread.currentThread().interrupt();
              break;
            }
          }
        }
      }
    }
    // 选择合适的轮训机制
    chooser = chooserFactory.newChooser(children);
?
    // 新建一个监听器,用于监听是否所有线程都terminated了
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
      @Override
      public void operationComplete(Future<Object> future) throws Exception {
        if (terminatedChildren.incrementAndGet() == children.length) {
          terminationFuture.setSuccess(null);
        }
      }
    };
    // 给所有的EventExecutor都设置上这个监听器
    for (EventExecutor e: children) {
      e.terminationFuture().addListener(terminationListener);
    }
?
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码
// executors的数量是2的幂次方和非2的幂次方使用不同的轮训方式
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // 判断executors是否是2的幂次方
    if (isPowerOfTwo(executors.length)) {
      return new PowerOfTwoEventExecutorChooser(executors);
    } else {
      return new GenericEventExecutorChooser(executors);
    }
}
复制代码

上面说到,NioEventLoopGroup是一个线程池,NioEventLoop是其创建出来的一个个线程,上面的newChild()方法便是创建一个个NioEventLoop,我们来看看NioEventLoop的构造方法

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
          rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    // 这里可以看到selector跟NioEventLoop进行了绑定
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
?
?
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 这里是一个关键点,对一个任务队列进行了赋值,这里的任务队列有什么用呢?
    // NioEventLoop一部分时间会执行I/O任务,一部分时间执行非I/O任务,在执行I/O任务时,如果有任务过来,会先把任务放
    // 到任务队列中
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
复制代码

到这里为止,NioEventLoop便跟selector关联起来了


作者:Xiao镔
链接:https://juejin.cn/post/7007049829734383624
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相关推荐

“版本末期”了?下周平衡补丁!国服最强5套牌!上分首选

明天,酒馆战棋就将迎来大更新,也聊了很多天战棋相关的内容了,趁此机会,给兄弟们穿插一篇构筑模式的卡组推荐!老规矩,我们先来看10职业胜率。目前10职业胜率排名与一周前基本类似,没有太多的变化。平衡补丁...

VS2017 C++ 程序报错“error C2065:“M_PI”: 未声明的标识符&quot;

首先,程序中头文件的选择,要选择头文件,在文件中是没有对M_PI的定义的。选择:项目——>”XXX属性"——>配置属性——>C/C++——>预处理器——>预处理器定义,...

东营交警实名曝光一批酒驾人员名单 88人受处罚

齐鲁网·闪电新闻5月24日讯酒后驾驶是对自己和他人生命安全极不负责的行为,为守护大家的平安出行路,东营交警一直将酒驾作为重点打击对象。5月23日,东营交警公布最新一批饮酒、醉酒名单。对以下驾驶人醉酒...

Qt界面——搭配QCustomPlot(qt platform)

这是我第一个使用QCustomPlot控件的上位机,通过串口精确的5ms发送一次数据,再将读取的数据绘制到图表中。界面方面,尝试卡片式设计,外加QSS简单的配了个色。QCustomPlot官网:Qt...

大话西游2分享赢取种族坐骑手办!PK趣闻录由你书写

老友相聚,仗剑江湖!《大话西游2》2021全民PK季4月激燃打响,各PK玩法鏖战齐开,零门槛参与热情高涨。PK季期间,不仅各种玩法奖励丰厚,参与PK趣闻录活动,投稿自己在PK季遇到的趣事,还有机会带走...

测试谷歌VS Code AI 编程插件 Gemini Code Assist

用ClaudeSonnet3.7的天气测试编码,让谷歌VSCodeAI编程插件GeminiCodeAssist自动编程。生成的文件在浏览器中的效果如下:(附源代码)VSCode...

顾爷想知道第4.5期 国服便利性到底需优化啥?

前段时间DNF国服推出了名为“阿拉德B计划”的系列改版计划,截至目前我们已经看到了两项实装。不过关于便利性上,国服似乎还有很多路要走。自从顾爷回归DNF以来,几乎每天都在跟我抱怨关于DNF里面各种各样...

掌握Visual Studio项目配置【基础篇】

1.前言VisualStudio是Windows上最常用的C++集成开发环境之一,简称VS。VS功能十分强大,对应的,其配置系统较为复杂。不管是对于初学者还是有一定开发经验的开发者来说,捋清楚VS...

还嫌LED驱动设计套路深?那就来看看这篇文章吧

随着LED在各个领域的不同应用需求,LED驱动电路也在不断进步和发展。本文从LED的特性入手,推导出适合LED的电源驱动类型,再进一步介绍各类LED驱动设计。设计必读:LED四个关键特性特性一:非线...

Visual Studio Community 2022(VS2022)安装图文方法

直接上步骤:1,首先可以下载安装一个VisualStudio安装器,叫做VisualStudioinstaller。这个安装文件很小,很快就安装完成了。2,打开VisualStudioins...

Qt添加MSVC构建套件的方法(qt添加c++11)

前言有些时候,在Windows下因为某些需求需要使用MSVC编译器对程序进行编译,假设我们安装Qt的时候又只是安装了MingW构建套件,那么此时我们该如何给现有的Qt添加一个MSVC构建套件呢?本文以...

Qt为什么站稳c++GUI的top1(qt c)

为什么现在QT越来越成为c++界面编程的第一选择,从事QT编程多年,在这之前做C++界面都是基于MFC。当时为什么会从MFC转到QT?主要原因是MFC开发界面想做得好看一些十分困难,引用第三方基于MF...

qt开发IDE应该选择VS还是qt creator

如果一个公司选择了qt来开发自己的产品,在面临IDE的选择时会出现vs或者qtcreator,选择qt的IDE需要结合产品需求、部署平台、项目定位、程序猿本身和公司战略,因为大的软件产品需要明确IDE...

Qt 5.14.2超详细安装教程,不会来打我

Qt简介Qt(官方发音[kju:t],音同cute)是一个跨平台的C++开库,主要用来开发图形用户界面(GraphicalUserInterface,GUI)程序。Qt是纯C++开...

Cygwin配置与使用(四)——VI字体和颜色的配置

简介:VI的操作模式,基本上VI可以分为三种状态,分别是命令模式(commandmode)、插入模式(Insertmode)和底行模式(lastlinemode),各模式的功能区分如下:1)...

取消回复欢迎 发表评论: