欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

MySQL到Elasticsearch数据同步

发布时间:2025/3/15 54 豆豆
生活随笔 收集整理的这篇文章主要介绍了 MySQL到Elasticsearch数据同步 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

一、使用logstash进行同步

下载logstash,地址:https://www.elastic.co/downloads/logstash,我这里使用的7.11.1版本,解压后文件如下:

其中mysql文件夹是自己导的,因为使用的是mysql8.0以及mysql-connector-java-8.0.20这个jar包

到导入的ealsticsearch的mysql表的结构如下:

bin文件夹下创建Logstash配置文件,文件名为mysql.conf

内容配置如下:

input{ stdin{} jdbc{ jdbc_connection_string => "jdbc:mysql://xxx.xxx.xxx.xxx:3306/ELASTICSEARCH?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" jdbc_user => "xxx" jdbc_password => "xxxxxx" jdbc_driver_library => "D:\logstash-7.11.1\mysql\mysql-connector-java-8.0.20.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "SELECT * FROM t_poem" schedule => "* * * * *" } } output { stdout { codec => json_lines } elasticsearch { hosts => "http://xxx.xxx.xxx.xxx:9200" #如果elasticsearsh有配置用户密码的话,配下以下 user => xxx password => xxxxxxxxx #事先在es中建立的索引 index => "poem" #使用mysql数据中的id作为es的id,注意,id只是名称,如果数据库中的名称不是id,那么这里也需要修改 document_id => "%{id}" } }

更多详细的配置:

  • jdbc_driver_library: jdbc驱动的路径,在上一步中已经下载
  • jdbc_driver_class: 驱动类的名字,mysql填com.mysql.jdbc.Driver
  • jdbc_connection_string: mysql 地址
  • jdbc_user: mysql 用户
  • jdbc_password: mysql密码
  • schedule: 执行sql时机,类似 crontab 的调度,上面配置表示每分钟刷新一次。
  • statement: 要执行的sql,以 “:” 开头是定义的变量,可以通过parameters 来设置变量,这里的sql_last_value是内置的变量,表示上一次sql执行中> -> update_time的值
  • statement_filepath:和上面statement参数二选一,存放需要执行的SQL语句的文件位置,适用于多个sql语句的场景。
  • use_column_value: 使用递增列的值
  • tracking_column_type: 递增字段的类型,numeric表示数值类型,
  • timestamp 表示时间戳类型
  • tracking_column: 递增字段的名称,这里使用updatetime这一列,这列的类型是timestamp
  • last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
  • index: 导入到es中的index名,这里我直接设置成了mysql表的名字
  • document_id: 导入到es中的文档id,这个需要设置成主键,否则同一条记录更新后在es中会出现两条记录,%{id} 表示引用mysql表中id字段的值

es新建poem索引:(注意跟mysql的数据段名保持一致)

#创建索引 PUT /poem {"settings": {"number_of_shards": 1,"number_of_replicas": 0}, "mappings":{"properties":{"id":{"type":"text"},"name":{"type":"text"},"author":{"type":"text"},"type":{"type": "text"},"content":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"href":{"type": "text"},"authordes":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"origin":{"type":"text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"},"categoryId":{"type": "text"}}} }

进入bin目录,启动logstash服务,开始同步mysql数据到es

二、使用阿里云开源工具Canal

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • canal工作原理
  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    canal解析binary log对象(原始为byte流)

  • Canal 的组成部分
  • 简单来说,Canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析和贮存后供下游消费端使用。Canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,处理并存储到数据仓库中。下面我们来看如何快速搭建起一个 Canal 服务。

  • 配置 MySQL 主节点
  • MySQL 默认没有开启 Binlog,因此我们需要对 my.cnf 文件做以下修改:

    server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW

    注意 binlog_format 必须设置为 ROW, 因为在 STATEMENT 或 MIXED 模式下, Binlog 只会记录和传输
    SQL 语句(以减少日志大小),而不包含具体数据,我们也就无法保存了。

    从节点通过一个专门的账号连接主节点,这个账号需要拥有全局的 REPLICATION 权限。我们可以使用 GRANT 命令创建这样的账号:

    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  • 启动 Canal 服务端
    从 GitHub 项目发布页中下载 Canal 服务端代码(https://github.com/alibaba/canal/releases )
    配置文件在 conf 文件夹下,有以下目录结构:
  • canal.deployer/conf/canal.properties
    canal.deployer/conf/instanceA/instance.properties
    canal.deployer/conf/instanceB/instance.properties

    conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服务端监听的端口。instanceA/instance.properties 则是各个实例的配置文件,主要的配置项有:

    # slaveId 不能与 my.cnf 中的 server-id 项重复 canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 订阅实例中所有的数据库和表 canal.instance.filter.regex = .*\\..*
  • 编写 Canal 客户端
  • 从服务端消费变更消息时,我们需要创建一个 Canal 客户端,指定需要订阅的数据库和表,并开启轮询。

    参考文章

    总结

    以上是生活随笔为你收集整理的MySQL到Elasticsearch数据同步的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。