Reading the timestamp of Kafka data from the Flink SQL Client (DDL approach)
Goal of the experiment
Make data in Kafka readable from the Flink SQL Client.
This post is a detailed record of the steps in [1].
Step-by-step procedure
① Start the Hadoop cluster and take it out of safe mode.
② Disable the firewall on every node:
service firewalld status (check the firewall status)
service firewalld stop (stop the firewall)
Start ZooKeeper on each node.
③ Start the Kafka cluster:
startkafka
startkafka2
startkafka3
(These are user-defined aliases, one per broker node; presumably each wraps kafka-server-start.sh for that broker.)
④ Copy the following jars into $FLINK_HOME/lib:
flink-connector-kafka_2.12-1.12.0.jar
flink-json-1.12.0.jar
flink-jdbc_2.12-1.10.2.jar
Then start the Flink cluster (e.g. with $FLINK_HOME/bin/start-cluster.sh).
⑤ Kafka topic operations:
| Operation | Command | Notes |
| --- | --- | --- |
| List topics | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | To delete a topic, see the last row and the appendix. |
| Send JSON messages to the order_sql topic | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic order_sql | You may hit the error described in [2]; make sure the port in the command matches the listeners port in server.properties exactly. The error in [2] can also mean one of the Kafka brokers is down. You may also hit [3]; make sure the firewall is off. |
| Test consumption with Kafka's own console consumer | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic order_sql | If Kafka's own console consumer fails, stop here; consuming from the Flink SQL Client is bound to fail as well. |
| Clear all data in the topic [6] (in case you mistyped something) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic order_sql | Requires delete.topic.enable=true in $KAFKA/config/server.properties. |
Type the messages below by hand (send the JSON one record at a time; do not paste the whole block at once):
{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "1556420980000"}
{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "1556421040000"}
{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "1556421100000"}
{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "1556421120000"}
{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "1556421480000"}
{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "1556421510000"}
{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "1556421570000"}
{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "1556421630000"}
{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "1556421655000"}
⑥ Start the SQL Client:
$FLINK_HOME/bin/sql-client.sh embedded -d $FLINK_HOME/conf/sql.my.yaml -l sql-libs/
Note that this experiment involves two .yaml files (the environment file sql.my.yaml and, later on, flink-conf.yaml). The command was adjusted to:
$FLINK_HOME/bin/sql-client.sh embedded -d $FLINK_HOME/conf/sql.my.yaml -l /home/appleyuchi/bigdata/flink-1.12/lib
⑦ Run the Flink SQL
| Operation | Flink SQL |
| --- | --- |
| Show the contents of orders | select * from orders; |
| One-minute tumbling-window aggregation (a reformatted version follows this table) | SELECT shop_id , TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start , TUMBLE_END(payment_time, INTERVAL '1' MINUTE) AS tumble_end , sum(trade_amt) AS amt FROM orders GROUP BY shop_id, TUMBLE(payment_time, INTERVAL '1' MINUTE); |
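The windowed query above references payment_time, presumably the column name from the (later abandoned) sql.my.yaml schema; in the DDL further down, the event-time column is named ts instead. A reformatted version adjusted to that DDL:

```sql
-- One-minute tumbling windows per shop, using the event-time column ts
-- defined by the DDL below (payment_time renamed to ts).
SELECT
  shop_id,
  TUMBLE_START(ts, INTERVAL '1' MINUTE) AS tumble_start,
  TUMBLE_END(ts, INTERVAL '1' MINUTE)   AS tumble_end,
  SUM(trade_amt) AS amt
FROM orders
GROUP BY shop_id, TUMBLE(ts, INTERVAL '1' MINUTE);
```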
Experiment screenshots
(The original post shows an error screenshot here.) Solution: in $FLINK_HOME/conf/flink-conf.yaml set:
classloader.resolve-order: parent-first
Make this change on every node of the cluster if at all possible.
Moving on, another problem came up (screenshot omitted).
I asked the community member who goes by the alias 雪尽; following his hint, I gave up on the .yaml approach and switched to DDL (paste the statement below straight into the Flink SQL Client and press Enter):
CREATE TABLE orders (
    order_id BIGINT,
    shop_id STRING,  -- added: the windowed query in step ⑦ groups by shop_id
    member_id BIGINT,
    trade_amt DOUBLE,
    pay_time BIGINT,
    -- define the event-time column
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(pay_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
    -- declare a watermark on ts with a 5-second delay
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'order_sql',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183',
    'connector.properties.bootstrap.servers' = 'Desktop:9091',
    'format.type' = 'json'
);
The session looks like this (screenshot omitted):
The results are as follows (screenshot omitted):
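For reference, the 'connector.*' keys above are the legacy descriptor-style options. Since this runs on Flink 1.12, the same table can also be declared with the newer connector options introduced in Flink 1.11 (a sketch based on the Flink 1.12 Kafka connector documentation, not what was run above; note the ZooKeeper address is no longer needed):

```sql
-- Same table, declared with the newer (Flink 1.11+) connector option style.
CREATE TABLE orders (
    order_id BIGINT,
    shop_id STRING,
    member_id BIGINT,
    trade_amt DOUBLE,
    pay_time BIGINT,
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(pay_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_sql',
    'properties.bootstrap.servers' = 'Desktop:9091',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```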
Appendix (other routine operations you may need)
Delete a topic:
$KAFKA/bin/kafka-topics.sh --delete --zookeeper Desktop:2181 --topic order_sql
or, from the ZooKeeper CLI (zkCli.sh):
deleteall /brokers/topics/order_sql
deleteall /config/topics/order_sql
deleteall /admin/delete_topics/order_sql
Note that the official documentation [4] uses Kafka 0.11, so a version incompatibility is possible.
References:
[1] Using the Flink SQL-Client
[2] Kafka connection error: Connection to node 1 (localhost/127.0.0.1:9092) could not be established.
[3] Kafka error: Unable to read additional data from server sessionid 0x0, likely server has closed socket
[4] SQL Client (official Flink documentation)
[5] Flink 1.9 in practice: reading Kafka with SQL and writing to MySQL
[6] Is there a way to delete all the data from a topic or delete the topic before every run?