How to use Redis for real-time stream processing

An in-depth guide to overcoming fast data ingestion challenges with Redis Pub/Sub, Redis Lists, and Redis Sorted Sets

1 2 Page 2
Page 2 of 2

Ingest using Redis Sorted Sets

One of the concerns with the Pub/Sub method is that it is susceptible to connection loss and hence unreliable. The challenge with the Redis Lists solution is the problem of data duplication and tight coupling between producers and consumers.

The Redis Sorted Sets solution addresses both of these issues. A counter tracks the number of messages, and the messages are indexed against this message count. They are stored in a non-ephemeral state inside the Sorted Sets data structure, which is polled by consumer applications. The consumers check for new data and pull messages by running the ZRANGEBYSCORE command.

redis sorted sets Redis Labs

Figure 6. Fast data ingest with Redis Sorted Sets and Pub/Sub

Unlike the previous two solutions, this one allows subscribers to retrieve historical data when needed, and consume it more than once. Only one copy of data is stored at each stage, making it ideal for situations where the consumer to producer ratio is very high. However, this approach is more complex and less cost-effective when compared with the last two solutions.

Pros

  • It can fetch historical data when needed, because retrieved data is not removed from the Sorted Set.
  • The solution is resilient to data connection losses, because producers and consumers require no connection between them.
  • Only one copy of data is stored at each stage, making it ideal for situations where the consumer to producer ratio is very high.

Cons

  • Implementing the solution is more complex.
  • More storage space is required, as data is not deleted from the database when consumed. 

Code design for the Redis Sorted Sets solution

redis sorted sets class diagram Redis Labs

Figure 7. Class diagram of the fast data ingest solution with Redis Sorted Sets

You can download the source code here: https://github.com/redislabsdemo/IngestSortedSet. The main classes are explained below.

SortedSetPublisher inserts a message into a Sorted Set and increments the counter that tracks new messages. In many practical cases the counter can be replaced by the timestamp.

public class SortedSetPublisher
{

       public static String SORTEDSET_COUNT_SUFFIX ="count";

       // Redis connection
       RedisConnection conn = null;

       // Jedis object
       Jedis jedis = null;

       // name of the Sorted Set data structure
       private String sortedSetName = null;      

       /*
        * @param name: SortedSetPublisher constructor
        */
       public SortedSetPublisher(String name) throws Exception{
              sortedSetName = name;
              conn = RedisConnection.getRedisConnection();
              jedis = conn.getJedis();           
       }

       /*
        */
       public void publish(String message) throws Exception{
              // Get count
              long count = jedis.incr(sortedSetName+”:”+SORTEDSET_COUNT_SUFFIX);

              // Insert into sorted set
              jedis.zadd(sortedSetName, (double)count, message);
       }

}

The SortedSetFilter class is a parent class that implements logic to learn about new messages, pull them from the database, filter them, and push them to the next level. Classes that implement custom filters extend this class and override the processMessage() method with a custom implementation.

public class SortedSetFilter extends Thread
{
       // RedisConnection to query the database
       protected RedisConnection conn = null;

       protected Jedis jedis = null;

       protected String name ="SortedSetSubscriber"; // default name

       protected String subscriberChannel ="defaultchannel"; //default name

       // Name of the Sorted Set
       protected String sortedSetName = null;

       // Channel (sorted set) to publish
       protected String publisherChannel = null;

       // The key of the last message processed
       protected String lastMsgKey = null;

       // The key of the latest message count
       protected String currentMsgKey = null;

       // Count to store the last message processed
       protected volatile String lastMsgCount = null;

       // Time-series publisher for the next level
       protected SortedSetPublisher SortedSetPublisher = null;

       public static String LAST_MESSAGE_COUNT_SUFFIX="lastmessage";

       /*
        * @param name: name of the SortedSetFilter object
        * @param subscriberChannel: name of the channel to listen to the
        * availability of new messages
        * @param publisherChannel: name of the channel to publish the availability of
        * new messages
        */
       public SortedSetFilter(String name, String subscriberChannel, 
                                                                     String publisherChannel) throws Exception{
              this.name = name;
              this.subscriberChannel = subscriberChannel;
              this.sortedSetName = subscriberChannel;
              this.publisherChannel = publisherChannel;
this.lastMsgKey = name+”:”+LAST_MESSAGE_COUNT_SUFFIX;
        this.currentMsgKey =
                                           subscriberChannel+”:”
                                           +SortedSetPublisher.SORTEDSET_COUNT_SUFFIX;
       }

