canal1.1.5 将MySQL变化数据发送到kafka(单机模式)

模式

需要修改成 kafka 模式

修改配置文件

cd /program/canal.deployer-1.1.5

canal.properties

vim conf/canal.properties

修改成下面配置:

# kafka 模式
canal.serverMode = kafka

# 配置kafka服务器ip、port
kafka.bootstrap.servers = hadoop1:9092

instance.properties

vim conf/example/instance.properties

修改成下面配置:

# 唯一id
canal.instance.mysql.slaveId=100

# MySQL的ip、port
canal.instance.master.address=hadoop1:3306

# MySQL用户名
canal.instance.dbUsername=canal
# MySQL密码
canal.instance.dbPassword=canal

# kafka的topic
canal.mq.topic=canal

重启canal

cd /program/canal.deployer-1.1.5
bin/stop.sh
bin/startup.sh

启动 zookeeper

zookeeper-3.4.x 单机启动、停止服务、查看服务状态、jps

启动 kafka

kafka_2.12-2.4.x-启动、停止(单机)

启动 kafka 客户端

Offset Explorer(推荐)

操作见 kafka可视化工具 offset explorer

注意: 由于MySQL没有数据变化,所以还没有 canal 主题

自带客户端

cd cd /program/kafka_2.12-2.4.1/
bin/kafka-console-consumer.sh --bover hadoop1:9092 --topic canal

注意: 由于MySQL没有数据变化,所以还没有 canal 主题

测试

增加

执行下面sql:

INSERT INTO `scott`.`dept`(`deptno`, `dname`, `loc`) VALUES (60, '测试部', 'NEW YORK');

kafka接收数据如下:

{
  "data": [                 # 变化数据,数组类型
    {
      "deptno": "60",
      "dname": "测试部",
      "loc": "NEW YORK"
    }
  ],
  "database": "scott",      # 数据库名
  "es": 1656028167000,
  "id": 3,
  "isDdl": false,
  "mysqlType": {
    "deptno": "int(4)",
    "dname": "varchar(14)",
    "loc": "varchar(13)"
  },
  "old": null,
  "pkNames": [
    "deptno"
  ],
  "sql": "",
  "sqlType": {
    "deptno": 4,
    "dname": 12,
    "loc": 12
  },
  "table": "dept",         # 表名
  "ts": 1656028168060,
  "type": "INSERT"         # 操作类型
}

修改

执行下面sql:

update `scott`.`dept` set loc='伦敦' where deptno = 60

kafka接收数据如下:

{
  "data": [             # 新数据,数组类型
    {
      "deptno": "60",
      "dname": "测试部",
      "loc": "伦敦"
    }
  ],
  "database": "scott",      # 数据库名
  "es": 1656028259000,
  "id": 4,
  "isDdl": false,
  "mysqlType": {
    "deptno": "int(4)",
    "dname": "varchar(14)",
    "loc": "varchar(13)"
  },
  "old": [                # 旧数据
    {
      "loc": "NEW YORK"
    }
  ],
  "pkNames": [
    "deptno"
  ],
  "sql": "",
  "sqlType": {
    "deptno": 4,
    "dname": 12,
    "loc": 12
  },
  "table": "dept",             # 表名
  "ts": 1656028259585,
  "type": "UPDATE"             # 操作类型
}

删除

执行下面sql:

delete from scott.dept where deptno = 60

kafka接收数据如下:

{
  "data": [                # 变化数据,数组类型
    {
      "deptno": "60",
      "dname": "测试部",
      "loc": "伦敦"
    }
  ],
  "database": "scott",      # 数据库名
  "es": 1656028325000,
  "id": 5,
  "isDdl": false,
  "mysqlType": {
    "deptno": "int(4)",
    "dname": "varchar(14)",
    "loc": "varchar(13)"
  },
  "old": null,
  "pkNames": [
    "deptno"
  ],
  "sql": "",
  "sqlType": {
    "deptno": 4,
    "dname": 12,
    "loc": 12
  },
  "table": "dept",             # 表名
  "ts": 1656028325542,
  "type": "DELETE"             # 操作类型
}

一条sql影响多行

执行下面sql:

INSERT INTO `scott`.`dept`(`deptno`, `dname`, `loc`) VALUES (61, '人力部', '广州'),(62, '开发部', '天津');

kafka接收数据如下:

{
  "data": [                # 变化数据,数据类型,还有多个变化数据
    {
      "deptno": "61",
      "dname": "人力部",
      "loc": "广州"
    },
    {
      "deptno": "62",
      "dname": "开发部",
      "loc": "天津"
    }
  ],
  "database": "scott",      # 数据库名
  "es": 1656028504000,
  "id": 6,
  "isDdl": false,
  "mysqlType": {
    "deptno": "int(4)",
    "dname": "varchar(14)",
    "loc": "varchar(13)"
  },
  "old": null,
  "pkNames": [
    "deptno"
  ],
  "sql": "",
  "sqlType": {
    "deptno": 4,
    "dname": 12,
    "loc": 12
  },
  "table": "dept",             # 表名
  "ts": 1656028505186,
  "type": "INSERT"             # 操作类型
}

原文出处:https://www.malaoshi.top/show_1IX3YOJAyBHT.html