本文主要研究一下sentinel的NettyHttpCommandCenterjava
com/alibaba/csp/sentinel/transport/command/NettyHttpCommandCenter.javagit
public class NettyHttpCommandCenter implements CommandCenter { private final HttpServer server = new HttpServer(); private final ExecutorService pool = Executors.newSingleThreadExecutor( new NamedThreadFactory("sentinel-netty-command-center-executor")); @Override public void start() throws Exception { pool.submit(new Runnable() { @Override public void run() { try { server.start(); } catch (Exception ex) { RecordLog.info("Start netty server error", ex); ex.printStackTrace(); System.exit(-1); } } }); } @Override public void stop() throws Exception { server.close(); pool.shutdownNow(); } @Override public void beforeStart() throws Exception { // Register handlers Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers(); server.registerCommands(handlers); } }
com/alibaba/csp/sentinel/transport/command/netty/HttpServer.javagithub
public final class HttpServer { private static final int DEFAULT_PORT = 8719; private Channel channel; final static Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>(); public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new HttpServerInitializer()); int port; try { if (StringUtil.isEmpty(TransportConfig.getPort())) { CommandCenterLog.info("Port not configured, using default port: " + DEFAULT_PORT); port = DEFAULT_PORT; } else { port = Integer.parseInt(TransportConfig.getPort()); } } catch (Exception e) { throw new IllegalArgumentException("Illegal port: " + TransportConfig.getPort()); } channel = b.bind(port).sync().channel(); channel.closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public void close() { channel.close(); } public void registerCommand(String commandName, CommandHandler handler) { if (StringUtil.isEmpty(commandName) || handler == null) { return; } if (handlerMap.containsKey(commandName)) { CommandCenterLog.info("Register failed (duplicate command): " + commandName); return; } handlerMap.put(commandName, handler); } public void registerCommands(Map<String, CommandHandler> handlerMap) { if (handlerMap != null) { for (Entry<String, CommandHandler> e : handlerMap.entrySet()) { registerCommand(e.getKey(), e.getValue()); } } } }
com/alibaba/csp/sentinel/transport/command/netty/HttpServerInitializer.javasegmentfault
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new HttpRequestDecoder()); p.addLast(new HttpObjectAggregator(1024 * 1024)); p.addLast(new HttpResponseEncoder()); p.addLast(new HttpServerHandler()); } }
com/alibaba/csp/sentinel/transport/command/netty/HttpServerHandler.java異步
public class HttpServerHandler extends SimpleChannelInboundHandler<Object> { private final CodecRegistry codecRegistry = new CodecRegistry(); @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest httpRequest = (FullHttpRequest)msg; try { CommandRequest request = parseRequest(httpRequest); if (StringUtil.isBlank(HttpCommandUtils.getTarget(request))) { writeErrorResponse(BAD_REQUEST.code(), "Invalid command", ctx); return; } handleRequest(request, ctx, HttpUtil.isKeepAlive(httpRequest)); } catch (Exception ex) { writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx); CommandCenterLog.warn("Internal error", ex); } } private void handleRequest(CommandRequest request, ChannelHandlerContext ctx, boolean keepAlive) throws Exception { String commandName = HttpCommandUtils.getTarget(request); // Find the matching command handler. CommandHandler<?> commandHandler = getHandler(commandName); if (commandHandler != null) { CommandResponse<?> response = commandHandler.handle(request); writeResponse(response, ctx, keepAlive); } else { // No matching command handler. writeErrorResponse(BAD_REQUEST.code(), String.format("Unknown command \"%s\"", commandName), ctx); } } private Encoder<?> pickEncoder(Class<?> clazz) { if (clazz == null) { throw new IllegalArgumentException("Bad class metadata"); } for (Encoder<?> encoder : codecRegistry.getEncoderList()) { if (encoder.canEncode(clazz)) { return encoder; } } return null; } private void writeErrorResponse(int statusCode, String message, ChannelHandlerContext ctx) { FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(statusCode), Unpooled.copiedBuffer(message, Charset.forName(SentinelConfig.charset()))); httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset()); ctx.write(httpResponse); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } private void writeResponse(CommandResponse response, ChannelHandlerContext ctx, boolean keepAlive) throws Exception { byte[] body; if (response.isSuccess()) { if (response.getResult() == null) { body = new byte[] {}; } else { Encoder encoder = pickEncoder(response.getResult().getClass()); if (encoder == null) { writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx); CommandCenterLog.warn("Error when encoding object", new IllegalStateException("No compatible encoder")); return; } body = encoder.encode(response.getResult()); } } else { body = response.getException().getMessage().getBytes(SentinelConfig.charset()); } HttpResponseStatus status = response.isSuccess() ? OK : BAD_REQUEST; FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(body)); httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset()); //if (keepAlive) { // httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes()); // httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); //} //ctx.write(httpResponse); //if (!keepAlive) { // ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); //} httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes()); httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ctx.write(httpResponse); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } private CommandRequest parseRequest(FullHttpRequest request) { QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri()); CommandRequest serverRequest = new CommandRequest(); Map<String, List<String>> paramMap = queryStringDecoder.parameters(); // Parse request parameters. if (!paramMap.isEmpty()) { for (Entry<String, List<String>> p : paramMap.entrySet()) { if (!p.getValue().isEmpty()) { serverRequest.addParam(p.getKey(), p.getValue().get(0)); } } } // Parse command name. String target = parseTarget(queryStringDecoder.rawPath()); serverRequest.addMetadata(HttpCommandUtils.REQUEST_TARGET, target); // Parse body. if (request.content().readableBytes() <= 0) { serverRequest.setBody(null); } else { serverRequest.setBody(request.content().array()); } return serverRequest; } private String parseTarget(String uri) { if (StringUtil.isEmpty(uri)) { return ""; } String[] arr = uri.split("/"); if (arr.length < 2) { return ""; } return arr[1]; } private CommandHandler getHandler(String commandName) { if (StringUtil.isEmpty(commandName)) { return null; } return HttpServer.handlerMap.get(commandName); } private void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.write(response); } private static final String SERVER_ERROR_MESSAGE = "Command server error"; }
NettyHttpCommandCenter提供的是基於netty的http實現,sentinel-transport還有一個SimpleHttpCommandCenter,是基於java socket的bio外加工做線程池模式的實現。socket