自定義source開發:mysql
source是收集日誌存入channel。web
Source提供了兩種機制:PollableSource(輪訓拉取)和EventDrivenSource(事件驅動),sql
若是使用EventDrivenSource,你能夠在start方法中啓動額外的線程,不斷的往channel中發數據。若是使用PollableSource,你能夠在process()實現不斷重發。apache
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; try { // This try clause includes whatever Channel/Event operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; }}
或者ide
package
org.apache.flume;
import
org.apache.flume.conf.Configurable;
import
org.apache.flume.source.AbstractSource;
public
class
TailSource
extends
AbstractSource
implements
EventDrivenSource,
Configurable {
@Override
public
void
configure(Context context) {
}
@Override
public
synchronized
void
start() {
}
@Override
public
synchronized
void
stop() {
}
}
自定義sink:this
sink是從channel中拉取日誌處理。spa
process會不斷調用,你只需在process中去取channel的數據便可。線程
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } return status; }}
自定義sink與mysql整合http://www.iteblog.com/archives/1109日誌