最近在做一個高併發的服務中心,需要把數據寫入MySQL,結果測試發現最大TPS只有4K。經過討論後決定先把接收到的數據寫到本地,然後通過同步線程再同步到MySQL。
最初本地存儲選用SQLite,結果測試發現SQLite對併發的支持有問題;之後改選BerkeleyDB,經過測試發現BerkeleyDB滿足需求。
BerkeleyDB測試代碼如下:
注:代碼還有可以改進的地方,如去掉initAndCheck方法上的同步,改爲只在初始化時同步,請在項目中自行修改。
高併發
package test.berkelyDb;

import java.io.File;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.MessagePack;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

/**
 * Write-throughput test harness for Berkeley DB Java Edition.
 *
 * <p>{@link #main(String[])} serializes one {@code Client} record with MessagePack, writes it
 * under sequential keys ("key1", "key2", ...) from 10 worker threads, prints the observed TPS,
 * then reads back a sample of the keys and reports how many are missing.
 */
public class TestBerkely {

    /** Environment handle; assigned by {@link #initAndCheck()} and cleared by {@link #close()}. */
    public Environment env;

    /** Database handle; assigned by {@link #open()} and cleared by {@link #close()}. */
    public Database db;

    private static final String dbName = "jsf";

    /** Directory used as the environment home. */
    private static final String ENV_HOME = "e:\\test";

    /** Charset name used for both key encoding and value decoding. */
    private static final String CHARSET = "UTF-8";

    /**
     * Opens the environment unless a valid one is already open.
     *
     * @throws Exception if the home directory cannot be created or the environment cannot be
     *         opened; the failure is propagated so callers do not continue with a null
     *         {@link #env}
     */
    public synchronized void initAndCheck() throws Exception {
        if (env != null && env.isValid()) {
            return;
        }
        File home = new File(ENV_HOME);
        // Make sure the home directory is present before handing it to the environment.
        if (!home.isDirectory() && !home.mkdirs()) {
            throw new IllegalStateException("Cannot create environment home: " + home);
        }
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setCacheSize(10 * 1024 * 1024);
        env = new Environment(home, envConfig);
    }

    /**
     * Opens the database (created on demand, sorted duplicates enabled) unless it is already
     * open.
     *
     * @throws IllegalStateException if {@link #initAndCheck()} has not been called, or if the
     *         database cannot be opened (the original failure is attached as the cause)
     */
    public void open() {
        if (db != null) {
            return;
        }
        if (env == null) {
            throw new IllegalStateException("initAndCheck() must be called before open()");
        }
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setSortedDuplicates(true);
        dbConfig.setAllowCreate(true);
        try {
            db = env.openDatabase(null, dbName, dbConfig);
        } catch (Exception e) {
            // Fail here instead of leaving db null and hitting a NullPointerException later.
            throw new IllegalStateException("Failed to open database " + dbName, e);
        }
    }

