上一章完成了C#訪問HBase的SDK封裝,接下來以一個具體Demo對SDK進行測試驗證。場景:每5秒抓取指定股票列表的實時價格波動行情,數據下載後,一方面實時刷新UI界面,另一方面將數據放入在內存中模擬的MQ(實際生產環境可用Kafka等集羣代替),再存入HBase數據庫。同時提供按指定時間範圍查詢股票價格數據的功能。
目錄:
示例說明:
示例效果圖:
REST server運行狀態檢查:
獲取股票實時數據代碼:
/// <summary>
/// Downloads real-time quotes for a list of stock IDs from the Sina quote
/// endpoint and turns every returned line into a <see cref="StockRealInfo"/>.
/// </summary>
public class SnatchFormSina
{
    #region SnatchFormSina

    // One HttpClient is shared by every instance instead of creating a new client
    // (and new connections) each time Current is read. It is declared before
    // sharedInstance because static field initializers run in textual order.
    private static readonly HttpClient sharedClient = new HttpClient();
    private static readonly SnatchFormSina sharedInstance = new SnatchFormSina();

    HttpClient client;
    private const string dataurl = "http://hq.sinajs.cn/list={0}";

    /// <summary>
    /// Creates a fetcher that uses the shared HTTP client.
    /// </summary>
    public SnatchFormSina()
    {
        this.client = sharedClient;
    }

    /// <summary>
    /// Gets the shared fetcher instance. The class keeps no per-instance state
    /// besides the HTTP client, so a single instance is reused.
    /// </summary>
    public static SnatchFormSina Current
    {
        get { return sharedInstance; }
    }

    #endregion

    #region GetCurrentInfos

    /// <summary>
    /// Downloads the current quotes for the given stocks, assigns each quote its
    /// ID from <c>SimulatorCache.StockAccount</c> (looked up by stock name) and
    /// stores it in <c>SimulatorCache.StockInfos</c>.
    /// </summary>
    /// <param name="stockIDs">Stock IDs to query; null or empty entries are ignored.</param>
    /// <returns>
    /// One <see cref="StockRealInfo"/> per non-empty response line; an empty list
    /// when no usable ID was supplied (no request is sent in that case).
    /// </returns>
    public async Task<List<StockRealInfo>> GetCurrentInfosAsync(List<string> stockIDs)
    {
        List<StockRealInfo> list = await this.FetchQuotesAsync(stockIDs);
        foreach (StockRealInfo stockInfo in list)
        {
            // NOTE(review): this indexer throws if the name is not registered in
            // SimulatorCache.StockAccount, assuming it is a Dictionary - confirm.
            stockInfo.ID = SimulatorCache.StockAccount[stockInfo.Name];
            SimulatorCache.StockInfos[stockInfo.ID] = stockInfo;
        }
        return list;
    }

    #endregion

    #region ParseStockIDs

    /// <summary>
    /// Builds the request URL for the given stock IDs. IDs starting with "60"
    /// or "51" get the "sh" prefix, every other ID gets "sz"; the prefixed codes
    /// are joined with commas (no trailing comma).
    /// </summary>
    /// <param name="stockIDs">Stock IDs; null or empty entries are skipped.</param>
    /// <returns>The request URL, or null when there is no usable ID.</returns>
    private string ParseStockIDs(List<string> stockIDs)
    {
        List<string> codes = new List<string>();
        if (stockIDs != null)
        {
            foreach (string id in stockIDs)
            {
                if (string.IsNullOrEmpty(id))
                {
                    continue;
                }

                // "60": Shanghai stocks, "51": Shanghai funds; everything else is
                // treated as Shenzhen.
                bool isShanghai =
                    id.StartsWith("60", System.StringComparison.Ordinal) ||
                    id.StartsWith("51", System.StringComparison.Ordinal);
                codes.Add((isShanghai ? "sh" : "sz") + id);
            }
        }

        if (codes.Count == 0)
        {
            return null;
        }
        return string.Format(dataurl, string.Join(",", codes));
    }

    #endregion

    #region ValiateStockID

