diff --git a/pom.xml b/pom.xml
index 468cc24..4eb6b46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,11 +14,11 @@
0.3.2-SNAPSHOT
- 0.8.2.1
+ 0.10.0.0
2.8.0
- 0.25.0
+ 0.26.2
2.11
- 2.11.6
+ 2.11.8
@@ -50,7 +50,7 @@
org.scalatest
scalatest_${scala.compat.version}
- 2.1.3
+ 2.2.6
test
diff --git a/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala b/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala
index bd5816e..ccea909 100644
--- a/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala
+++ b/src/test/scala/com/cj/kafka/rx/ProducerRecordTest.scala
@@ -1,7 +1,8 @@
package com.cj.kafka.rx
-import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata, MockProducer}
-import org.scalatest.{Matchers, BeforeAndAfter, FlatSpec}
+import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import rx.lang.scala.Observable
import scala.collection.JavaConversions._
@@ -23,7 +24,7 @@ class ProducerRecordTest extends FlatSpec with Matchers with BeforeAndAfter {
val messages = urls.map(s => new Record(key = s.getBytes("UTF-8"), value = s.getBytes("UTF-8"), "", 0, 0))
val stream = Observable.from(messages)
- val producer = new MockProducer()
+ val producer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer())
def pred(s: String) = s.toLowerCase.contains("/include-me")
@@ -55,11 +56,10 @@ class ProducerRecordTest extends FlatSpec with Matchers with BeforeAndAfter {
) map { x =>
x.produce("lol")
}
- val fakeProducer = new MockProducer()
+ val fakeProducer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer())
val result = stream.saveToKafka(fakeProducer).toBlocking.toList
result.last.commit()
commitWasCalled should be (true)
}
-
}
diff --git a/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala b/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala
index a19a9bd..e7b89c0 100644
--- a/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala
+++ b/src/test/scala/com/cj/kafka/rx/RxConsumerIntegrationTest.scala
@@ -6,6 +6,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.RetryUntilElapsed
import org.apache.curator.test.TestingServer
import org.apache.kafka.clients.producer.MockProducer
+import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest._
import rx.lang.scala.subjects.{PublishSubject, ReplaySubject}
import rx.lang.scala.{Observer, Observable}
@@ -85,7 +86,7 @@ class RxConsumerIntegrationTest extends FlatSpec with ShouldMatchers with Before
it should "deliver messages to a producer" in {
val fakeStream = Observable.from(getFakeKafkaMessages(10) map { msg => new Record(msg) })
- val producer = new MockProducer(true)
+ val producer = new MockProducer(true, new ByteArraySerializer(), new ByteArraySerializer())
val savedMessages = fakeStream.map(_.produce("test-topic")).saveToKafka(producer).toBlocking.toList
val history = producer.history
savedMessages.size should be(10)