28 友盟大數據--flume源碼查看分析- ExecSource-參照主機名攔截器HostInterceptor ---寫限速攔截器

 1 //
 2 // Source code recreated from a .class file by IntelliJ IDEA  3 // (powered by Fernflower decompiler)  4 //  5 
 6 package org.apache.flume.source;  7 
 8 import com.google.common.base.Preconditions;  9 import com.google.common.util.concurrent.ThreadFactoryBuilder;  10 import java.io.BufferedReader;  11 import java.io.IOException;  12 import java.io.InputStreamReader;  13 import java.nio.charset.Charset;  14 import java.util.ArrayList;  15 import java.util.List;  16 import java.util.concurrent.ExecutorService;  17 import java.util.concurrent.Executors;  18 import java.util.concurrent.Future;  19 import java.util.concurrent.ScheduledExecutorService;  20 import java.util.concurrent.ScheduledFuture;  21 import java.util.concurrent.TimeUnit;  22 import org.apache.flume.Context;  23 import org.apache.flume.Event;  24 import org.apache.flume.EventDrivenSource;  25 import org.apache.flume.SystemClock;  26 import org.apache.flume.channel.ChannelProcessor;  27 import org.apache.flume.conf.Configurable;  28 import org.apache.flume.event.EventBuilder;  29 import org.apache.flume.instrumentation.SourceCounter;  30 import org.slf4j.Logger;  31 import org.slf4j.LoggerFactory;  32 
 33 public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {  34     private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);  35     private String shell;  36     private String command;  37     private SourceCounter sourceCounter;  38     private ExecutorService executor;  39     private Future<?> runnerFuture;  40     private long restartThrottle;  41     private boolean restart;  42     private boolean logStderr;  43     private Integer bufferCount;  44     private long batchTimeout;  45     private ExecSource.ExecRunnable runner;  46     private Charset charset;  47 
 48     public ExecSource() {  49  }  50 
 51     public void start() {  52         logger.info("Exec source starting with command:{}", this.command);  53         this.executor = Executors.newSingleThreadExecutor();  54         this.runner = new ExecSource.ExecRunnable(this.shell, this.command, this.getChannelProcessor(), this.sourceCounter, this.restart, this.restartThrottle, this.logStderr, this.bufferCount, this.batchTimeout, this.charset);  55         this.runnerFuture = this.executor.submit(this.runner);  56         this.sourceCounter.start();  57         super.start();  58         logger.debug("Exec source started");  59  }  60 
 61     public void stop() {  62         logger.info("Stopping exec source with command:{}", this.command);  63         if (this.runner != null) {  64             this.runner.setRestart(false);  65             this.runner.kill();  66  }  67 
 68         if (this.runnerFuture != null) {  69             logger.debug("Stopping exec runner");  70             this.runnerFuture.cancel(true);  71             logger.debug("Exec runner stopped");  72  }  73 
 74         this.executor.shutdown();  75 
 76         while(!this.executor.isTerminated()) {  77             logger.debug("Waiting for exec executor service to stop");  78 
 79             try {  80                 this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);  81             } catch (InterruptedException var2) {  82                 logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");  83  Thread.currentThread().interrupt();  84  }  85  }  86 
 87         this.sourceCounter.stop();  88         super.stop();  89         logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter);  90  }  91 
 92     public void configure(Context context) {  93         this.command = context.getString("command");  94         Preconditions.checkState(this.command != null, "The parameter command must be specified");  95         this.restartThrottle = context.getLong("restartThrottle", 10000L);  96         this.restart = context.getBoolean("restart", false);  97         this.logStderr = context.getBoolean("logStdErr", false);  98         this.bufferCount = context.getInteger("batchSize", 20);  99         this.batchTimeout = context.getLong("batchTimeout", 3000L); 100         this.charset = Charset.forName(context.getString("charset", "UTF-8")); 101         this.shell = context.getString("shell", (String)null); 102         if (this.sourceCounter == null) { 103             this.sourceCounter = new SourceCounter(this.getName()); 104  } 105 
106  } 107 
108     private static class StderrReader extends Thread { 109         private BufferedReader input; 110         private boolean logStderr; 111 
112         protected StderrReader(BufferedReader input, boolean logStderr) { 113             this.input = input; 114             this.logStderr = logStderr; 115  } 116 
117         public void run() { 118             try { 119                 int i = 0; 120                 String line = null; 121 
122                 while((line = this.input.readLine()) != null) { 123                     if (this.logStderr) { 124                         Logger var10000 = ExecSource.logger; 125                         ++i; 126                         var10000.info("StderrLogger[{}] = '{}'", i, line); 127  } 128  } 129             } catch (IOException var11) { 130                 ExecSource.logger.info("StderrLogger exiting", var11); 131             } finally { 132                 try { 133                     if (this.input != null) { 134                         this.input.close(); 135  } 136                 } catch (IOException var10) { 137                     ExecSource.logger.error("Failed to close stderr reader for exec source", var10); 138  } 139 
140  } 141 
142  } 143  } 144 
145     private static class ExecRunnable implements Runnable { 146         private final String shell; 147         private final String command; 148         private final ChannelProcessor channelProcessor; 149         private final SourceCounter sourceCounter; 150         private volatile boolean restart; 151         private final long restartThrottle; 152         private final int bufferCount; 153         private long batchTimeout; 154         private final boolean logStderr; 155         private final Charset charset; 156         private Process process = null; 157         private SystemClock systemClock = new SystemClock(); 158         private Long lastPushToChannel; 159  ScheduledExecutorService timedFlushService; 160         ScheduledFuture<?> future; 161 
162         public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) { 163             this.lastPushToChannel = this.systemClock.currentTimeMillis(); 164             this.command = command; 165             this.channelProcessor = channelProcessor; 166             this.sourceCounter = sourceCounter; 167             this.restartThrottle = restartThrottle; 168             this.bufferCount = bufferCount; 169             this.batchTimeout = batchTimeout; 170             this.restart = restart; 171             this.logStderr = logStderr; 172             this.charset = charset; 173             this.shell = shell; 174  } 175 
176         public void run() { 177             do { 178                 String exitCode = "unknown"; 179                 BufferedReader reader = null; 180                 String line = null; 181                 final List<Event> eventList = new ArrayList(); 182                 this.timedFlushService = Executors.newSingleThreadScheduledExecutor((new ThreadFactoryBuilder()).setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); 183 
184                 try { 185  String[] commandArgs; 186                     if (this.shell != null) { 187                         commandArgs = formulateShellCommand(this.shell, this.command); 188                         this.process = Runtime.getRuntime().exec(commandArgs); 189                     } else { 190                         commandArgs = this.command.split("\\s+"); 191                         this.process = (new ProcessBuilder(commandArgs)).start(); 192  } 193 
194                     reader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), this.charset)); 195                     ExecSource.StderrReader stderrReader = new ExecSource.StderrReader(new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)), this.logStderr); 196                     stderrReader.setName("StderrReader-[" + this.command + "]"); 197                     stderrReader.setDaemon(true); 198  stderrReader.start(); 199                     this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() { 200                         public void run() { 201                             try { 202                                 List var1 = eventList; 203                                 synchronized(eventList) { 204                                     if (!eventList.isEmpty() && ExecRunnable.this.timeout()) { 205                                         ExecRunnable.this.flushEventBatch(eventList); 206  } 207  } 208                             } catch (Exception var4) { 209                                 ExecSource.logger.error("Exception occured when processing event batch", var4); 210                                 if (var4 instanceof InterruptedException) { 211  Thread.currentThread().interrupt(); 212  } 213  } 214 
215  } 216                     }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS); 217 
218                     while((line = reader.readLine()) != null) { 219                         synchronized(eventList) { 220                             this.sourceCounter.incrementEventReceivedCount(); 221                             eventList.add(EventBuilder.withBody(line.getBytes(this.charset))); 222                             if (eventList.size() >= this.bufferCount || this.timeout()) { 223                                 this.flushEventBatch(eventList); 224  } 225  } 226  } 227 
228                     synchronized(eventList) { 229                         if (!eventList.isEmpty()) { 230                             this.flushEventBatch(eventList); 231  } 232  } 233                 } catch (Exception var23) { 234                     ExecSource.logger.error("Failed while running command: " + this.command, var23); 235                     if (var23 instanceof InterruptedException) { 236  Thread.currentThread().interrupt(); 237  } 238                 } finally { 239                     if (reader != null) { 240                         try { 241  reader.close(); 242                         } catch (IOException var19) { 243                             ExecSource.logger.error("Failed to close reader for exec source", var19); 244  } 245  } 246 
247                     exitCode = String.valueOf(this.kill()); 248  } 249 
250                 if (this.restart) { 251                     ExecSource.logger.info("Restarting in {}ms, exit code {}", this.restartThrottle, exitCode); 252 
253                     try { 254                         Thread.sleep(this.restartThrottle); 255                     } catch (InterruptedException var20) { 256  Thread.currentThread().interrupt(); 257  } 258                 } else { 259                     ExecSource.logger.info("Command [" + this.command + "] exited with " + exitCode); 260  } 261             } while(this.restart); 262 
263  } 264 
265         private void flushEventBatch(List<Event> eventList) { 266             this.channelProcessor.processEventBatch(eventList);//通道處理器 詳細見下面代碼 267             this.sourceCounter.addToEventAcceptedCount((long)eventList.size()); 268  eventList.clear(); 269             this.lastPushToChannel = this.systemClock.currentTimeMillis(); 270  } 271 
272         private boolean timeout() { 273             return this.systemClock.currentTimeMillis() - this.lastPushToChannel >= this.batchTimeout; 274  } 275 
276         private static String[] formulateShellCommand(String shell, String command) { 277             String[] shellArgs = shell.split("\\s+"); 278             String[] result = new String[shellArgs.length + 1]; 279             System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); 280             result[shellArgs.length] = command; 281             return result; 282  } 283 
284         public int kill() { 285             if (this.process != null) { 286                 Process var1 = this.process; 287                 synchronized(this.process) { 288                     this.process.destroy(); 289 
290                     int var10000; 291                     try { 292                         int exitValue = this.process.waitFor(); 293                         if (this.future != null) { 294                             this.future.cancel(true); 295  } 296 
297                         if (this.timedFlushService != null) { 298                             this.timedFlushService.shutdown(); 299 
300                             while(!this.timedFlushService.isTerminated()) { 301                                 try { 302                                     this.timedFlushService.awaitTermination(500L, TimeUnit.MILLISECONDS); 303                                 } catch (InterruptedException var5) { 304                                     ExecSource.logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting."); 305  Thread.currentThread().interrupt(); 306  } 307  } 308  } 309 
310                         var10000 = exitValue; 311                     } catch (InterruptedException var6) { 312  Thread.currentThread().interrupt(); 313                         return -2147483648; 314  } 315 
316                     return var10000; 317  } 318             } else { 319                 return -1073741824; 320  } 321  } 322 
323         public void setRestart(boolean restart) { 324             this.restart = restart; 325  } 326  } 327 }

 

ChannelProcessor 的 processEventBatch() 方法（Java）：

 1  public void processEventBatch(List<Event> events) {  2         Preconditions.checkNotNull(events, "Event list must not be null");  3         events = this.interceptorChain.intercept(events);//攔截器鏈---攔截事件  4         Map<Channel, List<Event>> reqChannelQueue = new LinkedHashMap();  5         Map<Channel, List<Event>> optChannelQueue = new LinkedHashMap();  6         Iterator i$ = events.iterator();  7 
 8  List batch;  9  Iterator i$;  10         while(i$.hasNext()) {  11             Event event = (Event)i$.next();  12             List<Channel> reqChannels = this.selector.getRequiredChannels(event);  13 
 14  Object eventQueue;  15             for(Iterator i$ = reqChannels.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {  16                 Channel ch = (Channel)i$.next();  17                 eventQueue = (List)reqChannelQueue.get(ch);  18                 if (eventQueue == null) {  19                     eventQueue = new ArrayList();  20  reqChannelQueue.put(ch, eventQueue);  21  }  22  }  23 
 24             batch = this.selector.getOptionalChannels(event);  25 
 26  Object eventQueue;  27             for(i$ = batch.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {  28                 Channel ch = (Channel)i$.next();  29                 eventQueue = (List)optChannelQueue.get(ch);  30                 if (eventQueue == null) {  31                     eventQueue = new ArrayList();  32  optChannelQueue.put(ch, eventQueue);  33  }  34  }  35  }  36 
 37         i$ = reqChannelQueue.keySet().iterator();  38 
 39  Channel optChannel;  40  Transaction tx;  41  Event event;  42         while(i$.hasNext()) {  43             optChannel = (Channel)i$.next();  44             tx = optChannel.getTransaction();  45             Preconditions.checkNotNull(tx, "Transaction object must not be null");  46 
 47             try {  48  tx.begin();  49                 batch = (List)reqChannelQueue.get(optChannel);  50                 i$ = batch.iterator();  51 
 52                 while(i$.hasNext()) {  53                     event = (Event)i$.next();  54  optChannel.put(event);  55  }  56 
 57  tx.commit();  58             } catch (Throwable var23) {  59  tx.rollback();  60                 if (var23 instanceof Error) {  61                     LOG.error("Error while writing to required channel: " + optChannel, var23);  62                     throw (Error)var23;  63  }  64 
 65                 if (var23 instanceof ChannelException) {  66                     throw (ChannelException)var23;  67  }  68 
 69                 throw new ChannelException("Unable to put batch on required channel: " + optChannel, var23);  70             } finally {  71                 if (tx != null) {  72  tx.close();  73  }  74 
 75  }  76  }  77 
 78         i$ = optChannelQueue.keySet().iterator();  79 
 80         while(i$.hasNext()) {  81             optChannel = (Channel)i$.next();  82             tx = optChannel.getTransaction();  83             Preconditions.checkNotNull(tx, "Transaction object must not be null");  84 
 85             try {  86  tx.begin();  87                 batch = (List)optChannelQueue.get(optChannel);  88                 i$ = batch.iterator();  89 
 90                 while(i$.hasNext()) {  91                     event = (Event)i$.next();  92  optChannel.put(event);  93  }  94 
 95  tx.commit();  96             } catch (Throwable var21) {  97  tx.rollback();  98                 LOG.error("Unable to put batch on optional channel: " + optChannel, var21);  99                 if (var21 instanceof Error) { 100                     throw (Error)var21; 101  } 102             } finally { 103                 if (tx != null) { 104  tx.close(); 105  } 106 
107  } 108  } 109 
110     }

參照主機名攔截器 HostInterceptor —— 編寫限速攔截器（實現 Interceptor 接口）：

 1 //
 2 // Source code recreated from a .class file by IntelliJ IDEA  3 // (powered by Fernflower decompiler)  4 //  5 
 6 package org.apache.flume.interceptor;  7 
 8 import java.net.InetAddress;  9 import java.net.UnknownHostException;  10 import java.util.Iterator;  11 import java.util.List;  12 import java.util.Map;  13 import org.apache.flume.Context;  14 import org.apache.flume.Event;  15 import org.slf4j.Logger;  16 import org.slf4j.LoggerFactory;  17 
 18 public class HostInterceptor implements Interceptor {  19     private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class);  20     private final boolean preserveExisting;  21     private final String header;  22     private String host;  23 
 24     private HostInterceptor(boolean preserveExisting, boolean useIP, String header) {  25         this.host = null;  26         this.preserveExisting = preserveExisting;  27         this.header = header;  28 
 29         try {  30             InetAddress addr = InetAddress.getLocalHost();  31             if (useIP) {  32                 this.host = addr.getHostAddress();  33             } else {  34                 this.host = addr.getCanonicalHostName();  35  }  36         } catch (UnknownHostException var6) {  37             logger.warn("Could not get local host address. Exception follows.", var6);  38  }  39 
 40  }  41 
 42     public void initialize() {  43  }  44 
 45     public Event intercept(Event event) {  46         Map<String, String> headers = event.getHeaders();  47         if (this.preserveExisting && headers.containsKey(this.header)) {  48             return event;  49         } else {  50             if (this.host != null) {  51                 headers.put(this.header, this.host);  52  }  53 
 54             return event;  55  }  56  }  57 
 58     public List<Event> intercept(List<Event> events) {  59         Iterator i$ = events.iterator();  60 
 61         while(i$.hasNext()) {  62             Event event = (Event)i$.next();  63             this.intercept(event);  64  }  65 
 66         return events;  67  }  68 
 69     public void close() {  70  }  71 
 72     public static class Constants {  73         public static String HOST = "host";  74         public static String PRESERVE = "preserveExisting";  75         public static boolean PRESERVE_DFLT = false;  76         public static String USE_IP = "useIP";  77         public static boolean USE_IP_DFLT = true;  78         public static String HOST_HEADER = "hostHeader";  79 
 80         public Constants() {  81  }  82  }  83 
 84     public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {  85         private boolean preserveExisting;  86         private boolean useIP;  87         private String header;  88 
 89         public Builder() {  90             this.preserveExisting = HostInterceptor.Constants.PRESERVE_DFLT;  91             this.useIP = HostInterceptor.Constants.USE_IP_DFLT;  92             this.header = HostInterceptor.Constants.HOST;  93  }  94 
 95         public Interceptor build() {  96             return new HostInterceptor(this.preserveExisting, this.useIP, this.header);  97  }  98 
 99         public void configure(Context context) { 100             this.preserveExisting = context.getBoolean(HostInterceptor.Constants.PRESERVE, HostInterceptor.Constants.PRESERVE_DFLT); 101             this.useIP = context.getBoolean(HostInterceptor.Constants.USE_IP, HostInterceptor.Constants.USE_IP_DFLT); 102             this.header = context.getString(HostInterceptor.Constants.HOST_HEADER, HostInterceptor.Constants.HOST); 103  } 104  } 105 }
相關文章
相關標籤/搜索