欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程语言 > java >内容正文

java

kafka->Flink->ElasticSearch(Java形式)

发布时间:2023/12/31 java 43 豆豆
生活随笔 收集整理的这篇文章主要介绍了 kafka->Flink->ElasticSearch(Java形式) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

概述

本文主要是对[1]中内容的复现

环境

开源组件版本
KAFKA2.5.0
Flink1.6.0
Zookeeper3.6.0
ElasticSearch7.10.1
KIBANA7.10.1

因为ElasticSearch7的写法似乎不兼容 ES6的,所以代码中依然是ES6的写法。

代码中依然是ElasticSearch6的驱动
这个实验注意,不要追求太新的版本,最新版本Flink1.12的kafka驱动依赖包都还没有开发出来。

流程图

  lateLog用来保存侧边流输出的迟到的数据

ElasticSearch准备工作

KIBANA操作讲人话具体命令
删除原有的index索引删除原有的数据库curl -XDELETE 'Desktop:9201/auditindex'
新建index新建数据库curl -XPUT 'http://Desktop:9201/auditindex?pretty'

创建type的mapping信息

新建表格的字段信息curl -H "Content-Type: application/json" -XPOST 'http://Desktop:9201/auditindex/audittype/_mapping?include_type_name=true' -d '
{
 "audittype":{
    "properties":{
        "area":{"type":"keyword"},
        "type":{"type":"keyword"},
        "count":{"type":"long"},
        "time":{"type":"date","format": "yyyy-MM-dd HH:mm:ss"}
        }
     }
}
'

上述命令尤其是最后一个,不要直接粘贴到terminal中运行,而要写入一个bash脚本中再运行

代码与运行流程

https://gitee.com/appleyuchi/Flink_Code/tree/master/flink清洗数据案例/FlinkProj

ElasticSearch查看接收到的数据

http://desktop:9201/auditindex/_search?pretty=true

打开后可以看到一个大大的JSON

KIBANA设置时区

进入http://desktop:5601/app/management/kibana/settings

然后下面的dateFormat设置为

Etc/UTC

然后点击上图右下角的Save Changes

 

KIBANA可视化

可视化效果如下:

注意

[1]中的设置是area,这里复现的用的是_id

因为KIBANA操作的时候没有找到area,只有area.keyword

版本差异,暂时无法解决。

本实验相关的JPS进程

106851 TaskManagerRunner
85543 NailgunRunner
84330 ZooKeeperMain
81133 NameNode
87055 Kafka
106575 StandaloneSessionClusterEntrypoint
82193 NodeManager
81617 SecondaryNameNode
38320 RemoteMavenServer
81968 ResourceManager
36945 Main
83639 Elasticsearch
81078 QuorumPeerMain
89016 Launcher
89019 DataReport
126490 Jps
81341 DataNode
87391 kafkaProducerDataReport

 

Reference:

[1]【20】Flink 实战案例开发(二):数据报表

[2]kibana7.10.1基本操作(饼图+直方图)

创作挑战赛新人创作奖励来咯,坚持创作打卡瓜分现金大奖

总结

以上是生活随笔为你收集整理的kafka->Flink->ElasticSearch(Java形式)的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。