1. Add the HDFS-related dependency:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>2.1.0</version>
</dependency>
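Beam generally expects the Hadoop client libraries themselves to be supplied by the project, so a matching Hadoop client also has to be on the classpath. A minimal sketch, assuming Hadoop 2.7.3 (use whatever version your cluster actually runs):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <!-- assumed version; match it to the HDFS cluster -->
    <version>2.7.3</version>
</dependency>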
2. Use HadoopFileSystemOptions instead of PipelineOptions:
public interface WordCountOptions extends HadoopFileSystemOptions {
    @Description("input file")
    @Default.String("hdfs://localhost:9000/tmp/words2")
    String getInputFile();
    void setInputFile(String in);

    @Description("output")
    @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount")
    String getOutput();
    void setOutput(String out);
}
3. Point the options at the HDFS configuration:
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(WordCountOptions.class);
options.setHdfsConfiguration(ImmutableList.of(conf));
4. Access HDFS files the same way as local files:
Pipeline p = Pipeline.create(options);
PCollection<String> data = p.apply("Read from HDFS",
        TextIO.read().from(options.getInputFile()));
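To round the example out, here is a minimal sketch of the remaining word-count steps, writing the counts back to the output path defined in the options above. The whitespace tokenizer and the output format are my own assumptions, and the usual Beam imports (ParDo, DoFn, Count, KV, TextIO) are assumed:

data.apply("Split words", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // split each line on whitespace and drop empty tokens
            for (String word : c.element().split("\\s+")) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }))
    .apply("Count words", Count.<String>perElement())
    .apply("Format results", ParDo.of(new DoFn<KV<String, Long>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().getKey() + ": " + c.element().getValue());
        }
    }))
    // write the counts back to HDFS using the output path from the options
    .apply("Write to HDFS", TextIO.write().to(options.getOutput()));

p.run().waitUntilFinish();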
In actual testing, local runners (Direct, Flink Local, Spark Local...) read and write HDFS successfully, but in cluster mode (Flink Cluster, Spark Cluster...) the HDFS reads and writes fail; the cause is unknown.
Besides reading and writing data on HDFS directly, it is also possible to read and write through HBase.
1. Add the HBase dependency:
<!-- hbase -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hbase</artifactId>
    <version>${beam.version}</version>
</dependency>
2. Set up the HBase connection information:
Configuration conf = new Configuration();
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setStrings("hbase.master.hostname", "localhost");
conf.setStrings("hbase.regionserver.hostname", "localhost");
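The HBase client usually locates the cluster through ZooKeeper rather than explicit master/regionserver hostnames, so if the configuration above does not connect, a hedged alternative is to start from HBaseConfiguration.create() and point it at the ZooKeeper quorum (the hostname and port below are assumptions for a local setup):

// HBaseConfiguration.create() also picks up hbase-site.xml from the classpath if present
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);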
3. Read data from HBase with the conf above:
pipe
    // specify the configuration and the table name
    .apply("Read from HBase",
            HBaseIO.read().withConfiguration(conf).withTableId("test_tb"))
    .apply(ParDo.of(new DoFn<Result, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // the data comes back in the Result format defined by the HBase API
            // and has to be unpacked as described in the HBase documentation
            Result result = c.element();
            String rowkey = Bytes.toString(result.getRow());
            System.out.println("row key: " + rowkey);
            for (Cell cell : result.listCells()) {
                System.out.println("qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
            c.output(rowkey);
        }
    }));
4. Write to HBase:
// before writing, the String elements have to be wrapped in HBase's Mutation format
.apply(ParDo.of(new DoFn<String, Mutation>() {
    @ProcessElement
    public void processElement(ProcessContext context) {
        byte[] qual = Bytes.toBytes("qual");
        byte[] cf = Bytes.toBytes("cf");
        byte[] row = Bytes.toBytes("kafka");
        byte[] val = Bytes.toBytes(context.element());
        Mutation mutation = new Put(row).addColumn(cf, qual, val);
        context.output(mutation);
    }
}))
.apply("Write to HBase", HBaseIO.write()
        .withConfiguration(conf)
        .withTableId("test_tb"));
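HBaseIO.write() does not create the target table, so test_tb with the cf column family has to exist before the pipeline runs. A minimal sketch for creating it once up front, assuming the HBase 1.x admin API and a calling method that declares IOException:

try (Connection connection = ConnectionFactory.createConnection(conf);
     Admin admin = connection.getAdmin()) {
    TableName table = TableName.valueOf("test_tb");
    if (!admin.tableExists(table)) {
        // create the table with the single column family used by the pipeline
        HTableDescriptor descriptor = new HTableDescriptor(table);
        descriptor.addFamily(new HColumnDescriptor("cf"));
        admin.createTable(descriptor);
    }
}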
In testing, both local and cluster runners read and write HBase successfully.
One problem did come up, though: debugging with mvn exec:java worked, but running the jar built by the shade plugin kept failing with an error saying that no coder had been specified for Mutation. A reply on the Beam forum said the maven-shade-plugin version was too old and needed to be 3.0.0 or newer, but after switching to 3.0 the error stayed the same. It was finally resolved by adding the ServicesResourceTransformer:
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
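For context, a sketch of where that transformer sits inside the full maven-shade-plugin configuration; the plugin version and execution settings below are the usual defaults, not taken from the original pom:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merges META-INF/services files so Beam can still discover
                         registered coders and filesystems in the shaded jar -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>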