本文主要研究一下nacos的TcpSuperSenseProcessorjava
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.javagit
@Component public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable { @Autowired private HealthCheckCommon healthCheckCommon; @Autowired private SwitchDomain switchDomain; public static final int CONNECT_TIMEOUT_MS = 500; private Map<String, BeatKey> keyMap = new ConcurrentHashMap<>(); private BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<Beat>(); /** * this value has been carefully tuned, do not modify unless you're confident */ private static final int NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ? 1 : Runtime.getRuntime().availableProcessors() / 2; /** * because some hosts doesn't support keep-alive connections, disabled temporarily */ private static final long TCP_KEEP_ALIVE_MILLIS = 0; private static ScheduledExecutorService TCP_CHECK_EXECUTOR = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("nacos.naming.tcp.check.worker"); t.setDaemon(true); return t; } }); private static ScheduledExecutorService NIO_EXECUTOR = Executors.newScheduledThreadPool(NIO_THREAD_COUNT, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("nacos.supersense.checker"); return thread; } } ); private Selector selector; public TcpSuperSenseProcessor() { try { selector = Selector.open(); TCP_CHECK_EXECUTOR.submit(this); } catch (Exception e) { throw new IllegalStateException("Error while initializing SuperSense(TM)."); } } @Override public void process(HealthCheckTask task) { List<Instance> ips = task.getCluster().allIPs(false); if (CollectionUtils.isEmpty(ips)) { return; } for (Instance ip : ips) { if (ip.isMarked()) { if (SRV_LOG.isDebugEnabled()) { SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp()); } continue; } if (!ip.markChecking()) { SRV_LOG.warn("tcp check started before last one finished, service: " + task.getCluster().getService().getName() + ":" + task.getCluster().getName() + ":" + ip.getIp() + ":" + ip.getPort()); healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getTcpHealthParams()); continue; } Beat beat = new Beat(ip, task); taskQueue.add(beat); MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet(); } } private void processTask() throws Exception { Collection<Callable<Void>> tasks = new LinkedList<>(); do { Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS); if (beat == null) { return; } tasks.add(new TaskProcessor(beat)); } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64); for (Future<?> f : NIO_EXECUTOR.invokeAll(tasks)) { f.get(); } } @Override public void run() { while (true) { try { processTask(); int readyCount = selector.selectNow(); if (readyCount <= 0) { continue; } Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); NIO_EXECUTOR.execute(new PostProcessor(key)); } } catch (Throwable e) { SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e); } } } //...... @Override public String getType() { return "TCP"; } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.javagithub
public class PostProcessor implements Runnable { SelectionKey key; public PostProcessor(SelectionKey key) { this.key = key; } @Override public void run() { Beat beat = (Beat) key.attachment(); SocketChannel channel = (SocketChannel) key.channel(); try { if (!beat.isHealthy()) { //invalid beat means this server is no longer responsible for the current service key.cancel(); key.channel().close(); beat.finishCheck(); return; } if (key.isValid() && key.isConnectable()) { //connected channel.finishConnect(); beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+"); } if (key.isValid() && key.isReadable()) { //disconnected ByteBuffer buffer = ByteBuffer.allocate(128); if (channel.read(buffer) == -1) { key.cancel(); key.channel().close(); } else { // not terminate request, ignore } } } catch (ConnectException e) { // unable to connect, possibly port not opened beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(), "tcp:unable2connect:" + e.getMessage()); } catch (Exception e) { beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage()); try { key.cancel(); key.channel().close(); } catch (Exception ignore) { } } } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.javaless
private class TaskProcessor implements Callable<Void> { private static final int MAX_WAIT_TIME_MILLISECONDS = 500; Beat beat; public TaskProcessor(Beat beat) { this.beat = beat; } @Override public Void call() { long waited = System.currentTimeMillis() - beat.getStartTime(); if (waited > MAX_WAIT_TIME_MILLISECONDS) { Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms"); } SocketChannel channel = null; try { Instance instance = beat.getIp(); Cluster cluster = beat.getTask().getCluster(); BeatKey beatKey = keyMap.get(beat.toString()); if (beatKey != null && beatKey.key.isValid()) { if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) { instance.setBeingChecked(false); return null; } beatKey.key.cancel(); beatKey.key.channel().close(); } channel = SocketChannel.open(); channel.configureBlocking(false); // only by setting this can we make the socket close event asynchronous channel.socket().setSoLinger(false, -1); channel.socket().setReuseAddress(true); channel.socket().setKeepAlive(true); channel.socket().setTcpNoDelay(true); int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport(); channel.connect(new InetSocketAddress(instance.getIp(), port)); SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.attach(beat); keyMap.put(beat.toString(), new BeatKey(key)); beat.setStartTime(System.currentTimeMillis()); NIO_EXECUTOR.schedule(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage()); if (channel != null) { try { channel.close(); } catch (Exception ignore) { } } } return null; } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java異步
private static class TimeOutTask implements Runnable { SelectionKey key; public TimeOutTask(SelectionKey key) { this.key = key; } @Override public void run() { if (key != null && key.isValid()) { SocketChannel channel = (SocketChannel) key.channel(); Beat beat = (Beat) key.attachment(); if (channel.isConnected()) { return; } try { channel.finishConnect(); } catch (Exception ignore) { } try { beat.finishCheck(false, false, beat.getTask().getCheckRTNormalized() * 2, "tcp:timeout"); key.cancel(); key.channel().close(); } catch (Exception ignore) { } } } }