MyBatise同步es(Elasticsearch)

image.png

需求:如上圖這個查詢,不須要太大的數據量就足以上MySQL數據庫崩潰,解決方案之一就是將查詢的數據同步到es服務器中,數據庫持久層採用Mybatis + Mybatis-plusjava

gitee地址:https://gitee.com/ichiva/mybatis-sync-esgit

解決思路

使用Mybatis攔截器攔截update操做,獲取攔截的類名+方法名組成的id,以及傳入的參數,找到對應的處理器,交給處理器具體完成同步任務。處理器定義成接口,經過regHandler()方法註冊進攔截器。key就是類名+方法名spring

核心代碼

同步處理器接口

package com.gzwl.interceptor;

/**
 * es 同步處理器
 */
public interface SynEsHandler {

    void handler(Object parameter);
}

攔截器

package com.gzwl.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.springframework.stereotype.Component;

import java.util.*;

/**
 * es 同步攔截器
 *
 * 配置攔截器 @Signature
 * 前置攔截器 type = Executor.class
 * 只有寫入才須要攔截 method = "update"
 * 攔截方法的參數 args = {MappedStatement.class, Object.class}
 */
@Slf4j
@Component
@Intercepts({
        @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class SyncEsInterceptor implements Interceptor {

    private Map<string,syneshandler> handlerMap = new HashMap&lt;&gt;();

    /**
     * 攔截器在sql執行成功後同步到es,
     * 若是同步失敗拋出異常,保證數據一致性
     *
     * @param invocation
     * @return
     * @throws Throwable
     */
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object res = invocation.proceed();

        Object[] args = invocation.getArgs();
        if (args.length &gt;= 2) {
            MappedStatement mappedStatement = (MappedStatement) args[0];

            //攔截到的方法,也就是註冊的key
            String key = mappedStatement.getId();
            log.debug("攔截到的方法 key=",key);

            SynEsHandler synEsHandler = handlerMap.get(key);
            if(null != synEsHandler){
                try {
                    synEsHandler.handler(args[1]);
                }catch (Exception e){
                    //包裝異常
                    throw new SyncEsException(e);
                }
            }else {
                log.debug("沒有處理的key={}",key);
            }
        }

        return res;
    }

    @Override
    public Object plugin(Object o) {
        if(o instanceof Executor) return Plugin.wrap(o, this);
        else return o;
    }

    @Override
    public void setProperties(Properties properties) {

    }

    /**
     * 註冊同步處理器
     * @param key
     * @param parameterHandler
     */
    public void regHandler(String key,SynEsHandler parameterHandler){
        handlerMap.put(key,parameterHandler);
    }
}

實現處理接口

package com.gzwl.interceptor.impl;

import com.gzwl.entity.Employee;
import com.gzwl.interceptor.SynEsHandler;
import com.gzwl.interceptor.SyncEsInterceptor;
import org.apache.ibatis.binding.MapperMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
class EmployeeMapperHandler implements SynEsHandler {

    @Autowired
    public EmployeeMapperHandler(SyncEsInterceptor syncEsInterceptor){
        //將本身註冊到攔截器中,每一個方法可用是獨立的handler,也可用註冊多個方法的處理器
        syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.insert",this);
        syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.update",this);
    }

    @Override
    public void handler(Object parameter) {
        Employee entity;

        if(parameter instanceof Employee){
            //insert 可用直接攔截到實體類
            entity = (Employee) parameter;
        }else {
            //update 方法須要特殊處理
            entity = (Employee) ((MapperMethod.ParamMap) parameter).get("param1");
        }

        //在這裏編寫es的同步邏輯
    }
}

完整代碼:https://gitee.com/ichiva/mybatis-sync-essql

注意:這個方案也是有致命缺陷的,就是在es執行完成後,程序還有可能讓數據庫回滾,從而形成數據不一致。 </string,syneshandler>數據庫

相關文章
相關標籤/搜索