监控 MySQL Binlog 使用 Flink 将数据实时写入 Kafka
一、前言
在实时数据处理场景中,很多企业需要将 MySQL 数据库中的变化(新增、修改、删除)实时同步到下游系统,例如 Kafka、Elasticsearch、HBase 等,以实现数据分析、监控预警或搜索查询。
本文将介绍如何通过 Flink CDC(Change Data Capture)监控 MySQL Binlog,并将变更数据实时写入 Kafka,实现准实时数据同步。
二、整体架构
实现流程如下: MySQL(Binlog) –> Flink CDC –> Kafka(Topic) –> 下游消费者
- MySQL 需要开启 binlog,并设置为 ROW 格式。
- Flink CDC 使用 ververica/flink-cdc-connectors 直接解析 MySQL Binlog。
- Kafka 用作消息中间件,方便数据被下游系统订阅消费。
三、环境准备
1. MySQL 配置
修改 my.cnf(Linux 可能是 /etc/my.cnf 或 /etc/mysql/mysql.conf.d/mysqld.cnf):
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
执行以下命令,给 Flink 连接用户授权:
CREATE USER \’flink\’@\’%\’ IDENTIFIED BY \’flink123\’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO \’flink\’@\’%\’;
FLUSH PRIVILEGES;
2. Kafka 启动
如果本地是单机测试,可以直接用
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
创建 Topic:
bin/kafka-topics.sh –create –topic mysql_binlog_topic –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1
3. Maven 依赖
在 Flink 项目中引入必要依赖(以 Flink 1.15 为例):
<dependencies>
<!— Flink 核心依赖 —>
<dependency>
<groupId>org.apache.flink</groupId
评论前必须登录!
注册