最近架構一個項目,實現行情的接入和分發,須要達到極致的低時延特性,這對於證券系統是很是重要的。接入的行情源是能夠配置,既能夠是Level-1,也能夠是Level-2或其餘第三方的源。雖然Level-1行情沒有Level-2快,可是做爲系統支持的行情源,咱們仍是須要優化它,使得從文件讀取,到用戶經過socket收到行情,端到端的時延儘量的低。本文主要介紹對level-1行情dbf文件讀取的極致優化方案。相信對其餘的dbf文件讀取應該也有借鑑意義。 html
Level-1行情是由行情小站,定時每隔幾秒把dbf文件(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替換掉舊的。咱們的目標就是,在新文件完成更新後,在最短期內將文件讀取到內存,把每一行轉化爲對象,把每一個列轉化爲對應的數據類型。 java
咱們一共採用了6種優化方式。 算法
咱們在上文《Java讀取Level-1行情dbf文件極致優化(1)》 《Java讀取Level-1行情dbf文件極致優化(2)》中,已經介紹了4種優化策略:多線程
優化一:採用內存硬盤(RamDisk)
優化二:採用JNotify,用通知替代輪詢架構
優化三:採用NIO讀取文件
優化四:減小讀取文件時內存反覆分配和GC
socket
行情dbf文件不少字段是價格類型的字段,帶2位或者3位小數,從dbf讀取他們的後,咱們會把它們保存在Long類型或者Int類型,而不是Float或Double類型,好比1.23,轉換爲1230保存。由於Float型或Double型會丟失精度。ide
若是不優化,讀取步驟爲:性能
1,從byte[]對應的偏移中讀取並保存到String中。測試
2,對String作trim操做優化
3,把String轉換爲Float類型
4,把Float類型乘以1000並強轉爲Long類型。
不用多說,以上的過程必定是低效的,光前兩步就涉及到2次字符串拷貝,2次對象建立。第三步效率也不高。我這裏經過優化,在DBFReader.java中添加一個get_long_efficiently_and_multiply_1000方法,將4個步驟合併爲一步,經過一次掃描獲得結果。
public long get_long_efficiently_and_multiply_1000(byte[] src, final int index) { long multiplicand = 3; long result =0; Field field = getFields()[index]; boolean in_decimal_part = false; boolean negative = false; int offset = field.getOffset(); int length = field.getLength(); int end = offset+length; for(int i =field.getOffset(); i< end; i++) { byte ch = src[i]; if(ch>=48 && ch<=57) //若是是數字 { result *= 10; result += ch-48; if(in_decimal_part) multiplicand--; if(multiplicand==0) break; continue; } if(ch==32) //若是是空格 continue; if(ch == 46) //若是是小數點 { in_decimal_part = true; continue; } if(ch == '-') //若是是負號 { negative = true; } throw new NumberFormatException(); } if(multiplicand == 3) result *= 1000; else if (multiplicand == 2) result *=100; else if (multiplicand == 1) result *=10; if(negative) { result= 0 - result; } return result; }
上面的算法負責讀取字段轉換爲數字的同時,對它乘以1000。而且代碼中儘可能優化了執行步驟。
對於整形的讀取,咱們也進行了優化,添加一個get_long_efficiently:
public long get_long_efficiently(byte[] src, final int index) { long result =0; boolean negative = false; Field field = getFields()[index]; for(int i =field.getOffset(); i< field.getOffset()+ field.getLength(); i++) { byte ch = src[i]; if(ch>=48 && ch<=57) //若是是數字 { result = result*10 + (src[i]-48); continue; } if(src[i]==32) //若是是空格 continue; if(ch == '-') //若是是負號 { negative = true; } throw new NumberFormatException(); } if(negative) { result= 0 - result; } return result; }
以上的2個算法並不複雜,但卻很是關鍵,一個dbf文件包含大約5000行,每行包括20~30個Float類型或者Int類型的字段,該優化涉及10萬+個字段的讀取。測試下來,這步改進將讀取速度從50ms-70ms提高至15ms至20ms,細節在魔鬼當中,這是速度提高最快的一項優化。
(優化五的代碼在改進的DBFReader中,上午中已經提供下載,這裏再提供下載連接:DBFReader庫 )
對5000多個行進行字段讀取並轉換成對象,採用多線程處理是最天然不過的優化方式。
通常咱們採用的方法是把任務分紅等份的塊,每一個線程處理一大塊。好比,若是採用5個線程處理,那麼把5000行分紅1000個行一塊,每一個線程處理一塊。這樣看貌似公平,其實否則,由於咱們的操做系統是分時操做系統,每一個線程開始工做的時間,佔用的CPU時間片,和任務的強度都不徹底一致。等分的辦法貌似平均,可是頗有可能致使有些線程完成工做了,另一些還有不少沒作完。
這裏介紹一種我喜歡的任務分配方式:每一個線程每次從5000個行的任務中申請一小塊,好比16個行,完成後,再申請16個行。這樣快的線程就會多工做些,慢的就少工做些,直到全部的行處理完畢。那麼,這些線程怎麼協調呢,任務分配豈不是要用到鎖?不用鎖,咱們採用CAS機制就能作到(實際用的是AtomicInteger,AtomicInteger就是基於CAS實現的),這裏不解釋太多了。看代碼:
class ReaderTask implements Runnable { Collector collector; List<byte[]> recordList; CountDownLatch countDownLatch; AtomicInteger cursor; DBFReader reader; public ReaderTask(Collector collector, DBFReader dbfreader, List<byte[]> recordList, AtomicInteger cursor, CountDownLatch countDownLatch) { this.collector = collector; this.reader = dbfreader; this.recordList = recordList; this.cursor = cursor; this.countDownLatch = countDownLatch; } @Override public void run() { try { int length = recordList.size(); do { final int step = 16; //每次分配16行給該線程處理。 int endIndex = cursor.addAndGet(step); int startIndex = endIndex - step ; for (int i = startIndex; i < endIndex && i < length; i++) { byte[] row = recordList.get(i); MarketRealtimeData SHData = new MarketRealtimeData(); SHData.setMarketType(Constants.MARKET_SH_STOCK); SHData.setIdNum(reader.get_string_efficiently(row, 0)); SHData.setPrefix(reader.get_string_efficiently(row, 1)); SHData.setPreClosePrice(reader.get_long_efficiently_and_multiply_1000(row, 2)); SHData.setOpenPrice(reader.get_long_efficiently_and_multiply_1000(row, 3)); SHData.setTurnover(reader.get_long_efficiently_and_multiply_1000(row, 4)); SHData.setHighPrice(reader.get_long_efficiently_and_multiply_1000(row, 5)); SHData.setLowPrice(reader.get_long_efficiently_and_multiply_1000(row, 6)); SHData.setMatchPrice(reader.get_long_efficiently_and_multiply_1000(row, 7)); //讀取全部的Field,如下省略若干行 //... ... //... ... if (collector != null) { collector.collect(SHData); } } } while (cursor.get() < length); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } }
private void readHangqingFile(String path, String name) throws Exception { // Long t1 = System.nanoTime(); DBFReader dbfreader_SH = null; try { dbfreader_SH = new DBFReader(new File(path+File.separator + name)); List<byte[]> list_sh = dbfreader_SH.recordsWithOutDel_efficiently(cacheManager); AtomicInteger cursor = new AtomicInteger(0); //原子變量,用於線程間分配任務 CountDownLatch countDownLatch = new CountDownLatch(WORK_THREAD_COUNT); for (int i = 0; i < WORK_THREAD_COUNT - 1; i++) { //把任務分配給線程池多個線程 ReaderTask task = new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch); globalExecutor.execute(task); } new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch).run(); //當前線程本身也做爲工做線程 countDownLatch.await(); //Long t2 = System.nanoTime(); //System.out.println("speed time on read and object:" + (t2 - t1)); } finally { if (dbfreader_SH != null) dbfreader_SH.close(); } }
測試代表,在使用4個線程並行處理的狀況下,處理時間從15ms-20ms縮短至4ms-7ms。
在使用本文章介紹的全部優化方法,整個讀取效率從耗時300ms以上,優化至5ms-10ms之間。咱們討論的是從文件更新始,到完成文件讀取,完成5000多個對象,100,000個字段的轉換的總耗時。
若是繼續深刻,咱們可能還有很多細節能夠改進。測試代表,時延的穩定性還不夠好,極可能是因爲GC形成的,咱們還能夠從減小對象的建立,以減小性能損耗,減小GC;而且控制GC執行的時間,讓GC在空閒時執行等方面優化。
Binhua Liu原創文章,轉載請註明原地址http://www.cnblogs.com/Binhua-Liu/p/5616761.html