From 12ddd1e4f66fa0e5e54ad4ace3c53442422efefb Mon Sep 17 00:00:00 2001 From: Thomas Han Date: Wed, 8 Jun 2016 15:21:13 +1000 Subject: [PATCH 1/2] Upgrade kafka to 0.10.0.0 --- pom.xml | 6 +++--- .../scala/com/cj/kafka/rx/ProducerRecordTest.scala | 10 +++++----- .../com/cj/kafka/rx/RxConsumerIntegrationTest.scala | 3 ++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 468cc24..954c155 100644 --- a/pom.xml +++ b/pom.xml @@ -14,11 +14,11 @@ 0.3.2-SNAPSHOT - 0.8.2.1 + 0.10.0.0 2.8.0 0.25.0 2.11 - 2.11.6 + 2.11.8 @@ -50,7 +50,7 @@ org.scalatest scalatest_${scala.compat.version} - 2.1.3 + 2.2.6 test diff --git a/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala b/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala index bd5816e..ccea909 100644 --- a/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala +++ b/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala @@ -1,7 +1,8 @@ package com.cj.kafka.rx -import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata, MockProducer} -import org.scalatest.{Matchers, BeforeAndAfter, FlatSpec} +import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} import rx.lang.scala.Observable import scala.collection.JavaConversions._ @@ -23,7 +24,7 @@ class ProducerRecordTest extends FlatSpec with Matchers with BeforeAndAfter { val messages = urls.map(s => new Record(key = s.getBytes("UTF-8"), value = s.getBytes("UTF-8"), "", 0, 0)) val stream = Observable.from(messages) - val producer = new MockProducer() + val producer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer()) def pred(s: String) = s.toLowerCase.contains("/include-me") @@ -55,11 +56,10 @@ class ProducerRecordTest extends FlatSpec with Matchers with BeforeAndAfter { ) map { x => x.produce("lol") } - val fakeProducer = new MockProducer() + val fakeProducer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer()) val result = stream.saveToKafka(fakeProducer).toBlocking.toList result.last.commit() commitWasCalled should be (true) } - } diff --git a/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala b/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala index a19a9bd..e7b89c0 100644 --- a/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala +++ b/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala @@ -6,6 +6,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.retry.RetryUntilElapsed import org.apache.curator.test.TestingServer import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.common.serialization.ByteArraySerializer import org.scalatest._ import rx.lang.scala.subjects.{PublishSubject, ReplaySubject} import rx.lang.scala.{Observer, Observable} @@ -85,7 +86,7 @@ class RxConsumerIntegrationTest extends FlatSpec with ShouldMatchers with Before it should "deliver messages to a producer" in { val fakeStream = Observable.from(getFakeKafkaMessages(10) map { msg => new Record(msg) }) - val producer = new MockProducer(true) + val producer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer()) val savedMessages = fakeStream.map(_.produce("test-topic")).saveToKafka(producer).toBlocking.toList val history = producer.history savedMessages.size should be(10) From 72df329829cd0e9a735128fa40de70951ec2b1d5 Mon Sep 17 00:00:00 2001 From: Thomas Han Date: Mon, 5 Sep 2016 21:04:23 +1000 Subject: [PATCH 2/2] Upgrade rx-scala to 0.26.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 954c155..4eb6b46 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 0.10.0.0 2.8.0 - 0.25.0 + 0.26.2 2.11 2.11.8