<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
DataStream<String> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path"); // when writing to a different cluster, prefix the path with the scheme and authority to identify that cluster
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
input.addSink(sink);
The three main properties to set are the Bucketer, the Writer, and the BatchSize.
Bucketer: controls how data is bucketed into HDFS directories. DateTimeBucketer buckets by the current system time, with the granularity determined by the format string passed to it. We can also define our own bucketing rule, for example using a field inside the data to decide the target directory;
For example, here I derive the bucket directory from the Timestamp field of the JSON data:
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

import com.alibaba.fastjson.JSONObject; // assuming Alibaba fastjson, which provides getLong

class DateHourBucketer implements Bucketer<JSONObject> {

    private static final long serialVersionUID = 1L;

    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd--HH");

    @Override
    public Path getBucketPath(Clock clock, Path basePath, JSONObject element) {
        // Bucket by the event's own Timestamp field instead of the system clock
        Long timestamp = element.getLong("Timestamp");
        String newDateTimeString = format.format(new Date(timestamp));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
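Wiring the custom bucketer into the sink is then a single call (a minimal sketch; the jsonStream variable and the sink path are assumed here):

BucketingSink<JSONObject> jsonSink = new BucketingSink<JSONObject>("/base/path");
jsonSink.setBucketer(new DateHourBucketer());
// With no explicit writer, the default StringWriter calls toString(),
// which serializes the JSONObject as JSON text
jsonStream.addSink(jsonSink);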
Writer: the format in which data is written. By default each element is converted to a string and written. If the data should be written as SequenceFiles, we can use SequenceFileWriter;
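For instance, writing Hadoop SequenceFiles could look like the following sketch (the upstream DataStream of Tuple2<IntWritable, Text> is assumed):

DataStream<Tuple2<IntWritable, Text>> pairs = ...; // assumed upstream source

BucketingSink<Tuple2<IntWritable, Text>> seqSink =
        new BucketingSink<Tuple2<IntWritable, Text>>("/base/path");
// Write key/value pairs as a Hadoop SequenceFile instead of plain strings
seqSink.setWriter(new SequenceFileWriter<IntWritable, Text>());
pairs.addSink(seqSink);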
BatchSize: by default each parallel task (thread) writes to its own part file; batchSize specifies how large a part file may grow before a new one is rolled.
固然咱們仍是能夠設置路徑前綴、後綴,多長時間關閉文件句柄等等屬性。code
The default generated path format is:
/base/path/{date-time}/part-{parallel-task}-{count}
count is the part-file index, incremented each time a new file is rolled because of BatchSize.
Note that checkpointing must be enabled; otherwise files stay in the pending state forever, their handles are never closed, and the data cannot be read.
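Enabling checkpointing is a single call on the execution environment (the 60-second interval here is just an example value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without checkpoints, BucketingSink never moves files from pending to finished
env.enableCheckpointing(60 * 1000L); // checkpoint every 60 seconds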