Hands on: Build a Storm analytics solution

Storm lets you create real-time analytics for every conceivable need. Here's a tasty example using Twitter data and source code hosted on GitHub

Two weeks ago, we examined the two most popular real-time processing frameworks, Apache Storm and Apache Spark. Now we're going to take a much deeper look at Storm and walk through a basic Storm deployment for consuming Twitter messages and performing analytics on the Twitter stream.

To this end, we'll extract important keywords from individual tweets and calculate rolling metrics related to how actively a given keyword is being discussed. Plus, we'll do some lightweight sentiment analysis to determine the tenor of the discussion on a given topic. We'll also look at how Storm and XMPP combine nicely for extracting important "moment in time" events from a stream and for sending those events out as alerts.

All about Storm

Storm is an open source, distributed, stream-processing platform, designed to make it easy to build massively scalable systems for performing real-time computations on continuous streams of data.

People sometimes refer to Storm as the Hadoop of real-time processing, but it's important to note that Storm has no particular dependency on the MapReduce programming model. You may, if your needs so dictate, code a Storm solution to use a MapReduce model, but nothing about Storm requires it. In fact, Storm bears a slight resemblance to pre-Hadoop distributed computing systems like MPI in terms of the flexibility you have in designing your application.

To accomplish this, Storm depends on a small number of key concepts, including topologies, spouts, bolts, and tuples. A spout is an interface into a stream of data; it can process that stream, possibly do some initial filtering or processing, and transform it into a stream of tuples, which are then sent to bolts for processing.

Tuples are labeled collections of fields that are passed among Storm components (spouts and bolts). Bolts are the Storm components that do the heavy lifting of handling complex calculations. A Storm topology is a collection of spouts and bolts that have been wired together to create a specific solution.
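To make the tuple concept concrete, here is a hypothetical sketch (my own illustration, not Storm's actual implementation): a tuple pairs declared field names positionally with emitted values, which is what lets a bolt later look a value up by name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a tuple: declared field names paired by
// position with emitted values, so a value can be fetched by name.
public class TupleSketch {
    private final List<String> fields;
    private final List<Object> values;

    public TupleSketch(List<String> fields, List<Object> values) {
        this.fields = fields;
        this.values = values;
    }

    // Look up a value by its declared field name, as a bolt would.
    public Object getValueByField(String field) {
        return values.get(fields.indexOf(field));
    }
}
```

This positional pairing is also why, as we'll see later, the order of values a component emits must match the order in which it declares its output fields.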

A significant distinction between Storm and Hadoop is that Hadoop MapReduce jobs run in batch mode where they are started, run until a given set of data has been processed, and terminate. By contrast, Storm topologies default to running forever until they are explicitly killed. As long as the topology is running, the spouts will collect data and send tuples to the bolts for processing.

Storm provides considerable flexibility for designing topologies to match the problem at hand. A topology may include multiple spouts, and bolts may consume tuples from one or more spouts, then in turn emit tuples that are sent to one or more subsequent bolts for additional processing. Generally speaking, the chain of processing steps may be as long as needed, with components arranged as a directed acyclic graph of operations. Within this framework you can implement MapReduce algorithms, scatter-gather operations, pipeline-oriented algorithms, or whatever is required to perform the desired calculation.

Storm addresses parallelism by running instances of the various components (spouts and bolts) on multiple nodes in the cluster. Your topology configuration specifies the desired level of parallelism for each component. This allows you to use a large number of nodes for a computationally expensive step in the overall workflow, for example, while other steps may require only one node.

Stream groupings allow the programmer to control the distribution of tuples among multiple instances of a given component. Built-in options include the "shuffle" grouping, where tuples are distributed randomly among instances; the "fields" grouping, where all tuples with the same value for a specified field go to the same instance (similar to how MapReduce partitions keys); and the "all" grouping, where each instance receives every tuple.
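To see what the "fields" grouping guarantees, here is a rough, hypothetical sketch of the routing rule it implies: hashing the grouped field's value modulo the instance count sends equal values to the same instance every time. (Storm's internal scheme differs in detail, but the guarantee is the same.)

```java
import java.util.Objects;

// Hypothetical sketch of fields-grouping routing: tuples with equal
// values for the grouped field always land on the same bolt instance.
public class FieldsGroupingSketch {
    public static int chooseInstance(Object fieldValue, int numInstances) {
        // Deterministic hash of the field value, mapped onto an instance index
        return Math.abs(Objects.hashCode(fieldValue)) % numInstances;
    }
}
```

This determinism is what makes per-key state (counts, averages) safe to keep inside a single bolt instance.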

Storm allows the use of any arbitrary library and essentially any valid Java code within components. Spouts and bolts can read from, or write to, any database, file system, queue, cache, socket, or endpoint that is required. In the following examples, we will examine a Storm topology that consumes Twitter data, reads files from a file system, and writes to an XMPP server.

Before we move on to the demo code, there is one important point to understand about statefulness in Storm components: Bolts may be stateful or stateless depending on their function; a bolt that is calculating a "rolling average" of some metric over a period of time must maintain data across invocations. Other bolts may process a given tuple, perform the calculation at hand, and discard the data. In the case of a stateful component, it is up to the programmer to ensure that intermediate state is saved to an external location, and reloaded as necessary, to allow for the failure of a node.
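For instance, a bolt computing a rolling average must carry a window of recent values between invocations. A minimal sketch of that state might look like the following (the class name and fixed-size window policy are my own illustration, not code from the sample project):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the state a rolling-average bolt must keep
// across calls: a bounded window of recent values plus a running sum.
public class RollingAverage {
    private final int windowSize;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    public RollingAverage(int windowSize) {
        this.windowSize = windowSize;
    }

    // Add a new observation and return the current rolling average.
    public double update(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > windowSize) {
            sum -= window.removeFirst();  // evict the oldest value
        }
        return sum / window.size();
    }
}
```

In a real deployment, this window would be the "intermediate state" that has to be persisted externally so a replacement instance can pick up where a failed node left off.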

Getting started with Storm

To run the code samples, you need a local installation of Storm. Storm includes a "local" mode for development purposes that simulates parallelism using threads, so you won't need to build a full-fledged Storm cluster to follow along. To install Storm, download the Storm 0.9.3 bundle and extract it somewhere on your file system. Once extracted, add the apache-storm-0.9.3/bin directory to your path, so you can run the storm command from the command line.

All source code for this article is available on GitHub.

Clone this project, and build it using the command:

$ mvn clean compile package

After the build completes, cd into the target/ directory, and run the topology using the command:

$ storm jar storm-samples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    com.osintegrators.example.storm.main.TwitterAnalyticsTopology

Exploring Storm code

The basic framework for running Storm topologies here is borrowed from the tutorial in the official Storm documentation. To see how a topology is wired up and run, take a look at TwitterAnalyticsTopology.java. Here our main() method creates an instance of our topology and submits it to run locally or on a cluster, depending on the command-line arguments provided. Since we did not pass the keyword "remote" as an argument in our launch command, we will run in local mode.

// "remote" as the second command-line argument selects cluster mode
boolean runLocally = true;
if (args.length >= 2 && args[1].equalsIgnoreCase("remote"))
{
    runLocally = false;
}

TwitterAnalyticsTopology ttt = null;
if (!topologyName.isEmpty())
{
    ttt = new TwitterAnalyticsTopology(topologyName);
}
else
{
    ttt = new TwitterAnalyticsTopology();
}

logger.info("Topology Name: " + ttt.topologyName);

if (runLocally)
{
    logger.info("Running in local mode");
    ttt.runLocally();
}
else
{
    logger.info("Running in remote (cluster) mode");
    ttt.runRemotely();
}

The actual wiring of the topology happens in the wireTopology() method, which is called from the constructor for TwitterAnalyticsTopology. As you can see, wiring a topology consists of creating instances of our components, passing any required setup/configuration data to those instances, assigning the spouts and bolts to the topology, and defining the connections between the components. The calls to the shuffleGrouping() method are how we declare the flow of tuples between components.

In this example, we pass our Twitter credentials into the TwitterSpout, so it can open a connection to Twitter and receive tweets. Meanwhile, mentionTargets is a list of keywords that we specifically want to look for in our Twitter stream. We pass this list into the TweetToMentionBolt, which we will examine momentarily.

private void wireTopology()
{
    logger.info("Wiring topology...");

    String consumerKey = "your_consumer_key";
    String consumerSecret = "your_consumer_secret";
    String accessToken = "your_access_token";
    String accessTokenSecret = "your_token_secret";
    String[] keywords = {};

    List<String> mentionTargets = new ArrayList<String>();

    mentionTargets.add("facebook");
    mentionTargets.add("tor");
    mentionTargets.add("oracle");
    mentionTargets.add("jive");
    mentionTargets.add("manufacturing");
    mentionTargets.add("openstack");
    mentionTargets.add("barrett");
    mentionTargets.add("hadoop");
    mentionTargets.add("arduino");
    mentionTargets.add("memristor");
    mentionTargets.add("sony");
    mentionTargets.add("scala");

    this.builder.setSpout("twitterSpout",
        new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keywords), 1);
    this.builder.setBolt("tweetToMentionBolt",
        new TweetToMentionBolt(mentionTargets), 3).shuffleGrouping("twitterSpout");
    this.builder.setBolt("singleMentionAlerterBolt",
        new SingleMentionAlerterBolt("talk.google.com", "5222", "gmail.com"), 1)
        .shuffleGrouping("tweetToMentionBolt");
    this.builder.setBolt("mentionsThresholdBolt",
        new MentionsThresholdBolt("talk.google.com", "5222", "gmail.com"), 1)
        .shuffleGrouping("tweetToMentionBolt");
    this.builder.setBolt("sentimentAnalysisBolt",
        new SentimentAnalysisBolt("talk.google.com", "5222", "gmail.com",
            "/home/prhodes/workspaces/storm/storm-samples/AFINN/AFINN-111.txt"), 3)
        .shuffleGrouping("tweetToMentionBolt");
}

We set the parallelism for our TwitterSpout to 1 since we need only one instance of this class running. If we ran multiple instances, we would receive multiple copies of the same message, and we'd risk running afoul of Twitter's restrictions on multiple simultaneous connections from the same account/IP address.

In the TwitterSpout class we take advantage of the open() method, which is called by Storm as part of initializing the topology, to open our connection to Twitter and set up our UserStreamListener instance. Any tweets that are received are stored in the LinkedBlockingQueue instance, where they are available when Storm initiates a call to the nextTuple() method.

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    logger.info("TwitterSpout opening...");
    this.collector = collector;
    this.queue = new LinkedBlockingQueue<Status>(1000);

    UserStreamListener listener = new UserStreamListener()
    {
        @Override
        public void onStatus(Status status)
        {
            logger.info("onStatus() called");
            logger.info("Status: " + status.getUser().getName() + " : " + status.getText());
            queue.offer(status);
        }

        @Override
        public void onException(Exception ex)
        {
            logger.error("Exception: \n", ex);
        }
    };

    this.twitterStream = new TwitterStreamFactory(
        new ConfigurationBuilder().setJSONStoreEnabled(true).build()).getInstance();

    // register the listener so incoming statuses reach our queue
    this.twitterStream.addListener(listener);

    this.twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
    AccessToken token = new AccessToken(accessToken, accessTokenSecret);
    this.twitterStream.setOAuthAccessToken(token);

    if (keyWords.length == 0)
    {
        logger.info("Sample Twitter Stream");
        this.twitterStream.user();
    }
    else
    {
        FilterQuery query = new FilterQuery().track(keyWords);
        this.twitterStream.filter(query);
    }
}

When nextTuple() is called, we extract any available tweets from the queue and emit them using the SpoutOutputCollector instance. The order in which arguments are passed to the Values() constructor must match the order in which the output field names are declared in the declareOutputFields() method to allow downstream components to extract fields from the supplied tuple.

public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    declarer.declare(new Fields("tweet"));
}

public void nextTuple()
{
    Status ret = queue.poll();
    if (ret == null)
    {
        // no tweet available; sleep briefly to avoid busy-waiting
        Utils.sleep(50);
    }
    else
    {
        collector.emit(new Values(ret));
    }
}

Once the TwitterSpout has emitted a tuple, it will be processed by the execute() method in the next component in the chain -- in our case, TweetToMentionBolt. This bolt is responsible for examining tweets to see if they contain one of the keywords we are monitoring for. If one of our keywords is found, this bolt emits a new tuple with the specified keyword, the tweet text, and a Date object representing when the mention occurred. Again, the ordering of fields in the emit() method must match the declaration in the declareOutputFields() method.

@Override
public void execute(Tuple input, BasicOutputCollector collector)
{
    Status status = (Status) input.getValueByField("tweet");

    // parse the status and look for mentions of the entities we're interested in...
    String statusText = status.getText();

    logger.info("status: " + statusText);

    for (String mentionTarget : mentionTargets)
    {
        if (statusText.toLowerCase().matches(".*\\s*" + mentionTarget + "\\s+.*"))
        {
            logger.info("emitting mention: " + mentionTarget);
            collector.emit(new Values(mentionTarget, new Date(), status));
        }
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    declarer.declare(new Fields("mentionTarget", "date", "status"));
}

TweetToMentionBolt is a stateless bolt, so multiple instances of the class can freely operate on any tuple from upstream. Since our downstream bolts don't care which instance their tuples come from, this is a perfect example of a bolt whose parallelism we might increase if it performed a computationally expensive operation: we can run as many instances of this bolt as needed to handle the load in a timely fashion.
