From 735256858d1ed54437cc53b5b77cf40ae6ebd4f6 Mon Sep 17 00:00:00 2001 From: Artem Sidorkin Date: Thu, 28 Jun 2018 17:59:35 +0300 Subject: [PATCH 1/3] Fixing swimlane dispatcher --- .../javaclient/domain/SwimlaneDispatcher.java | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java b/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java index 92320e6..2f48ef0 100644 --- a/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java +++ b/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java @@ -59,26 +59,29 @@ public QueuedEvent(DispatchedEvent event, Function public void processQueuedEvent() { - QueuedEvent qe = getNextEvent(); - if (qe == null) - logger.trace("No queued event for {} {}", subscriberId, swimlane); - else { - logger.trace("Invoking handler for event for {} {} {}", subscriberId, swimlane, qe.event); - qe.target.apply(qe.event).handle((success, throwable) -> { - if (throwable == null) { - logger.debug("Handler succeeded for event for {} {} {}", subscriberId, swimlane, qe.event); - boolean x = qe.future.complete(success); - logger.trace("Completed future success {}", x); - logger.trace("Maybe processing next queued event {} {}", subscriberId, swimlane); - processNextQueuedEvent(); - } else { - logger.error(String.format("handler for %s %s %s failed: ", subscriberId, swimlane, qe.event), throwable); - boolean x = qe.future.completeExceptionally(throwable); - logger.trace("Completed future failed{}", x); - // TODO - what to do here??? - } - return null; - }); + while (true) { + QueuedEvent qe = getNextEvent(); + if (qe == null) { + logger.trace("No queued event for {} {}", subscriberId, swimlane); + return; + } + else { + logger.trace("Invoking handler for event for {} {} {}", subscriberId, swimlane, qe.event); + qe.target.apply(qe.event).handle((success, throwable) -> { + if (throwable == null) { + logger.debug("Handler succeeded for event for {} {} {}", subscriberId, swimlane, qe.event); + boolean x = qe.future.complete(success); + logger.trace("Completed future success {}", x); + logger.trace("Maybe processing next queued event {} {}", subscriberId, swimlane); + } else { + logger.error(String.format("handler for %s %s %s failed: ", subscriberId, swimlane, qe.event), throwable); + boolean x = qe.future.completeExceptionally(throwable); + logger.trace("Completed future failed{}", x); + // TODO - what to do here??? + } + return null; + }); + } } } From 57c1bf6ce6eb2e1e1a66dc7714a08800dba664cb Mon Sep 17 00:00:00 2001 From: Artem Sidorkin Date: Fri, 29 Jun 2018 19:44:18 +0300 Subject: [PATCH 2/3] Adding swimlane dispatcher test. --- .../build.gradle | 3 +- .../javaclient/domain/SwimlaneDispatcher.java | 4 + .../domain/SwimlaneDispatcherTest.java | 80 +++++++++++++++++++ 3 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java diff --git a/eventuate-client-java-event-handling/build.gradle b/eventuate-client-java-event-handling/build.gradle index 941d13f..a6280b1 100644 --- a/eventuate-client-java-event-handling/build.gradle +++ b/eventuate-client-java-event-handling/build.gradle @@ -8,6 +8,5 @@ dependencies { testCompile "junit:junit:4.11" testCompile 'org.mockito:mockito-core:1.9.5' - - + compile project(":eventuate-client-java-test-util") } \ No newline at end of file diff --git a/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java b/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java index 2f48ef0..16848dc 100644 --- a/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java +++ b/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneDispatcher.java @@ -28,6 +28,10 @@ public SwimlaneDispatcher(String subscriberId, Integer swimlane, Executor execut this.executor = executor; } + public boolean getRunning() { + return running.get(); + } + public CompletableFuture dispatch(DispatchedEvent de, Function, CompletableFuture> target) { synchronized (queue) { QueuedEvent qe = new QueuedEvent(de, target); diff --git a/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java b/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java new file mode 100644 index 0000000..6d106f7 --- /dev/null +++ b/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java @@ -0,0 +1,80 @@ +package io.eventuate.javaclient.domain; + +import io.eventuate.DispatchedEvent; +import io.eventuate.Event; +import io.eventuate.EventContext; +import io.eventuate.Int128; +import io.eventuate.testutil.Eventually; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +public class SwimlaneDispatcherTest { + private SwimlaneDispatcher swimlaneDispatcher; + private AtomicInteger numberOfEventsReceived; + private Function, CompletableFuture> handler; + + @Before + public void init() { + swimlaneDispatcher = new SwimlaneDispatcher("1", 1, Executors.newCachedThreadPool()); + numberOfEventsReceived = new AtomicInteger(0); + } + + @Test + public void shouldDispatchManyEvents() { + int numberOfEventsToSend = 5; + + createHandler(); + + sendEvents(numberOfEventsToSend); + assertEventsReceived(numberOfEventsToSend); + } + + @Test + public void testShouldRestart() { + int numberOfEventsToSend = 5; + + createHandler(); + + sendEvents(numberOfEventsToSend); + assertDispatcherStopped(); + sendEvents(numberOfEventsToSend); + assertEventsReceived(numberOfEventsToSend * 2); + } + + private void createHandler() { + handler = evnt -> + CompletableFuture.supplyAsync(() -> { + numberOfEventsReceived.incrementAndGet(); + try { + Thread.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + }); + } + + private void sendEvents(int numberOfEventsToSend) { + for (int i = 0; i < numberOfEventsToSend; i++) { + if (i > 0) { + Assert.assertTrue(swimlaneDispatcher.getRunning()); + } + swimlaneDispatcher.dispatch(new DispatchedEvent<>("1", new Int128(1, 1), new Event() {}, 1, 1L, new EventContext(), Optional.empty()), handler); + } + } + + private void assertEventsReceived(int numberOfEventsToSend) { + Eventually.eventually(() -> Assert.assertEquals(numberOfEventsToSend, numberOfEventsReceived.get())); + } + + private void assertDispatcherStopped() { + Eventually.eventually(() -> Assert.assertFalse(swimlaneDispatcher.getRunning())); + } +} From db31e7570f6087ae0dad5d14ccc060b565970b98 Mon Sep 17 00:00:00 2001 From: Artem Sidorkin Date: Mon, 2 Jul 2018 13:08:23 +0300 Subject: [PATCH 3/3] Extending swimlane dispatcher test. --- .../domain/SwimlaneDispatcherTest.java | 92 +++++++++++++++---- 1 file changed, 75 insertions(+), 17 deletions(-) diff --git a/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java b/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java index 6d106f7..a1e59a2 100644 --- a/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java +++ b/eventuate-client-java-event-handling/src/test/java/io/eventuate/javaclient/domain/SwimlaneDispatcherTest.java @@ -9,38 +9,54 @@ import org.junit.Before; import org.junit.Test; -import java.util.Optional; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; public class SwimlaneDispatcherTest { private SwimlaneDispatcher swimlaneDispatcher; - private AtomicInteger numberOfEventsReceived; private Function, CompletableFuture> handler; + private Set dataReturnedByHandlerFutures; + private Set dataReceivedFromDispatcherFutures; + private Set throwableDataReturnedByHandlerFutures; + private Set throwableDataReceivedFromDispatcherFutures; @Before public void init() { + dataReturnedByHandlerFutures = new HashSet<>(); + dataReceivedFromDispatcherFutures = new HashSet<>(); + throwableDataReturnedByHandlerFutures = new HashSet<>(); + throwableDataReceivedFromDispatcherFutures = new HashSet<>(); + swimlaneDispatcher = new SwimlaneDispatcher("1", 1, Executors.newCachedThreadPool()); - numberOfEventsReceived = new AtomicInteger(0); } @Test public void shouldDispatchManyEvents() { int numberOfEventsToSend = 5; - createHandler(); + createHandler(false); sendEvents(numberOfEventsToSend); assertEventsReceived(numberOfEventsToSend); } + @Test + public void shouldHandleFailures() { + int numberOfEventsToSend = 5; + + createHandler(true); + + sendEvents(numberOfEventsToSend); + assertEventsFailed(numberOfEventsToSend); + } + @Test public void testShouldRestart() { int numberOfEventsToSend = 5; - createHandler(); + createHandler(false); sendEvents(numberOfEventsToSend); assertDispatcherStopped(); @@ -48,17 +64,27 @@ public void testShouldRestart() { assertEventsReceived(numberOfEventsToSend * 2); } - private void createHandler() { - handler = evnt -> - CompletableFuture.supplyAsync(() -> { - numberOfEventsReceived.incrementAndGet(); - try { - Thread.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); + private void createHandler(boolean shouldFail) { + handler = event -> { + + try { + Thread.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return CompletableFuture.supplyAsync(() -> { + String data = UUID.randomUUID().toString(); + + if (shouldFail) { + throwableDataReturnedByHandlerFutures.add(data); + throw new RuntimeException(data); } - return null; + + dataReturnedByHandlerFutures.add(data); + return data; }); + }; } private void sendEvents(int numberOfEventsToSend) { @@ -66,12 +92,44 @@ private void sendEvents(int numberOfEventsToSend) { if (i > 0) { Assert.assertTrue(swimlaneDispatcher.getRunning()); } - swimlaneDispatcher.dispatch(new DispatchedEvent<>("1", new Int128(1, 1), new Event() {}, 1, 1L, new EventContext(), Optional.empty()), handler); + CompletableFuture future = (CompletableFuture)swimlaneDispatcher.dispatch(new DispatchedEvent<>("1", + new Int128(1, 1), + new Event() {}, + 1, + 1L, + new EventContext(), + Optional.empty()), handler); + + future.whenCompleteAsync((data, throwable) -> { + if (throwable != null) { + throwableDataReceivedFromDispatcherFutures.add(throwable.getCause().getMessage()); + } else { + dataReceivedFromDispatcherFutures.add(data); + } + }); } } private void assertEventsReceived(int numberOfEventsToSend) { - Eventually.eventually(() -> Assert.assertEquals(numberOfEventsToSend, numberOfEventsReceived.get())); + Eventually.eventually(() -> { + Assert.assertTrue(throwableDataReturnedByHandlerFutures.isEmpty()); + Assert.assertTrue(throwableDataReceivedFromDispatcherFutures.isEmpty()); + + Assert.assertEquals(numberOfEventsToSend, dataReturnedByHandlerFutures.size()); + Assert.assertEquals(numberOfEventsToSend, dataReceivedFromDispatcherFutures.size()); + Assert.assertEquals(dataReturnedByHandlerFutures, dataReceivedFromDispatcherFutures); + }); + } + + private void assertEventsFailed(int numberOfEventsToSend) { + Eventually.eventually(() -> { + Assert.assertTrue(dataReceivedFromDispatcherFutures.isEmpty()); + Assert.assertTrue(dataReturnedByHandlerFutures.isEmpty()); + + Assert.assertEquals(numberOfEventsToSend, throwableDataReturnedByHandlerFutures.size()); + Assert.assertEquals(numberOfEventsToSend, throwableDataReceivedFromDispatcherFutures.size()); + Assert.assertEquals(throwableDataReturnedByHandlerFutures, throwableDataReceivedFromDispatcherFutures); + }); } private void assertDispatcherStopped() {