Netty的Pipeline之源码分析

ChannelPipeline

架构设计

Channel都有且仅有一个ChannelPipeline与之对应,Channel包含了ChannelPipeline,ChannelPipeline内部包含了N个handler,每一个handler都是由一个线程去执行;

ChannelPipeline内部维护了一个由ChannelHandlerContext组成的双向链表头为HeadContext,尾为TailHandler(双向链表为自己写的,而不是使用JDK的链表,为了轻量级),并且每个ChannelHandlerContext中又关联着一个ChannelHandler,下图提供参考:

传播inbound事件

在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到channelRead方法中了呢?

在业务代码中, 传递channelRead事件方式是通过fireChannelRead方法进行传播的

两种写法:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

这里重写了channelRead方法, 并且方法体内继续通过fireChannelRead方法进行传播channelRead事件, 那么这两种写法有什么异同?

我们先以 写法2 为例, 将这种写法进行剖析

这里首先获取当前context的pipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的 DefaultChannelpipeline

我们跟到DefaultChannelpipeline的fireChannelRead方法中:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

这里首先调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

我们跟进invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

这里的Object m 中msg通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理

我们跟到invokeChannelRead方法中:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handler的chanelRead方法, 其实这里我们基本上就明白了, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext的channelRead方法中:

HeadContext的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //向下传递channelRead事件
    ctx.fireChannelRead(msg);
}

在这里我们看到, 这里通过fireChannelRead方法继续往下传递channelRead事件, 而这种调用方式, 就是我们刚才分析用户代码的第一种调用方式,回头看一开始的channelRead:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

这里直接通过context对象调用fireChannelRead方法, 那么和使用pipeline调用有什么区别呢

我们回到HeadContext的channelRead方法, 我们来剖析ctx.fireChannelRead(msg)这句, 大家就会对这个问题有答案了, 跟到ctx的fireChannelRead方法中, 这里会走到AbstractChannelHandlerContext类中的fireChannelRead方法中

跟到AbstractChannelHandlerContext类中的fireChannelRead方法:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

这里我们看到, invokeChannelRead方法中传入了一个findContextInbound()参数, 而这findContextInbound方法其实就是找到当前Context的下一个节点

跟到findContextInbound方法:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

这里的逻辑也比较简单, 是通过一个doWhile循环, 找到当前handlerContext的下一个节点, 这里要注意循环的终止条件, while (!ctx.inbound)表示下一个context标志的事件不是inbound的事件, 则循环继续往下找, 言外之意就是要找到下一个标注inbound事件的节点

有关事件的标注, 如果是用户定义的handler, 是通过handler继承的接口而定的, 如果tail或者head, 那么是在初始化的时候就已经定义好

回到AbstractChannelHandlerContext类的fireChannelRead方法中:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

找到下一个节点后, 继续调用invokeChannelRead方法, 传入下一个和消息对象:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    //第一次执行next其实就是head
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

这里的逻辑我们又不陌生了, 因为我们传入的是当前context的下一个节点, 所以这里会调用下一个节点invokeChannelRead方法, 因我们刚才剖析的是head节点, 所以下一个节点有可能是用户添加的handler的包装类HandlerConext的对象

这里我们跟进invokeChannelRead方法中去:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try { 
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //发生异常的时候在这里捕获异常
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

又是我们熟悉的逻辑, 调用了自身handler的channelRead方法, 如果是用户自定义的handler, 则会走到用户定义的channelRead()方法中去, 所以这里就解释了为什么通过传递channelRead事件, 最终会走到用户重写的channelRead方法中去

同样, 也解释了最初提到过的两种写法的区别:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}
  • 写法1是通过当前节点往下传播事件

  • 写法2是通过head头节点往下传递事件

所以, 在handler中如果如果要在channelRead方法中传递channelRead事件, 一定要采用写法2的方式向下传递, 或者交给其父类处理, 如果采用1的写法则每次事件传输到这里都会继续从head节点传输, 从而陷入死循环或者发生异常。

这里有一点需要注意, 如果用户代码中channelRead方法, 如果没有显示的调用ctx.fireChannelRead(msg)那么事件则不会再往下传播, 则事件会在这里终止, 所以如果我们写业务代码的时候要考虑有关资源释放的相关操作。

如果ctx.fireChannelRead(msg)则事件会继续往下传播, 如果每一个handler都向下传播事件, 当然, 根据我们之前的分析channelRead事件只会在标识为inbound事件的HandlerConetext中传播, 传播到最后, 则最终会调用到tail节点的channelRead方法

我们跟到tailConext的channelRead方法中:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

我们跟进到onUnhandledInboundMessage方法中:

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        //释放资源
        ReferenceCountUtil.release(msg);
    }
}

这里做了释放资源的相关的操作

