欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Strom小实例,大小写转换

发布时间:2023/12/18 42 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Strom小实例,大小写转换 小编觉得挺不错的,现在分享给大家,帮大家做个参考.
结构:

RandomWordSpout:
  • package cn.itcast.stormdemo;
  • import java.util.Map;
  • import java.util.Random;
  • import backtype.storm.spout.SpoutOutputCollector;
  • import backtype.storm.task.TopologyContext;
  • import backtype.storm.topology.OutputFieldsDeclarer;
  • import backtype.storm.topology.base.BaseRichSpout;
  • import backtype.storm.tuple.Fields;
  • import backtype.storm.tuple.Values;
  • import backtype.storm.utils.Utils;
  • public class RandomWordSpout extends BaseRichSpout{
  • private SpoutOutputCollector collector;
  • //模拟一些数据
  • String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
  • //不断地往下一个组件发送tuple消息
  • //这里面是该spout组件的核心逻辑
  • @Override
  • public void nextTuple() {
  • //可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去
  • Random random = new Random();
  • int index = random.nextInt(words.length);
  • //通过随机数拿到一个商品名
  • String godName = words[index];
  • //将商品名封装成tuple,发送消息给下一个组件
  • collector.emit(new Values(godName));
  • //每发送一个消息,休眠500ms
  • Utils.sleep(500);
  • }
  • //初始化方法,在spout组件实例化时调用一次
  • @Override
  • public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  • this.collector = collector;
  • }
  • //声明本spout组件发送出去的tuple中的数据的字段名
  • @Override
  • public void declareOutputFields(OutputFieldsDeclarer declarer) {
  • declarer.declare(new Fields("orignname"));
  • }
  • }
  • SuffixBolt :
  • package cn.itcast.stormdemo;
  • import java.io.FileWriter;
  • import java.io.IOException;
  • import java.util.Map;
  • import java.util.UUID;
  • import backtype.storm.task.TopologyContext;
  • import backtype.storm.topology.BasicOutputCollector;
  • import backtype.storm.topology.OutputFieldsDeclarer;
  • import backtype.storm.topology.base.BaseBasicBolt;
  • import backtype.storm.tuple.Tuple;
  • public class SuffixBolt extends BaseBasicBolt{
  • FileWriter fileWriter = null;
  • //在bolt组件运行过程中只会被调用一次
  • @Override
  • public void prepare(Map stormConf, TopologyContext context) {
  • try {
  • fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
  • } catch (IOException e) {
  • throw new RuntimeException(e);
  • }
  • }
  • //该bolt组件的核心处理逻辑
  • //每收到一个tuple消息,就会被调用一次
  • @Override
  • public void execute(Tuple tuple, BasicOutputCollector collector) {
  • //先拿到上一个组件发送过来的商品名称
  • String upper_name = tuple.getString(0);
  • String suffix_name = upper_name + "_itisok";
  • //为上一个组件发送过来的商品名称添加后缀
  • try {
  • fileWriter.write(suffix_name);
  • fileWriter.write("\n");
  • fileWriter.flush();
  • } catch (IOException e) {
  • throw new RuntimeException(e);
  • }
  • }
  • //本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段
  • @Override
  • public void declareOutputFields(OutputFieldsDeclarer arg0) {
  • }
  • }
  • TopoMain :
  • package cn.itcast.stormdemo;
  • import backtype.storm.Config;
  • import backtype.storm.StormSubmitter;
  • import backtype.storm.generated.AlreadyAliveException;
  • import backtype.storm.generated.InvalidTopologyException;
  • import backtype.storm.generated.StormTopology;
  • import backtype.storm.topology.TopologyBuilder;
  • /**
  • * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
  • * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
  • * @author duanhaitao@itcast.cn
  • *
  • */
  • public class TopoMain {
  • public static void main(String[] args) throws Exception {
  • TopologyBuilder builder = new TopologyBuilder();
  • //将我们的spout组件设置到topology中去
  • //parallelism_hint :4 表示用4个excutor来执行这个组件
  • //setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task
  • builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
  • //将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
  • //.shuffleGrouping("randomspout")包含两层含义:
  • //1、upperbolt组件接收的tuple消息一定来自于randomspout组件
  • //2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
  • builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
  • //将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
  • builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
  • //用builder来创建一个topology
  • StormTopology demotop = builder.createTopology();
  • //配置一些topology在集群中运行时的参数
  • Config conf = new Config();
  • //这里设置的是整个demotop所占用的槽位数,也就是worker的数量
  • conf.setNumWorkers(4);
  • conf.setDebug(true);
  • conf.setNumAckers(0);
  • //将这个topology提交给storm集群运行
  • StormSubmitter.submitTopology("demotopo", conf, demotop);
  • }
  • }
  • UpperBolt :
  • package cn.itcast.stormdemo;
  • import backtype.storm.topology.BasicOutputCollector;
  • import backtype.storm.topology.OutputFieldsDeclarer;
  • import backtype.storm.topology.base.BaseBasicBolt;
  • import backtype.storm.tuple.Fields;
  • import backtype.storm.tuple.Tuple;
  • import backtype.storm.tuple.Values;
  • public class UpperBolt extends BaseBasicBolt{
  • //业务处理逻辑
  • @Override
  • public void execute(Tuple tuple, BasicOutputCollector collector) {
  • //先获取到上一个组件传递过来的数据,数据在tuple里面
  • String godName = tuple.getString(0);
  • //将商品名转换成大写
  • String godName_upper = godName.toUpperCase();
  • //将转换完成的商品名发送出去
  • collector.emit(new Values(godName_upper));
  • }
  • //声明该bolt组件要发出去的tuple的字段
  • @Override
  • public void declareOutputFields(OutputFieldsDeclarer declarer) {
  • declarer.declare(new Fields("uppername"));
  • }
  • }





  • 来自为知笔记(Wiz)

    转载于:https://www.cnblogs.com/xiaoxiao5ya/p/caf5daa5f8b5d3861583b24e1f2d8308.html

    总结

    以上是生活随笔为你收集整理的Strom小实例,大小写转换的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。