Apache Kafka 简明教程

Real Time Application(Twitter)

我们分析一下一个获取最新 Twitter 提要及其主题标签的实时应用程序。之前,我们已经看到过 Storm、Spark 与 Kafka 的集成。在这两种情况下,我们都创建了一个 Kafka 生产者(使用 CLI)以向 Kafka 生态系统发送消息。然后,Storm 和 Spark 集成分别使用 Kafka 消费者读取消息并将其注入 Storm 和 Spark 生态系统。所以,我们在实际中需要创建 Kafka 生产者,该生产者应:

  1. 使用“Twitter 流式传输 API”读取 Twitter 提要,

  2. Process the feeds,

  3. Extract the HashTags and

  4. Send it to Kafka.

一旦 Kafka 接收了 HashTag,Storm/Spark 集成便会收到信息并将其发送至 Storm/Spark 生态系统。

Twitter Streaming API

可以用任何编程语言访问“Twitter 流式传输 API”。“twitter4j”是一个开源的非官方 Java 库,它提供一个基于 Java 的模块来方便地访问“Twitter 流式传输 API”。“twitter4j”提供基于侦听器的框架来访问推文。要访问“Twitter 流式传输 API”,我们需要注册 Twitter 开发者帐户并获取以下 OAuth 认证详情。

  1. Customerkey

  2. CustomerSecret

  3. AccessToken

  4. AccessTookenSecret

创建开发人员帐户后,下载“twitter4j”JAR 文件并将其放入 Java 类路径中。

完整的 Twitter Kafka 生产者编码(KafkaTwitterProducer.java)如下:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);

      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }

      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {

         @Override
         public void onStatus(Status status) {
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName()
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:"
               + statusDeletionNotice.getStatusId());
         }

         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" +
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId +
            "upToStatusId:" + upToStatusId);
         }

         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }

         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);

      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);

      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serializa-tion.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;

      while(i < 10) {
         Status ret = queue.poll();

         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Compilation

使用以下命令编译应用程序:

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Execution

打开两个控制台。在一个控制台中运行上述已编译应用程序,如下所示。

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

在另一个窗口中运行上一章中解释的任何 Spark/Storm 应用程序。主要的注意事项是,在两种情况下使用的主题应相同。此处,我们将“my-first-topic”用作主题名称。

Output

该应用程序的输出将取决于关键字和 Twitter 的当前提要。样本输出如下所示(Storm 集成)。

. . .
food : 1
foodie : 2
burger : 1
. . .