至此, channelRead事件传输相关罗辑剖析完整, 对于inbound事件的传输流程都会遵循这一逻辑。

pipeline的addLast方法

简单来说,pipeline.addLast()添加handler节点后,回调callHandlerAdded()方法,最终调用ChannelInitializer的initChannel方法。

细节部分转载于:https://blog.csdn.net/qq_16192007/article/details/92752048

跟踪到DefaultChannelPipeline # ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler)

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //(1)判断handler是否被重复添加
        checkMultiplicity(handler);
        //(2)创建一个HandlerContext并添加到列表
        newCtx = newContext(group, filterName(name, handler), handler);

        //(3)添加HandlerContext
        addLast0(newCtx);

        //是否已注册
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            //回调用户事件
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    //(4)回调添加事件
    callHandlerAdded0(newCtx);
    return this;
}

重复添加验证

我们跟到checkMultiplicity(handler)中:

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; 
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        //满足条件设置为true, 代表已添加
        h.added = true;
    }
}

首先判断是不是ChannelHandlerAdapter类型, 因为我们自定义的handler通常会直接或者间接的继承该接口, 所以这里为true
拿到handler之后转换成ChannelHandlerAdapter类型, 然后进行条件判断
if (!h.isSharable() && h.added)代表如果不是共享的handler, 并且是未添加状态, 则抛出异常:

public boolean isSharable() { 
    Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) { 
        //如果这个类注解了Sharable.class, 说明这个类会被多个channel共享
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

首先拿到当前handler的class对象然后再从netty自定义的一个ThreadLocalMap对象中获取一个盛放handler的class对象的map, 并获取其value。如果value值为空, 则会判断是否被Sharable注解, 并将自身 handler class 对象和判断结果存入map对象中, 最后返回判断结果

这说明了被Sharable注解的handler是一个共享handler,从这个逻辑我们可以判断, 共享对象是可以重复添加的

创建HandlerContext并添加到列表

如果是共享对象或者没有被添加, 则将ChannelHandlerAdapter的added设置为true, 代表已添加。剖析完了重复添加验证, 回到addLast方法中, 我们看第二步, 创建一个HandlerContext并添加到列表:

newCtx = newContext(group, filterName(name, handler), handler);

首先看filterName(name, handler)方法, 这个方法是判断添加handler的name是否重复

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        //没有名字创建默认名字
        return generateName(handler);
    }
    //检查名字是否重复
    checkDuplicateName(name);
    return name;
}

因为我们添加handler时候, 不一定会会给handler命名, 所以这一步name有可能是null, 如果是null, 则创建一个默认的名字,generateName(handler)类似Spring的beanFactory给每个bean起名字,为类名首字母小写,这里同样会起名字,名字我们看源码:

private String generateName(ChannelHandler handler) {
    Map<Class<?>, String> cache = nameCaches.get();//会有一个缓存
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    if (name == null) {
        //return StringUtil.simpleClassName(handlerType) + "#0";
        //假设handler名字为PdcHandler,则name为PdcHandler#0
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }
    return name;
}

然后再检查名字是否重复checkDuplicateName(name)这个方法中:

private void checkDuplicateName(String name) {
    //不为空
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
}

private AbstractChannelHandlerContext context0(String name) {
    //遍历pipeline
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        //发现name相同, 说明存在handler
        if (context.name().equals(name)) {
            //返回
            return context;
        }
        context = context.next;
    }
    return null;
}

这里做的操作非常简单, 就是将pipeline中, 从head节点往下遍历HandlerContext, 一直遍历到tail, 如果发现名字相同则会认为重复并返回HandlerContext对象。

我们回到addLast()方法中并继续看添加创建相关的逻辑:newCtx = newContext(group, filterName(name, handler), handler)
中 filterName(name, handler)这步如果并没有重复则会返回handler的name。

跟到newContext(group, filterName(name, handler), handler)方法中

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

这里我们看到创建了一个DefaultChannelHandlerContext对象, 构造方法的参数中, 第一个this代表当前的pipeline对象, group为null, 所以childExecutor(group)也会返回null, name为handler的名字, handler为新添加的handler对象。

继续跟到DefaultChannelHandlerContext的构造方法

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

我们看到首先调用了父类的构造方法, 之后将handler赋值为自身handler的成员变量, HandlerContext和handler关系在此也展现了出来, 是一种组合关系。

我们首先看父类的构造方法,有这么两个参数:isInbound(handler), isOutbound(handler),这两个参数意思是判断需要添加的handler是inboundHandler还是outBoundHandler

同样我们看isOutbound(handler)方法

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

通过判断是否实现 ChannelOutboundHandler 接口,判断是否为 outboundhandler

跟到其父类AbstractChannelHandlerContext的构造方法中:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, 
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

