通過一個多月來的研究實踐,在親友團的幫助下,個人第一個spark程序終於上線了,程序雖然簡單,但也是我今天在新領域的探索,從業來,第一次去把一個java的小項目推上線。今天抽時間覆盤總結下過程。
關於hive和spark的基礎介紹,網上的說明比較多,我這裏就不說明了,只根據自身感悟說明吧。java
全部的技術都是爲特定的業務服務的,spark也不例外。我也是碰到一個特定的場景,須要涉及單日大量的數據生成,而後處理,前期計算量比較大。想方案,當時依賴其餘業務資源,有幾種方法能夠研究。python
經過調研,和資源評估成本等緣由,咱們決定使用離線的方式解決,這樣在知足需求的同時,共用系統資源,最低使用成本。web
說到離線計算不得不說到hive sql。hive sql是個MapReduce的任務過程。我問了其餘部門作過相關業務的同事,開通了權限,而後用hive sql執行了下對離線表的查詢。發現生成數據的時間還能夠接受,但數據須要落地,而後傳輸到咱們本身的業務機上,再處理。這個過程就比較慢了,難以達到要求。sql
ps: 或許是使用的姿式不對吧,總之此次一些各類指標不達標,繼續需求新的方案。apache
其餘部門兄弟推薦,使用spark計算,而後介紹多麼多麼速度快,徹底實現咱們的需求沒有問題,最關鍵能給咱們找個顧問。之前是搞web應用服務的,對於離線計算的只是概念上,只存在幾個帖子裏和一些理論的知識。因此,我綜合評估了下,決定嘗試下看看。問了下「顧問」,他給我講了一遍,因而咱們的spark之旅開始了。項目參照他們的項目,簡化結構。編程
spark程序開發主要有幾種方式:服務器
通過裁判,我決定跟同事的項目保持一致,使用java,學習成本低,能夠快速參考實現。app
我本地用的intellij idea 做爲本地開發工具,採用maven管理相關的包。
java版本:maven
java -version > java version "1.8.0_20"
填寫相關信息:ide
修改項目的pom.xml引入相關的依賴,主要是hive sql相關的部分。
<properties> ... <spark.version>2.2.1</spark.version> <spark.scope>compile</spark.scope> </properties> <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> <scope>${spark.scope}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> <scope>${spark.scope}</scope> </dependency> </dependencies> ...
import org.apache.spark.sql.SparkSession; SparkSession sparkSession = SparkSession.builder() .config("hive.exec.scratchdir", "/user_ext/{username}/hive-{username}") .config("spark.sql.warehouse.dir", "/user_ext/{username}/warehouse") .appName("{Your app name}") .enableHiveSupport() .getOrCreate();
原本想再hive裏面進行數據查詢的,不過最終仍是找帖子在spark裏面實現。
import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.*; // 如下是建立一個簡單結構 List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField( "demo_id", DataTypes.StringType, true )); structFields.add(DataTypes.createStructField( "demo_name", DataTypes.StringType, true )); StructType structType = DataTypes.createStructType(structFields); // 添加數據 List<Row> rows = new ArrayList<>(); rows.add(RowFactory.create("demo_id", "demo_name")); ... // 能夠添加多行 // 建立臨時表 Dataset<Row> df = sparkSession.createDataFrame(rows, structType); df.registerTempTable("tmp_demo");
至此,臨時表建立完成,tmp_demo可直接參與查詢了。
關於查詢,須要注意理解分區的含義。
在寫查找離線表以前,你要問清楚你的數據表是怎麼存儲的,是否有按照實踐分區的概念。我一開始不理解,一執行語句就掛住。後來問了下,得知這個表是一直按照時間進行存儲的,使用的使用得使用最新的分區。
show partitions tableName 能夠查找到全部的分區,選擇最新的一個便可。 select * from tableName where dt={最新分區}
如下是一個完整的實例:
sql = "select * from tmp_demo"; Dataset<Row> rets = spark.sql(sql); List<Row> list = rets.collectAsList(); for (Row row: list) { do some action... }
注意:少許的數據可使用collectAsList進行轉換,可是大量的數據就得考慮用 mapPartition,或者foreachPatition進行遍歷了。
具體內容可參考map,mapPartition,foreachPatition的比較。
對java項目不熟悉,什麼又沒有熟悉的同窗,因此有幾天卡在這個打包上。一開始我參照以前的例子,在Project Structure -> Artifacts 裏面進行配置,而後在build -> build Artifacts 裏面打包,生成的jar文件進行上傳。
! 注意:上面這是一種錯誤的打包方式。我將 output 的jar包上傳後,一直報告找不到類的錯誤。
固然網上還有一種錯誤,就是main所在的類用的路徑在 spark-submit 的參數中沒有加上。
正確的打包方式,應該選用右側的maven projects方式進行,
這樣在 target 目錄下生成的.jar 文件就沒有問題了。
由於是傳到遠端服務器上的jar包,因此決定上傳的效率有兩個。
使用foreachPatition,在參數的地方使用new ForeachPartitionFunction進行處理。
在Call函數裏面的日誌,通常是在相似子任務裏面,在這裏我調試了數次,找了半天個人日誌在哪裏打印。
最後執行必定落實到下面的命令上,此時 --class的參數必定要包含你的main函數,對應的包名。
spark-submit --master yarn --deploy-mode cluster ... --class package1.package2.classname demo.jar
爲了此次的嘗試,我基本上把網上的例子翻遍了,發現有不少是基於scala的,看來scala是大趨勢。同時相關的靠譜的帖子少於其餘話題的,估計搞大數據的相關的人仍是相對搞應用的不太多吧。推薦本書《Spark大數據分析 源碼解析與實例詳解》,對理解spark相關的知識頗有幫助。最後,歡迎有興趣的朋友一塊兒討論啊。