欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

flink二阶提交(没有搞完)

发布时间:2023/12/31 编程问答 37 豆豆
生活随笔 收集整理的这篇文章主要介绍了 flink二阶提交(没有搞完) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

举例说明

甲乙丙丁四人要组织一个会议,需要确定会议时间,不妨设甲是协调者,乙丙丁是参与者。

投票阶段:

(1)甲发邮件给乙丙丁,周二十点开会是否有时间;

(2)甲回复有时间;

(3)乙回复有时间;

(4)丙迟迟不回复,此时对于这个活动,甲乙丙均处于阻塞状态,算法无法继续进行;

(5)丙回复有时间(或者没有时间);

提交阶段:

(1)协调者甲将收集到的结果反馈给乙丙丁(什么时候反馈,以及反馈结果如何,在此例中取决与丙的时间与决定);

(2)乙收到;

(3)丙收到;

(4)丁收到;

 

 

来源

代码以[4]为主,我自己修正了一些bug,增加了依赖。

集群准备工作

启动Zookeeper集群、Kafka集群、Flink集群。

流程图

 

 

 

Mysql准备工作

create database test;

use test;

CREATE TABLE `mysqlExactlyOnce_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 

文件说明与联系

文件说明需要设定的参数
DBConnectUtil.javajava连接数据库
KafkaUtils.java(生产者,顶层文件)生产者broker_list
MysqlExactlyOncePOJO.java一个pojo对象
MySqlTwoPhaseCommitSink.java(被调用)插入数据,制造异常jdbc:mysql://以及sql语句
streamdemokafka2mysql.java(调用MySqlTwoPhaseCommitSink,顶层架构)Flink source与sink

BOOTSTRAP_SERVERS_CONFIG

运行步骤

①运行StreamDemoKafka2Mysql.java
②运行KafkaUtils.java

 

调试用命令

$KAFKA/bin/kafka-topics.sh --list  --zookeeper  Desktop:2181

$KAFKA/bin/kafka-console-consumer.sh  --bootstrap-server Desktop:9091 --from-beginning --topic mysql-exactly-Once-4

修改的主题的话,需要同时在kafkautils.java和StreamDemoKafka2Mysql.java中修改

实验效果

 

 

 

tijiao最终代码如下

 

Reference:

[1]Flink两阶段提交

[2]Flink 之 MySQL二阶提交

[3]解决Flink消费Kafka信息,结果存储在Mysql的重复消费问题

[4]Flink实现Kafka到Mysql的Exactly-Once

[5]Kafka常用命令

[6]一分钟了解两阶段提交2PC(运营MM也懂了)

总结

以上是生活随笔为你收集整理的flink二阶提交(没有搞完)的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。