Apache Beam訪問HDFS

1、直接訪問

1.引入HDFS的相關jar包:java

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>2.1.0</version>
    </dependency>

2.使用HadoopFileSystemOptions代替PipelineOptionsapache

public interface WordCountOptions extends HadoopFileSystemOptions {
    @Description("input file")
    @Default.String("hdfs://localhost:9000/tmp/words2")
    String getInputFile();
    void setInputFile(String in);

    @Description("output")
    @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount")
    String getOutput();
    void setOutput(String out);
}

3.給Options指定HDFS配置app

Configuration conf=new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:9000");
    HDFSOption options= PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(HDFSOption.class);
    options.setHdfsConfiguration(ImmutableList.of(conf));

4.與訪問本地文件同樣訪問HDFS文件maven

Pipeline p = Pipeline.create(options);
    Data = p.apply("Read from HDFS",
            TextIO.read().from(options.getInputFile()));

實際測試中發現本地runner(如Direct, Flink Local, Spark Local...)可以成功讀寫HDFS,可是集羣模式下(如Flink Cluster, Spark Cluster...)讀寫HDFS失敗,緣由未知。oop

2、經過HBase訪問

除了直接讀寫HDFS的數據,還能夠經過HBase來進行讀寫。
1.添加相關jar包測試

<!--hbase-->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-hbase</artifactId>
        <version>${beam.verson}</version>
    </dependency>

2.設置HBase鏈接信息插件

Configuration conf = new Configuration();
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    conf.setStrings("hbase.master.hostname", "localhost");
    conf.setStrings("hbase.regionserver.hostname", "localhost");

3.使用上述的conf讀HBase數據調試

pipe
            //指定配置和表名
            .apply("Read from HBase",
                    HBaseIO.read().withConfiguration(conf).withTableId("test_tb"))
                       
            .apply(ParDo.of(new DoFn<Result, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                 //讀到的數據是HBase API中定義的Result格式,須要按照HBase官方說明進行剝取 
                    Result result = c.element();
                    String rowkey = Bytes.toString(result.getRow());
                    System.out.println("row key: ");

                    for(Cell cell : result.listCells())
                    {
                        System.out.println("qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell)));
                        System.out.println("value:"+Bytes.toString(CellUtil.cloneValue(cell)));
                    }

                    c.output(rowkey);
                }
            }));

4.寫入到HBasecode

//寫入前須要將string數據封裝爲Hbase數據格式mutation
    .apply(ParDo.of(new DoFn<String, Mutation>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            byte[] qual = Bytes.toBytes("qual");
            byte[] cf = Bytes.toBytes("cf");
            byte[] row = Bytes.toBytes("kafka");
            byte[] val = Bytes.toBytes(context.element());
            final Charset UTF_8 = Charset.forName("UTF-8");
            Mutation mutation = new Put(row).addColumn(cf, qual, val);
            context.output(mutation);
        }

    }))
    .apply("write to Hbase",
            HBaseIO.write()
                    .withConfiguration(conf)
                    .withTableId("test_tb"));

經測試,不管本地runner仍是集羣runner都能成功讀寫。
可是發現一個問題,使用mvn exec:java進行調試成功,而使用shade插件打包成jar運行卻一直報錯,說Mutation沒有指定coder,beam論壇上求助後獲得的回覆是maven-shade-plugin版本太舊,須要更新到3.0.0以上版本,但我改了3.0的版本以後仍是同樣的錯誤。後來添加了ServicesResourceTransformer才解決。orm

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
相關文章
相關標籤/搜索