本文主要研究一下dubbo的AccessLogFilterjava
dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AccessLogFilter.javagit
@Activate(group = PROVIDER, value = ACCESS_LOG_KEY) public class AccessLogFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class); private static final String ACCESS_LOG_KEY = "dubbo.accesslog"; private static final int LOG_MAX_BUFFER = 5000; private static final long LOG_OUTPUT_INTERVAL = 5000; private static final String FILE_DATE_FORMAT = "yyyyMMdd"; // It's safe to declare it as singleton since it runs on single thread only private static final DateFormat FILE_NAME_FORMATTER = new SimpleDateFormat(FILE_DATE_FORMAT); private static final Map<String, Set<AccessLogData>> LOG_ENTRIES = new ConcurrentHashMap<String, Set<AccessLogData>>(); private static final ScheduledExecutorService LOG_SCHEDULED = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Access-Log", true)); /** * Default constructor initialize demon thread for writing into access log file with names with access log key * defined in url <b>accesslog</b> */ public AccessLogFilter() { LOG_SCHEDULED.scheduleWithFixedDelay(this::writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS); } /** * This method logs the access log for service method invocation call. * * @param invoker service * @param inv Invocation service method. * @return Result from service method. * @throws RpcException */ @Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { try { String accessLogKey = invoker.getUrl().getParameter(ACCESS_LOG_KEY); if (ConfigUtils.isNotEmpty(accessLogKey)) { AccessLogData logData = buildAccessLogData(invoker, inv); log(accessLogKey, logData); } } catch (Throwable t) { logger.warn("Exception in AccessLogFilter of service(" + invoker + " -> " + inv + ")", t); } return invoker.invoke(inv); } private void log(String accessLog, AccessLogData accessLogData) { Set<AccessLogData> logSet = LOG_ENTRIES.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>()); if (logSet.size() < LOG_MAX_BUFFER) { logSet.add(accessLogData); } else { //TODO we needs use force writing to file so that buffer gets clear and new log can be written. logger.warn("AccessLog buffer is full skipping buffer "); } } private void writeLogToFile() { if (!LOG_ENTRIES.isEmpty()) { for (Map.Entry<String, Set<AccessLogData>> entry : LOG_ENTRIES.entrySet()) { try { String accessLog = entry.getKey(); Set<AccessLogData> logSet = entry.getValue(); if (ConfigUtils.isDefault(accessLog)) { processWithServiceLogger(logSet); } else { File file = new File(accessLog); createIfLogDirAbsent(file); if (logger.isDebugEnabled()) { logger.debug("Append log to " + accessLog); } renameFile(file); processWithAccessKeyLogger(logSet, file); } } catch (Exception e) { logger.error(e.getMessage(), e); } } } } private void processWithAccessKeyLogger(Set<AccessLogData> logSet, File file) throws IOException { try (FileWriter writer = new FileWriter(file, true)) { for (Iterator<AccessLogData> iterator = logSet.iterator(); iterator.hasNext(); iterator.remove()) { writer.write(iterator.next().getLogMessage()); writer.write("\r\n"); } writer.flush(); } } private AccessLogData buildAccessLogData(Invoker<?> invoker, Invocation inv) { RpcContext context = RpcContext.getContext(); AccessLogData logData = AccessLogData.newLogData(); logData.setServiceName(invoker.getInterface().getName()); logData.setMethodName(inv.getMethodName()); logData.setVersion(invoker.getUrl().getParameter(VERSION_KEY)); logData.setGroup(invoker.getUrl().getParameter(GROUP_KEY)); logData.setInvocationTime(new Date()); logData.setTypes(inv.getParameterTypes()); logData.setArguments(inv.getArguments()); return logData; } private void processWithServiceLogger(Set<AccessLogData> logSet) { for (Iterator<AccessLogData> iterator = logSet.iterator(); iterator.hasNext(); iterator.remove()) { AccessLogData logData = iterator.next(); LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage()); } } private void createIfLogDirAbsent(File file) { File dir = file.getParentFile(); if (null != dir && !dir.exists()) { dir.mkdirs(); } } private void renameFile(File file) { if (file.exists()) { String now = FILE_NAME_FORMATTER.format(new Date()); String last = FILE_NAME_FORMATTER.format(new Date(file.lastModified())); if (!now.equals(last)) { File archive = new File(file.getAbsolutePath() + "." + last); file.renameTo(archive); } } } }
dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/AccessLogFilterTest.javagithub
public class AccessLogFilterTest { Filter accessLogFilter = new AccessLogFilter(); // Test filter won't throw an exception @Test public void testInvokeException() { Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(null); Invocation invocation = new MockInvocation(); LogUtil.start(); accessLogFilter.invoke(invoker, invocation); assertEquals(1, LogUtil.findMessage("Exception in AccessLogFilter of service")); LogUtil.stop(); } // TODO how to assert thread action @Test public void testDefault() { URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1"); Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url); Invocation invocation = new MockInvocation(); accessLogFilter.invoke(invoker, invocation); } @Test public void testCustom() { URL url = URL.valueOf("test://test:11/test?accesslog=custom-access.log"); Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url); Invocation invocation = new MockInvocation(); accessLogFilter.invoke(invoker, invocation); } }
dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/AccessLogData.javaapache
public final class AccessLogData { private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final DateFormat MESSAGE_DATE_FORMATTER = new SimpleDateFormat(MESSAGE_DATE_FORMAT); private static final String VERSION = "version"; private static final String GROUP = "group"; private static final String SERVICE = "service"; private static final String METHOD_NAME = "method-name"; private static final String INVOCATION_TIME = "invocation-time"; private static final String TYPES = "types"; private static final String ARGUMENTS = "arguments"; private static final String REMOTE_HOST = "remote-host"; private static final String REMOTE_PORT = "remote-port"; private static final String LOCAL_HOST = "localhost"; private static final String LOCAL_PORT = "local-port"; /** * This is used to store log data in key val format. */ private Map<String, Object> data; /** * Default constructor. */ private AccessLogData() { RpcContext context = RpcContext.getContext(); data = new HashMap<>(); setLocalHost(context.getLocalHost()); setLocalPort(context.getLocalPort()); setRemoteHost(context.getRemoteHost()); setRemotePort(context.getRemotePort()); } //...... public String getLogMessage() { StringBuilder sn = new StringBuilder(); sn.append("[") .append(MESSAGE_DATE_FORMATTER.format(getInvocationTime())) .append("] ") .append(get(REMOTE_HOST)) .append(":") .append(get(REMOTE_PORT)) .append(" -> ") .append(get(LOCAL_HOST)) .append(":") .append(get(LOCAL_PORT)) .append(" - "); String group = get(GROUP) != null ? get(GROUP).toString() : ""; if (StringUtils.isNotEmpty(group.toString())) { sn.append(group).append("/"); } sn.append(get(SERVICE)); String version = get(VERSION) != null ? get(VERSION).toString() : ""; if (StringUtils.isNotEmpty(version.toString())) { sn.append(":").append(version); } sn.append(" "); sn.append(get(METHOD_NAME)); sn.append("("); Class<?>[] types = get(TYPES) != null ? (Class<?>[]) get(TYPES) : new Class[0]; boolean first = true; for (Class<?> type : types) { if (first) { first = false; } else { sn.append(","); } sn.append(type.getName()); } sn.append(") "); Object[] args = get(ARGUMENTS) != null ? (Object[]) get(ARGUMENTS) : null; if (args != null && args.length > 0) { sn.append(JSON.toJSONString(args)); } return sn.toString(); } //...... }