public final class RestClientConfiguration {

    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    private final long connectionTimeout;

    private final long idlenessTimeout;

    private final int maxContentLength;

    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        // %s is used because Guava-style precondition templates only substitute %s placeholders
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %s", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLHandlerFactory} that the REST client endpoint should use.
     *
     * @return SSLHandlerFactory that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * Returns the connection timeout, see {@link RestOptions#CONNECTION_TIMEOUT}.
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * Returns the idleness timeout, see {@link RestOptions#IDLENESS_TIMEOUT}.
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint can handle.
     *
     * @return max content length that the REST client endpoint can handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */
    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        // The SSL handler factory is created only when REST SSL is enabled; otherwise it stays null
        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

        final int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}
public class RestClient implements AutoCloseableAsync {

    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    private final CompletableFuture<Void> terminationFuture;

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        .addLast(new IdleStateHandler(
                            configuration.getIdlenessTimeout(),
                            configuration.getIdlenessTimeout(),
                            configuration.getIdlenessTimeout(),
                            TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow(t);
                }
            }
        };

        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null) {
                if (bootstrap.group() != null) {
                    bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                terminationFuture.complete(null);
                            } else {
                                terminationFuture.completeExceptionally(finished.cause());
                            }
                        });
                }
            }
        }
        return terminationFuture;
    }

    //......
}