應用場景:實時儀表盤(即大屏),每一個集團下有多個mall,每一個mall下包含多家shop,需實時計算集團下各mall及其shop的實時銷售分析(區域、業態、店鋪TOP、總銷售額等指標)並提供可視化展示,以前時候一直在Strom實現,如今改成Spark2.3.2實現。java
一、數據源:首先數據源來自於MQ、Socket、Flume和DFS等,通常Kafka、RocketMQ等居多,此處示例代碼用的是RocketMQ;mysql
二、實時計算框架:Storm(實時計算,Spout發射Tuple到各個Bolt,來一條即處理一條,一百毫秒之內的延遲)、SparkStreaming(準實時計算,基於微批次的實時計算,即必定時間段內的micro batch,每一個micro batch的結構爲DStream,底層是RDD);web
三、此處Spark Streaming準實時處理應用流程:一、RocketMQ --> 二、SparkStreaming --> 三、SparkSQL(Parquet) --> 四、Redis(大屏使用) && HDFS(Hive數倉ODS層,ODS->DW[DWD-DWS]-DM);redis
四、系統設計sql
五、代碼以下(4.3是Spark Streaming實現,複製粘貼便可運行):mongodb
5.一、RocketMQConfig類(用於配置和構建MQ消費者)數據庫
package com.mengyao.graph.etl.apps.commons.datasource.mq.receiver; import org.apache.commons.lang.Validate; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.remoting.common.RemotingUtil; import java.util.HashMap; import java.util.UUID; /** * RocketMQConfig for Consumer * @author mengyao * */ public class RocketMQConfig { // ------- the following is for common usage ------- /** * RocketMq name server address */ public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required public static final String CLIENT_NAME = "client.name"; public static final String CLIENT_IP = "client.ip"; public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress(); public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads"; public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();; public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval"; public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval"; public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds // ------- the following is for push consumer mode ------- /** * RocketMq consumer group */ public static final String CONSUMER_GROUP = "consumer.group"; // Required /** * RocketMq consumer topic */ public static final String CONSUMER_TOPIC = "consumer.topic"; // Required public static final String CONSUMER_TAG = "consumer.tag"; public static final String DEFAULT_TAG = "*"; public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; public static final String CONSUMER_OFFSET_LATEST = "latest"; public static final String CONSUMER_OFFSET_EARLIEST = "earliest"; public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp"; public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly"; public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval"; public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds public static final String CONSUMER_MIN_THREADS = "consumer.min.threads"; public static final int DEFAULT_CONSUMER_MIN_THREADS = 20; public static final String CONSUMER_MAX_THREADS = "consumer.max.threads"; public static final int DEFAULT_CONSUMER_MAX_THREADS = 64; // ------- the following is for reliable Receiver ------- public static final String QUEUE_SIZE = "spout.queue.size"; public static final int DEFAULT_QUEUE_SIZE = 500; public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry"; public static final int DEFAULT_MESSAGES_MAX_RETRY = 3; public static final String MESSAGES_TTL = "spout.messages.ttl"; public static final int DEFAULT_MESSAGES_TTL = 300000; // 5min // ------- the following is for pull consumer mode ------- /** * Maximum rate (number of records per second) at which data will be read from each RocketMq partition , * and the default value is "-1", it means consumer can pull message from rocketmq as fast as the consumer can. * Other that, you also enables or disables Spark Streaming's internal backpressure mechanism by the config * "spark.streaming.backpressure.enabled". */ public static final String MAX_PULL_SPEED_PER_PARTITION = "pull.max.speed.per.partition"; /** * To pick up the consume speed, the consumer can pull a batch of messages at a time. And the default * value is "32" */ public static final String PULL_MAX_BATCH_SIZE = "pull.max.batch.size"; /** * pull timeout for the consumer, and the default time is "3000". */ public static final String PULL_TIMEOUT_MS = "pull.timeout.ms"; // the following configs for consumer cache public static final String PULL_CONSUMER_CACHE_INIT_CAPACITY = "pull.consumer.cache.initialCapacity"; public static final String PULL_CONSUMER_CACHE_MAX_CAPACITY = "pull.consumer.cache.maxCapacity"; public static final String PULL_CONSUMER_CACHE_LOAD_FACTOR = "pull.consumer.cache.loadFactor"; public static void buildConsumerConfigs(HashMap<String, String> props, DefaultMQPushConsumer consumer) { buildCommonConfigs(props, consumer); String group = props.get(CONSUMER_GROUP); Validate.notEmpty(group); consumer.setConsumerGroup(group); consumer.setPersistConsumerOffsetInterval(getInteger(props, CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL)); consumer.setConsumeThreadMin(getInteger(props, CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS)); consumer.setConsumeThreadMax(getInteger(props, CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS)); String initOffset = props.get(CONSUMER_OFFSET_RESET_TO) != null ? props.get(CONSUMER_OFFSET_RESET_TO) : CONSUMER_OFFSET_LATEST; switch (initOffset) { case CONSUMER_OFFSET_EARLIEST: consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); break; case CONSUMER_OFFSET_LATEST: consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); break; case CONSUMER_OFFSET_TIMESTAMP: consumer.setConsumeTimestamp(initOffset); break; default: consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); } String topic = props.get(CONSUMER_TOPIC); Validate.notEmpty(topic); try { consumer.subscribe(topic, props.get(CONSUMER_TAG) != null ? props.get(CONSUMER_TAG) : DEFAULT_TAG); } catch (MQClientException e) { throw new IllegalArgumentException(e); } } public static void buildCommonConfigs(HashMap<String, String> props, ClientConfig client) { String namesvr = props.get(NAME_SERVER_ADDR); Validate.notEmpty(namesvr); client.setNamesrvAddr(namesvr); client.setClientIP(props.get(CLIENT_IP) != null ? props.get(CLIENT_IP) : DEFAULT_CLIENT_IP); // use UUID for client name by default String defaultClientName = UUID.randomUUID().toString(); client.setInstanceName(props.get(CLIENT_NAME) != null ? props.get(CLIENT_NAME) : defaultClientName); client.setClientCallbackExecutorThreads(getInteger(props, CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS)); client.setPollNameServerInterval(getInteger(props, NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL)); client.setHeartbeatBrokerInterval(getInteger(props, BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL)); } public static int getInteger(HashMap<String, String> props, String key, int defaultValue) { return Integer.parseInt(props.get(key) != null ? props.get(key) : String.valueOf(defaultValue)); } public static boolean getBoolean(HashMap<String, String> props, String key, boolean defaultValue) { return Boolean.parseBoolean(props.get(key) != null ? props.get(key) : String.valueOf(defaultValue)); } }
4.二、SparkStreaming自定義RocketMQ接收器(可靠的接收器)apache
package com.mengyao.graph.etl.apps.commons.datasource.mq.receiver; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; //import org.apache.commons.lang3.SerializationUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; import com.gooagoo.entity.mq.bill.MQBillMessage; import com.mengyao.graph.etl.apps.commons.beans.ods.fmt.BillInfoFmt; import com.mengyao.graph.etl.apps.commons.beans.ods.fmt.ConvertTool; import com.mengyao.graph.etl.apps.dashboard.beans.BCD; import java.io.IOException; import java.io.ObjectInputStream; import java.util.HashMap; import java.util.List; /** * RocketMQ Receiver * @author mengyao * */ public class RocketMQReceiver extends Receiver<BCD> { /** * */ private static final long serialVersionUID = 2274826339951693341L; private MQPushConsumer consumer; private boolean ordered; private HashMap<String, String> conf; public RocketMQReceiver(HashMap<String, String> conf, StorageLevel storageLevel) { super(storageLevel); this.conf = conf; } @Override public void onStart() { Validate.notEmpty(conf, "Consumer properties can not be empty"); ordered = RocketMQConfig.getBoolean(conf, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, true); consumer = new DefaultMQPushConsumer(); RocketMQConfig.buildConsumerConfigs(conf, (DefaultMQPushConsumer)consumer); if (ordered) { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { if (process(msgs)) { return ConsumeOrderlyStatus.SUCCESS; } else { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); } else { consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (process(msgs)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); } try { consumer.start(); System.out.println("==== RocketMQReceiver start ===="); } catch (MQClientException e) { e.printStackTrace(); throw new RuntimeException(e); } } public boolean process(List<MessageExt> msgs) { if (msgs.isEmpty()) { System.out.println("==== Msgs is null! ===="); return true; } System.out.println("==== receiver msgs: "+msgs.size()+" record. ===="); try { for (MessageExt messageExt : msgs) { //MQBillMessage message = SerializationUtils.deserialize(messageExt.getBody()); MQBillMessage message = deserialize(messageExt.getBody()); if (null != message) { BillInfoFmt billFmt = ConvertTool.convertGagBillToOdsBillFmt(message.getData()); if (validBillType(billFmt)) {
/**
* this.store(BCD) 簡單的接收一條存儲一條,缺點是沒有確認機制,不具有容錯保證,會出現數據丟失。優勢則是效率更高。
* this.store(Iterator<BCD>)阻塞調用,當接收到的記錄都存儲到Spark後纔會確認成功,當接收方採用複製(默認存儲級別爲複製)則在複製完成後確認成功,但在緩衝中的數據不被保證,會被從新發送。具有容錯保證,可確保數據0丟失。
*/ this.store(new BCD(billFmt.getId(), billFmt.getReceivableAmount(), billFmt.getSaleTime(), billFmt.getBillType(), billFmt.getShopId(), billFmt.getShopEntityId())); } billFmt=null; message.setData(null); message = null; } else { System.out.println("==== receiver msg is:"+messageExt.getBody()+", deserialize faild. ===="); } } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 驗證帳單類型是否爲一、三、6 * @param bean * @return */ static boolean validBillType(BillInfoFmt bean) { if (null == bean) { System.out.println("==== BillBean is null! ===="); return false; } String billType = bean.getBillType(); if (StringUtils.isEmpty(billType)) { System.out.println("==== BillBean.billType is null! ===="); return false; } String billTypeTrim = billType.trim(); return billTypeTrim.equals("1") || billType.equals("3") || billType.equals("6"); } @Override public void onStop() { consumer.shutdown(); System.out.println("==== RocketMQReceiver stop ===="); } private static MQBillMessage deserialize(byte[] body) throws Exception { if (body == null) { throw new IllegalArgumentException("The byte array must not be null"); } MQBillMessage message = null; ObjectInputStream in = null; ByteArrayInputStream bais = null; try { bais = new ByteArrayInputStream(body); in = new ObjectInputStream(bais); message = (MQBillMessage) in.readObject(); } catch (final ClassNotFoundException ex) { ex.printStackTrace(); throw ex; } catch (final IOException ex) { ex.printStackTrace(); throw ex; } finally { try { if (bais != null) { bais.close(); } if (in != null) { in.close(); } } catch (final IOException ex) { // ignore close exception } } return message; } }
4.三、SparkStreaming銷售分析實時計json
package com.mengyao.graph.etl.apps.commons.datasource.bill; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.lang.Validate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.api.java.function.Function0; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.SizeEstimator; import org.apache.spark.streaming.Time; import com.mengyao.graph.etl.apps.commons.beans.dim.RuianMall; import com.mengyao.graph.etl.apps.commons.datasource.mq.receiver.RocketMQConfig; import com.mengyao.graph.etl.apps.commons.datasource.mq.receiver.RocketMQReceiver; import com.mengyao.graph.etl.apps.dashboard.beans.BCD; import com.mengyao.graph.etl.apps.dashboard.service.SaleAnalysisService; /** * BillConsumer 大屏實時計算 * 一、hdfs dfs -rm -r hdfs://bd001:8020/data/consumer/bill/checkpoint/* * 二、hdfs dfs -rm -r hdfs://bd001:8020/data/dashboard/ruian/sdt=curTime * 三、spark-submit --class com.mengyao.graph.etl.apps.commons.datasource.bill.BillConsumer --master yarn --deploy-mode cluster --driver-memory 4g --executor-cores 6 --executor-memory 5g --queue default --verbose data-graph-etl.jar * * @author mengyao * */ public class BillConsumer { private static final ThreadLocal<SimpleDateFormat> FORMATTER_YMD = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd")); private static final ThreadLocal<SimpleDateFormat> FORMATTER_YMDHMS = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss")); private static final ThreadLocal<SimpleDateFormat> FORMATTER_YMDHMSS = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmssSSS")); private static final String BASE_PATH = "hdfs://bd001:8020/data/dashboard/ruian/"; private static final String TMP_PATH = "/merge"; private static String appName = "BillConsumer"; private static String logLevel = "ERROR"; public static void main(String[] args) { args = new String[] {"hdfs://bd001:8020/data/consumer/bill/checkpoint", "10", "base.mq.goo.com:9877", "Bill", "bill_dev_group_00013"}; if (args.length < 5) { System.err.println("Usage: "+appName+" <checkpointDir> <milliseconds> <namesrvAddr> <groupId> <topic>"); System.exit(1); } String checkPointDir = args[0]; int second = Integer.parseInt(args[1]); String namesrv = args[2]; Validate.notNull(namesrv, "RocketMQ namesrv not null!"); String topic = args[3]; Validate.notNull(topic, "RocketMQ topic not null!"); String group = args[4]; Validate.notNull(group, "RocketMQ group not null!"); Function0<JavaStreamingContext> jscFunc = () -> createJavaSparkStreamingContext(checkPointDir, second, namesrv, topic, group); JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkPointDir, jscFunc, new Configuration()); jssc.sparkContext().setLogLevel(logLevel); try { jssc.start(); jssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } finally { jssc.stop(false, true);//關閉StreamingContext但不關閉SparkContext,同時等待數據處理完成 } } /** * 獲取當前時間yyyyMMdd,匹配帳單數據saleTime * @param curTime * @return */ static String getCurrentDate(long curTime) { return FORMATTER_YMD.get().format(new Date(curTime)); } /** * 打印bill RDD<BCD>的分區及佔用空間大小,debug方法 * @param rdd * @param time */ static void printStream(JavaRDD<BCD> rdd, Time time) {rdd.id(); System.out.println("==== time: "+FORMATTER_YMDHMSS.get().format(new Date(time.milliseconds()))+", rdd: partitions="+rdd.getNumPartitions()+", space="+SizeEstimator.estimate(rdd)/1048576+"mb"); } /** * 合併parquet小文件 * @param session * @param dfsDir */ static void mergeSmallFiles(Dataset<Row> fulls, SparkSession session, String dfsDir) { fulls.coalesce(1).write().mode(SaveMode.Overwrite).parquet(BASE_PATH+TMP_PATH); session.read().parquet(BASE_PATH+TMP_PATH).coalesce(1).write().mode(SaveMode.Overwrite).parquet(dfsDir); } /** * 建立DFS Dir * @param session * @param dfsDir */ static void mkDfsDir(SparkSession session, String dfsDir) { FileSystem fs = null; try { fs = FileSystem.get(session.sparkContext().hadoopConfiguration()); Path path = new Path(dfsDir); if(!fs.exists(path)) { fs.mkdirs(path); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (null != fs) {fs.close();} } catch (IOException e) { e.printStackTrace(); } } } /** * 容錯Driver * @param checkPointDir * @param second * @param namesrv * @param topic * @param group * @return */ static JavaStreamingContext createJavaSparkStreamingContext(String checkPointDir, int second, String namesrv, String topic, String group) { try { SparkConf conf = new SparkConf() //==== Enable Back Pressure .set("spark.streaming.backpressure.enabled", "true")//啓用被壓機制 .set("spark.streaming.backpressure.initialRate", "50000")//初始接收數據條數,如該值爲空時使用spark.streaming.backpressure.initialRate爲默認值 .set("spark.streaming.receiver.maxRate", "100")//每秒接收器可接收的最大記錄數 //==== Enable Dynamic Resource Allocation .set("spark.dynamicAllocation.enabled", "false")//禁用spark動態資源分配 .set("spark.streaming.dynamicAllocation.enabled", "true")//啓用SparkStreaming動態資源分配,該配置和spark動態資源分配存在衝突,只能使用一個 .set("spark.streaming.dynamicAllocation.minExecutors", "2")//啓用SparkStreaming動態資源分配後的給應用使用的最小executor數 .set("spark.streaming.dynamicAllocation.maxExecutors", "3")//啓用SparkStreaming動態資源分配後的給應用使用的最大executor數 //==== Spark Streaming Parallelism and WAL .set("spark.streaming.concurrentJobs", "1")//並行job數量,默認1 .set("spark.streaming.blockInterval", "5000")//SparkStreaming接收器接收數據後5000毫秒生成block,默認200毫秒 .set("spark.streaming.receiver.writeAheadLog.enable", "true")//開啓SparkStreaming接收器的WAL來確保接收器實現至少一次的容錯語義 .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")//driver端WAL .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "15000") //==== Hive on Spark .set("hive.execution.engine", "spark")//設置hive引擎爲spark,hdp-2.6.1.0默認支持tez、mr,可經過應用級別配置使用Hive on Spark .set("hive.enable.spark.execution.engine", "true")//啓用Hive on Spark .set("spark.driver.extraJavaOptions", "-Dhdp.version=2.6.1.0-129")//hdp-2.6.1.0中要求Hive on Spark必須指定driver的jvm參數 .set("spark.yarn.am.extraJavaOptions", "-Dhdp.version=2.6.1.0-129")//hdp-2.6.1.0中要求Hive on Spark必須指定ApplicationMaster的jvm參數 //==== Hive Merge Small Files .set("hive.metastore.uris", "thrift://bd001:9083")//hive ThriftServer .set("hive.merge.sparkfiles", "true")//合併spark小文件 .set("hive.merge.mapfiles", "true")//在只有map任務的做業結束時合併小文件。 .set("hive.merge.mapredfiles", "true")//在mapreduce做業結束時合併小文件。 .set("hive.merge.size.per.task", "268435456")//做業結束時合併文件的大小。 .set("hive.merge.smallfiles.avgsize", "100000")//看成業的平均輸出文件大小小於此數量時,Hive將啓動另外一個map-reduce做業,以將輸出文件合併爲更大的文件。若是hive.merge.mapfiles爲true,則僅對僅map做業執行此操做;對於hive.merge.mapredfiles爲true,僅對map-reduce做業執行此操做。 //==== Spark SQL Optimizer .set("spark.sql.warehouse.dir", "hdfs://bd001:8020/apps/hive/warehouse")//SparkSQL依賴的hive倉庫地址 .set("spark.sql.files.maxPartitionBytes", "134217728")//SparkSQL讀取文件數據時打包到一個分區的最大字節數 .set("spark.sql.files.openCostInBytes", "134217728")//當SparkSQL讀取的文件中有大量小文件時,小於該值的文件將被合併處理,默認4M,此處設置爲128M .set("spark.sql.shuffle.partitions", "600")//SparkSQL運行shuffle的並行度 .set("spark.sql.autoBroadcastJoinThreshold", "67108864")//設置爲64M,執行join時自動廣播小於該值的表,默認10M //==== Spark Core Configure .set("spark.rdd.compress","true")//開啓rdd壓縮以節省內存 .set("spark.default.parallelism", "600")//並行任務數 .set("spark.rpc.askTimeout", "300")//spark rpc超時時間 .set("spark.eventLog.enabled", "true")//開啓eventLog //==== Application Configure .set("spark.app.name", appName)//Spark Application名稱 .set("spark.master", "yarn")//運行模式爲Spark on YARN .set("spark.deploy.mode", "cluster")//部署模式爲yarn-cluster .set("spark.driver.memory", "4g")//driver內存4g .set("spark.driver.cores", "1")//driver計算vcore數量爲1 .set("spark.executor.memory", "5g")//executor內存爲4g .set("spark.executor.heartbeatInterval", "20000")//executor心跳間隔20秒,默認10秒 .set("spark.yarn.archive", "hdfs://bd001:8020/hdp/apps/2.6.1.0-129/spark2")//spark依賴jar存檔到hdfs指定位置 .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")//打印GC詳情和耗時 .set("spark.jars", "/usr/hdp/2.6.1.0-129/sqoop/lib/mysql-connector-java.jar")//若是使用了數據庫驅動,則經過此配置便可 //==== Serialized Configure .set("spark.kryoserializer.buffer", "512k")//默認64k,設置爲256k .set("spark.kryoserializer.buffer.max", "256m")//默認64m,設置爲256m .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//使用kryo序列化庫 .registerKryoClasses(new Class[]{HashMap.class, BCD.class}) ; //構建MQ配置 @SuppressWarnings("serial") HashMap<String, String> mqConf = new HashMap<String, String>() {{ put(RocketMQConfig.NAME_SERVER_ADDR, namesrv); put(RocketMQConfig.CONSUMER_TOPIC, topic); put(RocketMQConfig.CONSUMER_GROUP, group); }}; JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(second)); jssc.checkpoint(checkPointDir); jssc.remember(Durations.minutes(1440)); //接収RocketMQ帳單 JavaReceiverInputDStream<BCD> billListRDD = jssc.receiverStream(new RocketMQReceiver(mqConf, StorageLevels.MEMORY_AND_DISK_SER)); billListRDD .foreachRDD((rdd, time) -> { boolean isEmpty = rdd.partitions().isEmpty(); if (!isEmpty) { printStream(rdd, time); long start = System.currentTimeMillis(); String curTime = getCurrentDate(start).intern(); String dfsDir = (BASE_PATH+"sdt="+curTime+"/").intern(); //帳單計數器 //LongAccumulator billAccumulator = BillAccumulator.getInstance(JavaSparkContext.fromSparkContext(rdd.context()), appName); //billAccumulator.add(rdd.count()); //初始化SparkSession SparkSession session = HiveSession.getInstance(conf); //維表等廣播 BroadcastDIM dim = BroadcastWrapper.getInstance(JavaSparkContext.fromSparkContext(rdd.context())).getValue(); SaleAnalysisService service = dim.getService(); Dataset<Row> shop = dim.getShop(); Dataset<Row> type = dim.getType(); Dataset<Row> ruian = dim.getRuian(); String mallIdStr = dim.getMallIdStr(); Set<String> areaSet=dim.getAreaSet(); Map<String, Set<String>> areaMall=dim.getAreaMall(); Set<String> mallSet=dim.getMallSet(); Set<String> typeSet=dim.getTypeSet(); //若是時間爲00:00:00時則認爲是新的一天,更新廣播數據 if ((curTime+"000000").equals(FORMATTER_YMDHMS.get().format(new Date(time.milliseconds())))) { BroadcastWrapper.update(session, JavaSparkContext.fromSparkContext(rdd.context()));//更新dim表數據 mkDfsDir(session, dfsDir);//初始化dfsDir } //先持久化本次接收到的帳單(寫入當日) session.createDataFrame(rdd, BCD.class) .filter("sdt = "+curTime) .write() .mode("append") .parquet(dfsDir); //再讀取全量帳單(讀取當日) Dataset<Row> bills = session.read().parquet(dfsDir) .filter("shopId in ("+mallIdStr+")") .dropDuplicates("billId") //.coalesce(1) .cache(); //計算大屏指標 System.out.println("==== 計算指標:時間條件:"+curTime+" ===="); service.totalSale(bills); service.totalRefund(bills); service.peakTime(bills); service.areaSaleTrendForAll(bills, ruian,areaSet, curTime); service.projectContrast(bills, ruian,areaMall); service.saleForShopTop10(bills, shop, ruian); service.projectTypeSaleContrast(bills, shop, type, ruian,areaSet,mallSet,typeSet); long end = System.currentTimeMillis(); System.out.println("==== 計算指標:耗時:"+(end-start)+"/ms ===="); bills.unpersist(); //天天最多存6個文件 if (bills.inputFiles().length > 6) { mergeSmallFiles(bills, session, dfsDir); } } else {//若是SparkStreaming接收的Batch爲空,則不作處理 System.out.println("==== rdd is null! "); } }); return jssc; } catch (Exception e) { e.printStackTrace(); } return null; } } /** * 廣播DIM相關數據 * @author mengyao * */ class BroadcastWrapper { private static volatile Broadcast<BroadcastDIM> instance = null; public static Broadcast<BroadcastDIM> getInstance(JavaSparkContext jsc) { if (instance == null) { synchronized (BroadcastWrapper.class) { if (instance == null) { SparkSession session = HiveSession.getInstance(jsc.getConf()); BroadcastDIM dim = new BroadcastDIM(session); dim.assign(); instance = jsc.broadcast(dim); } } } return instance; } /** * 每日更新數據 * @param batchTime batch發生時間 * @param dayES 每日開始時間 */ public static void update(SparkSession session, JavaSparkContext jsc) { BroadcastDIM dim = instance.getValue(); if (null!=dim) { dim.assign(); jsc.broadcast(dim); } } } class BroadcastDIM { private SparkSession session; private SaleAnalysisService service = new SaleAnalysisService(); private Dataset<Row> shop; private Dataset<Row> type; private Dataset<Row> ruian; private String mallIdStr; private List<RuianMall> ruianList ; private Set<String> areaSet; private Set<String> typeSet; private Set<String> mallSet; private Map<String, Set<String>> areaMall; public BroadcastDIM(SparkSession session) { this.session = session; } public void assign() { ruian = session.sql("select id,item,name,area_en,area_cn,channel,mid,rmid from tbl_dim_ruian").cache(); Row[] ruianRows = (Row[])ruian.collect(); setRuianList(ruianRows); setAreaSet(); setMallIdStr(); setRuianMallAll(); setMallSet(); shop = session.sql("select id,shop_entity_id,shop_entity_name,shop_id,shop_entity_type_root,shop_entity_type_leaf,bill_match_mode,leasing_model,shop_entity_status,open_time,close_time,open_time_list,close_time_list,monite_begin_time_list,monite_end_time_list,marketing_copywriter,marketing_image,font_style,font_size,contract_area,province,city,area,billhint,is_del,brand,brand_code,classify,in_aera,storey,leasing_resource,shop_entity_source,create_time,c_time_stamp,shop_entity_img,business_area_id,source,status,logo,door_head_photo,business_license,certificate,coordinates,consume_per,brand_name,brand_log,alipay,process " + "from tbl_ods_shop where shop_id in ("+mallIdStr+")").cache(); type = session.sql("select * from tbl_ods_type").cache(); setRuianType(); } public void setRuianList(Row[] rows) { if(rows.length>0){ ruianList=new ArrayList<>(); for(Row row:rows){ RuianMall rm=new RuianMall(); if(!row.isNullAt(0)){//id rm.setId(row.getInt(0)); } if(!row.isNullAt(1)){//item rm.setItem(row.getString(1)); } if(!row.isNullAt(2)){//name rm.setName(row.getString(2)); } if(!row.isNullAt(3)){//area_en rm.setAreaEn(row.getString(3)); } if(!row.isNullAt(4)){//area_cn rm.setArenCn(row.getString(4)); } if(!row.isNullAt(5)){//channel rm.setChannel(row.getString(5)); } if(!row.isNullAt(6)){//mid rm.setMid(row.getInt(6)); } if(!row.isNullAt(7)){//rmid rm.setRmid(row.getString(7)); } ruianList.add(rm); } } } /** * 提取rmid 拼接成字符串 * @param rows */ public void setMallIdStr() { if(ruianList.size()>0){ StringBuilder sbStr=new StringBuilder(); for(RuianMall row : ruianList){ sbStr.append("'").append(row.getRmid()).append("',"); } String tmpValue = sbStr.toString(); mallIdStr=tmpValue.substring(0, tmpValue.length()-1); } } /** * 提取非空惟一中文區域名稱 * @return */ public void setAreaSet() { if(ruianList.size()>0){ areaSet=new java.util.HashSet<>(); for(RuianMall row:ruianList){ areaSet.add(row.getArenCn()); } } } /** * 提取瑞安mall 14個機構中文名 * * @return * Map<String,Set<String>> * key:areaCn value : Set<mallName> */ public void setRuianMallAll( ){ areaMall=new HashMap<>(); ruianList.forEach(rm->{ if(areaMall.containsKey(rm.getArenCn())){//存在,更新mall的列表 areaMall.get(rm.getArenCn()).add(rm.getName()); }else{//新的區域,新的mall Set<String> mallSet=new HashSet<>(); mallSet.add(rm.getName()); areaMall.put(rm.getArenCn(),mallSet); } }); } /** * 提取瑞安mall 14個機構中文名 * * @return */ public void setMallSet(){ mallSet=new java.util.HashSet<>(); ruianList.forEach(rm->{ if(!mallSet.contains(rm.getName())){//存在,更新mall的列表 mallSet.add(rm.getName()); } }); } /** * 性能、性能、性能 考慮,先寫死 * 若是用帳單、店鋪、業態關聯查詢,效率會很慢 * * @return */ public void setRuianType(){ //TODO 目前寫死,須要改 typeSet=new HashSet<>(); typeSet.add("服務"); typeSet.add("零售"); typeSet.add("娛樂"); typeSet.add("主力店"); typeSet.add("餐飲"); typeSet.add("其它"); } public SaleAnalysisService getService() { return service; } public void setService(SaleAnalysisService service) { this.service = service; } public Dataset<Row> getShop() { return shop; } public void setShop(Dataset<Row> shop) { this.shop = shop; } public Dataset<Row> getType() { return type; } public void setType(Dataset<Row> type) { this.type = type; } public Dataset<Row> getRuian() { return ruian; } public void setRuian(Dataset<Row> ruian) { this.ruian = ruian; } public String getMallIdStr() { return mallIdStr; } public void setMallIdStr(String mallIdStr) { this.mallIdStr = mallIdStr; } public List<RuianMall> getRuianList() { return ruianList; } public Set<String> getAreaSet() { return this.areaSet; } public Map<String, Set<String>> getAreaMall() { return this.areaMall; } public Set<String> getMallSet() { return this.mallSet; } public Set<String> getTypeSet() { return this.typeSet; } } /** * 帳單累加器 * @author mengyao * */ class BillAccumulator { private static volatile LongAccumulator instance = null; public static LongAccumulator getInstance(JavaSparkContext jsc, String name) { if (instance == null) { synchronized (BillAccumulator.class) { if (instance == null) { instance = jsc.sc().longAccumulator(name); } } } return instance; } } /** * SparkSQL的Hive數據源 * @author mengyao * */ class HiveSession { private static transient SparkSession instance = null; public static SparkSession getInstance(SparkConf conf) { if (instance == null) { instance = SparkSession.builder() .config(conf) .enableHiveSupport() .getOrCreate(); } return instance; } }
4.四、帳單實體類api
package com.mengyao.graph.etl.apps.commons.beans.ods.fmt; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; import com.mengyao.graph.etl.apps.commons.beans.ods.BillInfo; import com.mengyao.graph.etl.apps.commons.beans.ods.DiscountDetailsInfo; import com.mengyao.graph.etl.apps.commons.beans.ods.GoodsDetailInfo; import com.mengyao.graph.etl.apps.commons.beans.ods.SettlementWayInfo; import com.mengyao.utils.DateTimeUtils; /** * 帳單信息 gag_bill.bill_info * * @author jmb * @update 2018-12-13 for mengyao */ public class BillInfoFmt implements Serializable { private static final long serialVersionUID = 1L; /** * 預結單 */ public static final String BILLTYPE_JUJIEDAN = "2"; /** * 結帳單 */ public static final String BILLTYPE_JIEZHANGDAN = "1"; /** * 日結帳單 */ public static final String BILLTYPE_RIJIEDAN = "3"; /** * 點菜單 */ public static final String BILLTYPE_DIANCAIDAN = "7"; /** * 帳單編號(系統產生),UUID */ private String id; /** * 帳單編號(系統產生),HBase主鍵 */ private String rowKey; /** * 商家編號(系統產生) */ private String shopId; /** * 商家名稱(系統產生) */ private String shopName; /** * 實體店編號(系統產生) */ private String shopEntityId; /** * 實體店名稱(系統產生) */ private String shopEntityName; /** * 建立時間(系統產生) */ private String createTime; /** * 最後一次修改時間(系統產生) */ private String cTimeStamp; /** * 信息上傳時間(以header中建立時間爲準yyyyMMddHHmmss) */ private String hcTime; /** * 流水號(header中的流水號) */ private String hserial; /** * 修正帳單數據的設備編號(若是沒有修正,則與原始上傳帳單的採集終端編號相同) */ private String fixTerminal; /** * 採集終端編號(截獲) */ private String terminalNumber; /** * 帳單文件名稱,惟一(截獲),12位MAC地址+17位時間 */ private String billfileName; /** * 反掃支付時終端生成的終端流水號UUID(全球惟一)[先支付後打單必填] */ private String tsuuid; /** * 歷史帳單文件名稱,記錄合併過程當中歷次帳單文件名稱,全量,mongodb爲Array */ private List<String> billfileNameHis; /** * 帳單序號,不保證惟一(截獲) */ private String billNo; /** * 截獲時間(截獲) */ private String interceptTime; /** * 店鋪全名(截獲) */ private String shopEntityFullName; /** * 店鋪地址(截獲) */ private String shopEntityAddress; /** * 電話(截獲) */ private String telephone; /** * 售貨員(截獲) */ private String saler; /** * 收銀臺(截獲) */ private String checkstand; /** * 收銀員(截獲) */ private String cashier; /** * 應收金額(截獲) */ private Double receivableAmount; /** * 原始應收金額(截獲) */ private Double defaultReceivableAmount; /** * 商品數量(截獲) */ private Double totalNum; /** * 原始商品數量(截獲) */ private Double defaultTotalNum; /** * 小票流水號(截獲) */ private String billSerialNumber; /** * 總金額(截獲) */ private Double totalFee; /** * 原始總金額(截獲) */ private Double defaultTotalFee; /** * 實收金額(截獲) */ private Double paidAmount; /** * 原始實收金額(截獲) */ private Double defaultPaidAmount; /** * 折扣金額(截獲) */ private Double discountAmount; /** * 原始折扣金額(截獲) */ private Double defaultDiscountAmount; /** * 優惠金額(截獲) */ private Double couponAmount; /** * 原始優惠金額(截獲) */ private Double defaultCouponAmount; /** * 找零金額(截獲) */ private Double changeAmount; /** * 原始找零金額(截獲) */ private Double defaultChangeAmount; /** * 結算方式(支持多項)(截獲)例:[{"a":5.0,"p":"現金"},{"a":10.01,"p":"書券"}],"a":結算金額,小數[截獲,必填], * "p":"結算方式[截獲,必填]" */ private List<String> settlementWay; /** * 銷售時間(截獲) */ private String saleTime; /** * 會員卡號(截獲) */ private String memberCardNumber; /** * 累計消費(截獲) */ private Double totalConsumption; /** * 原始累計消費(截獲) */ private Double defaultTotalConsumption; /** * 網址(截獲) */ private String website; /** * 小票圖片(截獲),只存url */ private String billImage; /** * 商品詳情(支持多項)(截獲)[{"name":"可樂","itemserial":"PUMU00123","price":5.01,"totalnum":5.0,"totalprice":25.05},{"name":"金槍魚","itemserial":"FOOD02012","price":10.55,"totalnum":1.5,"totalprice":15.83}] * ,"name":"商品名稱[截獲,選填]","itemserial":"條形碼[截獲,選填]","price":單價,小數[截獲,選填],"totalnum":總數,小數[截獲,選填],"totalprice":總價,小數[截獲,選填] */ private List<String> goodsDetails; /** * 房間號(截獲)(酒店特有) */ private String roomNo; /** * 入住姓名(截獲)(酒店特有) */ private String checkinName; /** * 桌號(截獲)(餐飲特有) */ private String deskNo; /** * 消費人數(截獲)(通常用於餐飲) */ private Double consumeNum; /** * 原始消費人數(截獲)(通常用於餐飲) */ private Double defaultConsumeNum; /** * 原始帳單全部文本信息(截獲) */ private String billText; /** * 入住時間(截獲)(酒店特有) */ private String inTime; /** * 離店時間(截獲)(鐘點房特有) */ private String outTime; /** * 默認打印時間 */ private String defaultPrintDate; /** * 默認入住時間 */ private String defaultInTime; /** * 默認離店時間 */ private String defaultOutTime; /** * 上傳類型 1:全單上傳 2. 篩選帳單上傳 */ private String uploadType; /** * 除了上面截獲外的自定義數據(截獲),json串(Map<String, Object>),json串中的key,value由採集終端自定義。 */ private Map<String, String> customRecord; /** * 帳單類型1:結帳單2:預結單 3:日結單 4:處方 5:預付押金單 6:退貨單 7:點菜單 8:發票單 [選填] */ private String billType; /** * 帳單修改方式 0:默認值 1:收銀員重打 2:人工修改金額 3:自動重解析 */ private String modifyType = "0"; /** * 帳單來源 1:設備採集 2:人工補錄三、解析服務 4.第三方 */ private String billSource = "1"; /** * 服務端解析路徑[選填,用於服務端解析分析問題] */ private String analyzPath; /** * 刷卡,錢包等匹配帳單的key,格式BILL_MAC_金額_截獲時間 */ private String billMatchKey; /** * 數據匹配支付結果等成功後,此字段保存支付結果等的主鍵 [選填] 匹配成功後爲必填,理論上應該保證此值爲全局惟一 */ private String matchId; /** * 默認銷售時間; */ private String defaultSaleTime; /** * 客戶名稱[截獲,選填] */ private String customerName; /** * 會員編號[截獲,選填] */ private String membershipId; /** * 會員級別[截獲,選填] */ private String memberLevels; /** * 寵物名稱[截獲,選填] */ private String petName; /** * 寵物編號[截獲,選填] */ private String petNumber; /** * 負責人(醫生)[截獲,選填] */ private String principal; /** * 預交押金[截獲,選填] */ private String deposit; /** * 打印時間yyyyMMddHHmmss[截獲,選填,若是截獲位數不夠,後面補0] */ private String printDate; /** * 默認攔截時間 */ private String defaultInterceptTime; /** * 匹配模型 sk0001:先刷卡後打單,單次刷卡; dd0001:先打單後刷卡,單次刷卡 */ private String printMatchType; /** * 帳單合併時使用,惟一 */ private String billMergeKey; /** * 存重打單的應收金額 */ private Double modifyAmount; /** * 存重打單的應收金額List */ private List<Double> modifyAmountList; /** * 存重打單的應收金額的銷售時間List */ private List<String> modifyAmountSaleTimeList; /** * 存重打單的應收金額的截獲時間List */ private List<String> modifyAmountInterceptTimeList; /** * 惟一標識[選填](目前可用來作積分標識使用) */ private String uniqueId; /** * 歷史惟一標識,記錄合併過程當中歷次惟一標識,全量,mongodb爲Array(目前可用來作積分標識使用) */ private List<String> uniqueIdHis; /** * 是否追加二維碼 1:是 2:否 [必填] */ private String ifqrcode; /** * 優惠券券碼 */ private String couponNum; /** * ERP會員 */ private String erpMemberCard; /** * 憑證類型 11:水單(商戶pos、ERP打印)12:收銀憑證(大pos打印) 13:支付憑證(籤購單)99:類型未知 [選填,默認寫99,表示未知] */ private String voucherType; /** * 小票二維碼[選填] */ private String qrCode; /** * 積分標識 0:本次積分字段沒找到或沒有配置 1:有本次積分字段[選填] */ private String integralmark; /** * 本次積分[選填] */ private String thisintegral; /** * 備註[選填] */ private String remarks; /** * 帳單子類型,庖丁用於配置文件分類 */ private String templateName; /** * 購貨方名稱[截獲,選填] */ private String custName; /** * 購貨方稅號[截獲,選填] */ private String custTaxNo; /** * 購貨方地址、電話[截獲,選填] */ private String custAdress; /** * 購貨方銀行及帳號[截獲,選填] */ private String custBankAccount; /** * 第三方訂單號(第三方系統惟一) */ private String thirdPartyOrderNo; /** * 充值卡消費金額[截獲,選填] */ private Double rechargeableCardConsumeAmount; /** * 原始充值卡消費金額[截獲,選填] */ private Double defaultRechargeableCardConsumeAmount; /** * 實付金額[截獲,選填] */ private Double outOfPocketAmount; /** * 原始實付金額[截獲,選填] */ private Double defaultOutOfPocketAmount; /** * 充值金額[截獲,選填] */ private Double rechargeAmount; /** * 原始充值金額[截獲,選填] */ private Double defaultRechargeAmount; /** * 會員價[截獲,選填] */ private Double memberPrice; /** * 原始會員價[截獲,選填] */ private Double defaultMemberPrice; /** * 會員折扣率[截獲,選填] */ private Double memberDiscountrate; /** * 原始會員折扣率[截獲,選填] */ private Double defaultMemberDiscountrate; /** * 會員累計消費[截獲,選填] */ private Double memberTotalConsumption; /** * 原始會員累計消費[截獲,選填] */ private Double defaultMemberTotalConsumption; /** * 外賣單 1:外賣單 2:非外賣單[截獲,選填] */ private String takeout; /** * 優惠商品詳情(支持多項)(截獲),{"name":"可樂","price":5.01} */ private List<String> discountDetails; public String getThirdPartyOrderNo() { return thirdPartyOrderNo; } public void setThirdPartyOrderNo(String thirdPartyOrderNo) { this.thirdPartyOrderNo = thirdPartyOrderNo; } public String getRemarks() { return remarks; } public void setRemarks(String remarks) { this.remarks = remarks; } public String getTemplateName() { return templateName; } public void setTemplateName(String templateName) { this.templateName = templateName; } public String getDefaultPrintDate() { return this.defaultPrintDate; } public void setDefaultPrintDate(String defaultPrintDate) { this.defaultPrintDate = defaultPrintDate; } public String getDefaultInTime() { return this.defaultInTime; } public void setDefaultInTime(String defaultInTime) { this.defaultInTime = defaultInTime; } public String getDefaultOutTime() { return this.defaultOutTime; } public void setDefaultOutTime(String defaultOutTime) { this.defaultOutTime = defaultOutTime; } public String getCouponNum() { return this.couponNum; } public void setCouponNum(String couponNum) { this.couponNum = couponNum; } public String getErpMemberCard() { return this.erpMemberCard; } public void setErpMemberCard(String erpMemberCard) { this.erpMemberCard = erpMemberCard; } public String getVoucherType() { return this.voucherType; } public void setVoucherType(String voucherType) { this.voucherType = voucherType; } public String getAnalyzPath() { return this.analyzPath; } public void setAnalyzPath(String analyzPath) { this.analyzPath = analyzPath; } public List<Double> getModifyAmountList() { return this.modifyAmountList; } public String getUniqueId() { return this.uniqueId; } public void setUniqueId(String uniqueId) { this.uniqueId = uniqueId; } public String getIfqrcode() { return this.ifqrcode; } public void setIfqrcode(String ifqrcode) { this.ifqrcode = ifqrcode; } public void setModifyAmountList(List<Double> modifyAmountList) { this.modifyAmountList = modifyAmountList; } public Double getModifyAmount() { return this.modifyAmount; } public void setModifyAmount(Double modifyAmount) { this.modifyAmount = modifyAmount; } public String getDefaultSaleTime() { return this.defaultSaleTime; } public void setDefaultSaleTime(String defaultSaleTime) { this.defaultSaleTime = defaultSaleTime; } public String getBillMergeKey() { return this.billMergeKey; } public void setBillMergeKey(String billMergeKey) { this.billMergeKey = billMergeKey; } public String getCustName() { return custName; } public void setCustName(String custName) { this.custName = custName; } public String getCustTaxNo() { return custTaxNo; } public void setCustTaxNo(String custTaxNo) { this.custTaxNo = custTaxNo; } public String getCustAdress() { return custAdress; } public void setCustAdress(String custAdress) { this.custAdress = custAdress; } public String getCustBankAccount() { return custBankAccount; } public void setCustBankAccount(String custBankAccount) { this.custBankAccount = custBankAccount; } public List<String> getUniqueIdHis() { return uniqueIdHis; } public void setUniqueIdHis(List<String> uniqueIdHis) { this.uniqueIdHis = uniqueIdHis; } public Double getRechargeableCardConsumeAmount() { return rechargeableCardConsumeAmount; } public void setRechargeableCardConsumeAmount(Double rechargeableCardConsumeAmount) { this.rechargeableCardConsumeAmount = rechargeableCardConsumeAmount; } public Double getDefaultRechargeableCardConsumeAmount() { return defaultRechargeableCardConsumeAmount; } public void setDefaultRechargeableCardConsumeAmount(Double defaultRechargeableCardConsumeAmount) { this.defaultRechargeableCardConsumeAmount = defaultRechargeableCardConsumeAmount; } public Double getOutOfPocketAmount() { return outOfPocketAmount; } public void setOutOfPocketAmount(Double outOfPocketAmount) { this.outOfPocketAmount = outOfPocketAmount; } public Double getDefaultOutOfPocketAmount() { return defaultOutOfPocketAmount; } public void setDefaultOutOfPocketAmount(Double defaultOutOfPocketAmount) { this.defaultOutOfPocketAmount = defaultOutOfPocketAmount; } public Double getRechargeAmount() { return rechargeAmount; } public void setRechargeAmount(Double rechargeAmount) { this.rechargeAmount = rechargeAmount; } public Double getDefaultRechargeAmount() { return defaultRechargeAmount; } public void setDefaultRechargeAmount(Double defaultRechargeAmount) { this.defaultRechargeAmount = defaultRechargeAmount; } public Double getMemberPrice() { return memberPrice; } public void setMemberPrice(Double memberPrice) { this.memberPrice = memberPrice; } public Double getDefaultMemberPrice() { return defaultMemberPrice; } public void setDefaultMemberPrice(Double defaultMemberPrice) { this.defaultMemberPrice = defaultMemberPrice; } public Double getMemberDiscountrate() { return memberDiscountrate; } public void setMemberDiscountrate(Double memberDiscountrate) { this.memberDiscountrate = memberDiscountrate; } public Double getDefaultMemberDiscountrate() { return defaultMemberDiscountrate; } public void setDefaultMemberDiscountrate(Double defaultMemberDiscountrate) { this.defaultMemberDiscountrate = defaultMemberDiscountrate; } public Double getMemberTotalConsumption() { return memberTotalConsumption; } public void setMemberTotalConsumption(Double memberTotalConsumption) { this.memberTotalConsumption = memberTotalConsumption; } public Double getDefaultMemberTotalConsumption() { return defaultMemberTotalConsumption; } public void setDefaultMemberTotalConsumption(Double defaultMemberTotalConsumption) { this.defaultMemberTotalConsumption = defaultMemberTotalConsumption; } public String getTakeout() { return takeout; } public void setTakeout(String takeout) { this.takeout = takeout; } public List<String> getDiscountDetails() { return discountDetails; } public void setDiscountDetails(List<String> discountDetails) { this.discountDetails = discountDetails; } public String getId() { return this.id; } public void setId(String id) { this.id = id; } public String getShopId() { return this.shopId; } public void setShopId(String shopId) { this.shopId = shopId; } public String getShopName() { return this.shopName; } public void setShopName(String shopName) { this.shopName = shopName; } public String getShopEntityId() { return this.shopEntityId; } public void setShopEntityId(String shopEntityId) { this.shopEntityId = shopEntityId; } public String getShopEntityName() { return this.shopEntityName; } public void setShopEntityName(String shopEntityName) { this.shopEntityName = shopEntityName; } public String getCreateTime() { return this.createTime; } public void setCreateTime(String createTime) { this.createTime = createTime; } public String getCTimeStamp() { return this.cTimeStamp; } public void setCTimeStamp(String cTimeStamp) { this.cTimeStamp = cTimeStamp; } public String getHcTime() { return this.hcTime; } public void setHcTime(String hcTime) { this.hcTime = hcTime; } public String getHserial() { return this.hserial; } public void setHserial(String hserial) { this.hserial = hserial; } public String getFixTerminal() { return this.fixTerminal; } public void setFixTerminal(String fixTerminal) { this.fixTerminal = fixTerminal; } public String getTerminalNumber() { return this.terminalNumber; } public void setTerminalNumber(String terminalNumber) { this.terminalNumber = terminalNumber; } public String getBillfileName() { return this.billfileName; } public void setBillfileName(String billfileName) { this.billfileName = billfileName; } public List<String> getBillfileNameHis() { return this.billfileNameHis; } public void setBillfileNameHis(List<String> billfileNameHis) { this.billfileNameHis = billfileNameHis; } public String getBillNo() { return this.billNo; } public void setBillNo(String billNo) { this.billNo = billNo; } public String getInterceptTime() { return this.interceptTime; } public void setInterceptTime(String interceptTime) { this.interceptTime = interceptTime; } public String getShopEntityFullName() { return this.shopEntityFullName; } public void setShopEntityFullName(String shopEntityFullName) { this.shopEntityFullName = shopEntityFullName; } public String getShopEntityAddress() { return this.shopEntityAddress; } public void setShopEntityAddress(String shopEntityAddress) { this.shopEntityAddress = shopEntityAddress; } public String getTelephone() { return this.telephone; } public void setTelephone(String telephone) { this.telephone = telephone; } public String getSaler() { return this.saler; } public void setSaler(String saler) { this.saler = saler; } public String getCheckstand() { return this.checkstand; } public void setCheckstand(String checkstand) { this.checkstand = checkstand; } public String getCashier() { return this.cashier; } public void setCashier(String cashier) { this.cashier = cashier; } public Double getReceivableAmount() { return this.receivableAmount; } public void setReceivableAmount(Double receivableAmount) { this.receivableAmount = receivableAmount; } public Double getTotalNum() { return this.totalNum; } public void setTotalNum(Double totalNum) { this.totalNum = totalNum; } public String getBillSerialNumber() { return this.billSerialNumber; } public void setBillSerialNumber(String billSerialNumber) { this.billSerialNumber = billSerialNumber; } public Double getTotalFee() { return this.totalFee; } public void setTotalFee(Double totalFee) { this.totalFee = totalFee; } public Double getPaidAmount() { return this.paidAmount; } public void setPaidAmount(Double paidAmount) { this.paidAmount = paidAmount; } public Double getDiscountAmount() { return this.discountAmount; } public void setDiscountAmount(Double discountAmount) { this.discountAmount = discountAmount; } public Double getCouponAmount() { return this.couponAmount; } public void setCouponAmount(Double couponAmount) { this.couponAmount = couponAmount; } public Double getChangeAmount() { return this.changeAmount; } public void setChangeAmount(Double changeAmount) { this.changeAmount = changeAmount; } public List<String> getSettlementWay() { return this.settlementWay; } public void setSettlementWay(List<String> settlementWay) { this.settlementWay = settlementWay; } public String getSaleTime() { return saleTime; } public void setSaleTime(String saleTime) { this.saleTime = saleTime; } public String getMemberCardNumber() { return this.memberCardNumber; } public void setMemberCardNumber(String memberCardNumber) { this.memberCardNumber = memberCardNumber; } public Double getTotalConsumption() { return this.totalConsumption; } public void setTotalConsumption(Double totalConsumption) { this.totalConsumption = totalConsumption; } public String getWebsite() { return this.website; } public void setWebsite(String website) { this.website = website; } public String getBillImage() { return this.billImage; } public void setBillImage(String billImage) { this.billImage = billImage; } public List<String> getGoodsDetails() { return this.goodsDetails; } public void setGoodsDetails(List<String> goodsDetails) { this.goodsDetails = goodsDetails; } public String getRoomNo() { return this.roomNo; } public void setRoomNo(String roomNo) { this.roomNo = roomNo; } public String getCheckinName() { return this.checkinName; } public void setCheckinName(String checkinName) { this.checkinName = checkinName; } public String getDeskNo() { return this.deskNo; } public void setDeskNo(String deskNo) { this.deskNo = deskNo; } public Double getConsumeNum() { return this.consumeNum; } public void setConsumeNum(Double consumeNum) { this.consumeNum = consumeNum; } public String getBillText() { return this.billText; } public void setBillText(String billText) { this.billText = billText; } public Map<String, String> getCustomRecord() { return this.customRecord; } public void setCustomRecord(Map<String, String> customRecord) { this.customRecord = customRecord; } public String getBillType() { return this.billType; } public void setBillType(String billType) { this.billType = billType; } public String getInTime() { return this.inTime; } public String getOutTime() { return this.outTime; } public void setInTime(String inTime) { this.inTime = inTime; } public void setOutTime(String outTime) { this.outTime = outTime; } public String getBillMatchKey() { return this.billMatchKey; } public void setBillMatchKey(String billMatchKey) { this.billMatchKey = billMatchKey; } public String getMatchId() { return this.matchId; } public void setMatchId(String matchId) { this.matchId = matchId; } public String getPrintMatchType() { return this.printMatchType; } public void setPrintMatchType(String printMatchType) { this.printMatchType = printMatchType; } public String getTsuuid() { return this.tsuuid; } public void setTsuuid(String tsuuid) { this.tsuuid = tsuuid; } public String getUploadType() { return this.uploadType; } public void setUploadType(String uploadType) { this.uploadType = uploadType; } public String getCustomerName() { return this.customerName; } public void setCustomerName(String customerName) { this.customerName = customerName; } public String getMembershipId() { return this.membershipId; } public void setMembershipId(String membershipId) { this.membershipId = membershipId; } public String getMemberLevels() { return this.memberLevels; } public void setMemberLevels(String memberLevels) { this.memberLevels = memberLevels; } public String getPetName() { return this.petName; } public void setPetName(String petName) { this.petName = petName; } public String getPetNumber() { return this.petNumber; } public void setPetNumber(String petNumber) { this.petNumber = petNumber; } public String getPrincipal() { return this.principal; } public void setPrincipal(String principal) { this.principal = principal; } public String getDeposit() { return this.deposit; } public void setDeposit(String deposit) { this.deposit = deposit; } public String getPrintDate() { return this.printDate; } public void setPrintDate(String printDate) { this.printDate = printDate; } public String getDefaultInterceptTime() { return this.defaultInterceptTime; } public void setDefaultInterceptTime(String defaultInterceptTime) { this.defaultInterceptTime = defaultInterceptTime; } public String getModifyType() { return this.modifyType; } public void setModifyType(String modifyType) { this.modifyType = modifyType; } public String getBillSource() { return this.billSource; } public void setBillSource(String billSource) { this.billSource = billSource; } public String getQrCode() { return qrCode; } public void setQrCode(String qrCode) { this.qrCode = qrCode; } public String getIntegralmark() { return integralmark; } public void setIntegralmark(String integralmark) { this.integralmark = integralmark; } public String getThisintegral() { return thisintegral; } public void setThisintegral(String thisintegral) { this.thisintegral = thisintegral; } public List<String> getModifyAmountSaleTimeList() { return modifyAmountSaleTimeList; } public void setModifyAmountSaleTimeList(List<String> modifyAmountSaleTimeList) { this.modifyAmountSaleTimeList = modifyAmountSaleTimeList; } public List<String> getModifyAmountInterceptTimeList() { return modifyAmountInterceptTimeList; } public void setModifyAmountInterceptTimeList(List<String> modifyAmountInterceptTimeList) { this.modifyAmountInterceptTimeList = modifyAmountInterceptTimeList; } public Double getDefaultReceivableAmount() { return defaultReceivableAmount; } public void setDefaultReceivableAmount(Double defaultReceivableAmount) { this.defaultReceivableAmount = defaultReceivableAmount; } public Double getDefaultTotalNum() { return defaultTotalNum; } public void setDefaultTotalNum(Double defaultTotalNum) { this.defaultTotalNum = defaultTotalNum; } public Double getDefaultTotalFee() { return defaultTotalFee; } public void setDefaultTotalFee(Double defaultTotalFee) { this.defaultTotalFee = defaultTotalFee; } public Double getDefaultPaidAmount() { return defaultPaidAmount; } public void setDefaultPaidAmount(Double defaultPaidAmount) { this.defaultPaidAmount = defaultPaidAmount; } public Double getDefaultDiscountAmount() { return defaultDiscountAmount; } public void setDefaultDiscountAmount(Double defaultDiscountAmount) { this.defaultDiscountAmount = defaultDiscountAmount; } public Double getDefaultCouponAmount() { return defaultCouponAmount; } public void setDefaultCouponAmount(Double defaultCouponAmount) { this.defaultCouponAmount = defaultCouponAmount; } public Double getDefaultChangeAmount() { return defaultChangeAmount; } public void setDefaultChangeAmount(Double defaultChangeAmount) { this.defaultChangeAmount = defaultChangeAmount; } public Double getDefaultTotalConsumption() { return defaultTotalConsumption; } public void setDefaultTotalConsumption(Double defaultTotalConsumption) { this.defaultTotalConsumption = defaultTotalConsumption; } public Double getDefaultConsumeNum() { return defaultConsumeNum; } public void setDefaultConsumeNum(Double defaultConsumeNum) { this.defaultConsumeNum = defaultConsumeNum; } public String getRowKey() { return rowKey; } public void setRowKey(String rowKey) { this.rowKey = rowKey; } @Override public String toString() { return id + "\t" + rowKey + "\t" + shopId + "\t" + shopName + "\t" + shopEntityId + "\t" + shopEntityName + "\t" + createTime + "\t" + cTimeStamp + "\t" + hcTime + "\t" + hserial + "\t" + fixTerminal + "\t" + terminalNumber + "\t" + billfileName + "\t" + tsuuid + "\t" + billfileNameHis + "\t" + billNo + "\t" + interceptTime + "\t" + shopEntityFullName + "\t" + shopEntityAddress + "\t" + telephone + "\t" + saler + "\t" + checkstand + "\t" + cashier + "\t" + receivableAmount + "\t" + defaultReceivableAmount + "\t" + totalNum + "\t" + defaultTotalNum + "\t" + billSerialNumber + "\t" + totalFee + "\t" + defaultTotalFee + "\t" + paidAmount + "\t" + defaultPaidAmount + "\t" + discountAmount + "\t" + defaultDiscountAmount + "\t" + couponAmount + "\t" + defaultCouponAmount + "\t" + changeAmount + "\t" + defaultChangeAmount + "\t" + settlementWay + "\t" + saleTime + "\t" + memberCardNumber + "\t" + totalConsumption + "\t" + defaultTotalConsumption + "\t" + website + "\t" + billImage + "\t" + goodsDetails + "\t" + roomNo + "\t" + checkinName + "\t" + deskNo + "\t" + consumeNum + "\t" + defaultConsumeNum + "\t" + billText + "\t" + inTime + "\t" + outTime + "\t" + defaultPrintDate + "\t" + defaultInTime + "\t" + defaultOutTime + "\t" + uploadType + "\t" + customRecord + "\t" + billType + "\t" + modifyType + "\t" + billSource + "\t" + analyzPath + "\t" + billMatchKey + "\t" + matchId + "\t" + defaultSaleTime + "\t" + customerName + "\t" + membershipId + "\t" + memberLevels + "\t" + petName + "\t" + petNumber + "\t" + principal + "\t" + deposit + "\t" + printDate + "\t" + defaultInterceptTime + "\t" + printMatchType + "\t" + billMergeKey + "\t" + modifyAmount + "\t" + modifyAmountList + "\t" + modifyAmountSaleTimeList + "\t" + modifyAmountInterceptTimeList + "\t" + uniqueId + "\t" + uniqueIdHis + "\t" + ifqrcode + "\t" + couponNum + "\t" + erpMemberCard + "\t" + voucherType + "\t" + qrCode + "\t" + integralmark + "\t" + thisintegral + "\t" + remarks + "\t" + templateName + "\t" + custName + "\t" + custTaxNo + "\t" + custAdress + "\t" + custBankAccount + "\t" + thirdPartyOrderNo + "\t" + rechargeableCardConsumeAmount + "\t" + defaultRechargeableCardConsumeAmount + "\t" + outOfPocketAmount + "\t" + defaultOutOfPocketAmount + "\t" + rechargeAmount + "\t" + defaultRechargeAmount + "\t" + memberPrice + "\t" + defaultMemberPrice + "\t" + memberDiscountrate + "\t" + defaultMemberDiscountrate + "\t" + memberTotalConsumption + "\t" + defaultMemberTotalConsumption + "\t" + takeout + "\t" + discountDetails; } public static BillInfoFmt cloneBill(BillInfo fromBean) { if (null == fromBean) { return null; } BillInfoFmt toBean = new BillInfoFmt(); toBean.setId(stringRemove(fromBean.getId())); toBean.setRowKey(stringRemove(fromBean.getRowKey())); toBean.setShopId(stringRemove(fromBean.getShopId())); toBean.setShopName(stringRemove(fromBean.getShopName())); toBean.setShopEntityId(stringRemove(fromBean.getShopEntityId())); toBean.setShopEntityName(stringRemove(fromBean.getShopEntityName())); toBean.setCreateTime(DateTimeUtils.getYmdhmsForNo(fromBean.getCreateTime())); toBean.setCTimeStamp(DateTimeUtils.getYmdhmsForNo(fromBean.getCTimeStamp())); toBean.setHcTime(stringRemove(fromBean.getHcTime())); toBean.setHserial(stringRemove(fromBean.getHserial())); toBean.setFixTerminal(stringRemove(fromBean.getFixTerminal())); toBean.setTerminalNumber(stringRemove(fromBean.getTerminalNumber())); toBean.setBillfileName(stringRemove(fromBean.getBillfileName())); toBean.setTsuuid(stringRemove(fromBean.getTsuuid())); toBean.setBillfileNameHis(fromBean.getBillfileNameHis()); toBean.setBillNo(stringRemove(fromBean.getBillNo())); toBean.setInterceptTime(DateTimeUtils.getYmdhmsForNo(fromBean.getInterceptTime())); toBean.setShopEntityFullName(stringRemove(fromBean.getShopEntityFullName())); toBean.setShopEntityAddress(stringRemove(fromBean.getShopEntityAddress())); toBean.setTelephone(stringRemove(fromBean.getTelephone())); toBean.setSaler(stringRemove(fromBean.getSaler())); toBean.setCheckstand(stringRemove(fromBean.getCheckstand())); toBean.setCashier(stringRemove(fromBean.getCashier())); toBean.setReceivableAmount(fromBean.getReceivableAmount()); toBean.setDefaultReceivableAmount(fromBean.getDefaultReceivableAmount()); toBean.setTotalNum(fromBean.getTotalNum()); toBean.setDefaultTotalNum(fromBean.getDefaultTotalNum()); toBean.setBillSerialNumber(stringRemove(fromBean.getBillSerialNumber())); toBean.setTotalFee(fromBean.getTotalFee()); toBean.setDefaultTotalFee(fromBean.getDefaultTotalFee()); toBean.setPaidAmount(fromBean.getPaidAmount()); toBean.setDefaultPaidAmount(fromBean.getDefaultPaidAmount()); toBean.setDiscountAmount(fromBean.getDiscountAmount()); toBean.setDefaultDiscountAmount(fromBean.getDefaultDiscountAmount()); toBean.setCouponAmount(fromBean.getCouponAmount()); toBean.setDefaultCouponAmount(fromBean.getDefaultCouponAmount()); toBean.setChangeAmount(fromBean.getChangeAmount()); toBean.setDefaultChangeAmount(fromBean.getDefaultChangeAmount()); if (null != fromBean.getSettlementWay()) { List<SettlementWayInfo> rawList = fromBean.getSettlementWay(); List<String> list = new ArrayList<>(); for (SettlementWayInfo raw : rawList) { list.add(raw.toString()); } toBean.setSettlementWay(list); } toBean.setSaleTime(fromBean.getSaleTime()); toBean.setMemberCardNumber(stringRemove(fromBean.getMemberCardNumber())); toBean.setTotalConsumption(fromBean.getTotalConsumption()); toBean.setDefaultTotalConsumption(fromBean.getDefaultTotalConsumption()); toBean.setWebsite(stringRemove(fromBean.getWebsite())); toBean.setBillImage(stringRemove(fromBean.getBillImage())); if (null != fromBean.getGoodsDetails()) { List<GoodsDetailInfo> rawList = fromBean.getGoodsDetails(); List<String> list = new ArrayList<>(); for (GoodsDetailInfo raw : rawList) { list.add(raw.toString()); } toBean.setGoodsDetails(list); } toBean.setRoomNo(stringRemove(fromBean.getRoomNo())); toBean.setCheckinName(stringRemove(fromBean.getCheckinName())); toBean.setDeskNo(stringRemove(fromBean.getDeskNo())); toBean.setConsumeNum(fromBean.getConsumeNum()); toBean.setDefaultConsumeNum(fromBean.getDefaultConsumeNum()); toBean.setBillText(stringRemove(fromBean.getBillText())); toBean.setInTime(DateTimeUtils.getYmdhmsForNo(fromBean.getInTime())); toBean.setOutTime(DateTimeUtils.getYmdhmsForNo(fromBean.getOutTime())); toBean.setDefaultPrintDate(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultPrintDate())); toBean.setDefaultInTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultInTime())); toBean.setDefaultOutTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultOutTime())); toBean.setUploadType(stringRemove(fromBean.getUploadType())); toBean.setCustomRecord(fromBean.getCustomRecord()); toBean.setBillType(stringRemove(fromBean.getBillType())); toBean.setModifyType(stringRemove(fromBean.getModifyType())); toBean.setBillSource(stringRemove(fromBean.getBillSource())); toBean.setAnalyzPath(stringRemove(fromBean.getAnalyzPath())); toBean.setBillMatchKey(stringRemove(fromBean.getBillMatchKey())); toBean.setMatchId(stringRemove(fromBean.getMatchId())); toBean.setDefaultSaleTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultSaleTime())); toBean.setCustomerName(stringRemove(fromBean.getCustomerName())); toBean.setMembershipId(stringRemove(fromBean.getMembershipId())); toBean.setMemberLevels(stringRemove(fromBean.getMemberLevels())); toBean.setPetName(stringRemove(fromBean.getPetName())); toBean.setPetNumber(stringRemove(fromBean.getPetNumber())); toBean.setPrincipal(stringRemove(fromBean.getPrincipal())); toBean.setDeposit(stringRemove(fromBean.getDeposit())); toBean.setPrintDate(DateTimeUtils.getYmdhmsForNo(fromBean.getPrintDate())); toBean.setDefaultInterceptTime(DateTimeUtils.getYmdhmsForNo(fromBean.getDefaultInterceptTime())); toBean.setPrintMatchType(stringRemove(fromBean.getPrintMatchType())); toBean.setBillMergeKey(stringRemove(fromBean.getBillMergeKey())); toBean.setModifyAmount(fromBean.getModifyAmount()); toBean.setModifyAmountList(fromBean.getModifyAmountList()); toBean.setModifyAmountSaleTimeList(DateTimeUtils.getYmdhmsForNo(fromBean.getModifyAmountSaleTimeList())); toBean.setModifyAmountInterceptTimeList( DateTimeUtils.getYmdhmsForNo(fromBean.getModifyAmountInterceptTimeList())); toBean.setUniqueId(stringRemove(fromBean.getUniqueId())); toBean.setUniqueIdHis(fromBean.getUniqueIdHis()); toBean.setIfqrcode(stringRemove(fromBean.getIfqrcode())); toBean.setCouponNum(stringRemove(fromBean.getCouponNum())); toBean.setErpMemberCard(stringRemove(fromBean.getErpMemberCard())); toBean.setVoucherType(stringRemove(fromBean.getVoucherType())); toBean.setQrCode(stringRemove(fromBean.getQrCode())); toBean.setIntegralmark(stringRemove(fromBean.getIntegralmark())); toBean.setThisintegral(stringRemove(fromBean.getThisintegral())); toBean.setRemarks(stringRemove(fromBean.getRemarks())); toBean.setTemplateName(stringRemove(fromBean.getTemplateName())); toBean.setCustName(stringRemove(fromBean.getCustName())); toBean.setCustName(stringRemove(fromBean.getCustTaxNo())); toBean.setCustAdress(stringRemove(fromBean.getCustAdress())); toBean.setCustBankAccount(stringRemove(fromBean.getCustBankAccount())); toBean.setThirdPartyOrderNo(stringRemove(fromBean.getThirdPartyOrderNo())); toBean.setRechargeableCardConsumeAmount(fromBean.getRechargeableCardConsumeAmount()); toBean.setDefaultRechargeableCardConsumeAmount(fromBean.getDefaultRechargeableCardConsumeAmount()); toBean.setOutOfPocketAmount(fromBean.getOutOfPocketAmount()); toBean.setDefaultOutOfPocketAmount(fromBean.getDefaultOutOfPocketAmount()); toBean.setRechargeAmount(fromBean.getRechargeAmount()); toBean.setDefaultRechargeAmount(fromBean.getDefaultRechargeAmount()); toBean.setMemberPrice(fromBean.getMemberPrice()); toBean.setDefaultMemberPrice(fromBean.getDefaultMemberPrice()); toBean.setMemberDiscountrate(fromBean.getMemberDiscountrate()); toBean.setDefaultMemberDiscountrate(fromBean.getDefaultMemberDiscountrate()); toBean.setMemberTotalConsumption(fromBean.getMemberTotalConsumption()); toBean.setDefaultMemberTotalConsumption(fromBean.getDefaultMemberTotalConsumption()); toBean.setTakeout(stringRemove(fromBean.getTakeout())); if (null != fromBean.getDiscountDetails()) { List<DiscountDetailsInfo> rawList = fromBean.getDiscountDetails(); List<String> list = new ArrayList<>(); for (DiscountDetailsInfo raw : rawList) { list.add(raw.toString()); } toBean.setDiscountDetails(list); } return toBean; } /** * 原始數據格式化處理,待完善 * * @param jsonStr * @return */ private static String stringRemove(String strVal) { if (StringUtils.isEmpty(strVal)) { return ""; } strVal = strVal.replace("\t", ""); strVal = strVal.replace("\n", ""); strVal = strVal.replace("\r", ""); return strVal; } }
4.五、BCD裁剪實體類
package com.mengyao.graph.etl.apps.dashboard.beans; import java.io.Serializable; /** * Bill Consumer Dashboard Bean * @author mengyao * */ public class BCD implements Serializable { private static final long serialVersionUID = 1749406742944513387L; private String billId; private double receivableAmount; private String saleTime; private String sdt;//yyyyMMdd private int hour; private String billType; private String shopId; private String shopEntityId; public BCD() { super(); } public BCD(String billId, double receivableAmount, String saleTime, String billType, String shopId, String shopEntityId) { super(); this.billId = billId; this.receivableAmount = receivableAmount; this.saleTime = saleTime; setDateTime(saleTime); this.billType = billType; this.shopId = shopId; this.shopEntityId = shopEntityId; } public String getBillId() { return billId; } public void setBillId(String billId) { this.billId = billId; } public double getReceivableAmount() { return receivableAmount; } public void setReceivableAmount(double receivableAmount) { this.receivableAmount = receivableAmount; } public String getSaleTime() { return saleTime; } public void setSaleTime(String saleTime) { this.saleTime = saleTime; setDateTime(saleTime); } public String getSdt() { return sdt; } public void setSdt(String sdt) { this.sdt = sdt; } public int getHour() { return hour; } public void setHour(int hour) { this.hour = hour; } public String getBillType() { return billType; } public void setBillType(String billType) { this.billType = billType; } public String getShopId() { return shopId; } public void setShopId(String shopId) { this.shopId = shopId; } public String getShopEntityId() { return shopEntityId; } public void setShopEntityId(String shopEntityId) { this.shopEntityId = shopEntityId; } private void setDateTime(String saleTime) { if (null!=saleTime&&saleTime.length()==17) { this.sdt = saleTime.substring(0, 8); this.hour = Integer.parseInt(saleTime.substring(8, 10)); } } @Override public String toString() { return billId + "\t" + receivableAmount + "\t" + saleTime + "\t" + sdt + "\t" + billType + "\t" + shopId + "\t" + shopEntityId; } }
4.六、銷售分析業務實現類
package com.mengyao.graph.etl.apps.dashboard.service; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.functions; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.mengyao.graph.etl.apps.dashboard.beans.AreaProjSale; import com.mengyao.graph.etl.apps.dashboard.beans.AreaSaleTrendAll; import com.mengyao.graph.etl.apps.dashboard.beans.PeakTime; import com.mengyao.graph.etl.apps.dashboard.beans.ProjectTypeShopNumber; import com.mengyao.graph.etl.apps.dashboard.beans.ShopSaleRank; import com.mengyao.graph.etl.apps.dashboard.beans.TotalRefund; import com.mengyao.graph.etl.apps.dashboard.beans.TotalSale; import com.mengyao.graph.etl.apps.dashboard.beans.TotalSettlementBillNumber; import com.mengyao.utils.RedisUtil; /** * 大屏指標分析 * @author mengyao * */ public class SaleAnalysisService implements Serializable { private static final long serialVersionUID = 8289368096001689148L; //解決區域名稱hash重複問題 private static final String SALT="aAb12"; /** * 每日重置大屏指標值 */ @Deprecated public void reset() { RedisUtil.setObject("dtsbn_6", "{\"結帳單數\":{\"val\":7270}}\""); RedisUtil.setObject("dpt_7", "{\"高峯時段\":{\"val\":13}}\""); RedisUtil.setObject("dtr_5", "{\"退款金額\":{\"val\":7301.8}}\""); RedisUtil.setObject("dts_4", "{\"總銷售額\":{\"val\":1300523.8000000005}}\""); RedisUtil.setObject("curDay", "20190227\""); RedisUtil.setObject("dastfa_2_20190227", "{\"各區域銷售額發展趨勢\":{\"重慶天地\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,9.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],\"創智天地\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,43.3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],\"上海瑞虹\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,16.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],\"上海新天地\":[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,553.75,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}\""); RedisUtil.setObject("dasp_1", "{\"各區域銷售額佔比\":{\"重慶天地\":[{\"bn\":1,\"n\":\"重慶天地\",\"sa\":9.5}],\"創智天地\":[{\"bn\":3,\"n\":\"壹方\",\"sa\":43.3}],\"上海新天地\":[{\"bn\":4,\"n\":\"新天地時尚購物中心\",\"sa\":152.0},{\"bn\":3,\"n\":\"上海新天地南里北里\",\"sa\":74.0},{\"bn\":2,\"n\":\"湖濱道購物中心\",\"sa\":100.0},{\"bn\":10,\"n\":\"新天地廣場\",\"sa\":227.75}],\"上海瑞虹\":[{\"bn\":1,\"n\":\"瑞虹天地星星堂\",\"sa\":16.0}]}}\""); RedisUtil.setObject("dpc_8", "{\"各項目銷售額對比\":{\"重慶天地\":[{\"bn\":1,\"n\":\"重慶天地\",\"sa\":9.5}],\"創智天地\":[{\"bn\":3,\"n\":\"壹方\",\"sa\":43.3}],\"上海新天地\":[{\"bn\":4,\"n\":\"新天地時尚購物中心\",\"sa\":152.0},{\"bn\":3,\"n\":\"上海新天地南里北里\",\"sa\":74.0},{\"bn\":2,\"n\":\"湖濱道購物中心\",\"sa\":100.0},{\"bn\":10,\"n\":\"新天地廣場\",\"sa\":227.75}],\"上海瑞虹\":[{\"bn\":1,\"n\":\"瑞虹天地星星堂\",\"sa\":16.0}]}}\""); RedisUtil.setObject("dpac_10", "{\"各項目單均消費\":{\"重慶天地\":[{\"bn\":1,\"n\":\"重慶天地\",\"sa\":9.5}],\"創智天地\":[{\"bn\":3,\"n\":\"壹方\",\"sa\":43.3}],\"上海新天地\":[{\"bn\":4,\"n\":\"新天地時尚購物中心\",\"sa\":152.0},{\"bn\":3,\"n\":\"上海新天地南里北里\",\"sa\":74.0},{\"bn\":2,\"n\":\"湖濱道購物中心\",\"sa\":100.0},{\"bn\":10,\"n\":\"新天地廣場\",\"sa\":227.75}],\"上海瑞虹\":[{\"bn\":1,\"n\":\"瑞虹天地星星堂\",\"sa\":16.0}]}}\""); RedisUtil.setObject("dsfst_9", "{\"店鋪銷售排行\":[{\"pn\":\"新天地時尚購物中心\",\"sa\":152.0,\"sn\":\"GREYBOX COFFEE\"},{\"pn\":\"新天地廣場\",\"sa\":114.75,\"sn\":\"Arabica\"},{\"pn\":\"湖濱道購物中心\",\"sa\":100.0,\"sn\":\"LOKAL\"},{\"pn\":\"上海新天地南里北里\",\"sa\":74.0,\"sn\":\"哈肯鋪_手感烘焙\"},{\"pn\":\"壹方\",\"sa\":60.3,\"sn\":\"新一天便利\"},{\"pn\":\"新天地廣場\",\"sa\":41.0,\"sn\":\"奈雪的茶\"},{\"pn\":\"新天地廣場\",\"sa\":39.0,\"sn\":\"蒲石小點\"},{\"pn\":\"新天地廣場\",\"sa\":18.0,\"sn\":\"Fresh_Every_Day\"},{\"pn\":\"瑞虹天地星星堂\",\"sa\":16.0,\"sn\":\"老盛昌\"},{\"pn\":\"新天地廣場\",\"sa\":15.0,\"sn\":\"多幾谷\"}]}\""); RedisUtil.setObject("dptsc_11", "{\"各項目業態銷售額對比\":{\"上海新天地南里北里\":[{\"n\":\"餐飲\",\"sa\":74.0,\"sn\":3}],\"新天地廣場\":[{\"n\":\"餐飲\",\"sa\":227.75,\"sn\":10}],\"重慶天地\":[{\"n\":\"其它\",\"sa\":9.5,\"sn\":0}],\"湖濱道購物中心\":[{\"n\":\"餐飲\",\"sa\":100.0,\"sn\":2}],\"壹方\":[{\"n\":\"餐飲\",\"sa\":43.3,\"sn\":3}],\"瑞虹天地星星堂\":[{\"n\":\"餐飲\",\"sa\":16.0,\"sn\":1}],\"新天地時尚購物中心\":[{\"n\":\"餐飲\",\"sa\":152.0,\"sn\":4}]}}\""); RedisUtil.setObject("dpsn_12", "{\"各項目店鋪數量\":{\"上海新天地南里北里\":[{\"n\":\"餐飲\",\"sa\":74.0,\"sn\":3}],\"新天地廣場\":[{\"n\":\"餐飲\",\"sa\":227.75,\"sn\":10}],\"重慶天地\":[{\"n\":\"其它\",\"sa\":9.5,\"sn\":0}],\"湖濱道購物中心\":[{\"n\":\"餐飲\",\"sa\":100.0,\"sn\":2}],\"壹方\":[{\"n\":\"餐飲\",\"sa\":43.3,\"sn\":3}],\"瑞虹天地星星堂\":[{\"n\":\"餐飲\",\"sa\":16.0,\"sn\":1}],\"新天地時尚購物中心\":[{\"n\":\"餐飲\",\"sa\":152.0,\"sn\":4}]}}\""); } /** * 總銷售額 * @param ds */ public void totalSale(Dataset<Row> bill) { Row[] rows = (Row[])bill .filter("billType=1 and receivableAmount>=0") .agg(functions.sum(new Column("receivableAmount")).alias("totalSale"), functions.count("receivableAmount")) .head(1); Map<String, TotalSale> map = new HashMap<String, TotalSale>(); double totalSale=0D; if(rows.length>0){ Row row = rows[0]; if (!row.isNullAt(0)) { //銷售額 totalSale=row.getDouble(0); } if (!row.isNullAt(1)) { //結帳單數 totalSettlementBillNumber(new TotalSettlementBillNumber(row.getLong(1))); } } map.put("總銷售額", new TotalSale(totalSale)); String str = JSONObject.toJSONString(map); System.out.println("====####--totalSale##" + str); // 將總銷售額存入redis RedisUtil.setObject("dts_4", str); } /** * 退款金額 * @param ds */ public void totalRefund(Dataset<Row> bill) { Row[] rows = (Row[]) bill .filter("((billType=6) or (billType=1 and receivableAmount<0))") .agg(functions.sum(functions.abs(new Column("receivableAmount"))).alias("totalSale")) .head(1); Map<String, TotalRefund> map=new HashMap<>(); map.put("退款金額", new TotalRefund(0)); if(rows.length>0){ Row row = rows[0]; if (!row.isNullAt(0)) { map.put("退款金額", new TotalRefund(row.getDouble(0))); } } String str=JSONObject.toJSONString(map); System.out.println("====####--totalRefund##"+str); //將退款金額存入redis RedisUtil.setObject("dtr_5",str); } /** * 高峯時段 * 一、時段:小時; * 二、高峯:當日每小時結帳單數累計最大; * 三、高峯時段:全部mall累計每小時結帳單數; * @param ds * @return {"高峯時段":{"val":12}} */ public void peakTime(Dataset<Row> bill) { //帳單表自己有mallid(shopId) 所以無需關聯店鋪表 Map<String, PeakTime> map=new HashMap<>(); map.put("高峯時段", new PeakTime(8)); Row[] rows = (Row[])bill .filter("billType=1") .groupBy("hour") .agg(functions.sum("receivableAmount").alias("totalSale")) .orderBy(new Column("totalSale").desc()) .limit(1) .head(1); if(rows.length>0){ Row row = rows[0]; if (!row.isNullAt(0)) { map.put("高峯時段", new PeakTime(row.getAs(0))); } } String str=JSONObject.toJSONString(map); System.out.println("====####--peakTime##"+str); //將高峯時段放入redis RedisUtil.setObject("dpt_7", str); } /** * 各區域銷售發展趨勢-多個區域(每一個區域下有多個mall)當日0點~24點的累計銷售額 * @param bill * @param ruian * @param curDay */ public void areaSaleTrendForAll(Dataset<Row> bill, Dataset<Row> ruian,Set<String> areaSet, String curDay) { Row[] rows = (Row[])bill .filter("billType=1 and receivableAmount>0") .join(ruian, bill.col("shopId").equalTo(ruian.col("rmid")), "leftouter") .groupBy("area_cn", "hour") .agg(functions.sum("receivableAmount").alias("areaDayHourSale")) .orderBy("areaDayHourSale") .select("area_cn","areaDayHourSale","hour") .collect(); // Map<String, Map<String,Map<Integer, Double>>> map = new HashMap<>(); Map<String, Map<String,Collection<Double>>> map = new HashMap<>(); Map<String,AreaSaleTrendAll> maps=new HashMap<>(); if(rows.length>0){ for (Row row : rows) { String areaCn=null; double areaDayHourSale=0D; int saleHour=0; if(!row.isNullAt(0)){ areaCn=row.getString(0); } if(!row.isNullAt(1)){ areaDayHourSale=row.getDouble(1); } if(!row.isNullAt(2)){ saleHour=row.getInt(2); } if(maps.containsKey(areaCn+SALT)){ AreaSaleTrendAll ast=maps.get(areaCn+SALT); ast.getVals().put(saleHour, areaDayHourSale); }else{ HashMap<Integer,Double> vals=new HashMap<Integer,Double>(); vals.put(saleHour, areaDayHourSale); maps.put(areaCn+SALT, new AreaSaleTrendAll(areaCn, saleHour, areaDayHourSale)); } } } //填充沒有銷售額的區域記錄,使數據更加完整。 areaSet.forEach(areaCn->{ if(!maps.containsKey(areaCn+SALT)){ maps.put(areaCn+SALT, new AreaSaleTrendAll(areaCn)); } }); System.out.println("==####========填充hou====begin====================="); for (String key:maps.keySet()) { System.out.println("Key:"+key); } System.out.println("==####========填充hou=====end===================="); // Map<String,Map<Integer, Double>> rs=new HashMap<>(); // for(AreaSaleTrendAll asta:maps.values()){ // rs.put(asta.getAreaCn(), asta.getVals()); // } Map<String,Collection<Double>> rs=new HashMap<>(); for(AreaSaleTrendAll asta:maps.values()){ rs.put(asta.getAreaCn(), asta.getVals().values()); } map.put("各區域銷售額發展趨勢", rs); //轉成json 字符串 String str=JSONObject.toJSONString(map); System.out.println("====####--areaSaleTrendForAll##"+str); //放入redis RedisUtil.setObject("dastfa_2_"+curDay, str); //維護redis中最新日期 if(!RedisUtil.existsObject("curDay")){//若是爲空,說明是第一次運行,直接將當前日期設置到redis中 RedisUtil.setObject("curDay", curDay); }else{ String curDayRedis=(String) RedisUtil.getObject("curDay"); if((curDayRedis.compareTo(curDay))<0){//說明當前日期大於redis中的日期,更新 RedisUtil.setObject("curDay", curDay); } } } /** * 各項目銷售額對比 * 一、各項目:各個mall; * 二、單一項目銷售額:mall的當日開始營業時間到當前時間累計銷售額; */ public void projectContrast(Dataset<Row> bill, Dataset<Row> ruian,Map<String, Set<String>> ruianMallAll) { Row[] rows = (Row[])bill .filter("billType=1 and receivableAmount>0") .join(ruian, bill.col("shopId").equalTo(ruian.col("rmid")), "leftouter") .groupBy("name", "area_cn") .agg(functions.sum("receivableAmount").alias("mallDaySale"), functions.count("receivableAmount").alias("mallDayBillNum")) // .orderBy("area_cn") .select("name","area_cn","mallDaySale","mallDayBillNum") .collect(); Map<String,List<AreaProjSale>> rs=new HashMap<>(); if(rows.length>0) { for (Row row : rows) { String mallName=null; String areaCn=null; double mallDaySale=0D; long mallDayBillNum=0L; AreaProjSale obj=new AreaProjSale(); if(!row.isNullAt(0)){//項目、mall的名稱 mallName=row.getString(0); obj.setN(mallName); } if(!row.isNullAt(1)){//區域名稱 areaCn=row.getString(1); } if(!row.isNullAt(2)){//mall的日銷售額 mallDaySale=row.getDouble(2); obj.setSa(mallDaySale); } if(!row.isNullAt(3)){//mall的日結帳單數 mallDayBillNum=row.getLong(3); obj.setBn(mallDayBillNum); } //判斷是否有該區域 if(rs.containsKey(areaCn)){ rs.get(areaCn).add(obj); }else{//不存在該區域 List<AreaProjSale> list=new ArrayList<>(); list.add(obj); rs.put(areaCn, list); } } } //填充未產生帳單的數據,默認0 //獲取所有瑞安的區域和mallName的集合 Set<Entry<String,Set<String>>> entrySet = ruianMallAll.entrySet(); for(Map.Entry<String, Set<String>> entry:entrySet){ String areaCn=entry.getKey();//獲取區域名稱 //獲取每一個區域的標準mall的集合 Set<String> mallSet=entry.getValue(); if(rs.containsKey(areaCn)){//實際數據中已經存在該區域相關數據 //判斷實際數據mall是否完整 //用來存放實際數據中mallname的集合 Set<String> mallSetCur=new HashSet<>(); //用來存放沒有帳單的mall的集合 Set<String> mallSetNew=new HashSet<>(); //遍歷該區域下實際數據集合 並填充set for(AreaProjSale obj:rs.get(areaCn)){ mallSetCur.add(obj.getN()); } //求兩個set的差集合 將標準數據放入 mallSetNew.addAll(mallSet); //求差集合 標準數據-實際數據 獲得差集 mallSetNew.removeAll(mallSetCur); //遍歷差集合 ,填充默認值 mallSetNew.forEach(mn->{ rs.get(areaCn).add(new AreaProjSale(mn)); }); }else{//該區域不存在 //便利該區域下標準mall集合,逐個放入 List<AreaProjSale> list=new ArrayList<>(); mallSet.forEach(mn->{ list.add(new AreaProjSale(mn)); }); rs.put(areaCn, list); } } //各區域銷售額佔比 areaSaleProportion(rs); //各區域單均消費 projectAvgConsumer(rs); //轉成json Map<String, Map<String,List<AreaProjSale>>> pmap=new HashMap<>(); pmap.put("各項目銷售額對比", rs); //轉成json String str=JSONObject.toJSONString(pmap); System.out.println("====####--projectContrast##"+str); //1 8各項目銷售額對比dpc_8 RedisUtil.setObject("dpc_8", str); } /** * 全部mall中銷售額最高的top10店鋪 * @param bill * @param shop * @param ruian */ public void saleForShopTop10(Dataset<Row> bill, Dataset<Row> shop, Dataset<Row> ruian) { Row[] rows = (Row[])bill .filter("billType=1 and receivableAmount>0") .join(shop, bill.col("shopEntityId").equalTo(shop.col("shop_entity_id")), "leftouter") .join(ruian, bill.col("shopId").equalTo(ruian.col("rmid")), "leftouter") .groupBy("shop_entity_name", "name") .agg(functions.sum("receivableAmount").alias("shopDaySale")) .orderBy(new Column("shopDaySale").desc()) .select("shop_entity_name","name","shopDaySale") .limit(10) .head(10); List<ShopSaleRank> ssrList=new LinkedList<>(); if(rows.length>0) { for (Row row : rows) { ShopSaleRank ssr=new ShopSaleRank(); if(!row.isNullAt(0)){//店鋪名稱 ssr.setSn(row.getString(0)); } if(!row.isNullAt(1)){//項目/mall名稱 ssr.setPn(row.getString(1)); } if(!row.isNullAt(2)){//店鋪日銷售額 ssr.setSa(row.getDouble(2)); } ssrList.add(ssr); } } //轉成json Map<String, List<ShopSaleRank>> pmap=new HashMap<>(); pmap.put("店鋪銷售排行", ssrList); String str=JSON.toJSONString(pmap); System.out.println("====####--saleForShopTop10##"+str); //將10個店鋪日銷售額放入redis RedisUtil.setObject("dsfst_9",str); } /** * 各項目業態銷售額對比 * * @param session * @param beginYMDH * @param endYMDH */ public void projectTypeSaleContrast(Dataset<Row> bill, Dataset<Row> shop, Dataset<Row> type, Dataset<Row> ruian, Set<String> areaSet,Set<String> mallSet,Set<String> ruianTypeAll) { Row[] rows = (Row[])bill .filter("billType=1 and receivableAmount>0") .join(shop, shop.col("shop_entity_id").equalTo(bill.col("shopEntityId")), "leftouter") .join(type, type.col("id").equalTo(shop.col("shop_entity_type_root")), "leftouter") .join(ruian, ruian.col("rmid").equalTo(bill.col("shopId"))) .groupBy("name", "shop_type_name") .agg(functions.sum("receivableAmount").alias("shopTypeDaySale"), functions.countDistinct("shop_entity_id").alias("shopNum")) .select("name","shop_type_name","shopTypeDaySale","shopNum") .collect(); Map<String,List<ProjectTypeShopNumber>> map=new HashMap<>(); if(rows.length>0) { for (Row row : rows) { ProjectTypeShopNumber pac=new ProjectTypeShopNumber(); String mallName=null; if(!row.isNullAt(0)){//項目、mall名稱 mallName=row.getString(0); } if(!row.isNullAt(1)){//業態名稱 pac.setN(row.getString(1)); }else{ pac.setN("其它"); } if(!row.isNullAt(2)){//mall的業態日銷售額 pac.setSa(row.getDouble(2)); } if(!row.isNullAt(3)){//mall的業態店鋪數量 pac.setSn(row.getLong(3)); } if(map.containsKey(mallName)){//更新map中的list列表 List<ProjectTypeShopNumber> pacList=map.get(mallName); pacList.add(pac); }else{//新的mall 新建列表放入map List<ProjectTypeShopNumber> pacList=new LinkedList<>(); pacList.add(pac); map.put(mallName, pacList); } } } //爲沒有產生帳單的業態或mall填充默認數據,是數據看起來完整 mallSet.forEach(mallName->{ if(!map.containsKey(mallName)) {//沒有帳單的mall List<ProjectTypeShopNumber> list=new ArrayList<>(); //遍歷全量業態,進行填充 ruianTypeAll.forEach(type_->{ list.add(new ProjectTypeShopNumber(type_,0.0,0)); }); map.put(mallName,list); }else{//有帳單的mall //用來獲取實際帳單數據中已存在的業態 Set<String> ruianTypeCur = new HashSet<>(); //用來存放沒有帳單的業態 Set<String> ruianTypeNew = new HashSet<>(); //遍歷數據,提取實際數據中的業態 for(ProjectTypeShopNumber obj:map.get(mallName)){ ruianTypeCur.add(obj.getN()); } //將產生帳單的業態和全量業態求差即 ruianTypeNew.addAll(ruianTypeAll); ruianTypeNew.removeAll(ruianTypeCur); //將沒有實際帳單的業態數據填從(默認值填充) for(String type_:ruianTypeNew){ map.get(mallName).add(new ProjectTypeShopNumber(type_,0.0,0)); } } }); //爲==各項目店鋪數量==填充數據 projectShopNumber(map); //轉成json Map<String, Map<String,List<ProjectTypeShopNumber>>> pmap=new HashMap<>(); pmap.put("各項目業態銷售額對比", map); String str=JSON.toJSONString(pmap); System.out.println("====####--projectTypeSaleContrast##"+str); RedisUtil.setObject("dptsc_11",str); } //============================================================================================================= /** * 結帳單數 * * @param tsb */ public void totalSettlementBillNumber(TotalSettlementBillNumber tsb) { Map<String, TotalSettlementBillNumber> map=new HashMap<>(); map.put("結帳單數", tsb); String str=JSONObject.toJSONString(map); System.out.println("====####--totalSettlementBillNumber##"+str); //將結帳單數放入redis RedisUtil.setObject("dtsbn_6", str); } /** * 各區域銷售佔比 * 直接將數據封裝成json,無需額外處理 * * @param rs 已經封裝好的數據,請參考{@link com.mengyao.graph.etl.apps.dashboard.service.SaleAnalysisService.projectContrast()} */ private void areaSaleProportion(Map<String, List<AreaProjSale>> rs) { Map<String, Map<String,List<AreaProjSale>>> pmap=new HashMap<>(); pmap.put("各區域銷售額佔比 ", rs); //轉成json String str=JSONObject.toJSONString(pmap); System.out.println("====####--areaSaleProportion##"+str); //各區域銷售佔比dasp_1 RedisUtil.setObject("dasp_1", str); } /** * 各項目單均消費 * 版本二 營業時間24小時制 * 一、各項目:各個mall; * 二、單均消費:mall的當日開始營業時間到當前時間累計銷售額,應收額累計/結帳單累計; * 直接將數據封裝成json,無需額外處理 * * @param rs 已經封裝好的數據,請參考{@link com.mengyao.graph.etl.apps.dashboard.service.SaleAnalysisService.projectContrast()} */ private void projectAvgConsumer(Map<String, List<AreaProjSale>> rs) { Map<String, Map<String,List<AreaProjSale>>> pmap=new HashMap<>(); pmap.put("各項目單均消費", rs); //轉成json String str=JSONObject.toJSONString(pmap); System.out.println("====####--projectAvgConsumer##"+str); //10各項目單均消費dpac_10 RedisUtil.setObject("dpac_10", str); } /** * 各項目店鋪數量 * 數據填充自上面的方法 * * {@link projectTypeSaleContrast(SparkSession session,String beginYMDH,String endYMDH)} */ private void projectShopNumber(Map<String,List<ProjectTypeShopNumber>> map) { //轉成json Map<String, Map<String,List<ProjectTypeShopNumber>>> pmap=new HashMap<>(); pmap.put("各項目店鋪數量", map); String str=JSON.toJSONString(pmap); System.out.println("====####--projectShopNumber##"+str); RedisUtil.setObject("dpsn_12",str); } }