【Canal源碼分析】client工做過程

client的工做過程,須要咱們本身去編寫對應的邏輯,咱們目前只能從example寫的例子來看。目前examle中提供了兩個例子,一個是單機的,一個是集羣的cluster,咱們後續若是須要進行開發的話,其實也是開發咱們本身的client,以及client的一些邏輯。咱們主要看下集羣的client是如何實現和消費的,又是怎麼和server進行數據交互的。java

咱們來看看具體的代碼:node

protected void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            waiting = false;
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // try {
                    // Thread.sleep(1000);
                    // } catch (InterruptedException e) {
                    // }
                } else {
                    printSummary(message, batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾數據
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}

這個的這樣的過程是這樣的數據庫

  • 鏈接,connector.connect()
  • 訂閱,connector.subscribe
  • 獲取數據,connector.getWithoutAck()
  • 業務處理
  • 提交確認,connector.ack()
  • 回滾,connector.rollback()
  • 斷開鏈接,connector.disconnect()

咱們具體來看下。編程

1、創建鏈接

CanalConnector主要有兩個實現,一個是SimpleCanalConnector,一個是ClusterCanalConnector,咱們主要看下ClusterCanalConnector,這也是咱們要用的一個模式。json

咱們用的時候,經過一個工廠類生成咱們須要的Connector,這裏的工廠類是CanalConnectors,裏面包含了生成ClusterCanalConnector的方法。服務器

public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
                                                 String password) {
    ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
        password,
        destination,
        new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
    canalConnector.setSoTimeout(30 * 1000);
    return canalConnector;
}

用到的參數有zk的地址,canal的名稱,數據庫的帳號密碼。裏面有個ClusterNodeAccessStrategy是用來選擇client的策略,這個ClusterNodeAccessStrategy的構造方法裏面有些東西須要咱們關注下。網絡

1.1 ClusterNodeAccessStrategy

public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
    this.zkClient = zkClient;
    childListener = new IZkChildListener() {

        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            initClusters(currentChilds);
        }

    };

    dataListener = new IZkDataListener() {

        public void handleDataDeleted(String dataPath) throws Exception {
            runningAddress = null;
        }

        public void handleDataChange(String dataPath, Object data) throws Exception {
            initRunning(data);
        }

    };

    String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
    this.zkClient.subscribeChildChanges(clusterPath, childListener);
    initClusters(this.zkClient.getChildren(clusterPath));

    String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
    this.zkClient.subscribeDataChanges(runningPath, dataListener);
    initRunning(this.zkClient.readData(runningPath, true));
}

這邊起了兩個監聽器,都是監聽server端的活動服務器的。一個是獲取全部的server列表,一個是獲取活動的server服務器,都是從zk的對應節點上去取的。session

1.2 鏈接connect

獲取到CanalConnector以後,就是真正的鏈接了。在ClusterCanalConnector中,咱們能夠看到,其實他底層用的也是SimpleCanalConnector,只不過加了一個選擇的策略。app

public void connect() throws CanalClientException {
    if (connected) {
        return;
    }

    if (runningMonitor != null) {
        if (!runningMonitor.isStart()) {
            runningMonitor.start();
        }
    } else {
        waitClientRunning();
        if (!running) {
            return;
        }
        doConnect();
        if (filter != null) { // 若是存在條件,說明是自動切換,基於上一次的條件訂閱一次
            subscribe(filter);
        }
        if (rollbackOnConnect) {
            rollback();
        }
    }

    connected = true;
}

若是是集羣模式的客戶端,那麼這邊的runningMonitor不爲空,由於他進行了初始化。咱們主要看下runningMonitor.start()裏面的操做。socket

public void start() {
    super.start();

    String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
    zkClient.subscribeDataChanges(path, dataListener);
    initRunning();
}

這邊監聽的路徑是:/otter/canal/destinations/{destination}/{clientId}/running。若是有任何的變化,或節點的刪除,那麼執行dataListener裏面的操做。

dataListener = new IZkDataListener() {

    public void handleDataChange(String dataPath, Object data) throws Exception {
        MDC.put("destination", destination);
        ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
        if (!isMine(runningData.getAddress())) {
            mutex.set(false);
        }

        if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說明出現了主動釋放的操做,而且本機以前是active
            release = true;
            releaseRunning();// 完全釋放mainstem
        }

        activeData = (ClientRunningData) runningData;
    }

    public void handleDataDeleted(String dataPath) throws Exception {
        MDC.put("destination", destination);
        mutex.set(false);
        // 觸發一下退出,多是人爲干預的釋放操做或者網絡閃斷引發的session expired timeout
        processActiveExit();
        if (!release && activeData != null && isMine(activeData.getAddress())) {
            // 若是上一次active的狀態就是本機,則即時觸發一下active搶佔
            initRunning();
        } else {
            // 不然就是等待delayTime,避免因網絡瞬端或者zk異常,致使出現頻繁的切換操做
            delayExector.schedule(new Runnable() {

                public void run() {
                    initRunning();
                }
            }, delayTime, TimeUnit.SECONDS);
        }
    }

};

