A look at canal's CanalEventFilter

This article takes a look at canal's CanalEventFilter.

CanalEventFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/CanalEventFilter.java

public interface CanalEventFilter<T> {

    boolean filter(T event) throws CanalFilterException;
}
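  • The CanalEventFilter interface declares a single method, filter, which takes an event of type T, returns a boolean, and may throw CanalFilterException.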
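
To make the contract concrete, here is a minimal sketch of a custom implementation. The interface and the exception are redeclared locally as stand-ins so that the snippet compiles without canal on the classpath; treating CanalFilterException as unchecked, reading a true result as "keep the event", and the PrefixFilter and FilterDemo classes are all assumptions made for illustration, not part of canal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for canal's CanalFilterException (assumed to be unchecked here).
class CanalFilterException extends RuntimeException {
    CanalFilterException(String message) {
        super(message);
    }
}

// Local copy of the interface shown above.
interface CanalEventFilter<T> {
    boolean filter(T event) throws CanalFilterException;
}

// Hypothetical implementation: accepts names that start with a given prefix.
class PrefixFilter implements CanalEventFilter<String> {
    private final String prefix;

    PrefixFilter(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public boolean filter(String event) throws CanalFilterException {
        if (event == null) {
            throw new CanalFilterException("event must not be null");
        }
        return event.startsWith(prefix);
    }
}

class FilterDemo {
    // Keeps only the events for which the filter returns true.
    static <T> List<T> keep(List<T> events, CanalEventFilter<T> filter) {
        List<T> kept = new ArrayList<>();
        for (T event : events) {
            if (filter.filter(event)) {
                kept.add(event);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("shop.orders", "shop.users", "audit.log");
        System.out.println(keep(names, new PrefixFilter("shop."))); // [shop.orders, shop.users]
    }
}
```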

AviaterELFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterELFilter.java

public class AviaterELFilter implements CanalEventFilter<CanalEntry.Entry> {

    public static final String ROOT_KEY = "entry";
    private String             expression;

    public AviaterELFilter(String expression){
        this.expression = expression;
    }

    public boolean filter(CanalEntry.Entry entry) throws CanalFilterException {
        if (StringUtils.isEmpty(expression)) {
            return true;
        }

        Map<String, Object> env = new HashMap<String, Object>();
        env.put(ROOT_KEY, entry);
        return (Boolean) AviatorEvaluator.execute(expression, env);
    }

}
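  • AviaterELFilter implements CanalEventFilter<CanalEntry.Entry>. Its constructor takes an expression; filter returns true when the expression is empty, and otherwise binds the entry under the key "entry" and returns the result of AviatorEvaluator.execute(expression, env) cast to Boolean.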

AviaterSimpleFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterSimpleFilter.java

public class AviaterSimpleFilter implements CanalEventFilter<String> {

    private static final String SPLIT             = ",";

    private static final String FILTER_EXPRESSION = "include(list,target)";

    private final Expression    exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);

    private final List<String>  list;

    public AviaterSimpleFilter(String filterExpression){
        if (StringUtils.isEmpty(filterExpression)) {
            list = new ArrayList<String>();
        } else {
            String[] ss = filterExpression.toLowerCase().split(SPLIT);
            list = Arrays.asList(ss);
        }
    }

    public boolean filter(String filtered) throws CanalFilterException {
        if (list.isEmpty()) {
            return true;
        }
        if (StringUtils.isEmpty(filtered)) {
            return true;
        }
        Map<String, Object> env = new HashMap<String, Object>();
        env.put("list", list);
        env.put("target", filtered.toLowerCase());
        return (Boolean) exp.execute(env);
    }

}
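  • AviaterSimpleFilter implements CanalEventFilter<String>. It holds an Expression compiled once via AviatorEvaluator.compile(FILTER_EXPRESSION, true). Its constructor takes filterExpression, lower-cases it, and splits it on commas into list. Its filter method returns true when list or the input is empty, and otherwise returns the result of exp.execute(env) with the lower-cased input bound as target.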
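
The behaviour can be approximated in plain Java without Aviator. The sketch below assumes that Aviator's include(list,target) amounts to a membership test and replaces it with List.contains; SimpleFilterSketch is a hypothetical name, and the rest mirrors the constructor and the early returns shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java approximation of AviaterSimpleFilter (no Aviator dependency).
class SimpleFilterSketch {
    private final List<String> list;

    SimpleFilterSketch(String filterExpression) {
        if (filterExpression == null || filterExpression.isEmpty()) {
            list = new ArrayList<>();
        } else {
            // Lower-case first, then split on commas; entries are not trimmed.
            list = Arrays.asList(filterExpression.toLowerCase().split(","));
        }
    }

    boolean filter(String filtered) {
        // An empty list or an empty input always passes.
        if (list.isEmpty() || filtered == null || filtered.isEmpty()) {
            return true;
        }
        // Assumption: include(list, target) is a simple membership test.
        return list.contains(filtered.toLowerCase());
    }

    public static void main(String[] args) {
        SimpleFilterSketch f = new SimpleFilterSketch("Test.Orders,test.users");
        System.out.println(f.filter("TEST.ORDERS")); // true
        System.out.println(f.filter("test.items"));  // false
        System.out.println(f.filter(""));            // true
        // The space after the comma is kept, so "b" is not in [a,  b].
        System.out.println(new SimpleFilterSketch("a, b").filter("b")); // false
    }
}
```

One consequence of splitting without trimming is that a list written as "a, b" contains " b" rather than "b", so such lists are best written without spaces.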

AviaterRegexFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

public class AviaterRegexFilter implements CanalEventFilter<String> {

    private static final String             SPLIT             = ",";
    private static final String             PATTERN_SPLIT     = "|";
    private static final String             FILTER_EXPRESSION = "regex(pattern,target)";
    private static final RegexFunction      regexFunction     = new RegexFunction();
    private final Expression                exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
    static {
        AviatorEvaluator.addFunction(regexFunction);
    }

    private static final Comparator<String> COMPARATOR        = new StringComparator();

    final private String                    pattern;
    final private boolean                   defaultEmptyValue;

    public AviaterRegexFilter(String pattern){
        this(pattern, true);
    }

    public AviaterRegexFilter(String pattern, boolean defaultEmptyValue){
        this.defaultEmptyValue = defaultEmptyValue;
        List<String> list = null;
        if (StringUtils.isEmpty(pattern)) {
            list = new ArrayList<String>();
        } else {
            String[] ss = StringUtils.split(pattern, SPLIT);
            list = Arrays.asList(ss);
        }

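        // Sort the patterns from longest to shortest.
        // Otherwise matching foot against foo|foot goes wrong: foot matches foo first, so foo is
        // returned, but its length differs from that of foot.
        Collections.sort(list, COMPARATOR);
        // Make each pattern match completely, from head to tail.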
        list = completionPattern(list);
        this.pattern = StringUtils.join(list, PATTERN_SPLIT);
    }

    public boolean filter(String filtered) throws CanalFilterException {
        if (StringUtils.isEmpty(pattern)) {
            return defaultEmptyValue;
        }

        if (StringUtils.isEmpty(filtered)) {
            return defaultEmptyValue;
        }

        Map<String, Object> env = new HashMap<String, Object>();
        env.put("pattern", pattern);
        env.put("target", filtered.toLowerCase());
        return (Boolean) exp.execute(env);
    }

    //......

    @Override
    public String toString() {
        return pattern;
    }

}
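  • AviaterRegexFilter implements CanalEventFilter<String>. It holds an Expression compiled via AviatorEvaluator.compile(FILTER_EXPRESSION, true), and its static initializer registers RegexFunction with AviatorEvaluator. Its constructor takes pattern, splits it on commas, sorts the pieces from longest to shortest, completes each one for head-to-tail matching, and joins them with |. Its filter method returns defaultEmptyValue when pattern or the input is empty, and otherwise returns the result of exp.execute(env) with the lower-cased input bound as target.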
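
The pattern preparation can be sketched in plain Java. Several things here are assumptions, because the listing above elides StringComparator and completionPattern and does not show RegexFunction: the sketch orders longer patterns first, wraps each one in ^ and $, and uses java.util.regex with a full match in place of the registered regex function. RegexFilterSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java approximation of AviaterRegexFilter (java.util.regex swapped in).
class RegexFilterSketch {
    private final String pattern;
    private final boolean defaultEmptyValue;

    RegexFilterSketch(String pattern, boolean defaultEmptyValue) {
        this.defaultEmptyValue = defaultEmptyValue;
        List<String> list = new ArrayList<>();
        if (pattern != null && !pattern.isEmpty()) {
            for (String piece : pattern.split(",")) {
                if (!piece.isEmpty()) {
                    list.add(piece);
                }
            }
        }
        // Assumed ordering: longest pattern first.
        list.sort((a, b) -> Integer.compare(b.length(), a.length()));
        // Assumed completion: anchor every pattern at both ends.
        List<String> anchored = new ArrayList<>();
        for (String piece : list) {
            anchored.add("^" + piece + "$");
        }
        this.pattern = String.join("|", anchored);
    }

    boolean filter(String filtered) {
        if (pattern.isEmpty() || filtered == null || filtered.isEmpty()) {
            return defaultEmptyValue;
        }
        // Only the target is lower-cased, as in the original filter method.
        return Pattern.matches(pattern, filtered.toLowerCase());
    }

    @Override
    public String toString() {
        return pattern;
    }

    public static void main(String[] args) {
        RegexFilterSketch f = new RegexFilterSketch("foo,foot,test\\..*", true);
        System.out.println(f);                       // ^test\..*$|^foot$|^foo$
        System.out.println(f.filter("foot"));        // true
        System.out.println(f.filter("TEST.Orders")); // true
        System.out.println(f.filter("food"));        // false
        System.out.println(f.filter(""));            // true (defaultEmptyValue)
    }
}
```

With java.util.regex a full match would succeed on foot even without the sort, so the ordering is kept here only to mirror the constructor; the comment in the original code describes a problem specific to the matcher canal uses.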

Summary

The CanalEventFilter interface defines the filter method; the three implementations covered here are AviaterELFilter, AviaterSimpleFilter, and AviaterRegexFilter.
