一次使用spark進行離線計算的實踐

通過一個多月來的研究實踐,在親友團的幫助下,個人第一個spark程序終於上線了,程序雖然簡單,但也是我今天在新領域的探索,從業來,第一次去把一個java的小項目推上線。今天抽時間覆盤總結下過程。
關於hive和spark的基礎介紹,網上的說明比較多,我這裏就不說明了,只根據自身感悟說明吧。java

背景

全部的技術都是爲特定的業務服務的,spark也不例外。我也是碰到一個特定的場景,須要涉及單日大量的數據生成,而後處理,前期計算量比較大。想方案,當時依賴其餘業務資源,有幾種方法能夠研究。python

  1. 線上接口使用,咱們用業務方的線上的接口,須要咱們和業務方同時申請大規模資源,去作處理。
  2. 其餘部門兄弟推薦訪問離線數據表進行計算。這個得重點說下,經過了解,公司恰巧有大數據相關的業務,有些部門專門作過這些內容,因此咱們經過 問詢,能夠做爲客戶端應用層,接入業務,固然相關的業務邏輯得咱們本身完成。

經過調研,和資源評估成本等緣由,咱們決定使用離線的方式解決,這樣在知足需求的同時,共用系統資源,最低使用成本。web

hive

說到離線計算不得不說到hive sql。hive sql是個MapReduce的任務過程。我問了其餘部門作過相關業務的同事,開通了權限,而後用hive sql執行了下對離線表的查詢。發現生成數據的時間還能夠接受,但數據須要落地,而後傳輸到咱們本身的業務機上,再處理。這個過程就比較慢了,難以達到要求。sql

ps: 或許是使用的姿式不對吧,總之此次一些各類指標不達標,繼續需求新的方案。apache

環境搭建

其餘部門兄弟推薦,使用spark計算,而後介紹多麼多麼速度快,徹底實現咱們的需求沒有問題,最關鍵能給咱們找個顧問。之前是搞web應用服務的,對於離線計算的只是概念上,只存在幾個帖子裏和一些理論的知識。因此,我綜合評估了下,決定嘗試下看看。問了下「顧問」,他給我講了一遍,因而咱們的spark之旅開始了。項目參照他們的項目,簡化結構。編程

spark程序開發主要有幾種方式:服務器

  1. Scala 語言是一門類 Java 的多範式語言,其設計初衷就是爲了繼承函數式編程的面向對象編程的各類特性,網上了例子我百分之六七十都是使用scala做爲開發語言的,我買的書籍也是使用scala進行開發的。
  2. Java 語言,這個是參考的兄弟部門的項目例子使用的語言,網上的例子雖然不多,但相對於scala來講,根據之前的經驗,我仍是使用java做爲開發主語言,出了問題,至少在java上能夠請教相關經驗豐富的同事。
  3. python 雖然之前在開發遊戲的時候作過,但spark相關的開發,尚未進行過,這裏暫時不考慮。

通過裁判,我決定跟同事的項目保持一致,使用java,學習成本低,能夠快速參考實現。app

我本地用的intellij idea 做爲本地開發工具,採用maven管理相關的包。
java版本:maven

java -version
> java version "1.8.0_20"

新建項目:

一次使用spark進行離線計算的實踐

填寫相關信息:
一次使用spark進行離線計算的實踐ide

修改項目的pom.xml引入相關的依賴,主要是hive sql相關的部分。

<properties>
       ...
        <spark.version>2.2.1</spark.version>
        <spark.scope>compile</spark.scope>
    </properties>
  <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.scope}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.scope}</scope>
        </dependency>
</dependencies>
...

初始化SparkSession

import org.apache.spark.sql.SparkSession;

 SparkSession sparkSession = SparkSession.builder()
                    .config("hive.exec.scratchdir", "/user_ext/{username}/hive-{username}")
                    .config("spark.sql.warehouse.dir", "/user_ext/{username}/warehouse")
                    .appName("{Your app name}")
                    .enableHiveSupport()
                    .getOrCreate();

建立臨時表

原本想再hive裏面進行數據查詢的,不過最終仍是找帖子在spark裏面實現。

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.*;

//  如下是建立一個簡單結構
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField( "demo_id", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "demo_name", DataTypes.StringType, true ));
StructType structType = DataTypes.createStructType(structFields);

// 添加數據
List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create("demo_id", "demo_name"));
... // 能夠添加多行

 // 建立臨時表
Dataset<Row> df = sparkSession.createDataFrame(rows, structType);
df.registerTempTable("tmp_demo");

至此,臨時表建立完成,tmp_demo可直接參與查詢了。

查詢

關於查詢,須要注意理解分區的含義。
在寫查找離線表以前,你要問清楚你的數據表是怎麼存儲的,是否有按照實踐分區的概念。我一開始不理解,一執行語句就掛住。後來問了下,得知這個表是一直按照時間進行存儲的,使用的使用得使用最新的分區。

show partitions tableName 能夠查找到全部的分區,選擇最新的一個便可。
select * from tableName where dt={最新分區}

如下是一個完整的實例:

sql = "select * from tmp_demo";
Dataset<Row> rets = spark.sql(sql);
List<Row> list = rets.collectAsList(); 

for (Row row: list) {
    do some action...
}

注意:少許的數據可使用collectAsList進行轉換,可是大量的數據就得考慮用 mapPartition,或者foreachPatition進行遍歷了。
具體內容可參考map,mapPartition,foreachPatition的比較。

踩過的坑

打包

對java項目不熟悉,什麼又沒有熟悉的同窗,因此有幾天卡在這個打包上。一開始我參照以前的例子,在Project Structure -> Artifacts 裏面進行配置,而後在build -> build Artifacts 裏面打包,生成的jar文件進行上傳。

! 注意:上面這是一種錯誤的打包方式。我將 output 的jar包上傳後,一直報告找不到類的錯誤。
一次使用spark進行離線計算的實踐

固然網上還有一種錯誤,就是main所在的類用的路徑在 spark-submit 的參數中沒有加上。

正確的打包方式,應該選用右側的maven projects方式進行,
這樣在 target 目錄下生成的.jar 文件就沒有問題了。
一次使用spark進行離線計算的實踐

上傳

由於是傳到遠端服務器上的jar包,因此決定上傳的效率有兩個。

  1. jar包的大小,我選擇最小的引入,能省略的省略。
  2. 傳輸媒介的速率,這個最好選擇內網有線,比無線強太多了。

foreachPatition的日誌

使用foreachPatition,在參數的地方使用new ForeachPartitionFunction進行處理。
在Call函數裏面的日誌,通常是在相似子任務裏面,在這裏我調試了數次,找了半天個人日誌在哪裏打印。

執行命令

最後執行必定落實到下面的命令上,此時 --class的參數必定要包含你的main函數,對應的包名。

spark-submit  --master yarn --deploy-mode cluster ... --class package1.package2.classname  demo.jar

總結

爲了此次的嘗試,我基本上把網上的例子翻遍了,發現有不少是基於scala的,看來scala是大趨勢。同時相關的靠譜的帖子少於其餘話題的,估計搞大數據的相關的人仍是相對搞應用的不太多吧。推薦本書《Spark大數據分析 源碼解析與實例詳解》,對理解spark相關的知識頗有幫助。最後,歡迎有興趣的朋友一塊兒討論啊。

相關文章
相關標籤/搜索