Reactor-Netty系列1-TcpClient源碼分析-從示例程序開始

一、示例程序:

Reactor-Netty 版本:

<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>0.8.10.RELEASE</version>
</dependency>
複製代碼

示例程序:

/**
 * Sample TCP server: binds 127.0.0.1:8080, logs every inbound chunk decoded as a string,
 * and keeps the JVM alive until the server is disposed.
 */
public class TcpServerApplication {
    public static void main(String[] args) {
        // Assemble the server description first; nothing is bound yet at this point.
        TcpServer tcpServer = TcpServer.create()
                .host("127.0.0.1")
                .port(8080)
                .handle((inbound, outbound) -> inbound.receive().asString().log().then());

        // Bind synchronously, then block the main thread until the server is disposed.
        DisposableServer server = tcpServer.bindNow();
        server.onDispose().block();
    }
}

/**
 * Sample TCP client: connects to 127.0.0.1:8080 and sends the string "Hello World!".
 * The numbered comments name the type produced by each step of the chain.
 */
public class TcpClientApplication {
    public static void main(String[] args) throws InterruptedException {
        TcpClient connectClient = TcpClient.create();            // 1 TcpClientConnect
        TcpClient hostClient = connectClient.host("127.0.0.1");  // 2 TcpClientBootstrap
        TcpClient portClient = hostClient.port(8080);            // 3 TcpClientBootstrap
        TcpClient client = portClient.handle(                    // 4 TcpClientDoOn
                (inbound, outbound) -> outbound.sendString(Mono.just("Hello World!")).then());

        client.connectNow();                                     // 5 Connection
        // Keep the JVM alive for a moment after connecting.
        Thread.sleep(3000);
    }
}
複製代碼

TcpServerApplication 輸出結果:

[ INFO] (reactor-tcp-nio-2) onSubscribe(FluxHandle.HandleSubscriber)
[ INFO] (reactor-tcp-nio-2) request(unbounded)
[ INFO] (reactor-tcp-nio-2) onNext(Hello World!)
[ INFO] (reactor-tcp-nio-2) cancel()
複製代碼

基本邏輯是:Server 端綁定 8080 端口並監聽請求;Client 端連接上該端口後發送字符串 Hello World!;Server 端收到請求後打印出來。

下面進行具體源碼分析。

二、TcpClient

TcpClient.create()

/**
 * Creates a TcpClient backed by the object returned from {@code TcpResources.get()}.
 *
 * @return the client built by the {@code create(...)} overload that receives that object
 */
public static TcpClient create() {
   return create(TcpResources.get());
}

/**
 * Ultimately returns a {@code TcpClientConnect}.
 * As the parameter shows, {@code TcpClientConnect} is concerned with connection
 * management, i.e. the {@code ConnectionProvider}.
 *
 * @param provider the connection provider handed to the new {@code TcpClientConnect}
 * @return a new {@code TcpClientConnect} wrapping {@code provider}
 */
public static TcpClient create(ConnectionProvider provider) {
	 return new TcpClientConnect(provider);
}

public class TcpResources implements ConnectionProvider, LoopResources {
  final ConnectionProvider defaultProvider;
	final LoopResources      defaultLoops;

	/**
	 * Stores the two defaults this holder exposes.
	 *
	 * @param defaultLoops the LoopResources kept in {@code defaultLoops}
	 * @param defaultProvider the ConnectionProvider kept in {@code defaultProvider}
	 */
	protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) {
		this.defaultLoops = defaultLoops;
		this.defaultProvider = defaultProvider;
	}

  /**
   * This static method ultimately returns the TcpResources, which comprises:
   * ConnectionProvider: manages connections;
   * LoopResources: manages threads.
   */
	public static TcpResources get() {
    // If no TcpResources exists yet, create one; otherwise return the existing TcpResources
		return getOrCreate(tcpResources, null, null, ON_TCP_NEW,  "tcp");
	}
複製代碼

host()

/**
 * 1. Ultimately returns a TcpClientBootstrap.
 * 2. TcpClientBootstrap holds a bootstrapMapper, which is a Function:
 *    b -> TcpUtils.updateHost(b, host). Two questions matter here: b is a Bootstrap
 *    object, so when is b created? And when is the Function's apply method executed?
 *    TcpClientBootstrap.configure() answers both questions at once, so it is enough to
 *    track when configure() gets called.
 */