一切都不陌生了, 因为tail节点和head节点创建的时候同样走到了这里。这里初始化了name, pipeline, 以及标识添加的handler是inboundhanlder还是outboundhandler

添加HandlerContext

回到最初的addLast()方法中:跟完了创建HandlerContext的相关逻辑, 我们继续跟第三步, 添加HandlerContext

private void addLast0(AbstractChannelHandlerContext newCtx) {
    //拿到tail节点的前置节点
    AbstractChannelHandlerContext prev = tail.prev;
    //当前节点的前置节点赋值为tail节点的前置节点
    newCtx.prev = prev;
    //当前节点的下一个节点赋值为tail节点
    newCtx.next = tail;
    //tail前置节点的下一个节点赋值为当前节点
    prev.next = newCtx;
    //tail节点的前一个节点赋值为当前节点
    tail.prev = newCtx;
}

之后会判断当前线程线程是否为eventLoop线程, 如果不是eventLoop线程, 就将添加回调事件封装成task交给eventLoop线程执行, 否则, 直接执行添加回调事件callHandlerAdded0(newCtx)

回调添加事件(callHandlerAdded0方法)

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        //忽略代码
    }
}

我们重点关注这句ctx.handler().handlerAdded(ctx)
其中ctx是我们新创建的HandlerContext, 通过handler()方法拿到绑定的handler, 也就是新添加的handler, 然后执行handlerAdded(ctx)方法, 如果我们没有重写这个方法, 则会执行父类的该方法

在ChannelHandlerAdapter类中定义了该方法的实现:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}

我们看到没做任何操作, 也就是如果我们没有重写该方法时, 如果添加handler之后将不会做任何操作, 这里如果我们需要做一些业务逻辑, 可以通过重写该方法进行实现。
以上就是添加handler的有关的业务逻辑。

handler的删除

如果用户在业务逻辑中进行ctx.pipeline().remove(this)这样的写法, 或者ch.pipeline().remove(new SimpleHandler())这样的写法, 则就是对handler进行删除, 我们学习过添加handler的逻辑, 所以对handler删除操作理解起来也会比较容易

public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}

方法体里有个remove()方法, 传入一个 getContextOrDie(handler)参数,这个 getContextOrDie(handler), 其实就是根据handler拿到其包装类HandlerContext对象

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    //代码省略
}

这里仍然会通过context(handler)方法去寻找, 再跟进去

public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    //从头遍历节点
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
        //找到handler
        if (ctx.handler() == handler) {
            return ctx;
        }
        ctx = ctx.next;
    }
}

这里我们看到寻找的方法也非常的简单, 就是从头结点开始遍历, 遍历到如果其包装的handler对象是传入的handler对象, 则返回找到的handlerContext

回到remove(handler)方法:

public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {

    //当前删除的节点不能是head, 也不能是tail
    assert ctx != head && ctx != tail;

    synchronized (this) {
        //执行删除操作
        remove0(ctx);
        if (!registered) {
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }

        //回调删除handler事件
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    callHandlerRemoved0(ctx);
    return ctx;
}

首先要断言删除的节点不能是tail和head,然后通过remove0(ctx)进行实际的删除操作, 跟到remove0(ctx)中:

private static void remove0(AbstractChannelHandlerContext ctx) {
    //当前节点的前置节点
    AbstractChannelHandlerContext prev = ctx.prev;
    //当前节点的后置节点
    AbstractChannelHandlerContext next = ctx.next;
    //前置节点的下一个节点设置为后置节点
    prev.next = next;
    //后置节点的上一个节点设置为前置节点
    next.prev = prev;
}

这里的操作也非常简单, 做了一个指针移动的操作, 熟悉双向链表的小伙伴应该不会陌生。

回到remove(ctx)方法:我们继续往下看, 如果当前线程不是eventLoop线程则将回调删除事件封装成task放在taskQueue中让eventLoop线程进行执行,这跟inbound事件的invokeChannelRead方法一个处理方式, 否则, 则直接执行回调删除事件。

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        try {
            //调用handler的handlerRemoved方法
            ctx.handler().handlerRemoved(ctx);
        } finally {
            //将当前节点状态设置为已移除
            ctx.setRemoved();
        }
    } catch (Throwable t) {
        fireExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
    }
}

与添加handler的逻辑一样, 这里会调用当前handler的handlerRemoved方法, 如果用户没有重写该方法, 则会调用其父类的方法, 方法体在ChannelHandlerAdapter类中有定义, 我们跟进去

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

}

同添加handler一样, 也是一个空实现, 这里用户可以通过重写来添加自己需要的逻辑。

以上就是删除handler的相关操作。


   转载规则


《Netty的Pipeline之源码分析》 锦泉 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录