Built for realtime: Big data messaging with Apache Kafka, Part 1

Build a continuous big data messaging system with Apache Kafka


Listing 2. KafkaConsumer

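  import java.util.Arrays;
  import java.util.Properties;
  import java.util.Scanner;

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.errors.WakeupException;

  public class Consumer {
      private static Scanner in;
      private static boolean stop = false;

      public static void main(String[] argv) throws Exception {
          if (argv.length != 2) {
              System.err.printf("Usage: %s <topicName> <groupId>\n",
                      Consumer.class.getSimpleName());
              System.exit(-1);
          }
          in = new Scanner(System.in);
          String topicName = argv[0];
          String groupId = argv[1];

          // Run the polling loop in its own thread so the main thread can read the console
          ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
          consumerRunnable.start();
          String line = "";
          while (!line.equals("exit")) {
              line = in.next();
          }
          // Interrupt the blocked poll() so the consumer thread can close the consumer
          consumerRunnable.getKafkaConsumer().wakeup();
          System.out.println("Stopping consumer .....");
          consumerRunnable.join();
      }

      private static class ConsumerThread extends Thread {
          private String topicName;
          private String groupId;
          private KafkaConsumer<String,String> kafkaConsumer;

          public ConsumerThread(String topicName, String groupId) {
              this.topicName = topicName;
              this.groupId = groupId;
          }

          public void run() {
              Properties configProperties = new Properties();
              configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

              //Figure out where to start processing messages from
              kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
              kafkaConsumer.subscribe(Arrays.asList(topicName));
              //Start processing messages
              try {
                  while (true) {
                      // Wait up to 100 ms for new records
                      // (newer client versions prefer poll(Duration.ofMillis(100)))
                      ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                      for (ConsumerRecord<String, String> record : records)
                          System.out.println(record.value());
                  }
              } catch (WakeupException ex) {
                  // Thrown by poll() after wakeup() is called from the main thread
                  System.out.println("Exception caught " + ex.getMessage());
              } finally {
                  kafkaConsumer.close();
                  System.out.println("After closing KafkaConsumer");
              }
          }

          public KafkaConsumer<String,String> getKafkaConsumer() {
              return this.kafkaConsumer;
          }
      }
  }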

Consumer and ConsumerThread

Writing the consumer code in Listing 2 in two parts ensures that we close the KafkaConsumer object before exiting. I'll describe each class in turn. First, ConsumerThread is an inner class that takes a topic name and group ID as its constructor arguments. In the run() method it creates a KafkaConsumer object with the appropriate properties. It subscribes to the topic that was passed to the constructor by calling the kafkaConsumer.subscribe() method, then polls the Kafka server in a loop, waiting up to 100 milliseconds on each call for new messages in the topic. It iterates through any new messages and prints them to the console.
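To make the "appropriate properties" concrete, here is a minimal sketch of the same settings written with plain string keys, which are the values behind the ConsumerConfig constants used in Listing 2. The class and method names (ConsumerPropertiesSketch, consumerProperties) are hypothetical and not part of the sample project; the sketch needs only the JDK, so it can be run without a Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of the article's sample code.
// Listing 2 builds the same settings inline inside run().
class ConsumerPropertiesSketch {

    // Returns the consumer settings from Listing 2 for the given group ID.
    static Properties consumerProperties(String groupId) {
        Properties props = new Properties();
        // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: broker used for the initial connection
        props.put("bootstrap.servers", "localhost:9092");
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG / VALUE_DESERIALIZER_CLASS_CONFIG
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // ConsumerConfig.GROUP_ID_CONFIG: the consumer group this consumer joins
        props.put("group.id", groupId);
        // ConsumerConfig.CLIENT_ID_CONFIG: a label for this client
        props.put("client.id", "simple");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("group1");
        System.out.println(props.getProperty("group.id")); // prints: group1
    }
}
```

A Properties object built this way could be passed to the KafkaConsumer constructor in place of configProperties. The constants in Listing 2 remain the safer choice in real code because the compiler catches a misspelled name.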

In the Consumer class we create a new ConsumerThread object and start it in a different thread. The ConsumerThread starts an infinite loop and keeps polling the topic for new messages. Meanwhile, in the Consumer class, the main thread waits for a user to enter exit on the console. Once a user enters exit, the main thread calls the KafkaConsumer.wakeup() method, which causes the KafkaConsumer to stop polling for new messages and throw a WakeupException. We can then close the KafkaConsumer gracefully by calling its close() method.
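The shape of this shutdown handshake can be tried without a broker. The sketch below is a JDK-only analogy and not Kafka's API: a BlockingQueue stands in for the topic, Thread.interrupt() plays the role of wakeup(), InterruptedException plays the role of WakeupException, and a closed flag stands in for close(). All names in it (WakeupPatternSketch, PollingThread, runAndStop) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for the Consumer/ConsumerThread pair.
class WakeupPatternSketch {

    static class PollingThread extends Thread {
        private final BlockingQueue<String> topic;        // stands in for the Kafka topic
        final List<String> received = new ArrayList<>();  // messages seen by the loop
        volatile boolean closed = false;                  // set once cleanup has run

        PollingThread(BlockingQueue<String> topic) {
            this.topic = topic;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // Wait up to 100 ms for a message, like kafkaConsumer.poll(100)
                    String message = topic.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        received.add(message);
                    }
                }
            } catch (InterruptedException ex) {
                // Plays the role of WakeupException: the expected way out of the loop
            } finally {
                // Plays the role of kafkaConsumer.close()
                closed = true;
            }
        }
    }

    // Starts the poller, delivers the messages, then wakes it up and waits for cleanup.
    static PollingThread runAndStop(List<String> messages) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        PollingThread poller = new PollingThread(topic);
        poller.start();
        try {
            for (String message : messages) {
                topic.put(message);
            }
            // Wait until the poller has taken every message off the queue
            while (!topic.isEmpty()) {
                Thread.sleep(10);
            }
            poller.interrupt(); // analogous to calling wakeup() from the main thread
            poller.join();      // wait for the finally block to finish
        } catch (InterruptedException ex) {
            throw new IllegalStateException("main thread interrupted", ex);
        }
        return poller;
    }

    public static void main(String[] args) {
        PollingThread poller = runAndStop(Arrays.asList("hello", "world"));
        // prints: [hello, world] closed=true
        System.out.println(poller.received + " closed=" + poller.closed);
    }
}
```

As in Listing 2, the cleanup sits in a finally block on the polling thread, so it runs whether the loop ends through the expected wake-up signal or through some other exception. The main thread only sends the signal and then waits with join().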

Source code for the example application used in this article, "Big data messaging with Kafka, Part 1." Created for JavaWorld by Sunil Patil.

Run the application

To test this application you can run the code in Listings 1 and 2 from your IDE, or you can follow these steps:

  1. Download the sample code, KafkaAPIClient, by executing the command: git clone https://github.com/sdpatil/KafkaAPIClient.git.
  2. Compile the code and create a fat JAR with the command: mvn clean compile assembly:single.
  3. Start the consumer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1.
  4. Start the producer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test.
  5. Enter a message in the producer console and check to see whether that message appears in the consumer. Try a few messages.
  6. Type exit in the consumer and producer consoles to close them.

Figure 2. A Kafka producer/consumer application

Conclusion to Part 1

In the first half of this tutorial you've learned the basics of big data messaging with Apache Kafka, including a conceptual overview of Kafka, setup instructions, and how to configure a producer/consumer messaging system with Kafka.

As you've seen, Kafka's architecture is both simple and efficient, designed for performance and throughput. The second half of this tutorial shows you how to use partitions to distribute load and scale your application horizontally, and how to protect your Apache Kafka messaging system against failure. You'll also learn how Kafka uses message offsets to track and manage complex message processing.

This story, "Built for realtime: Big data messaging with Apache Kafka, Part 1" was originally published by JavaWorld.

Copyright © 2018 IDG Communications, Inc.