public final TcpClient host(String host) {
		Objects.requireNonNull(host, "host");
		return bootstrap(b -> TcpUtils.updateHost(b, host));
}

/**
 * Wraps this client in a TcpClientBootstrap that carries the given mapper.
 *
 * @param bootstrapMapper function from a Bootstrap to a Bootstrap
 * @return a new TcpClientBootstrap whose source is this client
 */
public final TcpClient bootstrap(Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
		return new TcpClientBootstrap(this, bootstrapMapper);
}

/**
 * TcpClient operator that applies a Bootstrap mapping function on top of the client it wraps.
 */
final class TcpClientBootstrap extends TcpClientOperator {

	// Mapping applied to the Bootstrap obtained from the wrapped client.
	final Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper;

	// Rejects a null mapper up front; the wrapped client is handed to the superclass.
	TcpClientBootstrap(TcpClient client,
			Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
		super(client);
		this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper");
	}

	// Obtains the Bootstrap from source.configure(), applies the mapper to it,
	// and fails with a NullPointerException if the mapper returns null.
	@Override
	public Bootstrap configure() {
		return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper");
	}
}
複製代碼

port()

/**
 * Analogous to host(String host): registers a mapper that calls
 * TcpUtils.updatePort(b, port) on the Bootstrap.
 */
public final TcpClient port(int port) {
		return bootstrap(b -> TcpUtils.updatePort(b, port));
}
複製代碼

handle()

/**
 * Ultimately returns a TcpClientDoOn.
 * The handler parameter is a BiFunction whose apply method is invoked directly inside the
 * doOnConnected callback; the Publisher returned by the BiFunction is subscribed to right
 * there as well. Therefore it is enough to track when the Consumer passed to
 * doOnConnected gets invoked.
 */
public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
		Objects.requireNonNull(handler, "handler");
		return doOnConnected(c -> {
			if (log.isDebugEnabled()) {
				log.debug(format(c.channel(), "Handler is being applied: {}"), handler);
			}

			// The connection c is passed as both the inbound and the outbound side;
			// the resulting publisher is subscribed with c.disposeSubscriber().
			Mono.fromDirect(handler.apply((NettyInbound) c, (NettyOutbound) c))
			    .subscribe(c.disposeSubscriber());
		});
}

/**
 * Wraps this client in a TcpClientDoOn that carries only the onConnected callback;
 * the onConnect and onDisconnected slots are passed as null.
 *
 * @param doOnConnected callback receiving the Connection; must not be null
 */
public final TcpClient doOnConnected(Consumer<? super Connection> doOnConnected) {
		Objects.requireNonNull(doOnConnected, "doOnConnected");
		return new TcpClientDoOn(this, null, doOnConnected, null);
}

/**
 * TcpClient operator holding up to three optional lifecycle callbacks. It is also a
 * ConnectionObserver, and registers itself on the Bootstrap in configure().
 */
final class TcpClientDoOn extends TcpClientOperator implements ConnectionObserver {

	final Consumer<? super Bootstrap>  onConnect;
	// onConnected is the Consumer passed to doOnConnected from within the handle method
	final Consumer<? super Connection> onConnected;
	final Consumer<? super Connection> onDisconnected;

	TcpClientDoOn(TcpClient client,
			@Nullable Consumer<? super Bootstrap> onConnect,
			@Nullable Consumer<? super Connection> onConnected,
			@Nullable Consumer<? super Connection> onDisconnected) {
		// Inherit from the previous TcpClient in the chain
		super(client);
		this.onConnect = onConnect;
		this.onConnected = onConnected;
		this.onDisconnected = onDisconnected;
	}

	@Override
	public Bootstrap configure() {
		Bootstrap b = source.configure();
		ConnectionObserver observer = BootstrapHandlers.connectionObserver(b);
		// Note: the ConnectionObserver is set here (the existing observer chained with this);
		// this is discussed later on
		BootstrapHandlers.connectionObserver(b, observer.then(this));	
		return b;
	}

	// Delegates to source.connect(b); when onConnect is set, it is invoked with b on subscribe.
	@Override
	public Mono<? extends Connection> connect(Bootstrap b) {
		if (onConnect != null) {
			return source.connect(b)
			             .doOnSubscribe(s -> onConnect.accept(b));
		}
		return source.connect(b);
	}

