需求:如上圖這個查詢,不須要太大的數據量就足以上MySQL數據庫崩潰,解決方案之一就是將查詢的數據同步到es服務器中,數據庫持久層採用Mybatis + Mybatis-plusjava
gitee地址:https://gitee.com/ichiva/mybatis-sync-esgit
解決思路
使用Mybatis攔截器攔截update操做,獲取攔截的類名+方法名組成的id,以及傳入的參數,找到對應的處理器,交給處理器具體完成同步任務。處理器定義成接口,經過regHandler()方法註冊進攔截器。key就是類名+方法名spring
核心代碼
同步處理器接口
package com.gzwl.interceptor; /** * es 同步處理器 */ public interface SynEsHandler { void handler(Object parameter); }
攔截器
package com.gzwl.interceptor; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.executor.Executor; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.plugin.*; import org.springframework.stereotype.Component; import java.util.*; /** * es 同步攔截器 * * 配置攔截器 @Signature * 前置攔截器 type = Executor.class * 只有寫入才須要攔截 method = "update" * 攔截方法的參數 args = {MappedStatement.class, Object.class} */ @Slf4j @Component @Intercepts({ @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})}) public class SyncEsInterceptor implements Interceptor { private Map<string,syneshandler> handlerMap = new HashMap<>(); /** * 攔截器在sql執行成功後同步到es, * 若是同步失敗拋出異常,保證數據一致性 * * @param invocation * @return * @throws Throwable */ @Override public Object intercept(Invocation invocation) throws Throwable { Object res = invocation.proceed(); Object[] args = invocation.getArgs(); if (args.length >= 2) { MappedStatement mappedStatement = (MappedStatement) args[0]; //攔截到的方法,也就是註冊的key String key = mappedStatement.getId(); log.debug("攔截到的方法 key=",key); SynEsHandler synEsHandler = handlerMap.get(key); if(null != synEsHandler){ try { synEsHandler.handler(args[1]); }catch (Exception e){ //包裝異常 throw new SyncEsException(e); } }else { log.debug("沒有處理的key={}",key); } } return res; } @Override public Object plugin(Object o) { if(o instanceof Executor) return Plugin.wrap(o, this); else return o; } @Override public void setProperties(Properties properties) { } /** * 註冊同步處理器 * @param key * @param parameterHandler */ public void regHandler(String key,SynEsHandler parameterHandler){ handlerMap.put(key,parameterHandler); } }
實現處理接口
package com.gzwl.interceptor.impl; import com.gzwl.entity.Employee; import com.gzwl.interceptor.SynEsHandler; import com.gzwl.interceptor.SyncEsInterceptor; import org.apache.ibatis.binding.MapperMethod; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component class EmployeeMapperHandler implements SynEsHandler { @Autowired public EmployeeMapperHandler(SyncEsInterceptor syncEsInterceptor){ //將本身註冊到攔截器中,每一個方法可用是獨立的handler,也可用註冊多個方法的處理器 syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.insert",this); syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.update",this); } @Override public void handler(Object parameter) { Employee entity; if(parameter instanceof Employee){ //insert 可用直接攔截到實體類 entity = (Employee) parameter; }else { //update 方法須要特殊處理 entity = (Employee) ((MapperMethod.ParamMap) parameter).get("param1"); } //在這裏編寫es的同步邏輯 } }
完整代碼:https://gitee.com/ichiva/mybatis-sync-essql
注意:這個方案也是有致命缺陷的,就是在es執行完成後,程序還有可能讓數據庫回滾,從而形成數據不一致。 </string,syneshandler>數據庫