In this chapter we cover the Markov model. Given a sequence of random variables (for example, a customer's most recent transaction dates), a Markov model predicts the distribution of the variable based only on the distribution of the previous state (here, the previous most-recent transaction date).
Let $$ S = \{S_1,S_2,\dots,S_n\} $$ be a finite set of states. We would like the following to hold: $$ P(S_n|S_{n-1},S_{n-2},\dots,S_{1}) \approx P(S_n|S_{n-1}) $$ This approximation expresses the first-order Markov property: the state of the system at time t+1 depends only on its state at time t.
If we want to be a bit more careful and let the dependence reach one more time step back, we get a second-order Markov model: $$ P(S_n|S_{n-1},S_{n-2},\dots,S_{1}) \approx P(S_n|S_{n-1},S_{n-2}) $$
Using the Markov assumption, the joint probability factorizes as: $$ P(S_1,S_2,\dots,S_n) = P(S_1)\prod_{i=2}^{n} P(S_i|S_{i-1}) $$
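This factorization is just the chain rule of probability with the first-order Markov property applied to each conditional term; writing out the expansion makes the collapse explicit:

```latex
P(S_1,S_2,\dots,S_n)
  = P(S_1)\,P(S_2 \mid S_1)\,P(S_3 \mid S_1,S_2)\cdots P(S_n \mid S_1,\dots,S_{n-1})
  \approx P(S_1)\,P(S_2 \mid S_1)\,P(S_3 \mid S_2)\cdots P(S_n \mid S_{n-1})
  = P(S_1)\prod_{i=2}^{n} P(S_i \mid S_{i-1})
```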
Summary:
A random sequence has the Markov property if its next distribution is determined solely by its current state. A random process with this property is called a Markov random process.
For an observable state sequence (i.e., the states are known from the data), we obtain a Markov chain model (MCM), which we can use to make predictions.
For unobservable states, we obtain a hidden Markov model (HMM).
Next we give the formal definition of the Markov chain we will use:
A finite state space: $$ S=\{S_1,S_2,\dots,S_n\} $$
Transition probabilities
A function $f: S \times S \to \mathbb{R}$ whose values form an N×N matrix, where N is the number of states; each entry is non-negative and each row sums to 1.
Initial distribution
A function $g: S \to \mathbb{R}$ assigning each state its starting probability; the values sum to 1.
A Markov chain is a random process over S defined by these two components.
Suppose we already have a Markov transition table, such as the following table of city weather changes:
Today \ Tomorrow | Sunny | Rainy | Cloudy | Foggy |
---|---|---|---|---|
Sunny | 0.6 | 0.1 | 0.2 | 0.1 |
Rainy | 0.5 | 0.2 | 0.2 | 0.1 |
Cloudy | 0.1 | 0.7 | 0.1 | 0.1 |
Foggy | 0.0 | 0.3 | 0.4 | 0.3 |
We assume the weather evolves according to the first-order Markov property. Let us interpret the table: we could just as well present it as a 4×4 matrix, but the table form is clearer. Each number is the probability that, given today's weather (row), tomorrow's weather is the one in that column. For example, the 0.6 in the first row, first column says that if today is sunny, the probability that tomorrow is also sunny is 0.6.
With the Markov property in hand, we can answer questions like the following:

If today is sunny, what is the probability that tomorrow is cloudy and the day after is foggy? By the Markov assumption, the probability that tomorrow is cloudy depends only on today, and the probability of fog the day after depends only on tomorrow, so:

P = 0.2 (row 1, column 3) × 0.1 (row 3, column 4) = 0.02
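This two-step computation is easy to check mechanically. The sketch below hard-codes the weather matrix from the table above (the class and method names are ours, purely for illustration; state order: sunny, rainy, cloudy, foggy):

```java
public class WeatherChain {
    // Transition matrix from the table above; row = today, column = tomorrow.
    static final double[][] P = {
        {0.6, 0.1, 0.2, 0.1}, // sunny
        {0.5, 0.2, 0.2, 0.1}, // rainy
        {0.1, 0.7, 0.1, 0.1}, // cloudy
        {0.0, 0.3, 0.4, 0.3}  // foggy
    };

    // Probability of the path s0 -> s1 -> s2 under the first-order Markov assumption:
    // each step depends only on the immediately preceding state.
    static double pathProb(int s0, int s1, int s2) {
        return P[s0][s1] * P[s1][s2];
    }

    public static void main(String[] args) {
        // sunny(0) -> cloudy(2) -> foggy(3): 0.2 * 0.1 = 0.02
        System.out.println(pathProb(0, 2, 3));
    }
}
```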
Clearly, then, the key task is deriving the Markov transition matrix from the data.
Suppose we have transaction data like the following:
```
55FRL8G23B,1381907878,2013-01-09,129
9SX0DJG9L4,1381907879,2013-01-09,34
0ANMD0T113,1381907880,2013-01-09,144
W1F4412PA8,1381907881,2013-01-09,26
22Z10EAYC3,1381907882,2013-01-09,167
3R56N17P1H,1381907883,2013-01-09,25
LK0P6K3DE4,1381907884,2013-01-09,25
3A4S4BMPMU,1381907885,2013-01-09,113
OF4CEO2814,1381907886,2013-01-09,138
9ICNYOFS41,1381907887,2013-01-09,79
N4EOB264U6,1381907888,2013-01-09,108
3C0WARCKYJ,1381907889,2013-01-09,204
PTT3BI00AZ,1381907890,2013-01-09,27
14UHHAVQ2Q,1381907891,2013-01-09,73
Z9GFE6TDKF,1381907892,2013-01-09,32
...
```
Each record is (customerID, transactionID, purchaseDate, amount).
The goal of this section is to convert each transaction sequence into a state sequence. Our application first produces output of the form $$ customerId,(Date_1,Amount_1);(Date_2,Amount_2);\dots;(Date_n,Amount_n) $$ with the dates sorted in ascending order.
We then convert this output into a state sequence, as follows: $$ customerId,State_1,State_2,\dots,State_n $$
To do so we must map the ordered sequence of (purchase-date, amount) pairs onto a set of Markov chain states. Each state is a two-letter code, with the letters defined as follows:
Time since last transaction | Amount relative to previous transaction |
---|---|
S: small | L: significantly smaller |
M: medium | E: roughly the same |
L: large | G: significantly greater |
Combining the two letters gives 9 states:
State | Time gap since last transaction : amount change |
---|---|
SL | small : significantly smaller |
SE | small : roughly the same |
SG | small : significantly greater |
ML | medium : significantly smaller |
ME | medium : roughly the same |
MG | medium : significantly greater |
LL | large : significantly smaller |
LE | large : roughly the same |
LG | large : significantly greater |
So our Markov chain model has 9 states, i.e., a 9×9 transition matrix.
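Before looking at the Spark job, here is the encoding rule in isolation, as a minimal standalone sketch. It mirrors the thresholds the job applies (under 30 days = S, under 60 = M, otherwise L; the prior amount below 90% of the current amount = L, within ±10% = E, otherwise G); the class name `StateEncoder` is ours, not part of the original code:

```java
public class StateEncoder {
    // Map (days since the previous transaction, previous amount, current amount)
    // to one of the nine two-letter Markov states.
    static String encode(long dayDiff, int priorAmount, int currentAmount) {
        // time-gap letter: S(mall), M(edium), L(arge)
        String dd = dayDiff < 30 ? "S" : (dayDiff < 60 ? "M" : "L");
        // amount letter, comparing the two amounts by ratio
        String ad;
        if (priorAmount < 0.9 * currentAmount) {
            ad = "L";
        } else if (priorAmount < 1.1 * currentAmount) {
            ad = "E";
        } else {
            ad = "G";
        }
        return dd + ad;
    }

    public static void main(String[] args) {
        // 10 days apart, amount 50 -> 100: state "SL"
        System.out.println(encode(10, 50, 100));
        // 45 days apart, amount unchanged: state "ME"
        System.out.println(encode(45, 100, 100));
    }
}
```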
The conversion could be done with a small script, but once the data volume grows large we need a distributed map phase.
Below is the Spark driver class; the helper classes it uses are listed in the next section.
```java
package com.sunrun.movieshow.algorithm.markov;

import com.sunrun.movieshow.algorithm.common.SparkHelper;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;

public class SparkMarkov {
    public static void main(String[] args) {
        JavaSparkContext sc = SparkHelper.getSparkContext("markov");

        // 1. Load the data, coalescing it into 8 partitions.
        JavaRDD<String> rdd = sc.textFile("data/markov").coalesce(8);

        // 2. Map each record to (customerID, (transactionTime, amount)).
        JavaPairRDD<String, Tuple2<Long, Integer>> pairRDD = rdd.mapToPair(line -> {
            // tokens[0] = customer-id
            // tokens[1] = transaction-id
            // tokens[2] = purchase-date
            // tokens[3] = amount
            String[] tokens = line.split(",");
            if (tokens.length != 4) {
                return null;
            }
            long date = 0;
            try {
                date = DateUtil.getDateAsMilliSeconds(tokens[2]);
            } catch (Exception e) {
                // the data contains some malformed dates
            }
            int amount = Integer.parseInt(tokens[3]);
            return new Tuple2<>(tokens[0], new Tuple2<>(date, amount));
        }).filter(t -> t != null); // drop malformed records rather than passing nulls downstream
        /**
         * (V31E55G4FI,(1356969600000,123))
         * (301UNH7I2F,(1356969600000,148))
         * (PP2KVIR4LD,(1356969600000,163))
         * (AC57MM3WNV,(1356969600000,188))
         * ...
         */

        // 3. Group by customer ID. The customer ID never appears in the final state
        //    chain, but it scopes the state transitions to a single customer -- an
        //    implicit constraint of the kind many algorithms rely on.
        JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = pairRDD.groupByKey();
        /**
         * (V31E55G4FI,((1356969600000,123),(1356969600000,148),...))
         * ...
         */

        // 4. Build the Markov state sequences:
        //    (customerId, <(time1,amount1),(time2,amount2),...>) => (customerId, [state1,state2,...])
        JavaPairRDD<String, List<String>> stateSequence =
                customerRDD.mapValues((Iterable<Tuple2<Long, Integer>> dateAndAmount) -> {
                    List<Tuple2<Long, Integer>> list = toList(dateAndAmount);
                    Collections.sort(list, TupleComparatorAscending.INSTANCE);
                    // convert the list (sorted by date) into a "state sequence"
                    return toStateSequence(list);
                });
        /**
         * stateSequence.saveAsTextFile("out/" + UUID.randomUUID());
         * (customer id, its state sequence):
         * (K40E0LA5DL,[LL, MG, SG, SL, SG, MG, SL, SG, SL, SG, LL])
         * (ICF0KFGK12,[SG, SL, SE, SG, LL, LG])
         * (4N0B1U5HVG,[SG, ML, MG, SG, SL, SG, SL, SG, ML])
         * (3KJR1907D9,[SG, SL, ML, SG, ML, LG])
         * (47620I9LOD,[LG, SL, ML, MG, SG, SL, SG, SL, SG])
         */

        // 5. Slide a window of size 2 over each state sequence, emitting pairs
        //    of the form ((LL,MG),1).
        JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> {
            // output form: ((fromState,toState), 1)
            ArrayList<Tuple2<Tuple2<String, String>, Integer>> mapperOutput = new ArrayList<>();
            List<String> states = s._2;
            if (states == null) {
                return Collections.emptyIterator();
            }
            for (int i = 0; i < states.size() - 1; i++) {
                String fromState = states.get(i);
                String toState = states.get(i + 1);
                mapperOutput.add(new Tuple2<>(new Tuple2<>(fromState, toState), 1));
            }
            return mapperOutput.iterator();
        });
        /**
         * model.saveAsTextFile("out/model");
         * ((LG,LL),1)
         * ((LL,MG),1)
         * ((MG,SG),1)
         * ((SG,SL),1)
         * ((SL,ME),1)
         * ((ME,MG),1)
         */

        // 6. Reduce by key, merging identical transitions: ((f,t),1) => ((f,t),3000).
        JavaPairRDD<Tuple2<String, String>, Integer> markovModel = model.reduceByKey((a, b) -> a + b);
        /**
         * markovModel.saveAsTextFile("/out/markov");
         * ((LL,SL),993)
         * ((MG,LL),1859)
         * ((LE,ME),25)
         * ((SL,ME),1490)
         * ((LG,ME),153)
         * ((ML,LG),3991)
         * ((ME,ME),58)
         */

        // 7. Reformat the output into tab-separated (fromState, toState, count) lines.
        JavaRDD<String> markovFormatModel = markovModel.map(t -> t._1._1 + "\t" + t._1._2 + "\t" + t._2);

        // 8. Store the result on HDFS. Storing locally also works, but then the
        //    parsing class must use the local file system API instead.
        String markovFormatModelStorePath = "hdfs://10.21.1.24:9000/markov/";
        markovFormatModel.saveAsTextFile(markovFormatModelStorePath);

        // 9. Convert the counts into the final Markov probability model.
        MarkovTableBuilder.transport(markovFormatModelStorePath);
        /** A square matrix of order |states|; A(i,j) is the probability of moving from state i to state j:
         * 0.03318,0.02441,0.6608,0.1373,0.007937,0.02398,0.06456,0.009522,0.03832
         * 0.3842,0.02532,0.2709,0.1260,0.009590,0.01331,0.1251,0.01083,0.03477
         * 0.6403,0.02881,0.05017,0.1487,0.01338,0.008602,0.08359,0.01446,0.01196
         * 0.02081,0.002368,0.7035,0.01170,0.004638,0.1518,0.03901,0.005933,0.06030
         * 0.4309,0.02140,0.2657,0.1277,0.009697,0.02006,0.08159,0.009530,0.03344
         * 0.1847,0.01816,0.5316,0.1303,0.007175,0.02351,0.06265,0.008136,0.03379
         * 0.01847,0.004597,0.6115,0.02068,0.003091,0.1346,0.06828,0.01424,0.1245
         * 0.2250,0.01900,0.2427,0.06291,0.006335,0.03182,0.2757,0.03801,0.09847
         * 0.2819,0.01944,0.1954,0.07990,0.008015,0.03202,0.2612,0.04427,0.07781
         */
    }

    // Materialize the iterable into a list.
    static List<Tuple2<Long, Integer>> toList(Iterable<Tuple2<Long, Integer>> iterable) {
        List<Tuple2<Long, Integer>> list = new ArrayList<>();
        for (Tuple2<Long, Integer> element : iterable) {
            list.add(element);
        }
        return list;
    }

    // Sort by transaction time.
    static class TupleComparatorAscending implements Comparator<Tuple2<Long, Integer>>, Serializable {
        static final TupleComparatorAscending INSTANCE = new TupleComparatorAscending();

        @Override
        public int compare(Tuple2<Long, Integer> t1, Tuple2<Long, Integer> t2) {
            return t1._1.compareTo(t2._1); // ascending by date
        }
    }

    // Convert an ordered transaction list (List<Tuple2<Date,Amount>>) into a state
    // sequence (List<String>) in which each element is one Markov state.
    static List<String> toStateSequence(List<Tuple2<Long, Integer>> list) {
        // not enough data
        if (list.size() < 2) {
            return null;
        }
        List<String> stateSequence = new ArrayList<>();
        // pair up consecutive transactions
        Tuple2<Long, Integer> prior = list.get(0);
        for (int i = 1; i < list.size(); i++) {
            Tuple2<Long, Integer> current = list.get(i);

            // time difference in days; the data is in ms (1d = 24*60*60*1000 = 86400000 ms)
            long dayDiff = (current._1 - prior._1) / 86400000;

            // transaction amounts
            int priorAmount = prior._2;
            int currentAmount = current._2;

            // time-gap letter
            String dd;
            if (dayDiff < 30) {
                dd = "S";
            } else if (dayDiff < 60) {
                dd = "M";
            } else {
                dd = "L";
            }

            // amount letter: compare the two amounts by ratio
            String ad;
            if (priorAmount < 0.9 * currentAmount) {
                ad = "L";
            } else if (priorAmount < 1.1 * currentAmount) {
                ad = "E";
            } else {
                ad = "G";
            }

            // combine into the two-letter state, then advance the size-2 window
            stateSequence.add(dd + ad);
            prior = current;
        }
        return stateSequence;
    }
}
```
1. Date utility class
```java
package com.sunrun.movieshow.algorithm.markov;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {
    static final String DATE_FORMAT = "yyyy-MM-dd";
    static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT);

    /**
     * Returns the Date from a given dateAsString
     */
    public static Date getDate(String dateAsString) {
        try {
            return SIMPLE_DATE_FORMAT.parse(dateAsString);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * Returns the number of milliseconds since January 1, 1970,
     * 00:00:00 GMT represented by this Date object.
     */
    public static long getDateAsMilliSeconds(Date date) throws Exception {
        return date.getTime();
    }

    /**
     * Returns the number of milliseconds since January 1, 1970,
     * 00:00:00 GMT represented by the given date string.
     */
    public static long getDateAsMilliSeconds(String dateAsString) throws Exception {
        Date date = getDate(dateAsString);
        return date.getTime();
    }

    public static String getDateAsString(long timestamp) {
        return SIMPLE_DATE_FORMAT.format(timestamp);
    }
}
```
2. I/O utility class
```java
package com.sunrun.movieshow.algorithm.markov;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.util.LineReader;

/**
 * This class provides convenient methods for accessing
 * some Input/Output methods.
 *
 * @author Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 */
public class InputOutputUtil {

    public static void close(LineReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (Exception ignore) {
        }
    }

    public static void close(OutputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignore) {
        }
    }

    public static void close(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignore) {
        }
    }

    public static void close(FSDataInputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignore) {
        }
    }

    public static void close(BufferedReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (Exception ignore) {
        }
    }
}
```
3. Markov state table builder class
```java
package com.sunrun.movieshow.algorithm.markov;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Builds the Markov state-transition table from the formatted input data.
 */
public class MarkovTableBuilder {
    // number of states
    private int numberOfState;
    private int scale;
    // transition table
    private double[][] table = null;
    // state name -> row/column index
    private Map<String, Integer> states = null;

    public MarkovTableBuilder(int numberOfState, int scale) {
        this(numberOfState);
        this.scale = scale;
    }

    private MarkovTableBuilder(int numberOfState) {
        this.numberOfState = numberOfState;
        table = new double[numberOfState][numberOfState];
        initStates();
    }

    // map each state name to its index
    private void initStates() {
        states = new HashMap<String, Integer>();
        states.put("SL", 0);
        states.put("SE", 1);
        states.put("SG", 2);
        states.put("ML", 3);
        states.put("ME", 4);
        states.put("MG", 5);
        states.put("LL", 6);
        states.put("LE", 7);
        states.put("LG", 8);
    }

    // record one (fromState, toState, count) entry in the table
    public void add(StateItem item) {
        int row = states.get(item.fromState);
        int column = states.get(item.toState);
        table[row][column] = item.count;
    }

    public void normalize() {
        // Laplace correction: if a row contains a zero count, add 1 to every count
        // in that row. See: http://cs.nyu.edu/faculty/davise/ai/bayesText.html
        for (int r = 0; r < numberOfState; r++) {
            boolean gotZeroCount = false;
            for (int c = 0; c < numberOfState; c++) {
                if (table[r][c] == 0) {
                    gotZeroCount = true;
                    break;
                }
            }
            if (gotZeroCount) {
                for (int c = 0; c < numberOfState; c++) {
                    table[r][c] += 1;
                }
            }
        }
        // normalize each row so it sums to 1
        for (int r = 0; r < numberOfState; r++) {
            double rowSum = getRowSum(r);
            for (int c = 0; c < numberOfState; c++) {
                table[r][c] = table[r][c] / rowSum;
            }
        }
    }

    // sum of the entries in row rowNumber
    public double getRowSum(int rowNumber) {
        double sum = 0.0;
        for (int column = 0; column < numberOfState; column++) {
            sum += table[rowNumber][column];
        }
        return sum;
    }

    /**
     * Persist the state table. As an example we simply print the result.
     */
    private void persist() {
        for (int row = 0; row < numberOfState; row++) {
            StringBuilder builder = new StringBuilder();
            for (int column = 0; column < numberOfState; column++) {
                double element = table[row][column];
                builder.append(String.format("%.4g", element));
                if (column < (numberOfState - 1)) {
                    builder.append(",");
                }
            }
            System.out.println(builder.toString());
        }
    }

    public static void transport(String markovFormatDataPath) {
        List<StateItem> items = ReadDataFromHDFS.readDirectory(markovFormatDataPath);
        MarkovTableBuilder markovTableBuilder = new MarkovTableBuilder(9);
        for (StateItem item : items) {
            markovTableBuilder.add(item);
        }
        // normalize the counts into probabilities
        markovTableBuilder.normalize();
        // persist the Markov state table
        markovTableBuilder.persist();
    }
}
```
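The `normalize()` step above combines a Laplace correction (add 1 to every count in a row that contains a zero, so no transition ends up with probability 0) with row normalization. A minimal standalone sketch of the same idea on a 2×2 count matrix (`RowNormalizer` is an illustrative name, not part of the book's code; the counts are made up):

```java
public class RowNormalizer {
    // Laplace-correct any row containing a zero, then scale each row to sum to 1.
    static double[][] normalize(double[][] t) {
        for (double[] row : t) {
            boolean hasZero = false;
            for (double v : row) {
                if (v == 0) { hasZero = true; break; }
            }
            // Laplace correction: bump every count in the row by 1
            if (hasZero) {
                for (int c = 0; c < row.length; c++) row[c] += 1;
            }
            // divide by the row sum so the row becomes a probability distribution
            double sum = 0;
            for (double v : row) sum += v;
            for (int c = 0; c < row.length; c++) row[c] /= sum;
        }
        return t;
    }

    public static void main(String[] args) {
        // {5, 0} gets corrected to {6, 1} before normalizing
        double[][] m = normalize(new double[][]{{3, 1}, {5, 0}});
        System.out.println(m[0][0] + "," + m[0][1]);
        System.out.println(m[1][0] + "," + m[1][1]);
    }
}
```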
4. Class for reading data back from the file system
```java
package com.sunrun.movieshow.algorithm.markov;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Utility methods for reading the formatted Markov model data
 * back from HDFS.
 *
 * @author Mahmoud Parsian
 */
public class ReadDataFromHDFS {

    private ReadDataFromHDFS() {
    }

    public static List<StateItem> readDirectory(String path) {
        return readDirectory(new Path(path));
    }

    public static List<StateItem> readDirectory(Path path) {
        FileSystem fs;
        try {
            fs = path.getFileSystem(new Configuration());
        } catch (IOException e) {
            System.out.println("Unable to access the hadoop file system!");
            throw new RuntimeException("Unable to access the hadoop file system!");
        }

        List<StateItem> list = new ArrayList<StateItem>();
        try {
            FileStatus[] stat = fs.listStatus(path);
            for (int i = 0; i < stat.length; ++i) {
                // only read the part-* files produced by the Spark job
                if (stat[i].getPath().getName().startsWith("part")) {
                    List<StateItem> pairs = readFile(stat[i].getPath(), fs);
                    list.addAll(pairs);
                }
            }
        } catch (IOException e) {
            System.out.println("Unable to access the hadoop file system!");
            throw new RuntimeException("Error reading the hadoop file system!");
        }
        return list;
    }

    public static List<StateItem> readFile(Path path, FileSystem fs) {
        List<StateItem> list = new ArrayList<StateItem>();
        FSDataInputStream stream = null;
        BufferedReader reader = null;
        try {
            stream = fs.open(path);
            reader = new BufferedReader(new InputStreamReader(stream));
            String line;
            while ((line = reader.readLine()) != null) {
                // line = <fromState><TAB><toState><TAB><count>
                String[] tokens = line.split("\t"); // TAB separator
                if (tokens.length == 3) {
                    StateItem item = new StateItem(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
                    list.add(item);
                }
            }
        } catch (IOException e) {
            System.out.println("readFile() failed!");
            throw new RuntimeException("readFile() failed!");
        } finally {
            InputOutputUtil.close(reader);
            InputOutputUtil.close(stream);
        }
        return list;
    }

    public static void main(String[] args) throws Exception {
        String path = "hdfs://10.21.1.24:9000/markov/";
        List<StateItem> list = readDirectory(path);
        System.out.println("list=" + list.toString());
    }
}
```
5. Markov state entity class
```java
package com.sunrun.movieshow.algorithm.markov;

/**
 * StateItem represents an item of a Markov state transition model
 * as a Tuple3<fromState, toState, count>.
 */
public class StateItem {
    String fromState;
    String toState;
    int count;

    public StateItem(String fromState, String toState, int count) {
        this.fromState = fromState;
        this.toState = toState;
        this.count = count;
    }

    /**
     * for debugging ONLY
     */
    public String toString() {
        return "{" + fromState + "," + toState + "," + count + "}";
    }
}
```
The final analysis result already appeared in the code comments above:
 | SL | SE | SG | ML | ME | MG | LL | LE | LG |
---|---|---|---|---|---|---|---|---|---|
SL | 0.03318 | 0.02441 | 0.6608 | 0.1373 | 0.007937 | 0.02398 | 0.06456 | 0.009522 | 0.03832 |
SE | 0.3842 | 0.02532 | 0.2709 | 0.1260 | 0.009590 | 0.01331 | 0.1251 | 0.01083 | 0.03477 |
SG | 0.6403 | 0.02881 | 0.05017 | 0.1487 | 0.01338 | 0.008602 | 0.08359 | 0.01446 | 0.01196 |
ML | 0.02081 | 0.002368 | 0.7035 | 0.01170 | 0.004638 | 0.1518 | 0.03901 | 0.005933 | 0.06030 |
ME | 0.4309 | 0.02140 | 0.2657 | 0.1277 | 0.009697 | 0.02006 | 0.08159 | 0.009530 | 0.03344 |
MG | 0.1847 | 0.01816 | 0.5316 | 0.1303 | 0.007175 | 0.02351 | 0.06265 | 0.008136 | 0.03379 |
LL | 0.01847 | 0.004597 | 0.6115 | 0.02068 | 0.003091 | 0.1346 | 0.06828 | 0.01424 | 0.1245 |
LE | 0.2250 | 0.01900 | 0.2427 | 0.06291 | 0.006335 | 0.03182 | 0.2757 | 0.03801 | 0.09847 |
LG | 0.2819 | 0.01944 | 0.1954 | 0.07990 | 0.008015 | 0.03202 | 0.2612 | 0.04427 | 0.07781 |
Data Algorithms, by Mahmoud Parsian (I really like this book; it is very thorough)
mark_plan.rb
```ruby
#!/usr/bin/ruby

require '../lib/util.rb'
require 'date'

xaction_file = ARGV[0]
model_file = ARGV[1]

userXactions = {}
model = []
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

# read all xactions
File.open(xaction_file, "r").each_line do |line|
  items = line.split(",")
  custID = items[0]
  if (userXactions.key? custID)
    hist = userXactions[custID]
  else
    hist = []
    userXactions[custID] = hist
  end
  hist << items[2, items.size - 2]
end

# read model
File.open(model_file, "r").each_line do |line|
  items = line.split(",")
  row = []
  items.each do |item|
    row << item.to_i
  end
  model << row
end

# marketing time
userXactions.each do |cid, xactions|
  # rebuild the customer's state sequence from consecutive transaction pairs
  seq = []
  last_date = Date.parse "2000-01-01"
  xactions.each_with_index do |xaction, index|
    if (index > 0)
      prevXaction = xactions[index - 1]
      prDate = Date.parse prevXaction[0]
      prAmt = prevXaction[1].to_i
      date = Date.parse xaction[0]
      last_date = date
      amt = xaction[1].to_i
      daysDiff = date - prDate
      amtDiff = amt - prAmt

      if (daysDiff < 30)
        dd = "S"
      elsif (daysDiff < 60)
        dd = "M"
      else
        dd = "L"
      end

      if (prAmt < 0.9 * amt)
        ad = "L"
      elsif (prAmt < 1.1 * amt)
        ad = "E"
      else
        ad = "G"
      end

      seq << (dd + ad)
    end
  end

  if (!seq.empty?)
    # pick the most probable next state from the row of the last observed state
    last = seq[-1]
    row_index = states.index(last)
    row = model[row_index]
    max_col = row.max
    col_index = row.index(max_col)
    next_state = states[col_index]
    # map the predicted time letter to a contact-date offset
    if (next_state.start_with?("S"))
      next_date = last_date + 15
    elsif (next_state.start_with?("M"))
      next_date = last_date + 45
    else
      next_date = last_date + 90
    end
    puts "#{cid}, #{next_date}"
  end
end
```
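The core decision the marketing script makes, pick the most probable next state from the last observed state's row and map its time letter to a contact-date offset, can also be sketched in Java. The class name and the tiny example row are ours for illustration; the states array and the 15/45/90-day offsets mirror the script:

```java
public class NextStatePredictor {
    static final String[] STATES = {"SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"};

    // Return the most probable next state, given the transition-matrix row
    // corresponding to the customer's last observed state.
    static String nextState(double[] row) {
        int best = 0;
        for (int c = 1; c < row.length; c++) {
            if (row[c] > row[best]) best = c;
        }
        return STATES[best];
    }

    // Days to wait before the next contact, keyed on the time letter,
    // mirroring the Ruby script: S -> 15, M -> 45, otherwise 90.
    static int daysUntilContact(String state) {
        switch (state.charAt(0)) {
            case 'S': return 15;
            case 'M': return 45;
            default:  return 90;
        }
    }

    public static void main(String[] args) {
        // the "SL" row of the final matrix shown above; its maximum (0.6608) is at "SG"
        double[] row = {0.03318, 0.02441, 0.6608, 0.1373, 0.007937,
                        0.02398, 0.06456, 0.009522, 0.03832};
        String next = nextState(row);
        System.out.println(next + " -> contact in " + daysUntilContact(next) + " days");
    }
}
```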
buy_xaction.rb
```ruby
#!/usr/bin/ruby

require '../lib/util.rb'
require 'date'

custCount = ARGV[0].to_i
daysCount = ARGV[1].to_i
visitorPercent = ARGV[2].to_f

custIDs = []
xactionHist = {} # per-customer transaction history

# generate the customer IDs
idGen = IdGenerator.new
1.upto custCount do
  custIDs << idGen.generate(10)
end

xid = Time.now().to_i
date = Date.parse "2013-01-01"

1.upto daysCount do
  # number of transactions for the day, with some random variation
  numXaction = visitorPercent * custCount
  factor = 85 + rand(30)
  numXaction = (numXaction * factor) / 100

  1.upto numXaction do
    custID = custIDs[rand(custIDs.size)]
    if (xactionHist.key? custID)
      hist = xactionHist[custID]
      lastXaction = hist[-1]
      lastDate = lastXaction[0]
      lastAmt = lastXaction[1]
      numDays = date - lastDate
      # amount depends on the gap since the last purchase and its amount
      if (numDays < 30)
        amount = lastAmt < 40 ? 50 + rand(20) - 10 : 30 + rand(10) - 5
      elsif (numDays < 60)
        amount = lastAmt < 80 ? 100 + rand(40) - 20 : 60 + rand(20) - 10
      else
        amount = lastAmt < 150 ? 180 + rand(60) - 30 : 120 + rand(40) - 20
      end
    else
      hist = []
      xactionHist[custID] = hist
      amount = 40 + rand(180)
    end

    xaction = []
    xaction << date
    xaction << amount
    hist << xaction

    xid = xid + 1
    puts "#{custID},#{xid},#{date},#{amount}"
  end
  date = date.next
end
```