爲何會想到把這三個整合在一塊兒? 固然是工做中遇到不舒服的地方。java
最近數據的需求特別多,有時候本身定位問題也常常要跑數據,一般就是spark+scala的常規畫風。雖然是提同一個jar包,但執行的每一個包的路徑都不同,這就致使我要不斷的去改腳本,很不舒服。提交spark job的畫風一般是這樣子的:git
spark-submit --cluster hadoop-spark2.0 \ --class com.acceml.hit.User.PvCount \ "xxx.jar" ${params1} ${params2} ${params3} spark-submit --cluster hadoop-spark2.0 \ --class com.acceml.hit.ShanghaiUser.UvCount \ "xxx.jar" ${params1} ${params2} ${params3} spark-submit --cluster hadoop-spark2.0 \ --class com.acceml.hit.User.xxx.View \ "xxx.jar" ${params1} ${params2} ${params3}
用spring整合了一下,提交一個job只要指定它執行的類名便可。以下,三條命令分別解析pv、uv、曝光...github
sh log_parser.sh PvCount 20180412 sh log_parser.sh UvCount 20180412 sh log_parser.sh View 20180412
1.1 思路spring
說白了,這上面就是不須要指定包路徑,想讓程序根據類名執行相應的邏輯,利用控制反轉在spring中簡直再簡單不過了。java代碼以下:babel
@Service public class TaskEngine { //定義task名字到Task的一個映射 private final Map<String, Task> name2Task = new HashMap<>(); //自動注入全部Task的子類,這裏task只是一個interface. @Autowired public TaskEngine(List<Task> tasks) { tasks.forEach(task -> name2Task.put(task.getClass().getSimpleName(), task)); } public static void main(String[] args) throws Exception { //spring注入 ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml"); TaskEngine taskEngine = (TaskEngine) appContext.getBean("taskEngine"); //根據指定參數跑job. taskEngine.name2Task.get(args[0]).runTask(); }
因爲spark開發中我不喜歡用java,寫起來太冗長,雖然java8支持lambda表達式,可是java版本和spark兼容的問題可能又是一個坑,用scala寫job提交也能夠借鑑自動注入,而後根據參數(類名)去選擇提交哪一個job,就此開始了以下嘗試。app
1.2 scala+srping實現依賴注入的功能ide
一、定義一個scala的trait:oop
trait LogParser extends Serializable { def name(): String = { this.getClass.getSimpleName } def run(params: ParseParams, sparkSession: SparkSession) }
二、全部要執行的業務邏輯程序都實現它:this
@Component class DemoParser extends LogParser { override def run(params: ParseParams, sparkSession: SparkSession): Unit = { //業務邏輯. } }
三、定義一個LogparserFactory的Bean用spring的自動注入把LogParser全部子類注入進來。spa
@Service class LogParserFactory { private var logParsers: java.util.List[LogParser] = _ //自動注入全部子類. @Autowired def this(list: java.util.List[LogParser]) { this() logParsers = list } }
四、spark主程序加載全部Bean,並選擇所須要的邏輯去執行.
//spring注入 val appContext = new ClassPathXmlApplicationContext("applicationContext.xml") //獲取獲得相應的Bean val logParserFactory = appContext.getBean("logParserFactory").asInstanceOf[LogParserFactory] logParserFactory.getLogParsers() .filter(e => e.name().equals(className)) .foreach(e => e.run(params, sparkSession))
五、新來了一個需求就啥都不用改,再寫一個類就能夠了.
@Component class View extends LogParser { override def run(params: ParseParams, sparkSession: SparkSession): Unit = { //業務邏輯. } }
2.1 spring版本不統一.
其餘的依賴包裏面常常會依賴不一樣版本的spring,致使程序運行時報錯NoClassDefFoundError.這個時候用mvn dependency:tree查看一下。exclude掉其餘version的spring就行了。
2.2 spring的xsi規則配置
因爲spring是在spark集羣中跑,xsi有可能定義爲http形式可能獲取不到,因此指定到classpath的路徑下本地獲取:
xsi:schemaLocation="http://www.springframework.org/schema/beans classpath:/org/springframework/beans/factory/xml/spring-beans-4.1.xsd http://www.springframework.org/schema/context classpath:/org/springframework/context/config/spring-context-4.1.xsd">
Acceml/offline_job_babelgithub.com