	@Override
	public void onStateChange(Connection connection, State newState) {
		// onConnected is invoked here, i.e. when the connection state changes (to CONFIGURED)
		if (onConnected != null && newState == State.CONFIGURED) {
			onConnected.accept(connection);
			return;
		}
		if (onDisconnected != null) {
			// DISCONNECTING: defer the callback via connection.onDispose;
			// RELEASED: invoke the callback immediately.
			if (newState == State.DISCONNECTING) {
				connection.onDispose(() -> onDisconnected.accept(connection));
			}
			else if (newState == State.RELEASED) {
				onDisconnected.accept(connection);
			}
		}
	}
}
複製代碼

connectNow()

// Uses a timeout of 45 seconds
public final Connection connectNow() {
		return connectNow(Duration.ofSeconds(45));
}

// Blocks on connect() for at most the given timeout; a null result is rejected with "aborted".
// The handling of IllegalStateException is elided in this excerpt.
public final Connection connectNow(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    try {
        // connect() returns a Mono here
        return Objects.requireNonNull(connect().block(timeout), "aborted");
    }
    catch (IllegalStateException e) {
        ...
    }
}

// Returns a Mono (the body is elided here)
public final Mono<? extends Connection> connect() {
    ...
    return connect(b);
}

// The block method starts the subscription right away:
// it subscribes a BlockingMonoSubscriber and then waits on it for up to the timeout.
public T block(Duration timeout) {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
    return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

// Waits for up to the given timeout while getCount() is non-zero (parts elided in this excerpt).
final T blockingGet(long timeout, TimeUnit unit) {
    ...
    if (getCount() != 0) {
        try {
            if (!await(timeout, unit)) {
                dispose();	// timed out: cancel the subscription
                throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
            }
        }
        catch (InterruptedException ex) {
            // NOTE(review): the visible code does not restore the thread's interrupt flag — confirm whether that is intended.
            dispose();
            RuntimeException re = Exceptions.propagate(ex);
            //this is ok, as re is always a new non-singleton instance
            re.addSuppressed(new Exception("#block has been interrupted"));
            throw re;
        }
    }
    ...
}
複製代碼

由以上分析可知,在最後的 connectNow() 方法中,才開始真正的訂閱執行。下面繼續分析 connect 方法。

connect()

// Builds the Bootstrap via configure() and connects with it; any Throwable raised by
// configure() that is not JVM-fatal is returned as an error Mono instead of being thrown.
public final Mono<? extends Connection> connect() {
    Bootstrap b;
    try {
        // 1. Obtain the default Bootstrap
        b = configure();
    }
    catch (Throwable t) {
        Exceptions.throwIfJvmFatal(t);
        return Mono.error(t);
    }
    // 2. connect(b)
    return connect(b);
}

// Returns a fresh clone of DEFAULT_BOOTSTRAP, so the shared template itself is not handed out.
public Bootstrap configure() {
    return DEFAULT_BOOTSTRAP.clone();
}

// Template Bootstrap: AUTO_READ is set to false and the remote address is an unresolved
// address built from NetUtil.LOCALHOST's host address and DEFAULT_PORT.
static final Bootstrap DEFAULT_BOOTSTRAP =
    new Bootstrap().option(ChannelOption.AUTO_READ, false)             .remoteAddress(InetSocketAddressUtil.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
複製代碼

繼續看 connect(Bootstrap b) 方法:

// This is an abstract method that many subclasses implement. Based on the analysis so far,
// the implementation invoked first should be the one in the TcpClientDoOn class.
public abstract Mono<? extends Connection> connect(Bootstrap b);

// TcpClientDoOn class
public Mono<? extends Connection> connect(Bootstrap b) {
    if (onConnect != null) {
        return source.connect(b)
                        .doOnSubscribe(s -> onConnect.accept(b));
    }
    // Pass the call upwards: source is the previous TcpClient in the chain;
    // eventually it reaches the initial TcpClientConnect
    return source.connect(b);
}

// TcpClientConnect class
final ConnectionProvider provider;
public Mono<? extends Connection> connect(Bootstrap b) {
    // Populate b's properties (only when no group has been configured on it yet)
    if (b.config()
            .group() == null) {
        TcpClientRunOn.configure(b,
                LoopResources.DEFAULT_NATIVE,
                TcpResources.get(),
                maxConnections != -1);
    }
    // Ultimately this is the method that gets called
    return provider.acquire(b);
}
複製代碼

ConnectionProvider

上面講到 connect 方法最終調用的是 ConnectionProvider 類中的方法。ConnectionProvider 在之前的分析中出現過,即 TcpResources.get() 方法返回的 TcpResources 對象中包含這個屬性。

// Creates the default TcpResources.
// Without a previous instance, missing arguments are replaced by newly created defaults;
// with a previous instance, missing arguments fall back to that instance's defaults.
static <T extends TcpResources> T create(@Nullable T previous, @Nullable LoopResources loops, @Nullable ConnectionProvider provider, String name, BiFunction<LoopResources, ConnectionProvider, T> onNew) {
		if (previous == null) {
			loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
			// Create the ConnectionProvider
			provider = provider == null ? ConnectionProvider.elastic(name) : provider;
		}
		else {
			loops = loops == null ? previous.defaultLoops : loops;
			provider = provider == null ? previous.defaultProvider : provider;
		}
		return onNew.apply(loops, provider);
	}
}

// Builds a PooledConnectionProvider with the given name and a SimpleChannelPool factory.
static ConnectionProvider elastic(String name) {
    // The second argument, PoolFactory, is itself a functional interface, so the pool object
    // is only created at the moment PoolFactory.newPool is invoked; the ChannelPool it
    // produces is of type SimpleChannelPool.
		return new PooledConnectionProvider(name,
				(bootstrap, handler, checker) -> new SimpleChannelPool(bootstrap,
						handler,
						checker,
						true,
						false));
}

/**
 * ConnectionProvider that keeps a map of pools keyed by PoolKey and creates new pools
 * through a PoolFactory (remaining members are elided in this excerpt).
 */
final class PooledConnectionProvider implements ConnectionProvider {

	// Factory abstraction that creates a ChannelPool from a bootstrap, a handler and a health checker.
	interface PoolFactory {

		ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker);
	}

	final ConcurrentMap<PoolKey, Pool> channelPools;
	final String                       name;
	final PoolFactory                  poolFactory;
	final int                          maxConnections;

	// This constructor sets maxConnections to -1 and starts with an empty pool map.
	PooledConnectionProvider(String name, PoolFactory poolFactory) {
		this.name = name;
		this.poolFactory = poolFactory;
		this.channelPools = PlatformDependent.newConcurrentHashMap();
		this.maxConnections = -1;
	}
    ...
}
複製代碼

