博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
docker运行storm及wordcount实例
阅读量:6764 次
发布时间:2019-06-26

本文共 6264 字,大约阅读时间需要 20 分钟。

  hot3.png

本文简单介绍下怎么使用docker运行storm以及在springboot中使用storm。

docker-compose

version: '2'services:    zookeeper:        image: zookeeper ##3.4.10        container_name: zookeeper        restart: always        ports:          - 2181:2181    nimbus:        image: storm ## 1.1.1        container_name: nimbus        command: storm nimbus        depends_on:            - zookeeper        links:            - zookeeper        restart: always        ports:            - 6627:6627    supervisor:        image: storm        container_name: supervisor        command: storm supervisor        depends_on:            - nimbus            - zookeeper        links:            - nimbus            - zookeeper        restart: always    ui:        image: storm        container_name: stormui        command: storm ui        depends_on:          - nimbus          - zookeeper        links:          - nimbus          - zookeeper        restart: always        ports:          - 8080:8080

启动之后访问192.168.99.100:8080就可以看见storm-ui的界面

wordcount实例

TestWordSpout

public class TestWordSpout extends BaseRichSpout {    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);    boolean _isDistributed;    SpoutOutputCollector _collector;    public TestWordSpout() {        this(true);    }    public TestWordSpout(boolean isDistributed) {        _isDistributed = isDistributed;    }            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {        _collector = collector;    }        public void close() {            }            public void nextTuple() {        Utils.sleep(100);        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};        final Random rand = new Random();        final String word = words[rand.nextInt(words.length)];        _collector.emit(new Values(word));    }        public void ack(Object msgId) {    }    public void fail(Object msgId) {            }        public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("word"));    }    @Override    public Map
getComponentConfiguration() { if(!_isDistributed) { Map
ret = new HashMap
(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }

WordCountBolt

public class WordCountBolt extends BaseBasicBolt {    Map
counts = new HashMap
(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); }}

PrintBolt

public class PrintBolt extends BaseBasicBolt {    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {    }    @Override    public void execute(Tuple tuple, BasicOutputCollector collector) {        String first = tuple.getString(0);        int second = tuple.getInteger(1);        System.out.println(first + "," + second);    }}

本地运行

@SpringBootApplicationpublic class StormDemoApplication implements CommandLineRunner{	public static void main(String[] args) {		SpringApplication app = new SpringApplication((StormDemoApplication.class));		app.setWebEnvironment(false);		app.run(args);	}	@Override	public void run(String... args) throws Exception {		TopologyBuilder builder = new TopologyBuilder();		//并发度10		builder.setSpout("spout", new TestWordSpout(), 10);		builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));		builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");		String topologyName = "DemoTopology";		Config conf = new Config();		conf.setDebug(true);		//远程提交 mvn clean package -Dmaven.test.skip=true//		StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());		try {			LocalCluster cluster = new LocalCluster();			cluster.submitTopology(topologyName, conf,builder.createTopology());			Thread.sleep(60 * 1000);			cluster.shutdown();		} catch (Exception e) {			e.printStackTrace();		}	}}

远程提交

修改提交方式,然后打jar包

//远程提交 mvn clean package -Dmaven.test.skip=true		StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());

远程提交代码

@Test	public void remoteSubmit() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {		Config conf = new Config();		conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1		conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627		conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个		conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181		conf.setDebug(true);		conf.setNumWorkers(1);		TopologyBuilder builder = new TopologyBuilder();		//并发度10		builder.setSpout("spout", new TestWordSpout(), 10);		builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));		builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");		String topologyName = "DemoTopology";		//非常关键的一步,使用StormSubmitter提交拓扑时,不管怎么样,都是需要将所需的jar提交到nimbus上去,如果不指定jar文件路径,		//storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交		System.setProperty("storm.jar","/Users/downloads/storm-demo-0.0.1-SNAPSHOT.jar");		StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());	}

clojars仓库问题

修改~/.m2/settings.xml

nexus-aliyun
*,!Clojars
Nexus aliyun
http://maven.aliyun.com/nexus/content/groups/public
Clojars
Clojars Repository
http://clojars.org/repo/
true
true

doc

转载于:https://my.oschina.net/go4it/blog/1518900

你可能感兴趣的文章