本文主要研究一下rocketmq-mysql的ColumnParserjava
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.javamysql
public abstract class ColumnParser { public static ColumnParser getColumnParser(String dataType, String colType, String charset) { switch (dataType) { case "tinyint": case "smallint": case "mediumint": case "int": return new IntColumnParser(dataType, colType); case "bigint": return new BigIntColumnParser(colType); case "tinytext": case "text": case "mediumtext": case "longtext": case "varchar": case "char": return new StringColumnParser(charset); case "date": case "datetime": case "timestamp": return new DateTimeColumnParser(); case "time": return new TimeColumnParser(); case "year": return new YearColumnParser(); case "enum": return new EnumColumnParser(colType); case "set": return new SetColumnParser(colType); default: return new DefaultColumnParser(); } } public static String[] extractEnumValues(String colType) { String[] enumValues = {}; Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType); if (matcher.matches()) { enumValues = matcher.group(2).replace("'", "").split(","); } return enumValues; } public abstract Object getValue(Object value); }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.javagit
public class IntColumnParser extends ColumnParser { private int bits; private boolean signed; public IntColumnParser(String dataType, String colType) { switch (dataType) { case "tinyint": bits = 8; break; case "smallint": bits = 16; break; case "mediumint": bits = 24; break; case "int": bits = 32; } this.signed = !colType.matches(".* unsigned$"); } @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof Long) { return value; } if (value instanceof Integer) { Integer i = (Integer) value; if (signed || i > 0) { return i; } else { return (1L << bits) + i; } } return value; } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.javagithub
public class BigIntColumnParser extends ColumnParser { private static BigInteger max = BigInteger.ONE.shiftLeft(64); private boolean signed; public BigIntColumnParser(String colType) { this.signed = !colType.matches(".* unsigned$"); } @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof BigInteger) { return value; } Long l = (Long) value; if (!signed && l < 0) { return max.add(BigInteger.valueOf(l)); } else { return l; } } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.javasql
public class StringColumnParser extends ColumnParser { private String charset; public StringColumnParser(String charset) { this.charset = charset.toLowerCase(); } @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof String) { return value; } byte[] bytes = (byte[]) value; switch (charset) { case "utf8": case "utf8mb4": return new String(bytes, Charsets.UTF_8); case "latin1": case "ascii": return new String(bytes, Charsets.ISO_8859_1); case "ucs2": return new String(bytes, Charsets.UTF_16); default: return new String(bytes, Charsets.toCharset(charset)); } } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.javaapache
public class DateTimeColumnParser extends ColumnParser { private static SimpleDateFormat dateTimeFormat; private static SimpleDateFormat dateTimeUtcFormat; static { dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC")); } @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof Timestamp) { return dateTimeFormat.format(value); } if (value instanceof Long) { return dateTimeUtcFormat.format(new Date((Long) value)); } return value; } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java數組
public class TimeColumnParser extends ColumnParser { @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof Timestamp) { return new Time(((Timestamp) value).getTime()); } return value; } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.javaapp
public class YearColumnParser extends ColumnParser { @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof Date) { Calendar calendar = Calendar.getInstance(); calendar.setTime((Date) value); return calendar.get(Calendar.YEAR); } return value; } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.javaide
public class EnumColumnParser extends ColumnParser { private String[] enumValues; public EnumColumnParser(String colType) { enumValues = extractEnumValues(colType); } @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof String) { return value; } Integer i = (Integer) value; if (i == 0) { return null; } else { return enumValues[i - 1]; } } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.javaui
public class SetColumnParser extends ColumnParser { private String[] enumValues; public SetColumnParser(String colType) { enumValues = extractEnumValues(colType); } @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof String) { return value; } StringBuilder builder = new StringBuilder(); long l = (Long) value; boolean needSplit = false; for (int i = 0; i < enumValues.length; i++) { if (((l >> i) & 1) == 1) { if (needSplit) builder.append(","); builder.append(enumValues[i]); needSplit = true; } } return builder.toString(); } }
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
public class DefaultColumnParser extends ColumnParser { @Override public Object getValue(Object value) { if (value == null) { return null; } if (value instanceof byte[]) { return Base64.encodeBase64String((byte[]) value); } return value; } }
ColumnParser定義了getValue抽象方法;它提供了getColumnParser方法用於根據dataType獲取對應的ColumnParser實現類