交易反欺詐是VoltDB適用場景之一,是典型的事件驅動的業務,核心是攝取高頻的交易數據,並逐條對交易進行一系列複雜的反欺詐規則校驗,最終生成評判交易可疑度的分值,發送給下游業務系統,觸發交易攔截動做。
反欺詐規則中涉及大量的經過分析歷史交易生成的指標項,在VoltDB中進行流式計算,可基於本地保存的豐富的上下文數據對事件進行分析決策,使實時計算靠近上下文數據,得到性能優點。java
下面咱們經過一個刷卡的應用,展現VoltDB是如何實現一個簡單的反欺詐用例的。爲了讓示例代碼更加簡潔,又能突出VoltDB的功能,這裏使用一個地鐵刷卡的場景替代金融交易(如信用卡刷卡),以免引入過多專業的金融業務知識。同時一個繁忙地鐵系統產生的交易吞吐量不可小覷,定義的反欺詐規則也更容易理解。
能夠經過這個連接來訪問詳細的代碼https://github.com/ssomagani/event-driven-transactions
在這個應用中,模擬以下幾個場景:node
1. 啓用VoltDB Topic功能
VoltDB提供一個統一的配置文件,主要的特性均可以在其中進行定義,如:持久化、高可用、安全性等等,這裏主要介紹與案例相關的VoltDB Topic功能。以下配置開啓了Topic服務,並在服務器上開啓端口9999,用於接受客戶端發來的消息。git
<Topics enabled="true"> <properties> <property name="port">9999</property> <property name="group.initial.rebalance.delay.ms">0</property> <property name="retention.policy.threads">1</property> </properties> <profiles> <profile name="retain_compact"> <retention policy="compact" limit="2048" /> </profile> </profiles> </Topics>
2.根據特定配置文件啓動VoltDB
3.建立Topic,Topic的用途後面的代碼分析中提到github
CREATE Topic TRAINTOPIC execute procedure train_events.insert; CREATE TOPIC RECHARGE execute procedure RechargeCard; CREATE TOPIC using stream CARD_ALERT_EXPORT properties(topic.format=avro); create topic using stream FRAUD properties(topic.format=avro,consumer.keys=TRANS_ID);
4.建立數據表
在處理實時事件流時,能夠充分利用底層的數據庫引擎,充分利用本地關係型數據進行數據分析,獲得反欺詐業務指標。在本例中將建立以下數據表和視圖(省略具體DDL)
5.初始化數據
經過VoltDB的數據導入功能,從csv文件中初始化站點和列車sql
csvloader --file $PROJ_HOME/data/redline.csv --reportdir log stations csvloader --file $PROJ_HOME/data/trains.csv --reportdir log trains
在這個場景中,客戶端模擬8輛列車在17個站點之間運行,產生進站事件併發送到Topic。因爲設定的列車進出站時間比較短(微秒爲單位),因此會產生高頻事件流。
在服務端,VoltDB完成:
1.消息接收
2.消費消息
3.將列車進站事件記錄到數據庫中
在客戶端,經過java類TrainProducer生成多輛列車進站事件,並將事件發送到VoltDB Topic中。TrainProducer的執行命令以下:數據庫
java metro.pub.TrainProducer localhost:9999 TRAINTOPIC 8
TrainProducer類接收四個參數:apache
分析一下TrainProducer的主要方法,main方法生成10個線程,每50毫秒執行一次publish()方法,將列車進出站時間發送到Topic「TRAINTOPIC」中。bootstrap
public static void main(String[] args) { ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10); TrainProducer producer = new TrainProducer(args[0], args[1], Integer.parseInt(args[2])); System.out.println("Scheduling trains"); EXECUTOR.scheduleAtFixedRate ( () -> { producer.publish(producer.getNewEvents()); }, 1, 50, MILLISECONDS); }
跟蹤代碼找到producer的定義,它其實就是原生的KafkaProducer,因此能夠看到VoltDB Topic徹底兼容kafka api。而brokers便是main方法中的傳參localhost:9999,所以上面producer.getNewEvents()方法生成的數據將被髮送到VoltDB Topic中。api
private Producer<String, TrainEvent> createProducer() { Properties props = new Properties(); props.put("bootstrap.servers", brokers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "metro.serde.TrainEventSer"); Producer<String, TrainEvent> producer = new KafkaProducer <String, TrainEvent>(props); return producer; }
Publish方法所發送的消息由producer.getNewEvents()方法生成。有必要提早看一下Stations類,其中定義了17個火車站點,包括每一個站點的到下一個站點的運行時間(Station.nextStnDuration)和本站點停車時間(Station.stnWaitDuration),時間以微秒爲單位。全部列車將依次在這些站點中運行。安全
static HashMap<Integer, Station> idToStationMap = new HashMap<>(); static { idToStationMap.put(1, new Station(1, 1200000, 450000)); idToStationMap.put(2, new Station(2, 1050000, 250000)); idToStationMap.put(3, new Station(3, 850000, 300000)); idToStationMap.put(4, new Station(4, 900000, 350000)); idToStationMap.put(5, new Station(5, 500000, 260000)); idToStationMap.put(6, new Station(6, 950000, 190000)); idToStationMap.put(7, new Station(7, 450000, 130000)); idToStationMap.put(8, new Station(8, 200000, 280000)); idToStationMap.put(9, new Station(9, 200000, 110000)); idToStationMap.put(10, new Station(10, 450000, 300000)); idToStationMap.put(11, new Station(11, 550000, 200000)); idToStationMap.put(12, new Station(12, 550000, 200000)); idToStationMap.put(13, new Station(13, 800000, 150000)); idToStationMap.put(14, new Station(14, 950000, 100000)); idToStationMap.put(15, new Station(15, 1000000, 130000)); idToStationMap.put(16, new Station(16, 1200000, 220000)); idToStationMap.put(17, new Station(17, 1500000, 500000)); } public static class Station { public final int stationId; public final int nextStnDuration; public final int stnWaitDuration; public Station(int stationId, int nextStnDuration, int stnWaitDuration) { this.stationId = stationId; this.nextStnDuration = nextStnDuration; this.stnWaitDuration = stnWaitDuration; } }
因此getNewEvents主要的邏輯是首先隨機設定列車從任意站點出發,而後調用next()根據系統當前時間和站點的Station.nextStnDuration、Station.stnWaitDuration來判斷每輛列車目前運行到哪一個站點,若是next返回的LastKnownLocation對象有變化,則判斷列車已進入下一站,將列車進站事件trainEvent放到records中,用於發送給Topic。(注:列車調度不是本樣例的重點,所以next方法不會考慮列車的衝突問題,它假設站點之間由足夠多的軌道,能夠供多個列車並行)。
public List<TrainEvent> getNewEvents() { ArrayList<TrainEvent> records = new ArrayList<>(); for(TrainEvent trainEvent : idToTrainMap.values()) { LastKnownLocation prevLoc = trainEvent.location; LastKnownLocation curLoc = next(prevLoc, LocalDateTime.now()); if(!prevLoc.equals(curLoc)) { trainEvent = new TrainEvent(trainEvent.trainId, curLoc); idToTrainMap.put(trainEvent.trainId, trainEvent); records.add(trainEvent); } } return records; }
Topic TRAINTOPIC定義以下,train_events.insert是VoltDB爲表建立的默認存儲過程,命名規則爲[tablename].insert。Topic與存儲過程連用,表示存儲過程train_events.insert消費該Topic TRAINTOPIC中的trainEvent數據,並寫入train_events表中。
CREATE Topic TRAINTOPIC execute procedure train_events.insert;
在這個場景中,客戶端將完成充值消息發送。
在服務端,VoltDB完成:
在客戶端經過執行java類CardsProducer,首先初始化公交卡記錄,並將記錄寫入數據庫表中。而後隨機生成卡片充值事件,發送事件到Topic RECHARGE中。CardsProducer的執行命令以下:
java metro.pub.CardsProducer --mode=recharge --servers=localhost:9999 --Topic=RECHARGE
CardsProducer類接收三個參數:
public static void main(String[] args) throws IOException { CONFIG.parse("CardsProducer", args); if(CONFIG.mode.equals("new")) { genCards(CONFIG); return; } ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10); CardsProducer producer = new CardsProducer(CONFIG.servers, CONFIG.Topic); System.out.println("Recharging Cards"); EXECUTOR.scheduleAtFixedRate ( () -> { producer.publish(producer.getRechargeActivityRecords(1)); }, 1, 5, MILLISECONDS); }
和前面TrainProducer同樣,CardsProducer中的 producer也是KafkaProducer,很少介紹。getRechargeActivityRecords方法用來生成一條隨機的充值事件,包括卡號、充值金額和充值站點。每5毫秒執行一次。
public List<CardEvent> getRechargeActivityRecords(int count) { final ArrayList<CardEvent> records = new ArrayList<>(); int amt = (ThreadLocalRandom.current().nextInt(18)+2)*1000; int stationId = ThreadLocalRandom.current().nextInt(1, 18); ThreadLocalRandom.current().ints(count, 0, CONFIG.cardcount).forEach((cardId) -> { records.add(new CardEvent(cardId, amt, stationId)); } ); return records; }
這個場景中,Client端的代碼很是簡單,到此爲止。更多的邏輯在服務端定義,請看如下。
Topic用於接收充值事件,它的定義以下:
CREATE TOPIC RECHARGE execute procedure RechargeCard;
其中RechargeCard用於消費Topic中的數據,而RechargeCard是一個java procedure,它經過java+sql的方式,自定義了業務邏輯。java procedure是VoltDB在處理流數據時常常用到的對象,它是一個運行在VoltDB服務端的java類,而非client端代碼。它須要提早編譯成jar包(以下procs.jar),並加載到VoltDB java 運行時環境中。以後使用以下DDL定義。定義了RechargeCard後,在上面的CREATE TOPIC中才能被引用。
sqlcmd --query="load classes $PROJ_HOME/dist/procs.jar" CREATE PROCEDURE PARTITION ON TABLE cards COLUMN card_id PARAMETER 0 FROM CLASS metro.cards.RechargeCard;
讓咱們看一下RechargeCard中的邏輯,重點關注如何將java業務邏輯與SQL進行結合。其中定義run()方法和四個sql語句。RechargeCard從Topic RECHARGE中消費數據,進行反序列化以後,逐條將數據(即充值事件)做爲傳參交給run()方法,run()是procedure的入口方法。
voltQueueSQL是VoltDB的server 端api,用來執行sql並返回結果。Sql getCard和getStationName首先根據從Topic中獲取的數據進行充值事件合法性校驗,若是數據庫中沒有對應的充值站點或公交卡記錄,則執行sql exportNotif寫入一條錯誤信息。不然,update VoltDB數據庫中對應公交卡,增長餘額,並執行sql exportNotif寫入一條成功信息。
public class RechargeCard extends VoltProcedure { public final SQLStmt updateBalance = new SQLStmt("UPDATE cards SET balance = balance + ? WHERE card_id = ? AND card_type = 0"); public final SQLStmt getCard = new SQLStmt("SELECT * from cards WHERE card_id = ?"); public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)"); public final SQLStmt getStationName = new SQLStmt("SELECT name FROM stations WHERE station_id = ?"); public long run(int cardId, int amt, int stationId) { voltQueueSQL(getStationName, stationId); voltQueueSQL(getCard, cardId); String station = "UNKNOWN"; final VoltTable[] results = voltExecuteSQL(); if(results.length == 0) exportError(cardId, station); VoltTable stationResult = results[0]; if(stationResult.advanceRow()) station = stationResult.getString(0); VoltTable card = results[1]; if(card.advanceRow()) { voltQueueSQL(updateBalance, amt, cardId); String name = card.getString(5); String phone = card.getString(6); String email = card.getString(7); int notify = (int) card.getLong(8); voltQueueSQL(updateBalance, amt, cardId); voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, "Card recharged successfully"); voltExecuteSQL(true); } else { exportError(cardId, station); } return 0; } private void exportError(int cardId, String station) { exportError(cardId, station, "", "", "", 0, "Could not locate details of card for recharge"); } private void exportError(int cardId, String station, String name, String phone, String email, int notify, String msg) { voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, msg); voltExecuteSQL(true); } }
exportNotif的定義以下,其中CARD_ALERT_EXPORT是VoltDB的stream數據庫對象,一種數據管道,insert進去的數據逐一流過。
public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");
能夠在CARD_ALERT_EXPORT上添加數據處理邏輯,實現流計算效果。這個場景中,簡單的在Stream上建立了一個視圖,用於生成實時統計報表。視圖的定義以下:
CREATE VIEW card_export_stats(card_id, station_name, rechargeCount) AS SELECT card_id, station_name, count(*) from CARD_ALERT_EXPORT GROUP BY card_id, station_name;
最後,咱們定義Stream中的數據最終流向另外的Topic,該Topic可讓VoltDB以外的大數據產品進行消費,完成下游數據處理邏輯。
CREATE TOPIC using stream CARD_ALERT_EXPORT properties(Topic.format=avro);
這個場景中,客戶端隨機生成大量乘客刷卡進站記錄,併發送給數據庫處理。
服務端完成以下操做:
1.首先進行一系列校驗,如驗證卡信息,卡餘額,是否盜刷等反欺詐操做。
2.將全部刷卡行爲都記錄到數據表中。並將餘額不足和複合欺詐邏輯的刷卡事件分別發佈到不一樣的Topic中,供其餘下游系統訂閱。
在客戶端經過執行java類RidersProducer,與前面兩個場景不一樣,RidersProducer類直接鏈接VoltDB數據庫將數據寫入數據表中,而不是將數據發送到VoltDB Topic中。用來展現VoltDB的多種使用方式。
connectToOneServerWithRetry使用VoltDB client api鏈接指定ip的VoltDB數據庫。
void connectToOneServerWithRetry(String server, Client client) { int sleep = 1000; while (true) { try { client.createConnection(server); break; } catch (Exception e) { System.err.printf("Connection failed - retrying in %d second(s).\n", sleep / 1000); try { Thread.sleep(sleep); } catch (Exception interruted) {} if (sleep < 8000) sleep += sleep; } } System.out.printf("Connected to VoltDB node at: %s.\n", server); }
RidersProducer類建立100個線程,runBenchmark方法中每200毫秒這些線程執行一次getEntryActivityRecords。getEntryActivityRecords隨機生成一條乘客進站乘車記錄,記錄內容包括卡號、當前時間、進站站點id等
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(100); public void runBenchmark() throws Exception { int microsPerTrans = 1000000/RidersProducer.config.rate; EXECUTOR.scheduleAtFixedRate ( () -> { List<Object[]> entryRecords = getEntryActivityRecords(config.cardcount);//生成隨機的進站記錄 call(config.cardEntry, entryRecords);//將數據發送到VoltDB數據庫 }, 10000, microsPerTrans, MICROSECONDS); } public static List<Object[]> getEntryActivityRecords(int count) { final ArrayList<Object[]> records = new ArrayList<>(); long curTime = System.currentTimeMillis(); ThreadLocalRandom.current().ints(1, 0, count).forEach((cardId) -> { records.add(new Object[] {cardId, curTime, Stations.getRandomStation().stationId, ENTER.value, 0}); } ); return records; }
接着調用call方法,將數據records發送到數據庫進行處理。Call方法定義以下,callProcedure是VoltDB的client端api,用於將數據發送給指定名稱的procedure進行處理,能夠經過同步和異步IO兩種方式進行調用,異步調用時須要指定回調函數對數據庫調用的返回結果進行處理,即本例中的自定義了BenchmarkCallback。
protected static void call(String proc, Object[] args) { try { client.callProcedure(new BenchmarkCallback(proc, args), procName, args); } catch (IOException e) { e.printStackTrace(); } }
Call方法將數據發送給procedure,procedure名稱由以下代碼指定。一塊兒看看procedure中的具體邏輯。
@Option(desc = "Proc for card entry swipes") String cardEntry = "ValidateEntry";
Procedure ValidateEntry的部分定義,首先定義了6個SQL。
//查詢公交卡是否存在 public final SQLStmt checkCard = new SQLStmt( "SELECT enabled, card_type, balance, expires, name, phone, email, notify FROM cards WHERE card_id = ?;"); //卡充值 public final SQLStmt chargeCard = new SQLStmt( "UPDATE cards SET balance = ? WHERE card_id = ?;"); //查詢指定站點的入站費用 public final SQLStmt checkStationFare = new SQLStmt( "SELECT fare, name FROM stations WHERE station_id = ?;"); //記錄進站事件 public final SQLStmt insertActivity = new SQLStmt( "INSERT INTO card_events (card_id, date_time, station_id, activity_code, amount, accept) VALUES (?,?,?,?,?,?);"); //再次用到card_alert_export 這個stream對象,用於發送公交卡欠費消息 public final SQLStmt exportActivity = new SQLStmt( "INSERT INTO card_alert_export (card_id, export_time, station_name, name, phone, email, notify, alert_message) VALUES (?,?,?,?,?,?,?,?);"); //將刷卡欺詐行爲寫入stream對象fraud中 public final SQLStmt publishFraud = new SQLStmt( "INSERT INTO fraud (trans_id, card_id, date_time, station, activity_type, amt) values (?, ?, ?, ?, ?, ?)" );
值得說明的,上面最後一個sql中用到的fraud是另一個stream對象,用於插入刷卡欺詐事件,經過DDL定義其中的刷卡欺詐行爲最終會發布到VoltDB Topic中,用於下游處理產品消費。
CREATE STREAM FRAUD partition on column CARD_ID ( TRANS_ID varchar not null, CARD_ID integer not null, DATE_TIME timestamp not null, STATION integer not null, ACTIVITY_TYPE TINYINT not null, AMT integer not null ); create Topic using stream FRAUD properties(Topic.format=avro,consumer.keys=TRANS_ID);
前面已經提到run方法是procedure的入口方法,VoltDB運行procedure時,自動調用該方法。前面客戶端傳進的records記錄,被逐一傳遞到run方法到參數中進行處理。run方法定義以下
public VoltTable run(int cardId, long tsl, int stationId, byte activity_code, int amt) throws VoltAbortException { //查詢公交卡是否存在 voltQueueSQL(checkCard, EXPECT_ZERO_OR_ONE_ROW, cardId); //查詢指定站點的交通費用 voltQueueSQL(checkStationFare, EXPECT_ONE_ROW, stationId); VoltTable[] checks = voltExecuteSQL(); VoltTable cardInfo = checks[0]; VoltTable stationInfo = checks[1]; byte accepted = 0; //若是公交卡記錄等於0,說明卡不存在 if (cardInfo.getRowCount() == 0) { //記錄刷卡行爲到數據庫表中,將accept字段置爲拒絕「REJECTED」 voltQueueSQL(insertActivity, cardId, tsl, stationId, ACTIVITY_ENTER, amt, ACTIVITY_REJECTED); voltExecuteSQL(true); //返回「被拒絕」消息給客戶端。 return buildResult(accepted,"Card Invalid"); } // 若是卡存在,則取出卡信息。 cardInfo.advanceRow(); //卡狀態,0不可用,1可用 int enabled = (int)cardInfo.getLong(0); int cardType = (int)cardInfo.getLong(1); //卡餘額 int balance = (int)cardInfo.getLong(2); TimestampType expires = cardInfo.getTimestampAsTimestamp(3); String owner = cardInfo.getString(4); String phone = cardInfo.getString(5); String email = cardInfo.getString(6); int notify = (int)cardInfo.getLong(7); // 查詢指定站點的進站費用 stationInfo.advanceRow(); //指定站點的進站費用 int fare = (int)stationInfo.getLong(0); String stationName = stationInfo.getString(1); // 刷卡時間 TimestampType ts = new TimestampType(tsl); // 若是卡狀態爲不可用 if (enabled == 0) { //向客戶端返回「此卡不可用」 return buildResult(accepted,"Card Disabled"); } // 若是卡類型爲「非月卡」 if (cardType == 0) { // 若是卡內餘額充足 if (balance > fare) { //isFrand爲反欺詐策略,後面介紹 if (isFraud(cardId, ts, stationId)) { // 若是認定爲欺詐,記錄刷卡記錄,記錄類型爲「欺詐刷卡」 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_FRAUD); //而且把欺詐事件寫入stream,並最終被髮布到VoltDB Topic中。見前面STREAM FRAUD到ddl定義 voltQueueSQL(publishFraud, generateId(cardId, tsl), cardId, ts, stationId, ACTIVITY_ENTER, amt); voltExecuteSQL(true); //向客戶端返回「欺詐交易」消息 return buildResult(0, "Fraudulent transaction"); } else { // 若是不是欺詐行爲,則減小卡內餘額,完成正常消費 voltQueueSQL(chargeCard, balance - fare, cardId); //記錄正常的刷卡事件 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_ACCEPTED); voltExecuteSQL(true); //向客戶端返回卡內餘額 return buildResult(1, "Remaining Balance: " + intToCurrency(balance - fare)); } } else { // 若是卡內餘額不足,記錄刷卡失敗事件。 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, 0, ACTIVITY_REJECTED); if (notify != 0) { //再次用到card_alert_export 這個stream對象,用於發送公交卡欠費消息 voltQueueSQL(exportActivity, cardId, getTransactionTime().getTime(), stationName, owner, phone, email, notify, "Insufficient Balance"); } voltExecuteSQL(true); //向客戶端返回「餘額不足「消息 return buildResult(0,"Card has insufficient balance: "+intToCurrency(balance)); } } }
以上代碼中有一個isFraud方法,用於斷定是否爲欺詐性刷卡。這裏定義了一些簡單反欺詐規則
同一張卡在過去一小時內,有過10次以上刷卡進站記錄。進出站次數太多,暫停使用一段時間。
isFraud方法根據當前刷卡記錄中的數據,結合數據庫中的歷史記錄實現以上反欺詐規則。歷史刷卡記錄被保存在card_events表中,另外基於這張表建立了視圖,統計每張卡在一秒鐘內是否有過刷卡記錄。
CREATE VIEW CARD_HISTORY_SECOND as select card_id, TRUNCATE(SECOND, date_time) scnd from card_events group by card_id, scnd; isFraud方法的定義 public final SQLStmt cardHistoryAtStations = new SQLStmt( "SELECT activity_code, COUNT(DISTINCT station_id) AS stations " + "FROM card_events " + "WHERE card_id = ? AND date_time >= DATEADD(HOUR, -1, ?) " + "GROUP BY activity_code;" ); public final SQLStmt cardEntries = new SQLStmt( "SELECT activity_code " + "FROM card_events " + "WHERE card_id = ? AND station_id = ? AND date_time >= DATEADD(HOUR, -1, ?) " + "ORDER BY date_time;" ); public final SQLStmt instantaneousCardActivity = new SQLStmt( "SELECT count(*) as activity_count " + "FROM CARD_HISTORY_SECOND " + "WHERE card_id = ? " + "AND scnd = TRUNCATE(SECOND, ?) " + "GROUP BY scnd;" ); public boolean isFraud(int cardId, TimestampType ts, int stationId) { voltQueueSQL(instantaneousCardActivity, cardId, ts); voltQueueSQL(cardHistoryAtStations, cardId, ts); voltQueueSQL(cardEntries, cardId, stationId, ts); final VoltTable[] results = voltExecuteSQL(); final VoltTable cardInstantaneousActivity = results[0]; final VoltTable cardHistoryAtStationisTable = results[1]; final VoltTable cardEntriesTable = results[2]; //一秒鐘以內已經有一次刷卡記錄的話,返回true while (cardInstantaneousActivity.advanceRow()) { if(cardInstantaneousActivity.getLong("activity_count") > 0) { return true; } } while (cardHistoryAtStationisTable.advanceRow()) { final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code"); final long stations = cardHistoryAtStationisTable.getLong("stations"); if (activity_code == ACTIVITY_ENTER) { // 過去1小時以內在五個站點刷卡進站,返回true if (stations >= 5) { return true; } } } byte prevActivity = ACTIVITY_INVALID; int entranceCount = 0; while (cardEntriesTable.advanceRow()) { final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code"); if (prevActivity == ACTIVITY_INVALID || prevActivity == activity_code) { if (activity_code == ACTIVITY_ENTER) { prevActivity = activity_code; entranceCount++; } else { prevActivity = ACTIVITY_INVALID; } } } // 若是在過去1小時內有10次連續的刷卡記錄,返回true。 if (entranceCount >= 10) { return true; } return false; }
您看好VoltDB嗎? 立刻行動吧!
歡迎私信,與更多小夥伴一塊兒探討。
關於VoltDBVoltDB支持強ACID和實時智能決策的應用程序,以實現互聯世界。沒有其它數據庫產品能夠像VoltDB這樣,能夠同時須要低延時、大規模、高併發數和準確性相結合的應用程序加油。VoltDB由2014年圖靈獎得到者Mike Stonebraker博士建立,他對關係數據庫進行了從新設計,以應對當今不斷增加的實時操做和機器學習挑戰。Stonebraker博士對數據庫技術研究已有40多年,在快速數據,流數據和內存數據庫方面帶來了衆多創新理念。在VoltDB的研發過程當中,他意識到了利用內存事務數據庫技術挖掘流數據的所有潛力,不但能夠知足處理數據的延遲和併發需求,還能提供實時分析和決策。VoltDB是業界可信賴的名稱,在諾基亞、金融時報、三菱電機、HPE、巴克萊、華爲等領先組織合做有實際場景落地案例。