HBase (2): Accessing HBase from C#, a Stock Quote Demo

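     The previous chapter wrapped an SDK for accessing HBase from C#. This chapter tests and validates that SDK with a concrete demo. Scenario: every 5 seconds, fetch the real-time price quotes for a specified list of stocks. Once downloaded, the data refreshes the UI in real time and is also placed into an MQ simulated in memory (in production, a Kafka cluster or similar could take its place) and then stored in HBase. The demo also supports querying stock price data by a specified time range.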

Contents:

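  • Example overview
  • Screenshots
  • Checking the REST server status
  • Code for fetching real-time stock data
  • Code for persisting data to HBase
  • Code for reading data from HBase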

Example overview:

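  • Create two tables in HBase:
  1. StocksInfo (stock information table; stores the configured stock codes and stock names)
  2. StockRealInfo (real-time quote data, including the opening price, current price, high, low, five levels of bid and ask prices and quantities, trade price, volume, price change, and so on)
  • Every 5 seconds, fetch data for all stocks in the StocksInfo table, update the UI automatically, and persist the data to HBase; stocks can be added to or removed from the monitored list.
  • Query historical data from HBase by a specified time range.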
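
The 5-second fetch cycle described above can be sketched as a cancellable loop. This is only a minimal sketch, not the demo's actual scheduler: `PollingLoop`, `RunAsync`, and the `fetchAndStore` delegate are hypothetical names introduced for illustration, and the delegate stands in for "fetch quotes, refresh the UI, enqueue for HBase".

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original demo.
public static class PollingLoop
{
    // Calls fetchAndStore, waits for the interval, and repeats until cancelled.
    public static async Task RunAsync(Func<Task> fetchAndStore, TimeSpan interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            await fetchAndStore();
            try
            {
                await Task.Delay(interval, token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation was requested while waiting
            }
        }
    }
}
```

In the demo's scenario the interval would be `TimeSpan.FromSeconds(5)`. Waiting after each fetch, rather than firing on a fixed timer, keeps a slow request from overlapping with the next one.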

Screenshots:

  • Historical data query:

 

Checking the REST server status:

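  • In the chapter "HDP 2.4 Installation (5): Cluster and Component Installation", the HBase master was installed on hdp4 (192.168.2.21). Use Xshell to connect to the HBase master (hdp4).
  • Check that port 8080 is responding normally. The HBase status can also be checked from the Ambari UI.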
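
The same check can also be done from C# before the demo starts. The sketch below assumes the REST server exposes the standard HBase REST `/version/cluster` endpoint on the host and port mentioned above; `RestServerCheck` and its methods are hypothetical names, not part of the SDK from the previous chapter.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original demo.
public static class RestServerCheck
{
    // Builds the URL of the HBase REST "cluster version" endpoint.
    public static Uri BuildVersionUri(string host, int port)
    {
        return new UriBuilder("http", host, port, "/version/cluster").Uri;
    }

    // Returns true when the REST server answers with a success status code.
    public static async Task<bool> IsReachableAsync(HttpClient client, string host, int port)
    {
        try
        {
            using (HttpResponseMessage response = await client.GetAsync(BuildVersionUri(host, port)))
            {
                return response.IsSuccessStatusCode;
            }
        }
        catch (HttpRequestException)
        {
            return false; // connection refused, DNS failure, and similar errors
        }
        catch (TaskCanceledException)
        {
            return false; // the request timed out
        }
    }
}
```

For the cluster in this series, the call would be `await RestServerCheck.IsReachableAsync(client, "192.168.2.21", 8080)`.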

Code for fetching real-time stock data:

  • Many websites offer downloads of real-time stock trading data; I chose hq.sina. Be careful not to fetch too frequently, or your IP may be blocked. The code is as follows:
    public class SnatchFormSina 
        {
            #region SnatchFormSina
    
            HttpClient client;
    
            private const string dataurl = "http://hq.sinajs.cn/list={0}";
    
            public SnatchFormSina()
            {
                this.client = new HttpClient();
            }
    
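            // One shared instance, so that a single HttpClient is reused across calls
            private static readonly SnatchFormSina current = new SnatchFormSina();
    
            /// <summary>
            /// Gets the shared instance.
            /// </summary>
            public static SnatchFormSina Current
            {
                get {
                    return current;
                }
            }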
    
            #endregion
    
            #region GetCurrentInfos
    
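            /// <summary>
            /// Fetches the current quotes for the given stock codes and updates the cache.
            /// </summary>
            /// <param name="stockIDs">Stock codes without the exchange prefix</param>
            /// <returns>One StockRealInfo per non-empty response line</returns>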
            public async Task<List<StockRealInfo>> GetCurrentInfosAsync(List<string> stockIDs)
            {
                List<StockRealInfo> list = new List<StockRealInfo>();
                string dataUrl = this.ParseStockIDs(stockIDs);
                dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);
    
                string realInfo = await this.client.GetStringAsync(dataUrl);
                string[] infos = realInfo.Split('\n');
    
                StockRealInfo stockInfo;
                foreach (string info in infos)
                {
                    if (string.IsNullOrEmpty(info))
                        continue;
    
                    stockInfo = new StockRealInfo(info);
                    stockInfo.ID = SimulatorCache.StockAccount[stockInfo.Name];
                    SimulatorCache.StockInfos[stockInfo.ID] = stockInfo;
                    list.Add(stockInfo);
                }
    
                return list;
            }
    
            #endregion
    
            #region ParseStockIDs
    
            /// <summary>
            /// Builds the request URL. Each code gets an exchange prefix and a trailing comma;
            /// callers strip the final comma from the returned URL.
            /// </summary>
            /// <param name="stockIDs">Stock codes without the exchange prefix</param>
            /// <returns>The request URL, ending with a comma</returns>
            private string ParseStockIDs(List<string> stockIDs)
            {
                StringBuilder sb = new StringBuilder();
                foreach(string id in stockIDs)
                { 
                    // Shanghai stocks start with "60", Shanghai funds with "51"
                    if (id.StartsWith("60", StringComparison.Ordinal) || id.StartsWith("51", StringComparison.Ordinal))
                    {
                        sb.Append(string.Format("sh{0},", id));
                    }
                    else // everything else is treated as Shenzhen
                    {
                        sb.Append(string.Format("sz{0},", id));
                    }
                }
    
                // The trailing comma is left in place; callers remove it
                return string.Format(dataurl, sb.ToString());
            }
    
            #endregion
    
            #region ValiateStockID
    
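            /// <summary>
            /// Validates a stock code by requesting its quote and returns the stock name.
            /// </summary>
            /// <param name="stockID">Stock code without the exchange prefix</param>
            /// <returns>The stock name parsed from the response; empty if the response has no data lines</returns>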
            public async Task<string> ValiateStockID(string stockID)
            {
                string name = string.Empty;
                string dataUrl = this.ParseStockIDs(new List<string> { stockID });
                dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);
    
                string realInfo = await this.client.GetStringAsync(dataUrl);
                string[] infos = realInfo.Split('\n');
    
                StockRealInfo stockInfo;
                foreach (string info in infos)
                {
                    if (string.IsNullOrEmpty(info))
                        continue;
    
                    stockInfo = new StockRealInfo(info);
                    name = stockInfo.Name;
                }
    
                return name;
            }
    
            #endregion
        }
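
The `StockRealInfo(string)` constructor that parses each response line is not shown in this post. As a rough illustration of what such parsing involves, here is a minimal sketch. It assumes each line has the form `var hq_str_<symbol>="<comma-separated fields>";` with the name, opening price, previous close, and current price in fields 0 to 3 and the date and time in fields 30 and 31; that layout is my recollection of the hq.sinajs.cn format and should be verified against a live response. `SinaQuote` and `SinaQuoteParser` are hypothetical names.

```csharp
using System;

// Hypothetical types, not part of the original demo.
public sealed class SinaQuote
{
    public string Symbol, Name, TodayOpen, YesterdayClose, Current, Date, Time;
}

public static class SinaQuoteParser
{
    // Parses one response line; returns null when the line is malformed
    // or the quoted payload has fewer fields than expected (e.g. an unknown code).
    public static SinaQuote Parse(string line)
    {
        const string prefix = "var hq_str_";
        int start = line.IndexOf(prefix, StringComparison.Ordinal);
        int eq = line.IndexOf('=');
        int firstQuote = line.IndexOf('"');
        int lastQuote = line.LastIndexOf('"');
        if (start < 0 || eq < start + prefix.Length || firstQuote < 0 || lastQuote <= firstQuote)
            return null;

        string symbol = line.Substring(start + prefix.Length, eq - start - prefix.Length);
        string[] f = line.Substring(firstQuote + 1, lastQuote - firstQuote - 1).Split(',');
        if (f.Length < 32)
            return null;

        // Field positions are an assumption about the feed layout
        return new SinaQuote
        {
            Symbol = symbol,
            Name = f[0],
            TodayOpen = f[1],
            YesterdayClose = f[2],
            Current = f[3],
            Date = f[30],
            Time = f[31]
        };
    }
}
```

Returning null for a short payload gives a validation routine a simple way to reject a code the feed does not recognize, instead of failing on an out-of-range index.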

Code for persisting data to HBase:

  • In the code, Utils.HBaseClient is an HBaseClient instance created in a utility class.
    public class StockRealWriter
        {
            #region StockRealWriter
    
            Queue<StockRealInfo> queue = new Queue<StockRealInfo>();
    
            // Keeps the background writer loop running;
            // volatile so that a change is visible to the writer thread
            volatile bool threadRunning = true;
    
            const string HBASESTOCKTBLNAME = "StockRealInfo";
    
            public StockRealWriter()
            {
                // Start a long-running background task that writes to HBase
                Task.Factory.StartNew(WriterThreadFunction, TaskCreationOptions.LongRunning);
            }
    
            ~StockRealWriter()
            {
                threadRunning = false;
            }
    
            #endregion
    
            #region WriterThreadFunction
    
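            /// <summary>
            /// Background loop: every 5 seconds, drains the queue into one CellSet and stores it in HBase.
            /// </summary>
            public void WriterThreadFunction()
            {
                while (threadRunning)
                {
                    CellSet set = null;
    
                    // Hold the lock only while draining the queue
                    lock (queue)
                    {
                        if (queue.Count > 0)
                        {
                            set = new CellSet();
                            do
                            {
                                StockRealInfo stock = queue.Dequeue();
                                this.CreateStockByRealInfos(set, stock);
                            } while (queue.Count > 0);
                        }
                    }
    
                    if (set != null)
                    {
                        try
                        {
                            // Wait outside the lock so producers are not blocked during the HBase call
                            Utils.HBaseClient.StoreCellsAsync(HBASESTOCKTBLNAME, set).Wait();
                        }
                        catch (AggregateException ex)
                        {
                            // Report the failure and keep the loop alive
                            System.Diagnostics.Trace.TraceError(ex.ToString());
                        }
                    }
    
                    Thread.Sleep(5000);
                }
            }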
    
            #endregion
    
            #region CreateStockByRealInfos
    
    
            /// <summary>
            /// Converts one quote into an HBase row and adds it to the CellSet.
            /// The row key is "{ID}_{Date}_{Time}"; all columns are in column family "d".
            /// </summary>
            /// <param name="set">CellSet that receives the new row</param>
            /// <param name="info">Quote to convert</param>
            private void CreateStockByRealInfos(CellSet set, StockRealInfo info)
            {
                string key = string.Format("{0}_{1}_{2}", info.ID, info.Date, info.Time);
                var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(key) };
    
                AddCell(row, "d:ID", info.ID);
                AddCell(row, "d:Name", info.Name);
                AddCell(row, "d:TodayOpen", info.TodayOpen);           // today's opening price
                AddCell(row, "d:YesterdayClose", info.YesterdayClose); // yesterday's closing price
                AddCell(row, "d:Current", info.Current);               // current price
                AddCell(row, "d:High", info.High);                     // today's high
                AddCell(row, "d:Low", info.Low);                       // today's low
                AddCell(row, "d:Buy", info.Buy);                       // bid price (buy 1)
                AddCell(row, "d:Sell", info.Sell);                     // ask price (sell 1)
                AddCell(row, "d:VolAmount", info.VolAmount);           // volume in shares; divide by 100 for lots
                AddCell(row, "d:VolMoney", info.VolMoney);             // turnover in yuan
                AddCell(row, "d:Date", info.Date);                     // date
                AddCell(row, "d:Time", info.Time);                     // time
                AddCell(row, "d:Diff", info.Diff);                     // price change
                AddCell(row, "d:DiffPrec", info.DiffPrec);             // price change in percent
    
                // Five bid levels: d:Price00..d:Price04 and d:Amount00..d:Amount04
                for (int i = 0; i < 5; i++)
                {
                    DataRow buyInfo = info.BuyList.Rows[i];
                    AddCell(row, string.Format("d:Price0{0}", i), Convert.ToString(buyInfo["Price"]));
                    AddCell(row, string.Format("d:Amount0{0}", i), Convert.ToString(buyInfo["Amount"]));
                }
    
                // Five ask levels: d:Price10..d:Price14 and d:Amount10..d:Amount14
                for (int i = 0; i < 5; i++)
                {
                    DataRow sellInfo = info.SellList.Rows[i];
                    AddCell(row, string.Format("d:Price1{0}", i), Convert.ToString(sellInfo["Price"]));
                    AddCell(row, string.Format("d:Amount1{0}", i), Convert.ToString(sellInfo["Amount"]));
                }
    
                set.rows.Add(row);
            }
    
            // Adds one UTF-8 encoded cell to the row
            private static void AddCell(CellSet.Row row, string column, string data)
            {
                row.values.Add(new Cell { column = Encoding.UTF8.GetBytes(column), data = Encoding.UTF8.GetBytes(data) });
            }
    
            #endregion
    
            #region WriteStock
    
            /// <summary>
            /// Queues quotes for the background writer.
            /// </summary>
            /// <param name="stockInfos">Quotes to persist</param>
            public void WriteStock(List<StockRealInfo> stockInfos)
            {
                lock (queue)
                {
                    foreach(var stockInfo in stockInfos)
                    { 
                      queue.Enqueue(stockInfo);
                    }
                }
            }
    
            #endregion
        }
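
The writer above simulates the MQ with a `Queue<T>` guarded by `lock`. For comparison, here is a minimal sketch of the same "drain whatever has accumulated into one batch" idea using `BlockingCollection<T>` from the .NET class library, which handles the synchronization itself. `BatchDrain` and `TakeBatch` are hypothetical names, and this is an alternative technique, not the demo's implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper, not part of the original demo.
public static class BatchDrain
{
    // Waits up to `wait` for a first item, then takes whatever else is
    // immediately available, up to maxBatch items in total.
    public static List<T> TakeBatch<T>(BlockingCollection<T> queue, int maxBatch, TimeSpan wait)
    {
        var batch = new List<T>();
        T item;
        if (!queue.TryTake(out item, wait))
            return batch; // nothing arrived within the wait period

        batch.Add(item);
        while (batch.Count < maxBatch && queue.TryTake(out item))
            batch.Add(item);

        return batch;
    }
}
```

With this approach the consumer wakes up as soon as data arrives instead of sleeping for a fixed 5 seconds, and the batch size cap bounds the size of each write to HBase.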

Code for reading data from HBase:

  • In the code, the Scanner parameter defines the query range (StartRow, EndRow, Batch, and similar settings).
    public class StockRealReader
        {
            #region StockRealReader
    
            const string HBASESTOCKTBLNAME = "StockRealInfo";
    
            public StockRealReader()
            {
    
            }
    
            #endregion
    
            #region QueryStockRealAsync
    
            /// <summary>
            /// Scans the StockRealInfo table and converts each returned row into a StockRealInfo.
            /// </summary>
            /// <param name="query">Scanner settings that define the query range</param>
            public async Task<List<StockRealInfo>> QueryStockRealAsync(Scanner query)
            {
                List<StockRealInfo> list = new List<StockRealInfo>();
                
                ScannerInformation info = await Utils.HBaseClient.CreateScannerAsync(HBASESTOCKTBLNAME, query);
    
                CellSet next;
                while ((next = await Utils.HBaseClient.ScannerGetNextAsync(info)) != null)
                {
                    foreach (CellSet.Row row in next.rows)
                    {
                        StockRealInfo realInfo = new StockRealInfo();
    
                        realInfo.TodayOpen = GetValue(row, "d:TodayOpen");           // today's opening price
                        realInfo.YesterdayClose = GetValue(row, "d:YesterdayClose"); // yesterday's closing price
                        realInfo.Current = GetValue(row, "d:Current");               // current price
                        realInfo.High = GetValue(row, "d:High");                     // today's high
                        realInfo.Low = GetValue(row, "d:Low");                       // today's low
                        realInfo.Buy = GetValue(row, "d:Buy");                       // bid price (buy 1)
                        realInfo.Sell = GetValue(row, "d:Sell");                     // ask price (sell 1)
                        realInfo.VolAmount = GetValue(row, "d:VolAmount");           // volume in shares; divide by 100 for lots
                        realInfo.VolMoney = GetValue(row, "d:VolMoney");             // turnover in yuan
                        realInfo.Date = GetValue(row, "d:Date");                     // date
                        realInfo.Time = GetValue(row, "d:Time");                     // time
                        realInfo.Diff = GetValue(row, "d:Diff");                     // price change
                        realInfo.DiffPrec = GetValue(row, "d:DiffPrec");             // price change in percent
    
                        list.Add(realInfo);
                    }
                }
    
                return list;
            }
    
            // Returns the UTF-8 value of the named column, or null if the row has no such cell
            private static string GetValue(CellSet.Row row, string column)
            {
                var cell = row.values.Find(c => Encoding.UTF8.GetString(c.column) == column);
                return cell == null ? null : Encoding.UTF8.GetString(cell.data);
            }
    
            #endregion
        }
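
Because the writer uses `{ID}_{Date}_{Time}` as the row key, a time-range query for one stock maps to a start row and an end row. The sketch below shows one way to derive them. It assumes `Date` is stored as `yyyy-MM-dd` and `Time` as `HH:mm:ss`, so that keys sort chronologically in HBase's byte order, and it relies on HBase scans treating the end row as exclusive. `RowKeyRange` is a hypothetical helper; how the two strings are assigned to the Scanner depends on the SDK from the previous chapter.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the original demo.
public static class RowKeyRange
{
    // Builds a row key in the same "{ID}_{Date}_{Time}" shape the writer uses.
    public static string BuildKey(string stockId, DateTime t)
    {
        return string.Format(CultureInfo.InvariantCulture,
            "{0}_{1:yyyy-MM-dd}_{1:HH:mm:ss}", stockId, t);
    }

    // Returns (startRow, endRow) covering [from, to] for one stock.
    public static Tuple<string, string> Build(string stockId, DateTime from, DateTime to)
    {
        if (to < from)
            throw new ArgumentException("'to' must not be earlier than 'from'");

        // The end row is exclusive, so move it one second past 'to'
        return Tuple.Create(BuildKey(stockId, from), BuildKey(stockId, to.AddSeconds(1)));
    }
}
```

Since the stock ID leads the key, all rows for one stock are stored contiguously, which is what lets a single scan cover the range.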