一 CDC概述
CDC的全称是Change Data Capture,翻译过来就是“变动数据捕获”。它的核心思想就是,检测并捕获数据库的变动(包括数据的插入、更新和删除等操作),把这些数据变更按发生的时间顺序记录下来,写入到消息中间件(Kafka、Pulsar等)供其他应用订阅、消费。
1.1 CDC的应用场景1.1.1 数据同步、备份和容灾
在MySQL、TiDB、PostgreSQL等数据库之间互相同步数据,可以通过CDC计算将这些数据同步到Kafka等消息中间件中,然后再通过Flink、Spark、ES等技术消费Kafka中的数据,供数据分析使用。使用这些工具订阅感兴趣的数据表变更,而不需要直接把分析流程接入到业务系统,起到解耦的作用。
也可以通过CDC技术对数据库进行备份。
1.1.2 数据采集
面向数据湖或者数据仓库源数据的采集工作。
1.1.3 微服务之间共享数据状态
CDC可以作为微服务数据之间共享的一种解决方案,可以通过CDC获取其他微服务数据库的变更,从而获取数据状态的更新,完成自己对应的逻辑。
1.2 常见的开源CDC技术对比1.2.1 maxwell
maxwell专门用来实时解析MySQL的Binlog日志,并生成Json格式的消息。作为生成者将消息发送到Kafka、Kinesis、RabbitMQ等消息队列中。它的常见应用场景有ETL、维护缓存、收集表级别的DML操作。maxwell提供以下功能:
支持SELECT * FROM table的方式进行全量数据初始化。支持在主库发生failover之后,自动恢复Binlog位置(GTID)。可以伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event。1.2.2 Debezium
Debezium 是一个变更数据捕获 (CDC) 平台,它通过重用 Kafka 和 Kafka Connect 来实现其持久性、可靠性和容错质量。部署到 Kafka Connect 分布式、可扩展、容错服务的每个连接器监控单个上游数据库服务器,捕获所有更改并将它们记录在一个或多个 Kafka 主题中(通常每个数据库表一个主题)。支持监听 MySQL,MongoDB,PostgreSQL,Oracle,SQL Server 等数据库的变化。
1.2.3 Canal
主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。在canal1.1.4版本迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力。
1.2.4 Flink CDC
Flink CDC Connectors内部封装了Debezium特性,可以使用Flink CDC的方式替代canal+kafka的方式,直接通过sql的方式来实现对mysql数据的同步。
二 Flink CDC2.0简单上手例子
Flink在1.11版本开始引入Flink CDC功能,并且同时支持Table和SQL两种形式,Flink SQL CDC基于Debezium实现的,能够对CDC数据进行实时解析同步。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。
Dynamic Table是Flink内部定义的表,它和流式可以相互转化的。可以简单的理解为:每张MySQL表对应一个Binlog日志,Binlog日志随着MySQL表的变化而变化,Dynamic Table相当于Binlog日志流在某一时刻的物化结果。在Flink中,数据从一个算子流向另外一个算子的时候,都是以Changelog Stream的格式发送到下游,在下游我们可以将其翻译成一张表或者一条流进行操作。
2.1 社区支持的数据库连接
2.2 CDC和Flink对应版本介绍
2.3 FlinkCDC2.0的maven配置
我们使用的是Flink1.13.0,因此选用FlinkCDC2.0
2.4.2 创建用户
我们知道MySQL用户的密码长度是由validate_password_length决定的,而validate_password_length的计算公式是:
validate_password_length = validate_password_number_count + validate_password_special_char_count + (2 * validate_password_mixed_case_count)
因为这是开发测试环境,我们设置validate_password_policy=0(仅仅对密码长度开启认证);另外,再设置validate_password_length=1(密码长度只要大于等于1即可)。
set global validate_password_policy=0;set global validate_password_length=1;CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpwd';GRANT SELECT,INSERT,UPDATE,DELETE,CREATE,DROP,RELOAD,SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY 'flinkpwd';FLUSH PRIVILEGES;2.4.3 开启MySQL数据库的binlog功能
编辑/etc/my.conf,开启flinkcdc这个Database的flinkcdc功能。
[bigdata@bigdata12 ~]$ sudo vi /etc/my.cnf# 开启flinkcdc这个Database的flinkcdc功能binlog-do-db=flinkcdc
开启binlog之后,需要重启MySQL数据库
# 重启MySQL服务[bigdata@bigdata12 ~]$ sudo systemctl restart mysqld.service
查看当前的最新binlog数据
创建表
create table dept( deptno int primary key, dname varchar(20), loc varchar(30));
再次查看binlog日志大小,说明binlog日志生效
2.5 完整的maven配置
public class MySqlBinlogFlinkCDCStream { public static void main(String[] args) throws Exception { // 1 通过FlinkCDC构建sourceDatabase DebeziumSourceFunction
对于消费MySQL的CDC程序而言,StartupOptions有两种选择:
initial():默认方式,从snapshot开始初始化,一直读取到最新的binlog日志。latest():程序在启动时,仅仅从最新的binlog日志开始读取数据。2.6.2 ExactlyOnce处理
Flink CDC首先从Snapshot开始读取解析,一直解析到binlog日志最新位置,因此能够提供ExactlyOnce支持。
2.6.3 数据流解析
-- 1 插入两条数据INSERT INTO dept VALUES (10,'ACCOUNTING','NEW YORK'); INSERT INTO dept VALUES (20,'RESEARCH','DALLAS'); -- 2 更新deptno=10的数据UPDATE dept SET loc='BEIJING' WHERE deptno=10;-- 3 删除deptno=20的数据DELETE FROM dept WHERE deptno=20;2.6.3.1 INSERT结构
op的c表示是insert操作标识符,此时有after,没有before。
value = Struct { after = Struct { deptno = 10, dname = ACCOUNTING, loc = NEW YORK }, source = Struct { ts_ms = , db = flinkcdc, table = dept, pos = 1209, row = 0 }, op = c, ts_ms = }2.6.3.2 UPDATE结构
op的u表示是update操作标识符,此时既有before又有after。
value = Struct { before = Struct { deptno = 10, dname = ACCOUNTING, loc = NEW YORK }, after = Struct { deptno = 10, dname = ACCOUNTING, loc = BEIJING }, source = Struct { ts_ms = , db = flinkcdc, table = dept, pos = 1783, row = 0 }, op = u, ts_ms = }2.6.3.3 DELETE结构
op的d表示是delete操作标识符,此时只有before,没有after。
value = Struct { before = Struct { deptno = 20, dname = RESEARCH, loc = DALLAS }, source = Struct { ts_ms = , db = flinkcdc, table = dept, pos = 2097, row = 0 }, op = d, ts_ms = }2.7 字段类型介绍
MySQL字段类型
FlinkSQL字段类型
TINYINT
TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT
BIGINT
FLOAT
FLOAT
DOUBLE
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEAN
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
VARCHAR(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTES
2.8 Flink SQL版本源码
public class MySqlBinlogFlinkCDCSQL { public static void main(String[] args) throws Exception { // 1 创建Flink的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 2 使用FlinkSQL DDL方式创建CDC表 tableEnv.executeSql("CREATE TABLE dept(" + "deptno INT primary key," + "dname STRING," + "loc STRING" + ") WITH (" + "'connector' = 'mysql-cdc'," + "'hostname' = 'bigdata12'," + "'port' = '3306'," + "'username' = 'flinkuser'," + "'password' = 'flinkpwd'," + "'database-name' = 'flinkcdc'," + "'table-name' = 'dept'"+ ")"); // 3 查询数据并转换为流输出 Table table = tableEnv.sqlQuery("SELECT * FROM dept"); DataStream
这篇文章介绍了如何开启和使用Flink-CDC2.0,并附有测试通过的源代码,欢迎大家评论、转发。
标签: Flink
②文章观点仅代表原作者本人不代表本站立场,并不完全代表本站赞同其观点和对其真实性负责。
③文章版权归原作者所有,部分转载文章仅为传播更多信息、受益服务用户之目的,如信息标记有误,请联系站长修正。
④本站一律禁止以任何方式发布或转载任何违法违规的相关信息,如发现本站上有涉嫌侵权/违规及任何不妥的内容,请第一时间反馈。发送邮件到 88667178@qq.com,经核实立即修正或删除。