抽空記錄一下:因爲項目需要把收集到的日誌複製一份到一個 NoSQL 裏面,以便實時地分析查看,因此就通過 Flume 自帶的攔截功能把日誌分發一份。
package com.autohome.flume.interceptor; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.cloudera.flume.conf.Context; import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder; import com.cloudera.flume.core.Event; import com.cloudera.flume.core.EventSink; import com.cloudera.flume.core.EventSinkDecorator; import com.cloudera.util.Pair; public class ParserDecorator<S extends EventSink> extends EventSinkDecorator<S> { private static final Logger logger = LoggerFactory .getLogger(DefaultStreamProcessor.class); public ParserDecorator(S s, String... argvs) { super(s); } @Override public void append(Event e) throws IOException, InterruptedException { // TODO Auto-generated method stub super.append(e); logger.info("ParserDecorator initialized****************************************************************"); Payload payLoad = new Payload(null, e.getBody(), e.getTimestamp(), e.getHost()); Processor processor = new DefaultStreamProcessor(); processor.action(payLoad); } public static SinkDecoBuilder builder() { return new SinkDecoBuilder() { @Override public EventSinkDecorator<EventSink> build(Context context, String... argv) { return new ParserDecorator<EventSink>(null, argv); } }; } public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() { List<Pair<String, SinkDecoBuilder>> builders = new ArrayList<Pair<String, SinkDecoBuilder>>(); builders.add(new Pair<String, SinkDecoBuilder>("ParserDecorator", builder())); return builders; } @Override public void close() throws IOException, InterruptedException { // TODO Auto-generated method stub super.close(); } @Override public void open() throws IOException, InterruptedException { // TODO Auto-generated method stub super.open(); } }
package com.autohome.flume.interceptor; import java.util.Map; public class Payload { private final Map<String, String> headers; private final byte[] body; private final long timeStamp; private final String host; public Payload(Map<String, String> headers, byte[] body,long timeStamp,String host) { this.headers = headers; this.body = body; this.timeStamp=timeStamp; this.host=host; } public Map<String, String> getHeaders() { return headers; } public byte[] getBody() { return body; } public long timeStamp(){ return timeStamp; } public String host(){ return host; } }
package com.autohome.flume.interceptor; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.autohome.flume.http.HttpClient; import com.autohome.flume.http.HttpRequest; import com.autohome.flume.util.PropertiesHelper; public class DefaultStreamProcessor implements Processor { private static final Logger logger = LoggerFactory .getLogger(DefaultStreamProcessor.class); @Override public void initialize() { // TODO Auto-generated method stub logger.info("default stream processor initialized."); } private static final String url = PropertiesHelper.getProperties("flume-interceptor.properties", "httpRequest.url").trim(); private static final String method=PropertiesHelper.getProperties("flume-interceptor.properties", "httpRequest.method").trim(); @Override public void action(Payload payload) { // TODO Auto-generated method stub logger.info("receive a payload:"); HttpRequest httpRequest = new HttpRequest(url, method); Map<String, String> map = new HashMap<String, String>(); String strInfo = new String(payload.getBody()); map.put("data", strInfo); httpRequest.setBody(map); HttpClient httpClient = new HttpClient(); try { httpClient.execute(httpRequest); } catch (IOException ex) { logger.error(ex.getStackTrace().toString()); } catch (Exception ex) { logger.error(ex.getStackTrace().toString()); } } @Override public void close() { // TODO Auto-generated method stub } }