This repository was archived by the owner on Apr 15, 2022. It is now read-only.
32 changes: 26 additions & 6 deletions examples/twitter-stream/README.md
@@ -1,21 +1,41 @@
### Twitter Streaming Example

kafka-rx composes well with streaming apis
kafka-rx composes well with streaming apis.

here we take the the [Twitter4J](http://twitter4j.org/en/index.html) streaming library and pour the events into kafka
Here we use the [Twitter4J](http://twitter4j.org/en/index.html) streaming library and pour tweets into Kafka.

To do this, we need to create an `Observable[ProducerRecord]` that represents the stream of messages we wish to produce.
To do this, we need to create an `Observable[ProducerRecord]` that represents the stream of messages we wish to produce.

```scala
val stream = ...
stream.map(toProducerRecord).saveToKafka(kafka)
```

In [TwitterUtils.scala](src/main/scala/TwitterUtils.scala) we wrap the twitter4j api to return an `Observable[twitter4j.Status]` - and also a place to configure your [app credentials](https://apps.twitter.com/) - required to connect to the twitter api.
In [TwitterUtils.scala](src/main/scala/TwitterUtils.scala) we wrap the twitter4j api to return an `Observable[twitter4j.Status]`.

Our entry point is [TwitterStream.scala](src/main/scala/TwitterStream.scala) where we create two threads, one to stream data out of twitter and into kafka, and another to stream data out of kafka and into our process.

In [KafkaUtils.scala](src/main/scala/KafkaUtils.scala) we configure our kafka producers and consumers with string keys and string values, since we'll be storing the json provided by the twitter api.

Our entry point is [TwitterStream.scala](src/main/scala/TwitterStream.scala) where we create two threads, one to stream data out of twitter and into kafka, and another to stream data out of kafka and into our process.
#### Configuration

Update [kafka.properties](src/main/resources/kafka.properties) to point to your Kafka server.

```text
kafka.brokers=localhost:9091
kafka.zkQuorum=localhost:2181
kafka.twitter_consumer_groupid=twitter-consumer
kafka.twitter_topic_prefix=twitter
```
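
As a minimal sketch of how these four keys could be read with fallbacks for missing entries, the snippet below parses properties-format text with `java.util.Properties`. The `KafkaSettingsSketch` object, the `KafkaSettings` case class, and the default values are hypothetical names introduced for illustration and are not part of this change; the defaults simply mirror the sample file.

```scala
import java.io.StringReader
import java.util.Properties

object KafkaSettingsSketch {
  // Hypothetical container for the four keys used by the sample kafka.properties.
  final case class KafkaSettings(
    brokers: String,
    zkQuorum: String,
    groupId: String,
    topicPrefix: String
  )

  // Parse properties-format text; any missing key falls back to the sample value.
  def parse(text: String): KafkaSettings = {
    val props = new Properties()
    props.load(new StringReader(text))
    KafkaSettings(
      brokers = props.getProperty("kafka.brokers", "localhost:9091"),
      zkQuorum = props.getProperty("kafka.zkQuorum", "localhost:2181"),
      groupId = props.getProperty("kafka.twitter_consumer_groupid", "twitter-consumer"),
      topicPrefix = props.getProperty("kafka.twitter_topic_prefix", "twitter")
    )
  }
}
```

With this shape, a file that only overrides `kafka.brokers` would still yield usable values for the other three settings.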

Update [twitter.properties](src/main/resources/twitter.properties) with your Twitter API credentials.

```text
twitter.consumer_key=KEY
twitter.consumer_secret=SECRET
twitter.access_token=TOKEN
twitter.access_secret=TOKEN_SECRET
```
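
Because the sample file ships with placeholder values, it may help to fail early when a credential has not been filled in. The following is a sketch only: `TwitterCredentialsCheck` and `unconfigured` are hypothetical names, and treating the literal strings `KEY`, `SECRET`, `TOKEN`, and `TOKEN_SECRET` as "not configured" is an assumption based on the sample above.

```scala
import java.util.Properties

object TwitterCredentialsCheck {
  // The four keys from the sample twitter.properties.
  val RequiredKeys: Seq[String] = Seq(
    "twitter.consumer_key",
    "twitter.consumer_secret",
    "twitter.access_token",
    "twitter.access_secret"
  )

  // Placeholder values shipped in the sample file (assumed to mean "not set").
  val Placeholders: Set[String] = Set("KEY", "SECRET", "TOKEN", "TOKEN_SECRET")

  // Return the keys that are missing, blank, or still set to a placeholder.
  def unconfigured(props: Properties): Seq[String] =
    RequiredKeys.filter { key =>
      val value = Option(props.getProperty(key)).map(_.trim).getOrElse("")
      value.isEmpty || Placeholders.contains(value)
    }
}
```

An empty result would indicate that all four credentials have been replaced with real values.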

Once you have configured your api credentials, try it out:

@@ -27,4 +47,4 @@ If you have kafka and zookeeper running locally, you should start seeing some tw

Don't worry if it takes a while for the consumer to catch up, the kafka consumer threads wake up every so often to look for new topics.

Try changing the query or the code!
Try changing the query or the code!
4 changes: 4 additions & 0 deletions examples/twitter-stream/src/main/resources/kafka.properties
@@ -0,0 +1,4 @@
kafka.brokers=localhost:9091
kafka.zkQuorum=localhost:2181
kafka.twitter_consumer_groupid=twitter-consumer
kafka.twitter_topic_prefix=twitter
16 changes: 16 additions & 0 deletions examples/twitter-stream/src/main/resources/log4j.properties
@@ -0,0 +1,16 @@
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n


# Turn on all our debugging info
log4j.logger.kafka=INFO,stdout
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
#log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
#log4j.logger.kafka.request.logger=TRACE,fileAppender
#log4j.additivity.kafka.request.logger=false
#log4j.logger.kafka.network.Processor=TRACE,fileAppender
#log4j.additivity.kafka.network.Processor=false
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
4 changes: 4 additions & 0 deletions examples/twitter-stream/src/main/resources/twitter.properties
@@ -0,0 +1,4 @@
twitter.consumer_key=KEY
twitter.consumer_secret=SECRET
twitter.access_token=TOKEN
twitter.access_secret=TOKEN_SECRET
12 changes: 9 additions & 3 deletions examples/twitter-stream/src/main/scala/KafkaUtils.scala
@@ -1,3 +1,4 @@
import java.io.FileInputStream
import java.util.Properties

import com.cj.kafka.rx.{Record, RxConsumer}
@@ -8,10 +9,15 @@ import rx.lang.scala.Observable

object KafkaUtils {

val KAFKA = "localhost:9091"
val ZOOKEEPER = "localhost:2181"
val props = new Properties()
val input = new FileInputStream("src/main/resources/kafka.properties")
props.load(input)
input.close()

val CONSUMER_GROUP = "twitter-consumer"
val KAFKA = props.getProperty("kafka.brokers")
val ZOOKEEPER = props.getProperty("kafka.zkQuorum")
val CONSUMER_GROUP = props.getProperty("kafka.twitter_consumer_groupid")
val KAFKA_TWITTER_TOPIC_PREFIX = props.getProperty("kafka.twitter_topic_prefix")

type Key = String
type Value = String
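
The loading code in this hunk opens the file by a path relative to the working directory and closes the stream only if `load` succeeds. One possible alternative, shown here as a sketch and not as part of this change, closes the stream in a `finally` block and can read the file from the classpath instead. `PropsLoader`, `load`, and `fromClasspath` are hypothetical names; the classpath variant assumes a standard sbt or Maven layout in which `src/main/resources` ends up on the classpath.

```scala
import java.io.InputStream
import java.util.Properties

object PropsLoader {
  // Load properties from a stream and always close it, even if load throws.
  def load(open: => InputStream): Properties = {
    val props = new Properties()
    val input = open
    try props.load(input)
    finally input.close()
    props
  }

  // Look the file up on the classpath rather than by a relative path.
  def fromClasspath(name: String): Properties = {
    val stream = getClass.getClassLoader.getResourceAsStream(name)
    require(stream != null, s"$name not found on classpath")
    load(stream)
  }
}
```

Under these assumptions, `PropsLoader.fromClasspath("kafka.properties")` would work regardless of the directory the example is started from.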
4 changes: 2 additions & 2 deletions examples/twitter-stream/src/main/scala/TwitterStream.scala
@@ -14,8 +14,8 @@ object TwitterStream {
def main(args: Array[String]) = {
await[Observable[String]] {
for {
producer <- getProducerStream("tweets", args)
consumer <- getConsumerStream("tweets")
producer <- getProducerStream(KAFKA_TWITTER_TOPIC_PREFIX, args)
consumer <- getConsumerStream(KAFKA_TWITTER_TOPIC_PREFIX)
} yield producer.merge(consumer)
} foreach (
onNext = { msg => println(msg) },
16 changes: 12 additions & 4 deletions examples/twitter-stream/src/main/scala/TwitterUtils.scala
@@ -1,3 +1,6 @@
import java.io.FileInputStream
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerRecord
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
@@ -8,10 +11,15 @@ object TwitterUtils {
// twitter app credentials:
// https://apps.twitter.com/

val CONSUMER_KEY = "CONSUMER_KEY"
val CONSUMER_SECRET = "CONSUMER_SECRET"
val ACCESS_TOKEN = "ACCESS_TOKEN"
val ACCESS_SECRET = "ACCESS_SECRET"
val props = new Properties()
val input = new FileInputStream("src/main/resources/twitter.properties")
props.load(input)
input.close()

val CONSUMER_KEY = props.getProperty("twitter.consumer_key")
val CONSUMER_SECRET = props.getProperty("twitter.consumer_secret")
val ACCESS_TOKEN = props.getProperty("twitter.access_token")
val ACCESS_SECRET = props.getProperty("twitter.access_secret")

def getTwitter(consumerKey: String, consumerSecret: String, accessToken: String, accessSecret: String): TwitterStream = {
val config = new twitter4j.conf.ConfigurationBuilder()