這裏的註釋比較清楚,基本上若是數據發生了變化,那麼進行節點釋放後,將運行節點置爲活動節點。若是發生了數據刪除,那麼直接觸發退出,若是上一次的active狀態是本機,那麼觸發一下active搶佔,不然等待delayTime,默認5s後重試。下面咱們主要看下initRunning。

1.3 initRunning

這塊主要是建立運行節點的臨時節點。節點路徑是/otter/canal/destinations/{destination}/{clientId},節點內容是ClientRunningData的json序列化結果。鏈接的代碼:

public InetSocketAddress processActiveEnter() {
    InetSocketAddress address = doConnect();
    mutex.set(true);
    if (filter != null) { // 若是存在條件,說明是自動切換,基於上一次的條件訂閱一次
        subscribe(filter);
    }

    if (rollbackOnConnect) {
        rollback();
    }

    return address;
}

這塊有幾段邏輯,咱們慢慢看下。

1.3.1 doConnect

這裏是client直接連上了server,經過socket鏈接,也就是server暴露的socket端口。

private InetSocketAddress doConnect() throws CanalClientException {
    try {
        channel = SocketChannel.open();
        channel.socket().setSoTimeout(soTimeout);
        SocketAddress address = getAddress();
        if (address == null) {
            address = getNextAddress();
        }
        channel.connect(address);
        readableChannel = Channels.newChannel(channel.socket().getInputStream());
        writableChannel = Channels.newChannel(channel.socket().getOutputStream());
        Packet p = Packet.parseFrom(readNextPacket());
        if (p.getVersion() != 1) {
            throw new CanalClientException("unsupported version at this client.");
        }

        if (p.getType() != PacketType.HANDSHAKE) {
            throw new CanalClientException("expect handshake but found other type.");
        }
        //
        Handshake handshake = Handshake.parseFrom(p.getBody());
        supportedCompressions.addAll(handshake.getSupportedCompressionsList());
        //
        ClientAuth ca = ClientAuth.newBuilder()
            .setUsername(username != null ? username : "")
            .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
            .setNetReadTimeout(soTimeout)
            .setNetWriteTimeout(soTimeout)
            .build();
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.CLIENTAUTHENTICATION)
            .setBody(ca.toByteString())
            .build()
            .toByteArray());
        //
        Packet ack = Packet.parseFrom(readNextPacket());
        if (ack.getType() != PacketType.ACK) {
            throw new CanalClientException("unexpected packet type when ack is expected");
        }

        Ack ackBody = Ack.parseFrom(ack.getBody());
        if (ackBody.getErrorCode() > 0) {
            throw new CanalClientException("something goes wrong when doing authentication: "
                                       + ackBody.getErrorMessage());
        }

        connected = true;
        return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

這邊採用NIO編程,創建和server的socket鏈接後,發送了握手包和認證包,當收到ack包後,認爲鏈接成功。認證包的服務端處理在ClientAuthenticationHandler類中,握手處理在HandshakeInitializationHandler類。

server接收到認證的消息後,會作以下的處理:

public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
    final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
    switch (packet.getVersion()) {
        case SUPPORTED_VERSION:
        default:
            final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
            // 若是存在訂閱信息
            if (StringUtils.isNotEmpty(clientAuth.getDestination())
                && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                    Short.valueOf(clientAuth.getClientId()),
                    clientAuth.getFilter());
                try {
                    MDC.put("destination", clientIdentity.getDestination());
                    embeddedServer.subscribe(clientIdentity);
                    ctx.setAttachment(clientIdentity);// 設置狀態數據
                    // 嘗試啓動,若是已經啓動,忽略
                    if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                        if (!runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }
                } finally {
                    MDC.remove("destination");
                }
            }

            NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {

                public void operationComplete(ChannelFuture future) throws Exception {
                    //忽略
                }

            });
            break;
    }
}

主要的邏輯在subscribe裏面。若是metaManager沒有啓動,那麼須要進行啓動。啓動時,會從zk節點下面拉取一些數據,包括客戶端的消費位點狀況等等。而後就是訂閱,訂閱是新建一個zk節點,路徑爲/otter/canal/destinations/{destination}/{clientId}。而後還有一些過濾器,也須要寫到zk中。以後就是獲取一下本client的位點信息,若是原來zk中包含,那麼直接從內存中獲取,不然取eventStore的第一條數據。

1.3.2 subscribe

