diff --git a/pom.xml b/pom.xml index 468cc24..4eb6b46 100644 --- a/pom.xml +++ b/pom.xml @@ -14,11 +14,11 @@ 0.3.2-SNAPSHOT - 0.8.2.1 + 0.10.0.0 2.8.0 - 0.25.0 + 0.26.2 2.11 - 2.11.6 + 2.11.8 @@ -50,7 +50,7 @@ org.scalatest scalatest_${scala.compat.version} - 2.1.3 + 2.2.6 test diff --git a/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala b/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala index bd5816e..ccea909 100644 --- a/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala +++ b/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala @@ -1,7 +1,8 @@ package com.cj.kafka.rx -import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata, MockProducer} -import org.scalatest.{Matchers, BeforeAndAfter, FlatSpec} +import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} import rx.lang.scala.Observable import scala.collection.JavaConversions._ @@ -23,7 +24,7 @@ class ProducerRecordTest extends FlatSpec with Matchers with BeforeAndAfter { val messages = urls.map(s => new Record(key = s.getBytes("UTF-8"), value = s.getBytes("UTF-8"), "", 0, 0)) val stream = Observable.from(messages) - val producer = new MockProducer() + val producer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer()) def pred(s: String) = s.toLowerCase.contains("/include-me") @@ -55,11 +56,10 @@ class ProducerRecordTest extends FlatSpec with Matchers with BeforeAndAfter { ) map { x => x.produce("lol") } - val fakeProducer = new MockProducer() + val fakeProducer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer()) val result = stream.saveToKafka(fakeProducer).toBlocking.toList result.last.commit() commitWasCalled should be (true) } - } diff --git a/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala b/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala index a19a9bd..e7b89c0 100644 --- a/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala +++ b/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala @@ -6,6 +6,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.retry.RetryUntilElapsed import org.apache.curator.test.TestingServer import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.common.serialization.ByteArraySerializer import org.scalatest._ import rx.lang.scala.subjects.{PublishSubject, ReplaySubject} import rx.lang.scala.{Observer, Observable} @@ -85,7 +86,7 @@ class RxConsumerIntegrationTest extends FlatSpec with ShouldMatchers with Before it should "deliver messages to a producer" in { val fakeStream = Observable.from(getFakeKafkaMessages(10) map { msg => new Record(msg) }) - val producer = new MockProducer(true) + val producer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer()) val savedMessages = fakeStream.map(_.produce("test-topic")).saveToKafka(producer).toBlocking.toList val history = producer.history savedMessages.size should be(10)