log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
/**
 * Serializes a log event and passes the resulting bytes to the Kafka manager.
 *
 * <p>Events whose logger name starts with {@code org.apache.kafka} are not sent; a warning is
 * logged instead (the warning text calls this "recursive logging").
 *
 * <p>Payload selection:
 * <ul>
 *   <li>{@link SerializedLayout}: the layout header followed by the serialized event;</li>
 *   <li>any other layout: the bytes returned by {@code layout.toByteArray(event)};</li>
 *   <li>no layout: the formatted message encoded as UTF-8.</li>
 * </ul>
 *
 * @param event the log event to publish
 * @throws AppenderLoggingException if building the payload or sending it fails; the original
 *         exception is kept as the cause
 */
public void append(final LogEvent event) {
    final String loggerName = event.getLoggerName();
    // The logger name may be null per the LogEvent contract; without this guard the
    // startsWith call would throw a NullPointerException before anything is sent.
    if (loggerName != null && loggerName.startsWith("org.apache.kafka")) {
        LOGGER.warn("Recursive logging from [{}] for appender [{}].", loggerName, getName());
        return;
    }
    try {
        final Layout<? extends Serializable> layout = getLayout();
        final byte[] data;
        if (layout == null) {
            // No layout configured: fall back to the plain formatted message text.
            data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
        } else if (layout instanceof SerializedLayout) {
            // Each message carries the layout header in front of the serialized event.
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data);
    } catch (final Exception e) {
        LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
        throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
    }
}
JsonLayout的話,是走data = layout.toByteArray(event);這一步,而toByteArray是調用
log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractStringLayout.java
/** * Formats the Log Event as a byte array. * * @param event The Log Event. * @return The formatted event as a byte array. */ @Override public byte[] toByteArray(final LogEvent event) { return getBytes(toSerializable(event)); } protected byte[] getBytes(final String s) { if (useCustomEncoding) { // rely on branch prediction to eliminate this check if false return StringEncoder.encodeSingleByteChars(s); } try { // LOG4J2-935: String.getBytes(String) gives better performance return s.getBytes(charsetName); } catch (final UnsupportedEncodingException e) { return s.getBytes(charset); } }
toSerializable是調用
log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractJacksonLayout.java
@Override public String toSerializable(final LogEvent event) { final StringBuilderWriter writer = new StringBuilderWriter(); try { toSerializable(event, writer); return writer.toString(); } catch (final IOException e) { // Should this be an ISE or IAE? LOGGER.error(e); return Strings.EMPTY; } } public void toSerializable(final LogEvent event, final Writer writer) throws JsonGenerationException, JsonMappingException, IOException { objectWriter.writeValue(writer, convertMutableToLog4jEvent(event)); writer.write(eol); markEvent(); } private static LogEvent convertMutableToLog4jEvent(final LogEvent event) { // TODO Jackson-based layouts have certain filters set up for Log4jLogEvent. // TODO Need to set up the same filters for MutableLogEvent but don't know how... // This is a workaround. return event instanceof MutableLogEvent ? ((MutableLogEvent) event).createMemento() : event; }
{ "timeMillis": 1508045813249, "thread": "http-nio-8080-exec-1", "level": "ERROR", "loggerName": "com.example.demo.controller.ErrorController", "message": "/ by zero", "thrown": { "commonElementCount": 0, "localizedMessage": "/ by zero", "message": "/ by zero", "name": "java.lang.ArithmeticException", "extendedStackTrace": [ { "class": "com.example.demo.controller.ErrorController", "method": "logError", "file": "ErrorController.java", "line": 20, "exact": true, "location": "classes/", "version": "?" }, { "class": "sun.reflect.NativeMethodAccessorImpl", "method": "invoke0", "file": "NativeMethodAccessorImpl.java", "line": -2, "exact": false, "location": "?", "version": "1.8.0_71" }, { "class": "sun.reflect.NativeMethodAccessorImpl", "method": "invoke", "file": "NativeMethodAccessorImpl.java", "line": 62, "exact": false, "location": "?", "version": "1.8.0_71" }, { "class": "sun.reflect.DelegatingMethodAccessorImpl", "method": "invoke", "file": "DelegatingMethodAccessorImpl.java", "line": 43, "exact": false, "location": "?", "version": "1.8.0_71" }, { "class": "java.lang.reflect.Method", "method": "invoke", "file": "Method.java", "line": 497, "exact": false, "location": "?", "version": "1.8.0_71" }, { "class": "org.springframework.web.method.support.InvocableHandlerMethod", "method": "doInvoke", "file": "InvocableHandlerMethod.java", "line": 205, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.method.support.InvocableHandlerMethod", "method": "invokeForRequest", "file": "InvocableHandlerMethod.java", "line": 133, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod", "method": "invokeAndHandle", "file": "ServletInvocableHandlerMethod.java", "line": 97, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": 
"org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter", "method": "invokeHandlerMethod", "file": "RequestMappingHandlerAdapter.java", "line": 827, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter", "method": "handleInternal", "file": "RequestMappingHandlerAdapter.java", "line": 738, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter", "method": "handle", "file": "AbstractHandlerMethodAdapter.java", "line": 85, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.DispatcherServlet", "method": "doDispatch", "file": "DispatcherServlet.java", "line": 967, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.DispatcherServlet", "method": "doService", "file": "DispatcherServlet.java", "line": 901, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.FrameworkServlet", "method": "processRequest", "file": "FrameworkServlet.java", "line": 970, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.servlet.FrameworkServlet", "method": "doGet", "file": "FrameworkServlet.java", "line": 861, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "javax.servlet.http.HttpServlet", "method": "service", "file": "HttpServlet.java", "line": 635, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.web.servlet.FrameworkServlet", "method": "service", "file": 
"FrameworkServlet.java", "line": 846, "exact": true, "location": "spring-webmvc-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "javax.servlet.http.HttpServlet", "method": "service", "file": "HttpServlet.java", "line": 742, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 231, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.tomcat.websocket.server.WsFilter", "method": "doFilter", "file": "WsFilter.java", "line": 52, "exact": true, "location": "tomcat-embed-websocket-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.boot.web.filter.ApplicationContextHeaderFilter", "method": "doFilterInternal", "file": "ApplicationContextHeaderFilter.java", "line": 55, "exact": true, "location": "spring-boot-1.5.7.RELEASE.jar", "version": "1.5.7.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", 
"file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.boot.actuate.trace.WebRequestTraceFilter", "method": "doFilterInternal", "file": "WebRequestTraceFilter.java", "line": 110, "exact": true, "location": "spring-boot-actuator-1.5.7.RELEASE.jar", "version": "1.5.7.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.web.filter.RequestContextFilter", "method": "doFilterInternal", "file": "RequestContextFilter.java", "line": 99, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": 
"org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.web.filter.HttpPutFormContentFilter", "method": "doFilterInternal", "file": "HttpPutFormContentFilter.java", "line": 108, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.web.filter.HiddenHttpMethodFilter", "method": "doFilterInternal", "file": "HiddenHttpMethodFilter.java", "line": 81, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": 
"tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.web.filter.CharacterEncodingFilter", "method": "doFilterInternal", "file": "CharacterEncodingFilter.java", "line": 197, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.springframework.boot.actuate.autoconfigure.MetricsFilter", "method": "doFilterInternal", "file": "MetricsFilter.java", "line": 106, "exact": true, "location": "spring-boot-actuator-1.5.7.RELEASE.jar", "version": "1.5.7.RELEASE" }, { "class": "org.springframework.web.filter.OncePerRequestFilter", "method": "doFilter", "file": "OncePerRequestFilter.java", "line": 107, "exact": true, "location": "spring-web-4.3.11.RELEASE.jar", "version": "4.3.11.RELEASE" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "internalDoFilter", "file": "ApplicationFilterChain.java", "line": 193, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.ApplicationFilterChain", "method": "doFilter", "file": "ApplicationFilterChain.java", "line": 166, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.StandardWrapperValve", "method": "invoke", "file": 
"StandardWrapperValve.java", "line": 198, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.StandardContextValve", "method": "invoke", "file": "StandardContextValve.java", "line": 96, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.authenticator.AuthenticatorBase", "method": "invoke", "file": "AuthenticatorBase.java", "line": 478, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.StandardHostValve", "method": "invoke", "file": "StandardHostValve.java", "line": 140, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.valves.ErrorReportValve", "method": "invoke", "file": "ErrorReportValve.java", "line": 80, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.core.StandardEngineValve", "method": "invoke", "file": "StandardEngineValve.java", "line": 87, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.catalina.connector.CoyoteAdapter", "method": "service", "file": "CoyoteAdapter.java", "line": 342, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.coyote.http11.Http11Processor", "method": "service", "file": "Http11Processor.java", "line": 799, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.coyote.AbstractProcessorLight", "method": "process", "file": "AbstractProcessorLight.java", "line": 66, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.coyote.AbstractProtocol$ConnectionHandler", "method": "process", "file": "AbstractProtocol.java", "line": 868, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" 
}, { "class": "org.apache.tomcat.util.net.NioEndpoint$SocketProcessor", "method": "doRun", "file": "NioEndpoint.java", "line": 1457, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "org.apache.tomcat.util.net.SocketProcessorBase", "method": "run", "file": "SocketProcessorBase.java", "line": 49, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "java.util.concurrent.ThreadPoolExecutor", "method": "runWorker", "file": "ThreadPoolExecutor.java", "line": 1142, "exact": true, "location": "?", "version": "1.8.0_71" }, { "class": "java.util.concurrent.ThreadPoolExecutor$Worker", "method": "run", "file": "ThreadPoolExecutor.java", "line": 617, "exact": true, "location": "?", "version": "1.8.0_71" }, { "class": "org.apache.tomcat.util.threads.TaskThread$WrappingRunnable", "method": "run", "file": "TaskThread.java", "line": 61, "exact": true, "location": "tomcat-embed-core-8.5.20.jar", "version": "8.5.20" }, { "class": "java.lang.Thread", "method": "run", "file": "Thread.java", "line": 745, "exact": true, "location": "?", "version": "1.8.0_71" } ] }, "endOfBatch": false, "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger", "threadId": 26, "threadPriority": 5 }
可以配置SerializedLayout或者PatternLayout,但是這兩種的兼容性不如json好:前者是基於jdk的序列化,在log4j的2.9版本被廢棄了,不建議使用;而PatternLayout則受限於layout,不同的應用如果定義不同的PatternLayout,則解析舊程序就得相應修改。
如果覺得json傳輸文本太長,可以壓縮一下,或者基於pb自己擴展定義一個layout。
在kafka的消費端解析json時,如果使用ObjectMapper來解析:
// Binds the consumed JSON to Log4jLogEvent with Jackson. Per the error shown below, this call
// throws JsonMappingException: no deserializer for the non-concrete ThreadContext$ContextStack.
Log4jLogEvent logEvent = mapper.readValue(value,Log4jLogEvent.class);
則報錯:
com.fasterxml.jackson.databind.JsonMappingException: Can not find a deserializer for non-concrete Collection type [collection type; class org.apache.logging.log4j.ThreadContext$ContextStack, contains [simple type, class java.lang.String]] at [Source: {"timeMillis":1508047984078,"thread":"http-nio-8080-exec-6","level":"ERROR","loggerName":"com.example.demo.controller.ErrorController","message":"/ by zero","thrown":{"commonElementCount":0,"localizedMessage":"/ by zero","message":"/ by zero","name":"java.lang.ArithmeticException","extendedStackTrace":[{"class":"com.example.demo.controller.ErrorController","method":"logError","file":"ErrorController.java","line":20,"exact":true,"location":"classes/","version":"?"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke0","file":"NativeMethodAccessorImpl.java","line":-2,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke","file":"NativeMethodAccessorImpl.java","line":62,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"sun.reflect.DelegatingMethodAccessorImpl","method":"invoke","file":"DelegatingMethodAccessorImpl.java","line":43,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"java.lang.reflect.Method","method":"invoke","file":"Method.java","line":497,"exact":false,"location":"?","version":"1.8.0_71"},{"class":"org.springframework.web.method.support.InvocableHandlerMethod","method":"doInvoke","file":"InvocableHandlerMethod.java","line":205,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.method.support.InvocableHandlerMethod","method":"invokeForRequest","file":"InvocableHandlerMethod.java","line":133,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod","method":"invokeAndHandle","file":"ServletInvocableHandlerMethod.java","line":97,"e
xact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter","method":"invokeHandlerMethod","file":"RequestMappingHandlerAdapter.java","line":827,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter","method":"handleInternal","file":"RequestMappingHandlerAdapter.java","line":738,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter","method":"handle","file":"AbstractHandlerMethodAdapter.java","line":85,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.DispatcherServlet","method":"doDispatch","file":"DispatcherServlet.java","line":967,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.DispatcherServlet","method":"doService","file":"DispatcherServlet.java","line":901,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.FrameworkServlet","method":"processRequest","file":"FrameworkServlet.java","line":970,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.servlet.FrameworkServlet","method":"doGet","file":"FrameworkServlet.java","line":861,"exact":true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"javax.servlet.http.HttpServlet","method":"service","file":"HttpServlet.java","line":635,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.servlet.FrameworkServlet","method":"service","file":"FrameworkServlet.java","line":846,"exact"
:true,"location":"spring-webmvc-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"javax.servlet.http.HttpServlet","method":"service","file":"HttpServlet.java","line":742,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":231,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.tomcat.websocket.server.WsFilter","method":"doFilter","file":"WsFilter.java","line":52,"exact":true,"location":"tomcat-embed-websocket-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.boot.web.filter.ApplicationContextHeaderFilter","method":"doFilterInternal","file":"ApplicationContextHeaderFilter.java","line":55,"exact":true,"location":"spring-boot-1.5.7.RELEASE.jar","version":"1.5.7.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":
"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.boot.actuate.trace.WebRequestTraceFilter","method":"doFilterInternal","file":"WebRequestTraceFilter.java","line":110,"exact":true,"location":"spring-boot-actuator-1.5.7.RELEASE.jar","version":"1.5.7.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.RequestContextFilter","method":"doFilterInternal","file":"RequestContextFilter.java","line":99,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.HttpPutFormContentFilter","method":"doFilterInternal","file":"HttpPutFormContentFilter.java","line":108,"exact":true
,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.HiddenHttpMethodFilter","method":"doFilterInternal","file":"HiddenHttpMethodFilter.java","line":81,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.web.filter.CharacterEncodingFilter","method":"doFilterInternal","file":"CharacterEncodingFilter.java","line":197,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.
apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.springframework.boot.actuate.autoconfigure.MetricsFilter","method":"doFilterInternal","file":"MetricsFilter.java","line":106,"exact":true,"location":"spring-boot-actuator-1.5.7.RELEASE.jar","version":"1.5.7.RELEASE"},{"class":"org.springframework.web.filter.OncePerRequestFilter","method":"doFilter","file":"OncePerRequestFilter.java","line":107,"exact":true,"location":"spring-web-4.3.11.RELEASE.jar","version":"4.3.11.RELEASE"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"internalDoFilter","file":"ApplicationFilterChain.java","line":193,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.ApplicationFilterChain","method":"doFilter","file":"ApplicationFilterChain.java","line":166,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardWrapperValve","method":"invoke","file":"StandardWrapperValve.java","line":198,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardContextValve","method":"invoke","file":"StandardContextValve.java","line":96,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.authenticator.AuthenticatorBase","method":"invoke","file":"AuthenticatorBase.java","line":478,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardHostValve","method":"invoke","file":"StandardHostValve.java","line":140,"exact":true,"location":"tomcat-embed-cor
e-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.valves.ErrorReportValve","method":"invoke","file":"ErrorReportValve.java","line":80,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.core.StandardEngineValve","method":"invoke","file":"StandardEngineValve.java","line":87,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.catalina.connector.CoyoteAdapter","method":"service","file":"CoyoteAdapter.java","line":342,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.coyote.http11.Http11Processor","method":"service","file":"Http11Processor.java","line":799,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.coyote.AbstractProcessorLight","method":"process","file":"AbstractProcessorLight.java","line":66,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.coyote.AbstractProtocol$ConnectionHandler","method":"process","file":"AbstractProtocol.java","line":868,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.tomcat.util.net.NioEndpoint$SocketProcessor","method":"doRun","file":"NioEndpoint.java","line":1457,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"org.apache.tomcat.util.net.SocketProcessorBase","method":"run","file":"SocketProcessorBase.java","line":49,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","file":"ThreadPoolExecutor.java","line":1142,"exact":true,"location":"?","version":"1.8.0_71"},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","file":"ThreadPoolExecutor.java","line":617,"exact":true,"location":"?","version":"1.8.0_71"},{"class":"org.apache.tomcat.util.threads.TaskThread$WrappingRunnable","method":"run","file":"T
askThread.java","line":61,"exact":true,"location":"tomcat-embed-core-8.5.20.jar","version":"8.5.20"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_71"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":31,"threadPriority":5,"source":{"class":"com.example.demo.controller.ErrorController","method":"logError","file":"ErrorController.java","line":22}} ; line: 1, column: 1] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:305) at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:268) at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244) at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142) at com.fasterxml.jackson.databind.DeserializationContext.findNonContextualValueDeserializer(DeserializationContext.java:466) at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:479) at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:293) at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244) at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142) at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:476) at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3915) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3810) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2858) at com.example.demo.KafkaLog4jDemoApplicationTests.lambda$consumeErrorLog$0(KafkaLog4jDemoApplicationTests.java:54) at 
java.lang.Iterable.forEach(Iterable.java:75) at com.example.demo.KafkaLog4jDemoApplicationTests.consumeErrorLog(KafkaLog4jDemoApplicationTests.java:47) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at 
org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) Caused by: java.lang.IllegalArgumentException: Can not find a deserializer for non-concrete Collection type [collection type; class org.apache.logging.log4j.ThreadContext$ContextStack, contains [simple type, class java.lang.String]] at com.fasterxml.jackson.databind.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:1021) at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:391) at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:349) at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:264) ... 46 more
所以建議自定義一個bean來解析對應的json,或者基於JSONObject來解析即可。
/**
 * Bean used to parse the JSON representation of a Log4j2 log event that is
 * consumed from Kafka (via {@code JSON.parseObject(value, Log4jLogEventJson.class)}).
 *
 * <p>The attribute layout follows the sample payload: {@code timeMillis},
 * {@code thread}, {@code level}, {@code loggerName}, {@code message},
 * {@code thrown}, {@code endOfBatch}, {@code loggerFqcn}, {@code contextMap},
 * {@code threadId}, {@code threadPriority} and {@code source} all sit on the
 * event object itself; only the exception details are nested under
 * {@code thrown}.
 *
 * <p>Note that {@link #toString()} serializes every bean property, so the
 * event-level attributes are part of its output.
 */
public class Log4jLogEventJson {

    private long timeMillis;
    private String thread;
    private String level;
    private String loggerName;
    private String message;
    private Thrown thrown;

    // Event-level attributes. In the sample payload these follow the closing
    // brace of "thrown", i.e. they belong to the event and not to the exception.
    private boolean endOfBatch;
    private String loggerFqcn;
    private JSONObject contextMap;
    private int threadId;
    private int threadPriority;
    private Source source;

    /** Exception details carried under the {@code thrown} attribute. */
    static class Thrown {

        private int commonElementCount;
        private String localizedMessage;
        private String message;
        private String name;

        @JSONField(name = "extendedStackTrace")
        private List<ExtendedStackTrace> extendedStackTraces;

        // The members below are not part of "thrown" in the sample payload; the
        // same attributes are available on the enclosing event. They are kept
        // here only so that existing callers of these accessors keep compiling.
        private boolean endOfBatch;
        private String loggerFqcn;
        private JSONObject contextMap;
        private int threadid;
        private int threadPriority;
        private Source source;

        public int getCommonElementCount() {
            return commonElementCount;
        }

        public void setCommonElementCount(int commonElementCount) {
            this.commonElementCount = commonElementCount;
        }

        public String getLocalizedMessage() {
            return localizedMessage;
        }

        public void setLocalizedMessage(String localizedMessage) {
            this.localizedMessage = localizedMessage;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<ExtendedStackTrace> getExtendedStackTraces() {
            return extendedStackTraces;
        }

        public void setExtendedStackTraces(List<ExtendedStackTrace> extendedStackTraces) {
            this.extendedStackTraces = extendedStackTraces;
        }

        public boolean isEndOfBatch() {
            return endOfBatch;
        }

        public void setEndOfBatch(boolean endOfBatch) {
            this.endOfBatch = endOfBatch;
        }

        public String getLoggerFqcn() {
            return loggerFqcn;
        }

        public void setLoggerFqcn(String loggerFqcn) {
            this.loggerFqcn = loggerFqcn;
        }

        public JSONObject getContextMap() {
            return contextMap;
        }

        public void setContextMap(JSONObject contextMap) {
            this.contextMap = contextMap;
        }

        public int getThreadid() {
            return threadid;
        }

        public void setThreadid(int threadid) {
            this.threadid = threadid;
        }

        public int getThreadPriority() {
            return threadPriority;
        }

        public void setThreadPriority(int threadPriority) {
            this.threadPriority = threadPriority;
        }

        public Source getSource() {
            return source;
        }

        public void setSource(Source source) {
            this.source = source;
        }
    }

    /** One element of the {@code extendedStackTrace} array. */
    static class ExtendedStackTrace {

        // "class" is a Java keyword, hence the explicit JSON name mapping.
        @JSONField(name = "class")
        private String clz;
        private String method;
        private String file;
        private int line;
        private boolean exact;
        private String location;
        private String version;

        public String getClz() {
            return clz;
        }

        public void setClz(String clz) {
            this.clz = clz;
        }

        public String getMethod() {
            return method;
        }

        public void setMethod(String method) {
            this.method = method;
        }

        public String getFile() {
            return file;
        }

        public void setFile(String file) {
            this.file = file;
        }

        public int getLine() {
            return line;
        }

        public void setLine(int line) {
            this.line = line;
        }

        public boolean isExact() {
            return exact;
        }

        public void setExact(boolean exact) {
            this.exact = exact;
        }

        public String getLocation() {
            return location;
        }

        public void setLocation(String location) {
            this.location = location;
        }

        public String getVersion() {
            return version;
        }

        public void setVersion(String version) {
            this.version = version;
        }
    }

    /** Code location carried under the {@code source} attribute. */
    static class Source {

        // "class" is a Java keyword, hence the explicit JSON name mapping.
        @JSONField(name = "class")
        private String clz;
        private String method;
        private String file;
        private int line;

        public String getClz() {
            return clz;
        }

        public void setClz(String clz) {
            this.clz = clz;
        }

        public String getMethod() {
            return method;
        }

        public void setMethod(String method) {
            this.method = method;
        }

        public String getFile() {
            return file;
        }

        public void setFile(String file) {
            this.file = file;
        }

        public int getLine() {
            return line;
        }

        public void setLine(int line) {
            this.line = line;
        }
    }

    /**
     * Renders the parsed exception as text: a {@code name: message} header
     * line followed by one line per stack frame.
     *
     * @return an empty string when the event has no {@code thrown}; only the
     *         header line when there are no stack frames; otherwise the header
     *         plus one line per frame
     */
    public String stacktrace() {
        if (thrown == null) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        builder.append(thrown.getName())
                .append(": ")
                .append(thrown.getMessage())
                .append("\n");

        List<ExtendedStackTrace> frames = thrown.getExtendedStackTraces();
        if (frames == null) {
            return builder.toString();
        }
        for (ExtendedStackTrace frame : frames) {
            builder.append(" at ")
                    .append(frame.getClz())
                    .append(".")
                    .append(frame.getMethod())
                    .append("(")
                    .append(frame.getFile())
                    .append(":")
                    .append(frame.getLine())
                    .append(")")
                    // Frames whose "exact" flag is false get a "~" marker.
                    .append(frame.isExact() ? " [" : " ~[")
                    .append(frame.getLocation())
                    .append(":")
                    .append(frame.getVersion())
                    .append("]")
                    .append("\n");
        }
        return builder.toString();
    }

    public long getTimeMillis() {
        return timeMillis;
    }

    public void setTimeMillis(long timeMillis) {
        this.timeMillis = timeMillis;
    }

    public String getThread() {
        return thread;
    }

    public void setThread(String thread) {
        this.thread = thread;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }

    public String getLoggerName() {
        return loggerName;
    }

    public void setLoggerName(String loggerName) {
        this.loggerName = loggerName;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Thrown getThrown() {
        return thrown;
    }

    public void setThrown(Thrown thrown) {
        this.thrown = thrown;
    }

    public boolean isEndOfBatch() {
        return endOfBatch;
    }

    public void setEndOfBatch(boolean endOfBatch) {
        this.endOfBatch = endOfBatch;
    }

    public String getLoggerFqcn() {
        return loggerFqcn;
    }

    public void setLoggerFqcn(String loggerFqcn) {
        this.loggerFqcn = loggerFqcn;
    }

    public JSONObject getContextMap() {
        return contextMap;
    }

    public void setContextMap(JSONObject contextMap) {
        this.contextMap = contextMap;
    }

    public int getThreadId() {
        return threadId;
    }

    public void setThreadId(int threadId) {
        this.threadId = threadId;
    }

    public int getThreadPriority() {
        return threadPriority;
    }

    public void setThreadPriority(int threadPriority) {
        this.threadPriority = threadPriority;
    }

    public Source getSource() {
        return source;
    }

    public void setSource(Source source) {
        this.source = source;
    }

    /** Returns this bean serialized through {@code JSON.toJSONString}. */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
// Topology: read raw JSON strings from the "error-log" topic, re-key every
// record by its logger name, and aggregate the events per key and time window.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("error-log");

// Parse each record value into a bean and use the logger name as the new key.
KStream<String, Log4jLogEventJson> beanStream = source.map((key, value) -> {
    Log4jLogEventJson parsed = JSON.parseObject(value, Log4jLogEventJson.class);
    return new KeyValue<String, Log4jLogEventJson>(parsed.getLoggerName(), parsed);
});

GenericJsonSerde<ErrorLogStats> statsGenericJsonSerde =
        new GenericJsonSerde<>(ErrorLogStats.class);
GenericJsonSerde<Log4jLogEventJson> log4jLogEventJsonGenericJsonSerde =
        new GenericJsonSerde<>(Log4jLogEventJson.class);

beanStream
        .groupByKey(Serdes.String(), log4jLogEventJsonGenericJsonSerde)
        .aggregate(
                ErrorLogStats::new,
                (key, value, stats) -> {
                    // Debug output of the runtime key/value types.
                    System.out.println("key:" + key.getClass());
                    System.out.println("value:" + value.getClass());
                    return stats.add(value);
                },
                // Window size and advance are both 10000, so windows do not overlap.
                TimeWindows.of(10000).advanceBy(10000),
                statsGenericJsonSerde,
                "aggregate") // presumably the state store name; confirm against the Kafka Streams version in use
        .foreach((k, v) -> {
            LOGGER.info("key:{},start:{},end:{},value:{}",
                    k.key(), k.window().start(), k.window().end(), v.errors.size());
            // Trigger the alert here.
        });
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:767) - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed 2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_0 2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_1 2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_0 2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_2 2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_1 2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_2 key:class java.lang.String value:class com.example.demo.Log4jLogEventJson key:class java.lang.String value:class com.example.demo.Log4jLogEventJson key:class java.lang.String value:class com.example.demo.Log4jLogEventJson key:class java.lang.String value:class com.example.demo.Log4jLogEventJson 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:767) - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_0 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 0_1 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_0 2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90) - key:com.example.demo.controller.DemoController,start:1508060660000,end:1508060670000,value:3 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread 
[StreamThread-1] Committing task StreamTask 0_2 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_1 2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805) - stream-thread [StreamThread-1] Committing task StreamTask 1_2 2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90) - key:com.example.demo.controller.ErrorController,start:1508060660000,end:1508060670000,value:1
自從kafka有了stream以後,感覺可以減少很多技術棧了,比如我可以不用學storm或者spark,就可以直接在kafka上進行流式操作。關於kafka stream如何進行分佈式呢,後續再研究下。