    /// <summary>
    /// Looks up a single stock ID and returns the name parsed from the response.
    /// </summary>
    /// <param name="stockID">Stock ID to look up.</param>
    /// <returns>
    /// The <c>Name</c> of the last quote parsed from the response, or an empty
    /// string when the ID is null/empty or the response has no non-empty line.
    /// </returns>
    public async Task<string> ValiateStockID(string stockID)
    {
        string name = string.Empty;
        List<StockRealInfo> quotes = await this.FetchQuotesAsync(new List<string> { stockID });
        foreach (StockRealInfo stockInfo in quotes)
        {
            name = stockInfo.Name;
        }
        return name;
    }

    #endregion

    #region FetchQuotes

    /// <summary>
    /// Requests the quotes for the given IDs and parses each non-empty line of
    /// the response into a <see cref="StockRealInfo"/>.
    /// </summary>
    /// <param name="stockIDs">Stock IDs to query.</param>
    /// <returns>The parsed quotes in response order; empty when there is nothing to query.</returns>
    private async Task<List<StockRealInfo>> FetchQuotesAsync(List<string> stockIDs)
    {
        List<StockRealInfo> quotes = new List<StockRealInfo>();
        string dataUrl = this.ParseStockIDs(stockIDs);
        if (dataUrl == null)
        {
            return quotes;
        }

        string realInfo = await this.client.GetStringAsync(dataUrl);
        foreach (string info in realInfo.Split('\n'))
        {
            if (string.IsNullOrEmpty(info))
            {
                continue;
            }
            quotes.Add(new StockRealInfo(info));
        }
        return quotes;
    }

    #endregion
}
數據持久化到HBase代碼示例:
/// <summary>
/// Buffers <see cref="StockRealInfo"/> items in an in-memory queue and writes
/// them in batches to the HBase table "StockRealInfo" from a background thread.
/// </summary>
public class StockRealWriter
{
    #region StockRealWriter

    // Number of depth levels written per side (buy and sell).
    const int DepthLevels = 5;

    // Pause between two flush attempts, in milliseconds.
    const int FlushIntervalMs = 5000;

    // The queue doubles as the lock object guarding every access to it.
    readonly Queue<StockRealInfo> queue = new Queue<StockRealInfo>();

    // Dedicated writer thread.
    Thread writerThread;

    // volatile: written by the finalizer thread, read by the writer thread.
    volatile bool threadRunning = true;

    const string HBASESTOCKTBLNAME = "StockRealInfo";

    /// <summary>
    /// Starts the background thread that flushes queued items to HBase.
    /// </summary>
    public StockRealWriter()
    {
        // A dedicated background thread is used because the loop sleeps between
        // flushes for its whole lifetime; a background thread does not keep the
        // process alive on exit.
        writerThread = new Thread(WriterThreadFunction);
        writerThread.IsBackground = true;
        writerThread.Start();
    }

    /// <summary>
    /// Asks the writer loop to stop.
    /// </summary>
    ~StockRealWriter()
    {
        // NOTE(review): the running writer thread references this instance, so
        // this finalizer is unlikely to run while the loop is alive - confirm
        // whether an explicit stop method is wanted.
        threadRunning = false;
    }

    #endregion

    #region WriterThreadFunction

    /// <summary>
    /// Writer loop: every <c>FlushIntervalMs</c> milliseconds takes everything
    /// that is queued, converts it to one <c>CellSet</c> and stores it in HBase.
    /// A failed batch is reported through <c>System.Diagnostics.Trace</c> and
    /// dropped; the loop keeps running.
    /// </summary>
    public void WriterThreadFunction()
    {
        while (threadRunning)
        {
            try
            {
                // Hold the lock only while draining, so producers are not blocked
                // during conversion and the HBase request.
                List<StockRealInfo> batch = null;
                lock (queue)
                {
                    if (queue.Count > 0)
                    {
                        batch = new List<StockRealInfo>(queue);
                        queue.Clear();
                    }
                }

                if (batch != null)
                {
                    CellSet set = new CellSet();
                    foreach (StockRealInfo stock in batch)
                    {
                        this.CreateStockByRealInfos(set, stock);
                    }

                    // Wait so that a failed store is observed here rather than
                    // lost. NOTE(review): assumes StoreCellsAsync returns a Task.
                    Utils.HBaseClient.StoreCellsAsync(HBASESTOCKTBLNAME, set).Wait();
                }
            }
            catch (Exception ex)
            {
                // Keep the writer alive: one bad batch must not stop all writes.
                System.Diagnostics.Trace.TraceError("StockRealWriter: failed to store batch: {0}", ex);
            }

            Thread.Sleep(FlushIntervalMs);
        }
    }

