Previously most of the system's computation ran on Kettle + Hive, but with the recent surge in data volume many jobs were taking more than an hour, and even after optimizing the HiveQL some still took over 30 minutes, so we recently switched the compute engine from Hive to Spark.
Ordinary, simple jobs are now computed with Spark SQL: the data is processed by Spark and the results are inserted into MySQL.
Three new classes were added to the project. The first is a Logger class, used for log output:
# coding=utf-8
import logging
from logging import handlers


class Logger(object):
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR
    }

    def __init__(self, fileName, level='info', when='D', backCount=3,
                 fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        self.logger = logging.getLogger(fileName)
        format_str = logging.Formatter(fmt)
        self.logger.setLevel(self.level_relations.get(level))
        # console handler
        sh = logging.StreamHandler()
        sh.setFormatter(format_str)
        # file handler, rotated by time, keeping backCount old files
        th = handlers.TimedRotatingFileHandler(filename=fileName, when=when,
                                               backupCount=backCount, encoding='utf-8')
        th.setFormatter(format_str)
        self.logger.addHandler(th)
        self.logger.addHandler(sh)
The second is a common Spark SQL class, built on PySpark:
# coding=utf-8
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext


class SparkSqlCommon(object):
    sql_str = ''
    app_name = ''

    def __init__(self, sql, app_name):
        if sql is None:
            raise Exception('sql cannot be empty')
        self.sql_str = sql
        if app_name is None:
            raise Exception('app_name cannot be empty')
        self.app_name = app_name

    def execute(self):
        spark_conf = SparkConf().setAppName(self.app_name)
        spark_context = SparkContext(conf=spark_conf)
        spark_context.setLogLevel("INFO")
        hive_context = HiveContext(spark_context)
        result_rdd = hive_context.sql(self.sql_str)
        result = result_rdd.collect()
        return result
The third is a common MySQL class, used to persist the computed results to MySQL:
# coding=utf-8
import pymysql

from com.randy.common.Logger import Logger


class DatacenterCommon(object):
    sql_str = ''
    jdbcHost = ''
    jdbcPort = ''
    jdbcSchema = ''
    jdbcUserName = ''
    jdbcPassword = ''

    def __init__(self, sql_str, log, jdbcHost='127.0.0.1', jdbcPort=3306,
                 jdbcSchema='demo', jdbcUserName='root', jdbcPassword='123456'):
        if sql_str is None:
            raise Exception('sql_str cannot be empty')
        self.sql_str = sql_str
        self.jdbcHost = jdbcHost
        self.jdbcPort = jdbcPort
        self.jdbcSchema = jdbcSchema
        self.jdbcUserName = jdbcUserName
        self.jdbcPassword = jdbcPassword
        self.log = log

    def execute(self):
        db = pymysql.connect(host=self.jdbcHost, port=self.jdbcPort,
                             user=self.jdbcUserName, passwd=self.jdbcPassword,
                             db=self.jdbcSchema, charset='utf8')
        try:
            db_cursor = db.cursor()
            db_cursor.execute(self.sql_str)
            db.commit()
        except Exception as e:
            self.log.logger.error('str(e):\t\t%s', str(e))
            db.rollback()
        finally:
            # always release the connection
            db.close()
The client code that calls these classes is as follows:
# coding=utf-8
# !/usr/bin/python2.7
import datetime

from com.randy.spark.Logger import Logger
from com.randy.spark.SparkSqlCommon import SparkSqlCommon
from com.randy.spark.DatacenterCommon import DatacenterCommon

# must be changed for each application
app_name = 'demo1'

# Spark SQL (must not end with a semicolon)
select_sql = '''
SELECT count(*) from futures.account
'''

# MySQL insert statement
insert_sql = '''
insert into demo.demo1(id) values({0});
'''

if __name__ == '__main__':
    currentDay = datetime.datetime.now().strftime('%Y%m%d')
    log = Logger('/home/python-big-data-job/log/' + app_name + "_" + str(currentDay) + '.log')
    log.logger.info("**************************start invoke {0},{1} *****************".format(app_name, currentDay))

    sparkSqlCommon = SparkSqlCommon(sql=select_sql, app_name=app_name)
    selectResult = sparkSqlCommon.execute()
    log.logger.info("sparkSqlCommon result:{0}".format(selectResult))

    if selectResult is None:
        log.logger.error("taojin_1 selectResult is empty")
    else:
        insert_sql = insert_sql.format(selectResult[0][0])
        log.logger.info(insert_sql)
        datacenterCommon = DatacenterCommon(sql_str=insert_sql, log=log)
        datacenterCommon.execute()

    log.logger.info("**************************end invoke {0},{1} *****************".format(app_name, currentDay))
The spark-submit command used is as follows:
sudo -u hdfs spark-submit --master local[*] --py-files='/home/python-big-data-job/com.zip,/home/python-big-data-job/pymysql.zip' /home/python-big-data-job/taojin/demo1.py
Because the project uses local files, the three common classes are packaged into com.zip and shipped as a dependency file, as sketched below.
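A minimal sketch of how such an archive might be built; the source layout under /home/python-big-data-job/com/randy/... is an assumption inferred from the import paths above, so adjust it to the real project structure:

# Assumed layout: the three common classes live under /home/python-big-data-job/com/randy/...
# Each package directory needs an __init__.py so the archive is importable via --py-files.
cd /home/python-big-data-job
zip -r com.zip com/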
pymysql.zip contains the pymysql source files; I packaged it because during this process I hit ImportError: No module named pymysql.
The cluster already had pymysql installed via pip, but I could not find an effective fix; neither https://zhuanlan.zhihu.com/p/43434216 nor http://www.javashuo.com/article/p-dgnmrtwl-gh.html worked for me, so in the end I had to package pymysql itself as a dependency file as well (one possible way to do that is sketched below).
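A minimal sketch of one way the pymysql.zip dependency could be produced from an existing installation; the site-packages path shown is only an assumption, so locate the real one first:

# Find where pip installed pymysql (the path below is only an example).
python -c "import pymysql, os; print(os.path.dirname(pymysql.__file__))"
# Zip the package directory so it can be shipped with --py-files.
cd /usr/lib/python2.7/site-packages
zip -r /home/python-big-data-job/pymysql.zip pymysql/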
Deploying in yarn cluster mode still has unresolved issues.