vertx實現redis版session共享

如今愈來愈流行微服務架構了,提到微服務架構的話,你們能想到的是spring boot和vertx吧!前者你們聽的比交多些,可是今天我給你們分享的是後者vertx。想要了解更多請閱讀vertx官網http://vertx.io/docs/vertx-we... java

廢話很少說了,直接進主題。今天分享的是vertx web裏的session共享問題。在公司我用vertx開發了一個web平臺,可是須要防止宕機沒法繼續提供服務這種狀況,因此部署了兩臺機器,這裏就開始涉及到了session共享了。爲了性能考慮,我就想把session放入redis裏來達到目的,但是在vertx官網沒有這種實現,當時我就用Hazelcast(網友說,性能不怎麼好)將就先用着。前幾天我抽時間看了底層代碼,本身動手封裝了下,將session放入redis裏。github地址: https://github.com/robin0909/...git

原生vertx session 設計

下面給出 LocalSessionStoreImpl 和 ClusteredSessionStoreImpl 的結構關係:github

LocalSession:
localSession核心結構web

ClusteredSession:
redis

從上面的結構中咱們能找到一個繼承實現關係,頂級接口是SessionStore,
而SessionStore是什麼接口呢?在vertx裏,session有一個專門的設計,這裏的SessionStore就是專門爲存儲session而定義接口,看看這個接口裏定義了哪些方法吧!spring

public interface SessionStore {
  //主要在分佈式session共享時會用到的屬性,從store裏獲取session的重試時間
  long retryTimeout();
    
  Session createSession(long timeout);

  //根據sessionId從store裏獲取Session
  void get(String id, Handler<AsyncResult<@Nullable Session>> resultHandler);

  //刪除
  void delete(String id, Handler<AsyncResult<Boolean>> resultHandler);

  //增長session
  void put(Session session, Handler<AsyncResult<Boolean>> resultHandler);

  //清空
  void clear(Handler<AsyncResult<Boolean>> resultHandler);

  //store的size
  void size(Handler<AsyncResult<Integer>> resultHandler);

  //關閉,釋放資源操做
  void close();
}

上面不少會用到有一個屬性,就是sessionId(id)。在session機制裏,還須要依靠瀏覽器端的cookie。當服務器端session生成後,服務器會在cookie裏設置一個vertx-web.session=4d9db69d-7577-4b17-8a66-4d6a2472cd33 返回給瀏覽器。想必你們也看出來了,就是一個uuid碼,也就是sessionId。瀏覽器

接下來,咱們能夠看下二級子接口。二級子接口的做用,其實很簡單,直接上代碼,你們就懂了。安全

public interface LocalSessionStore extends SessionStore {

  long DEFAULT_REAPER_INTERVAL = 1000;

  String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions";

  static LocalSessionStore create(Vertx vertx) {
    return new LocalSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_REAPER_INTERVAL);
  }

  static LocalSessionStore create(Vertx vertx, String sessionMapName) {
    return new LocalSessionStoreImpl(vertx, sessionMapName, DEFAULT_REAPER_INTERVAL);
  }

  static LocalSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) {
    return new LocalSessionStoreImpl(vertx, sessionMapName, reaperInterval);
  }
}

這裏主要爲了方面在使用和構造時很優雅,router.route().handler(SessionHandler.create(LocalSessionStore.create(vertx))); 有點相似工廠,創造對象。在這個接口裏,也能夠初始化一些專有參數。因此沒有什麼難度。服務器

對官方代碼咱們也理解的差很少了,接下來開始動手封裝本身的RedisSessionStore吧!cookie

本身的RedisSessionStore封裝

首先咱們定義一個RedisSessionStore接口, 接口繼承SessionStore接口。

/**
 * Created by robinyang on 2017/3/13.
 */
public interface RedisSessionStore extends SessionStore {

    long DEFAULT_RETRY_TIMEOUT = 2 * 1000;

    String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions";

    static RedisSessionStore create(Vertx vertx) {
        return new RedisSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_RETRY_TIMEOUT);
    }

    static RedisSessionStore create(Vertx vertx, String sessionMapName) {
        return new RedisSessionStoreImpl(vertx, sessionMapName, DEFAULT_RETRY_TIMEOUT);
    }

    static RedisSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) {
        return new RedisSessionStoreImpl(vertx, sessionMapName, reaperInterval);
    }

    RedisSessionStore host(String host);

    RedisSessionStore port(int port);
    
    RedisSessionStore auth(String pwd);
}