發送訂閱消息給server,經過socket的方式。這邊是判斷,若是filter不爲空,才發送訂閱消息。服務端的處理過程是這樣的:

case SUBSCRIPTION:
    Sub sub = Sub.parseFrom(packet.getBody());
    if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
        clientIdentity = new ClientIdentity(sub.getDestination(),
                            Short.valueOf(sub.getClientId()),
                            sub.getFilter());
        MDC.put("destination", clientIdentity.getDestination());

        // 嘗試啓動,若是已經啓動,忽略
        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }

        embeddedServer.subscribe(clientIdentity);
        ctx.setAttachment(clientIdentity);// 設置狀態數據
        NettyUtils.ack(ctx.getChannel(), null);
    } else {
        NettyUtils.error(401,
            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
            ctx.getChannel(),
            null);
}
break;

相似於connect的過程,不過這邊帶上了filter的參數。這邊啓動了server以及他的監聽器。

1.3.3 rollback

這裏的回滾是指回滾server端記錄的本client的位點信息。

public void rollback() throws CanalClientException {
    waitClientRunning();
    rollback(0);// 0代筆未設置
}

這裏發送了rollback的指令。服務端是這麼處理的:

case CLIENTROLLBACK:
    ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
    MDC.put("destination", rollback.getDestination());
    if (StringUtils.isNotEmpty(rollback.getDestination())
        && StringUtils.isNotEmpty(rollback.getClientId())) {
        clientIdentity = new ClientIdentity(rollback.getDestination(),
            Short.valueOf(rollback.getClientId()));
        if (rollback.getBatchId() == 0L) {
            embeddedServer.rollback(clientIdentity);// 回滾全部批次
        } else {
            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滾單個批次
        }
    } else {
        NettyUtils.error(401,
            MessageFormatter.format("destination or clientId is null", rollback.toString())
                .getMessage(),
            ctx.getChannel(),
            null);
    }
    break;

這裏的batchId傳入的是0,也就是要回滾全部的批次。咱們來看下這個回滾的動做:

@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    // 由於存在第一次連接時自動rollback的狀況,因此須要忽略未訂閱
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }

    synchronized (canalInstance) {
        // 清除batch信息
        canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
        // rollback eventStore中的狀態信息
        canalInstance.getEventStore().rollback();
        logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
    }
}

這裏回滾的,實際上是eventStore中的指針,把get的指針設置爲以前ack的指針。

2、訂閱數據

當client鏈接server完成後,就須要進行binlog數據的訂閱。

public void subscribe() throws CanalClientException {
    subscribe(""); // 傳遞空字符便可
}

public void subscribe(String filter) throws CanalClientException {
    int times = 0;
    while (times < retryTimes) {
        try {
            currentConnector.subscribe(filter);
            this.filter = filter;
            return;
        } catch (Throwable t) {
            if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                logger.info("block waiting interrupted by other thread.");
                return;
            } else {
                logger.warn(String.format(
                        "something goes wrong when subscribing from server: %s",
                        currentConnector != null ? currentConnector.getAddress() : "null"),
                        t);
                times++;
                restart();
                logger.info("restart the connector for next round retry.");
            }

        }
    }

    throw new CanalClientException("failed to subscribe after " + times + " times retry.");
}

訂閱這塊的內容再也不贅述,在上面的connect過程當中有提到。這邊還有一個失敗重試的機制,當異常不是中斷異常的狀況下,會重試重啓client connector,直到達到了閾值retryTimes。

3、獲取數據

在創建鏈接和進行數據訂閱以後,就能夠開始進行binlog數據的獲取了。主要的方法是getWithOutAck這個方法,這種是須要client本身進行數據ack的,保證了只有數據真正的被消費,並且進行了業務邏輯處理以後,纔會ack。固然,若是有了異常,也會進行必定次數的重試和重啓。

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    try {
        ...//忽略
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.GET)
            .setBody(Get.newBuilder()
            .setAutoAck(false)
            .setDestination(clientIdentity.getDestination())
            .setClientId(String.valueOf(clientIdentity.getClientId()))
                .setFetchSize(size)
                .setTimeout(time)
                .setUnit(unit.ordinal())
                .build()
                .toByteString())
            .build()
            .toByteArray());
        return receiveMessages();
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

咱們能夠看到,實際上是發送了一個GET命令給server端,而後傳遞了一個參數batchSize,還有超時時間,並且不是自動提交的。服務端的處理是這樣的:

embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());

也是調用的這個方法:

