Sorm进阶(1):storm实现github提交数监控看板
生活随笔
收集整理的这篇文章主要介绍了
Sorm进阶(1):storm实现github提交数监控看板
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
1. 基础组件及其API
- storm中有关spout类的层次
在本例中,spout基于github的API监控某指定项目仓库的动态,并将变动情况发射为元组,每个元组包含针对该仓库的全部提交消息。紧接着,spout类文件的changelog.txt文件包含了所期望格式的提交消息,如下所示:
代码实现:
import org.apache.logging.log4j.core.util.IOUtils; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.spout.SpoutOutputCollector; //发射元组 import org.apache.storm.topology.OutputFieldsDeclarer; // 为spout发射的所有元组定义字段命名 import org.apache.storm.tuple.Fields; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Values;import java.io.IOException; import java.nio.charset.Charset; import java.util.List; import java.lang.String; import java.util.Map;public class CommitFeedListener extends BaseRichSpout {private SpoutOutputCollector outputCollector; // 负责发射元组或者让元组失效private List<String> commits; //从 changelog.txt文件中读取提交的消息列表// 为spout发射的所有元组定义字段命名public void declareOutputFields(OutputFieldsDeclarer declarer){// Field类构造函数中的命名顺序,必须与发射元组中值的顺序匹配,而发射元值的顺序由Value类来定义declarer.declare(new Fields("commit")); // spout发射一个命名为commit的字段}// storm准备运行spout时候调用,连接数据源public void open(Map configMap,TopologyContext context,SpoutOutputCollector outputCollector){this.outputCollector = outputCollector;try{ // 读取changelog.txt到Listcommits = org.apache.storm.shade.org.apache.commons.io.IOUtils.readLines(ClassLoader.getSystemResourceAsStream("changelog.txt"),Charset.defaultCharset().name());} catch(IOException e) {throw new RuntimeException(e);}}public void nextTuple(){ // 当spout读取下一个元组时,由storm定时调用,数据源准备好一个完整的数据之后才会触发for (String commit:commits) {// 为每个提交消息发射一个元组outputCollector.emit(new Values(commit));}} }- storm中有关bolt的类层次
这里我们需要用到两个bolt,可以直接继承BaseBasicBolt,一个负责从元组中接受完整的提交消息,并提取提交Github代码用户的email地址,然后发射包含email地址的元组;另一个bolt维护一个内存映射表,并在映射表中更新用户提交的次数。
代码实现:
import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class EmailExtractor extends BaseBasicBolt{//public void declareOutputFields(OutputFieldsDeclarer declearer){// 用于声明bolt发射的元组中字段的命名为emaildeclearer.declare(new Fields("email"));}// 当一个tuple被发射到该bolt上时被调用public void execute(Tuple tuple,BasicOutputCollector outputCollector) {// 获取字段为commit的值String commit = tuple.getStringByField("commit");String[] parts = commit.split(" ");// 发射一个字段为email的新元组outputCollector.emit(new Values(parts[1]));} } import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple;import java.util.HashMap; import java.util.Map;public class EmailCounter extends BaseBasicBolt{private Map<String, Integer> counts;public void declareOutputFields(OutputFieldsDeclarer declarer){// does not emit anything}private Integer countFor(String email){Integer count = counts.get(email);return count == null? 0:count;}private void printCounts(){for(String email:counts.keySet()){System.out.println(String.format("s% has count of %s", email, counts.get(email)));}}// 获取字段为email的值public void execute(Tuple tuple,BasicOutputCollector outputCollector){String email = tuple.getStringByField("email");counts.put(email, countFor(email) + 1);printCounts();}// storm在bolt准备运行时调用public void prepare(Map config,TopologyContext context){counts = new HashMap<String, Integer>();} }2. Storm实现
完成spout和bolt部分,我们需要告诉storm数据流的位置以及每个流的分组策略,并构建spout-bolt的拓扑计算图。作为统筹的环节,需要完成三项工作:
- 构建拓扑计算图,并告诉storm数据流的位置,以及指明每个数据流的流分组策略
- 创建配置,建议打开日志
- 生成拓扑,连同配置提交到storm集群,最后kill并关闭
示例代码:
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils;public class LocalTopologyRunner {private static final int TEN_MINUTES = 600000;// 构建拓扑计算图,将spout-bolt链接起来// 本地运行拓扑的mainpublic static void main (String[] args){TopologyBuilder builder = new TopologyBuilder();// 添加spout到拓扑图,并指定idbuilder.setSpout("commit-feed-listener", new CommitFeedListener());// 添加listener-bolt到拓扑图,并链接commit-spoutbuilder.setBolt("email-extractor", new EmailExtractor()).shuffleGrouping("commit-feed-listener");// 添加count-bolt到拓扑图,并链接extractoe-boltbuilder.setBolt("email-counter", new EmailCounter()).shuffleGrouping("email-extractor");// 拓扑层的配置类,为了保持调试,开启了debugConfig config = new Config();config.setDebug(true);// 创建拓扑计算图StormTopology topology = builder.createTopology();// 定义本地集群LocalCluster cluster = new LocalCluster();// 提交拓扑计算图,并配置到本地集群cluster.submitTopology("github-commit-count", config, topology);Utils.sleep(TEN_MINUTES);cluster.killTopology("github-commit-count");cluster.shutdown();} }3. 小结
- 一个拓扑计算图是一个结点图集,图中每个节点代表一个进程或者计算处理,每条边代表上个节点计算的输出,下个结点计算的输入
- 元组是一个有序地数值序列,其中每个数值都被赋予一个命名
- spout是数据流的源头,目的就是为了从数据源读取数据,并且发射元组作为输出流到数据流中
- bolt是实现核心业务逻辑的地方,执行过滤/聚合/连接或者数据库交互等操作
- spout/bolt都可以执行一个或者多个实例,这个是线程控制的
- main中需要完成统筹的工作,包括添加节点到拓扑图,配置拓扑图,创建拓扑图,最后将拓扑图提交到storm集群
- 所有的结点都有id唯一识别,所有的元组都有自己的字段名,并且结点上声明的Fieds类型的字段名必须与同节点发射的Value类型字段名相同
总结
以上是生活随笔为你收集整理的Sorm进阶(1):storm实现github提交数监控看板的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 美国凤凰号探测器从火星土壤中提取到水
- 下一篇: 问一个AddDevice和设备符号链的问