接着建立一個RedisSessionStoreImpl類, 這裏我先給出一個已經寫好的RedisSessionStoreImpl, 稍後解釋。

public class RedisSessionStoreImpl implements RedisSessionStore {

    private static final Logger logger = LoggerFactory.getLogger(RedisSessionStoreImpl.class);

    private final Vertx vertx;
    private final String sessionMapName;
    private final long retryTimeout;
    private final LocalMap<String, Session> localMap;

    //默認值
    private String host = "localhost";
    private int port = 6379;
    private String auth;

    RedisClient redisClient;

    // 清除全部時使用
    private List<String> localSessionIds;


    public RedisSessionStoreImpl(Vertx vertx, String defaultSessionMapName, long retryTimeout) {
        this.vertx = vertx;
        this.sessionMapName = defaultSessionMapName;
        this.retryTimeout = retryTimeout;

        localMap = vertx.sharedData().getLocalMap(sessionMapName);
        localSessionIds = new Vector<>();
        redisManager();
    }

    @Override
    public long retryTimeout() {
        return retryTimeout;
    }

    @Override
    public Session createSession(long timeout) {
        return new SessionImpl(new PRNG(vertx), timeout, DEFAULT_SESSIONID_LENGTH);
    }

    @Override
    public Session createSession(long timeout, int length) {
        return new SessionImpl(new PRNG(vertx), timeout, length);
    }

    @Override
    public void get(String id, Handler<AsyncResult<Session>> resultHandler) {
        redisClient.getBinary(id, res->{
            if(res.succeeded()) {
                Buffer buffer = res.result();
                if(buffer != null) {
                    SessionImpl session = new SessionImpl(new PRNG(vertx));
                    session.readFromBuffer(0, buffer);
                    resultHandler.handle(Future.succeededFuture(session));
                } else {
                    resultHandler.handle(Future.succeededFuture(localMap.get(id)));
                }
            } else {
                resultHandler.handle(Future.failedFuture(res.cause()));
            }
        });
    }

    @Override
    public void delete(String id, Handler<AsyncResult<Boolean>> resultHandler) {
        redisClient.del(id, res->{
            if (res.succeeded()) {
                localSessionIds.remove(id);
                resultHandler.handle(Future.succeededFuture(true));
            } else {
                resultHandler.handle(Future.failedFuture(res.cause()));
                logger.error("redis裏刪除sessionId: {} 失敗", id, res.cause());
            }
        });
    }

    @Override
    public void put(Session session, Handler<AsyncResult<Boolean>> resultHandler) {
        //put 以前判斷session是否存在,若是存在的話,校驗下
        redisClient.getBinary(session.id(), res1->{
            if (res1.succeeded()) {
                //存在數據
                if(res1.result()!=null) {
                    Buffer buffer = res1.result();
                    SessionImpl oldSession = new SessionImpl(new PRNG(vertx));
                    oldSession.readFromBuffer(0, buffer);
                    SessionImpl newSession = (SessionImpl)session;
                    if(oldSession.version() != newSession.version()) {
                        resultHandler.handle(Future.failedFuture("Version mismatch"));
                        return;
                    }
                    newSession.incrementVersion();
                    writeSession(session, resultHandler);
                } else {
                    //不存在數據
                    SessionImpl newSession = (SessionImpl)session;
                    newSession.incrementVersion();
                    writeSession(session, resultHandler);
                }
            } else {
                resultHandler.handle(Future.failedFuture(res1.cause()));
            }
        });
    }

    private void writeSession(Session session, Handler<AsyncResult<Boolean>> resultHandler) {

        Buffer buffer = Buffer.buffer();
        SessionImpl sessionImpl = (SessionImpl)session;
        //將session序列化到 buffer裏
        sessionImpl.writeToBuffer(buffer);

        SetOptions setOptions = new SetOptions().setPX(session.timeout());
        redisClient.setBinaryWithOptions(session.id(), buffer, setOptions, res->{
            if (res.succeeded()) {
                logger.debug("set key: {} ", session.data());
                localSessionIds.add(session.id());
                resultHandler.handle(Future.succeededFuture(true));
            } else {
                resultHandler.handle(Future.failedFuture(res.cause()));
            }
        });
    }

