探索 Maxwell:高效捕获 MySQL 数据变更的轻量级中间件

一、简介

  • Maxwell 是一个专门为 MySQL 数据库设计的变更数据捕获(CDC)工具
  • 可以读取 MySQL 的二进制日志,并将数据变更以 JSON 格式发布到 Kafka、redis、rabbitmq 等消息队列平台
  • 适用于多种应用场景,如 ETL 处理、审计日志记录、缓存管理、搜索索引构建和服务间通信等
  • 支持在命令行中使用,也提供Docker容器方式启动
  • 可作为canal等工具的替代品,且更轻量、更简单
  • 该工具的开源地址参考:https://github.com/zendesk/maxwell
  • 该工具的工作原理参考下图:

二、安装与使用

1. 配置与启动

1.1. Mysql需要提前开启binlog功能,如果已经开启,这一步可忽略

  • 配置Mysql的my.cnf(或者其他*.cnf),在[mysqld]配置项下增加配置,如下
    [mysqld]
    log_bin=mysql-bin
    server_id=1
  • 重启Mysql
  • 查看是否生效,登录Mysql,输入如下命令
    show master status

    出现binlog文件,即表示已开启binlog日志

1.2. 配置启动Maxwell

  • 新建docker-compose.yml配置文件,配置内容如下
    services:
      maxwell:
        image: zendesk/maxwell:latest
        container_name: maxwell
        command: >
          bin/maxwell
          --host=127.0.0.1
          --port=3306
          --user=root
          --password=root
          --filter='include: *.*'
          --producer=redis
          --redis_host=127.0.0.1
          --redis_port=6379
          --redis_auth=redispass
          --redis_type=lpush
          --redis_key=maxwell
        restart: unless-stopped

    更多配置参数参考:https://maxwells-daemon.io/config/

  • 配置完docker-compose.yml,在目录执行如下命令启动
    docker-compose up -d

    成功启动并监听Mysql截图如下

2. Mysql变更会自动推送到消息队列

  • 在Mysql测试执行一个更新命令,如下

    UPDATE `user` set `name`='test' WHERE id =3
  • 数据会同步到redis的列表中,格式如下

    {
      "database": "test",
      "table": "user",
      "type": "update",
      "ts": 1762166231,
      "xid": 4663,
      "commit": true,
      "data": {
        "id": 3,
        "name": "test",
        "status": 1,
        "created_at": "2023-10-27 11:00:00.000",
        "updated_at": "2023-10-27 11:00:00.000"
      },
      "old": {
        "name": "test3"
      }
    }

  • 自定义消费队列,把数据同步到异构数据库或者更新缓存等
    PHP消费代码示例

    <?php
    $redis = new \Redis();
    $redis->connect('127.0.0.1', '6379');
    $redis->auth('redispass');
    $redis->select(0);
    $key = 'maxwell';
    while (true) {
        try {
            $data = $redis->brPop($key, 5);
            if (isset($data[1])) {
                $info = json_decode($data[1], true);
                // 处理数据更新逻辑
                var_dump($info);
            }
        } catch (\Exception $e) {
            // TODO: 错误处理
        }
    }

    执行PHP程序,消费redis队列数据输出示例截图

三、总结

  • 利用该工具可以轻松实现实时监听Mysql数据变更,及时同步到异构数据或者更新缓存
  • 非侵入式接入,不影响业务代码与逻辑,配置简单,启动方便
  • 开发友好,运行稳定,可以平替canal、debezium等工具
© 版权声明
THE END
喜欢就支持一下吧
点赞11 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容