玖叶教程网

前端编程开发入门

聊聊rocketmq-mysql的ColumnParser

本文主要研究一下rocketmq-mysql的ColumnParser



ColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java

public abstract class ColumnParser {
?
    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
?
        switch (dataType) {
            case "tinyint":
            case "smallint":
            case "mediumint":
            case "int":
                return new IntColumnParser(dataType, colType);
            case "bigint":
                return new BigIntColumnParser(colType);
            case "tinytext":
            case "text":
            case "mediumtext":
            case "longtext":
            case "varchar":
            case "char":
                return new StringColumnParser(charset);
            case "date":
            case "datetime":
            case "timestamp":
                return new DateTimeColumnParser();
            case "time":
                return new TimeColumnParser();
            case "year":
                return new YearColumnParser();
            case "enum":
                return new EnumColumnParser(colType);
            case "set":
                return new SetColumnParser(colType);
            default:
                return new DefaultColumnParser();
        }
    }
?
    public static String[] extractEnumValues(String colType) {
        String[] enumValues = {};
        Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType);
        if (matcher.matches()) {
            enumValues = matcher.group(2).replace("'", "").split(",");
        }
?
        return enumValues;
    }
?
    public abstract Object getValue(Object value);
?
}
  • ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

IntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java

public class IntColumnParser extends ColumnParser {
?
    private int bits;
    private boolean signed;
?
    public IntColumnParser(String dataType, String colType) {
?
        switch (dataType) {
            case "tinyint":
                bits = 8;
                break;
            case "smallint":
                bits = 16;
                break;
            case "mediumint":
                bits = 24;
                break;
            case "int":
                bits = 32;
        }
?
        this.signed = !colType.matches(".* unsigned#34;);
    }
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof Long) {
            return value;
        }
?
        if (value instanceof Integer) {
            Integer i = (Integer) value;
            if (signed || i > 0) {
                return i;
            } else {
                return (1L << bits) + i;
            }
        }
?
        return value;
    }
}
  • IntColumnParser解析tinyint、smallint、mediumint、int类型

BigIntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java

public class BigIntColumnParser extends ColumnParser {
?
    private static BigInteger max = BigInteger.ONE.shiftLeft(64);
?
    private boolean signed;
?
    public BigIntColumnParser(String colType) {
        this.signed = !colType.matches(".* unsigned#34;);
    }
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof BigInteger) {
            return value;
        }
?
        Long l = (Long) value;
        if (!signed && l < 0) {
            return max.add(BigInteger.valueOf(l));
        } else {
            return l;
        }
    }
}
  • BigIntColumnParser解析bigint类型

StringColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java

public class StringColumnParser extends ColumnParser {
?
    private String charset;
?
    public StringColumnParser(String charset) {
        this.charset = charset.toLowerCase();
    }
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof String) {
            return value;
        }
?
        byte[] bytes = (byte[]) value;
?
        switch (charset) {
            case "utf8":
            case "utf8mb4":
                return new String(bytes, Charsets.UTF_8);
            case "latin1":
            case "ascii":
                return new String(bytes, Charsets.ISO_8859_1);
            case "ucs2":
                return new String(bytes, Charsets.UTF_16);
            default:
                return new String(bytes, Charsets.toCharset(charset));
?
        }
    }
}
  • StringColumnParser解析tinytext、text、mediumtext、longtext、varchar、char类型

DateTimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java

public class DateTimeColumnParser extends ColumnParser {
?
    private static SimpleDateFormat dateTimeFormat;
    private static SimpleDateFormat dateTimeUtcFormat;
?
    static {
        dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    }
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof Timestamp) {
            return dateTimeFormat.format(value);
        }
?
        if (value instanceof Long) {
            return dateTimeUtcFormat.format(new Date((Long) value));
        }
?
        return value;
    }
}
  • DateTimeColumnParser解析date、datetime、timestamp类型

TimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java

public class TimeColumnParser extends ColumnParser {
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof Timestamp) {
?
            return new Time(((Timestamp) value).getTime());
        }
?
        return value;
    }
}
  • TimeColumnParser解析time类型

YearColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java

public class YearColumnParser extends ColumnParser {
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof Date) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime((Date) value);
            return calendar.get(Calendar.YEAR);
        }
?
        return value;
    }
}
  • YearColumnParser解析year类型

EnumColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java

public class EnumColumnParser extends ColumnParser {
?
    private String[] enumValues;
?
    public EnumColumnParser(String colType) {
        enumValues = extractEnumValues(colType);
    }
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof String) {
            return value;
        }
?
        Integer i = (Integer) value;
        if (i == 0) {
            return null;
        } else {
            return enumValues[i - 1];
        }
    }
}
  • EnumColumnParser解析enum类型

SetColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java

public class SetColumnParser extends ColumnParser {
?
    private String[] enumValues;
?
    public SetColumnParser(String colType) {
        enumValues = extractEnumValues(colType);
    }
?
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
?
        if (value instanceof String) {
            return value;
        }
?
        StringBuilder builder = new StringBuilder();
        long l = (Long) value;
?
        boolean needSplit = false;
        for (int i = 0; i < enumValues.length; i++) {
            if (((l >> i) & 1) == 1) {
                if (needSplit)
                    builder.append(",");
?
                builder.append(enumValues[i]);
                needSplit = true;
            }
        }
?
        return builder.toString();
    }
}
  • SetColumnParser解析set类型

DefaultColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java

public class DefaultColumnParser extends ColumnParser {
?
    @Override
    public Object getValue(Object value) {
?
        if (value == null) {
            return null;
        }
?
        if (value instanceof byte[]) {
            return Base64.encodeBase64String((byte[]) value);
        }
?
        return value;
    }
}
  • DefaultColumnParser通过base64将byte数组转为string

小结

ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

doc

  • ColumnParser

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言