Spark Series: SparkSQL in Practice

Previously, most of the system's computation ran on Kettle + Hive. With the recent surge in data volume, many jobs started taking more than an hour, and even after optimizing the HiveQL some still exceeded 30 minutes, so we recently switched the compute engine from Hive to Spark.

Simple, routine jobs are handled with SparkSQL: the data is computed by Spark and the results are inserted into MySQL.

Three classes are added to the project. The first is a Logger class for log output:

# coding=utf-8
import logging
from logging import handlers

class Logger(object):
    # mapping from level name to logging level constant
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR
    }

    def __init__(self, fileName, level='info', when='D', backCount=3, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        self.logger = logging.getLogger(fileName)
        format_str = logging.Formatter(fmt)
        self.logger.setLevel(self.level_relations.get(level))
        # console handler
        sh = logging.StreamHandler()
        sh.setFormatter(format_str)
        # file handler, rotated daily (when='D'), keeping backCount old files
        th = handlers.TimedRotatingFileHandler(filename=fileName, when=when, backupCount=backCount, encoding='utf-8')
        th.setFormatter(format_str)
        self.logger.addHandler(th)
        self.logger.addHandler(sh)

The second is the common SparkSQL class, built on pyspark:

# coding=utf-8

from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext

class SparkSqlCommon(object):
    sql_str = ''
    app_name = ''

    def __init__(self, sql, app_name):
        if sql is None:
            raise Exception('sql cannot be empty')
        self.sql_str = sql

        if app_name is None:
            raise Exception('app_name cannot be empty')
        self.app_name = app_name

    def execute(self):
        spark_conf = SparkConf().setAppName(self.app_name)
        spark_context = SparkContext(conf=spark_conf)
        spark_context.setLogLevel("INFO")
        # HiveContext lets the SQL statement read Hive tables directly
        hive_context = HiveContext(spark_context)
        result_df = hive_context.sql(self.sql_str)
        # collect() pulls every result row back to the driver
        result = result_df.collect()
        return result
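For reference, execute() returns the collected rows as a list of pyspark Row objects; a minimal illustrative snippet (not part of the project) showing how the scalar value is pulled out, which is what the client code below does with selectResult[0][0]:

# illustrative only: run the same count query and unpack the single Row
from com.randy.spark.SparkSqlCommon import SparkSqlCommon

rows = SparkSqlCommon(sql='SELECT count(*) FROM futures.account', app_name='demo1').execute()
if rows:
    print(rows)        # a list with one Row for this aggregate query
    print(rows[0][0])  # positional access into the Row yields the scalar count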

The third is the common MySQL class, used to persist the computed results into MySQL:

# coding=utf-8

import pymysql
from com.randy.common.Logger import Logger

class DatacenterCommon(object):
    sql_str = ''
    jdbcHost = ''
    jdbcPort = ''
    jdbcSchema = ''
    jdbcUserName = ''
    jdbcPassword = ''

    def __init__(self, sql_str, log, jdbcHost='127.0.0.1', jdbcPort=3306, jdbcSchema='demo', jdbcUserName='root', jdbcPassword='123456'):
        if sql_str is None:
            raise Exception('sql_str cannot be empty')

        self.sql_str = sql_str
        self.jdbcHost = jdbcHost
        self.jdbcPort = jdbcPort
        self.jdbcSchema = jdbcSchema
        self.jdbcUserName = jdbcUserName
        self.jdbcPassword = jdbcPassword
        self.log = log


    def execute(self):
        db = pymysql.connect(host=self.jdbcHost,
                             port=self.jdbcPort,
                             user=self.jdbcUserName,
                             passwd=self.jdbcPassword,
                             db=self.jdbcSchema,
                             charset='utf8')
        try:
            db_cursor = db.cursor()
            db_cursor.execute(self.sql_str)
            db.commit()
        except Exception as e:
            self.log.logger.error('execute failed: %s', str(e))
            db.rollback()
        finally:
            db.close()

 

The client code that ties these together (Python 2.7) is as follows:

#!/usr/bin/python2.7
# coding=utf-8

import datetime
from com.randy.spark.Logger import Logger
from com.randy.spark.SparkSqlCommon import SparkSqlCommon
from com.randy.spark.DatacenterCommon import DatacenterCommon




# Must be changed: each application uses its own name
app_name = 'demo1'

# SparkSQL query (must not end with a semicolon)
select_sql = '''
                  SELECT count(*) from futures.account
'''

# MySQL insert template (the {0} placeholder is filled with the query result)
insert_sql = '''
            insert into demo.demo1(id) values({0});
'''


if __name__ == '__main__':
    currentDay = datetime.datetime.now().strftime('%Y%m%d')
    log = Logger('/home/python-big-data-job/log/' + app_name + "_" + str(currentDay) + '.log')
    log.logger.info("**************************start invoke {0},{1} *****************".format(app_name,currentDay))

    sparkSqlCommon = SparkSqlCommon(sql=select_sql,app_name=app_name)
    selectResult = sparkSqlCommon.execute()
    log.logger.info("sparkSqlCommon result:{0}".format(selectResult))
    if not selectResult:
        log.logger.error("selectResult is empty")
    else:
        insert_sql = insert_sql.format(selectResult[0][0])
        log.logger.info(insert_sql)
        datacenterCommon = DatacenterCommon(sql_str=insert_sql, log=log)
        datacenterCommon.execute()

        log.logger.info("**************************end invoke {0},{1} *****************".format(app_name, currentDay))

The spark-submit command used to run it is:

sudo -u hdfs spark-submit --master local[*] --py-files='/home/python-big-data-job/com.zip,/home/python-big-data-job/pymysql.zip' /home/python-big-data-job/taojin/demo1.py

Because the job depends on these local modules, the three common classes are packaged into com.zip and shipped as a dependency file; a sketch of building that archive follows.
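A minimal sketch of building com.zip (assuming the com/ package directory, with its __init__.py files, sits directly under /home/python-big-data-job):

# build_com_zip.py -- hypothetical helper; paths follow the article's layout
import shutil

# produces /home/python-big-data-job/com.zip containing the com/ package tree,
# which spark-submit then ships to the executors via --py-files
shutil.make_archive('/home/python-big-data-job/com', 'zip',
                    root_dir='/home/python-big-data-job', base_dir='com')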

pymysql.zip contains the pymysql source code. I added it because the job failed with ImportError: No module named pymysql.

The cluster already had pymysql installed via pip, but I could not find an effective fix: the approaches at https://zhuanlan.zhihu.com/p/43434216 and http://www.javashuo.com/article/p-dgnmrtwl-gh.html did not work either, so in the end I had to package pymysql itself as a dependency file, as sketched below.
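A sketch of one way to build pymysql.zip on a machine where pymysql is importable (the output path is an assumption matching the --py-files argument above):

# build_pymysql_zip.py -- hypothetical helper for packaging the installed pymysql module
import os
import shutil
import pymysql

pkg_dir = os.path.dirname(pymysql.__file__)  # e.g. .../site-packages/pymysql
# archive it so the zip contains a top-level pymysql/ package directory
shutil.make_archive('/home/python-big-data-job/pymysql', 'zip',
                    root_dir=os.path.dirname(pkg_dir), base_dir='pymysql')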

 

Deploying with YARN cluster mode also still has unresolved issues.