@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    synchronized (canalInstance) {
        // 獲取到流式數據中的最後一批獲取的位置
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

        Events<Event> events = null;
        if (positionRanges != null) { // 存在流數據
            events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
        } else {// ack後第一次獲取
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            if (start == null) { // 第一次,尚未過ack記錄,則獲取當前store中的第一條
                start = canalInstance.getEventStore().getFirstPosition();
            }

            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
        }

        if (CollectionUtils.isEmpty(events.getEvents())) {
            logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(), batchSize);
            return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪費性能
        } else {
            // 記錄到流式信息
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {

                public Entry apply(Event input) {
                    return input.getEntry();
                }
            });
            if (logger.isInfoEnabled()) {
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(),
                        batchSize,
                        entrys.size(),
                        batchId,
                        events.getPositionRange());
            }
            return new Message(batchId, entrys);
        }

    }
}

最主要的邏輯在這裏:

  • 判斷canalInstance是否已經啓動:checkStart
  • 判斷訂閱列表中是否包含當前的client:checkSubscribe
  • 根據client信息從metaManager中獲取最後消費的批次:getLastestBatch,這塊在運行起來後,是從內存中取的,可是在instance啓動時,是從zk中拉取的,是從/otter/canal/destinations/{destination}/{clientId}/mark下面獲取的,後續也會定時(1s)刷新到這裏面
  • 若是能獲取到消費的批次,直接從eventStore的隊列中獲取數據。
  • 若是positionRanges爲空,那麼從metaManager中獲取指針。若是指針也沒有,說明原來沒有ack過數據,須要從store中第一條開始獲取。這個過程其實就是找start,也就是上一次ack的位置。
  • 調用getEvent,獲取數據。根據傳入的參數不一樣,調用不一樣的方法去獲取數據,可是最終都是調用的goGet方法。這個doGet方法不是很複雜,主要是根據參數從store隊列中獲取數據,而後把指針進行新的設置。
  • 若是沒有取到binlog數據,那麼直接返回,批次號爲-1。
  • 若是取到了數據,記錄一下流式數據後返回。

結果封裝在Messages中,最終改成Message,包含批次號和binlog列表。

4、業務處理

拿到message後,須要進行判斷batchId,若是batchId=-1或者binlog大小爲0,說明沒有拿到數據。不然在message基礎上進行邏輯處理。

Message的內容,後續咱們再進行討論。

5、提交確認

connector.ack(batchId); // 提交確認

提交批次id,底層發送CLIENTACK命令到server。server調用CanalServerWithEmbedded的ack方法來進行提交。

public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    PositionRange<LogPosition> positionRanges = null;
    positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
    if (positionRanges == null) { // 說明是重複的ack/rollback
        throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
            clientIdentity.getClientId(),
            batchId));
    }

    // 更新cursor
    if (positionRanges.getAck() != null) {
        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
        if (logger.isInfoEnabled()) {
            logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(),
                    batchId,
                    positionRanges);
        }
    }

    // 可定時清理數據
    canalInstance.getEventStore().ack(positionRanges.getEnd());

}

首先更新metaManager中的batch,而後更新ack指針,同時清理store中到ack指針位置的數據。

6、回滾

若是有失敗的狀況,須要進行回滾。發送CLIENTROLLBACK命令給server端,進行數據回滾。回滾單個批次時的處理邏輯是這樣的:

@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());

    // 由於存在第一次連接時自動rollback的狀況,因此須要忽略未訂閱
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }
    synchronized (canalInstance) {
        // 清除batch信息
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
            batchId);
        if (positionRanges == null) { // 說明是重複的ack/rollback
            throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }

        // lastRollbackPostions.put(clientIdentity,
        // positionRanges.getEnd());// 記錄一下最後rollback的位置
        // TODO 後續rollback到指定的batchId位置
        canalInstance.getEventStore().rollback();// rollback
                                                 // eventStore中的狀態信息
        logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
            clientIdentity.getClientId(),
            batchId,
            positionRanges);
    }
}

這裏的rollback到指定的batchId,實際上是假的。他的rollback也是全量回滾到ack的指針位置。

7、斷開鏈接

在發生異常狀況時,client會斷開與server的鏈接,也就是disconnect方法。

public void disconnect() throws CanalClientException {
    if (rollbackOnDisConnect && channel.isConnected()) {
        rollback();
    }

    connected = false;
    if (runningMonitor != null) {
        if (runningMonitor.isStart()) {
            runningMonitor.stop();
        }
    } else {
        doDisconnnect();
    }
}

判斷是否在斷開鏈接的時候回滾參數(默認false)和當前socket通道是否鏈接中,進行回滾。

不然調用runningMonitor.stop方法進行中止。主要的過程是這樣的:

  • 取消監聽/otter/canal/destinations/{destination}/{clientId}/running/節點變化信息
  • 刪除上面這個節點
  • 關閉socket的讀通道
  • 關閉socket的寫通道
  • 關閉socket channel
相關文章
相關標籤/搜索