spark+scala+spring整合提升搬磚效率

0.背景

爲何會想到把這三個整合在一塊兒? 固然是工做中遇到不舒服的地方。java

最近數據的需求特別多,有時候本身定位問題也常常要跑數據,一般就是spark+scala的常規畫風。雖然是提同一個jar包,但執行的每一個包的路徑都不同,這就致使我要不斷的去改腳本,很不舒服。提交spark job的畫風一般是這樣子的:git

spark-submit --cluster hadoop-spark2.0 \
	--class com.acceml.hit.User.PvCount \
    "xxx.jar" ${params1} ${params2} ${params3}

spark-submit --cluster hadoop-spark2.0 \
	--class com.acceml.hit.ShanghaiUser.UvCount \
    "xxx.jar" ${params1} ${params2} ${params3}

spark-submit --cluster hadoop-spark2.0 \
	--class com.acceml.hit.User.xxx.View \
    "xxx.jar" ${params1} ${params2} ${params3}

用spring整合了一下,提交一個job只要指定它執行的類名便可。以下,三條命令分別解析pv、uv、曝光...github

sh log_parser.sh PvCount 20180412
sh log_parser.sh UvCount 20180412
sh log_parser.sh View 20180412

1.實現

1.1 思路spring

說白了,這上面就是不須要指定包路徑,想讓程序根據類名執行相應的邏輯,利用控制反轉在spring中簡直再簡單不過了。java代碼以下:babel

@Service
public class TaskEngine {
    //定義task名字到Task的一個映射
    private final Map<String, Task> name2Task = new HashMap<>();
    //自動注入全部Task的子類,這裏task只是一個interface.
    @Autowired
    public TaskEngine(List<Task> tasks) {
        tasks.forEach(task -> name2Task.put(task.getClass().getSimpleName(), task));
    }
    public static void main(String[] args) throws Exception {
        //spring注入
        ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        TaskEngine taskEngine = (TaskEngine) appContext.getBean("taskEngine");
        //根據指定參數跑job.
        taskEngine.name2Task.get(args[0]).runTask();
    }

因爲spark開發中我不喜歡用java,寫起來太冗長,雖然java8支持lambda表達式,可是java版本和spark兼容的問題可能又是一個坑,用scala寫job提交也能夠借鑑自動注入,而後根據參數(類名)去選擇提交哪一個job,就此開始了以下嘗試。app

1.2 scala+srping實現依賴注入的功能ide

一、定義一個scala的trait:oop

trait LogParser extends Serializable {
    def name(): String =  {
        this.getClass.getSimpleName
    }
    def run(params: ParseParams, sparkSession: SparkSession)
}

二、全部要執行的業務邏輯程序都實現它:this

@Component
class DemoParser extends LogParser {
    override def run(params: ParseParams, sparkSession: SparkSession): Unit = {
        //業務邏輯.
    }
}

三、定義一個LogparserFactory的Bean用spring的自動注入把LogParser全部子類注入進來。spa

@Service
class LogParserFactory {
    private var logParsers: java.util.List[LogParser] = _
    //自動注入全部子類.
    @Autowired
    def this(list: java.util.List[LogParser]) {
        this()
        logParsers = list
    }
}

四、spark主程序加載全部Bean,並選擇所須要的邏輯去執行.

//spring注入
val appContext = new ClassPathXmlApplicationContext("applicationContext.xml")
//獲取獲得相應的Bean
val logParserFactory = appContext.getBean("logParserFactory").asInstanceOf[LogParserFactory]
logParserFactory.getLogParsers()
      .filter(e => e.name().equals(className))
      .foreach(e => e.run(params, sparkSession))

五、新來了一個需求就啥都不用改,再寫一個類就能夠了.

@Component
class View extends LogParser {
    override def run(params: ParseParams, sparkSession: SparkSession): Unit = {
        //業務邏輯.
    }
}

2.遇到的坑

2.1 spring版本不統一.

其餘的依賴包裏面常常會依賴不一樣版本的spring,致使程序運行時報錯NoClassDefFoundError.這個時候用mvn dependency:tree查看一下。exclude掉其餘version的spring就行了。

2.2 spring的xsi規則配置

因爲spring是在spark集羣中跑,xsi有可能定義爲http形式可能獲取不到,因此指定到classpath的路徑下本地獲取:

xsi:schemaLocation="http://www.springframework.org/schema/beans
            classpath:/org/springframework/beans/factory/xml/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
            classpath:/org/springframework/context/config/spring-context-4.1.xsd">

3.源碼

Acceml/offline_job_babel​github.com

4.約束

  • 類名不能重複,否則會報錯,這是spring的性質決定的.
相關文章
相關標籤/搜索