近期的flink做業中,須要對上傳的日誌數據進行大量的校驗。
校驗規則大多比較簡單,僅爲字符串長度,數組長度,數據的最大值和最小值,非空判斷等。然而不想寫諸多校驗代碼,容易致使代碼又醜又繁瑣。聯想SpringBoot項目中的參數校驗,因而想着在純maven的項目中引入校驗。java
SpringBoot的基本參數校驗是基於Hibernate Validator實現的,所以在pom中引入如下依賴:數組
<dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.1.0.Final</version> </dependency> <dependency> <groupId>org.glassfish</groupId> <artifactId>javax.el</artifactId> <version>3.0.1-b11</version> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator-cdi</artifactId> <version>6.1.0.Final</version> </dependency>
在須要驗證的實體類中引入校驗註解(不得不說,註解真香).多線程
public class LogEvent { @NotNull private Instant timestamp; @NotNull private String filepath; @NotNull @Length(min = 1, max = 64) private String region; @NotNull private Boolean status; @NotNull @Min(-1) @Max(60 * 1000) private Integer timeDelay; @NotNull @Length(min = 1, max = 64) private String target; @Length(max = 1024) private String message; @Size(max = 5) private List<String> tags; }
由於Validator是thread safe
實現,所以多線程中能夠放心的使用。maven
@Slf4j public class LogEventUtil { // thread safe private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator(); public static boolean validate(LogEvent event) { Set<ConstraintViolation<LogEvent>> constraintViolations = VALIDATOR.validate(event); if (!constraintViolations.isEmpty()) { return false; } // 此處省略若干複雜的校驗規則(髒活不可能一點都不接觸的) } }
經過VALIDATOR.validate便可實現對LogEvent的基本校驗。ui
寥寥幾筆,即完成數據讀取以及校驗。hibernate
private static DataStream<LogEvent> configureKafkaConsumer(final StreamExecutionEnvironment env, ParameterTool parameterTool) { String topic = parameterTool.get("kafka.topic", ""); Properties kafkaProperties = filterPrefix(parameterTool.getProperties(), "kafka"); return env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties)) .map((MapFunction<String, LogEvent>) LogEventUtil::parseLogEvent) .filter((FilterFunction<LogEvent>) LogEventUtil::validate) .name("LogEventSourceStream") .returns(LogEvent.class); }