現在回到 provider.acquire(b) 方法,可以知道調用的是 PooledConnectionProvider 類中的方法,繼續分析:

// Map structure: every (remote address, handler) combination has its own connection pool
final ConcurrentMap<PoolKey, Pool> channelPools;
final String                       name;
// ChannelPool instances are produced through poolFactory
final PoolFactory                  poolFactory;
final int                          maxConnections;

/**
 * The main purpose is to obtain a connection from a connection pool.
 * First the matching pool has to be found via channelPools.get(holder);
 * if it does not exist, a new pool is created and added to channelPools.
 * Finally disposableAcquire(sink, obs, pool, false) is called.
 */
public Mono<Connection> acquire(Bootstrap b) {
    return Mono.create(sink -> {
        Bootstrap bootstrap = b.clone();
	// TODO:
        ChannelOperations.OnSetup opsFactory =
                BootstrapHandlers.channelOperationFactory(bootstrap);
	// TODO: listener for the connection lifecycle
        ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
        // Lazy loading: the bootstrap's remote address (ip:port) has to be set here
        NewConnectionProvider.convertLazyRemoteAddress(bootstrap);
        // Every (remote address, handler) pair has its own Pool
        ChannelHandler handler = bootstrap.config().handler();
        PoolKey holder = new PoolKey(bootstrap.config().remoteAddress(),
                handler != null ? handler.hashCode() : -1);

        Pool pool;
        for (; ; ) {
            // Fetch directly
            pool = channelPools.get(holder);
            if (pool != null) {
                break;
            }
            // Not present: create a new connection pool
            pool = new Pool(bootstrap, poolFactory, opsFactory);
            if (channelPools.putIfAbsent(holder, pool) == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new client pool [{}] for {}",
                            name,
                            bootstrap.config()
                                    .remoteAddress());
                }
                break;
            }
            // Close the surplus pool that was created (putIfAbsent found an existing entry), then retry
            pool.close();
        }
        disposableAcquire(sink, obs, pool, false);
    });
}

