Writing to HDFS with Flink

Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.10</artifactId>
  <version>1.3.2</version>
</dependency>

Java code

DataStream<String> input = ...;

// If the sink writes to another cluster, include the scheme and authority in the
// path to pick the target cluster, e.g. "hdfs://namenode:8020/base/path".
BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 400); // roll a new part file at 400 MB

input.addSink(sink);

There are three main properties to set: Bucketer, Writer, and BatchSize.

Bucketer: controls how the data is divided into HDFS directories. DateTimeBucketer buckets by the current system time, with the exact granularity determined by the format string passed in. We can also define our own bucketing rule and use a field in the data to decide the target directory.

For example, here I pick the bucket directory based on the Timestamp field in the JSON data:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

import com.alibaba.fastjson.JSONObject; // or whichever JSON library the job uses

public class DateHourBucketer implements Bucketer<JSONObject> {
    private static final long serialVersionUID = 1L;

    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd--HH");

    @Override
    public Path getBucketPath(Clock clock, Path basePath, JSONObject element) {
        // Bucket by the event's own Timestamp field instead of the system clock.
        long timestamp = element.getLong("Timestamp");
        return new Path(basePath, format.format(new Date(timestamp)));
    }
}
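With the custom Bucketer in place, wiring it into the sink works the same way as with DateTimeBucketer. A minimal sketch, assuming jsonStream is a DataStream<JSONObject> produced by an upstream parsing step:

DataStream<JSONObject> jsonStream = ...; // assumed upstream JSON parsing

BucketingSink<JSONObject> jsonSink = new BucketingSink<JSONObject>("/base/path");
jsonSink.setBucketer(new DateHourBucketer());       // bucket by the Timestamp field
jsonSink.setWriter(new StringWriter<JSONObject>()); // records are written via toString()
jsonSink.setBatchSize(1024 * 1024 * 400);

jsonStream.addSink(jsonSink);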

Writer: the format used to write the data. By default each record is converted to a string and written (StringWriter). If the data should be written as SequenceFiles, we can use SequenceFileWriter.
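A SequenceFile sink would look roughly like this (the shape follows the example in the Flink 1.3 documentation; it additionally requires the Hadoop Writable types on the classpath):

DataStream<Tuple2<IntWritable, Text>> pairs = ...;

BucketingSink<Tuple2<IntWritable, Text>> seqSink =
        new BucketingSink<Tuple2<IntWritable, Text>>("/base/path");
seqSink.setBucketer(new DateTimeBucketer<Tuple2<IntWritable, Text>>("yyyy-MM-dd--HHmm"));
seqSink.setWriter(new SequenceFileWriter<IntWritable, Text>());

pairs.addSink(seqSink);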

BatchSize: by default each parallel subtask writes to its own part file; batchSize specifies how large a part file may grow before a new one is started.

固然咱們仍是能夠設置路徑前綴、後綴,多長時間關閉文件句柄等等屬性。code

By default, the generated paths have the following format:

/base/path/{date-time}/part-{parallel-task}-{count}

count is the part-file index, incremented each time a file rolls because of BatchSize.

Note that checkpointing must be enabled; otherwise the files stay in the pending state forever, their handles are never closed, and the data cannot be read.
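Enabling checkpointing is a one-liner on the execution environment (the 60-second interval here is just an example):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000); // checkpoint every 60s so part files can move from pending to finished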
