首页 软件代码

FlinkCDC/DEBEZIUM自定义日期转换器


Flinkcdc研究

最近在研究Flinkcdc数据采集,底层技术为debezium,debezium会将日期转为5位数字,日期时间位13位的数字,看之前代码解决办法是:
1.识别十三位数字进行转换为日期格式。
2.对于date类型,人工穷举字段类型进行转换

我自己考虑了一下,这样可能会误转换、增大人工成本。感觉这样不是解决办法,就通过查看官网文档、看一些源码,看看是否有其他解决方法。

debezium自定义转换器

经过阅读debezium的官方文档,发现其是支持自定义转换器,因此可以通过自定义转换器时间事件类型的转换。
1.避免造轮子
经过搜索学习,发现github上有大佬已经写过针对mysql的时间点我直达转换器。
2.不得不造轮子
因为我日常参与的数据不仅是mysql、还有sqlserver。查看sqlserver并没有相关代码,就想着自己也写一个,能够兼容mysql和sqlserver的转换器(后来证明还是分开写比较好)。
3.学习分析
对大佬代码学习及jdbc源码查看,并且在实际的测试中。发现mysql、sqlserver的字段类型在快照、binlog(cdc)期间并不是完全一样。若要完全兼容,必须将映射类型找出来。
刚开始认为一个jdbc type只会对应一种java类。其实不是,而是在不同的jdbc中都有不同的映射。(通过chatgpt证实了一下)

chatgpt

mysql转换

mysql启动时,快照期间初始化转换器,在binlog期间仍进行一次初始化转换器。(使用的类不同)

字段类型快照类型(jdbc type)binlog类型(jdbc type)
DATEjava.time.LocalDate(93)java.time.LocalDate(91)
TIMEjava.time.Duration(92)java.time.Duration(92)
DATETIMEjava.sql.Timestamp(93)java.time.LocalDateTime(93)
TIMESTAMPjava.sql.Timestamp(2014)java.time.ZonedDateTime(2014)

sqlserver转换

sqlserver启动时 快照期间初始化转换器,在cdc期间不再进行初始化转换器。(使用的类相同)

timestamp类型在sqlserver中为byte[]类型,jdbc type为-2,因此不进行转换。
字段类型快照类型(jdbc type)cdc类型(jdbc type)
DATEjava.sql.Date(91)java.sql.Date(91)
TIMEjava.sql.Timestamp(92)java.sql.Time(92)
DATETIMEjava.sql.Timestamp(93)java.sql.Timestamp(93)
DATETIME2java.sql.Timestamp(93)java.sql.Timestamp(93)
DATETIMEOFFSETmicrosoft.sql.DateTimeOffset(-155)microsoft.sql.DateTimeOffset(-155)
SMALLDATETIMEjava.sql.Timestamp(93)java.sql.Timestamp(93)

开始写代码喽

java并不是我的擅长( 在学了, ),写代码期间借助了大量人工智能GitHub coplit,帮我实现。
刚开始涉及思路:大统一是全形式,将mysql和sqlserver都写到一个方法中去。

实际中发现并不是很合理。例如mysql的TIMESTAMP类型是时间戳,但sqlserver的TIMESTAMP是byte[]类型,还要在另外判断一下jdbc type是否为-2,也容易产生误解。最终决定分开写。

最后依照官网的模板重写方法就可以了。代码地址点击直达

使用方法

converters参数为:自定义转换器的名字,可以随意设置。设置的值就作为转换器的名字,在以后的参数中就要使用这个名字。
假设自定义的名字为mydebeziumconverter,则type参数为mydebeziumconverter.type

mydebeziumconverter.type参数为:自定义转换器的类名,必须设置。(转换器的方法)
mydebeziumconverter.database.type参数为:数据库类型,必须设置。(需要设置为mysql或sqlserver)
mydebeziumconverter.format.datetime参数为:datetime类型的格式,可选。
mydebeziumconverter.format.date参数为:date类型的格式,可选。
mydebeziumconverter.format.time参数为:time类型的格式,可选。

如果仅使用mysql或sqlserver建议独立编译代码,只保留mysql或sqlserver的转换器,减少依赖。

flinkcdc

可使用源代码也可使用编译好的jar包。只需要放入目录即可。并在配置中设置参数。

// 自定义解释器
// 设置解释器及解释器参数
debeziumProperties.put("converters", "mydebeziumconverter");
debeziumProperties.put("mydebeziumconverter.type", "org.util.MyDebeziumConverter");
debeziumProperties.put("mydebeziumconverter.database.type", "mysql");
// 自定义格式,可选
debeziumProperties.put("mydebeziumconverter.format.datetime", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.put("mydebeziumconverter.format.date", "yyyy-MM-dd");
debeziumProperties.put("mydebeziumconverter.format.time", "HH:mm:ss");

debezium

使用jar包,并将其放在 debezium 插件的同一级别目录中。并在配置文件中设置参数。

"converters": "mydebeziumconverter",
"mydebeziumconverter.type": "org.util.MyDebeziumConverter",
"mydebeziumconverter.database.type": "mysql",
"mydebeziumconverter.format.datetime": "yyyy-MM-dd HH:mm:ss",
"mydebeziumconverter.format.date": "yyyy-MM-dd",
"mydebeziumconverter.format.time": "HH:mm:ss"

github

代码已发布到github,点击直达





文章评论

    ample 访客BrowserMac
    2023-02-12 12:59   回复

    真厉害

      布衣者 站长ChromeWindows
      2023-02-12 13:04   回复

      刚开始学习

目录