pos机的TS是什么意思(想要使用Flink-CDC20,掌握这一篇图文就够了)

快鱼网 27 0

一 CDC概述

CDC的全称是Change Data Capture,翻译过来就是“变动数据捕获”。它的核心思想就是,检测并捕获数据库的变动(包括数据的插入、更新和删除等操作),把这些数据变更按发生的时间顺序记录下来,写入到消息中间件(Kafka、Pulsar等)供其他应用订阅、消费。

1.1 CDC的应用场景1.1.1 数据同步、备份和容灾

在MySQL、TiDB、PostgreSQL等数据库之间互相同步数据,可以通过CDC计算将这些数据同步到Kafka等消息中间件中,然后再通过Flink、Spark、ES等技术消费Kafka中的数据,供数据分析使用。使用这些工具订阅感兴趣的数据表变更,而不需要直接把分析流程接入到业务系统,起到解耦的作用。

也可以通过CDC技术对数据库进行备份。

1.1.2 数据采集

面向数据湖或者数据仓库源数据的采集工作。

1.1.3 微服务之间共享数据状态

CDC可以作为微服务数据之间共享的一种解决方案,可以通过CDC获取其他微服务数据库的变更,从而获取数据状态的更新,完成自己对应的逻辑。

1.2 常见的开源CDC技术对比1.2.1 maxwell

maxwell专门用来实时解析MySQL的Binlog日志,并生成Json格式的消息。作为生成者将消息发送到Kafka、Kinesis、RabbitMQ等消息队列中。它的常见应用场景有ETL、维护缓存、收集表级别的DML操作。maxwell提供以下功能:

支持SELECT * FROM table的方式进行全量数据初始化。支持在主库发生failover之后,自动恢复Binlog位置(GTID)。可以伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event。1.2.2 Debezium

Debezium 是一个变更数据捕获 (CDC) 平台,它通过重用 Kafka 和 Kafka Connect 来实现其持久性、可靠性和容错质量。部署到 Kafka Connect 分布式、可扩展、容错服务的每个连接器监控单个上游数据库服务器,捕获所有更改并将它们记录在一个或多个 Kafka 主题中(通常每个数据库表一个主题)。支持监听 MySQL,MongoDB,PostgreSQL,Oracle,SQL Server 等数据库的变化。

1.2.3 Canal

主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。在canal1.1.4版本迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力。

1.2.4 Flink CDC

Flink CDC Connectors内部封装了Debezium特性,可以使用Flink CDC的方式替代canal+kafka的方式,直接通过sql的方式来实现对mysql数据的同步。

二 Flink CDC2.0简单上手例子

Flink在1.11版本开始引入Flink CDC功能,并且同时支持Table和SQL两种形式,Flink SQL CDC基于Debezium实现的,能够对CDC数据进行实时解析同步。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

Dynamic Table是Flink内部定义的表,它和流式可以相互转化的。可以简单的理解为:每张MySQL表对应一个Binlog日志,Binlog日志随着MySQL表的变化而变化,Dynamic Table相当于Binlog日志流在某一时刻的物化结果。在Flink中,数据从一个算子流向另外一个算子的时候,都是以Changelog Stream的格式发送到下游,在下游我们可以将其翻译成一张表或者一条流进行操作。

2.1 社区支持的数据库连接

2.2 CDC和Flink对应版本介绍

2.3 FlinkCDC2.0的maven配置

我们使用的是Flink1.13.0,因此选用FlinkCDC2.0

com.ververica flink-connector-mysql-cdc 2.0.02.4 MySQL数据源的准备工作2.4.1 新建MySQL的DataBase

2.4.2 创建用户

我们知道MySQL用户的密码长度是由validate_password_length决定的,而validate_password_length的计算公式是:

validate_password_length = validate_password_number_count + validate_password_special_char_count + (2 * validate_password_mixed_case_count)

因为这是开发测试环境,我们设置validate_password_policy=0(仅仅对密码长度开启认证);另外,再设置validate_password_length=1(密码长度只要大于等于1即可)。

set global validate_password_policy=0;set global validate_password_length=1;CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpwd';GRANT SELECT,INSERT,UPDATE,DELETE,CREATE,DROP,RELOAD,SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY 'flinkpwd';FLUSH PRIVILEGES;2.4.3 开启MySQL数据库的binlog功能

编辑/etc/my.conf,开启flinkcdc这个Database的flinkcdc功能。

[bigdata@bigdata12 ~]$ sudo vi /etc/my.cnf# 开启flinkcdc这个Database的flinkcdc功能binlog-do-db=flinkcdc

开启binlog之后,需要重启MySQL数据库

# 重启MySQL服务[bigdata@bigdata12 ~]$ sudo systemctl restart mysqld.service

查看当前的最新binlog数据

创建表

create table dept( deptno int primary key, dname varchar(20), loc varchar(30));

再次查看binlog日志大小,说明binlog日志生效

2.5 完整的maven配置

