Spark 2.3.2: a real-time dashboard application with Spark Streaming + Spark SQL

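 Use case: a real-time dashboard (a "big screen" display). Each group owns several malls, and each mall contains many shops. The job computes real-time sales analytics for every mall and shop under a group (by region, by business type, top shops, total sales and similar metrics) and feeds a visual display. This used to run on Storm; it has now been reimplemented on Spark 2.3.2.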

 

1. Data sources: data can come from MQ, sockets, Flume, DFS and so on; in practice Kafka and RocketMQ are the most common. The sample code here uses RocketMQ.

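2. Real-time computing frameworks: Storm (true real-time: a Spout emits Tuples to the Bolts, each record is processed as soon as it arrives, and latency stays under 100 ms) and Spark Streaming (near-real-time: processing is based on micro batches, that is, the records collected within a fixed interval form one micro batch; the stream is exposed as a DStream, and each micro batch is an RDD underneath).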
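
To make the micro-batch idea concrete, here is a minimal plain-Java sketch that does not use Spark at all. It only illustrates how records arriving at different times fall into fixed-interval batches; the `Event` class, the `toMicroBatches` helper and the sample payloads are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MicroBatchSketch {

    /** Hypothetical event: an arrival timestamp in milliseconds plus a payload. */
    static final class Event {
        final long timestampMs;
        final String payload;

        Event(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /**
     * Groups events into consecutive batches of batchIntervalMs,
     * keyed by the start time of the batch they belong to.
     */
    static Map<Long, List<String>> toMicroBatches(List<Event> events, long batchIntervalMs) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Event e : events) {
            // Integer division floors the timestamp to the start of its interval.
            long batchStart = (e.timestampMs / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(e.payload);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1_000L, "bill-1"));
        events.add(new Event(9_999L, "bill-2"));
        events.add(new Event(10_000L, "bill-3"));

        // With a 10-second interval, the first two events share a batch.
        toMicroBatches(events, 10_000L)
                .forEach((start, items) -> System.out.println(start + " -> " + items));
    }
}
```

Storm would hand each of the three events to a Bolt individually, whereas a micro-batch engine processes each group as one unit, which is where the extra latency of up to one batch interval comes from.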

3. The Spark Streaming near-real-time pipeline used here: (1) RocketMQ --> (2) Spark Streaming --> (3) Spark SQL (Parquet) --> (4) Redis (read by the dashboard) and HDFS (the ODS layer of the Hive warehouse, ODS -> DW [DWD-DWS] -> DM).

4. System design

 

5. The code follows (5.3 is the Spark Streaming implementation; it can be copied and run as-is):

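  • 5.1. RocketMQConfig class (configures and builds the MQ consumer)
  • 5.2. Custom RocketMQ receiver for Spark Streaming (a reliable receiver)
  • 5.3. Spark Streaming real-time sales analysis
  • 5.4. Bill entity class
  • 5.5. BCD, the trimmed-down bill entity (less data to transfer, so lower serialization and deserialization cost between nodes)
  • 5.6. Spark SQL sales analysis service class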

5.1. RocketMQConfig class (configures and builds the MQ consumer)

package com.mengyao.graph.etl.apps.commons.datasource.mq.receiver;

import org.apache.commons.lang.Validate;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.common.RemotingUtil;

import java.util.HashMap;
import java.util.UUID;

/**
 * RocketMQConfig for Consumer
 * @author mengyao
 * 
 */
public class RocketMQConfig {
    
    // ------- the following is for common usage -------
    /**
     * RocketMq name server address
     */
    public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required

    public static final String CLIENT_NAME = "client.name";

    public static final String CLIENT_IP = "client.ip";
    public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();

    public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
    public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();

    public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds

    public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds


    // ------- the following is for push consumer mode -------
    /**
     * RocketMq consumer group
      */
    public static final String CONSUMER_GROUP = "consumer.group"; // Required

    /**
     * RocketMq consumer topic
     */
    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required

    public static final String CONSUMER_TAG = "consumer.tag";
    public static final String DEFAULT_TAG = "*";

    public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
    public static final String CONSUMER_OFFSET_LATEST = "latest";
    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";

    public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";

    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds

    public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
    public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;

    public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
    public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;


    // ------- the following is for reliable Receiver -------
    public static final String QUEUE_SIZE = "spout.queue.size";
    public static final int DEFAULT_QUEUE_SIZE = 500;

    public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";
    public static final int DEFAULT_MESSAGES_MAX_RETRY = 3;

    public static final String MESSAGES_TTL = "spout.messages.ttl";
    public static final int DEFAULT_MESSAGES_TTL = 300000;  // 5min


    // ------- the following is for pull consumer mode -------

    /**
     * Maximum rate (number of records per second) at which data will be read from each RocketMQ partition.
     * The default value is "-1", which means the consumer pulls messages from RocketMQ as fast as it can.
     * In addition, Spark Streaming's internal backpressure mechanism can be enabled or disabled with the config
     *  "spark.streaming.backpressure.enabled".
     */
    public static final String  MAX_PULL_SPEED_PER_PARTITION = "pull.max.speed.per.partition";

    /**
     * To pick up the consume speed, the consumer can pull a batch of messages at a time. And the default
     * value is "32"
     */
    public static final String PULL_MAX_BATCH_SIZE = "pull.max.batch.size";

    /**
     * pull timeout for the consumer, and the default time is "3000".
     */
    public static final String PULL_TIMEOUT_MS = "pull.timeout.ms";

    // the following configs for consumer cache
    public static final String PULL_CONSUMER_CACHE_INIT_CAPACITY = "pull.consumer.cache.initialCapacity";
    public static final String PULL_CONSUMER_CACHE_MAX_CAPACITY = "pull.consumer.cache.maxCapacity";
    public static final String PULL_CONSUMER_CACHE_LOAD_FACTOR = "pull.consumer.cache.loadFactor";


    public static void buildConsumerConfigs(HashMap<String, String> props, DefaultMQPushConsumer consumer) {
        buildCommonConfigs(props, consumer);

        String group = props.get(CONSUMER_GROUP);
        Validate.notEmpty(group);
        consumer.setConsumerGroup(group);

        consumer.setPersistConsumerOffsetInterval(getInteger(props,
                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
        consumer.setConsumeThreadMin(getInteger(props,
                CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
        consumer.setConsumeThreadMax(getInteger(props,
                CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));

        String initOffset = props.get(CONSUMER_OFFSET_RESET_TO) != null ? props.get(CONSUMER_OFFSET_RESET_TO) : CONSUMER_OFFSET_LATEST;
        switch (initOffset) {
            case CONSUMER_OFFSET_EARLIEST:
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                break;
            case CONSUMER_OFFSET_LATEST:
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                break;
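            case CONSUMER_OFFSET_TIMESTAMP:
                // NOTE: initOffset is the literal string "timestamp" here, not a time value.
                // setConsumeTimestamp expects a yyyyMMddHHmmss string, so this branch probably
                // needs a separate property carrying the actual timestamp to be useful.
                consumer.setConsumeTimestamp(initOffset);
                break;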
            default:
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        }

        String topic = props.get(CONSUMER_TOPIC);
        Validate.notEmpty(topic);
        try {
            consumer.subscribe(topic, props.get(CONSUMER_TAG) != null ? props.get(CONSUMER_TAG) : DEFAULT_TAG);
        } catch (MQClientException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void buildCommonConfigs(HashMap<String, String> props, ClientConfig client) {
        String namesvr = props.get(NAME_SERVER_ADDR);
        Validate.notEmpty(namesvr);
        client.setNamesrvAddr(namesvr);

        client.setClientIP(props.get(CLIENT_IP) != null ? props.get(CLIENT_IP) : DEFAULT_CLIENT_IP);
        // use UUID for client name by default
        String defaultClientName = UUID.randomUUID().toString();
        client.setInstanceName(props.get(CLIENT_NAME) != null ? props.get(CLIENT_NAME) : defaultClientName);

        client.setClientCallbackExecutorThreads(getInteger(props,
                CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
        client.setPollNameServerInterval(getInteger(props,
                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
        client.setHeartbeatBrokerInterval(getInteger(props,
                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
    }

    public static int getInteger(HashMap<String, String> props, String key, int defaultValue) {
        return Integer.parseInt(props.get(key) != null ? props.get(key) : String.valueOf(defaultValue));
    }

    public static boolean getBoolean(HashMap<String, String> props, String key, boolean defaultValue) {
        return Boolean.parseBoolean(props.get(key) != null ? props.get(key) : String.valueOf(defaultValue));
    }

}
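
As a usage illustration, the sketch below shows how a caller might fill the property map that `buildConsumerConfigs` expects, and how the lookup-with-default helper behaves. It is self-contained and does not touch RocketMQ: the key strings are copied from the constants above, `getInteger` is a local copy of the helper with the same fallback behaviour, and the address, topic and group values are placeholders, not real endpoints.

```java
import java.util.HashMap;

public class ConsumerPropsSketch {

    // Key strings copied from RocketMQConfig so that the sketch compiles on its own.
    static final String NAME_SERVER_ADDR = "nameserver.addr";
    static final String CONSUMER_TOPIC = "consumer.topic";
    static final String CONSUMER_GROUP = "consumer.group";
    static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
    static final int DEFAULT_CONSUMER_MIN_THREADS = 20;

    /** Local copy of the helper: parse the value if present, else use the default. */
    static int getInteger(HashMap<String, String> props, String key, int defaultValue) {
        String raw = props.get(key);
        return raw != null ? Integer.parseInt(raw) : defaultValue;
    }

    /** Builds the three required properties; every other key falls back to its default. */
    static HashMap<String, String> buildProps(String namesrv, String topic, String group) {
        HashMap<String, String> props = new HashMap<>();
        props.put(NAME_SERVER_ADDR, namesrv);
        props.put(CONSUMER_TOPIC, topic);
        props.put(CONSUMER_GROUP, group);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        HashMap<String, String> props = buildProps("localhost:9876", "DemoTopic", "demo_group");

        // Not set explicitly, so the default applies.
        System.out.println(getInteger(props, CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));

        // An explicit value overrides the default.
        props.put(CONSUMER_MIN_THREADS, "8");
        System.out.println(getInteger(props, CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
    }
}
```

Only the name server address, topic and group are mandatory in the class above (they are the ones guarded by `Validate.notEmpty`); everything else can be left out of the map.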

5.2. Custom RocketMQ receiver for Spark Streaming (a reliable receiver)

package com.mengyao.graph.etl.apps.commons.datasource.mq.receiver;

import java.io.ByteArrayInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
//import org.apache.commons.lang3.SerializationUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.gooagoo.entity.mq.bill.MQBillMessage;
import com.mengyao.graph.etl.apps.commons.beans.ods.fmt.BillInfoFmt;
import com.mengyao.graph.etl.apps.commons.beans.ods.fmt.ConvertTool;
import com.mengyao.graph.etl.apps.dashboard.beans.BCD;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.List;

/**
 * RocketMQ Receiver
 * @author mengyao
 *
 */
public class RocketMQReceiver extends Receiver<BCD> {
    
    /**
     * 
     */
    private static final long serialVersionUID = 2274826339951693341L;
    private MQPushConsumer consumer;
    private boolean ordered;
    private HashMap<String, String> conf;
    

    public RocketMQReceiver(HashMap<String, String> conf, StorageLevel storageLevel) {
        super(storageLevel);
        this.conf = conf;
    }

    @Override
    public void onStart() {
        Validate.notEmpty(conf, "Consumer properties can not be empty");
        ordered = RocketMQConfig.getBoolean(conf, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, true);
        consumer = new DefaultMQPushConsumer();
        RocketMQConfig.buildConsumerConfigs(conf, (DefaultMQPushConsumer)consumer);
        if (ordered) {
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    if (process(msgs)) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
        } else {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    if (process(msgs)) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
        }
        try {
            consumer.start();
            System.out.println("==== RocketMQReceiver start ====");
        } catch (MQClientException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public boolean process(List<MessageExt> msgs) {
        if (msgs.isEmpty()) {
            System.out.println("==== Msgs is null! ====");
            return true;
        }
        System.out.println("==== receiver msgs: "+msgs.size()+" record. ====");
        try {
            for (MessageExt messageExt : msgs) {
                //MQBillMessage message = SerializationUtils.deserialize(messageExt.getBody());
                MQBillMessage message = deserialize(messageExt.getBody());
                if (null != message) {
                    BillInfoFmt billFmt = ConvertTool.convertGagBillToOdsBillFmt(message.getData());
                    if (validBillType(billFmt)) {
                        /*
                         * this.store(BCD) simply stores each record as it arrives. It has no
                         * acknowledgement mechanism and no fault-tolerance guarantee, so data can be
                         * lost; its advantage is higher throughput.
                         * this.store(Iterator<BCD>) is a blocking call that is acknowledged only after
                         * all the received records have been stored in Spark. When the receiver uses
                         * replication (the default storage level is replicated), it is acknowledged
                         * once replication completes. Data still sitting in the buffer is not covered,
                         * but it will be resent. This gives fault tolerance with zero data loss.
                         */
                        this.store(new BCD(billFmt.getId(), billFmt.getReceivableAmount(), billFmt.getSaleTime(),
                                billFmt.getBillType(), billFmt.getShopId(), billFmt.getShopEntityId()));
                    }
                    billFmt = null;
                    message.setData(null);
                    message = null;
                } else {
                    System.out.println("==== receiver msg is:"+messageExt.getBody()+", deserialize failed. ====");
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Checks whether the bill type is 1, 3 or 6.
     * @param bean
     * @return
     */
    static boolean validBillType(BillInfoFmt bean) {
        if (null == bean) {
            System.out.println("==== BillBean is null! ====");
            return false;
        }
        String billType = bean.getBillType();
        if (StringUtils.isEmpty(billType)) {
            System.out.println("==== BillBean.billType is null! ====");
            return false;
        }
        // Compare the trimmed value in all three checks
        String billTypeTrim = billType.trim();
        return billTypeTrim.equals("1") || billTypeTrim.equals("3") || billTypeTrim.equals("6");
    }

    @Override
    public void onStop() {
        consumer.shutdown();
        System.out.println("==== RocketMQReceiver stop ====");
    }

    private static MQBillMessage deserialize(byte[] body) throws Exception {
        if (body == null) {
            throw new IllegalArgumentException("The byte array must not be null");
        }
        MQBillMessage message = null;
        ObjectInputStream in = null;
        ByteArrayInputStream bais = null;
        try {
            bais = new ByteArrayInputStream(body);
            in = new ObjectInputStream(bais);
            message = (MQBillMessage) in.readObject();
        } catch (final ClassNotFoundException ex) {
            ex.printStackTrace();
            throw ex;
        } catch (final IOException ex) {
            ex.printStackTrace();
            throw ex;
        } finally {
            try {
                if (bais != null) {
                    bais.close();
                }
                if (in != null) {
                    in.close();
                }
            } catch (final IOException ex) {
                // ignore close exception
            }
        }
        return message;
    }
}
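
The manual stream handling in `deserialize` could be written more compactly with try-with-resources. The following is a minimal sketch under stated assumptions, not the author's code: `DemoMessage` is a hypothetical stand-in for `MQBillMessage`, and the `serialize` helper exists only so that the round trip can be run without a message queue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeserializeSketch {

    /** Hypothetical stand-in for MQBillMessage. */
    static final class DemoMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String billId;

        DemoMessage(String billId) {
            this.billId = billId;
        }
    }

    /** Serializes a value with standard Java serialization (used here only to produce test input). */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        return bos.toByteArray();
    }

    /** Deserializes a message body; the stream is closed automatically, even on failure. */
    static <T> T deserialize(byte[] body, Class<T> type) throws IOException, ClassNotFoundException {
        if (body == null) {
            throw new IllegalArgumentException("The byte array must not be null");
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return type.cast(in.readObject());
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = serialize(new DemoMessage("B-001"));
        DemoMessage message = deserialize(body, DemoMessage.class);
        System.out.println(message.billId);
    }
}
```

Java serialization should only be used on message bodies from a trusted producer, which is presumably the case for an internal bill topic.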

 

5.3. Spark Streaming real-time sales analysis

package com.mengyao.graph.etl.apps.commons.datasource.bill;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.SizeEstimator;
import org.apache.spark.streaming.Time;

import com.mengyao.graph.etl.apps.commons.beans.dim.RuianMall;
import com.mengyao.graph.etl.apps.commons.datasource.mq.receiver.RocketMQConfig;
import com.mengyao.graph.etl.apps.commons.datasource.mq.receiver.RocketMQReceiver;
import com.mengyao.graph.etl.apps.dashboard.beans.BCD;
import com.mengyao.graph.etl.apps.dashboard.service.SaleAnalysisService;

/**
 * BillConsumer: real-time computation for the dashboard
 *         1. hdfs dfs -rm -r hdfs://bd001:8020/data/consumer/bill/checkpoint/*
 *         2. hdfs dfs -rm -r hdfs://bd001:8020/data/dashboard/ruian/sdt=curTime
 *         3. spark-submit --class com.mengyao.graph.etl.apps.commons.datasource.bill.BillConsumer --master yarn --deploy-mode cluster --driver-memory 4g --executor-cores 6 --executor-memory 5g --queue default --verbose data-graph-etl.jar
 * 
 * @author mengyao
 *
 */
public class BillConsumer {

    private static final ThreadLocal<SimpleDateFormat> FORMATTER_YMD = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd"));
    private static final ThreadLocal<SimpleDateFormat> FORMATTER_YMDHMS = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss"));
    private static final ThreadLocal<SimpleDateFormat> FORMATTER_YMDHMSS = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmssSSS"));
    private static final String BASE_PATH = "hdfs://bd001:8020/data/dashboard/ruian/";
    private static final String TMP_PATH = "/merge";
    private static String appName = "BillConsumer";
    private static String logLevel = "ERROR";

    
    public static void main(String[] args) {
        // Hard-coded sample arguments; remove this line to read the real command-line arguments.
        args = new String[] {"hdfs://bd001:8020/data/consumer/bill/checkpoint", "10", "base.mq.goo.com:9877", "Bill", "bill_dev_group_00013"};
        if (args.length < 5) {
            System.err.println("Usage: "+appName+" <checkpointDir> <seconds> <namesrvAddr> <topic> <groupId>");
            System.exit(1);
        }
        
        String checkPointDir = args[0];
        int second = Integer.parseInt(args[1]);
        String namesrv = args[2];
        Validate.notNull(namesrv, "RocketMQ namesrv not null!");
        String topic = args[3];
        Validate.notNull(topic, "RocketMQ topic not null!");
        String group = args[4];
        Validate.notNull(group, "RocketMQ group not null!");
        
        Function0<JavaStreamingContext> jscFunc = () -> createJavaSparkStreamingContext(checkPointDir, second, namesrv, topic, group);
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkPointDir, jscFunc, new Configuration());
        jssc.sparkContext().setLogLevel(logLevel);
        
        try {
            jssc.start();
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            jssc.stop(false, true);//stop the StreamingContext but not the SparkContext, and wait for in-flight data to finish processing
        }
    }
    
    /**
     * Formats the given time as yyyyMMdd, to match the saleTime of the bill data
     * @param curTime
     * @return
     */
    static String getCurrentDate(long curTime) {
        return FORMATTER_YMD.get().format(new Date(curTime));
    }
    
    /**
     * Debug helper: prints the partition count and the estimated size of the bill RDD<BCD>
     * @param rdd
     * @param time
     */
    static void printStream(JavaRDD<BCD> rdd, Time time) {
        System.out.println("==== time: "+FORMATTER_YMDHMSS.get().format(new Date(time.milliseconds()))+", rdd: partitions="+rdd.getNumPartitions()+", space="+SizeEstimator.estimate(rdd)/1048576+"mb");
    }

    /**
     * Merges small Parquet files
     * @param session
     * @param dfsDir
     */
    static void mergeSmallFiles(Dataset<Row> fulls, SparkSession session, String dfsDir) {
        fulls.coalesce(1).write().mode(SaveMode.Overwrite).parquet(BASE_PATH+TMP_PATH);
        session.read().parquet(BASE_PATH+TMP_PATH).coalesce(1).write().mode(SaveMode.Overwrite).parquet(dfsDir);
    }
    
    /**
     * Creates the DFS directory if it does not exist
     * @param session
     * @param dfsDir
     */
    static void mkDfsDir(SparkSession session, String dfsDir) {
        FileSystem fs = null;
        try {
            fs = FileSystem.get(session.sparkContext().hadoopConfiguration());
            Path path = new Path(dfsDir);
            if(!fs.exists(path)) {
                fs.mkdirs(path);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != fs) {fs.close();}
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Fault-tolerant driver: builds the StreamingContext that getOrCreate uses when no checkpoint exists
     * @param checkPointDir
     * @param second
     * @param namesrv
     * @param topic
     * @param group
     * @return
     */
    static JavaStreamingContext createJavaSparkStreamingContext(String checkPointDir, int second, String namesrv, String topic, String group) {
        try {
            SparkConf conf = new SparkConf()
                    //==== Enable Back Pressure
                    .set("spark.streaming.backpressure.enabled", "true")//enable the backpressure mechanism
                    .set("spark.streaming.backpressure.initialRate", "50000")//initial receiving rate for the first batch; when unset, spark.streaming.receiver.maxRate is used, and that value also caps it
                    .set("spark.streaming.receiver.maxRate", "100")//maximum number of records per second each receiver may accept
                    //==== Enable Dynamic Resource Allocation
                    .set("spark.dynamicAllocation.enabled", "false")//disable Spark core dynamic resource allocation
                    .set("spark.streaming.dynamicAllocation.enabled", "true")//enable Spark Streaming dynamic resource allocation; it conflicts with Spark core dynamic allocation, so only one of the two may be used
                    .set("spark.streaming.dynamicAllocation.minExecutors", "2")//minimum number of executors for the application under Spark Streaming dynamic allocation
                    .set("spark.streaming.dynamicAllocation.maxExecutors", "3")//maximum number of executors for the application under Spark Streaming dynamic allocation
                    //==== Spark Streaming Parallelism and WAL
                    .set("spark.streaming.concurrentJobs", "1")//number of concurrent jobs, default 1
                    .set("spark.streaming.blockInterval", "5000")//the receiver turns received data into a block every 5000 ms, default 200 ms
                    .set("spark.streaming.receiver.writeAheadLog.enable", "true")//enable the receiver WAL so that the receiver gets at-least-once fault-tolerance semantics
                    .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")//batch WAL writes on the driver side
                    .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "15000")
                    //==== Hive on Spark
                    .set("hive.execution.engine", "spark")//set the Hive engine to spark; hdp-2.6.1.0 supports tez and mr by default, Hive on Spark can be enabled per application
                    .set("hive.enable.spark.execution.engine", "true")//enable Hive on Spark
                    .set("spark.driver.extraJavaOptions", "-Dhdp.version=2.6.1.0-129")//hdp-2.6.1.0 requires this driver JVM option for Hive on Spark
                    .set("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.6.1.0-129")//hdp-2.6.1.0 requires this ApplicationMaster JVM option for Hive on Spark
                    //==== Hive Merge Small Files
                    .set("hive.metastore.uris", "thrift://bd001:9083")//Hive metastore Thrift URI
                    .set("hive.merge.sparkfiles", "true")//merge small files produced by Spark
                    .set("hive.merge.mapfiles", "true")//merge small files at the end of a map-only job
                    .set("hive.merge.mapredfiles", "true")//merge small files at the end of a map-reduce job
                    .set("hive.merge.size.per.task", "268435456")//target size of merged files at the end of a job
                    .set("hive.merge.smallfiles.avgsize", "100000")//when the average output file size of a job is below this value, Hive starts an extra map-reduce job to merge the output into larger files; this applies to map-only jobs if hive.merge.mapfiles is true and to map-reduce jobs if hive.merge.mapredfiles is true
                    //==== Spark SQL Optimizer
                    .set("spark.sql.warehouse.dir", "hdfs://bd001:8020/apps/hive/warehouse")//Hive warehouse location used by Spark SQL
                    .set("spark.sql.files.maxPartitionBytes", "134217728")//maximum number of bytes packed into one partition when Spark SQL reads files
                    .set("spark.sql.files.openCostInBytes", "134217728")//when Spark SQL reads many small files, files smaller than this value are packed together; default 4 MB, set to 128 MB here
                    .set("spark.sql.shuffle.partitions", "600")//shuffle parallelism for Spark SQL
                    .set("spark.sql.autoBroadcastJoinThreshold", "67108864")//set to 64 MB; tables smaller than this are broadcast automatically in joins, default 10 MB
                    //==== Spark Core Configure
                    .set("spark.rdd.compress","true")//compress serialized RDD partitions to save memory
                    .set("spark.default.parallelism", "600")//default number of parallel tasks
                    .set("spark.rpc.askTimeout", "300")//Spark RPC ask timeout, in seconds
                    .set("spark.eventLog.enabled", "true")//enable the event log
                    //==== Application Configure
                    .set("spark.app.name", appName)//Spark application name
                    .set("spark.master", "yarn")//run on YARN
                    .set("spark.submit.deployMode", "cluster")//deploy mode yarn-cluster (the property key is spark.submit.deployMode)
                    .set("spark.driver.memory", "4g")//driver memory 4g
                    .set("spark.driver.cores", "1")//1 vcore for the driver
                    .set("spark.executor.memory", "5g")//executor memory 5g
                    .set("spark.executor.heartbeatInterval", "20000")//executor heartbeat interval 20 seconds, default 10 seconds
                    .set("spark.yarn.archive", "hdfs://bd001:8020/hdp/apps/2.6.1.0-129/spark2")//archive of Spark dependency jars stored at this HDFS location
                    .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")//print GC details and timings
                    .set("spark.jars", "/usr/hdp/2.6.1.0-129/sqoop/lib/mysql-connector-java.jar")//JDBC driver jar, only needed when a database driver is used
                    //==== Serialized Configure
                    .set("spark.kryoserializer.buffer", "512k")//default 64k, set to 512k
                    .set("spark.kryoserializer.buffer.max", "256m")//default 64m, set to 256m
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//use the Kryo serialization library
                    .registerKryoClasses(new Class[]{HashMap.class, BCD.class})
                    ;
            
            //Build the MQ configuration
            @SuppressWarnings("serial")
            HashMap<String, String> mqConf = new HashMap<String, String>() {{
                put(RocketMQConfig.NAME_SERVER_ADDR, namesrv);
                put(RocketMQConfig.CONSUMER_TOPIC, topic);
                put(RocketMQConfig.CONSUMER_GROUP, group);
            }};
            
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(second));
            jssc.checkpoint(checkPointDir);
            jssc.remember(Durations.minutes(1440));
            
            //Receive bills from RocketMQ
            JavaReceiverInputDStream<BCD> billListRDD = jssc.receiverStream(new RocketMQReceiver(mqConf, StorageLevels.MEMORY_AND_DISK_SER));
            
            billListRDD
                .foreachRDD((rdd, time) -> {
                    boolean isEmpty = rdd.partitions().isEmpty();
                    if (!isEmpty) {
                        printStream(rdd, time);
                        
                        long start = System.currentTimeMillis();
                        String curTime = getCurrentDate(start).intern();
                        String dfsDir = (BASE_PATH+"sdt="+curTime+"/").intern();
                        
                        //Bill counter
                        //LongAccumulator billAccumulator = BillAccumulator.getInstance(JavaSparkContext.fromSparkContext(rdd.context()), appName);
                        //billAccumulator.add(rdd.count());
                        
                        //Initialize the SparkSession
                        SparkSession session = HiveSession.getInstance(conf);
                        
                        //Broadcast the dimension tables and related data
                        BroadcastDIM dim = BroadcastWrapper.getInstance(JavaSparkContext.fromSparkContext(rdd.context())).getValue();
                        SaleAnalysisService service = dim.getService();
                        Dataset<Row> shop = dim.getShop();
                        Dataset<Row> type = dim.getType();
                        Dataset<Row> ruian = dim.getRuian();
                        String mallIdStr = dim.getMallIdStr();
                        
                        Set<String> areaSet=dim.getAreaSet();
                        Map<String, Set<String>> areaMall=dim.getAreaMall();
                        Set<String> mallSet=dim.getMallSet();
                        Set<String> typeSet=dim.getTypeSet();
                        
                        
                        //If the batch time is exactly 00:00:00, treat it as a new day and refresh the broadcast data
                        if ((curTime+"000000").equals(FORMATTER_YMDHMS.get().format(new Date(time.milliseconds())))) {
                            BroadcastWrapper.update(session, JavaSparkContext.fromSparkContext(rdd.context()));//refresh the dim table data
                            mkDfsDir(session, dfsDir);//initialize dfsDir
                        }
                        
                        //First persist the bills received in this batch (written to the current day)
                        session.createDataFrame(rdd, BCD.class)
                            .filter("sdt = "+curTime)
                            .write()
                            .mode("append")
                            .parquet(dfsDir);
                        
                        //Then read back the full set of bills (for the current day)
                        Dataset<Row> bills = session.read().parquet(dfsDir)
                            .filter("shopId in ("+mallIdStr+")")
                            .dropDuplicates("billId")
                            //.coalesce(1)
                            .cache();
                        
                        //Compute the dashboard metrics
                        System.out.println("==== compute metrics: date condition: "+curTime+" ====");
                        service.totalSale(bills);
                        service.totalRefund(bills);
                        service.peakTime(bills);
                        service.areaSaleTrendForAll(bills, ruian,areaSet, curTime);
                        service.projectContrast(bills, ruian,areaMall);
                        service.saleForShopTop10(bills, shop, ruian);
                        service.projectTypeSaleContrast(bills, shop, type, ruian,areaSet,mallSet,typeSet);
                        long end = System.currentTimeMillis();
                        System.out.println("==== compute metrics: elapsed: "+(end-start)+"/ms ====");
                        bills.unpersist();
                        //Keep at most 6 files per day
                        if (bills.inputFiles().length > 6) {
                            mergeSmallFiles(bills, session, dfsDir);
                        }
                    } else {//If the batch received by Spark Streaming is empty, do nothing
                        System.out.println("==== rdd is null! ");
                    }
                });
            return jssc;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}
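
A note on the midnight check inside `foreachRDD`: it only fires when a non-empty batch carries a batch time of exactly 00:00:00. One alternative, shown here as a minimal sketch and not as part of the original job, is to remember the day of the previous batch and compare. The `DayRolloverSketch` class and its `isNewDay` method are hypothetical names; the state would live on the driver, where the body of `foreachRDD` runs, and it would start empty again after a restart.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DayRolloverSketch {

    private static final DateTimeFormatter YMD = DateTimeFormatter.ofPattern("yyyyMMdd");

    private final ZoneId zone;
    private String lastDay; // null until the first batch has been seen

    DayRolloverSketch(ZoneId zone) {
        this.zone = zone;
    }

    /**
     * Returns true when batchTimeMs falls on a different calendar day than the
     * batch seen by the previous call. The very first call returns false.
     */
    synchronized boolean isNewDay(long batchTimeMs) {
        String day = YMD.format(Instant.ofEpochMilli(batchTimeMs).atZone(zone));
        boolean changed = lastDay != null && !lastDay.equals(day);
        lastDay = day;
        return changed;
    }

    public static void main(String[] args) {
        DayRolloverSketch rollover = new DayRolloverSketch(ZoneId.of("UTC"));
        long beforeMidnight = Instant.parse("2018-12-31T23:59:55Z").toEpochMilli();

        System.out.println(rollover.isNewDay(beforeMidnight));           // first batch
        System.out.println(rollover.isNewDay(beforeMidnight + 10_000L)); // 00:00:05 on the next day
        System.out.println(rollover.isNewDay(beforeMidnight + 20_000L)); // same day as the previous batch
    }
}
```

With this approach the refresh still happens if the batch at midnight is empty or if the batch interval does not divide evenly into the day.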

/**
 * Broadcasts the DIM-related data
 * @author mengyao
 *
 */
class BroadcastWrapper {
    private static volatile Broadcast<BroadcastDIM> instance = null;
    
    public static Broadcast<BroadcastDIM> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (BroadcastWrapper.class) {
                if (instance == null) {
                    SparkSession session = HiveSession.getInstance(jsc.getConf());
                    BroadcastDIM dim = new BroadcastDIM(session);
                    dim.assign();
                    instance = jsc.broadcast(dim);
                }
            }
        }
        return instance;
    }
    
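    /**
     * Refreshes the dimension data once a day and re-broadcasts it
     * @param session    SparkSession (unused here; the DIM object keeps its own session)
     * @param jsc    JavaSparkContext used to create the new broadcast
     */
    public static void update(SparkSession session, JavaSparkContext jsc) {
        BroadcastDIM dim = instance.getValue();
        if (null!=dim) {
            dim.assign();
            instance = jsc.broadcast(dim);//keep the new broadcast so that later getInstance() calls return it
        }
    }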
}

class BroadcastDIM {
    private SparkSession session;
    private SaleAnalysisService service = new SaleAnalysisService();
    private Dataset<Row> shop;
    private Dataset<Row> type;
    private Dataset<Row> ruian;
    private String mallIdStr;
    private List<RuianMall> ruianList ;
    private Set<String>  areaSet;
    private Set<String>  typeSet;
    private Set<String> mallSet;
    private Map<String, Set<String>> areaMall;
    public BroadcastDIM(SparkSession session) {
        this.session = session;
    }
    public void assign() {
        ruian = session.sql("select id,item,name,area_en,area_cn,channel,mid,rmid from tbl_dim_ruian").cache();
        Row[] ruianRows = (Row[])ruian.collect();
        setRuianList(ruianRows);
        setAreaSet();
        setMallIdStr();
        setRuianMallAll();
        setMallSet();
        shop = session.sql("select id,shop_entity_id,shop_entity_name,shop_id,shop_entity_type_root,shop_entity_type_leaf,bill_match_mode,leasing_model,shop_entity_status,open_time,close_time,open_time_list,close_time_list,monite_begin_time_list,monite_end_time_list,marketing_copywriter,marketing_image,font_style,font_size,contract_area,province,city,area,billhint,is_del,brand,brand_code,classify,in_aera,storey,leasing_resource,shop_entity_source,create_time,c_time_stamp,shop_entity_img,business_area_id,source,status,logo,door_head_photo,business_license,certificate,coordinates,consume_per,brand_name,brand_log,alipay,process  "
                + "from tbl_ods_shop where shop_id in ("+mallIdStr+")").cache();
        type = session.sql("select * from tbl_ods_type").cache();
        setRuianType();
    }
    public void  setRuianList(Row[] rows) {
        ruianList=new ArrayList<>();//always reset, so the later setters never see a null or stale list
        if(rows.length>0){
            for(Row row:rows){
                RuianMall rm=new RuianMall();
                if(!row.isNullAt(0)){//id
                    rm.setId(row.getInt(0));
                }
                if(!row.isNullAt(1)){//item
                    rm.setItem(row.getString(1));
                }
                if(!row.isNullAt(2)){//name
                    rm.setName(row.getString(2));
                }
                if(!row.isNullAt(3)){//area_en
                    rm.setAreaEn(row.getString(3));
                }
                if(!row.isNullAt(4)){//area_cn
                    rm.setArenCn(row.getString(4));
                }
                if(!row.isNullAt(5)){//channel
                    rm.setChannel(row.getString(5));
                }
                if(!row.isNullAt(6)){//mid
                    rm.setMid(row.getInt(6));
                }
                if(!row.isNullAt(7)){//rmid
                    rm.setRmid(row.getString(7));
                }
                ruianList.add(rm);
            }
        }
    }
    /**
     * Extracts each rmid and joins them into a single-quoted, comma-separated string
     */
    public void setMallIdStr() {
        if(ruianList.size()>0){
            StringBuilder sbStr=new StringBuilder();
            for(RuianMall row : ruianList){
                sbStr.append("'").append(row.getRmid()).append("',");
            }
            String tmpValue = sbStr.toString();
            mallIdStr=tmpValue.substring(0, tmpValue.length()-1);
        }
    }
    /**
     * Collects the unique Chinese area names into areaSet (null values are not filtered here)
     */
    public void setAreaSet() {
        if(ruianList.size()>0){
            areaSet=new java.util.HashSet<>();
            for(RuianMall row:ruianList){
                areaSet.add(row.getArenCn());
            }
        }
    }
    /**
     * Groups the Chinese names of the 14 Ruian mall organizations by area.
     * 
     * The result is stored in areaMall as Map<String,Set<String>>:
     * key: areaCn, value: Set<mallName>
     */
    public void setRuianMallAll( ){
        areaMall=new HashMap<>();
        ruianList.forEach(rm->{
            if(areaMall.containsKey(rm.getArenCn())){//area already present: add the mall to its set
                areaMall.get(rm.getArenCn()).add(rm.getName());
            }else{//new area: create its mall set
                Set<String> mallSet=new HashSet<>();
                mallSet.add(rm.getName());
                areaMall.put(rm.getArenCn(),mallSet);
            }
        });
    }
    /**
     * Collects the Chinese names of the 14 Ruian mall organizations into mallSet.
     */
    public void setMallSet(){
        mallSet=new java.util.HashSet<>();
        ruianList.forEach(rm->{
            if(!mallSet.contains(rm.getName())){//not present yet: add the mall name
                mallSet.add(rm.getName());
            }
        });
    }
    /**
     * Hard-coded for now, purely for performance reasons.
     * Deriving the business types by joining bills, shops and business types would be very slow.
     */
    public void setRuianType(){
        //TODO hard-coded for now; needs to be changed
        typeSet=new HashSet<>();
        typeSet.add("服務");
        typeSet.add("零售");
        typeSet.add("娛樂");
        typeSet.add("主力店");
        typeSet.add("餐飲");
        typeSet.add("其它");
    }
    public SaleAnalysisService getService() {
        return service;
    }
    public void setService(SaleAnalysisService service) {
        this.service = service;
    }
    public Dataset<Row> getShop() {
        return shop;
    }
    public void setShop(Dataset<Row> shop) {
        this.shop = shop;
    }
    public Dataset<Row> getType() {
        return type;
    }
    public void setType(Dataset<Row> type) {
        this.type = type;
    }
    public Dataset<Row> getRuian() {
        return ruian;
    }
    public void setRuian(Dataset<Row> ruian) {
        this.ruian = ruian;
    }
    public String getMallIdStr() {
        return mallIdStr;
    }
    public void setMallIdStr(String mallIdStr) {
        this.mallIdStr = mallIdStr;
    }
    public List<RuianMall> getRuianList() {
        return ruianList;
    }
    public Set<String> getAreaSet() {
        return this.areaSet;
    }
    public Map<String, Set<String>> getAreaMall() {
        return this.areaMall;
    }
    public Set<String> getMallSet() {
        return this.mallSet;
    }
    public Set<String> getTypeSet() {
        return this.typeSet;
    }
}

/**
 * Bill accumulator
 * @author mengyao
 *
 */
class BillAccumulator {
    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc, String name) {
        if (instance == null) {
            synchronized (BillAccumulator.class) {
                if (instance == null) {
                    instance = jsc.sc().longAccumulator(name);
                }
            }
        }
        return instance;
    }
}

/**
 * Hive data source for Spark SQL
 * @author mengyao
 *
 */
class HiveSession {
    private static transient SparkSession instance = null;
    
    public static SparkSession getInstance(SparkConf conf) {
        if (instance == null) {
            instance = SparkSession.builder()
                    .config(conf)
                    .enableHiveSupport()
                    .getOrCreate();
        }
        return instance;
    }
}
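
BroadcastDIM above builds the `in (...)` list by appending to a StringBuilder and trimming the trailing comma, and it groups mall names by area with a containsKey/put pair. As a minimal, Spark-free sketch of the same two steps, the snippet below uses `Collectors.joining` and `Map.computeIfAbsent` from the JDK. The class name `DimHelpers`, its method names and the sample values are hypothetical and are not part of the original listing; as in the original, the ids are not escaped, so this only suits trusted dimension data.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the original code.
class DimHelpers {

    // Builds the body of a SQL IN (...) list, e.g. 'm1','m2'.
    // An empty input yields an empty string, so no substring() trimming is needed.
    static String joinQuoted(List<String> ids) {
        return ids.stream()
                .map(id -> "'" + id + "'")
                .collect(Collectors.joining(","));
    }

    // Groups mall names by area. Each element is a pair {area, mallName}.
    static Map<String, Set<String>> groupByArea(List<String[]> areaAndMall) {
        Map<String, Set<String>> areaMall = new HashMap<>();
        for (String[] pair : areaAndMall) {
            // computeIfAbsent creates the set on first sight of an area
            areaMall.computeIfAbsent(pair[0], k -> new LinkedHashSet<>()).add(pair[1]);
        }
        return areaMall;
    }

    public static void main(String[] args) {
        System.out.println(joinQuoted(Arrays.asList("m1", "m2"))); // prints 'm1','m2'
        Map<String, Set<String>> grouped = groupByArea(Arrays.asList(
                new String[]{"East", "Mall A"},
                new String[]{"East", "Mall B"},
                new String[]{"West", "Mall C"}));
        System.out.println(grouped.get("East")); // prints [Mall A, Mall B]
    }
}
```

Whether this reads better than the explicit loops is a matter of taste; the main practical difference is that the empty-list case needs no special handling.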

 

5.4、Bill entity class

package com.mengyao.graph.etl.apps.commons.beans.ods.fmt;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;

import com.mengyao.graph.etl.apps.commons.beans.ods.BillInfo;
import com.mengyao.graph.etl.apps.commons.beans.ods.DiscountDetailsInfo;
import com.mengyao.graph.etl.apps.commons.beans.ods.GoodsDetailInfo;
import com.mengyao.graph.etl.apps.commons.beans.ods.SettlementWayInfo;
import com.mengyao.utils.DateTimeUtils;

/**
 * Bill information: gag_bill.bill_info
 * 
 * @author jmb
 * @update 2018-12-13 for mengyao
 */
public class BillInfoFmt implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Pre-settlement bill
     */
    public static final String BILLTYPE_JUJIEDAN = "2";
    /**
     * Checkout bill
     */
    public static final String BILLTYPE_JIEZHANGDAN = "1";
    /**
     * Daily settlement bill
     */
    public static final String BILLTYPE_RIJIEDAN = "3";
    /**
     * Order slip (food order)
     */
    public static final String BILLTYPE_DIANCAIDAN = "7";

    /**
     * Bill ID (system-generated), UUID
     */
    private String id;

    /**
     * Bill ID (system-generated), HBase row key
     */
    private String rowKey;

    /**
     * Merchant ID (system-generated)
     */
    private String shopId;

    /**
     * Merchant name (system-generated)
     */
    private String shopName;

    /**
     * Physical store ID (system-generated)
     */
    private String shopEntityId;

    /**
     * Physical store name (system-generated)
     */
    private String shopEntityName;

    /**
     * Creation time (system-generated)
     */
    private String createTime;

    /**
     * Last modification time (system-generated)
     */
    private String cTimeStamp;

    /**
     * Upload time (taken from the creation time in the header, yyyyMMddHHmmss)
     */
    private String hcTime;

    /**
     * Serial number (the serial number in the header)
     */
    private String hserial;

    /**
     * ID of the device that corrected the bill data (if not corrected, same as the capture terminal ID of the originally uploaded bill)
     */
    private String fixTerminal;

    /**
     * Capture terminal ID (captured)
     */
    private String terminalNumber;

    /**
     * Bill file name, unique (captured): 12-character MAC address + 17-digit timestamp
     */
    private String billfileName;

    /**
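     * Terminal serial number UUID generated by the terminal for merchant-scan payments (globally unique) [required when payment precedes printing]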
     */
    private String tsuuid;

    /**
     * Historical bill file names: every bill file name recorded during merging, full list; an Array in mongodb
     */
    private List<String> billfileNameHis;

    /**
     * Bill number, not guaranteed unique (captured)
     */
    private String billNo;

    /**
     * Capture time (captured)
     */
    private String interceptTime;

    /**
     * Full store name (captured)
     */
    private String shopEntityFullName;

    /**
     * Store address (captured)
     */
    private String shopEntityAddress;

    /**
     * Telephone (captured)
     */
    private String telephone;

    /**
     * Salesperson (captured)
     */
    private String saler;

    /**
     * Checkout counter (captured)
     */
    private String checkstand;

    /**
     * Cashier (captured)
     */
    private String cashier;

    /**
     * Amount receivable (captured)
     */
    private Double receivableAmount;

    /**
     * Original amount receivable (captured)
     */
    private Double defaultReceivableAmount;

    /**
     * Item quantity (captured)
     */
    private Double totalNum;

    /**
     * Original item quantity (captured)
     */
    private Double defaultTotalNum;

    /**
     * Receipt serial number (captured)
     */
    private String billSerialNumber;

    /**
     * Total amount (captured)
     */
    private Double totalFee;

    /**
     * Original total amount (captured)
     */
    private Double defaultTotalFee;

    /**
     * Amount received (captured)
     */
    private Double paidAmount;

    /**
     * Original amount received (captured)
     */
    private Double defaultPaidAmount;

    /**
     * Discount amount (captured)
     */
    private Double discountAmount;

    /**
     * Original discount amount (captured)
     */
    private Double defaultDiscountAmount;

    /**
     * Promotion (coupon) amount (captured)
     */
    private Double couponAmount;

    /**
     * Original promotion (coupon) amount (captured)
     */
    private Double defaultCouponAmount;

    /**
     * Change amount (captured)
     */
    private Double changeAmount;

    /**
     * Original change amount (captured)
     */
    private Double defaultChangeAmount;

    /**
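     * Settlement methods (multiple allowed) (captured), e.g. [{"a":5.0,"p":"現金"},{"a":10.01,"p":"書券"}]; "a": settlement amount, decimal [captured, required],
     * "p": "settlement method [captured, required]"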
     */
    private List<String> settlementWay;

    /**
     * Sale time (captured)
     */
    private String saleTime;

    /**
     * Membership card number (captured)
     */
    private String memberCardNumber;

    /**
     * Cumulative consumption (captured)
     */
    private Double totalConsumption;

    /**
     * Original cumulative consumption (captured)
     */
    private Double defaultTotalConsumption;

    /**
     * Website (captured)
     */
    private String website;

    /**
     * Receipt image (captured); only the URL is stored
     */
    private String billImage;

    /**
     * Item details (multiple allowed) (captured) [{"name":"可樂","itemserial":"PUMU00123","price":5.01,"totalnum":5.0,"totalprice":25.05},{"name":"金槍魚","itemserial":"FOOD02012","price":10.55,"totalnum":1.5,"totalprice":15.83}]
     * ,"name":"item name [captured, optional]","itemserial":"barcode [captured, optional]","price": unit price, decimal [captured, optional],"totalnum": quantity, decimal [captured, optional],"totalprice": total price, decimal [captured, optional]
     */
    private List<String> goodsDetails;

    /**
     * Room number (captured) (hotels only)
     */
    private String roomNo;

    /**
     * Guest name at check-in (captured) (hotels only)
     */
    private String checkinName;

    /**
     * Table number (captured) (restaurants only)
     */
    private String deskNo;

    /**
     * Number of customers (captured) (typically used for restaurants)
     */
    private Double consumeNum;

    /**
     * Original number of customers (captured) (typically used for restaurants)
     */
    private Double defaultConsumeNum;

    /**
     * Full text of the original bill (captured)
     */
    private String billText;

    /**
     * Check-in time (captured) (hotels only)
     */
    private String inTime;

    /**
     * Check-out time (captured) (hourly rooms only)
     */
    private String outTime;
    /**
     * Default print time
     */
    private String defaultPrintDate;
    /**
     * Default check-in time
     */
    private String defaultInTime;
    /**
     * Default check-out time
     */
    private String defaultOutTime;

    /**
     * Upload type 1: full bill upload 2: filtered bill upload
     */
    private String uploadType;

    /**
     * Custom data beyond the captured fields above (captured): a JSON string (Map<String, Object>) whose keys and values are defined by the capture terminal.
     */
    private Map<String, String> customRecord;

    /**
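     * Bill type 1: checkout bill 2: pre-settlement bill 3: daily settlement bill 4: prescription 5: prepaid deposit slip 6: return slip 7: order slip 8: invoice [optional]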
     */
    private String billType;

    /**
     * Bill modification mode 0: default 1: reprinted by the cashier 2: amount edited manually 3: automatic re-parse
     */
    private String modifyType = "0";

    /**
     * Bill source 1: device capture 2: manual entry 3: parsing service 4: third party
     */
    private String billSource = "1";

    /**
     * Server-side parsing path [optional; used to troubleshoot server-side parsing]
     */
    private String analyzPath;

    /**
     * Key for matching card swipes, wallets, etc. to a bill; format BILL_MAC_amount_captureTime
     */
    private String billMatchKey;

    /**
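     * Once the data has been matched with a payment result (or similar), this field stores that result's primary key [optional]; required after a successful match, and should in theory be globally unique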
     */
    private String matchId;

    /**
     * Default sale time
     */
    private String defaultSaleTime;

    /**
     * Customer name [captured, optional]
     */
    private String customerName;
    /**
     * Member ID [captured, optional]
     */
    private String membershipId;
    /**
     * Membership level [captured, optional]
     */
    private String memberLevels;
    /**
     * Pet name [captured, optional]
     */
    private String petName;
    /**
     * Pet number [captured, optional]
     */
    private String petNumber;
    /**
     * Person in charge (doctor) [captured, optional]
     */
    private String principal;
    /**
     * Prepaid deposit [captured, optional]
     */
    private String deposit;
    /**
     * Print time yyyyMMddHHmmss [captured, optional; right-padded with 0 if the captured value is too short]
     */
    private String printDate;
    /**
     * Default capture time
     */
    private String defaultInterceptTime;
    /**
     * Matching model sk0001: swipe card first, then print, single swipe; dd0001: print first, then swipe card, single swipe
     */
    private String printMatchType;

    /**
     * Used when merging bills; unique
     */
    private String billMergeKey;
    /**
     * Receivable amount of a reprinted bill
     */
    private Double modifyAmount;
    /**
     * List of receivable amounts of reprinted bills
     */
    private List<Double> modifyAmountList;
    /**
     * List of sale times for the reprinted bills' receivable amounts
     */
    private List<String> modifyAmountSaleTimeList;
    /**
     * List of capture times for the reprinted bills' receivable amounts
     */
    private List<String> modifyAmountInterceptTimeList;
    /**
     * Unique identifier [optional] (currently usable as a points identifier)
     */
    private String uniqueId;
    /**
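     * Historical unique identifiers: every unique identifier recorded during merging, full list; an Array in mongodb (currently usable as a points identifier)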
     */
    private List<String> uniqueIdHis;
    /**
     * Whether to append a QR code 1: yes 2: no [required]
     */
    private String ifqrcode;
    /**
     * Coupon code
     */
    private String couponNum;
    /**
     * ERP member
     */
    private String erpMemberCard;
    /**
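     * Voucher type 11: sales slip (printed by merchant POS or ERP) 12: cashier receipt (printed by the main POS) 13: payment voucher (card sales slip) 99: unknown type [optional; defaults to 99, meaning unknown]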
     */
    private String voucherType;
    /**
     * Receipt QR code [optional]
     */
    private String qrCode;
    /**
     * Points flag 0: the points field for this transaction was not found or not configured 1: the points field is present [optional]
     */
    private String integralmark;
    /**
     * Points earned in this transaction [optional]
     */
    private String thisintegral;
    /**
     * Remarks [optional]
     */
    private String remarks;
    /**
     * Bill subtype; used by 庖丁 (Paoding) to classify configuration files
     */
    private String templateName;
    /**
     * Buyer name [captured, optional]
     */
    private String custName;
    /**
     * Buyer tax ID [captured, optional]
     */
    private String custTaxNo;
    /**
     * Buyer address and telephone [captured, optional]
     */
    private String custAdress;
    /**
     * Buyer bank and account number [captured, optional]
     */
    private String custBankAccount;

    /**
     * Third-party order number (unique within the third-party system)
     */
    private String thirdPartyOrderNo;
    /**
     * Stored-value card spending amount [captured, optional]
     */
    private Double rechargeableCardConsumeAmount;
    /**
     * Original stored-value card spending amount [captured, optional]
     */
    private Double defaultRechargeableCardConsumeAmount;
    /**
     * Out-of-pocket amount [captured, optional]
     */
    private Double outOfPocketAmount;
    /**
     * Original out-of-pocket amount [captured, optional]
     */
    private Double defaultOutOfPocketAmount;
    /**
     * Top-up amount [captured, optional]
     */
    private Double rechargeAmount;
    /**
     * Original top-up amount [captured, optional]
     */
    private Double defaultRechargeAmount;
    /**
     * Member price [captured, optional]
     */
    private Double memberPrice;
    /**
     * Original member price [captured, optional]
     */
    private Double defaultMemberPrice;
    /**
     * Member discount rate [captured, optional]
     */
    private Double memberDiscountrate;
    /**
     * Original member discount rate [captured, optional]
     */
    private Double defaultMemberDiscountrate;
    /**
     * Member cumulative consumption [captured, optional]
     */
    private Double memberTotalConsumption;
    /**
     * Original member cumulative consumption [captured, optional]
     */
    private Double defaultMemberTotalConsumption;
    /**
     * Takeout flag 1: takeout order 2: not a takeout order [captured, optional]
     */
    private String takeout;

    /**
     * Discounted item details (multiple allowed) (captured), {"name":"可樂","price":5.01}
     */
    private List<String> discountDetails;

    public String getThirdPartyOrderNo() {
        return thirdPartyOrderNo;
    }

    public void setThirdPartyOrderNo(String thirdPartyOrderNo) {
        this.thirdPartyOrderNo = thirdPartyOrderNo;
    }

    public String getRemarks() {
        return remarks;
    }

    public void setRemarks(String remarks) {
        this.remarks = remarks;
    }

    public String getTemplateName() {
        return templateName;
    }

    public void setTemplateName(String templateName) {
        this.templateName = templateName;
    }

    public String getDefaultPrintDate() {
        return this.defaultPrintDate;
    }

    public void setDefaultPrintDate(String defaultPrintDate) {
        this.defaultPrintDate = defaultPrintDate;
    }

    public String getDefaultInTime() {
        return this.defaultInTime;
    }

    public void setDefaultInTime(String defaultInTime) {
        this.defaultInTime = defaultInTime;
    }

    public String getDefaultOutTime() {
        return this.defaultOutTime;
    }

    public void setDefaultOutTime(String defaultOutTime) {
        this.defaultOutTime = defaultOutTime;
    }

    public String getCouponNum() {
        return this.couponNum;
    }

    public void setCouponNum(String couponNum) {
        this.couponNum = couponNum;
    }

    public String getErpMemberCard() {
        return this.erpMemberCard;
    }

    public void setErpMemberCard(String erpMemberCard) {
        this.erpMemberCard = erpMemberCard;
    }

    public String getVoucherType() {
        return this.voucherType;
    }

    public void setVoucherType(String voucherType) {
        this.voucherType = voucherType;
    }

    public String getAnalyzPath() {
        return this.analyzPath;
    }

    public void setAnalyzPath(String analyzPath) {
        this.analyzPath = analyzPath;
    }

    public List<Double> getModifyAmountList() {
        return this.modifyAmountList;
    }

    public String getUniqueId() {
        return this.uniqueId;
    }

    public void setUniqueId(String uniqueId) {
        this.uniqueId = uniqueId;
    }

    public String getIfqrcode() {
        return this.ifqrcode;
    }

    public void setIfqrcode(String ifqrcode) {
        this.ifqrcode = ifqrcode;
    }

    public void setModifyAmountList(List<Double> modifyAmountList) {
        this.modifyAmountList = modifyAmountList;
    }

    public Double getModifyAmount() {
        return this.modifyAmount;
    }

    public void setModifyAmount(Double modifyAmount) {
        this.modifyAmount = modifyAmount;
    }

    public String getDefaultSaleTime() {
        return this.defaultSaleTime;
    }

    public void setDefaultSaleTime(String defaultSaleTime) {
        this.defaultSaleTime = defaultSaleTime;
    }

    public String getBillMergeKey() {
        return this.billMergeKey;
    }

    public void setBillMergeKey(String billMergeKey) {
        this.billMergeKey = billMergeKey;
    }

    public String getCustName() {
        return custName;
    }

    public void setCustName(String custName) {
        this.custName = custName;
    }

    public String getCustTaxNo() {
        return custTaxNo;
    }

    public void setCustTaxNo(String custTaxNo) {
        this.custTaxNo = custTaxNo;
    }

    public String getCustAdress() {
        return custAdress;
    }

    public void setCustAdress(String custAdress) {
        this.custAdress = custAdress;
    }

    public String getCustBankAccount() {
        return custBankAccount;
    }

    public void setCustBankAccount(String custBankAccount) {
        this.custBankAccount = custBankAccount;
    }

    public List<String> getUniqueIdHis() {
        return uniqueIdHis;
    }

    public void setUniqueIdHis(List<String> uniqueIdHis) {
        this.uniqueIdHis = uniqueIdHis;
    }

    public Double getRechargeableCardConsumeAmount() {
        return rechargeableCardConsumeAmount;
    }

    public void setRechargeableCardConsumeAmount(Double rechargeableCardConsumeAmount) {
        this.rechargeableCardConsumeAmount = rechargeableCardConsumeAmount;
    }

    public Double getDefaultRechargeableCardConsumeAmount() {
        return defaultRechargeableCardConsumeAmount;
    }

    public void setDefaultRechargeableCardConsumeAmount(Double defaultRechargeableCardConsumeAmount) {
        this.defaultRechargeableCardConsumeAmount = defaultRechargeableCardConsumeAmount;
    }

    public Double getOutOfPocketAmount() {
        return outOfPocketAmount;
    }

    public void setOutOfPocketAmount(Double outOfPocketAmount) {
        this.outOfPocketAmount = outOfPocketAmount;
    }

    public Double getDefaultOutOfPocketAmount() {
        return defaultOutOfPocketAmount;
    }

    public void setDefaultOutOfPocketAmount(Double defaultOutOfPocketAmount) {
        this.defaultOutOfPocketAmount = defaultOutOfPocketAmount;
    }

    public Double getRechargeAmount() {
        return rechargeAmount;
    }

    public void setRechargeAmount(Double rechargeAmount) {
        this.rechargeAmount = rechargeAmount;
    }

    public Double getDefaultRechargeAmount() {
        return defaultRechargeAmount;
    }

    public void setDefaultRechargeAmount(Double defaultRechargeAmount) {
        this.defaultRechargeAmount = defaultRechargeAmount;
    }

    public Double getMemberPrice() {
        return memberPrice;
    }

    public void setMemberPrice(Double memberPrice) {
        this.memberPrice = memberPrice;
    }

    public Double getDefaultMemberPrice() {
        return defaultMemberPrice;
    }

    public void setDefaultMemberPrice(Double defaultMemberPrice) {
        this.defaultMemberPrice = defaultMemberPrice;
    }

    public Double getMemberDiscountrate() {
        return memberDiscountrate;
    }

    public void setMemberDiscountrate(Double memberDiscountrate) {
        this.memberDiscountrate = memberDiscountrate;
    }

    public Double getDefaultMemberDiscountrate() {
        return defaultMemberDiscountrate;
    }

    public void setDefaultMemberDiscountrate(Double defaultMemberDiscountrate) {
        this.defaultMemberDiscountrate = defaultMemberDiscountrate;
    }

    public Double getMemberTotalConsumption() {
        return memberTotalConsumption;
    }

    public void setMemberTotalConsumption(Double memberTotalConsumption) {
        this.memberTotalConsumption = memberTotalConsumption;
    }

    public Double getDefaultMemberTotalConsumption() {
        return defaultMemberTotalConsumption;
    }

    public void setDefaultMemberTotalConsumption(Double defaultMemberTotalConsumption) {
        this.defaultMemberTotalConsumption = defaultMemberTotalConsumption;
    }

    public String getTakeout() {
        return takeout;
    }

    public void setTakeout(String takeout) {
        this.takeout = takeout;
    }

    public List<String> getDiscountDetails() {
        return discountDetails;
    }

    public void setDiscountDetails(List<String> discountDetails) {
        this.discountDetails = discountDetails;
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getShopId() {
        return this.shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return this.shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopEntityId() {
        return this.shopEntityId;
    }

    public void setShopEntityId(String shopEntityId) {
        this.shopEntityId = shopEntityId;
    }

    public String getShopEntityName() {
        return this.shopEntityName;
    }

    public void setShopEntityName(String shopEntityName) {
        this.shopEntityName = shopEntityName;
    }

    public String getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public String getCTimeStamp() {
        return this.cTimeStamp;
    }

    public void setCTimeStamp(String cTimeStamp) {
        this.cTimeStamp = cTimeStamp;
    }

    public String getHcTime() {
        return this.hcTime;
    }

    public void setHcTime(String hcTime) {
        this.hcTime = hcTime;
    }

    public String getHserial() {
        return this.hserial;
    }

    public void setHserial(String hserial) {
        this.hserial = hserial;
    }

    public String getFixTerminal() {
        return this.fixTerminal;
    }

    public void setFixTerminal(String fixTerminal) {
        this.fixTerminal = fixTerminal;
    }

    public String getTerminalNumber() {
        return this.terminalNumber;
    }

    public void setTerminalNumber(String terminalNumber) {
        this.terminalNumber = terminalNumber;
    }

    public String getBillfileName() {
        return this.billfileName;
    }

    public void setBillfileName(String billfileName) {
        this.billfileName = billfileName;
    }

    public List<String> getBillfileNameHis() {
        return this.billfileNameHis;
    }

    public void setBillfileNameHis(List<String> billfileNameHis) {
        this.billfileNameHis = billfileNameHis;
    }

    public String getBillNo() {
        return this.billNo;
    }

    public void setBillNo(String billNo) {
        this.billNo = billNo;
    }

    public String getInterceptTime() {
        return this.interceptTime;
    }

    public void setInterceptTime(String interceptTime) {
        this.interceptTime = interceptTime;
    }

    public String getShopEntityFullName() {
        return this.shopEntityFullName;
    }

    public void setShopEntityFullName(String shopEntityFullName) {
        this.shopEntityFullName = shopEntityFullName;
    }

    public String getShopEntityAddress() {
        return this.shopEntityAddress;
    }

    public void setShopEntityAddress(String shopEntityAddress) {
        this.shopEntityAddress = shopEntityAddress;
    }

    public String getTelephone() {
        return this.telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getSaler() {
        return this.saler;
    }

    public void setSaler(String saler) {
        this.saler = saler;
    }

    public String getCheckstand() {
        return this.checkstand;
    }

    public void setCheckstand(String checkstand) {
        this.checkstand = checkstand;
    }

    public String getCashier() {
        return this.cashier;
    }

    public void setCashier(String cashier) {
        this.cashier = cashier;
    }

    public Double getReceivableAmount() {
        return this.receivableAmount;
    }

    public void setReceivableAmount(Double receivableAmount) {
        this.receivableAmount = receivableAmount;
    }

    public Double getTotalNum() {
        return this.totalNum;
    }

    public void setTotalNum(Double totalNum) {
        this.totalNum = totalNum;
    }

    public String getBillSerialNumber() {
        return this.billSerialNumber;
    }

    public void setBillSerialNumber(String billSerialNumber) {
        this.billSerialNumber = billSerialNumber;
    }

    public Double getTotalFee() {
        return this.totalFee;
    }

    public void setTotalFee(Double totalFee) {
        this.totalFee = totalFee;
    }

    public Double getPaidAmount() {
        return this.paidAmount;
    }

    public void setPaidAmount(Double paidAmount) {
        this.paidAmount = paidAmount;
    }

    public Double getDiscountAmount() {
        return this.discountAmount;
    }

    public void setDiscountAmount(Double discountAmount) {
        this.discountAmount = discountAmount;
    }

    public Double getCouponAmount() {
        return this.couponAmount;
    }

    public void setCouponAmount(Double couponAmount) {
        this.couponAmount = couponAmount;
    }

    public Double getChangeAmount() {
        return this.changeAmount;
    }

    public void setChangeAmount(Double changeAmount) {
        this.changeAmount = changeAmount;
    }

    public List<String> getSettlementWay() {
        return this.settlementWay;
    }

    public void setSettlementWay(List<String> settlementWay) {
        this.settlementWay = settlementWay;
    }

    public String getSaleTime() {
        return saleTime;
    }

    public void setSaleTime(String saleTime) {
        this.saleTime = saleTime;
    }

    public String getMemberCardNumber() {
        return this.memberCardNumber;
    }

    public void setMemberCardNumber(String memberCardNumber) {
        this.memberCardNumber = memberCardNumber;
    }

    public Double getTotalConsumption() {
        return this.totalConsumption;
    }

    public void setTotalConsumption(Double totalConsumption) {
        this.totalConsumption = totalConsumption;
    }

    public String getWebsite() {
        return this.website;
    }

    public void setWebsite(String website) {
        this.website = website;
    }

    public String getBillImage() {
        return this.billImage;
    }

    public void setBillImage(String billImage) {
        this.billImage = billImage;
    }

    public List<String> getGoodsDetails() {
        return this.goodsDetails;
    }

    public void setGoodsDetails(List<String> goodsDetails) {
        this.goodsDetails = goodsDetails;
    }

    public String getRoomNo() {
        return this.roomNo;
    }

    public void setRoomNo(String roomNo) {
        this.roomNo = roomNo;
    }

    public String getCheckinName() {
        return this.checkinName;
    }

    public void setCheckinName(String checkinName) {
        this.checkinName = checkinName;
    }

    public String getDeskNo() {
        return this.deskNo;
    }

    public void setDeskNo(String deskNo) {
        this.deskNo = deskNo;
    }

    public Double getConsumeNum() {
        return this.consumeNum;
    }

    public void setConsumeNum(Double consumeNum) {
        this.consumeNum = consumeNum;
    }

    public String getBillText() {
        return this.billText;
    }

    public void setBillText(String billText) {
        this.billText = billText;
    }

    public Map<String, String> getCustomRecord() {
        return this.customRecord;
    }

    public void setCustomRecord(Map<String, String> customRecord) {
        this.customRecord = customRecord;
    }

    public String getBillType() {
        return this.billType;
    }

    public void setBillType(String billType) {
        this.billType = billType;
    }

    public String getInTime() {
        return this.inTime;
    }

    public String getOutTime() {
        return this.outTime;
    }

    public void setInTime(String inTime) {
        this.inTime = inTime;
    }

    public void setOutTime(String outTime) {
        this.outTime = outTime;
    }

    public String getBillMatchKey() {
        return this.billMatchKey;
    }

    public void setBillMatchKey(String billMatchKey) {
        this.billMatchKey = billMatchKey;
    }

    public String getMatchId() {
        return this.matchId;
    }

    public void setMatchId(String matchId) {
        this.matchId = matchId;
    }

    public String getPrintMatchType() {
        return this.printMatchType;
    }

    public void setPrintMatchType(String printMatchType) {
        this.printMatchType = printMatchType;
    }

    public String getTsuuid() {
        return this.tsuuid;
    }

    public void setTsuuid(String tsuuid) {
        this.tsuuid = tsuuid;
    }

    public String getUploadType() {
        return this.uploadType;
    }

    public void setUploadType(String uploadType) {
        this.uploadType = uploadType;
    }

    public String getCustomerName() {
        return this.customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public String getMembershipId() {
        return this.membershipId;
    }

    public void setMembershipId(String membershipId) {
        this.membershipId = membershipId;
    }

    public String getMemberLevels() {
        return this.memberLevels;
    }

    public void setMemberLevels(String memberLevels) {
        this.memberLevels = memberLevels;
    }

    public String getPetName() {
        return this.petName;
    }

    public void setPetName(String petName) {
        this.petName = petName;
    }

    public String getPetNumber() {
        return this.petNumber;
    }

    public void setPetNumber(String petNumber) {
        this.petNumber = petNumber;
    }

    public String getPrincipal() {
        return this.principal;
    }

    public void setPrincipal(String principal) {
        this.principal = principal;
    }

    public String getDeposit() {
        return this.deposit;
    }

    public void setDeposit(String deposit) {
        this.deposit = deposit;
    }

    public String getPrintDate() {
        return this.printDate;
    }

    public void setPrintDate(String printDate) {
        this.printDate = printDate;
    }

    public String getDefaultInterceptTime() {
        return this.defaultInterceptTime;
    }

    public void setDefaultInterceptTime(String defaultInterceptTime) {
        this.defaultInterceptTime = defaultInterceptTime;
    }

    public String getModifyType() {
        return this.modifyType;
    }

    public void setModifyType(String modifyType) {
        this.modifyType = modifyType;
    }

    public String getBillSource() {
        return this.billSource;
    }

    public void setBillSource(String billSource) {
        this.billSource = billSource;
    }

    public String getQrCode() {
        return qrCode;
    }

    public void setQrCode(String qrCode) {
        this.qrCode = qrCode;
    }

    public String getIntegralmark() {
        return integralmark;
    }

    public void setIntegralmark(String integralmark) {
        this.integralmark = integralmark;
    }

    public String getThisintegral() {
        return thisintegral;
    }

    public void setThisintegral(String thisintegral) {
        this.thisintegral = thisintegral;
    }

    public List<String> getModifyAmountSaleTimeList() {
        return modifyAmountSaleTimeList;
    }

    public void setModifyAmountSaleTimeList(List<String> modifyAmountSaleTimeList) {
        this.modifyAmountSaleTimeList = modifyAmountSaleTimeList;
    }

    public List<String> getModifyAmountInterceptTimeList() {
        return modifyAmountInterceptTimeList;
    }

    public void setModifyAmountInterceptTimeList(List<String> modifyAmountInterceptTimeList) {
        this.modifyAmountInterceptTimeList = modifyAmountInterceptTimeList;
    }

    public Double getDefaultReceivableAmount() {
        return defaultReceivableAmount;
    }

    public void setDefaultReceivableAmount(Double defaultReceivableAmount) {
        this.defaultReceivableAmount = defaultReceivableAmount;
    }

    public Double getDefaultTotalNum() {
        return defaultTotalNum;
    }

    public void setDefaultTotalNum(Double defaultTotalNum) {
        this.defaultTotalNum = defaultTotalNum;
    }

    public Double getDefaultTotalFee() {
        return defaultTotalFee;
    }

    public void setDefaultTotalFee(Double defaultTotalFee) {
        this.defaultTotalFee = defaultTotalFee;
    }

    public Double getDefaultPaidAmount() {
        return defaultPaidAmount;
    }

    public void setDefaultPaidAmount(Double defaultPaidAmount) {
        this.defaultPaidAmount = defaultPaidAmount;
    }

    public Double getDefaultDiscountAmount() {
        return defaultDiscountAmount;
    }

    public void setDefaultDiscountAmount(Double defaultDiscountAmount) {
        this.defaultDiscountAmount = defaultDiscountAmount;
    }

    public Double getDefaultCouponAmount() {
        return defaultCouponAmount;
    }

    public void setDefaultCouponAmount(Double defaultCouponAmount) {
        this.defaultCouponAmount = defaultCouponAmount;
    }

    public Double getDefaultChangeAmount() {
        return defaultChangeAmount;
    }

    public void setDefaultChangeAmount(Double defaultChangeAmount) {
        this.defaultChangeAmount = defaultChangeAmount;
    }

    public Double getDefaultTotalConsumption() {
        return defaultTotalConsumption;
    }

    public void setDefaultTotalConsumption(Double defaultTotalConsumption) {
        this.defaultTotalConsumption = defaultTotalConsumption;
    }

    public Double getDefaultConsumeNum() {
        return defaultConsumeNum;
    }

    public void setDefaultConsumeNum(Double defaultConsumeNum) {
        this.defaultConsumeNum = defaultConsumeNum;
    }

    public String getRowKey() {
        return rowKey;
    }

    public void setRowKey(String rowKey) {
        this.rowKey = rowKey;
    }

    @Override
    public String toString() {
        return id + "\t" + rowKey + "\t" + shopId + "\t" + shopName + "\t" + shopEntityId + "\t" + shopEntityName + "\t"
                + createTime + "\t" + cTimeStamp + "\t" + hcTime + "\t" + hserial + "\t" + fixTerminal + "\t"
                + terminalNumber + "\t" + billfileName + "\t" + tsuuid + "\t" + billfileNameHis + "\t" + billNo + "\t"
                + interceptTime + "\t" + shopEntityFullName + "\t" + shopEntityAddress + "\t" + telephone + "\t" + saler
                + "\t" + checkstand + "\t" + cashier + "\t" + receivableAmount + "\t" + defaultReceivableAmount + "\t"
                + totalNum + "\t" + defaultTotalNum + "\t" + billSerialNumber + "\t" + totalFee + "\t" + defaultTotalFee
                + "\t" + paidAmount + "\t" + defaultPaidAmount + "\t" + discountAmount + "\t" + defaultDiscountAmount
                + "\t" + couponAmount + "\t" + defaultCouponAmount + "\t" + changeAmount + "\t" + defaultChangeAmount
                + "\t" + settlementWay + "\t" + saleTime + "\t" + memberCardNumber + "\t" + totalConsumption + "\t"
                + defaultTotalConsumption + "\t" + website + "\t" + billImage + "\t" + goodsDetails + "\t" + roomNo
                + "\t" + checkinName + "\t" + deskNo + "\t" + consumeNum + "\t" + defaultConsumeNum + "\t" + billText
                + "\t" + inTime + "\t" + outTime + "\t" + defaultPrintDate + "\t" + defaultInTime + "\t"
                + defaultOutTime + "\t" + uploadType + "\t" + customRecord + "\t" + billType + "\t" + modifyType + "\t"
                + billSource + "\t" + analyzPath + "\t" + billMatchKey + "\t" + matchId + "\t" + defaultSaleTime + "\t"
                + customerName + "\t" + membershipId + "\t" + memberLevels + "\t" + petName + "\t" + petNumber + "\t"
                + principal + "\t" + deposit + "\t" + printDate + "\t" + defaultInterceptTime + "\t" + printMatchType
                + "\t" + billMergeKey + "\t" + modifyAmount + "\t" + modifyAmountList + "\t" + modifyAmountSaleTimeList
                + "\t" + modifyAmountInterceptTimeList + "\t" + uniqueId + "\t" + uniqueIdHis + "\t" + ifqrcode + "\t"
                + couponNum + "\t" + erpMemberCard + "\t" + voucherType + "\t" + qrCode + "\t" + integralmark + "\t"
                + thisintegral + "\t" + remarks + "\t" + templateName + "\t" + custName + "\t" + custTaxNo + "\t"
                + custAdress + "\t" + custBankAccount + "\t" + thirdPartyOrderNo + "\t" + rechargeableCardConsumeAmount
                + "\t" + defaultRechargeableCardConsumeAmount + "\t" + outOfPocketAmount + "\t"
                + defaultOutOfPocketAmount + "\t" + rechargeAmount + "\t" + defaultRechargeAmount + "\t" + memberPrice
                + "\t" + defaultMemberPrice + "\t" + memberDiscountrate + "\t" + defaultMemberDiscountrate + "\t"
                + memberTotalConsumption + "\t" + defaultMemberTotalConsumption + "\t" + takeout + "\t"
                + discountDetails;
    }

    public static BillInfoFmt cloneBill(BillInfo fromBean) {
        if (null == fromBean) {
            return null;
        }
        BillInfoFmt toBean = new BillInfoFmt();
        toBean.setId(stringRemove(fromBean.getId()));
        toBean.setRowKey(stringRemove(fromBean.getRowKey()));
        toBean.setShopId(stringRemove(fromBean.getShopId()));
        toBean.setShopName(stringRemove(fromBean.getShopName()));
        toBean.setShopEntityId(stringRemove(fromBean.getShopEntityId()));
        toBean.setShopEntityName(stringRemove(fromBean.getShopEntityName()));
        toBean.setCreateTime(DateTimeUtils.getYmdhmsForNo(fromBean.getCreateTime()));
        toBean.setCTimeStamp(DateTimeUtils.getYmdhmsForNo(fromBean.getCTimeStamp()));
        toBean.setHcTime(stringRemove(fromBean.getHcTime()));
        toBean.setHserial(stringRemove(fromBean.getHserial()));
        toBean.setFixTerminal(stringRemove(fromBean.getFixTerminal()));
        toBean.setTerminalNumber(stringRemove(fromBean.getTerminalNumber()));
        toBean.setBillfileName(stringRemove(fromBean.getBillfileName()));
        toBean.setTsuuid(stringRemove(fromBean.getTsuuid()));
        toBean.setBillfileNameHis(fromBean.getBillfileNameHis());
        toBean.setBillNo(stringRemove(fromBean.getBillNo()));
        toBean.setInterceptTime(DateTimeUtils.getYmdhmsForNo(fromBean.getInterceptTime()));
        toBean.setShopEntityFullName(stringRemove(fromBean.getShopEntityFullName()));
        toBean.setShopEntityAddress(stringRemove(fromBean.getShopEntityAddress()));
        toBean.setTelephone(stringRemove(fromBean.getTelephone()));
        toBean.setSaler(stringRemove(fromBean.getSaler()));
        toBean.setCheckstand(stringRemove(fromBean.getCheckstand()));
        toBean.setCashier(stringRemove(fromBean.getCashier()));
        toBean.setReceivableAmount(fromBean.getReceivableAmount());
        toBean.setDefaultReceivableAmount(fromBean.getDefaultReceivableAmount());
        toBean.setTotalNum(fromBean.getTotalNum());
        toBean.setDefaultTotalNum(fromBean.getDefaultTotalNum());
        toBean.setBillSerialNumber(stringRemove(fromBean.getBillSerialNumber()));
        toBean.setTotalFee(fromBean.getTotalFee());
        toBean.setDefaultTotalFee(fromBean.getDefaultTotalFee());
        toBean.setPaidAmount(fromBean.getPaidAmount());
        toBean.setDefaultPaidAmount(fromBean.getDefaultPaidAmount());
        toBean.setDiscountAmount(fromBean.getDiscountAmount());
        toBean.setDefaultDiscountAmount(fromBean.getDefaultDiscountAmount());
        toBean.setCouponAmount(fromBean.getCouponAmount());
        toBean.setDefaultCouponAmount(fromBean.getDefaultCouponAmount());
        toBean.setChangeAmount(fromBean.getChangeAmount());
        toBean.setDefaultChangeAmount(fromBean.getDefaultChangeAmount());
        if (null != fromBean.getSettlementWay()) {
            List<SettlementWayInfo> rawList = fromBean.getSettlementWay();
            List<String> list = new ArrayList<>();
            for (SettlementWayInfo raw : rawList) {
                list.add(raw.toString());
            }
            toBean.setSettlementWay(list);
        }
        toBean.setSaleTime(fromBean.getSaleTime());
        toBean.setMemberCardNumber(stringRemove(fromBean.getMemberCardNumber()));
        toBean.setTotalConsumption(fromBean.getTotalConsumption());
        toBean.setDefaultTotalConsumption(fromBean.getDefaultTotalConsumption());
        toBean.setWebsite(stringRemove(fromBean.getWebsite()));
        toBean.setBillImage(stringRemove(fromBean.getBillImage()));
        if (null != fromBean.getGoodsDetails()) {
            List<GoodsDetailInfo> rawList = fromBean.getGoodsDetails();
            List<String> list = new ArrayList<>();
            for (GoodsDetailInfo raw : rawList) {
                list.add(raw.toString());
            }
            toBean.setGoodsDetails(list);
        }
        toBean.setRoomNo(stringRemove(fromBean.getRoomNo()));
        toBean.setCheckinName(stringRemove(fromBean.getCheckinName()));
        toBean.setDeskNo(stringRemove(fromBean.getDeskNo()));
        toBean.setConsumeNum(fromBean.getConsumeNum());
        toBean.setDefaultConsumeNum(fromBean.getDefaultConsumeNum());
        toBean.setBillText(stringRemove(fromBean.getBillText()));
        toBean.setInTime(DateTimeUtils.getYmdhmsForNo(fromBean.getInTime()));
        toBean.setOutTime(DateTimeUtils.getYmdhmsForNo(fromBean.getOutTime()));
        toBean.setDefaultPrintDate(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultPrintDate()));
        toBean.setDefaultInTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultInTime()));
        toBean.setDefaultOutTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultOutTime()));
        toBean.setUploadType(stringRemove(fromBean.getUploadType()));
        toBean.setCustomRecord(fromBean.getCustomRecord());
        toBean.setBillType(stringRemove(fromBean.getBillType()));
        toBean.setModifyType(stringRemove(fromBean.getModifyType()));
        toBean.setBillSource(stringRemove(fromBean.getBillSource()));
        toBean.setAnalyzPath(stringRemove(fromBean.getAnalyzPath()));
        toBean.setBillMatchKey(stringRemove(fromBean.getBillMatchKey()));
        toBean.setMatchId(stringRemove(fromBean.getMatchId()));
        toBean.setDefaultSaleTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultSaleTime()));
        toBean.setCustomerName(stringRemove(fromBean.getCustomerName()));
        toBean.setMembershipId(stringRemove(fromBean.getMembershipId()));
        toBean.setMemberLevels(stringRemove(fromBean.getMemberLevels()));
        toBean.setPetName(stringRemove(fromBean.getPetName()));
        toBean.setPetNumber(stringRemove(fromBean.getPetNumber()));
        toBean.setPrincipal(stringRemove(fromBean.getPrincipal()));
        toBean.setDeposit(stringRemove(fromBean.getDeposit()));
        toBean.setPrintDate(DateTimeUtils.getYmdhmsForNo(fromBean.getPrintDate()));
        toBean.setDefaultInterceptTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultInterceptTime()));
        toBean.setPrintMatchType(stringRemove(fromBean.getPrintMatchType()));
        toBean.setBillMergeKey(stringRemove(fromBean.getBillMergeKey()));
        toBean.setModifyAmount(fromBean.getModifyAmount());
        toBean.setModifyAmountList(fromBean.getModifyAmountList());
        toBean.setModifyAmountSaleTimeList(DateTimeUtils.getYmdhmsForNo(fromBean.getModifyAmountSaleTimeList()));
        toBean.setModifyAmountInterceptTimeList(
                DateTimeUtils.getYmdhmsForNo(fromBean.getModifyAmountInterceptTimeList()));
        toBean.setUniqueId(stringRemove(fromBean.getUniqueId()));
        toBean.setUniqueIdHis(fromBean.getUniqueIdHis());
        toBean.setIfqrcode(stringRemove(fromBean.getIfqrcode()));
        toBean.setCouponNum(stringRemove(fromBean.getCouponNum()));
        toBean.setErpMemberCard(stringRemove(fromBean.getErpMemberCard()));
        toBean.setVoucherType(stringRemove(fromBean.getVoucherType()));
        toBean.setQrCode(stringRemove(fromBean.getQrCode()));
        toBean.setIntegralmark(stringRemove(fromBean.getIntegralmark()));
        toBean.setThisintegral(stringRemove(fromBean.getThisintegral()));
        toBean.setRemarks(stringRemove(fromBean.getRemarks()));
        toBean.setTemplateName(stringRemove(fromBean.getTemplateName()));
        toBean.setCustName(stringRemove(fromBean.getCustName()));
        toBean.setCustTaxNo(stringRemove(fromBean.getCustTaxNo()));
        toBean.setCustAdress(stringRemove(fromBean.getCustAdress()));
        toBean.setCustBankAccount(stringRemove(fromBean.getCustBankAccount()));
        toBean.setThirdPartyOrderNo(stringRemove(fromBean.getThirdPartyOrderNo()));
        toBean.setRechargeableCardConsumeAmount(fromBean.getRechargeableCardConsumeAmount());
        toBean.setDefaultRechargeableCardConsumeAmount(fromBean.getDefaultRechargeableCardConsumeAmount());
        toBean.setOutOfPocketAmount(fromBean.getOutOfPocketAmount());
        toBean.setDefaultOutOfPocketAmount(fromBean.getDefaultOutOfPocketAmount());
        toBean.setRechargeAmount(fromBean.getRechargeAmount());
        toBean.setDefaultRechargeAmount(fromBean.getDefaultRechargeAmount());
        toBean.setMemberPrice(fromBean.getMemberPrice());
        toBean.setDefaultMemberPrice(fromBean.getDefaultMemberPrice());
        toBean.setMemberDiscountrate(fromBean.getMemberDiscountrate());
        toBean.setDefaultMemberDiscountrate(fromBean.getDefaultMemberDiscountrate());
        toBean.setMemberTotalConsumption(fromBean.getMemberTotalConsumption());
        toBean.setDefaultMemberTotalConsumption(fromBean.getDefaultMemberTotalConsumption());
        toBean.setTakeout(stringRemove(fromBean.getTakeout()));
        if (null != fromBean.getDiscountDetails()) {
            List<DiscountDetailsInfo> rawList = fromBean.getDiscountDetails();
            List<String> list = new ArrayList<>();
            for (DiscountDetailsInfo raw : rawList) {
                list.add(raw.toString());
            }
            toBean.setDiscountDetails(list);
        }
        return toBean;
    }

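    /**
     * Formats a raw string value by stripping tab, line-feed, and carriage-return
     * characters. Still to be refined.
     * 
     * @param strVal the raw value; may be null or empty
     * @return the cleaned value, or an empty string when the input is null or empty
     */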
    private static String stringRemove(String strVal) {
        if (StringUtils.isEmpty(strVal)) {
            return "";
        }
        strVal = strVal.replace("\t", "");
        strVal = strVal.replace("\n", "");
        strVal = strVal.replace("\r", "");
        return strVal;
    }
}
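
The toString() method above joins every field with a tab, which is presumably why cloneBill passes the string fields through stringRemove first: a stray tab or line break inside a value would shift or split the columns of the output record. The following is a minimal standalone sketch of that idea using only the JDK. The class name TsvSanitizeDemo and the helper toTsvLine are hypothetical and not part of the original project, which uses StringUtils.isEmpty for the null check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; not part of the original project.
final class TsvSanitizeDemo {

    // Mirrors stringRemove: null or empty becomes "", and \t, \n, \r are dropped.
    static String sanitize(String value) {
        if (value == null || value.isEmpty()) {
            return "";
        }
        return value.replace("\t", "").replace("\n", "").replace("\r", "");
    }

    // Sanitizes each field and joins the results with tabs, like toString() does.
    static String toTsvLine(List<String> fields) {
        return fields.stream()
                .map(TsvSanitizeDemo::sanitize)
                .collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        String line = toTsvLine(Arrays.asList("b-001", "Shop\tA", "line1\r\nline2", null));
        // Four input fields yield exactly four columns, so this prints 4.
        System.out.println(line.split("\t", -1).length);
    }
}
```

Because the separators are deleted rather than escaped, the original text cannot be restored from the cleaned value. That is acceptable for a dashboard feed but worth keeping in mind if the same records are reused elsewhere.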

 

5.5 BCD trimmed entity class

package com.mengyao.graph.etl.apps.dashboard.beans;

import java.io.Serializable;

/**
 * Bill Consumer Dashboard Bean
 * @author mengyao
 *
 */
public class BCD implements Serializable {
    private static final long serialVersionUID = 1749406742944513387L;
    private String billId;
    private double receivableAmount;
    private String saleTime;
    private String sdt;//yyyyMMdd
    private int hour;
    private String billType;
    private String shopId;
    private String shopEntityId;
    public BCD() {
        super();
    }
    public BCD(String billId, double receivableAmount, String saleTime, String billType, String shopId, String shopEntityId) {
        super();
        this.billId = billId;
        this.receivableAmount = receivableAmount;
        this.saleTime = saleTime;
        setDateTime(saleTime);
        this.billType = billType;
        this.shopId = shopId;
        this.shopEntityId = shopEntityId;
    }
    public String getBillId() {
        return billId;
    }
    public void setBillId(String billId) {
        this.billId = billId;
    }
    public double getReceivableAmount() {
        return receivableAmount;
    }
    public void setReceivableAmount(double receivableAmount) {
        this.receivableAmount = receivableAmount;
    }
    public String getSaleTime() {
        return saleTime;
    }
    public void setSaleTime(String saleTime) {
        this.saleTime = saleTime;
        setDateTime(saleTime);
    }
    public String getSdt() {
        return sdt;
    }
    public void setSdt(String sdt) {
        this.sdt = sdt;
    }
    public int getHour() {
        return hour;
    }
    public void setHour(int hour) {
        this.hour = hour;
    }
    public String getBillType() {
        return billType;
    }
    public void setBillType(String billType) {
        this.billType = billType;
    }
    public String getShopId() {
        return shopId;
    }
    public void setShopId(String shopId) {
        this.shopId = shopId;
    }
    public String getShopEntityId() {
        return shopEntityId;
    }
    public void setShopEntityId(String shopEntityId) {
        this.shopEntityId = shopEntityId;
    }
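    // Derives sdt (yyyyMMdd) and hour from saleTime. Only 17-character values are
    // parsed; for any other input sdt and hour keep their current values.
    private void setDateTime(String saleTime) {
        if (null!=saleTime&&saleTime.length()==17) {
            this.sdt = saleTime.substring(0, 8);
            this.hour = Integer.parseInt(saleTime.substring(8, 10));
        }
    }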
    @Override
    public String toString() {
        return  billId + "\t" + receivableAmount + "\t" + saleTime + "\t" + sdt + "\t" + billType + "\t" + shopId + "\t" + shopEntityId;
    }
    
}
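
BCD derives sdt and hour from saleTime only when the value is exactly 17 characters long. The sketch below isolates that parsing step. It assumes the 17 characters are laid out as yyyyMMddHHmmssSSS, which is inferred from the length check and the substring offsets rather than stated in the source. The class name SaleTimeDemo and its methods are hypothetical.

```java
// Hypothetical demo class; not part of the original project.
// Assumes saleTime is 17 characters laid out as yyyyMMddHHmmssSSS.
final class SaleTimeDemo {

    // Same guard as BCD.setDateTime: non-null and exactly 17 characters.
    static boolean isValid(String saleTime) {
        return saleTime != null && saleTime.length() == 17;
    }

    // Returns the yyyyMMdd part, or null when the input fails the guard.
    static String dayOf(String saleTime) {
        return isValid(saleTime) ? saleTime.substring(0, 8) : null;
    }

    // Returns the hour of day, or -1 when the input fails the guard.
    static int hourOf(String saleTime) {
        return isValid(saleTime) ? Integer.parseInt(saleTime.substring(8, 10)) : -1;
    }

    public static void main(String[] args) {
        String saleTime = "20190227093015123";
        // Prints: 20190227 9
        System.out.println(dayOf(saleTime) + " " + hourOf(saleTime));
    }
}
```

One difference from BCD is deliberate: the sketch returns -1 for an unparseable value, whereas a freshly constructed BCD leaves hour at its default of 0, which an hourly aggregation cannot distinguish from midnight.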

 

5.6 Sales analysis business implementation class

package com.mengyao.graph.etl.apps.dashboard.service;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mengyao.graph.etl.apps.dashboard.beans.AreaProjSale;
import com.mengyao.graph.etl.apps.dashboard.beans.AreaSaleTrendAll;
import com.mengyao.graph.etl.apps.dashboard.beans.PeakTime;
import com.mengyao.graph.etl.apps.dashboard.beans.ProjectTypeShopNumber;
import com.mengyao.graph.etl.apps.dashboard.beans.ShopSaleRank;
import com.mengyao.graph.etl.apps.dashboard.beans.TotalRefund;
import com.mengyao.graph.etl.apps.dashboard.beans.TotalSale;
import com.mengyao.graph.etl.apps.dashboard.beans.TotalSettlementBillNumber;
import com.mengyao.utils.RedisUtil;


/**
 * Dashboard (big screen) metric analysis
 * @author mengyao
 *
 */
public class SaleAnalysisService implements Serializable {

    private static final long serialVersionUID = 8289368096001689148L;
    // Salt appended to area names to avoid duplicate hash keys
    private static final String SALT="aAb12";

    /**
     * Resets the dashboard metric values every day
     */
    @Deprecated
    public void reset() {
        RedisUtil.setObject("dtsbn_6", "{\"結帳單數\":{\"val\":7270}}\"");
        RedisUtil.setObject("dpt_7", "{\"高峯時段\":{\"val\":13}}\"");
        RedisUtil.setObject("dtr_5", "{\"退款金額\":{\"val\":7301.8}}\"");
        RedisUtil.setObject("dts_4", "{\"總銷售額\":{\"val\":1300523.8000000005}}\"");
        RedisUtil.setObject("curDay", "20190227\"");
        RedisUtil.setObject("dastfa_2_20190227", "{\"各區域銷售額發展趨勢\":{\"重慶天地\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,9.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],\"創智天地\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,43.3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],\"上海瑞虹\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,16.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],\"上海新天地\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,553.75,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}\"");
        RedisUtil.setObject("dasp_1", "{\"各區域銷售額佔比\":{\"重慶天地\":[{\"bn\":1,\"n\":\"重慶天地\",\"sa\":9.5}],\"創智天地\":[{\"bn\":3,\"n\":\"壹方\",\"sa\":43.3}],\"上海新天地\":[{\"bn\":4,\"n\":\"新天地時尚購物中心\",\"sa\":152.0},{\"bn\":3,\"n\":\"上海新天地南里北里\",\"sa\":74.0},{\"bn\":2,\"n\":\"湖濱道購物中心\",\"sa\":100.0},{\"bn\":10,\"n\":\"新天地廣場\",\"sa\":227.75}],\"上海瑞虹\":[{\"bn\":1,\"n\":\"瑞虹天地星星堂\",\"sa\":16.0}]}}\"");
        RedisUtil.setObject("dpc_8", "{\"各項目銷售額對比\":{\"重慶天地\":[{\"bn\":1,\"n\":\"重慶天地\",\"sa\":9.5}],\"創智天地\":[{\"bn\":3,\"n\":\"壹方\",\"sa\":43.3}],\"上海新天地\":[{\"bn\":4,\"n\":\"新天地時尚購物中心\",\"sa\":152.0},{\"bn\":3,\"n\":\"上海新天地南里北里\",\"sa\":74.0},{\"bn\":2,\"n\":\"湖濱道購物中心\",\"sa\":100.0},{\"bn\":10,\"n\":\"新天地廣場\",\"sa\":227.75}],\"上海瑞虹\":[{\"bn\":1,\"n\":\"瑞虹天地星星堂\",\"sa\":16.0}]}}\"");
        RedisUtil.setObject("dpac_10", "{\"各項目單均消費\":{\"重慶天地\":[{\"bn\":1,\"n\":\"重慶天地\",\"sa\":9.5}],\"創智天地\":[{\"bn\":3,\"n\":\"壹方\",\"sa\":43.3}],\"上海新天地\":[{\"bn\":4,\"n\":\"新天地時尚購物中心\",\"sa\":152.0},{\"bn\":3,\"n\":\"上海新天地南里北里\",\"sa\":74.0},{\"bn\":2,\"n\":\"湖濱道購物中心\",\"sa\":100.0},{\"bn\":10,\"n\":\"新天地廣場\",\"sa\":227.75}],\"上海瑞虹\":[{\"bn\":1,\"n\":\"瑞虹天地星星堂\",\"sa\":16.0}]}}\"");
        RedisUtil.setObject("dsfst_9", "{\"店鋪銷售排行\":[{\"pn\":\"新天地時尚購物中心\",\"sa\":152.0,\"sn\":\"GREYBOX COFFEE\"},{\"pn\":\"新天地廣場\",\"sa\":114.75,\"sn\":\"Arabica\"},{\"pn\":\"湖濱道購物中心\",\"sa\":100.0,\"sn\":\"LOKAL\"},{\"pn\":\"上海新天地南里北里\",\"sa\":74.0,\"sn\":\"哈肯鋪_手感烘焙\"},{\"pn\":\"壹方\",\"sa\":60.3,\"sn\":\"新一天便利\"},{\"pn\":\"新天地廣場\",\"sa\":41.0,\"sn\":\"奈雪的茶\"},{\"pn\":\"新天地廣場\",\"sa\":39.0,\"sn\":\"蒲石小點\"},{\"pn\":\"新天地廣場\",\"sa\":18.0,\"sn\":\"Fresh_Every_Day\"},{\"pn\":\"瑞虹天地星星堂\",\"sa\":16.0,\"sn\":\"老盛昌\"},{\"pn\":\"新天地廣場\",\"sa\":15.0,\"sn\":\"多幾谷\"}]}\"");
        RedisUtil.setObject("dptsc_11", "{\"各項目業態銷售額對比\":{\"上海新天地南里北里\":[{\"n\":\"餐飲\",\"sa\":74.0,\"sn\":3}],\"新天地廣場\":[{\"n\":\"餐飲\",\"sa\":227.75,\"sn\":10}],\"重慶天地\":[{\"n\":\"其它\",\"sa\":9.5,\"sn\":0}],\"湖濱道購物中心\":[{\"n\":\"餐飲\",\"sa\":100.0,\"sn\":2}],\"壹方\":[{\"n\":\"餐飲\",\"sa\":43.3,\"sn\":3}],\"瑞虹天地星星堂\":[{\"n\":\"餐飲\",\"sa\":16.0,\"sn\":1}],\"新天地時尚購物中心\":[{\"n\":\"餐飲\",\"sa\":152.0,\"sn\":4}]}}\"");
        RedisUtil.setObject("dpsn_12", "{\"各項目店鋪數量\":{\"上海新天地南里北里\":[{\"n\":\"餐飲\",\"sa\":74.0,\"sn\":3}],\"新天地廣場\":[{\"n\":\"餐飲\",\"sa\":227.75,\"sn\":10}],\"重慶天地\":[{\"n\":\"其它\",\"sa\":9.5,\"sn\":0}],\"湖濱道購物中心\":[{\"n\":\"餐飲\",\"sa\":100.0,\"sn\":2}],\"壹方\":[{\"n\":\"餐飲\",\"sa\":43.3,\"sn\":3}],\"瑞虹天地星星堂\":[{\"n\":\"餐飲\",\"sa\":16.0,\"sn\":1}],\"新天地時尚購物中心\":[{\"n\":\"餐飲\",\"sa\":152.0,\"sn\":4}]}}\"");
    }
    
    /**
     * Total sales amount
     * @param bill the bill dataset
     */
    public void totalSale(Dataset<Row> bill) {
        Row[] rows = (Row[])bill
                .filter("billType=1 and receivableAmount>=0")
                .agg(functions.sum(new Column("receivableAmount")).alias("totalSale"), functions.count("receivableAmount"))
                .head(1);
        Map<String, TotalSale> map = new HashMap<String, TotalSale>();
        double totalSale=0D;
        if(rows.length>0){
            Row row = rows[0];
            if (!row.isNullAt(0)) {
                // Sales amount
                totalSale=row.getDouble(0);
            }
            if (!row.isNullAt(1)) {
                // Number of settled bills
                totalSettlementBillNumber(new TotalSettlementBillNumber(row.getLong(1)));
            }
        }
        map.put("總銷售額", new TotalSale(totalSale));
        String str = JSONObject.toJSONString(map);
        System.out.println("====####--totalSale##" + str);
        // Store the total sales amount in Redis
        RedisUtil.setObject("dts_4", str);
    }
    
    /**
     * Refund amount
     * @param bill the bill dataset
     */
    public void totalRefund(Dataset<Row> bill) {
        Row[] rows = (Row[]) bill
                .filter("((billType=6) or (billType=1 and receivableAmount<0))")
                .agg(functions.sum(functions.abs(new Column("receivableAmount"))).alias("totalSale"))
                .head(1);
        Map<String, TotalRefund> map=new HashMap<>();
        map.put("退款金額", new TotalRefund(0));
        if(rows.length>0){
            Row row = rows[0];
            if (!row.isNullAt(0)) {
                map.put("退款金額", new TotalRefund(row.getDouble(0)));
            }
        }
        String str=JSONObject.toJSONString(map);
        System.out.println("====####--totalRefund##"+str);
        // Store the refund amount in Redis
        RedisUtil.setObject("dtr_5",str);
    }
    
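    /**
     * Peak time period
     *   1. Period: one hour;
     *   2. Peak: the hour with the largest cumulative number of settled bills for the day;
     *   3. Peak period: cumulative settled bills per hour across all malls;
     * Note: the query below ranks hours by summed receivableAmount, not by bill count.
     * Writes JSON such as {"高峯時段":{"val":12}} to Redis under the key dpt_7.
     * @param bill the bill dataset
     */
    public void peakTime(Dataset<Row> bill) {
        // The bill table itself carries the mall id (shopId), so no join with the shop table is needed
        Map<String, PeakTime> map=new HashMap<>();
        // Default to hour 8 when the query returns no rows
        map.put("高峯時段", new PeakTime(8));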
        Row[] rows = (Row[])bill
                .filter("billType=1")
                .groupBy("hour")
                .agg(functions.sum("receivableAmount").alias("totalSale"))
                .orderBy(new Column("totalSale").desc())
                .limit(1)
                .head(1);
        if(rows.length>0){
            Row row = rows[0];
            if (!row.isNullAt(0)) {
                map.put("高峯時段", new PeakTime(row.getAs(0)));
            }
        }
        String str=JSONObject.toJSONString(map);
        System.out.println("====####--peakTime##"+str);
        // Store the peak period in Redis
        RedisUtil.setObject("dpt_7", str);
    }
    
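    /**
     * Sales trend by area: for multiple areas (each area contains several malls),
     * the sales amount of the current day from 0:00 to 24:00, hour by hour
     * @param bill the bill dataset
     * @param ruian the Ruian mall dataset, joined on rmid
     * @param areaSet the names of all areas, used to fill in areas without sales
     * @param curDay the current day
     */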
    public void areaSaleTrendForAll(Dataset<Row> bill, Dataset<Row> ruian,Set<String> areaSet, String curDay) {
        Row[] rows = (Row[])bill
                .filter("billType=1 and receivableAmount>0")
                .join(ruian, bill.col("shopId").equalTo(ruian.col("rmid")), "leftouter")
                .groupBy("area_cn", "hour")
                .agg(functions.sum("receivableAmount").alias("areaDayHourSale"))
                .orderBy("areaDayHourSale")
                .select("area_cn","areaDayHourSale","hour")
                .collect();
//        Map<String, Map<String,Map<Integer, Double>>> map = new HashMap<>();
        Map<String, Map<String,Collection<Double>>> map = new HashMap<>();
        Map<String,AreaSaleTrendAll> maps=new HashMap<>();
        if(rows.length>0){
            for (Row row : rows) {
                String areaCn=null;
                double areaDayHourSale=0D;
                int saleHour=0;
                if(!row.isNullAt(0)){
                    areaCn=row.getString(0);
                }
                if(!row.isNullAt(1)){
                    areaDayHourSale=row.getDouble(1);
                }
                if(!row.isNullAt(2)){
                    saleHour=row.getInt(2);
                }
                if(maps.containsKey(areaCn+SALT)){
                    AreaSaleTrendAll ast=maps.get(areaCn+SALT);
                    ast.getVals().put(saleHour, areaDayHourSale);
                }else{
                    maps.put(areaCn+SALT, new AreaSaleTrendAll(areaCn, saleHour, areaDayHourSale));
                }
            }
        }
        // Add an entry for every area without sales so that the data is complete.
        areaSet.forEach(areaCn->{
            if(!maps.containsKey(areaCn+SALT)){
                maps.put(areaCn+SALT, new AreaSaleTrendAll(areaCn));
            }
        });

        System.out.println("==####========after fill====begin=====================");
        for (String key:maps.keySet()) {
            System.out.println("Key:"+key);
        }
        System.out.println("==####========after fill=====end====================");
        
//        Map<String,Map<Integer, Double>> rs=new HashMap<>();
//        for(AreaSaleTrendAll asta:maps.values()){
//            rs.put(asta.getAreaCn(), asta.getVals());
//        }
        Map<String,Collection<Double>> rs=new HashMap<>();
        for(AreaSaleTrendAll asta:maps.values()){
            rs.put(asta.getAreaCn(), asta.getVals().values());
        }
        map.put("各區域銷售額發展趨勢", rs);
        // Convert to a JSON string
        String str=JSONObject.toJSONString(map);
        System.out.println("====####--areaSaleTrendForAll##"+str);
        // Store in Redis
        RedisUtil.setObject("dastfa_2_"+curDay, str);
        // Maintain the latest date in Redis
        if(!RedisUtil.existsObject("curDay")){// Absent means this is the first run, so store the current date directly
            RedisUtil.setObject("curDay", curDay);
        }else{
            String curDayRedis=(String) RedisUtil.getObject("curDay");
            if((curDayRedis.compareTo(curDay))<0){// The current date is later than the one in Redis, so update it
                RedisUtil.setObject("curDay", curDay);
            }
        }
    }

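    /**
     * Sales comparison across projects
     *   1. Projects: the individual malls;
     *   2. Sales of a single project: the mall's cumulative sales from opening time today up to now;
     */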
    public void projectContrast(Dataset<Row> bill, Dataset<Row> ruian,Map<String, Set<String>> ruianMallAll) {
        Row[] rows = (Row[])bill
                .filter("billType=1 and receivableAmount>0")
                .join(ruian, bill.col("shopId").equalTo(ruian.col("rmid")), "leftouter")
                .groupBy("name", "area_cn")
                .agg(functions.sum("receivableAmount").alias("mallDaySale"), functions.count("receivableAmount").alias("mallDayBillNum"))
//                .orderBy("area_cn")
                .select("name","area_cn","mallDaySale","mallDayBillNum")
                .collect();
        Map<String,List<AreaProjSale>> rs=new HashMap<>();
        if(rows.length>0) {
            for (Row row : rows) {
                String mallName=null;
                String areaCn=null;
                double mallDaySale=0D;
                long mallDayBillNum=0L;
                AreaProjSale obj=new AreaProjSale();
                if(!row.isNullAt(0)){// Project (mall) name
                    mallName=row.getString(0);
                    obj.setN(mallName);
                }
                if(!row.isNullAt(1)){// Area name
                    areaCn=row.getString(1);
                }
                if(!row.isNullAt(2)){// Mall's daily sales
                    mallDaySale=row.getDouble(2);
                    obj.setSa(mallDaySale);
                }
                if(!row.isNullAt(3)){// Mall's daily settled-bill count
                    mallDayBillNum=row.getLong(3);
                    obj.setBn(mallDayBillNum);
                }
                // Check whether this area already has an entry
                if(rs.containsKey(areaCn)){
                    rs.get(areaCn).add(obj);
                }else{// Area not present yet
                    List<AreaProjSale> list=new ArrayList<>();
                    list.add(obj);
                    rs.put(areaCn, list);
                }
            }
        }
        // Fill in default entries for malls that produced no bills
        // Get the full mapping of Ruian areas to their mall names
        Set<Entry<String,Set<String>>> entrySet = ruianMallAll.entrySet();
        for(Map.Entry<String, Set<String>> entry:entrySet){
            String areaCn=entry.getKey();// Area name
            // Standard (full) set of malls for this area
            Set<String> mallSet=entry.getValue();
            if(rs.containsKey(areaCn)){// The actual data already contains this area
                // Check whether the actual data covers every mall
                // Mall names present in the actual data
                Set<String> mallSetCur=new HashSet<>();
                // Malls that have no bills
                Set<String> mallSetNew=new HashSet<>();
                // Iterate over the actual data for this area and collect the mall names
                for(AreaProjSale obj:rs.get(areaCn)){
                    mallSetCur.add(obj.getN());
                }
                // Start from the standard set
                mallSetNew.addAll(mallSet);
                // Set difference: standard malls minus actual malls
                mallSetNew.removeAll(mallSetCur);
                // Add a default entry for each mall in the difference
                mallSetNew.forEach(mn->{
                    rs.get(areaCn).add(new AreaProjSale(mn));
                });
            }else{// The area is absent from the actual data
                // Iterate over the standard mall set for this area and add a default entry for each
                List<AreaProjSale> list=new ArrayList<>();
                mallSet.forEach(mn->{
                    list.add(new AreaProjSale(mn));
                });
                rs.put(areaCn, list);
            }
        }
        // Sales share by area
        areaSaleProportion(rs);
        // Average spend per bill by project
        projectAvgConsumer(rs);
        // Wrap the result under its display title
        Map<String, Map<String,List<AreaProjSale>>> pmap=new HashMap<>();
        pmap.put("各項目銷售額對比", rs);
        // Serialize to JSON
        String str=JSONObject.toJSONString(pmap);
        System.out.println("====####--projectContrast##"+str);
        // Sales comparison by project, Redis key dpc_8
        RedisUtil.setObject("dpc_8", str);
    }
    
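    /**
     * Top 10 shops by sales across all malls
     * @param bill bill records
     * @param shop shop dimension table (joined on shop_entity_id)
     * @param ruian mall dimension table (joined on rmid)
     */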
    public void saleForShopTop10(Dataset<Row> bill, Dataset<Row> shop, Dataset<Row> ruian) {
        Row[] rows = (Row[])bill
                .filter("billType=1 and receivableAmount>0")
                .join(shop, bill.col("shopEntityId").equalTo(shop.col("shop_entity_id")), "leftouter")
                .join(ruian, bill.col("shopId").equalTo(ruian.col("rmid")), "leftouter")
                .groupBy("shop_entity_name", "name")
                .agg(functions.sum("receivableAmount").alias("shopDaySale"))
                .orderBy(new Column("shopDaySale").desc())
                .select("shop_entity_name","name","shopDaySale")
                .head(10);
        List<ShopSaleRank> ssrList=new LinkedList<>();
        if(rows.length>0) {
            for (Row row : rows) {
                ShopSaleRank ssr=new ShopSaleRank();
                if(!row.isNullAt(0)){// Shop name
                    ssr.setSn(row.getString(0));
                }
                if(!row.isNullAt(1)){// Project (mall) name
                    ssr.setPn(row.getString(1));
                }
                if(!row.isNullAt(2)){// Shop's daily sales
                    ssr.setSa(row.getDouble(2));
                }
                ssrList.add(ssr);
            }
        }
        // Serialize to JSON
        Map<String, List<ShopSaleRank>> pmap=new HashMap<>();
        pmap.put("店鋪銷售排行", ssrList);
        String str=JSON.toJSONString(pmap);
        System.out.println("====####--saleForShopTop10##"+str);
        // Store the top 10 shops' daily sales in Redis
        RedisUtil.setObject("dsfst_9",str);
    }
    
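    /**
     * Sales comparison by business type for each project
     * 
     * @param bill bill records
     * @param shop shop dimension table (joined on shop_entity_id)
     * @param type business-type dimension table (joined on id)
     * @param ruian mall dimension table (joined on rmid)
     * @param areaSet set of area names (not used in this method)
     * @param mallSet full set of mall names, used to fill in malls that have no bills
     * @param ruianTypeAll full set of business-type names, used to fill in business types that have no bills
     */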
    public void projectTypeSaleContrast(Dataset<Row> bill, Dataset<Row> shop, Dataset<Row> type, Dataset<Row> ruian,
            Set<String> areaSet,Set<String> mallSet,Set<String> ruianTypeAll) {
        Row[] rows = (Row[])bill
                .filter("billType=1 and receivableAmount>0")
                .join(shop, shop.col("shop_entity_id").equalTo(bill.col("shopEntityId")), "leftouter")
                .join(type, type.col("id").equalTo(shop.col("shop_entity_type_root")), "leftouter")
                .join(ruian, ruian.col("rmid").equalTo(bill.col("shopId")))
                .groupBy("name", "shop_type_name")
                .agg(functions.sum("receivableAmount").alias("shopTypeDaySale"), functions.countDistinct("shop_entity_id").alias("shopNum"))
                .select("name","shop_type_name","shopTypeDaySale","shopNum")
                .collect();
        Map<String,List<ProjectTypeShopNumber>> map=new HashMap<>();
        if(rows.length>0) {
            for (Row row : rows) {
                ProjectTypeShopNumber pac=new ProjectTypeShopNumber();
                String mallName=null;
                if(!row.isNullAt(0)){// Project (mall) name
                    mallName=row.getString(0);
                }
                if(!row.isNullAt(1)){// Business-type name
                    pac.setN(row.getString(1));
                }else{
                    pac.setN("其它");
                }
                if(!row.isNullAt(2)){// Daily sales of this business type in the mall
                    pac.setSa(row.getDouble(2));
                }
                if(!row.isNullAt(3)){// Number of shops of this business type in the mall
                    pac.setSn(row.getLong(3));
                }
                if(map.containsKey(mallName)){// Mall already present: append to its list
                    List<ProjectTypeShopNumber> pacList=map.get(mallName);
                    pacList.add(pac);
                }else{// New mall: create a list and put it in the map
                    List<ProjectTypeShopNumber> pacList=new LinkedList<>();
                    pacList.add(pac);
                    map.put(mallName, pacList);
                }
            }
        }
        // Fill in default data for business types or malls that produced no bills, so that the data looks complete
        mallSet.forEach(mallName->{
            if(!map.containsKey(mallName)) {// Mall with no bills
                List<ProjectTypeShopNumber> list=new ArrayList<>();
                // Iterate over the full set of business types and add a default entry for each
                ruianTypeAll.forEach(type_->{
                    list.add(new ProjectTypeShopNumber(type_,0.0,0));
                });
                map.put(mallName,list);
            }else{// Mall with bills
                // Business types already present in the actual bill data
                Set<String> ruianTypeCur = new HashSet<>();
                // Business types that have no bills
                Set<String> ruianTypeNew = new HashSet<>();
                // Iterate over the data and collect the business types that are present
                for(ProjectTypeShopNumber obj:map.get(mallName)){
                    ruianTypeCur.add(obj.getN());
                }
                // Set difference: full set of business types minus those that produced bills
                ruianTypeNew.addAll(ruianTypeAll);
                ruianTypeNew.removeAll(ruianTypeCur);
                // Add a default entry for each business type that has no bills
                for(String type_:ruianTypeNew){
                    map.get(mallName).add(new ProjectTypeShopNumber(type_,0.0,0));
                }
            }
        });
        // Reuse the same data for the "shop count by project" metric
        projectShopNumber(map);
        // Serialize to JSON
        Map<String, Map<String,List<ProjectTypeShopNumber>>> pmap=new HashMap<>();
        pmap.put("各項目業態銷售額對比", map);
        String str=JSON.toJSONString(pmap);
        System.out.println("====####--projectTypeSaleContrast##"+str);
        RedisUtil.setObject("dptsc_11",str);
    }
    
    //=============================================================================================================
    /**
     * Number of settled bills
     * 
     * @param tsb
     */
    public void totalSettlementBillNumber(TotalSettlementBillNumber tsb) {
         Map<String, TotalSettlementBillNumber> map=new HashMap<>();
         map.put("結帳單數", tsb);
         String str=JSONObject.toJSONString(map);
         System.out.println("====####--totalSettlementBillNumber##"+str);
         // Store the settled-bill count in Redis
         RedisUtil.setObject("dtsbn_6", str);
    }
    
    /**
     * Sales share by area
     * Wraps the data as JSON directly; no further processing is needed
     * 
     * @param rs data that is already assembled, see {@link #projectContrast(Dataset, Dataset, Map)}
     */
    private void areaSaleProportion(Map<String, List<AreaProjSale>> rs) {
        Map<String, Map<String,List<AreaProjSale>>> pmap=new HashMap<>();
        pmap.put("各區域銷售額佔比 ", rs);
        // Serialize to JSON
        String str=JSONObject.toJSONString(pmap);
        System.out.println("====####--areaSaleProportion##"+str);
        // Sales share by area, Redis key dasp_1
        RedisUtil.setObject("dasp_1", str);
    }
    
    /**
     *  Average spend per bill by project
     *  Version 2: business hours on a 24-hour basis
     *    1. Project: each mall;
     *    2. Average spend per bill: measured from the mall's start of business today to the current time, as cumulative receivable amount / cumulative number of settled bills;
     *  Wraps the data as JSON directly; no further processing is needed
     *  
     *  @param rs data that is already assembled, see {@link #projectContrast(Dataset, Dataset, Map)}
     */
    private void projectAvgConsumer(Map<String, List<AreaProjSale>> rs) {
        Map<String, Map<String,List<AreaProjSale>>> pmap=new HashMap<>();
        pmap.put("各項目單均消費", rs);
        // Serialize to JSON
        String str=JSONObject.toJSONString(pmap);
        System.out.println("====####--projectAvgConsumer##"+str);
        // Average spend per bill by project, Redis key dpac_10
        RedisUtil.setObject("dpac_10", str);
    }
    
    /**
     * Shop count by project
     * The data is filled in by the method referenced below
     * 
     * {@link #projectTypeSaleContrast(Dataset, Dataset, Dataset, Dataset, Set, Set, Set)}
     */
    private void projectShopNumber(Map<String,List<ProjectTypeShopNumber>> map) {
        // Serialize to JSON
        Map<String, Map<String,List<ProjectTypeShopNumber>>> pmap=new HashMap<>();
        pmap.put("各項目店鋪數量", map);
        String str=JSON.toJSONString(pmap);
        System.out.println("====####--projectShopNumber##"+str);
        RedisUtil.setObject("dpsn_12",str);
    }
    
    
}
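
The default-filling step used in projectContrast and projectTypeSaleContrast (take the standard set, subtract the names that actually produced bills, then add a default entry for each name left over) can be tried on its own without Spark or Redis. The sketch below is only an illustration under stated assumptions: `ZeroFillSketch`, `MallSale` and `zeroFill` are hypothetical names, and `MallSale` is a simplified stand-in for AreaProjSale, whose real fields are not shown in this listing. Unlike the service code, it copies the input map instead of modifying it, and it sorts the missing names so that the output order is stable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ZeroFillSketch {

    /** Hypothetical, simplified stand-in for AreaProjSale. */
    static final class MallSale {
        final String name;
        final double sale;
        final long bills;

        MallSale(String name) {            // default entry: no bills, zero sales
            this(name, 0D, 0L);
        }

        MallSale(String name, double sale, long bills) {
            this.name = name;
            this.sale = sale;
            this.bills = bills;
        }

        @Override
        public String toString() {
            return name + "(" + sale + "," + bills + ")";
        }
    }

    /**
     * Returns a copy of {@code actual} in which every mall listed in
     * {@code standard} appears under its area; malls that are missing from
     * the actual data get a default entry.
     */
    static Map<String, List<MallSale>> zeroFill(Map<String, List<MallSale>> actual,
                                                Map<String, Set<String>> standard) {
        Map<String, List<MallSale>> result = new HashMap<>();
        // Copy the actual data so that the caller's map is left untouched
        actual.forEach((area, list) -> result.put(area, new ArrayList<>(list)));
        for (Map.Entry<String, Set<String>> entry : standard.entrySet()) {
            // Create the area's list on first use
            List<MallSale> list = result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>());
            // Set difference: standard malls minus malls already present
            Set<String> missing = new TreeSet<>(entry.getValue());
            for (MallSale s : list) {
                missing.remove(s.name);
            }
            // Add a default entry for each missing mall
            for (String mall : missing) {
                list.add(new MallSale(mall));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<MallSale>> actual = new HashMap<>();
        actual.put("East", new ArrayList<>(Arrays.asList(new MallSale("A", 100.0, 3L))));

        Map<String, Set<String>> standard = new HashMap<>();
        standard.put("East", new HashSet<>(Arrays.asList("A", "B")));
        standard.put("West", new HashSet<>(Arrays.asList("C")));

        System.out.println(zeroFill(actual, standard));
    }
}
```

Using computeIfAbsent removes the separate containsKey branch for "area present" and "area absent", which is the same pattern that appears three times in the service class.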