From 2f00772740d5a779e521492a8361b0393330f30e Mon Sep 17 00:00:00 2001 From: Anastasios Zouzias Date: Thu, 15 Oct 2015 13:37:54 +0200 Subject: [PATCH 1/5] examples: twitter: kafka: add log4j properties file --- .../src/main/resources/log4j.properties | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 examples/twitter-stream/src/main/resources/log4j.properties diff --git a/examples/twitter-stream/src/main/resources/log4j.properties b/examples/twitter-stream/src/main/resources/log4j.properties new file mode 100644 index 0000000..56b80f4 --- /dev/null +++ b/examples/twitter-stream/src/main/resources/log4j.properties @@ -0,0 +1,16 @@ +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + + +# Turn on all our debugging info +log4j.logger.kafka=INFO,stdout +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout +#log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout +#log4j.logger.kafka.request.logger=TRACE,fileAppender +#log4j.additivity.kafka.request.logger=false +#log4j.logger.kafka.network.Processor=TRACE,fileAppender +#log4j.additivity.kafka.network.Processor=false +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG \ No newline at end of file From 528bd60ed7b0c5c2228f836ae7f208f3f4bb57b2 Mon Sep 17 00:00:00 2001 From: Anastasios Zouzias Date: Thu, 15 Oct 2015 13:39:23 +0200 Subject: [PATCH 2/5] examples: twitter: use properties files for twitter and kafka --- .../src/main/scala/KafkaUtils.scala | 11 ++++++++--- .../src/main/scala/TwitterUtils.scala | 17 +++++++++++++---- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/examples/twitter-stream/src/main/scala/KafkaUtils.scala b/examples/twitter-stream/src/main/scala/KafkaUtils.scala index 54370b5..502166e 100644 --- a/examples/twitter-stream/src/main/scala/KafkaUtils.scala +++ 
b/examples/twitter-stream/src/main/scala/KafkaUtils.scala @@ -1,3 +1,4 @@ +import java.io.FileInputStream import java.util.Properties import com.cj.kafka.rx.{Record, RxConsumer} @@ -8,10 +9,14 @@ import rx.lang.scala.Observable object KafkaUtils { - val KAFKA = "localhost:9091" - val ZOOKEEPER = "localhost:2181" + val props = new Properties() + val input = new FileInputStream("src/main/resources/kafka.properties") + props.load(input) + input.close() - val CONSUMER_GROUP = "twitter-consumer" + val KAFKA = props.getProperty("kafka.brokers") + val ZOOKEEPER = props.getProperty("kafka.zkQuorum") + val CONSUMER_GROUP = props.getProperty("kafka.twitter_consumer_groupid") type Key = String type Value = String diff --git a/examples/twitter-stream/src/main/scala/TwitterUtils.scala b/examples/twitter-stream/src/main/scala/TwitterUtils.scala index 46cc511..98917b0 100644 --- a/examples/twitter-stream/src/main/scala/TwitterUtils.scala +++ b/examples/twitter-stream/src/main/scala/TwitterUtils.scala @@ -1,3 +1,6 @@ +import java.io.FileInputStream +import java.util.Properties + import org.apache.kafka.clients.producer.ProducerRecord import rx.lang.scala.Observable import rx.lang.scala.subjects.PublishSubject @@ -8,10 +11,16 @@ object TwitterUtils { // twitter app credentials: // https://apps.twitter.com/ - val CONSUMER_KEY = "CONSUMER_KEY" - val CONSUMER_SECRET = "CONSUMER_SECRET" - val ACCESS_TOKEN = "ACCESS_TOKEN" - val ACCESS_SECRET = "ACCESS_SECRET" + val props = new Properties() + val input = new FileInputStream("src/main/resources/twitter.properties") + props.load(input) + input.close() + + val CONSUMER_KEY = props.getProperty("twitter.consumer_key") + val CONSUMER_SECRET = props.getProperty("twitter.consumer_secret") + val ACCESS_TOKEN = props.getProperty("twitter.access_token") + val ACCESS_SECRET = props.getProperty("twitter.access_secret") + def getTwitter(consumerKey: String, consumerSecret: String, accessToken: String, accessSecret: String): TwitterStream = { val 
config = new twitter4j.conf.ConfigurationBuilder() From d5567fa480bbcd329be15d4827c0c1c790302f8d Mon Sep 17 00:00:00 2001 From: Anastasios Zouzias Date: Thu, 15 Oct 2015 13:39:43 +0200 Subject: [PATCH 3/5] examples: twitter: resources: add kafka and twitter properties files --- examples/twitter-stream/src/main/resources/kafka.properties | 3 +++ examples/twitter-stream/src/main/resources/twitter.properties | 4 ++++ 2 files changed, 7 insertions(+) create mode 100644 examples/twitter-stream/src/main/resources/kafka.properties create mode 100644 examples/twitter-stream/src/main/resources/twitter.properties diff --git a/examples/twitter-stream/src/main/resources/kafka.properties b/examples/twitter-stream/src/main/resources/kafka.properties new file mode 100644 index 0000000..a485873 --- /dev/null +++ b/examples/twitter-stream/src/main/resources/kafka.properties @@ -0,0 +1,3 @@ +kafka.brokers=localhost:9091 +kafka.zkQuorum=localhost:2181 +kafka.twitter_consumer_groupid=twitter-consumer \ No newline at end of file diff --git a/examples/twitter-stream/src/main/resources/twitter.properties b/examples/twitter-stream/src/main/resources/twitter.properties new file mode 100644 index 0000000..c118828 --- /dev/null +++ b/examples/twitter-stream/src/main/resources/twitter.properties @@ -0,0 +1,4 @@ +twitter.consumer_key=KEY +twitter.consumer_secret=SECRET +twitter.access_token=TOKEN +twitter.access_secret=TOKEN_SECRET \ No newline at end of file From 3b12ee33c1ca098081e685d3414858834bb05d13 Mon Sep 17 00:00:00 2001 From: Anastasios Zouzias Date: Thu, 15 Oct 2015 14:34:22 +0200 Subject: [PATCH 4/5] examples: twitter: refactor kafka topic name --- examples/twitter-stream/src/main/resources/kafka.properties | 3 ++- examples/twitter-stream/src/main/scala/KafkaUtils.scala | 1 + examples/twitter-stream/src/main/scala/TwitterStream.scala | 4 ++-- examples/twitter-stream/src/main/scala/TwitterUtils.scala | 1 - 4 files changed, 5 insertions(+), 4 deletions(-) diff --git 
a/examples/twitter-stream/src/main/resources/kafka.properties b/examples/twitter-stream/src/main/resources/kafka.properties index a485873..01d9611 100644 --- a/examples/twitter-stream/src/main/resources/kafka.properties +++ b/examples/twitter-stream/src/main/resources/kafka.properties @@ -1,3 +1,4 @@ kafka.brokers=localhost:9091 kafka.zkQuorum=localhost:2181 -kafka.twitter_consumer_groupid=twitter-consumer \ No newline at end of file +kafka.twitter_consumer_groupid=twitter-consumer +kafka.twitter_topic_prefix=twitter \ No newline at end of file diff --git a/examples/twitter-stream/src/main/scala/KafkaUtils.scala b/examples/twitter-stream/src/main/scala/KafkaUtils.scala index 502166e..a4ddf5b 100644 --- a/examples/twitter-stream/src/main/scala/KafkaUtils.scala +++ b/examples/twitter-stream/src/main/scala/KafkaUtils.scala @@ -17,6 +17,7 @@ object KafkaUtils { val KAFKA = props.getProperty("kafka.brokers") val ZOOKEEPER = props.getProperty("kafka.zkQuorum") val CONSUMER_GROUP = props.getProperty("kafka.twitter_consumer_groupid") + val KAFKA_TWITTER_TOPIC_PREFIX = props.getProperty("kafka.twitter_topic_prefix") type Key = String type Value = String diff --git a/examples/twitter-stream/src/main/scala/TwitterStream.scala b/examples/twitter-stream/src/main/scala/TwitterStream.scala index 812efce..f3d401d 100644 --- a/examples/twitter-stream/src/main/scala/TwitterStream.scala +++ b/examples/twitter-stream/src/main/scala/TwitterStream.scala @@ -14,8 +14,8 @@ object TwitterStream { def main(args: Array[String]) = { await[Observable[String]] { for { - producer <- getProducerStream("tweets", args) - consumer <- getConsumerStream("tweets") + producer <- getProducerStream(KAFKA_TWITTER_TOPIC_PREFIX, args) + consumer <- getConsumerStream(KAFKA_TWITTER_TOPIC_PREFIX) } yield producer.merge(consumer) } foreach ( onNext = { msg => println(msg) }, diff --git a/examples/twitter-stream/src/main/scala/TwitterUtils.scala b/examples/twitter-stream/src/main/scala/TwitterUtils.scala index 
98917b0..c074f2a 100644 --- a/examples/twitter-stream/src/main/scala/TwitterUtils.scala +++ b/examples/twitter-stream/src/main/scala/TwitterUtils.scala @@ -21,7 +21,6 @@ object TwitterUtils { val ACCESS_TOKEN = props.getProperty("twitter.access_token") val ACCESS_SECRET = props.getProperty("twitter.access_secret") - def getTwitter(consumerKey: String, consumerSecret: String, accessToken: String, accessSecret: String): TwitterStream = { val config = new twitter4j.conf.ConfigurationBuilder() .setJSONStoreEnabled(true) From 040afc82a78dbadcecbce4ec1b846b344d43f8d9 Mon Sep 17 00:00:00 2001 From: Anastasios Zouzias Date: Tue, 27 Oct 2015 09:09:04 +0100 Subject: [PATCH 5/5] examples:twitter: Readme: update for property files --- examples/twitter-stream/README.md | 32 +++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/examples/twitter-stream/README.md b/examples/twitter-stream/README.md index 38fed96..1cf9753 100644 --- a/examples/twitter-stream/README.md +++ b/examples/twitter-stream/README.md @@ -1,21 +1,41 @@ ### Twitter Streaming Example -kafka-rx composes well with streaming apis +kafka-rx composes well with streaming apis. -here we take the the [Twitter4J](http://twitter4j.org/en/index.html) streaming library and pour the events into kafka +Here we use the [Twitter4J](http://twitter4j.org/en/index.html) streaming library and pour tweets into Kafka. -To do this, we need to create an `Observable[ProducerRecord]` that represent the stream of messages we wish to produce. +To do this, we need to create an `Observable[ProducerRecord]` that represents the stream of messages we wish to produce. ```scala val stream = ... stream.map(toProducerRecord).saveToKafka(kafka) ``` -In [TwitterUtils.scala](src/main/scala/TwitterUtils.scala) we wrap the twitter4j api to return an `Observable[twitter4j.Status]` - and also a place to configure your [app credentials](https://apps.twitter.com/) - required to connect to the twitter api. 
+In [TwitterUtils.scala](src/main/scala/TwitterUtils.scala) we wrap the twitter4j api to return an `Observable[twitter4j.Status]`. + +Our entry point is [TwitterStream.scala](src/main/scala/TwitterStream.scala) where we create two threads, one to stream data out of twitter and into kafka, and another to stream data out of kafka and into our process. In [KafkaUtils.scala](src/main/scala/KafkaUtils.scala) we configure our kafka producers and consumers with string keys and string values, since we'll be storing the json provided by the twitter api. -Our entry point is [TwitterStream.scala](src/main/scala/TwitterStream.scala) where we create two threads, one to stream data out of twitter and into kafka, and another to stream data out of kafka and into our process. +#### Configuration + +Update [kafka.properties](src/main/resources/kafka.properties) to point to your Kafka server. + +```text +kafka.brokers=localhost:9091 +kafka.zkQuorum=localhost:2181 +kafka.twitter_consumer_groupid=twitter-consumer +kafka.twitter_topic_prefix=twitter +``` + +Update [twitter.properties](src/main/resources/twitter.properties) with your Twitter API credentials. + +```text +twitter.consumer_key=KEY +twitter.consumer_secret=SECRET +twitter.access_token=TOKEN +twitter.access_secret=TOKEN_SECRET +``` Once you have configured your api credentials, try it out: @@ -27,4 +47,4 @@ If you have kafka and zookeeper running locally, you should start seeing some tw Don't worry if it takes a while for the consumer to catch up, the kafka consumer threads wake up every so often to look for new topics. -Try changing the query or the code! \ No newline at end of file +Try changing the query or the code!