MySQL到Elasticsearch数据同步
一、使用logstash进行同步
下载logstash,地址:https://www.elastic.co/downloads/logstash,我这里使用的7.11.1版本,解压后文件如下:
其中mysql文件夹是自己导的,因为使用的是mysql8.0以及mysql-connector-java-8.0.20这个jar包
到导入的ealsticsearch的mysql表的结构如下:
bin文件夹下创建Logstash配置文件,文件名为mysql.conf
内容配置如下:
更多详细的配置:
- jdbc_driver_library: jdbc驱动的路径,在上一步中已经下载
- jdbc_driver_class: 驱动类的名字,mysql填com.mysql.jdbc.Driver
- jdbc_connection_string: mysql 地址
- jdbc_user: mysql 用户
- jdbc_password: mysql密码
- schedule: 执行sql时机,类似 crontab 的调度,上面配置表示每分钟刷新一次。
- statement: 要执行的sql,以 “:” 开头是定义的变量,可以通过parameters 来设置变量,这里的sql_last_value是内置的变量,表示上一次sql执行中> -> update_time的值
- statement_filepath:和上面statement参数二选一,存放需要执行的SQL语句的文件位置,适用于多个sql语句的场景。
- use_column_value: 使用递增列的值
- tracking_column_type: 递增字段的类型,numeric表示数值类型,
- timestamp 表示时间戳类型
- tracking_column: 递增字段的名称,这里使用updatetime这一列,这列的类型是timestamp
- last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
- index: 导入到es中的index名,这里我直接设置成了mysql表的名字
- document_id: 导入到es中的文档id,这个需要设置成主键,否则同一条记录更新后在es中会出现两条记录,%{id} 表示引用mysql表中id字段的值
es新建poem索引:(注意跟mysql的数据段名保持一致)
#创建索引 PUT /poem {"settings": {"number_of_shards": 1,"number_of_replicas": 0}, "mappings":{"properties":{"id":{"type":"text"},"name":{"type":"text"},"author":{"type":"text"},"type":{"type": "text"},"content":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"href":{"type": "text"},"authordes":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"origin":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"categoryId":{"type": "text"}}} }进入bin目录,启动logstash服务,开始同步mysql数据到es
二、使用阿里云开源工具Canal
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
简单来说,Canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析和贮存后供下游消费端使用。Canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,处理并存储到数据仓库中。下面我们来看如何快速搭建起一个 Canal 服务。
MySQL 默认没有开启 Binlog,因此我们需要对 my.cnf 文件做以下修改:
server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW注意 binlog_format 必须设置为 ROW, 因为在 STATEMENT 或 MIXED 模式下, Binlog 只会记录和传输
SQL 语句(以减少日志大小),而不包含具体数据,我们也就无法保存了。
从节点通过一个专门的账号连接主节点,这个账号需要拥有全局的 REPLICATION 权限。我们可以使用 GRANT 命令创建这样的账号:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';从 GitHub 项目发布页中下载 Canal 服务端代码(https://github.com/alibaba/canal/releases )
配置文件在 conf 文件夹下,有以下目录结构:
canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties
conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服务端监听的端口。instanceA/instance.properties 则是各个实例的配置文件,主要的配置项有:
# slaveId 不能与 my.cnf 中的 server-id 项重复 canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 订阅实例中所有的数据库和表 canal.instance.filter.regex = .*\\..*从服务端消费变更消息时,我们需要创建一个 Canal 客户端,指定需要订阅的数据库和表,并开启轮询。
参考文章
总结
以上是生活随笔为你收集整理的MySQL到Elasticsearch数据同步的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 服务器 Font family [‘sa
- 下一篇: python拼接mysql时遇到unsu