    #endregion

    #region CreateStockByRealInfos

    /// <summary>
    /// Converts one quote into an HBase row and appends it to <paramref name="set"/>.
    /// The row key is "ID_Date_Time"; every value is stored as UTF-8 text under
    /// the "d:" column prefix.
    /// </summary>
    /// <param name="set">Cell set that receives the new row.</param>
    /// <param name="info">Quote to convert.</param>
    private void CreateStockByRealInfos(CellSet set, StockRealInfo info)
    {
        string key = string.Format("{0}_{1}_{2}", info.ID, info.Date, info.Time);
        var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(key) };

        AddCell(row, "d:ID", info.ID);
        AddCell(row, "d:Name", info.Name);
        AddCell(row, "d:TodayOpen", info.TodayOpen);            // today's opening price
        AddCell(row, "d:YesterdayClose", info.YesterdayClose);  // yesterday's closing price
        AddCell(row, "d:Current", info.Current);                // current price
        AddCell(row, "d:High", info.High);                      // today's high
        AddCell(row, "d:Low", info.Low);                        // today's low
        AddCell(row, "d:Buy", info.Buy);                        // bid price (buy 1)
        AddCell(row, "d:Sell", info.Sell);                      // ask price (sell 1)
        AddCell(row, "d:VolAmount", info.VolAmount);            // traded volume in shares (usually divided by 100 to get lots)
        AddCell(row, "d:VolMoney", info.VolMoney);              // traded amount in yuan
        AddCell(row, "d:Date", info.Date);                      // date
        AddCell(row, "d:Time", info.Time);                      // time
        AddCell(row, "d:Diff", info.Diff);                      // price difference
        AddCell(row, "d:DiffPrec", info.DiffPrec);              // percentage

        // Buy side is written as d:Price0N / d:Amount0N, sell side as
        // d:Price1N / d:Amount1N. At most DepthLevels rows per side are written;
        // a missing or shorter table simply yields fewer cells.
        if (info.BuyList != null)
        {
            var buyRows = info.BuyList.Rows;
            for (int i = 0; i < DepthLevels && i < buyRows.Count; i++)
            {
                AddDepthCells(row, buyRows[i], "0", i);
            }
        }

        if (info.SellList != null)
        {
            var sellRows = info.SellList.Rows;
            for (int i = 0; i < DepthLevels && i < sellRows.Count; i++)
            {
                AddDepthCells(row, sellRows[i], "1", i);
            }
        }

        set.rows.Add(row);
    }

    /// <summary>
    /// Appends one UTF-8 encoded cell to the row; a null value is stored as empty text.
    /// </summary>
    private static void AddCell(CellSet.Row row, string column, string value)
    {
        row.values.Add(new Cell
        {
            column = Encoding.UTF8.GetBytes(column),
            data = Encoding.UTF8.GetBytes(value ?? string.Empty)
        });
    }

    /// <summary>
    /// Appends the "Price" and "Amount" cells of one depth level.
    /// </summary>
    /// <param name="row">Row that receives the cells.</param>
    /// <param name="level">Source row providing the "Price" and "Amount" columns.</param>
    /// <param name="side">"0" for the buy side, "1" for the sell side.</param>
    /// <param name="index">Zero-based depth level.</param>
    private static void AddDepthCells(CellSet.Row row, DataRow level, string side, int index)
    {
        AddCell(row, string.Format("d:Price{0}{1}", side, index), Convert.ToString(level["Price"]));
        AddCell(row, string.Format("d:Amount{0}{1}", side, index), Convert.ToString(level["Amount"]));
    }

    #endregion

    #region WriteStock

    /// <summary>
    /// Queues quotes for the next flush to HBase.
    /// </summary>
    /// <param name="stockInfos">Quotes to queue; null is ignored.</param>
    public void WriteStock(List<StockRealInfo> stockInfos)
    {
        if (stockInfos == null)
        {
            return;
        }

        lock (queue)
        {
            foreach (var stockInfo in stockInfos)
            {
                queue.Enqueue(stockInfo);
            }
        }
    }

    #endregion
}
從HBase讀取數據代碼:
/// <summary>
/// Reads quotes back from the HBase table "StockRealInfo".
/// </summary>
public class StockRealReader
{
    #region StockRealReader

