最近架構一個項目,實現行情的接入和分發,需要達到極致的低時延特性,這對於證券系統是非常重要的。接入的行情源是可以配置,既可以是Level-1,也可以是Level-2或其他第三方的源。雖然Level-1行情沒有Level-2快,但是作為系統支持的行情源,我們還是需要優化它,使得從文件讀取,到用戶通過socket收到行情,端到端的時延盡可能的低。本文主要介紹對level-1行情dbf文件讀取的極致優化方案。相信對其他的dbf文件讀取應該也有借鑒意義。
Level-1行情是由行情小站,定時每隔幾秒把dbf文件(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替換掉舊的。我們的目標就是,在新文件完成更新後,在最短時間內將文件讀取到內存,把每一行轉化為對象,把每個列轉化為對應的數據類型。
我們一共采用了6種優化方式。
我們在上文《Java讀取Level-1行情dbf文件極致優化(1)》 《Java讀取Level-1行情dbf文件極致優化(2)》中,已經介紹了4種優化策略:
優化一:采用內存硬盤(RamDisk)
優化二:采用JNotify,用通知替代輪詢
優化三:采用NIO讀取文件
優化四:減少讀取文件時內存反復分配和GC
行情dbf文件很多字段是價格類型的字段,帶2位或者3位小數,從dbf讀取他們的後,我們會把它們保存在Long類型或者Int類型,而不是Float或Double類型,比如1.23,轉換為1230保存。因為Float型或Double型會丟失精度。
如果不優化,讀取步驟為:
1,從byte[]對應的偏移中讀取並保存到String中。
2,對String做trim操作
3,把String轉換為Float類型
4,把Float類型乘以1000並強轉為Long類型。
不用多說,以上的過程一定是低效的,光前兩步就涉及到2次字符串拷貝,2次對象創建。第三步效率也不高。我這裡通過優化,在DBFReader.java中添加一個get_long_efficiently_and_multiply_1000方法,將4個步驟合並為一步,通過一次掃描得到結果。
public long get_long_efficiently_and_multiply_1000(byte[] src, final int index) { long multiplicand = 3; long result =0; Field field = getFields()[index]; boolean in_decimal_part = false; boolean negative = false; int offset = field.getOffset(); int length = field.getLength(); int end = offset+length; for(int i =field.getOffset(); i< end; i++) { byte ch = src[i]; if(ch>=48 && ch<=57) //如果是數字 { result *= 10; result += ch-48; if(in_decimal_part) multiplicand--; if(multiplicand==0) break; continue; } if(ch==32) //如果是空格 continue; if(ch == 46) //如果是小數點 { in_decimal_part = true; continue; } if(ch == '-') //如果是負號 { negative = true; } throw new NumberFormatException(); } if(multiplicand == 3) result *= 1000; else if (multiplicand == 2) result *=100; else if (multiplicand == 1) result *=10; if(negative) { result= 0 - result; } return result; }
上面的算法負責讀取字段轉換為數字的同時,對它乘以1000。並且代碼中盡量優化了執行步驟。
對於整形的讀取,我們也進行了優化,添加一個get_long_efficiently:
public long get_long_efficiently(byte[] src, final int index) { long result =0; boolean negative = false; Field field = getFields()[index]; for(int i =field.getOffset(); i< field.getOffset()+ field.getLength(); i++) { byte ch = src[i]; if(ch>=48 && ch<=57) //如果是數字 { result = result*10 + (src[i]-48); continue; } if(src[i]==32) //如果是空格 continue; if(ch == '-') //如果是負號 { negative = true; } throw new NumberFormatException(); } if(negative) { result= 0 - result; } return result; }
以上的2個算法並不復雜,但卻非常關鍵,一個dbf文件包含大約5000行,每行包括20~30個Float類型或者Int類型的字段,該優化涉及10萬+個字段的讀取。測試下來,這步改進將讀取速度從50ms-70ms提升至15ms至20ms,細節在魔鬼當中,這是速度提升最快的一項優化。
(優化五的代碼在改進的DBFReader中,上午中已經提供下載,這裡再提供下載鏈接:DBFReader庫 )
對5000多個行進行字段讀取並轉換成對象,采用多線程處理是最自然不過的優化方式。
一般我們采用的方法是把任務分成等份的塊,每個線程處理一大塊。比如,如果采用5個線程處理,那麼把5000行分成1000個行一塊,每個線程處理一塊。這樣看貌似公平,其實不然,因為我們的操作系統是分時操作系統,每個線程開始工作的時間,占用的CPU時間片,和任務的強度都不完全一致。等分的辦法貌似平均,但是很有可能導致有些線程完成工作了,另外一些還有很多沒做完。
這裡介紹一種我喜歡的任務分配方式:每個線程每次從5000個行的任務中申請一小塊,比如16個行,完成後,再申請16個行。這樣快的線程就會多工作些,慢的就少工作些,直到所有的行處理完畢。那麼,這些線程怎麼協調呢,任務分配豈不是要用到鎖?不用鎖,我們采用CAS機制就能做到(實際用的是AtomicInteger,AtomicInteger就是基於CAS實現的),這裡不解釋太多了。看代碼:
class ReaderTask implements Runnable { Collector collector; List<byte[]> recordList; CountDownLatch countDownLatch; AtomicInteger cursor; DBFReader reader; public ReaderTask(Collector collector, DBFReader dbfreader, List<byte[]> recordList, AtomicInteger cursor, CountDownLatch countDownLatch) { this.collector = collector; this.reader = dbfreader; this.recordList = recordList; this.cursor = cursor; this.countDownLatch = countDownLatch; } @Override public void run() { try { int length = recordList.size(); do { final int step = 16; //每次分配16行給該線程處理。 int endIndex = cursor.addAndGet(step); int startIndex = endIndex - step ; for (int i = startIndex; i < endIndex && i < length; i++) { byte[] row = recordList.get(i); MarketRealtimeData SHData = new MarketRealtimeData(); SHData.setMarketType(Constants.MARKET_SH_STOCK); SHData.setIdNum(reader.get_string_efficiently(row, 0)); SHData.setPrefix(reader.get_string_efficiently(row, 1)); SHData.setPreClosePrice(reader.get_long_efficiently_and_multiply_1000(row, 2)); SHData.setOpenPrice(reader.get_long_efficiently_and_multiply_1000(row, 3)); SHData.setTurnover(reader.get_long_efficiently_and_multiply_1000(row, 4)); SHData.setHighPrice(reader.get_long_efficiently_and_multiply_1000(row, 5)); SHData.setLowPrice(reader.get_long_efficiently_and_multiply_1000(row, 6)); SHData.setMatchPrice(reader.get_long_efficiently_and_multiply_1000(row, 7)); //讀取所有的Field,以下省略若干行 //... ... //... ... if (collector != null) { collector.collect(SHData); } } } while (cursor.get() < length); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } }
private void readHangqingFile(String path, String name) throws Exception { // Long t1 = System.nanoTime(); DBFReader dbfreader_SH = null; try { dbfreader_SH = new DBFReader(new File(path+File.separator + name)); List<byte[]> list_sh = dbfreader_SH.recordsWithOutDel_efficiently(cacheManager); AtomicInteger cursor = new AtomicInteger(0); //原子變量,用於線程間分配任務 CountDownLatch countDownLatch = new CountDownLatch(WORK_THREAD_COUNT); for (int i = 0; i < WORK_THREAD_COUNT - 1; i++) { //把任務分配給線程池多個線程 ReaderTask task = new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch); globalExecutor.execute(task); } new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch).run(); //當前線程自己也作為工作線程 countDownLatch.await(); //Long t2 = System.nanoTime(); //System.out.println("speed time on read and object:" + (t2 - t1)); } finally { if (dbfreader_SH != null) dbfreader_SH.close(); } }
測試表明,在使用4個線程並行處理的情況下,處理時間從15ms-20ms縮短至4ms-7ms。
在使用本文章介紹的所有優化方法,整個讀取效率從耗時300ms以上,優化至5ms-10ms之間。我們討論的是從文件更新始,到完成文件讀取,完成5000多個對象,100,000個字段的轉換的總耗時。
如果繼續深入,我們可能還有不少細節可以改進。測試表明,時延的穩定性還不夠好,很可能是由於GC造成的,我們還可以從減少對象的創建,以減少性能損耗,減少GC;並且控制GC執行的時間,讓GC在空閒時執行等方面優化。
Binhua Liu原創文章,轉載請注明原地址http://www.cnblogs.com/Binhua-Liu/p/5616761.html