阅读:61
This is Maxwell’s daemon, an application that reads MySQL binlogs and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms. Maxwell has low operational overhead, requiring nothing but mysql and a place to write to. Its common use cases include ETL, cache building/expiring, metrics collection, search indexing and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform. (官网简介)
Maxwell 主要用于实时数据同步
当数据量和并发量特别高时,单个节点的MySQL就难以承受并发量。因此我们就需要对单个节点MySQL进行扩展,形成MySQL集群。MySQL集群中主节点为Master,从节点为slave。
这里进行主从复制后,我们就可以将读和写进行分离,master主库用于写,slave从库用于读,有效解决高并发。并且进行数据的多个备份,保障了数据的安全。
相应细节:
① Master主库将改变记录,写到二进制日志(binary log)中;
② Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);
③ Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
Maxwell的工作原理很简单,就是把自己伪装成MySQL的一个slave,然后以slave的身份假装从MySQL(master)复制数据。
对比 | Canal | Maxwell |
---|---|---|
语言 | java | java |
数据格式 | 格式自由 | json |
采集数据模式 | 增量 | 全量/增量 |
数据落地 | 定制 | 支持kafka等多种平台 |
注意:1.30版本以后支持JDK11,不支持JDK1.8
这里我们下载1.29.2
1)上传到Linux系统
2)解压到指定目录
[hadoop@hadoop101 software]$ tar -zxvf maxwell-1.29.2.tar.gz -C ../module/
[hadoop@hadoop101 module]$ ll | grep max
drwxrwxr-x. 4 hadoop hadoop 200 Jan 28 11:42 maxwell-1.29.2
MySql需要开启binlog设置
(1)修改mysql的配置文件,开启MySQL Binlog设置
[hadoop@hadoop101 module]$ sudo vim /etc/my.cnf
[sudo] password for hadoop:
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
# 添加下面内容
server_id=1
log-bin=mysql-bin
binlog_format=row
#binlog-do-db=test_maxwell
重启MySQL
[hadoop@hadoop101 module]$ systemctl restart mysqld
登录MySQL,查看:
(2)进入/var/lib/mysql目录,查看MySQL生成的binlog文件
注:MySQL生成的binlog文件初始大小一定是154字节,然后前缀是log-bin参数配置的,后缀是默认从.000001,然后依次递增。除了binlog文件文件以外,MySQL还会额外生产一个.index索引文件用来记录当前使用的binlog文件。
(1)在MySQL中建立一个maxwell库用于存储Maxwell的元数据
mysql> CREATE DATABASE maxwell;
(2)设置mysql用户密码安全级别
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
(3)分配账号操作该数据库
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
(4)分配这个账号可以监控其他数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
(5)刷新mysql权限
mysql> flush privileges;
方式一:使用命令行参数
[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='localhost' \
> --producer=stdout
# --user 数据库的用户名
# --password 登录密码
# --host 数据库主机
# --producer 生产者模式:stdout:控制台 kafka:kafka集群
# 完整指令如下:
bin/maxwell --user='maxwell' --password='123456' --host='localhost' --producer=stdout
方式二:通过配置文件启动
[hadoop@hadoop101 maxwell-1.29.2]$ cp config.properties.example config.properties
[hadoop@hadoop101 maxwell-1.29.2]$ vim config.properties
启动maxwell
[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell --config ./config.propertie
## --config 指定启动的配置文件
这里启动hive并做了插入操作,可以看出,数据已经监控到了。
编写启动命令:
[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='localhost' \
> --producer=stdout
向MySQL中插入数据:
mysql> create database testmaxwell;
Query OK, 1 row affected (0.00 sec)
mysql> use testmaxwell;
Database changed
mysql> create table emplyee(id int,name varchar(255));
Query OK, 0 rows affected (0.10 sec)
mysql> insert into emplyee values(1,'tom');
Query OK, 1 row affected (0.00 sec)
mysql> insert into emplyee values(2,'jerry');
Query OK, 1 row affected (0.00 sec)
mysql> insert into emplyee values(3,'tony');
Query OK, 1 row affected (0.00 sec)
可以看到Maxwell控制台:
相应数据以json格式发送到控制台来了。
1)、启动kafka集群
kafka集群启动完成。
2)、创建kafka主题
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 2 --partitions 2 --topic maxwell
Created topic "maxwell".
3)、为方便展示,开始kafka消费者
[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic maxwell
4)、编写maxwell启动命令
[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='hadoop101' \
> --producer=kafka \
> --kafka.bootstrap.servers=hadoop101:9092 \
> --kafka_topic=maxwell
我们向mysql的表中添加数据
kafka消费者控制台输出数据:
注:
① 若需要指定监控某些表,可使用–filter参数,过滤表。Filtering - Maxwell’s Daemon (maxwells-daemon.io) ② 对数据库进行初始全量导入数据:
需要修改Maxwell的元数据,触发数据初始化机制,在mysql的maxwell库中bootstrap表中插入一条数据,写明需要全量数据的库名和表名,可在官网参考配置。