由於以前做過 HBase sink 的序列化類,以爲寫 HDFS 的應該會很簡單,但是沒想到居然不同。HDFS sink 並無直接配置序列化類的選項,須要根據 fileType 來選擇相應的序列化類。咱們使用的是 DataStream 類型,對應的類是 HDFSDataStream,這個類默認的序列化類型是 TEXT(這是個枚舉類型):
// Look up the serializer name under the "serializer" configuration key; "TEXT" is passed as the fallback value.
serializerType = context.getString("serializer", "TEXT");
public enum EventSerializerType { TEXT(BodyTextEventSerializer.Builder.class), HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class), AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class), CUSTOM(CUSTOMEventSerializer.Builder.class),//自定義的序列化類 OTHER(null); private final Class<? extends EventSerializer.Builder> builderClass; EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) { this.builderClass = builderClass; } public Class<? extends EventSerializer.Builder> getBuilderClass() { return builderClass; } }
在枚舉裏面加了自定義的類型(CUSTOM),配置 agent 的時候配置好 fileType 和 serializer 便可;一樣須要編譯並上傳。
自定義的序列化類如下:
public class CUSTOMEventSerializer implements EventSerializer { private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class); private final String SPLITCHAR = "\001";//列分隔符 // for legacy reasons, by default, append a newline to each event written // out private final String APPEND_NEWLINE = "appendNewline"; private final boolean APPEND_NEWLINE_DFLT = true; private final OutputStream out; private final boolean appendNewline; private CUSTOMEventSerializer(OutputStream out, Context ctx) { this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); this.out = out; } @Override public boolean supportsReopen() { return true; } @Override public void afterCreate() { // noop } @Override public void afterReopen() { // noop } @Override public void beforeClose() { // noop } @Override public void write(Event e) throws IOException { // 獲取日誌信息 String log = new String(e.getBody(), StandardCharsets.UTF_8); logger.info("-----------logs-------" + log); // headers包含日誌中項目編號和host信息 Map<String, String> headers = e.getHeaders(); String parsedLog = parseJson2Value(log, headers); out.write(parsedLog.getBytes()); logger.info("-----------values-------" + parsedLog); logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes()); out.write('\n'); } /** * * @Title: parseJson2Value * @Description: 解析出json日誌中的value。 * @param log json格式日誌 * @param headers event頭信息 * @return * @return String 解析後的日誌 * @throws */ private String parseJson2Value(String log, Map<String, String> headers) { log.replace("\\", "/"); String time = ""; String path = ""; Object value = ""; StringBuilder values = new StringBuilder(); ObjectMapper objectMapper = new ObjectMapper(); try { Map<String,Object> m = objectMapper.readValue(log, Map.class); for(String key:m.keySet()){ value = m.get(key); if (key.equals("uri")){ //解析訪問路徑 path = pasreUriToPath(value.toString()); } if(key.equals("time")){ time = value.toString().substring(10); } values.append(value).append(this.SPLITCHAR); } } 
catch (JsonParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 解析headers中的項目編號和服務host String pcode = headers.get("pcode"); String host = headers.get("host"); values.append(path).append(this.SPLITCHAR). append(pcode).append(this.SPLITCHAR). append(host).append(this.SPLITCHAR). append(time).append(this.SPLITCHAR); //value字符串 return values.toString(); } @Override public void flush() throws IOException { // noop } public static class Builder implements EventSerializer.Builder { @Override public EventSerializer build(Context context, OutputStream out) { CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context); return s; } } /** * 把請求uri轉換成具體的訪問路徑 * * @param uri 請求uri * @return 訪問路徑 */ protected String pasreUriToPath(String uri){ if(uri == null || "".equals(uri.trim())){ return uri; } int index = uri.indexOf("/"); if(index > -1){ uri = uri.substring(index); } index = uri.indexOf("?"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(";"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(" HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf("HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } return uri; } }