Berkeley DB TPS測試代碼

最近搞一個高併發的服務中心,須要把數據寫入到MySql中,結果測試發現最大TPS才4K,通過討論後決定先把接收到的數據寫到本地,而後經過同步線程再同步到MySql。java

最初本地存儲選用的SqlLite,結果測試發現SqlLite支持併發有問題;又選型BerkeleyDB,通過測試發現BerkeleyDB知足需求。併發

BerkeleyDB測試代碼以下:ide

注:代碼還有改造的地方,如initCheck方法去掉同步,改成初始化爲同步,請在項目中自行修改
高併發

package test.berkelyDb;

import java.io.File;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.MessagePack;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

public class TestBerkely {

    public Environment env;

    public Database db;

    private static final String dbName = "jsf";

    public synchronized void initAndCheck() throws Exception {
        if (env != null && env.isValid()) {
            return;
        }
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setCacheSize(10*1024 * 1024);
        try {
            env = new Environment(new File("e:\\test"), envConfig);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void open() {
        if(db != null){
            return;
        }
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setSortedDuplicates(true);
        dbConfig.setAllowCreate(true);
        try {
            db = env.openDatabase(null, dbName, dbConfig);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() {
        if (db != null) {
            try {
                db.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public Object get(String key) throws Exception {
        DatabaseEntry queryKey = new DatabaseEntry();
        DatabaseEntry value = new DatabaseEntry();
        queryKey.setData(key.getBytes("UTF-8"));
        OperationStatus status = db.get(null, queryKey, value, LockMode.DEFAULT);
        if (status == OperationStatus.SUCCESS) {
            return new String(value.getData());
        }
        return null;
    }

    public boolean put(String key, byte values[]) throws Exception {
        byte[] theKey = key.getBytes("UTF-8");
        OperationStatus status = db.put(null, new DatabaseEntry(theKey),
                new DatabaseEntry(values));
        if (status == OperationStatus.SUCCESS) {
            return true;
        }
        return false;
    }
    
    public boolean del(String key) throws Exception{
        byte[] theKey = key.getBytes("UTF-8");
           OperationStatus status = db.delete(null, new DatabaseEntry(theKey));
           if(status == OperationStatus.SUCCESS) {
            return true;
           }
           return false;
    }

    public static void main(String[] args) throws Exception {
        final long len = 10000000;
        final TestBerkely tb = new TestBerkely();
        final AtomicInteger counter = new AtomicInteger(1);
        tb.initAndCheck();
        tb.open();
        Timer timer = new Timer();

        Client client = new Client();
        client.setAlias("saf@0.0.1");
        client.setAppPath("E:\\workspace\\MyProject\\bin");
        client.setCreateTime(new Date());
        client.setId(100000);
        client.setInsKey(TestBerkely.class.getCanonicalName() + "::saf@0.0.1");
        client.setInterfaceId(10092389);
        client.setIp("192.168.229.39");
        client.setPid(2398);
        client.setProtocol(1);
        client.setSafVer(120);
        client.setSrcType(1);
        client.setStartTime(System.currentTimeMillis());
        client.setStatus(1);
        client.setUniqKey("uniqKey");
        client.setUpdateTime(new Date());
        client.setUrlDesc("89uf92438yq29384yf");
        
        MessagePack mp = new MessagePack();
        mp.register(Client.class);
        final byte data[] = mp.write(client);
        
        ExecutorService exePool = Executors.newFixedThreadPool(10);


        final long start = System.currentTimeMillis();
        //統計TPS線程
        timer.schedule(new TimerTask() {
            
            @Override
            public void run() {
                long end = System.currentTimeMillis();
                long time = (end-start)/1000;
                if(time == 0){
                    return;
                }
                int current = counter.get();
                System.out.println("***********----------------->" + (current*100/time/100f));
                
            }
        },    1000, 2000);
        for(int i=0; i<10; i++){
            exePool.execute(new Runnable() {
                
                @Override
                public void run() {
                    int num = counter.getAndIncrement();
                    String key = "key" + num;
                    try {
                        while(true){
                            tb.put(key, data);
                            if(counter.get() < len) {
                                num = counter.getAndIncrement();
                                key = "key" + num;
                                continue;
                            }
                            break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        
        
        exePool.shutdown();
        try {
            while(!exePool.awaitTermination(1, TimeUnit.SECONDS)){
                
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        
        long tps = len/((end - start)/1000);
        System.out.println("tps------------------>" + tps);
        
        timer.cancel();
        
        tb.env.sync();
        tb.env.cleanLog();

        counter.set(0);
        int errorNum = 0;
        //檢查寫入數據
        while(counter.get() < 100000){
            if(tb.get("key" + counter.getAndIncrement()) == null){
                errorNum++;
            }
        }
        System.out.println("error data is ----------->" + errorNum);
        tb.del("key" + 5000);
        tb.env.sync();
        tb.close();
    }

}
相關文章
相關標籤/搜索