// Builds the pool wrapper; this instance is passed to the factory as both the
// second and the third argument of newPool (handler and checker per PoolFactory's signature).
Pool(Bootstrap bootstrap,
				PoolFactory provider,
				ChannelOperations.OnSetup opsFactory) {
			this.bootstrap = bootstrap;
			this.opsFactory = opsFactory;
			// Create the new connection pool
			this.pool = provider.newPool(bootstrap, this, this);
			this.defaultGroup = bootstrap.config()
			                             .group();
			// Pre-built succeeded futures carrying true / false
			HEALTHY = defaultGroup.next()
			                      .newSucceededFuture(true);
			UNHEALTHY = defaultGroup.next()
			                        .newSucceededFuture(false);
}
複製代碼

繼續看 disposableAcquire 方法:

// Acquires a channel from the pool and wires a DisposableAcquire to both the future and the sink.
static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, Pool pool, boolean retried) {
		// Acquire the Channel
		Future<Channel> f = pool.acquire();
		DisposableAcquire disposableAcquire =
				new DisposableAcquire(sink, f, pool, obs, retried);
		// Register the listener; this eventually calls disposableAcquire.operationComplete(),
		// and operationComplete() in turn calls disposableAcquire.run()
		f.addListener(disposableAcquire);
		// The same object is registered as the sink's cancellation hook
		sink.onCancel(disposableAcquire);
	}

// Listener attached to the acquire future; it is also a ConnectionObserver, a Disposable
// and a Runnable (remaining members are elided in this excerpt).
final static class DisposableAcquire implements Disposable, GenericFutureListener<Future<Channel>>, ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    DisposableAcquire(MonoSink<Connection> sink,
            Future<Channel> future,
            Pool pool,
            ConnectionObserver obs,
            boolean retried) {
        this.f = future;
        this.pool = pool;
        this.sink = sink;
        this.obs = obs;
        this.retried = retried;
    }

    // When the connection state changes, obs.onStateChange is called; the obs here is the one
    // that was set in TcpClientDoOn.configure(). So as soon as the connection state changes,
    // the logic supplied to TcpClient.handle gets invoked.
    @Override
    public void onStateChange(Connection connection, State newState) {
        // On CONFIGURED the sink is completed with the connection before forwarding the event
        if (newState == State.CONFIGURED) {
            sink.success(connection);
        }
        obs.onStateChange(connection, newState);
    }
    ...
}
複製代碼

DisposableAcquire 是一個監聽器,監聽的是連接,即上面代碼中的 Future f = pool.acquire()。那麼這個 f 是什麼類型呢?之前的代碼分析中已經知道 pool 爲 SimpleChannelPool 類型。

// Stores the handler and health-check settings and installs a ChannelInitializer on a
// clone of the bootstrap that forwards each initialised channel to handler.channelCreated.
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
                // This method is called when a new connection is created
                handler.channelCreated(ch);
            }
        });
        this.lastRecentUsed = lastRecentUsed;
    }
}

// Callback for a newly created channel (part of the body is elided in this excerpt).
public void channelCreated(Channel ch) {
			inactiveConnections.incrementAndGet();
			...
			// ch is wrapped here. The PooledConnection class implements both the Connection and
			// the ConnectionObserver interfaces, i.e. it is a channel and a listener at once.
			// If the channel's state changes later on, PooledConnection.onStateChange is called.
			PooledConnection pooledConnection = new PooledConnection(ch, this);
			pooledConnection.bind();
			Bootstrap bootstrap = this.bootstrap.clone();
			BootstrapHandlers.finalizeHandler(bootstrap, opsFactory, pooledConnection);
			// The handler configured on the cloned bootstrap goes to the front of the pipeline
			ch.pipeline()
			  .addFirst(bootstrap.config()
			                     .handler());
}
複製代碼

下面繼續看 PooledConnection 的 onStateChange 方法。

// PooledConnection's state callback (the DISCONNECTING branch is elided in this excerpt).
public void onStateChange(Connection connection, State newState) {
		if (newState == State.DISCONNECTING) {
        ...
			}
		// The other states go through here
		owner().onStateChange(connection, newState);
}

