上一篇咱們介紹瞭如何將數據從mysql拋到kafka,此次咱們就專一於利用storm將數據寫入到hdfs的過程,因爲storm寫入hdfs的可定製東西有些多,咱們先不從kafka讀取,而先本身定義一個Spout數據充當數據源,下章再進行整合。這裏默認你是擁有必定的storm知識的基礎,起碼知道Spout和bolt是什麼。html
寫入hdfs能夠有如下的定製策略:java
本篇會先說明如何用storm寫入HDFS,寫入過程一些API的描述,以及最後給定一個例子:mysql
storm每接收到10個Tuple後就會改變hdfs寫入文件,新文件的名字就是第幾回改變。git
ps:storm版本:1.1.1。Hadoop版本:2.7.4。github
接下來咱們首先看看Storm如何寫入HDFS。sql
Storm官方有提供了相應的API讓咱們可使用。能夠經過建立HdfsBolt以及定義相應的規則,便可寫入HDFS 。apache
首先經過maven配置依賴以及插件。json
<properties> <storm.version>1.1.1</storm.version> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--<scope>provided</scope>--> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency> <!--hadoop模塊--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId> <version>1.1.1</version> <!--<scope>test</scope>--> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.7</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
這裏要提一下,若是要打包部署到集羣上的話,打包的插件須要使用maven-shade-plugin這個插件,而後使用maven Lifecycle中的package打包。而不是用Maven-assembly-plugin插件進行打包。安全
由於使用Maven-assembly-plugin的時候,會將全部依賴的包unpack,而後在pack,這樣就會出現,一樣的文件被覆蓋的狀況。發佈到集羣上的時候就會報No FileSystem for scheme: hdfs的錯。dom
而後是使用HdfsBolt寫入Hdfs。這裏來看看官方文檔中的例子吧。
// 使用 "|" 來替代 ",",來進行字符分割 RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); // 每輸入 1k 後將內容同步到 Hdfs 中 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // 當文件大小達到 5MB ,轉換寫入文件,即寫入到一個新的文件中 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); //當轉換寫入文件時,生成新文件的名字並使用 FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/foo/"); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://localhost:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); //生成該 bolt topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
到這裏就結束了。能夠將HdfsBolt看成一個Storm中特殊一些的bolt便可。這個bolt的做用即便根據接收信息寫入Hdfs。
而在新建HdfsBolt中,Storm爲咱們提供了至關強的靈活性,咱們能夠定義一些策略,好比當達成某個條件的時候轉換寫入文件,新寫入文件的名字,寫入時候的分隔符等等。
若是選擇使用的話,Storm有提供部分接口供咱們使用,但若是咱們以爲不夠豐富也能夠自定義相應的類。下面咱們看看如何控制這些策略吧。
這是一個接口,容許你自由定義接收到內容的格式。
public interface RecordFormat extends Serializable { byte[] format(Tuple tuple); }
Storm提供了DelimitedRecordFormat,使用方法在上面已經有了。這個類默認的分割符是逗號",",而你能夠經過withFieldDelimiter方法改變分隔符。
若是你的初始分隔符不是逗號的話,那麼也能夠重寫寫一個類實現RecordFormat接口便可。
一樣是一個接口。
public interface FileNameFormat extends Serializable { void prepare(Map conf, TopologyContext topologyContext); String getName(long rotation, long timeStamp); String getPath(); }
Storm所提供的默認的是org.apache.storm.hdfs.format.DefaultFileNameFormat。默認人使用的轉換文件名有點長,格式是這樣的:
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例如:
MyBolt-5-7-1390579837830.txt
默認狀況下,前綴是空的,擴展標識是".txt"。
同步策略容許你將buffered data緩衝到Hdfs文件中(從而client能夠讀取數據),經過實現org.apache.storm.hdfs.sync.SyncPolicy接口:
public interface SyncPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); }
這個接口容許你控制什麼狀況下轉換寫入文件。
public interface FileRotationPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); }
Storm有提供三個實現該接口的類:
最簡單的就是不進行轉換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy,就是什麼也不幹。
經過文件大小觸發轉換的org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。
經過時間條件來觸發轉換的org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。
若是有更加複雜的需求也能夠本身定義。
這個主要是提供一個或多個hook,可加可不加。主要是在觸發寫入文件轉換的時候會啓動。
public interface RotationAction extends Serializable { void execute(FileSystem fileSystem, Path filePath) throws IOException; }
瞭解了上面的狀況後,咱們會實現一個例子,根據寫入記錄的多少來控制寫入轉換(改變寫入的文件),而且轉換後文件的名字表示當前是第幾回轉換。
首先來看看HdfsBolt的內容:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" "); // sync the filesystem after every 1k tuples SyncPolicy syncPolicy = new CountSyncPolicy(1000); // FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB); /** rotate file with Date,every month create a new file * format:yyyymm.txt */ FileRotationPolicy rotationPolicy = new CountStrRotationPolicy(); FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/"); RotationAction action = new NewFileAction(); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://127.0.0.1:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy) .addRotationAction(action);
而後分別來看各個策略的類。
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; import java.text.SimpleDateFormat; import java.util.Date; /** * 計數以改變Hdfs寫入文件的位置,當寫入10次的時候,則更改寫入文件,更更名字取決於 「TimesFileNameFormat」 * 這個類是線程安全 */ public class CountStrRotationPolicy implements FileRotationPolicy { private SimpleDateFormat df = new SimpleDateFormat("yyyyMM"); private String date = null; private int count = 0; public CountStrRotationPolicy(){ this.date = df.format(new Date()); // this.date = df.format(new Date()); } /** * Called for every tuple the HdfsBolt executes. * * @param tuple The tuple executed. * @param offset current offset of file being written * @return true if a file rotation should be performed */ @Override public boolean mark(Tuple tuple, long offset) { count ++; if(count == 10) { System.out.print("num :" +count + " "); count = 0; return true; } else { return false; } } /** * Called after the HdfsBolt rotates a file. */ @Override public void reset() { } @Override public FileRotationPolicy copy() { return new CountStrRotationPolicy(); } }
import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.task.TopologyContext; import java.util.Map; /** * 決定從新寫入文件時候的名字 * 這裏會返回是第幾回轉換寫入文件,將這個第幾回作爲文件名 */ public class TimesFileNameFormat implements FileNameFormat { //默認路徑 private String path = "/storm"; //默認後綴 private String extension = ".txt"; private Long times = new Long(0); public TimesFileNameFormat withPath(String path){ this.path = path; return this; } @Override public void prepare(Map conf, TopologyContext topologyContext) { } @Override public String getName(long rotation, long timeStamp) { times ++ ; //返回文件名,文件名爲更換寫入文件次數 return times.toString() + this.extension; } public String getPath(){ return this.path; } }
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; /** 當轉換寫入文件時候調用的 hook ,這裏僅寫入日誌。 */ public class NewFileAction implements RotationAction { private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class); @Override public void execute(FileSystem fileSystem, Path filePath) throws IOException { LOG.info("Hdfs change the written file!!"); return; } }
OK,這樣就大功告成了。經過上面的代碼,每接收到10個Tuple後就會轉換寫入文件,新文件的名字就是第幾回轉換。
完整代碼包括一個隨機生成字符串的Spout,能夠到個人github上查看。
StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
推薦閱讀:
Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka
Spark SQL,如何將 DataFrame 轉爲 json 格式