package com.mycom; import; import; import; import; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyFlumeSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(MyFlumeSink.class); private static final String PROP_KEY_ROOTPATH = "fileName"; private String fileName; @Override public void configure(Context context) { // TODO Auto-generated method stub fileName = context.getString(PROP_KEY_ROOTPATH); } @Override public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Channel ch = getChannel(); Transaction txn = ch.getTransaction(); Event event = null; txn.begin(); while (true) { event = ch.take(); if (event != null) { break; } } try { logger.debug("Get event."); String body = new String(event.getBody()); System.out.println("event.getBody()-----" + body); String res = body + ":" + System.currentTimeMillis() + "\r\n"; File file = new File(fileName); FileOutputStream fos = null; try { fos = new FileOutputStream(file, true); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { fos.write(res.getBytes()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { fos.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } txn.commit(); return Status.READY; } catch (Throwable th) { txn.rollback(); if (th instanceof Error) { throw (Error) th; } else { throw new EventDeliveryException(th); } } finally { txn.close(); } } }
[root@d1 apache-flume-1.8.0-bin]# cat conf/http_test.conf a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=http a1.sources.r1.bind= a1.sources.r1.port=50000 a1.sources.r1.channels=c1 a1.sinks.k1.type=logger a1.sinks.k1.type = com.mycom.MyFlumeSink a1.sinks.k1.fileName=/home/mysinks.txt a1.channels.c1.type=memory #a1.channels.c1.capacity=1000 #a1.channels.c1.transactionCapacity=100 a1.channels.c1.capacity=8 a1.channels.c1.transactionCapacity=4 [root@d1 apache-flume-1.8.0-bin]#
[root@d1 home]# cat ~/.bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
# User specific environment and startup programs
export PATH
JAVA_HOME=/usr/local/jdk;export JAVA_HOME;
[root@d1 home]#
bin/flume-ng agent -c /home/apache-flume-1.8.0-bin/conf/ -f /home/apache-flume-1.8.0-bin/conf/http_test.conf -n a1 -Dflume.root.logger=INFO,console -C /home/MyBgJavaLan/target/MyAid-1.0.0-jar-with-dependencies.jar
[root@d1 MyBgJavaLan]# mvn clean;mvn compile;mvn package;
package com.mycom; import; import; import; import; import java.util.HashMap; import java.util.Map; import; import; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import; import; import; import; import; public class MyFlumeSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(MyFlumeSink.class); private static final String PROP_KEY_ROOTPATH = "fileName"; private String fileName; @Override public void configure(Context context) { // TODO Auto-generated method stub fileName = context.getString(PROP_KEY_ROOTPATH); } @Override public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Channel ch = getChannel(); Transaction txn = ch.getTransaction(); Event event = null; txn.begin(); while (true) { event = ch.take(); if (event != null) { break; } } try { logger.debug("Get event."); // request.getParameter("username") // JSON json = JSONObject.parseObject(event.getBody()); //JSONObject jsonObject1 = JSONObject.parseObject(JSON_OBJ_STR); //由於JSONObject繼承了JSON,因此這樣也是能夠的 // System.out.println(jsonObject.getString("studentName")+":"+jsonObject.getInteger("studentAge")); // String body = jsonObject.getString("body"); // //old //JAVA解析JSON數據 - monsterLin - 博客園 // String body = new String(event.getBody()); String body = event.getBody().toString(); JsonParser parse = new JsonParser(); //建立json解析器 JsonObject json = (JsonObject) parse.parse(body); //建立jsonObject對象 String bodyReal = json.get("body").getAsString(); System.out.println("event.getBody()-----" + bodyReal); String res = bodyReal + ":" + System.currentTimeMillis() + "\r\n"; File file = new File(fileName); FileOutputStream fos = null; try { fos = new 
FileOutputStream(file, true); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { fos.write(res.getBytes()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { fos.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } txn.commit(); return Status.READY; } catch (Throwable th) { txn.rollback(); if (th instanceof Error) { throw (Error) th; } else { throw new EventDeliveryException(th); } } finally { txn.close(); } } }
package com.mycom; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import java.nio.charset.Charset; import java.text.SimpleDateFormat; // public class MyAppFlume { public static void main(String[] args) { MyRpcClientFacade client = new MyRpcClientFacade(); // Initialize client with the remote Flume agent's host and port // client.init("hadoop1", 41414); // client.init("", 41414); client.init("", 41414); // Send 10 events to the remote Flume agent. That agent should be configured to listen with an AvroSource. String sampleData = "Hello Flume!"; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd--HH-mm-ss"); long cTm = System.currentTimeMillis(); String df = sdf.format(cTm); System.out.println(df); String str_ = ""; for (int i = 0; i < 14; i++) { str_ = str_ + i + df + "加油!!" + sampleData; } System.out.println(str_); // for (int i = 0; i < 20; i++) { // String str = i + "------------" + str_ + df + "加油!!" + sampleData; // System.out.println(str); // client.sendDataToFlume(str); // } String str = "對StringBuilder拋出ArrayIndexOutOfBoundsException的探究 - CSDN博客" + "------------" + str_ + df + "加油!!" 
+ sampleData; System.out.println(str); client.sendDataToFlume(str); client.cleanUp(); } } class MyRpcClientFacade { private RpcClient client; private String hostname; private int port; public void init(String hostname, int port) { // Setup the RPC connection this.hostname = hostname; this.port = port; this.client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client(instead of the above line); // this.client=RpcClientFactory.getThriftInstance(hostname,port); } public void sendDataToFlume(String data) { // Create a Flume Event object that encapsulate the sample data Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); System.out.println("--->"); System.out.println(data); System.out.println(event); System.out.println("<--->"); /* * int getBatchSize(); void append(Event var1) throws EventDeliveryException; void appendBatch(List<Event> var1) throws EventDeliveryException; boolean isActive(); void close() throws FlumeException; * * */ // Send the event try { this.client.append(event); System.out.println("<----------client.append(event)--------------->"); } catch (EventDeliveryException e) { e.printStackTrace(); System.out.println(e); // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); } } public void cleanUp() { // Close the RPC connection client.close(); } }
版本V a
protected function data2unifiedlog($data)
"headers" : {
"timestamp" : "434324343",
"host" : ""
"body" : "random_body"
"headers" : {
"namenode" : "",
"datanode" : ""
"body" : "really_random_body"
$ch = curl_init();
$now = date('y-m-d h:i:s', time()) . gethostname();
$post_data_json = '[{
"headers" : {
"timestamp" : "434324343",
"host" : ""
"body" : \'str86677' . $now . '\'}]';
$s = '{"hostname":' . gethostname() . ',"timestamp":"' . time() . '"';
foreach ($data as $k => $v) {
$s = $s . ',"' . $k . '":"' . $v . '"';
$s = $s . '}';
$post_data_json = '[{
"headers" : {
"timestamp" : "434324343",
"host" : ""
"body" : \'' . $s . '\'}]';
$curlopt_url = '';
curl_setopt($ch, CURLOPT_URL, $curlopt_url);
curl_setopt($ch, CURLOPT_HEADER, TRUE);
curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Type:application/json'));
curl_setopt($ch, CURLOPT_POSTFIELDS, $post_data_json);
#a1.sinks.k1.type = com.product.FlumeApp
a1.sinks.k1.type = file_roll = /data/UnifiedLog/log
a1.sinks.k1.sink.serializer = text
export FlumeHome=/data/UnifiedLog/flume; $FlumeHome/bin/flume-ng agent -c $FlumeHome/conf/ -f $FlumeHome/conf/httpSourceApp.conf -n a1 -Dflume.root.logger=INFO,console -Xms10240m -Xmx10240m ;
root 23619 1.4 7.9 16559800 2608576 pts/3 Sl+ 15:31 1:05 /usr/java/jdk1.8.0_101/bin/java -Xmx20m -Dflume.root.logger=INFO,console -Xms10240m -Xmx10240m -cp /data/UnifiedLog/flume/conf:/data/UnifiedLog/flume/lib/*:/lib/* -Djava.library.path= org.apache.flume.node.Application -f /data/UnifiedLog/flume/conf/httpSourceApp.conf -n a1
tcp 0 0* LISTEN 23619/java
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-102
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-103
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-104
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-105
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-106
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-107
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-108
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-109
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-110
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-111
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-112
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-113
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-114
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-115
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-116
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-117
8.0K -rw-r--r-- 1 root root 8.0K Nov 2 16:32 1541143920191-118
8.0K -rw-r--r-- 1 root root 5.3K Nov 2 16:32 1541143920191-119
4.0K -rw-r--r-- 1 root root 2.7K Nov 2 16:32 1541143920191-120
@Test | |
public void testSimpleUTF16() throws IOException, InterruptedException { | |
StringEntity input = new StringEntity("[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"}," | |
+ "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]", "UTF-16"); | |
input.setContentType("application/json; charset=utf-16"); | |
postRequest.setEntity(input); | |
HttpResponse response = httpClient.execute(postRequest); | |
Assert.assertEquals(HttpServletResponse.SC_OK, | |
response.getStatusLine().getStatusCode()); | |
Transaction tx = httpChannel.getTransaction(); | |
tx.begin(); | |
Event e = httpChannel.take(); | |
Assert.assertNotNull(e); | |
Assert.assertEquals("b", e.getHeaders().get("a")); | |
Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16")); | |
e = httpChannel.take(); | |
Assert.assertNotNull(e); | |
Assert.assertEquals("f", e.getHeaders().get("e")); | |
Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16")); | |
tx.commit(); | |
tx.close(); | |
} | |
@Test | |
public void testInvalid() throws Exception { | |
StringEntity input = new StringEntity("[{\"a\": \"b\",[\"d\":\"e\"],\"body\": \"random_body\"}," | |
+ "{\"e\": \"f\",\"body\": \"random_body2\"}]"); | |
input.setContentType("application/json"); | |
postRequest.setEntity(input); | |
HttpResponse response = httpClient.execute(postRequest); | |
Assert.assertEquals(HttpServletResponse.SC_BAD_REQUEST, | |
response.getStatusLine().getStatusCode()); | |
SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); | |
Assert.assertEquals(1, sc.getEventReadFail()); | |
} |
headers application/json
body [{"headers" : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body": "random_body2"}]
public class JSONHandler
extends Object
implements HTTPSourceHandler
1. headers - the key for this key-value pair is "headers". The value for this key is another map, which represents the event headers. These headers are inserted into the Flume event as is.
2. body - The body is a string which represents the body of the event. The key for this key-value pair is "body". All key-value pairs inside the "headers" map are treated as event headers. An example:
[{"headers" : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body": "random_body2"}]
would be interpreted as the following two flume events:
* Event with body: "random_body" (in UTF-8/UTF-16/UTF-32 encoded bytes) and headers : (a:b, c:d)
* Event with body: "random_body2" (in UTF-8/UTF-16/UTF-32 encoded bytes) and headers : (e:f)
The charset of the body is read from the request and used. If no charset is set in the request, then the charset is assumed to be JSON's default - UTF-8. The JSON handler supports UTF-8, UTF-16 and UTF-32. To set the charset, the request must have content type specified as "application/json; charset=UTF-8" (replace UTF-8 with UTF-16 or UTF-32 as required). One way to create an event in the format expected by this handler is to use JSONEvent and serialize it to a JSON string with Gson, using the Gson.toJson(java.lang.Object, java.lang.reflect.Type) method. The type token to pass as the second argument of this method for a list of events can be created by:
Type type = new TypeToken<List<JSONEvent>>() {}.getType();