       @Override
       public void run(){
              try{

                      // Connection for reading/writing to sorted sets
                      conn = RedisConnection.getRedisConnection();
                      jedis = conn.getJedis();
                      if(publisherChannel != null){
                             sortedSetPublisher =
                                 new SortedSetPublisher(publisherChannel);
                      }

                      // load delta data since last connection
                      while(true){
                             fetchData(); 
                      }                                  
              }catch(Exception e){
                      e.printStackTrace();
              }
       }

       /*
        * init() method loads the count of the last message processed. It then loads
        * all messages since the last count.
        */
       private void fetchData() throws Exception{
              if(lastMsgCount == null){
                      lastMsgCount = jedis.get(lastMsgKey);
                      if(lastMsgCount == null){
                             lastMsgCount ="0";
                      }
              }

              String currentCount = jedis.get(currentMsgKey);

              if(currentCount != null && Long.parseLong(currentCount) >
                                         Long.parseLong(lastMsgCount)){
                      loadSortedSet(lastMsgCount, currentCount);
              }else{
                      Thread.sleep(1000); // sleep for a second if there’s no
                                                       // data to fetch
              }
       }

       //Call to load the data from last Count to current Count
       private void loadSortedSet(String lastMsgCount, String currentCount)
                                                                                         throws Exception{
              //Read from SortedSet
              Set<Tuple> CountTuple = jedis.zrangeByScoreWithScores(sortedSetName, lastMsgCount, currentCount);
              for(Tuple t : CountTuple){
                      processMessageTuple(t);
              }

       }

       // Override this method to customize the filters
       private void processMessageTuple(Tuple t) throws Exception{
              long score = new Double(t.getScore()).longValue();
              String message = t.getElement();
              lastMsgCount = (new Long(score)).toString();
              processMessage(message);

              jedis.set(lastMsgKey, lastMsgCount);
       }

       protected void processMessage(String message) throws Exception{
              //Override this method
       }
} 

EnglishTweetsFilter is a custom filter that extends SortedSetFilter with its own custom filter to select only tweets that are marked as English.

public class EnglishTweetsFilter extends SortedSetFilter
{
       /*
        * @param name: name of the SortedSetFilter object
        * @param subscriberChannel: name of the channel to listen to the
        * availability of new messages
        * @param publisherChannel: name of the channel to publish the availability
        * of new messages
        */
       public EnglishTweetsFilter(String name, String subscriberChannel, String publisherChannel) throws Exception{
              super(name, subscriberChannel, publisherChannel);
       }

       @Override
       protected void processMessage(String message) throws Exception{
              //Filter; add them to a new time series database and publish
              JsonParser jsonParser = new JsonParser();

              JsonElement jsonElement = jsonParser.parse(message);
              JsonObject jsonObject = jsonElement.getAsJsonObject();

              if(jsonObject.get(“lang”) != null &&
                   jsonObject.get(“lang”).getAsString().equals(“en”)){
                      System.out.println(jsonObject.get(“text”).getAsString());
                      if(sortedSetPublisher != null){
                             sortedSetPublisher.publish(jsonObject.toString());

                      }
              }
       }

       /*
        * Main method to start EnglishTweetsFilter
        */
       public static void main(String[] args) throws Exception{
              EnglishTweetsFilter englishFilter = new EnglishTweetsFilter(“EnglishFilter”, “alldata”, “englishtweets”);
              englishFilter.start();
       }

Final thoughts

When using Redis for fast data ingest, its data structures and pub/sub functionality offer a number of options for implementation. Each approach has its advantages and disadvantages. Redis Pub/Sub is easy to implement, and producers and consumers are decoupled. But Pub/Sub is not resilient to connection loss, and it requires many connections. It’s typically used for e-commerce workflows, job and queue management, social media communications, gaming, and log collection.

The Redis Lists method is also easy to implement, and unlike with Pub/Sub, data is not lost when the subscriber loses the connection. Disadvantages include tight coupling of producers and consumers and the duplication of data for each consumer, which makes it unsuitable for some scenarios. Suitable use cases would include financial transactions, gaming, social media, IoT, and fraud detection.

The Redis Sorted Sets has a larger footprint and is more complex to implement and maintain than the Pub/Sub and List methods, but overcomes their limitations. It is resilient to connection loss, and because retrieved data is not removed from the Sorted Set, it allows for time-series queries. And because only one copy of the data is stored at each stage, it is very efficient in cases where one producer has many consumers. The Sorted Sets method is a good match for IoT transactions, financial transactions, and metering.

New Tech Forum provides a venue to explore and discuss emerging enterprise technology in unprecedented depth and breadth. The selection is subjective, based on our pick of the technologies we believe to be important and of greatest interest to InfoWorld readers. InfoWorld does not accept marketing collateral for publication and reserves the right to edit all contributed content. Send all inquiries to newtechforum@infoworld.com.

1 2 Page 2
Page 2 of 2