使用netty作文件的上傳

這個例子可以使用多線程處理,只須要在須要返回數據時,調用一下sendAndClose的方法,具體看例子吧,這個例子並無作異常處理等,真實狀況下,須要保證出錯時關閉connection,而不是等待超時。 css

只是測試代碼,因此代碼在貼出來時沒有通過修改。 html

... import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer; import static org.jboss.netty.channel.Channels.connect; import static org.jboss.netty.channel.Channels.pipeline; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; /** * 12-8-20 下午6:21 * * @author jiaguotian Copyright 2012 Tianjiaguo.com Inc. All Rights Reserved. */ public class UploadTest { public static void main(String[] args) { final Timer timer = new HashedWheelTimer(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("timeoutChecker", new ReadTimeoutHandler(timer, 30)); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("handler", new ChannelHandler()); return pipeline; } }); bootstrap.setFactory(new NioServerSocketChannelFactory( Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(1))); Channel channel = bootstrap.bind(new InetSocketAddress(8888)); if (channel.isConnected()) { System.out.println("connected"); } } private static class Connection { private static final AtomicLong ID_SEQ = new AtomicLong(0); private final long id = ID_SEQ.getAndIncrement(); private final HttpRequest httpRequest; private final ChannelHandlerContext ctx; private Connection(HttpRequest httpRequest, ChannelHandlerContext ctx) { this.httpRequest = httpRequest; this.ctx = ctx; } public long getId() { return this.id; } public void send(String message) { ctx.getChannel().write(copiedBuffer(message, CharsetUtil.UTF_8)); } public void sendAndClose(String message) { ChannelFuture future = ctx.getChannel().write(copiedBuffer(message, CharsetUtil.UTF_8)); future.addListener(ChannelFutureListener.CLOSE); } } private static class ChannelHandler extends SimpleChannelUpstreamHandler { private static final ExecutorService executor = Executors.newFixedThreadPool(20); public ChannelHandler() { } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { executor.execute(new Runnable() { @Override public void run() { } }); } @Override public void messageReceived(final ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { final HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage(); final HttpResponse httpResponse = new DefaultHttpResponse(HTTP_1_1, OK); final org.jboss.netty.channel.ChannelHandler channelHandler = this; executor.execute(new Runnable() { @Override public void run() { try { QueryStringDecoder queryStringDecoder = new QueryStringDecoder(httpRequest.getUri()); String path = queryStringDecoder.getPath(); System.out.println(path); httpResponse.setStatus(HttpResponseStatus.OK); httpResponse.addHeader("Content-Type", "text/event-stream"); httpResponse.addHeader("Cache-Control", "no-cache"); ctx.getChannel().write(httpResponse); ChannelPipeline p = ctx.getChannel().getPipeline(); p.remove("aggregator"); p.replace("handler", "sse_handler", channelHandler); final Map<String, Object> parameters = new HashMap<String, Object>(); final Connection connection = new Connection(httpRequest, ctx); HttpMethod method = httpRequest.getMethod(); if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT)) { HttpPostRequestDecoder postRequestDecoder = new HttpPostRequestDecoder( new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE), httpRequest, CharsetUtil.UTF_8); List<InterfaceHttpData> dataList = postRequestDecoder.getBodyHttpDatas(); for (InterfaceHttpData data : dataList) { String name = data.getName(); if (InterfaceHttpData.HttpDataType.Attribute == data.getHttpDataType()) { MixedAttribute attribute = (MixedAttribute) data; attribute.setCharset(CharsetUtil.UTF_8); parameters.put(name, attribute.getValue()); } else if (InterfaceHttpData.HttpDataType.FileUpload == data.getHttpDataType()) { FileUpload fileUpload = (FileUpload) data; File dest = File.createTempFile("up_", null, new File("/tmp")); fileUpload.renameTo(dest); parameters.put("name", dest.getAbsolutePath()); } else if (InterfaceHttpData.HttpDataType.InternalAttribute == data.getHttpDataType()) { } } } new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(10000)); } catch (InterruptedException e) { e.printStackTrace(); } connection.sendAndClose(parameters.toString()); } }).start(); } catch (Exception exception) { exception.printStackTrace(); } } }); } } } // upload.html <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/> <title>Upload Demo</title> <style type="text/css" media="all">label{display:block;}</style> </head> <body> <h2>Multi Upload Demo</h2> <hr/> <form action="http://localhost:8888/" enctype="multipart/form-data" method="post"> <label>文件: <input type="file" name="Filedata"/> </label> <label>用戶ID: <input type="text" name="uid" value="100000"/> </label> <input type="submit" value="Submit"/> </form> </body> </html>
相關文章
相關標籤/搜索