    /**
     * Closes the database and then the environment. Failures are printed and otherwise ignored
     * (best effort); both handles are cleared so that a later {@link #initAndCheck()} /
     * {@link #open()} pair starts from scratch instead of reusing a closed handle.
     */
    public void close() {
        if (db != null) {
            try {
                db.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                db = null;
            }
        }
        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                env = null;
            }
        }
    }

    /**
     * Looks up the value stored under {@code key}.
     *
     * @param key the record key; encoded as UTF-8 for the lookup
     * @return the stored bytes decoded as a UTF-8 {@code String}, or {@code null} when the
     *         lookup does not report {@code SUCCESS}
     * @throws Exception if encoding or the database read fails
     */
    public Object get(String key) throws Exception {
        DatabaseEntry queryKey = new DatabaseEntry();
        DatabaseEntry value = new DatabaseEntry();
        queryKey.setData(key.getBytes(CHARSET));
        OperationStatus status = db.get(null, queryKey, value, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS) {
            return null;
        }
        // Decode with the same charset the keys are encoded with rather than the platform default.
        return new String(value.getData(), CHARSET);
    }

    /**
     * Stores {@code values} under {@code key}.
     *
     * @param key the record key; encoded as UTF-8
     * @param values the raw bytes to store
     * @return {@code true} when the write reports {@code SUCCESS}
     * @throws Exception if encoding or the database write fails
     */
    public boolean put(String key, byte values[]) throws Exception {
        byte[] theKey = key.getBytes(CHARSET);
        OperationStatus status = db.put(null, new DatabaseEntry(theKey), new DatabaseEntry(values));
        return status == OperationStatus.SUCCESS;
    }

    /**
     * Deletes the record(s) stored under {@code key}.
     *
     * @param key the record key; encoded as UTF-8
     * @return {@code true} when the delete reports {@code SUCCESS}
     * @throws Exception if encoding or the database delete fails
     */
    public boolean del(String key) throws Exception {
        byte[] theKey = key.getBytes(CHARSET);
        OperationStatus status = db.delete(null, new DatabaseEntry(theKey));
        return status == OperationStatus.SUCCESS;
    }

    /**
     * Runs the write benchmark followed by a read-back check.
     *
     * @param args unused
     * @throws Exception if setup, verification or cleanup fails
     */
    public static void main(String[] args) throws Exception {
        final long len = 10000000;
        final TestBerkely tb = new TestBerkely();
        // Keys are numbered from 1; "key0" is never written.
        final AtomicInteger counter = new AtomicInteger(1);
        tb.initAndCheck();
        tb.open();
        Timer timer = new Timer();
        try {
            Client client = new Client();
            client.setAlias("saf@0.0.1");
            client.setAppPath("E:\\workspace\\MyProject\\bin");
            client.setCreateTime(new Date());
            client.setId(100000);
            client.setInsKey(TestBerkely.class.getCanonicalName() + "::saf@0.0.1");
            client.setInterfaceId(10092389);
            client.setIp("192.168.229.39");
            client.setPid(2398);
            client.setProtocol(1);
            client.setSafVer(120);
            client.setSrcType(1);
            client.setStartTime(System.currentTimeMillis());
            client.setStatus(1);
            client.setUniqKey("uniqKey");
            client.setUpdateTime(new Date());
            client.setUrlDesc("89uf92438yq29384yf");

            MessagePack mp = new MessagePack();
            mp.register(Client.class);
            final byte data[] = mp.write(client);

            ExecutorService exePool = Executors.newFixedThreadPool(10);
            final long start = System.currentTimeMillis();

            // Periodic TPS reporter.
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    long time = (System.currentTimeMillis() - start) / 1000;
                    if (time == 0) {
                        return;
                    }
                    int current = counter.get();
                    // 100L forces long arithmetic so the product cannot overflow an int.
                    System.out.println("***********----------------->" + (current * 100L / time / 100f));
                }
            }, 1000, 2000);

            for (int i = 0; i < 10; i++) {
                exePool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Claim-then-check in a single atomic step: each key number is
                            // handed to exactly one worker and none at or beyond len is written.
                            int num;
                            while ((num = counter.getAndIncrement()) < len) {
                                tb.put("key" + num, data);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            exePool.shutdown();
            try {
                while (!exePool.awaitTermination(1, TimeUnit.SECONDS)) {
                    // keep waiting until every worker has finished
                }
            } catch (InterruptedException e) {
                // Abort the run: stop the workers, restore the interrupt flag and let the
                // finally block release the timer and the database.
                exePool.shutdownNow();
                Thread.currentThread().interrupt();
                return;
            }

            // Millisecond precision, and never divide by zero on very short runs.
            long elapsedMs = Math.max(1L, System.currentTimeMillis() - start);
            long tps = len * 1000 / elapsedMs;
            System.out.println("tps------------------>" + tps);
            timer.cancel();

            tb.env.sync();
            tb.env.cleanLog();

            // Verify a sample of the written data, starting at the first key actually written.
            int errorNum = 0;
            long checkLimit = Math.min(100000L, len);
            for (int i = 1; i < checkLimit; i++) {
                if (tb.get("key" + i) == null) {
                    errorNum++;
                }
            }
            System.out.println("error data is ----------->" + errorNum);

            tb.del("key" + 5000);
            tb.env.sync();
        } finally {
            // The Timer thread is non-daemon: without this a failure above would hang the JVM.
            timer.cancel();
            tb.close();
        }
    }
}