【Common Algorithms for Big Data Analysis】9. Markov Models

Introduction

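In this chapter we cover Markov models. Given a set of random variables (such as a customer's most recent transaction dates), a Markov model describes the distribution of the variable's next value using only the previous state (the previous transaction date).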

1. Basic Principles of Markov Chains

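Let $$ S = \{S_1, S_2, \ldots, S_n\} $$ be a finite set of states. We want the following approximation to hold: $$ P(S_n \mid S_{n-1}, S_{n-2}, \ldots, S_1) \approx P(S_n \mid S_{n-1}) $$ This approximation expresses the first-order Markov property: the state of the system at time t+1 depends only on its state at time t.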

If we are more careful and extend the dependency one time step further back, we obtain the second-order Markov model: $$ P(S_n \mid S_{n-1}, S_{n-2}, \ldots, S_1) \approx P(S_n \mid S_{n-1}, S_{n-2}) $$
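To make the difference between the two orders concrete, here is a minimal Java sketch that counts, in one observed state sequence, how often each context is followed by each next state. The class `OrderKContexts` is a hypothetical name and not part of this chapter's code. With `order = 1` the context is only the previous state; with `order = 2` it is the previous two states, so the number of distinct contexts can grow from N to N².

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: counts "context -> next state" occurrences in one state sequence.
final class OrderKContexts {
    // The context is the previous `order` states joined with commas, so order = 1 counts
    // first-order transitions (S_{n-1} -> S_n) and order = 2 counts (S_{n-2}, S_{n-1} -> S_n).
    static Map<String, Integer> countTransitions(List<String> states, int order) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = order; i < states.size(); i++) {
            String context = String.join(",", states.subList(i - order, i));
            counts.merge(context + "->" + states.get(i), 1, Integer::sum);
        }
        return counts;
    }
}
```

For the sequence A, B, A, B, C, order 1 yields the keys `A->B` (twice), `B->A` and `B->C`, while order 2 yields `A,B->A`, `B,A->B` and `A,B->C`.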

Under the first-order Markov assumption, the joint probability factorizes as: $$ P(S_1, S_2, \ldots, S_n) = P(S_1) \prod_{i=2}^{n} P(S_i \mid S_{i-1}) $$

Summary

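  • If the distribution of a random sequence is determined only by its current state, the sequence has the Markov property. A random process with this property is called a Markov random process.

  • For an observable state sequence (i.e., the states are known from the data), we obtain a Markov chain model (MCM), which we can use to make predictions.

  • For unobservable states, we obtain a hidden Markov model (HMM).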

Next, we give the formal definition of the Markov chain used in the rest of this chapter.

A finite state space: $$ S = \{S_1, S_2, \ldots, S_n\} $$

Transition probabilities

A function $f: S \times S \to \mathbb{R}$, which can be written as an N×N square matrix (N is the number of states), such that:

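  • $0 \le f(a,b) \le 1$: every transition probability lies between 0 and 1;
  • $\sum_{b \in S} f(a,b) = 1$: for every state a, the probabilities of moving from a to all possible states sum to 1.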

Initial distribution

A function $g: S \to \mathbb{R}$ such that:

  • $0 \le g(a) \le 1$
  • $\sum_{a\in S}g(a) = 1$

A Markov chain is a random process on S:

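  • at time 0, the initial state of the chain is drawn according to the distribution $g$;
  • if the chain is in state $a$ at time $t$, then at time $t+1$ it is in state $b$ with probability $f(a,b)$, for every $b \in S$.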
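The definition above can be checked mechanically. Below is a minimal Java sketch, assuming states are numbered 0..N-1 so that f is a `double[N][N]` and g a `double[N]`; the class `MarkovChainSketch` is hypothetical. It validates both sets of conditions and computes the joint probability of a state path with the factorization $P(S_1)\prod_{i \ge 2} P(S_i \mid S_{i-1})$, i.e. g(s_1)·f(s_1,s_2)·…·f(s_{n-1},s_n).

```java
// Hypothetical sketch of the formal definition: f is an N x N matrix, g an array of length N.
final class MarkovChainSketch {
    private static final double EPS = 1e-9;  // tolerance for floating-point row sums

    // Checks that f is square, 0 <= f(a,b) <= 1, and every row sums to 1.
    static boolean isValidTransition(double[][] f) {
        for (double[] row : f) {
            if (row.length != f.length) return false;
            double sum = 0.0;
            for (double p : row) {
                if (p < 0.0 || p > 1.0) return false;
                sum += p;
            }
            if (Math.abs(sum - 1.0) > EPS) return false;
        }
        return true;
    }

    // Checks 0 <= g(a) <= 1 and that g sums to 1.
    static boolean isValidInitial(double[] g) {
        double sum = 0.0;
        for (double p : g) {
            if (p < 0.0 || p > 1.0) return false;
            sum += p;
        }
        return Math.abs(sum - 1.0) <= EPS;
    }

    // Joint probability of a path: g(s_1) * f(s_1, s_2) * ... * f(s_{n-1}, s_n).
    static double pathProbability(double[] g, double[][] f, int[] path) {
        if (path.length == 0) return 1.0;
        double p = g[path[0]];
        for (int i = 1; i < path.length; i++) {
            p *= f[path[i - 1]][path[i]];
        }
        return p;
    }
}
```

With g = {0.5, 0.5} and f = {{0.9, 0.1}, {0.4, 0.6}}, the path 0 → 0 → 1 has probability 0.5 × 0.9 × 0.1 = 0.045.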

2. Using a Markov State Table

Suppose we already have a Markov state table, for example the following table of day-to-day weather changes in a city:

Today\Tomorrow Sunny Rainy Cloudy Foggy
Sunny 0.6 0.1 0.2 0.1
Rainy 0.5 0.2 0.2 0.1
Cloudy 0.1 0.7 0.1 0.1
Foggy 0.0 0.3 0.4 0.3

We assume that:

  • the city's weather has only four possible states: sunny, rainy, cloudy and foggy;
  • the weather does not change within a day.

Let us explain what the table means. It could just as well be written as a 4×4 square matrix, but we show it as a table for clarity.

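Each number is the probability of tomorrow's weather (column) given today's weather (row). For example, the 0.6 in the first row and first column means that if today is sunny, tomorrow is sunny with probability 0.6.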

We can now use the Markov property to answer questions such as the following.

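If today is sunny, what is the probability that tomorrow is cloudy and the day after tomorrow is foggy? By the Markov property, tomorrow's weather depends only on today's, and the weather on the day after tomorrow depends only on tomorrow's, so: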

P = 0.2 (row 1, column 3) * 0.1 (row 3, column 4)
  = 0.02
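The same calculation in code: a minimal Java sketch that multiplies one transition probability per day, starting from a known state today. The class `WeatherChain` is a hypothetical name; the matrix is copied from the table above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper reproducing the weather example.
final class WeatherChain {
    // Row = today, column = tomorrow; the order matches the table above.
    static final List<String> STATES = Arrays.asList("sunny", "rainy", "cloudy", "foggy");

    static final double[][] P = {
        {0.6, 0.1, 0.2, 0.1},  // from sunny
        {0.5, 0.2, 0.2, 0.1},  // from rainy
        {0.1, 0.7, 0.1, 0.1},  // from cloudy
        {0.0, 0.3, 0.4, 0.3},  // from foggy
    };

    static int index(String state) {
        int i = STATES.indexOf(state);
        if (i < 0) throw new IllegalArgumentException("unknown weather: " + state);
        return i;
    }

    // P(day1 | today) * P(day2 | day1) * ... for the given sequence of future days.
    static double sequenceProbability(String today, String... nextDays) {
        double p = 1.0;
        int current = index(today);
        for (String day : nextDays) {
            int next = index(day);
            p *= P[current][next];
            current = next;
        }
        return p;
    }
}
```

`WeatherChain.sequenceProbability("sunny", "cloudy", "foggy")` evaluates to 0.02, up to floating-point rounding.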

So the key step is deriving the Markov state matrix from the data.

3. Analyzing Customer Transaction Data

Suppose we have transaction data like the following:

55FRL8G23B,1381907878,2013-01-09,129
9SX0DJG9L4,1381907879,2013-01-09,34
0ANMD0T113,1381907880,2013-01-09,144
W1F4412PA8,1381907881,2013-01-09,26
22Z10EAYC3,1381907882,2013-01-09,167
3R56N17P1H,1381907883,2013-01-09,25
LK0P6K3DE4,1381907884,2013-01-09,25
3A4S4BMPMU,1381907885,2013-01-09,113
OF4CEO2814,1381907886,2013-01-09,138
9ICNYOFS41,1381907887,2013-01-09,79
N4EOB264U6,1381907888,2013-01-09,108
3C0WARCKYJ,1381907889,2013-01-09,204
PTT3BI00AZ,1381907890,2013-01-09,27
14UHHAVQ2Q,1381907891,2013-01-09,73
Z9GFE6TDKF,1381907892,2013-01-09,32
...

The fields are (customer ID, transaction ID, transaction date, transaction amount).

3.1 Generating State Sequences

The goal of this section is to convert the transaction sequence into a state sequence. Our application first produces output of the form $$ customerId,(Date_1,Amount_1);(Date_2,Amount_2);\ldots;(Date_n,Amount_n) $$ with the dates sorted in ascending order.

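We then convert this output into a state sequence: $$ customerId,State_1,State_2,\ldots,State_{n-1} $$ A customer with n transactions yields n-1 states, because each state describes a pair of consecutive transactions.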
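The first step (group by customer, then sort by date) can be sketched in plain Java for small inputs. This is a hedged, in-memory illustration, assuming lines in the `customerId,transactionId,yyyy-MM-dd,amount` format shown above; the class `TransactionGrouping` and its methods are hypothetical, and the Spark job later in this chapter does the same thing at scale.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of "group by customer, sort by date".
final class TransactionGrouping {
    // One purchase: date and amount (the transaction ID is not needed here).
    static final class Purchase {
        final LocalDate date;
        final int amount;
        Purchase(LocalDate date, int amount) { this.date = date; this.amount = amount; }
    }

    // Lines without exactly 4 fields are skipped; an unparseable date or amount throws.
    static Map<String, List<Purchase>> groupAndSort(List<String> lines) {
        Map<String, List<Purchase>> byCustomer = new LinkedHashMap<>();
        for (String line : lines) {
            String[] tokens = line.trim().split(",");
            if (tokens.length != 4) continue;
            Purchase p = new Purchase(LocalDate.parse(tokens[2]), Integer.parseInt(tokens[3]));
            byCustomer.computeIfAbsent(tokens[0], k -> new ArrayList<>()).add(p);
        }
        // Sort each customer's purchases by date, ascending.
        for (List<Purchase> purchases : byCustomer.values()) {
            purchases.sort(Comparator.comparing((Purchase p) -> p.date));
        }
        return byCustomer;
    }

    // Renders one customer as customerId,(Date_1,Amount_1);(Date_2,Amount_2);...
    static String format(String customerId, List<Purchase> purchases) {
        StringBuilder sb = new StringBuilder(customerId).append(',');
        for (int i = 0; i < purchases.size(); i++) {
            if (i > 0) sb.append(';');
            Purchase p = purchases.get(i);
            sb.append('(').append(p.date).append(',').append(p.amount).append(')');
        }
        return sb.toString();
    }
}
```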

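To do this, we convert each ordered sequence of (purchase-date, amount) pairs into a sequence of Markov chain states. Each state is a two-letter code, with the letters defined as follows: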

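Time since previous transaction    Amount compared with previous transaction
S: small                           L: significantly less
M: medium                          E: about the same
L: large                           G: significantly greater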

This gives 9 states:

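State  Time since previous transaction : Amount compared with previous transaction
SL small : significantly less
SE small : about the same
SG small : significantly greater
ML medium : significantly less
ME medium : about the same
MG medium : significantly greater
LL large : significantly less
LE large : about the same
LG large : significantly greater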
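The thresholds are not given in the table; the Spark code in the next section uses fewer than 30 days for S, fewer than 60 for M, and a ±10% band on the amount. A minimal Java sketch of encoding one pair of consecutive transactions, assuming those same thresholds and the same row/column order as `MarkovTableBuilder.initStates()`; the class `StateEncoding` is hypothetical. It deliberately mirrors the Spark code's comparison direction, in which the second letter compares the previous amount with the current one.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring toStateSequence() from the Spark code.
final class StateEncoding {
    // Row/column order of the 9x9 matrix (same order as MarkovTableBuilder.initStates()).
    static final List<String> STATES =
            Arrays.asList("SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG");

    // Thresholds (30/60 days, +/-10%) follow the Spark code; as there, the second letter
    // is L when the PREVIOUS amount is below 90% of the current one, G when it is >= 110%.
    static String encode(long dayDiff, int priorAmount, int currentAmount) {
        String time = dayDiff < 30 ? "S" : (dayDiff < 60 ? "M" : "L");
        String amount;
        if (priorAmount < 0.9 * currentAmount) {
            amount = "L";
        } else if (priorAmount < 1.1 * currentAmount) {
            amount = "E";
        } else {
            amount = "G";
        }
        return time + amount;
    }

    // Row/column index of a state in the transition matrix, or -1 if unknown.
    static int indexOf(String state) {
        return STATES.indexOf(state);
    }
}
```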

So our Markov chain model has 9 states, i.e. a 9×9 transition matrix.

The conversion can be done with a small script, but if the data volume is large, a map step is still needed.

3.2 Generating the Markov State Transition Matrix with MapReduce
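The MapReduce logic amounts to two steps: the map step emits ((fromState, toState), 1) for every pair of adjacent states in a customer's state sequence, and the reduce step sums the 1s per key. As a hedged, in-memory illustration of that logic only (no Hadoop API is used; the class `TransitionCounting` and its methods are hypothetical), keys are written as `fromState<TAB>toState` like the output of the Spark job below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation of the map and reduce steps.
final class TransitionCounting {
    // "Map": one key per adjacent pair of states in a single customer's sequence.
    static List<String> map(List<String> states) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i + 1 < states.size(); i++) {
            keys.add(states.get(i) + "\t" + states.get(i + 1));
        }
        return keys;
    }

    // "Shuffle + reduce": sum the emitted 1s per key over all customers.
    static Map<String, Integer> reduce(List<List<String>> allSequences) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<String> seq : allSequences) {
            for (String key : map(seq)) {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Counting is done per customer sequence, so no transition is ever formed between the last state of one customer and the first state of another.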

4. Spark Implementation

4.1 Main Class

Below is the main Spark job class. The helper classes it uses are listed in the next section.

package com.sunrun.movieshow.algorithm.markov;

import com.sunrun.movieshow.algorithm.common.SparkHelper;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;
public class SparkMarkov {
    public static void main(String[] args) {
        JavaSparkContext sc = SparkHelper.getSparkContext("markov");

        // 1. Coalesce the input into 8 partitions
        JavaRDD<String> rdd = sc.textFile("data/markov").coalesce(8);

        // 2. Build (customerId, (purchaseDate, amount)) pairs.
        //    Malformed lines are filtered out first: returning null from mapToPair would
        //    put null elements into the RDD and make groupByKey fail later.
        JavaPairRDD<String, Tuple2<Long, Integer>> pairRDD = rdd
                .filter(line -> line.split(",").length == 4)
                .mapToPair(line -> {
                    // tokens[0] = customer-id
                    // tokens[1] = transaction-id
                    // tokens[2] = purchase-date
                    // tokens[3] = amount
                    String[] tokens = line.split(",");
                    long date = 0;
                    try {
                        date = DateUtil.getDateAsMilliSeconds(tokens[2]);
                    }
                    catch (Exception e) {
                        // the input may contain bad dates; such records keep date = 0
                    }
                    int amount = Integer.parseInt(tokens[3].trim());
                    Tuple2<Long, Integer> V = new Tuple2<>(date, amount);
                    return new Tuple2<>(tokens[0], V);
                });
        /**
         * (V31E55G4FI,(1356969600000,123))
         * (301UNH7I2F,(1356969600000,148))
         * (PP2KVIR4LD,(1356969600000,163))
         * (AC57MM3WNV,(1356969600000,188))
         * ...
         */

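        // 3. Group by customer ID. The customer ID does not appear in the final state chain,
        //    but it determines which transitions belong together (transitions are only formed
        //    within one customer's history). Implicit conditions like this are common in many
        //    algorithms and require experience to recognize.
        JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = pairRDD.groupByKey();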
        /**
         * (V31E55G4FI,[(1356969600000,123), (1356969600000,148), ...])
         * ...
         */

        // 4. Build each customer's Markov state sequence:
        //    (c_id, <(time1,amount),(time2,amount),...>) => List<String>
        JavaPairRDD<String, List<String>> stateSequence =
                customerRDD.mapValues((Iterable<Tuple2<Long,Integer>> dateAndAmount) -> {
                    List<Tuple2<Long,Integer>> list = toList(dateAndAmount);
                    Collections.sort(list, TupleComparatorAscending.INSTANCE);
                    // now convert the list (sorted by date) into a "state sequence"
                    List<String> stateSequence1 = toStateSequence(list);
                    return stateSequence1;
                });

        /**
         * stateSequence.saveAsTextFile("out/" + UUID.randomUUID());
         * customer id, and its state sequence
         * (K40E0LA5DL,[LL, MG, SG, SL, SG, MG, SL, SG, SL, SG, LL])
         * (ICF0KFGK12,[SG, SL, SE, SG, LL, LG])
         * (4N0B1U5HVG,[SG, ML, MG, SG, SL, SG, SL, SG, ML])
         * (3KJR1907D9,[SG, SL, ML, SG, ML, LG])
         * (47620I9LOD,[LG, SL, ML, MG, SG, SL, SG, SL, SG])
         */

        // 5. Slide a window of size 2 over each state sequence, emitting pairs such as ((LL, MG), 1)
        JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> {
            // output form: ((fromState, toState), times)
            ArrayList<Tuple2<Tuple2<String, String>, Integer>> mapperOutput = new ArrayList<>();
            List<String> states = s._2;

            if (states == null) {
                return Collections.emptyIterator();
            } else {
                for (int i = 0; i < states.size() - 1; i++) {
                    String fromState = states.get(i);
                    String toState = states.get(i + 1);
                    Tuple2<String, String> k = new Tuple2<>(fromState, toState);
                    mapperOutput.add(new Tuple2<>(k, 1));
                }
            }
            return mapperOutput.iterator();
        });
        /**
         * model.saveAsTextFile("out/model");
         * ((LG,LL),1)
         * ((LL,MG),1)
         * ((MG,SG),1)
         * ((SG,SL),1)
         * ((SL,ME),1)
         * ((ME,MG),1)
         *
         */

        // 6. Merge identical transitions by summing their counts: ((f,t),1) => ((f,t),3000)
        JavaPairRDD<Tuple2<String, String>, Integer> markovModel = model.reduceByKey((a, b) -> a + b);

        /**
         * markovModel.saveAsTextFile("/out/markov");
         * ((LL,SL),993)
         * ((MG,LL),1859)
         * ((LE,ME),25)
         * ((SL,ME),1490)
         * ((LG,ME),153)
         * ((ML,LG),3991)
         * ((ME,ME),58)
         */

        // 7. Format the output as (f, t, times), i.e. fromState<TAB>toState<TAB>count
        JavaRDD<String> markovFormatModel = markovModel.map(t -> t._1._1 + "\t" + t._1._2 + "\t" + t._2);

        // 8. Store the result on HDFS; it can also be stored locally, in which case the reader class must use the local file system API
        String markovFormatModelStorePath = "hdfs://10.21.1.24:9000/markov/";
        markovFormatModel.saveAsTextFile(markovFormatModelStorePath);

        // 9. Convert the final counts into the Markov probability model
        MarkovTableBuilder.transport(markovFormatModelStorePath);
        /** A square matrix whose order is the number of states; A(ij) is the transition probability from state i to state j
         * 0.03318,0.02441,0.6608,0.1373,0.007937,0.02398,0.06456,0.009522,0.03832
         * 0.3842,0.02532,0.2709,0.1260,0.009590,0.01331,0.1251,0.01083,0.03477
         * 0.6403,0.02881,0.05017,0.1487,0.01338,0.008602,0.08359,0.01446,0.01196
         * 0.02081,0.002368,0.7035,0.01170,0.004638,0.1518,0.03901,0.005933,0.06030
         * 0.4309,0.02140,0.2657,0.1277,0.009697,0.02006,0.08159,0.009530,0.03344
         * 0.1847,0.01816,0.5316,0.1303,0.007175,0.02351,0.06265,0.008136,0.03379
         * 0.01847,0.004597,0.6115,0.02068,0.003091,0.1346,0.06828,0.01424,0.1245
         * 0.2250,0.01900,0.2427,0.06291,0.006335,0.03182,0.2757,0.03801,0.09847
         * 0.2819,0.01944,0.1954,0.07990,0.008015,0.03202,0.2612,0.04427,0.07781
         */

    }



    // Convert the Iterable into a List
    static List<Tuple2<Long,Integer>> toList(Iterable<Tuple2<Long,Integer>> iterable) {
        List<Tuple2<Long,Integer>> list = new ArrayList<Tuple2<Long,Integer>>();
        for (Tuple2<Long,Integer> element: iterable) {
            list.add(element);
        }
        return list;
    }

    // Sort by date (ascending)
    static class TupleComparatorAscending implements Comparator<Tuple2<Long, Integer>>, Serializable {
        final static TupleComparatorAscending INSTANCE = new TupleComparatorAscending();
        @Override
        public int compare(Tuple2<Long, Integer> t1, Tuple2<Long, Integer> t2) {
            // return -t1._1.compareTo(t2._1);     // sorts RDD elements descending
            return t1._1.compareTo(t2._1);         // sorts RDD elements ascending
        }
    }

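    // Convert a date-ordered transaction sequence (List<Tuple2<Date,Amount>>) into a state sequence (List<String>), where each element is one Markov state.
    static List<String> toStateSequence(List<Tuple2<Long,Integer>> list){
        // not enough data: at least two transactions are needed to form one state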
        if(list.size() < 2){
            return null;
        }

        List<String> stateSequence = new ArrayList<>();
        // == compare consecutive transactions pairwise
        Tuple2<Long, Integer> prior = list.get(0);
        for (int i = 1; i < list.size(); i++) {
            Tuple2<Long, Integer> current = list.get(i);

            // === time difference in days: timestamps are in ms, so divide by 1 day = 24*60*60*1000 = 86400000 ms
            long dayDiff = (current._1 - prior._1) / 86400000;

            // === amounts of the two transactions
            int priorAmount = prior._2;
            int currentAmount = current._2;

            // === encode according to the business rules
            // ==== elapsed time
            String dd = null;
            if(dayDiff < 30){
                dd = "S";
            }else if(dayDiff < 60){
                dd = "M";
            }else {
                dd = "L";
            }

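            // ==== amount: compare the two amounts by ratio (+/-10%).
            // ==== Note: as written, the letter describes the PREVIOUS amount relative to the current one
            // ==== (L: previous < 90% of current, E: in between, G: previous >= 110% of current).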
            String ad = null;
            if(priorAmount < 0.9 * currentAmount){
                ad = "L";
            }else if(priorAmount < 1.1 * currentAmount){
                ad = "E";
            }else{
                ad = "G";
            }

            // === combine into the two-letter state code
            String element = dd + ad;
            stateSequence.add(element);
            // advance the size-2 window by one
            prior = current;
        }
        return stateSequence;
    }
}

4.2 Helper Classes

4.2.1 Date utility class

package com.sunrun.movieshow.algorithm.markov;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {

    static final String DATE_FORMAT = "yyyy-MM-dd";
    // SimpleDateFormat is not thread-safe, and Spark may run several tasks in one
    // executor JVM, so each thread gets its own formatter instance.
    static final ThreadLocal<SimpleDateFormat> SIMPLE_DATE_FORMAT =
        ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_FORMAT));

    /**
     *  Returns the Date parsed from dateAsString, or null if it cannot be parsed.
     */
    public static Date getDate(String dateAsString) {
        try {
            return SIMPLE_DATE_FORMAT.get().parse(dateAsString);
        }
        catch (Exception e) {
            return null;
        }
    }

    /**
     *  Returns the number of milliseconds since January 1, 1970,
     *  00:00:00 GMT represented by this Date object.
     */
    public static long getDateAsMilliSeconds(Date date) throws Exception {
        return date.getTime();
    }

    /**
     *  Returns the number of milliseconds since January 1, 1970,
     *  00:00:00 GMT represented by the parsed date.
     */
    public static long getDateAsMilliSeconds(String dateAsString) throws Exception {
        Date date = getDate(dateAsString);
        if (date == null) {
            throw new IllegalArgumentException("Unparseable date: " + dateAsString);
        }
        return date.getTime();
    }

    public static String getDateAsString(long timestamp) {
        return SIMPLE_DATE_FORMAT.get().format(timestamp);
    }

}
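The main class computes the day difference as a millisecond difference divided by 86400000, with dates parsed at midnight in the JVM's default time zone. In a zone that observes daylight saving time, a day containing the spring-forward transition is only 23 hours long, so that integer division can come out one day short. A minimal alternative sketch using `java.time`, assuming dates are plain `yyyy-MM-dd` strings as in the sample data; the class `DayDiff` is hypothetical.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: calendar-day difference, independent of time zones and DST.
final class DayDiff {
    // Whole calendar days from `earlier` to `later` (both yyyy-MM-dd).
    static long daysBetween(String earlier, String later) {
        return ChronoUnit.DAYS.between(LocalDate.parse(earlier), LocalDate.parse(later));
    }
}
```

For example, `DayDiff.daysBetween("2013-01-09", "2013-03-10")` is 60, which falls into the L bucket (not fewer than 60 days).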

4.2.2 Input/output utility class

package com.sunrun.movieshow.algorithm.markov;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedReader;
//
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.LineReader;

/**
 * This class provides convenient methods for accessing 
 * some Input/Output methods.
 *
 * @author Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 *
 */
public class InputOutputUtil {

    public static void close(LineReader reader) {
        if (reader == null) {
            return;
        }
        //
        try {
            reader.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(OutputStream stream) {
        if (stream == null) {
            return;
        }
        //
        try {
            stream.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(InputStream stream) {
        if (stream == null) {
            return;
        }
        //
        try {
            stream.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(FSDataInputStream stream) {
        if (stream == null) {
            return;
        }
        //
        try {
            stream.close();
        } 
        catch (Exception ignore) {
        }
    }

    public static void close(BufferedReader reader) {
        if (reader == null) {
            return;
        }
        //
        try {
            reader.close();
        } 
        catch (Exception ignore) {
        }
    }

}

4.2.3 Markov state table builder

package com.sunrun.movieshow.algorithm.markov;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Builds the Markov state (transition probability) table from the input data
 */
public class MarkovTableBuilder {
    // number of states
    private int numberOfState;

    private int scale;

    // state table
    private double[][] table = null;

    // state name -> row/column index
    private Map<String, Integer> states = null;


    public MarkovTableBuilder(int numberOfState, int scale){
        this(numberOfState);
        this.scale = scale;
    }

    private MarkovTableBuilder(int numberOfState){
        this.numberOfState = numberOfState;
        table = new double[numberOfState][numberOfState];
        initStates();
    }

    // initialize the state-to-index mapping
    private void initStates(){
        states = new HashMap<String, Integer>();
        states.put("SL", 0);
        states.put("SE", 1);
        states.put("SG", 2);
        states.put("ML", 3);
        states.put("ME", 4);
        states.put("MG", 5);
        states.put("LL", 6);
        states.put("LE", 7);
        states.put("LG", 8);
    }

    // add one transition count to the state table
    public void add(StateItem item){
        // look up the row/column indices of the two states
        int row = states.get(item.fromState);
        int column = states.get(item.toState);
        table[row][column] = item.count;
    }

    public void normalize() {
        //
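        // Laplace correction: normally done by adding 1 to every count. see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html
        // Here 1 is added to every cell of a row only when that row contains a zero count.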
        for (int r = 0; r < numberOfState; r++) {
            boolean gotZeroCount = false;
            for (int c = 0; c < numberOfState; c++) {
                if(table[r][c] == 0) {
                    gotZeroCount = true;
                    break;
                }
            }
            if (gotZeroCount) {
                for (int c = 0; c < numberOfState; c++) {
                    table[r][c] += 1;
                }
            }
        }
        // normalize
        for (int r = 0; r < numberOfState; r++) {
            double rowSum = getRowSum(r);
            for (int c = 0; c < numberOfState; c++) {
                table[r][c] = table[r][c] / rowSum;
            }
        }
    }

    // sum of all cells in row rowNumber
    public double getRowSum(int rowNumber) {
        double sum = 0.0;
        for (int column = 0; column < numberOfState; column++) {
            sum += table[rowNumber][column];
        }
        return sum;
    }

    /**
     * Persist the state table: as an example, it is simply printed to standard output.
     */
    private void persist() {
        for (int row = 0; row < numberOfState; row++) {
            StringBuilder builder = new StringBuilder();
            for (int column = 0; column < numberOfState; column++) {
                double element = table[row][column];
                builder.append(String.format("%.4g", element));
                if (column < (numberOfState - 1)) {
                    builder.append(",");
                }
            }
            System.out.println(builder.toString());
        }
    }



    public static void transport(String markovFormatDataPath){
        List<StateItem> items = ReadDataFromHDFS.readDirectory(markovFormatDataPath);
        MarkovTableBuilder markovTableBuilder = new MarkovTableBuilder(9);
        for (StateItem item : items) {
            markovTableBuilder.add(item);
        }

        // normalize the data
        markovTableBuilder.normalize();

        // persist the Markov state table
        markovTableBuilder.persist();
        
    }
}
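Why the smoothing matters: a zero count would make the corresponding transition impossible in every later prediction. The following standalone Java sketch mirrors what `normalize()` does (add 1 to every cell of a row only if that row contains a zero, then divide each row by its sum), assuming a square `int` count matrix as input; the class `RowNormalizer` is hypothetical.

```java
// Hypothetical standalone version of MarkovTableBuilder.normalize().
final class RowNormalizer {
    // counts must be square; returns a new row-stochastic matrix.
    static double[][] normalize(int[][] counts) {
        int n = counts.length;
        double[][] table = new double[n][n];
        for (int r = 0; r < n; r++) {
            boolean hasZero = false;
            for (int c = 0; c < n; c++) {
                if (counts[r][c] == 0) hasZero = true;
            }
            // Laplace correction applied row by row, only where a zero count occurs.
            double sum = 0.0;
            for (int c = 0; c < n; c++) {
                table[r][c] = counts[r][c] + (hasZero ? 1 : 0);
                sum += table[r][c];
            }
            for (int c = 0; c < n; c++) {
                table[r][c] /= sum;
            }
        }
        return table;
    }
}
```

For counts {{3, 0}, {2, 2}}, the first row becomes {4, 1} / 5 = {0.8, 0.2}, while the second row has no zero and is simply normalized to {0.5, 0.5}.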

4.2.4 Class for reading data from the file system

package com.sunrun.movieshow.algorithm.markov;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Class containing utility methods for reading the Markov transition
 * counts (the "part-*" text files) from a Hadoop file system.
 *
 *
 * @author Mahmoud Parsian
 *
 */
public class ReadDataFromHDFS {

	private ReadDataFromHDFS() {
	}
	
	public static List<StateItem> readDirectory(String path) {
		return ReadDataFromHDFS.readDirectory(new Path(path));
	}
	
	public static List<StateItem> readDirectory(Path path) {
		FileSystem fs;
		try {
			fs = path.getFileSystem(new Configuration());
		} catch (IOException e) {
            System.out.println("Unable to access the hadoop file system!");
			throw new RuntimeException("Unable to access the hadoop file system!");
		}
		List<StateItem> list = new ArrayList<StateItem>();
		try {
			FileStatus[] stat = fs.listStatus(path);
			for (int i = 0; i < stat.length; ++i) {
				if (stat[i].getPath().getName().startsWith("part")) {
					List<StateItem> pairs = readFile(stat[i].getPath(), fs);
					list.addAll(pairs);
				}
			}
		} 
		catch (IOException e) {
            System.out.println("Unable to access the hadoop file system!");
			throw new RuntimeException("Error reading the hadoop file system!");
		}

		return list;		
	}	

	@SuppressWarnings("unchecked")
	public static List<StateItem> readFile(Path path, FileSystem fs) {
		List<StateItem> list = new ArrayList<StateItem>();
		FSDataInputStream stream = null;
		BufferedReader reader = null;
		try {
			stream = fs.open(path);
			reader = new BufferedReader(new InputStreamReader(stream));
			String line;
			while ((line = reader.readLine()) != null) {
				// line = <fromState><TAB><toState><TAB><count>
				String[] tokens = line.split("\t"); // TAB separator
				if (tokens.length == 3) {
					StateItem item = new StateItem(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
					list.add(item);
				}
			}		
		}
		catch (IOException e) {
            System.out.println("readFile() failed for " + path);
			throw new RuntimeException("readFile() failed for " + path, e);
		}
		finally {
			InputOutputUtil.close(reader);
			InputOutputUtil.close(stream);
		}
			
		return list;
	}
	

	
	
	public static void main(String[] args) throws Exception {
		String path =  "hdfs://10.21.1.24:9000/markov/";
		List<StateItem> list = readDirectory(path);
        System.out.println("list="+list.toString());
	}
		
}

4.2.5 Markov state entity class

package com.sunrun.movieshow.algorithm.markov;

/**
 * StateItem represents an item of a Markov State Transition Model
 * as a Tuple3<fromState, toState, count>
 *
 */
public class StateItem  {
	String fromState;
	String toState;
	int count;
	
	public StateItem(String fromState, String toState, int count) {
		this.fromState = fromState;
		this.toState = toState;
		this.count = count;
	}
	
	/**
	 * for debugging ONLY
	 */
	public String toString() {
		return "{"+fromState+"," +toState+","+count+"}";
	}
}

5. Prediction Using the Final Markov Table

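The final result was already shown in the code comments above. Each row is the current state and each column the next state:

From\To SL SE SG ML ME MG LL LE LG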
SL 0.03318 0.02441 0.6608 0.1373 0.007937 0.02398 0.06456 0.009522 0.03832
SE 0.3842 0.02532 0.2709 0.1260 0.009590 0.01331 0.1251 0.01083 0.03477
SG 0.6403 0.02881 0.05017 0.1487 0.01338 0.008602 0.08359 0.01446 0.01196
ML 0.02081 0.002368 0.7035 0.01170 0.004638 0.1518 0.03901 0.005933 0.06030
ME 0.4309 0.02140 0.2657 0.1277 0.009697 0.02006 0.08159 0.009530 0.03344
MG 0.1847 0.01816 0.5316 0.1303 0.007175 0.02351 0.06265 0.008136 0.03379
LL 0.01847 0.004597 0.6115 0.02068 0.003091 0.1346 0.06828 0.01424 0.1245
LE 0.2250 0.01900 0.2427 0.06291 0.006335 0.03182 0.2757 0.03801 0.09847
LG 0.2819 0.01944 0.1954 0.07990 0.008015 0.03202 0.2612 0.04427 0.07781
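The appendix script mark_plan.rb uses this table for prediction: it takes a customer's last state, picks the column with the largest probability in that row, and adds 15, 45 or 90 days to the last purchase date depending on whether the predicted state starts with S, M or L. A Java sketch of the same rule follows; the class and method names are hypothetical, and the matrix is copied from the table above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical Java version of the prediction rule in mark_plan.rb.
final class NextPurchasePredictor {
    static final List<String> STATES =
            Arrays.asList("SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG");

    // Transition probabilities from the table above (row = current state, column = next state).
    static final double[][] MODEL = {
        {0.03318, 0.02441, 0.6608, 0.1373, 0.007937, 0.02398, 0.06456, 0.009522, 0.03832},
        {0.3842, 0.02532, 0.2709, 0.1260, 0.009590, 0.01331, 0.1251, 0.01083, 0.03477},
        {0.6403, 0.02881, 0.05017, 0.1487, 0.01338, 0.008602, 0.08359, 0.01446, 0.01196},
        {0.02081, 0.002368, 0.7035, 0.01170, 0.004638, 0.1518, 0.03901, 0.005933, 0.06030},
        {0.4309, 0.02140, 0.2657, 0.1277, 0.009697, 0.02006, 0.08159, 0.009530, 0.03344},
        {0.1847, 0.01816, 0.5316, 0.1303, 0.007175, 0.02351, 0.06265, 0.008136, 0.03379},
        {0.01847, 0.004597, 0.6115, 0.02068, 0.003091, 0.1346, 0.06828, 0.01424, 0.1245},
        {0.2250, 0.01900, 0.2427, 0.06291, 0.006335, 0.03182, 0.2757, 0.03801, 0.09847},
        {0.2819, 0.01944, 0.1954, 0.07990, 0.008015, 0.03202, 0.2612, 0.04427, 0.07781},
    };

    // Most likely next state: the column with the largest probability in the current state's row.
    static String mostLikelyNextState(double[][] model, String currentState) {
        int row = STATES.indexOf(currentState);
        if (row < 0) throw new IllegalArgumentException("unknown state: " + currentState);
        int best = 0;
        for (int c = 1; c < model[row].length; c++) {
            if (model[row][c] > model[row][best]) best = c;
        }
        return STATES.get(best);
    }

    // Days to add to the last purchase date, using the offsets from mark_plan.rb.
    static int daysUntilNextPurchase(String nextState) {
        switch (nextState.charAt(0)) {
            case 'S': return 15;
            case 'M': return 45;
            default:  return 90;
        }
    }
}
```

For example, a customer whose last state is LE most likely moves to LL (0.2757), so the next purchase is predicted 90 days after the last one.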

References

Mahmoud Parsian, Data Algorithms (I really like this book; it is very detailed)

Appendix

1. Ruby scripts

mark_plan.rb (reads the transaction file and the model file, and predicts each customer's next purchase date)

#!/usr/bin/ruby

require '../lib/util.rb'      
require 'date'

xaction_file = ARGV[0]
model_file = ARGV[1]
userXactions = {}
model = []
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

# read all xactions
File.open(xaction_file, "r").each_line do |line|
	items =  line.split(",")
	custID = items[0]
	if (userXactions.key? custID)
		hist = userXactions[custID]
	else 
		hist = []
		userXactions[custID] = hist
	end	
	hist << items[2, items.size - 2]
	
end

#read model
File.open(model_file, "r").each_line do |line|
	items =  line.split(",")
	#puts "#{line}"
	row = []
	items.each do |item|
		row << item.to_f  # model entries are probabilities; to_i would truncate them all to 0
		#puts "#{item}"
	end	
	model << row
	#puts "#{row}"
end

# marketing time
userXactions.each do |cid, xactions|
	seq = []
	last_date = Date.parse "2000-01-01"
	xactions.each_with_index do |xaction, index|	
		if (index > 0) 
			prevXaction = xactions[index-1]
			prDate = Date.parse prevXaction[0]
			prAmt = prevXaction[1].to_i
			date = Date.parse xaction[0]
			last_date = date
			amt = xaction[1].to_i
			daysDiff = date - prDate
			amtDiff = amt - prAmt
			
			if (daysDiff < 30)
				dd = "S"
			elsif (daysDiff < 60)
				dd = "M"
			else
				dd = "L"
			end
			
			if (prAmt < 0.9 * amt)
				ad = "L"
			elsif (prAmt < 1.1 * amt) 
				ad = "E"
			else 
				ad = "G"
			end
			seq << (dd + ad)
		end
	end
	
	if (!seq.empty?)
		last = seq[-1]
		row_index = states.index(last)
		row = model[row_index]
		max_col = row.max
		col_index = row.index(max_col) 
		next_state = states[col_index]
		
		if (next_state.start_with?("S"))
			next_date = last_date + 15
		elsif (next_state.start_with?("M"))
			next_date = last_date + 45
		else
			next_date = last_date + 90
		end
		
		#puts "#{cid}, #{last},  #{row_index}, #{max_col}, #{col_index}, #{next_state}, #{last_date}, #{next_date}"
		puts "#{cid}, #{next_date}"
	end
end

buy_xaction.rb (generates synthetic transactions in the customerId,transactionId,date,amount format)

#!/usr/bin/ruby

require '../lib/util.rb'      
require 'date'

custCount = ARGV[0].to_i
daysCount = ARGV[1].to_i
visitorPercent = ARGV[2].to_f

custIDs = []
xactionHist = {}

# transition probability matrix

idGen = IdGenerator.new
1.upto custCount do
	custIDs << idGen.generate(10)
end

xid = Time.now().to_i

date = Date.parse "2013-01-01"
1.upto daysCount do 
	numXaction = visitorPercent * custCount
	factor = 85 + rand(30)
	numXaction = (numXaction * factor) / 100
	
	1.upto numXaction do
		custID = custIDs[rand(custIDs.size)]
		if (xactionHist.key? custID)
			hist = xactionHist[custID]
			lastXaction = hist[-1]
			lastDate = lastXaction[0]
			lastAmt = lastXaction[1]
			numDays = date - lastDate
			if (numDays < 30) 
				amount = lastAmt < 40 ? 50 + rand(20) - 10  : 30 + rand(10) - 5
			elsif (numDays < 60)
				amount = lastAmt < 80 ? 100 + rand(40) - 20 : 60 + rand(20) - 10
			else 
				amount = lastAmt < 150 ? 180 + rand(60) - 30 :  120 + rand(40) - 20
			end						
		else
			hist = []
			xactionHist[custID] = hist	
			amount = 40 + rand(180)
		end
		xaction = []
		xaction << date
		xaction << amount
		hist << xaction
		xid = xid + 1
		puts "#{custID},#{xid},#{date},#{amount}"
		
	end

	date = date.next
end