Java讀取Level-1行情dbf文件極致優化(3)

最近架構一個項目,實現行情的接入和分發,須要達到極致的低時延特性,這對於證券系統是很是重要的。接入的行情源是能夠配置,既能夠是Level-1,也能夠是Level-2或其餘第三方的源。雖然Level-1行情沒有Level-2快,可是做爲系統支持的行情源,咱們仍是須要優化它,使得從文件讀取,到用戶經過socket收到行情,端到端的時延儘量的低。本文主要介紹對level-1行情dbf文件讀取的極致優化方案。相信對其餘的dbf文件讀取應該也有借鑑意義。 html

Level-1行情是由行情小站,定時每隔幾秒把dbf文件(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替換掉舊的。咱們的目標就是,在新文件完成更新後,在最短期內將文件讀取到內存,把每一行轉化爲對象,把每一個列轉化爲對應的數據類型。 java

咱們一共採用了6種優化方式。 算法

 

咱們在上文《Java讀取Level-1行情dbf文件極致優化(1)》Java讀取Level-1行情dbf文件極致優化(2)》中,已經介紹了4種優化策略:多線程

優化一:採用內存硬盤(RamDisk)
優化二:採用JNotify,用通知替代輪詢架構

優化三:採用NIO讀取文件
優化四:減小讀取文件時內存反覆分配和GC
socket

優化五:字段讀取優化

行情dbf文件不少字段是價格類型的字段,帶2位或者3位小數,從dbf讀取他們的後,咱們會把它們保存在Long類型或者Int類型,而不是Float或Double類型,好比1.23,轉換爲1230保存。由於Float型或Double型會丟失精度。ide

若是不優化,讀取步驟爲:性能

1,從byte[]對應的偏移中讀取並保存到String中。測試

2,對String作trim操做優化

3,把String轉換爲Float類型

4,把Float類型乘以1000並強轉爲Long類型。

 

不用多說,以上的過程必定是低效的,光前兩步就涉及到2次字符串拷貝,2次對象建立。第三步效率也不高。我這裏經過優化,在DBFReader.java中添加一個get_long_efficiently_and_multiply_1000方法,將4個步驟合併爲一步,經過一次掃描獲得結果。

public long get_long_efficiently_and_multiply_1000(byte[] src, final int index)
    {
        long multiplicand = 3; 
        long result =0;
        Field field = getFields()[index];
        boolean in_decimal_part = false;
        boolean negative = false;
        int offset = field.getOffset();
        int length = field.getLength();
        int end = offset+length;
        for(int i =field.getOffset(); i< end; i++)
        {
            byte ch = src[i];
            
            if(ch>=48 && ch<=57) //若是是數字
            {
                result *= 10;
                result += ch-48;
                if(in_decimal_part)
                    multiplicand--;
                if(multiplicand==0) break;
                continue;
            }
            
            if(ch==32) //若是是空格
                continue;
            
            if(ch == 46) //若是是小數點
            {
                in_decimal_part = true;
                continue;
            }
            
            if(ch == '-') //若是是負號
            {
                negative = true;
            }
            
            throw new NumberFormatException();
            
        }
        
        if(multiplicand == 3)
            result *= 1000;
        else if (multiplicand == 2)
            result *=100;
        else if (multiplicand == 1)
            result *=10;
        
        if(negative)
        {
            result= 0 - result;
        }
        
        return result;
    }

 

上面的算法負責讀取字段轉換爲數字的同時,對它乘以1000。而且代碼中儘可能優化了執行步驟。

 

對於整形的讀取,咱們也進行了優化,添加一個get_long_efficiently:

public long get_long_efficiently(byte[] src, final int index)
    {
        long result =0;
        boolean negative = false;
        Field field = getFields()[index];
        for(int i =field.getOffset(); i< field.getOffset()+ field.getLength(); i++)
        {
            byte ch = src[i];
            if(ch>=48 && ch<=57) //若是是數字
            {
                result = result*10 + (src[i]-48);
                continue;
            }
            
            if(src[i]==32) //若是是空格
                continue;
            
            if(ch == '-') //若是是負號
            {
                negative = true;
            }
            
            throw new NumberFormatException();
        }
        
        if(negative)
        {
            result= 0 - result;
        }
        
        return result;
    }

 

以上的2個算法並不複雜,但卻很是關鍵,一個dbf文件包含大約5000行,每行包括20~30個Float類型或者Int類型的字段,該優化涉及10萬+個字段的讀取。測試下來,這步改進將讀取速度從50ms-70ms提高至15ms至20ms,細節在魔鬼當中,這是速度提高最快的一項優化。

(優化五的代碼在改進的DBFReader中,上午中已經提供下載,這裏再提供下載連接:download2DBFReader庫 )

 

優化六:線程池並行處理

對5000多個行進行字段讀取並轉換成對象,採用多線程處理是最天然不過的優化方式。

 

通常咱們採用的方法是把任務分紅等份的塊,每一個線程處理一大塊。好比,若是採用5個線程處理,那麼把5000行分紅1000個行一塊,每一個線程處理一塊。這樣看貌似公平,其實否則,由於咱們的操做系統是分時操做系統,每一個線程開始工做的時間,佔用的CPU時間片,和任務的強度都不徹底一致。等分的辦法貌似平均,可是頗有可能致使有些線程完成工做了,另一些還有不少沒作完。

 

這裏介紹一種我喜歡的任務分配方式:每一個線程每次從5000個行的任務中申請一小塊,好比16個行,完成後,再申請16個行。這樣快的線程就會多工做些,慢的就少工做些,直到全部的行處理完畢。那麼,這些線程怎麼協調呢,任務分配豈不是要用到鎖?不用鎖,咱們採用CAS機制就能作到(實際用的是AtomicInteger,AtomicInteger就是基於CAS實現的),這裏不解釋太多了。看代碼:

class ReaderTask implements Runnable {
        Collector collector;
        List<byte[]> recordList;
        CountDownLatch countDownLatch;
        AtomicInteger cursor;
        DBFReader reader;

        public ReaderTask(Collector collector, DBFReader dbfreader, List<byte[]> recordList, AtomicInteger cursor,
                CountDownLatch countDownLatch) {
            this.collector = collector;
            this.reader = dbfreader;
            this.recordList = recordList;
            this.cursor = cursor;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                int length = recordList.size();
                do {
                    final int step = 16; //每次分配16行給該線程處理。
                    int endIndex = cursor.addAndGet(step);
                    int startIndex = endIndex - step ;

                    for (int i = startIndex; i < endIndex && i < length; i++) {
                        byte[] row = recordList.get(i);
                        MarketRealtimeData SHData = new MarketRealtimeData();
                        SHData.setMarketType(Constants.MARKET_SH_STOCK);
                        SHData.setIdNum(reader.get_string_efficiently(row, 0));
                        SHData.setPrefix(reader.get_string_efficiently(row, 1));
                        SHData.setPreClosePrice(reader.get_long_efficiently_and_multiply_1000(row, 2));
                        SHData.setOpenPrice(reader.get_long_efficiently_and_multiply_1000(row, 3));
                        SHData.setTurnover(reader.get_long_efficiently_and_multiply_1000(row, 4));
                        SHData.setHighPrice(reader.get_long_efficiently_and_multiply_1000(row, 5));
                        SHData.setLowPrice(reader.get_long_efficiently_and_multiply_1000(row, 6));
                        SHData.setMatchPrice(reader.get_long_efficiently_and_multiply_1000(row, 7));
                        //讀取全部的Field,如下省略若干行
                        //... ...
                        //... ...

                        if (collector != null) {
                            collector.collect(SHData);
                        }
                    }
                } while (cursor.get() < length);
            } finally {
                if (countDownLatch != null)
                    countDownLatch.countDown();
            }
        }
    }

 