    const string HBASESTOCKTBLNAME = "StockRealInfo";

    public StockRealReader()
    {
    }

    #endregion

    #region QueryStockRealAsync

    /// <summary>
    /// Runs the given scanner against the stock table and converts every
    /// returned row into a <see cref="StockRealInfo"/>.
    /// </summary>
    /// <param name="query">Scanner definition passed to the HBase client.</param>
    /// <returns>The quotes in the order the scanner returned them.</returns>
    public async Task<List<StockRealInfo>> QueryStockRealAsync(Scanner query)
    {
        List<StockRealInfo> list = new List<StockRealInfo>();
        ScannerInformation info = await Utils.HBaseClient.CreateScannerAsync(HBASESTOCKTBLNAME, query);

        // NOTE(review): the scanner is never released here; check whether the
        // HBase client offers a way to delete it once reading is finished.
        CellSet next;
        while ((next = await Utils.HBaseClient.ScannerGetNextAsync(info)) != null)
        {
            foreach (CellSet.Row row in next.rows)
            {
                list.Add(ToStockRealInfo(row));
            }
        }
        return list;
    }

    /// <summary>
    /// Builds a quote from one row. A column that is missing from the row
    /// yields an empty string instead of a null-reference failure.
    /// </summary>
    private static StockRealInfo ToStockRealInfo(CellSet.Row row)
    {
        Dictionary<string, string> cells = DecodeCells(row);
        StockRealInfo realInfo = new StockRealInfo();

        // d:ID is restored only when the row actually carries it.
        // NOTE(review): d:Name is not restored because no settable Name is
        // visible in this file - confirm whether it should be.
        string id;
        if (cells.TryGetValue("d:ID", out id))
        {
            realInfo.ID = id;
        }

        realInfo.TodayOpen = ValueOrEmpty(cells, "d:TodayOpen");            // opening price
        realInfo.YesterdayClose = ValueOrEmpty(cells, "d:YesterdayClose");  // yesterday's closing price
        realInfo.Current = ValueOrEmpty(cells, "d:Current");                // current price
        realInfo.High = ValueOrEmpty(cells, "d:High");                      // today's high
        realInfo.Low = ValueOrEmpty(cells, "d:Low");                        // today's low
        realInfo.Buy = ValueOrEmpty(cells, "d:Buy");                        // bid price (buy 1)
        realInfo.Sell = ValueOrEmpty(cells, "d:Sell");                      // ask price (sell 1)
        realInfo.VolAmount = ValueOrEmpty(cells, "d:VolAmount");            // traded volume in shares (usually divided by 100 to get lots)
        realInfo.VolMoney = ValueOrEmpty(cells, "d:VolMoney");              // traded amount in yuan
        realInfo.Date = ValueOrEmpty(cells, "d:Date");                      // date
        realInfo.Time = ValueOrEmpty(cells, "d:Time");                      // time
        realInfo.Diff = ValueOrEmpty(cells, "d:Diff");                      // price difference
        realInfo.DiffPrec = ValueOrEmpty(cells, "d:DiffPrec");              // percentage
        return realInfo;
    }

    /// <summary>
    /// Decodes every cell of the row once into a column-name to value map
    /// (both UTF-8). When a column occurs more than once the first occurrence
    /// wins, matching a first-match search over the cell list.
    /// </summary>
    private static Dictionary<string, string> DecodeCells(CellSet.Row row)
    {
        Dictionary<string, string> cells = new Dictionary<string, string>();
        foreach (var cell in row.values)
        {
            if (cell == null || cell.column == null)
            {
                continue;
            }

            string column = Encoding.UTF8.GetString(cell.column);
            if (!cells.ContainsKey(column))
            {
                cells.Add(column, cell.data == null ? string.Empty : Encoding.UTF8.GetString(cell.data));
            }
        }
        return cells;
    }

    /// <summary>
    /// Returns the value stored for the column, or an empty string when absent.
    /// </summary>
    private static string ValueOrEmpty(Dictionary<string, string> cells, string column)
    {
        string value;
        return cells.TryGetValue(column, out value) ? value : string.Empty;
    }

    #endregion
}