MySQL -> ES data synchronization: configuration steps

Published: 2024/4/15
Deploying MySQL -> ES data synchronization

(Syncing MySQL to ES supports running a multi-table query and writing the result into a single ES index.)

1. Server:

Internal IP: 192.168.0.60
Login: name + key
Purpose: runs canal, kafka, and es7; canal syncs the pre-production PolarDB data into es7

2. ES7 and kafka services

ES web management URL:
http://[internet ip]:9800
ES:
Public: [internet ip] 9201
Internal: 192.168.0.60 9201

kafka-manage:
http://[internet ip]:9001

kafka:
Public: [internet ip] 9092
Internal: 192.168.0.60 9092
zookeeper:
192.168.0.60 2181

3. Files used for the ES sync:

See the end of this article.

Server deployment list:

application      ip:port                   install dir                  user/psd
mysql            [mysql_server_ip]:3306    rds                          canal / 123456
zookeeper        192.168.0.60:2181         /opt/app/zookeeper-3.4.12    #zookeeper
canal.deployer   192.168.0.60:11111        /opt/app/canal.deployer      #canal
canal.adapter    192.168.0.60:8081         /opt/app/canal.adapter       #canal
ES               192.168.0.60:9201         DOCKER                       es
kafka            192.168.0.60:9092         DOCKER                       kafka
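
Before configuring anything, it can help to confirm that each service in the list accepts TCP connections. The following is a minimal sketch and not part of the original setup: the helper names `is_port_open` and `report` and the `SERVICES` mapping are made up for illustration, and the canal.deployer port is taken to be 11111, the `canal.port` value used in canal.properties.

```python
import socket

# Host/port pairs taken from the deployment list in this article.
# 11111 is the canal.port value set in canal.properties.
SERVICES = {
    "zookeeper": ("192.168.0.60", 2181),
    "canal.deployer": ("192.168.0.60", 11111),
    "canal.adapter": ("192.168.0.60", 8081),
    "ES": ("192.168.0.60", 9201),
    "kafka": ("192.168.0.60", 9092),
}


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def report(services: dict) -> None:
    """Print one status line per service."""
    for name, (host, port) in services.items():
        state = "open" if is_port_open(host, port) else "unreachable"
        print(f"{name:15s} {host}:{port} {state}")
```

Calling `report(SERVICES)` from a machine on the internal network prints one line per service. An open port only shows that something is listening; it does not prove the service is healthy.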

1. Install zookeeper.

Configuration file: vi conf/zoo.cfg
Main parameters:
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/opt/app/zookeeper-3.4.12/data

vi conf/log4j.properties
# logging configuration
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

Start zookeeper:
# ./zkServer.sh start ../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: ../conf/zoo.cfg
Starting zookeeper ... STARTED

Inspect canal's sync state in zookeeper:
zkCli.sh -server localhost:2181
ls /otter
get /otter/canal/destinations/crm_canal/2/cursor

2. Install and configure canal.deployer

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

It is best to download the canal deployer and the adapter in the same version. For adapter 1.1.5, the plugin has to be replaced with client-adapter.es7x-1.1.5-jar-with-dependencies.jar. Background: canal 1.1.5 itself has a bug (https://github.com/alibaba/canal/issues/3636), so this one plugin must be swapped in by hand.

mkdir /opt/app/canal.adapter /opt/app/canal.deployer
tar zxvf /opt/download/canal.deployer-1.1.5.tar.gz -C /opt/app/canal.deployer
tar zxvf /opt/download/canal.adapter-1.1.5.tar.gz -C /opt/app/canal.adapter

2.1 canal deployer configuration file, main parameters:

vi conf/canal.properties

# In a cluster, each node must use a different id
# canal.id = 1
# Local IP
canal.ip = 192.168.0.60
# Port
canal.port = 11111
canal.metrics.pull.port = 11112
# zookeeper server; local in this setup
canal.zkServers = 192.168.0.60
# Sync mode: stream the changes through kafka
canal.serverMode = kafka
# Destination name after sync (visible in the MQ)
canal.destinations = shop
kafka.bootstrap.servers = 192.168.0.60:9092
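
A note on comments in `.properties` files: Java's standard `java.util.Properties` loader treats `#` and `!` as comment markers only when they are the first non-blank character of a line, so text written after a value on the same line becomes part of that value. Assuming canal reads canal.properties with the standard loader, comments are safest on their own lines. The sketch below is a hypothetical lint helper (the name `find_inline_comments` is not from canal) that flags entries whose value contains a trailing ` #...`.

```python
def find_inline_comments(text: str) -> list:
    """Return (line_number, key) pairs for entries whose value contains ' #'."""
    hits = []
    for lineno, raw in enumerate(text.splitlines(), start=1):
        line = raw.strip()
        # Blank lines and whole-line comments are fine.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep and " #" in value:
            hits.append((lineno, key.strip()))
    return hits


sample = """\
# canal.id = 1
canal.ip = 192.168.0.60 #local IP
canal.port = 11111
canal.serverMode = kafka
"""

print(find_inline_comments(sample))  # [(2, 'canal.ip')]
```

The check is deliberately simple: it only handles `key = value` lines and would also flag a value that legitimately contains ` #`.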

2.1 canal instance configuration file, main parameters:

cd /opt/app/canal.deployer/conf
cp example/instance.properties ./shop/
vi shop/instance.properties

# enable gtid use true/false
canal.instance.gtidon=true
# position info
canal.instance.master.address=192.168.0.25:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# mq config
canal.mq.topic=SYNC_ES_shop
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
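
Kafka topic names are case-sensitive, and this article spells the topic in more than one way (`SYNC_ES_shop` in instance.properties and in the adapter's `instance`, `SYNC_ES_SHOP_TEST` as `destination` in the mapping files), presumably because the snippets come from different environments. The sketch below is a hypothetical helper, not a canal tool, for spotting such mismatches before starting the adapter. It assumes the three values are meant to be identical in one deployment.

```python
def mismatched_topics(expected: str, names: dict) -> dict:
    """Return the entries whose value differs from expected (case-sensitive)."""
    return {where: value for where, value in names.items() if value != expected}


# Values copied from the snippets in this article.
names = {
    "instance.properties canal.mq.topic": "SYNC_ES_shop",
    "application.yml canalAdapters.instance": "SYNC_ES_shop",
    "tb_home_store_product.yml destination": "SYNC_ES_SHOP_TEST",
}

print(mismatched_topics("SYNC_ES_shop", names))
```

An empty result means every file uses the same spelling.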

MySQL database settings

canal.instance.master.address=mysql_server_ip:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword= [see the separate file for the password]

Tables to sync:
canal.instance.filter.regex=shop.tb_building,shop.tb_article,shop.tb_home_news,shop.tb_home_store_product,shop.tb_travel_product

Topic to publish to:
canal.mq.topic=SYNC_ES_SHOP
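
Each entry in `canal.instance.filter.regex` is a regular expression matched against `schema.table`, so an unescaped `.` matches any character, not only a literal dot. The sketch below illustrates the difference with Python's `re` module. It is an approximation for illustration: canal evaluates the patterns in Java, and `matches_filter` is a made-up helper name.

```python
import re


def matches_filter(filter_regex: str, full_name: str) -> bool:
    """Return True if 'schema.table' fully matches any comma-separated pattern."""
    patterns = [p.strip() for p in filter_regex.split(",") if p.strip()]
    return any(re.fullmatch(p, full_name) for p in patterns)


loose = "shop.tb_building,shop.tb_article"
strict = r"shop\.tb_building,shop\.tb_article"

print(matches_filter(loose, "shop.tb_building"))    # True
print(matches_filter(loose, "shopXtb_building"))    # True: '.' matches any character
print(matches_filter(strict, "shopXtb_building"))   # False
print(matches_filter(strict, "shop.tb_home_news"))  # False: table not listed
```

The loose form still works for the tables listed here. If the escaped form is preferred, the backslash has to be doubled inside a properties file (for example `shop\\.tb_building`), as in the commented `canal.mq.dynamicTopic` example in instance.properties.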

2.2 adapter configuration

vi conf/application.yml

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql_server_ip:3306/mytest?useUnicode=true
      username: sync_user
      password: 123456
  canalAdapters:
  - instance: SYNC_ES_shop # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7
        key: exampleKey
        #hosts: localhost:9201 # 127.0.0.1:9200 for rest mode
        # Note: without a scheme the adapter cannot reach ES.
        # ERROR: Illegal character in scheme name at index 0: 127.0.0.1:9201
        hosts: http://192.168.0.60:9201 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          cluster.name: docker-cluster
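
The "Illegal character in scheme name" error noted in the configuration suggests that, in rest mode, the adapter parses `hosts` as a URI, so a bare `host:port` is rejected. The following is a small sketch of a hypothetical helper (`with_scheme` is not part of canal) that normalizes a host entry before it is written into application.yml.

```python
def with_scheme(host: str, default_scheme: str = "http") -> str:
    """Prefix host with default_scheme:// unless it already starts with http(s)://."""
    host = host.strip()
    if host.startswith(("http://", "https://")):
        return host
    return f"{default_scheme}://{host}"


print(with_scheme("127.0.0.1:9201"))            # http://127.0.0.1:9201
print(with_scheme("http://192.168.0.60:9201"))  # http://192.168.0.60:9201
```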
3.2 Enable real-time sync and run a full sync to ES

1. Start canal-server
2. Start canal-adapter

3. Create the ES indices

cd /opt/json/es_index_json_file/
curl -XPUT -H "Content-Type: application/json" "http://127.0.0.1:9201/tb_home_store_product?include_type_name=true" -d "@tb_home_store_product.json"
curl -XPUT -H "Content-Type: application/json" "http://127.0.0.1:9201/tb_travel_product?include_type_name=true" -d "@tb_travel_product.json"
# List the indices in ES
curl -XGET "http://127.0.0.1:9201/_cat/indices?v"
# Delete an index
#curl -XDELETE http://127.0.0.1:9201/tb_building
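
The `include_type_name=true` parameter matters because the mapping files in this article nest `properties` under a type name (for example `home_store_product_doc`), while Elasticsearch 7 expects typeless mappings by default. For readers who prefer scripting the step, here is a minimal Python sketch of the same PUT request. The function name `build_create_index_request` is made up for illustration, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_create_index_request(base_url: str, index: str, mapping: dict) -> urllib.request.Request:
    """Build (but do not send) the PUT request that creates an index."""
    url = f"{base_url}/{index}?include_type_name=true"
    body = json.dumps(mapping).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# A cut-down mapping in the same shape as tb_home_store_product.json.
mapping = {"mappings": {"home_store_product_doc": {"properties": {"id": {"type": "integer"}}}}}
req = build_create_index_request("http://127.0.0.1:9201", "tb_home_store_product", mapping)
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` would send it; in practice the full mapping would be loaded from the JSON file with `json.load`.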

4. Run a full sync to ES

curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_home_store_product.yml
curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_travel_product.yml

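ES web query page:

http://[internet ip]:9800/
Connect it to the internal ES address: http://192.168.0.60:9201

# When setting up the sync, copy the contents below and save each one under the corresponding file name.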

****************************************** tb_home_store_product.json file ***************************************************************

{
  "mappings": {
    "home_store_product_doc": {
      "properties": {
        "id": {"type": "integer"},
        "store_name": {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"},
        "store_info": {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"},
        "content": {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"},
        "mark_price": {"type": "double"},
        "ot_price": {"type": "double"},
        "sales": {"type": "long"},
        "ficti": {"type": "long"},
        "create_by": {"type": "keyword"},
        "create_time": {"type": "date"},
        "update_by": {"type": "keyword"},
        "update_time": {"type": "date"}
      }
    }
  }
}

****************************************** tb_home_store_product.yml file ***************************************************************

dataSourceKey: defaultDS
destination: SYNC_ES_SHOP_TEST
outerAdapterKey: exampleKey
groupId: g1
esMapping:
  _index: tb_home_store_product
  _id: id
  sql: "select id,store_name,store_info,content,mark_price,ot_price,sales,ficti,is_show,del_flag,img_size,image,create_by,create_time,update_by,update_time from tb_home_store_product t"
  etlCondition: "where t.id>={0} and t.id<={1}"
  commitBatch: 3000
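
The `etlCondition` line is a WHERE clause with numbered placeholders that narrows a full sync to a range of ids. The sketch below only illustrates how such a clause reads once `{0}` and `{1}` are filled in. It is an assumption-laden illustration: `compose_etl_sql` is a made-up function, and how canal itself binds the values depends on the adapter version and is not covered by this article.

```python
def compose_etl_sql(sql: str, etl_condition: str, params: list) -> str:
    """Append etl_condition to sql, filling {0}, {1}, ... with integer values."""
    condition = etl_condition
    for position, value in enumerate(params):
        # int() rejects anything that is not a plain number before it reaches the SQL text.
        condition = condition.replace("{" + str(position) + "}", str(int(value)))
    return f"{sql} {condition}"


sql = "select id,store_name from tb_home_store_product t"
etl_condition = "where t.id>={0} and t.id<={1}"

print(compose_etl_sql(sql, etl_condition, [1, 3000]))
# select id,store_name from tb_home_store_product t where t.id>=1 and t.id<=3000
```

Because the condition refers to `t.id`, the alias `t` in the `sql` line has to stay in place.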

****************************************** tb_travel_product.json file ***************************************************************

{
  "mappings": {
    "travel_product_doc": {
      "properties": {
        "id": {"type": "integer"},
        "on_sale": {"type": "integer"},
        "product_name": {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"},
        "product_shortname": {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"},
        "product_price": {"type": "double"},
        "product_img": {"type": "keyword"},
        "sale_count": {"type": "long"},
        "create_by": {"type": "keyword"},
        "create_time": {"type": "date"},
        "update_by": {"type": "keyword"},
        "update_time": {"type": "date"}
      }
    }
  }
}

************************************************** tb_travel_product.yml file *******************************************************

Note: as the query shows, the result comes from joining two tables, which confirms that two tables can be exported into a single ES index.

dataSourceKey: defaultDS
destination: SYNC_ES_SHOP_TEST
outerAdapterKey: exampleKey
groupId: g1
esMapping:
  _index: tb_travel_product
  _id: id
  sql: "SELECT t.id as id,t.product_no, t.on_sale, t.product_name, t.product_shortname,t.product_price, i.product_img, t.sale_count, t.create_by, t.create_time, t.update_by, t.update_time from tb_travel_product t left join tb_travel_product_info i on i.product_no = t.product_no"
  etlCondition: "where t.id>={0} and t.id<={1}"
  commitBatch: 3000
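
The SELECT lists in the mapping files include columns that the index JSON files do not declare (for example `product_no` here). With Elasticsearch's default dynamic mapping such fields are usually still indexed, but with guessed types. The following sketch, built from hypothetical helpers and a shortened copy of the query, lists the selected columns that lack an explicit mapping. It only handles simple SELECT lists without functions or subqueries.

```python
import re


def select_output_names(sql: str) -> list:
    """Return output column names of a simple SELECT (no functions or subqueries)."""
    match = re.search(r"select\s+(.*?)\s+from\s", sql, flags=re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("not a simple SELECT ... FROM statement")
    names = []
    for item in match.group(1).split(","):
        # "t.id as id" -> "id"; "t.on_sale" -> "on_sale"
        alias = re.split(r"\s+as\s+", item.strip(), flags=re.IGNORECASE)[-1]
        names.append(alias.split(".")[-1].strip())
    return names


def unmapped_columns(sql: str, mapping: dict) -> list:
    """List SELECT columns that have no entry in the index mapping's properties."""
    (doc_type,) = mapping["mappings"].values()
    mapped = set(doc_type["properties"])
    return [name for name in select_output_names(sql) if name not in mapped]


sql = ("SELECT t.id as id, t.product_no, t.on_sale, i.product_img "
       "from tb_travel_product t left join tb_travel_product_info i "
       "on i.product_no = t.product_no")
mapping = {"mappings": {"travel_product_doc": {"properties": {
    "id": {"type": "integer"},
    "on_sale": {"type": "integer"},
    "product_img": {"type": "keyword"},
}}}}

print(unmapped_columns(sql, mapping))  # ['product_no']
```

Whether to add such columns to the mapping or drop them from the query is a design choice; declaring them explicitly keeps the field types predictable.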