private void readHangqingFile(String path, String name) throws Exception {
            // Long t1 = System.nanoTime();
            DBFReader dbfreader_SH = null;
            try {
                dbfreader_SH = new DBFReader(new File(path+File.separator + name));
                List<byte[]> list_sh = dbfreader_SH.recordsWithOutDel_efficiently(cacheManager);
                
                AtomicInteger cursor = new AtomicInteger(0); //原子變量,用於線程間分配任務
                CountDownLatch countDownLatch = new CountDownLatch(WORK_THREAD_COUNT);

                for (int i = 0; i < WORK_THREAD_COUNT - 1; i++) { //把任務分配給線程池多個線程
                    ReaderTask task = new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch);
                    globalExecutor.execute(task);
                }
                new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch).run(); //當前線程本身也做爲工做線程
                countDownLatch.await();
                //Long t2 = System.nanoTime();
                //System.out.println("speed time on read and object:" + (t2 - t1));

            } finally {
                if (dbfreader_SH != null)
                    dbfreader_SH.close();
            }
        }

 

測試代表,在使用4個線程並行處理的狀況下,處理時間從15ms-20ms縮短至4ms-7ms。

 

在使用本文章介紹的全部優化方法,整個讀取效率從耗時300ms以上,優化至5ms-10ms之間。咱們討論的是從文件更新始,到完成文件讀取,完成5000多個對象,100,000個字段的轉換的總耗時。

 

若是繼續深刻,咱們可能還有很多細節能夠改進。測試代表,時延的穩定性還不夠好,極可能是因爲GC形成的,咱們還能夠從減小對象的建立,以減小性能損耗,減小GC;而且控制GC執行的時間,讓GC在空閒時執行等方面優化。

 

Binhua Liu原創文章,轉載請註明原地址http://www.cnblogs.com/Binhua-Liu/p/5616761.html

相關文章
相關標籤/搜索