1.13.0 org.apache.flink flink-java ${flink-version} org.apache.flink flink-streaming-java_2.12 ${flink-version} org.apache.flink flink-clients_2.12 ${flink-version} mysql mysql-connector-java 5.1.44 org.apache.flink flink-table-planner-blink_2.12 ${flink-version} com.ververica flink-connector-mysql-cdc 2.0.0 com.alibaba fastjson 1.2.75 2.6 Flink 流式代码

public class MySqlBinlogFlinkCDCStream { public static void main(String[] args) throws Exception { // 1 通过FlinkCDC构建sourceDatabase DebeziumSourceFunction sourceDatabase = MySqlSource.builder() .hostname("bigdata12") .port(3306) // 需要监控的database .databaseList("flinkcdc") .username("flinkuser") .password("flinkpwd") // 反序列化 .deserializer(new StringDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); // 2 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(50_000); DataStreamSource dataStreamSource = env.addSource(sourceDatabase); // 3 打印数据 dataStreamSource.print(); // 4 启动任务 env.execute(); }}2.6.1 阅读位置(Position)设置

对于消费MySQL的CDC程序而言,StartupOptions有两种选择:

initial():默认方式,从snapshot开始初始化,一直读取到最新的binlog日志。latest():程序在启动时,仅仅从最新的binlog日志开始读取数据。2.6.2 ExactlyOnce处理

Flink CDC首先从Snapshot开始读取解析,一直解析到binlog日志最新位置,因此能够提供ExactlyOnce支持。

2.6.3 数据流解析

-- 1 插入两条数据INSERT INTO dept VALUES (10,'ACCOUNTING','NEW YORK'); INSERT INTO dept VALUES (20,'RESEARCH','DALLAS'); -- 2 更新deptno=10的数据UPDATE dept SET loc='BEIJING' WHERE deptno=10;-- 3 删除deptno=20的数据DELETE FROM dept WHERE deptno=20;2.6.3.1 INSERT结构

op的c表示是insert操作标识符,此时有after,没有before。

value = Struct { after = Struct { deptno = 10, dname = ACCOUNTING, loc = NEW YORK }, source = Struct { ts_ms = , db = flinkcdc, table = dept, pos = 1209, row = 0 }, op = c, ts_ms = }2.6.3.2 UPDATE结构

op的u表示是update操作标识符,此时既有before又有after。

value = Struct { before = Struct { deptno = 10, dname = ACCOUNTING, loc = NEW YORK }, after = Struct { deptno = 10, dname = ACCOUNTING, loc = BEIJING }, source = Struct { ts_ms = , db = flinkcdc, table = dept, pos = 1783, row = 0 }, op = u, ts_ms = }2.6.3.3 DELETE结构

op的d表示是delete操作标识符,此时只有before,没有after。

value = Struct { before = Struct { deptno = 20, dname = RESEARCH, loc = DALLAS }, source = Struct { ts_ms = , db = flinkcdc, table = dept, pos = 2097, row = 0 }, op = d, ts_ms = }2.7 字段类型介绍

MySQL字段类型

FlinkSQL字段类型

TINYINT

TINYINT

SMALLINT

TINYINT UNSIGNED

SMALLINT

INT

MEDIUMINT

SMALLINT UNSIGNED

INT

BIGINT

INT UNSIGNED

BIGINT

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE PRECISION

DOUBLE

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

TINYINT(1)

BOOLEAN

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

VARCHAR(n)

TEXT

STRING

BINARY

VARBINARY

BLOB

BYTES

2.8 Flink SQL版本源码

public class MySqlBinlogFlinkCDCSQL { public static void main(String[] args) throws Exception { // 1 创建Flink的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 2 使用FlinkSQL DDL方式创建CDC表 tableEnv.executeSql("CREATE TABLE dept(" + "deptno INT primary key," + "dname STRING," + "loc STRING" + ") WITH (" + "'connector' = 'mysql-cdc'," + "'hostname' = 'bigdata12'," + "'port' = '3306'," + "'username' = 'flinkuser'," + "'password' = 'flinkpwd'," + "'database-name' = 'flinkcdc'," + "'table-name' = 'dept'"+ ")"); // 3 查询数据并转换为流输出 Table table = tableEnv.sqlQuery("SELECT * FROM dept"); DataStream> deptStream = tableEnv.toRetractStream(table, Row.class); deptStream.print(); // 4 启动Flink程序 env.execute(); }}三 FlinkSQL和Flink DataStream在CDC2.0方面的对比3.1 FlinkSQLFlinkSQL只能在Flink1.13版本使用。只能监控单个表的变更。反序列化功能已经自动完成,可以非常方便转换为JavaBean使用,或者直接通过SQL访问。3.2 Flink DataStreamFlink DataStream可以在1.12和1.13两个版本使用。可以同时监控多库(database)、多表(table)的变更。默认的反序列化器StringDebeziumDeserializationSchema使用起来不是特别方便,我们通常需要自定义反序列化器。四 总结

这篇文章介绍了如何开启和使用Flink-CDC2.0,并附有测试通过的源代码,欢迎大家评论、转发。

标签: Flink

抱歉,评论功能暂时关闭!