Flink CDC2.0 数据格式和op事件分类(c、u、d、r) 作者:马育民 • 2025-08-13 18:16 • 阅读:10009 # 介绍 Flink CDC(Change Data Capture)用于捕获数据库的变更事件,其事件分类主要基于数据库操作类型和数据变更的生命周期。 Flink CDC 会将捕获到的变更转换为统一格式的事件,常见分类如下: - c:插入数据 - u:更新数据 - d:删除数据 - r:全量读取 # 增量同步 这是最基础的分类,对应数据库的 `INSERT`、`UPDATE`、`DELETE` 操作,是业务处理中最常关注的事件类型。 ### INSERT 事件 - **触发场景**:当数据库表中插入新记录时产生。 - **事件特征**:包含新插入记录的**完整字段和值**。 **数据格式:** ```json { "before": null, // 插入前无数据 "after": { // 插入后的完整记录 "id": 1, "name": "Alice", "age": 20 }, "op": "c", // 操作标识(c = create) "ts_ms": 1620000000000 // 事件时间戳 } ``` ### UPDATE 事件 - **触发场景**:当数据库表中已有记录被修改时产生。 - **事件特征**:包含修改**前的旧值**(`before`)和修改**后的新值**(`after`),便于追踪变更细节。 **数据格式:** ```json { "before": { // 修改前的记录 "id": 1, "name": "Alice", "age": 20 }, "after": { // 修改后的记录 "id": 1, "name": "Alice", "age": 21 }, "op": "u", // 操作标识(u = update) "ts_ms": 1620000001000 } ``` ### DELETE 事件 - **触发场景**:当数据库表中记录被删除时产生。 - **事件特征**:包含被删除记录的**完整旧值**(`before`),`after` 为 `null`。 **数据格式:** ```json { "before": { // 被删除的记录 "id": 1, "name": "Alice", "age": 21 }, "after": null, // 删除后无数据 "op": "d", // 操作标识(d = delete) "ts_ms": 1620000002000 } ``` # 全量同步 - **触发场景**:全量同步开始和结束时产生。 - **作用**:通知下游系统“全量同步开始”或“全量同步结束”,便于区分全量和增量数据。 **数据格式:** ```json // 快照开始 { "source": { "snapshot": "start" }, "op": "r", // r = read(全量读取) "ts_ms": 1620000003000 } // 快照结束 { "source": { "snapshot": "end" }, "op": "r", "ts_ms": 1620000004000 } ``` # 事件标识处理 在 Flink 代码中,可通过事件的 `op` 字段(操作标识)区分事件类型,进行针对性处理: **提示:**伪代码,使用时需要根据情况改写代码 ```java // 假设从 CDC 源获取的数据流为 DataStream dataStream.process(new ProcessFunction() { @Override public void processElement(RowData row, Context ctx, Collector out) { String op = row.getString(row.getFieldIndex("op")).toString(); switch (op) { case "c": // INSERT 事件 out.collect("新增数据: " + row); break; case "u": // UPDATE 事件 out.collect("修改数据: " + row); break; case "d": // DELETE 事件 out.collect("删除数据: " + row); break; case "r": // 全量同步事件 out.collect("全量数据: " + row); break; } } }); ``` 原文出处:http://www.malaoshi.top/show_1GW1fYMNlQHW.html