Apache Kafka: A Concise Tutorial
Real Time Application (Twitter)
Let us analyze a real-time application that fetches the latest Twitter feeds and their hashtags. Earlier, we saw how Storm and Spark integrate with Kafka. In both scenarios, we created a Kafka producer (using the CLI) to send messages to the Kafka ecosystem. The Storm and Spark integrations then read the messages with a Kafka consumer and inject them into the Storm and Spark ecosystems, respectively. In practice, therefore, we need to create a Kafka producer that should:
- Read the Twitter feeds using the “Twitter Streaming API”,
- Process the feeds,
- Extract the hashtags, and
- Send them to Kafka.
Once Kafka receives the hashtags, the Storm/Spark integration picks up the information and sends it to the Storm/Spark ecosystem.
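The "extract the hashtags" step can be illustrated without any Twitter dependency. The following is a minimal sketch, assuming a plain regular expression is good enough for illustration; the class name `HashtagExtractor` and its `extract` method are hypothetical. The producer in this chapter relies on the hashtag entities that twitter4j already parses, which is likely more robust than this simplified pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of twitter4j or Kafka.
final class HashtagExtractor {
    // '#' followed by one or more Unicode letters, digits, or underscores.
    // This is a simplification of how Twitter itself recognizes hashtags.
    private static final Pattern HASHTAG = Pattern.compile("#([\\p{L}\\p{N}_]+)");

    // Returns the hashtags found in the text, without the leading '#'.
    static List<String> extract(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher matcher = HASHTAG.matcher(tweetText);
        while (matcher.find()) {
            tags.add(matcher.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        // prints [pasta, Coffee_2go]
        System.out.println(extract("Fresh #pasta and #Coffee_2go for lunch"));
    }
}
```

Each extracted string would then become the value of one Kafka record.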
Twitter Streaming API
The “Twitter Streaming API” can be accessed from any programming language. “twitter4j” is an open-source, unofficial Java library that provides a Java-based module for easy access to the “Twitter Streaming API”. It provides a listener-based framework for accessing tweets. To access the “Twitter Streaming API”, we need to sign up for a Twitter developer account and obtain the following OAuth authentication details:
- ConsumerKey
- ConsumerSecret
- AccessToken
- AccessTokenSecret
Once the developer account is created, download the “twitter4j” JAR files and place them on the Java classpath.
The complete Twitter Kafka producer code (KafkaTwitterProducer.java) is listed below:
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
    public static void main(String[] args) throws Exception {
        // Bounded buffer between the Twitter listener thread and the Kafka send loop
        final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);

        // Four credentials, a topic name, and at least one search keyword are required
        if (args.length < 6) {
            System.out.println(
                "Usage: KafkaTwitterProducer <twitter-consumer-key> "
                + "<twitter-consumer-secret> <twitter-access-token> "
                + "<twitter-access-token-secret> "
                + "<topic-name> <twitter-search-keywords>");
            return;
        }

        String consumerKey = args[0];
        String consumerSecret = args[1];
        String accessToken = args[2];
        String accessTokenSecret = args[3];
        String topicName = args[4];
        // Everything after the topic name is treated as a search keyword
        String[] keyWords = Arrays.copyOfRange(args, 5, args.length);

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);

        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

        StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
                // Hand the tweet over to the send loop; dropped if the queue is full
                queue.offer(status);
                // System.out.println("@" + status.getUser().getScreenName()
                //     + " - " + status.getText());
                // System.out.println("@" + status.getUser().getScreenName());
                /*for(URLEntity urle : status.getURLEntities()) {
                    System.out.println(urle.getDisplayURL());
                }*/
                /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
                    System.out.println(hashtage.getText());
                }*/
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                // System.out.println("Got a status deletion notice id:"
                //     + statusDeletionNotice.getStatusId());
            }

            @Override
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                // System.out.println("Got track limitation notice:"
                //     + numberOfLimitedStatuses);
            }

            @Override
            public void onScrubGeo(long userId, long upToStatusId) {
                // System.out.println("Got scrub_geo event userId:" + userId
                //     + " upToStatusId:" + upToStatusId);
            }

            @Override
            public void onStallWarning(StallWarning warning) {
                // System.out.println("Got stall warning:" + warning);
            }

            @Override
            public void onException(Exception ex) {
                ex.printStackTrace();
            }
        };
        twitterStream.addListener(listener);

        // Start streaming tweets that match the keywords
        FilterQuery query = new FilterQuery().track(keyWords);
        twitterStream.filter(query);

        // Give the stream a few seconds to connect and fill the queue
        Thread.sleep(5000);

        // Add Kafka producer config settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        int i = 0; // number of polls that found the queue empty
        int j = 0; // running counter used as the record key

        // Stop once the queue has been found empty 10 times in total
        while (i < 10) {
            Status ret = queue.poll();
            if (ret == null) {
                Thread.sleep(100);
                i++;
            } else {
                for (HashtagEntity hashtage : ret.getHashtagEntities()) {
                    System.out.println("Hashtag: " + hashtage.getText());
                    producer.send(new ProducerRecord<String, String>(
                        topicName, Integer.toString(j++), hashtage.getText()));
                }
            }
        }

        producer.close();
        Thread.sleep(5000);
        twitterStream.shutdown();
    }
}
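The hand-off between the listener and the send loop is the least obvious part of the listing, so here is a minimal sketch of the same pattern in isolation, under stated assumptions: plain strings stand in for tweets, collecting into a list stands in for `producer.send(...)`, and the names `QueueDrainSketch` and `drain` are hypothetical. As in the listing, the loop ends after the queue has been found empty a fixed number of times in total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-alone illustration of the poll-and-forward loop.
final class QueueDrainSketch {
    // Forwards queued items until the queue has been found empty maxIdlePolls times.
    static List<String> drain(LinkedBlockingQueue<String> queue,
                              int maxIdlePolls, long idleSleepMs) {
        List<String> forwarded = new ArrayList<>();
        int idlePolls = 0;
        while (idlePolls < maxIdlePolls) {
            String item = queue.poll(); // non-blocking; returns null when empty
            if (item == null) {
                idlePolls++;
                try {
                    Thread.sleep(idleSleepMs); // wait briefly for new items
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    break;
                }
            } else {
                forwarded.add(item); // stands in for sending a record to Kafka
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.offer("food");
        queue.offer("pizza");
        // prints [food, pizza]
        System.out.println(drain(queue, 10, 100));
    }
}
```

Because `offer` returns immediately when the bounded queue is full, a slow consumer drops items instead of blocking the thread that delivers them.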
Compilation
Compile the application using the following command:
javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer.java
Execution
Open two consoles. In one console, run the compiled application as shown below:
java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. \
    KafkaTwitterProducer <twitter-consumer-key> \
    <twitter-consumer-secret> \
    <twitter-access-token> \
    <twitter-access-token-secret> \
    my-first-topic food
In the other console, run any of the Spark/Storm applications explained in the previous chapter. The main point to note is that the same topic must be used in both cases. Here, we have used “my-first-topic” as the topic name.
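The run command passes everything positionally: four OAuth values, then the topic name, then one or more search keywords. The sketch below shows one way to make that mapping explicit. The class `ProducerArgs` is hypothetical, and rejecting a command line with no keyword is an assumption of this sketch.

```java
import java.util.Arrays;

// Hypothetical value holder for the producer's positional arguments.
final class ProducerArgs {
    final String consumerKey;
    final String consumerSecret;
    final String accessToken;
    final String accessTokenSecret;
    final String topicName;
    final String[] keywords;

    private ProducerArgs(String[] args) {
        consumerKey = args[0];
        consumerSecret = args[1];
        accessToken = args[2];
        accessTokenSecret = args[3];
        topicName = args[4];
        // Everything after the topic name is a search keyword.
        keywords = Arrays.copyOfRange(args, 5, args.length);
    }

    // Assumption: at least one keyword must follow the topic name.
    static ProducerArgs parse(String[] args) {
        if (args.length < 6) {
            throw new IllegalArgumentException(
                "expected 4 credentials, a topic name, and at least one keyword");
        }
        return new ProducerArgs(args);
    }

    public static void main(String[] args) {
        ProducerArgs parsed = parse(new String[] {
            "key", "secret", "token", "token-secret", "my-first-topic", "food"});
        // prints my-first-topic [food]
        System.out.println(parsed.topicName + " " + Arrays.toString(parsed.keywords));
    }
}
```

With the command shown above, `topicName` would be "my-first-topic" and `keywords` would contain the single entry "food".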