// Returns the observer stored in the channel's OWNER attribute, installing a new
// PendingConnectionObserver when none is present; retries if the compare-and-set fails.
ConnectionObserver owner() {
			ConnectionObserver obs;
			for (;;) {
				obs = channel.attr(OWNER)
				             .get();
				if (obs == null) {
					obs = new PendingConnectionObserver();
				}
				else {
					return obs;
				}
				// Set channel.attr(OWNER) to the newly created PendingConnectionObserver;
				// later calls to owner() then return that PendingConnectionObserver directly
				if (channel.attr(OWNER)
				           .compareAndSet(null, obs)) {
					return obs;
				}
			}
}

// ConnectionObserver that only records the events it receives in a queue.
final static class PendingConnectionObserver implements ConnectionObserver {

		final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get();

		@Override
		public void onUncaughtException(Connection connection, Throwable error) {
			// Recorded as a Pending entry with an error and no state
			pendingQueue.add(new Pending(connection, error, null));
		}

		@Override
		public void onStateChange(Connection connection, State newState) {
			// The state change is put into the pending queue; nothing else is done
			pendingQueue.add(new Pending(connection, null, newState));
		}

		// One queued event: the connection plus either an error or a state.
		static class Pending {
			final Connection connection;
			final Throwable error;
			final State state;

			Pending(Connection connection, @Nullable Throwable error, @Nullable State state) {
				this.connection = connection;
				this.error = error;
				this.state = state;
			}
		}
	}
複製代碼

從上面代碼可知,Channel 的狀態變動最終放入了一個等待隊列,缺少了通知各個監聽器的調用。繼續回到 DisposableAcquire 類,發現它同時實現了 Runnable 接口。

// Second look at DisposableAcquire, this time focusing on run() (parts are elided in this excerpt).
final static class DisposableAcquire implements Disposable, GenericFutureListener<Future<Channel>>, ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    @Override
    public void onStateChange(Connection connection, State newState) {
        if (newState == State.CONFIGURED) {
            sink.success(connection);
        }
        obs.onStateChange(connection, newState);
    }

    @Override
    public void run() {
        Channel c = f.getNow();
        pool.activeConnections.incrementAndGet();
        pool.inactiveConnections.decrementAndGet();
	// The owner() method previously installed a PendingConnectionObserver;
	// this object now takes its place in the OWNER attribute
        ConnectionObserver current = c.attr(OWNER)
                                        .getAndSet(this);

        if (current instanceof PendingConnectionObserver) {
            PendingConnectionObserver pending = (PendingConnectionObserver)current;
            PendingConnectionObserver.Pending p;
            current = null;
            // Watch for the connection being closed
            registerClose(c, pool);		
	    // Process the queued events (connection state changes) one by one
            while((p = pending.pendingQueue.poll()) != null) {
                if (p.error != null) {
                    onUncaughtException(p.connection, p.error);
                }
                else if (p.state != null) {
                    // Notify the listeners
                    onStateChange(p.connection, p.state);
                }
            }
        }
        else if (current == null) {
            registerClose(c, pool);
        }
	// TODO: under what circumstances is this branch taken?
        if (current != null) {
            Connection conn = Connection.from(c);
            if (log.isDebugEnabled()) {
                log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"),
                        pool.activeConnections, pool.inactiveConnections);
            }
            obs.onStateChange(conn, State.ACQUIRED);

            PooledConnection con = conn.as(PooledConnection.class);
            if (con != null) {
                ChannelOperations<?, ?> ops = pool.opsFactory.create(con, con, null);
                if (ops != null) {
                    ops.bind();
                    obs.onStateChange(ops, State.CONFIGURED);
                    sink.success(ops);
                }
                else {
                    //already configured, just forward the connection
                    sink.success(con);
                }
            }
            else {
                //already bound, just forward the connection
                sink.success(conn);
            }
            return;
        }
        //Connected, leave onStateChange forward the event if factory
				...
        if (pool.opsFactory == ChannelOperations.OnSetup.empty()) {
            sink.success(Connection.from(c));
        }
    }
}
複製代碼

至此,TcpClient 示例程序中的幾行代碼差不多就算是分析完了。

相關文章
相關標籤/搜索