序
本文简单介绍下怎么使用docker运行storm以及在springboot中使用storm。
docker-compose
version: '2'services: zookeeper: image: zookeeper ##3.4.10 container_name: zookeeper restart: always ports: - 2181:2181 nimbus: image: storm ## 1.1.1 container_name: nimbus command: storm nimbus depends_on: - zookeeper links: - zookeeper restart: always ports: - 6627:6627 supervisor: image: storm container_name: supervisor command: storm supervisor depends_on: - nimbus - zookeeper links: - nimbus - zookeeper restart: always ui: image: storm container_name: stormui command: storm ui depends_on: - nimbus - zookeeper links: - nimbus - zookeeper restart: always ports: - 8080:8080
启动之后访问192.168.99.100:8080就可以看见storm-ui的界面
wordcount实例
TestWordSpout
public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public TestWordSpout() { this(true); } public TestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } public void ack(Object msgId) { } public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public MapgetComponentConfiguration() { if(!_isDistributed) { Map ret = new HashMap (); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }
WordCountBolt
public class WordCountBolt extends BaseBasicBolt { Mapcounts = new HashMap (); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); }}
PrintBolt
public class PrintBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String first = tuple.getString(0); int second = tuple.getInteger(1); System.out.println(first + "," + second); }}
本地运行
@SpringBootApplicationpublic class StormDemoApplication implements CommandLineRunner{ public static void main(String[] args) { SpringApplication app = new SpringApplication((StormDemoApplication.class)); app.setWebEnvironment(false); app.run(args); } @Override public void run(String... args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //并发度10 builder.setSpout("spout", new TestWordSpout(), 10); builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word")); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count"); String topologyName = "DemoTopology"; Config conf = new Config(); conf.setDebug(true); //远程提交 mvn clean package -Dmaven.test.skip=true// StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); try { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf,builder.createTopology()); Thread.sleep(60 * 1000); cluster.shutdown(); } catch (Exception e) { e.printStackTrace(); } }}
远程提交
修改提交方式,然后打jar包
//远程提交 mvn clean package -Dmaven.test.skip=true StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
远程提交代码
@Test public void remoteSubmit() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { Config conf = new Config(); conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1 conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627 conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个 conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181 conf.setDebug(true); conf.setNumWorkers(1); TopologyBuilder builder = new TopologyBuilder(); //并发度10 builder.setSpout("spout", new TestWordSpout(), 10); builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word")); builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count"); String topologyName = "DemoTopology"; //非常关键的一步,使用StormSubmitter提交拓扑时,不管怎么样,都是需要将所需的jar提交到nimbus上去,如果不指定jar文件路径, //storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交 System.setProperty("storm.jar","/Users/downloads/storm-demo-0.0.1-SNAPSHOT.jar"); StormSubmitter.submitTopology(topologyName, conf, builder.createTopology()); }
clojars仓库问题
修改~/.m2/settings.xml
nexus-aliyun *,!Clojars Nexus aliyun http://maven.aliyun.com/nexus/content/groups/public Clojars Clojars Repository http://clojars.org/repo/ true true