    @Override
    public void clear(Handler<AsyncResult<Boolean>> resultHandler) {
        localSessionIds.stream().forEach(id->{
            redisClient.del(id, res->{
                //若是在localSessionIds裏存在,可是在redis裏過時不存在了, 只要通知下就行
                localSessionIds.remove(id);
            });
        });
        resultHandler.handle(Future.succeededFuture(true));
    }

    @Override
    public void size(Handler<AsyncResult<Integer>> resultHandler) {
        resultHandler.handle(Future.succeededFuture(localSessionIds.size()));
    }

    @Override
    public void close() {
        redisClient.close(res->{
            logger.debug("關閉 redisClient ");
        });
    }

    private void redisManager() {
        RedisOptions redisOptions = new RedisOptions();
        redisOptions.setHost(host).setPort(port).setAuth(auth);

        redisClient = RedisClient.create(vertx, redisOptions);
    }

    @Override
    public RedisSessionStore host(String host) {
        this.host = host;
        return this;
    }

    @Override
    public RedisSessionStore port(int port) {
        this.port = port;
        return this;
    }

    @Override
    public RedisSessionStore auth(String pwd) {
        this.auth = pwd;
        return this;
    }
}

首先,從get()和put()這兩個方法開始,這兩方法比較核心。

  1. get(), 建立Cookie的時候會生成一個uuid,用這個id取session,第一次咱們發現沒法取到, 第56行代碼就會根據這個id去生成一個session。

  2. 每次發送請求的時候,咱們都會重置session過時時間,因此每次get完後,返回給瀏覽器以前都會有一個put操做,也就是更新數據。這裏的put就稍微複雜一點點,在put以前,咱們須要先根據傳過來的session裏的id從redis裏取到session。若是獲取不到,說明以前經過get獲取的session不是同一個對象,就出異常,這就至關於設置了一道安全的門檻吧!當獲取到了,再比較兩個session的版本是否是一致的,若是不一致,說明session被破環了,算是第二個安全門檻設置吧!都沒有問題了,就能夠put session了,而且從新設置時間。

  3. 這裏依賴vertx提供的redisClient來操做數據的,因此咱們必須引入這個依賴:io.vertx:vertx-redis-client:3.4.1 。

  4. 接下來還有一點須要提的是序列化問題。這裏我使用的是vertx封裝的一種序列化,將數據序列化到Buffer裏,而SessiomImpl類裏又已經實現好了序列化,從SessionImple序列化成Buffer和Buffer反序列化。

public class SessionImpl implements Session, ClusterSerializable, Shareable {
    //...
    
    @Override
  public void writeToBuffer(Buffer buff) {
    byte[] bytes = id.getBytes(UTF8);
    buff.appendInt(bytes.length).appendBytes(bytes);
    buff.appendLong(timeout);
    buff.appendLong(lastAccessed);
    buff.appendInt(version);
    Buffer dataBuf = writeDataToBuffer();
    buff.appendBuffer(dataBuf);
  }

  @Override
  public int readFromBuffer(int pos, Buffer buffer) {
    int len = buffer.getInt(pos);
    pos += 4;
    byte[] bytes = buffer.getBytes(pos, pos + len);
    pos += len;
    id = new String(bytes, UTF8);
    timeout = buffer.getLong(pos);
    pos += 8;
    lastAccessed = buffer.getLong(pos);
    pos += 8;
    version = buffer.getInt(pos);
    pos += 4;
    pos = readDataFromBuffer(pos, buffer);
    return pos;
  }
    
    //...
}

以上就是序列化和反序列化的實現。

  1. localSessionIds 主要考慮到清除session的時候使用,由於數據主要以保存在session爲主,本地localSessionIds 保存sessionId是輔助做用。

用法

用法很簡單,一行代碼就說明。

router.route().handler(SessionHandler.create(RedisSessionStore.create(vertx).host("127.0.0.1").port(6349)));
相關文章
相關標籤/搜索