diff --git a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/DefaultSerializedEventDeserializer.java b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/DefaultSerializedEventDeserializer.java new file mode 100644 index 0000000..893474c --- /dev/null +++ b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/DefaultSerializedEventDeserializer.java @@ -0,0 +1,36 @@ +package io.eventuate.javaclient.commonimpl; + +import io.eventuate.DispatchedEvent; +import io.eventuate.EndOfCurrentEventsReachedEvent; +import io.eventuate.Event; + +import java.util.Optional; + +public class DefaultSerializedEventDeserializer implements SerializedEventDeserializer { + + @Override + public Optional> toDispatchedEvent(SerializedEvent se) { + String eventType = se.getEventType(); + Class eventClass = toEventClass(eventType); + + Event event = JSonMapper.fromJson(se.getEventData(), eventClass); + return Optional.of(new DispatchedEvent<>(se.getEntityId(), + se.getId(), + event, + se.getSwimLane(), + se.getOffset(), se.getEventContext())); + } + + private Class toEventClass(String eventType) { + if ("net.chrisrichardson.eventstore.subscriptions.EndOfCurrentEventsReachedEvent".equals(eventType)) { + eventType = EndOfCurrentEventsReachedEvent.class.getName(); + } + try { + return (Class) Class.forName(eventType); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + +} diff --git a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/EventuateAggregateStoreImpl.java b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/EventuateAggregateStoreImpl.java index 0413e32..1e4202a 100644 --- a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/EventuateAggregateStoreImpl.java +++ b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/EventuateAggregateStoreImpl.java @@ -14,12 +14,17 @@ public class EventuateAggregateStoreImpl implements EventuateAggregateStore { private AggregateCrud aggregateCrud; private AggregateEvents aggregateEvents; + private SerializedEventDeserializer serializedEventDeserializer = new DefaultSerializedEventDeserializer(); public EventuateAggregateStoreImpl(AggregateCrud aggregateCrud, AggregateEvents aggregateEvents) { this.aggregateCrud = aggregateCrud; this.aggregateEvents = aggregateEvents; } + public void setSerializedEventDeserializer(SerializedEventDeserializer serializedEventDeserializer) { + this.serializedEventDeserializer = serializedEventDeserializer; + } + @Override public > CompletableFuture save(Class clasz, List events) { return save(clasz, events, Optional.empty()); @@ -89,30 +94,8 @@ public > CompletableFuture update(Cla @Override public CompletableFuture subscribe(String subscriberId, Map> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function, CompletableFuture> handler) { - return aggregateEvents.subscribe(subscriberId, aggregatesAndEvents, subscriberOptions, se -> handler.apply(toDispatchedEvent(se))); - } - - private DispatchedEvent toDispatchedEvent(SerializedEvent se) { - String eventType = se.getEventType(); - Class eventClass = toEventClass(eventType); - - Event event = JSonMapper.fromJson(se.getEventData(), eventClass); - return new DispatchedEvent<>(se.getEntityId(), - se.getId(), - event, - se.getSwimLane(), - se.getOffset(), se.getEventContext()); + return aggregateEvents.subscribe(subscriberId, aggregatesAndEvents, subscriberOptions, + se -> serializedEventDeserializer.toDispatchedEvent(se).map(handler::apply).orElse(CompletableFuture.completedFuture(null))); } - private Class toEventClass(String eventType) { - if ("net.chrisrichardson.eventstore.subscriptions.EndOfCurrentEventsReachedEvent".equals(eventType)) { - eventType = EndOfCurrentEventsReachedEvent.class.getName(); - } - try { - return (Class) Class.forName(eventType); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } } diff --git a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/SerializedEventDeserializer.java b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/SerializedEventDeserializer.java new file mode 100644 index 0000000..ecac9b2 --- /dev/null +++ b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/SerializedEventDeserializer.java @@ -0,0 +1,10 @@ +package io.eventuate.javaclient.commonimpl; + +import io.eventuate.DispatchedEvent; +import io.eventuate.Event; + +import java.util.Optional; + +public interface SerializedEventDeserializer { + Optional> toDispatchedEvent(SerializedEvent se); +} diff --git a/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneBasedDispatcher.java b/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneBasedDispatcher.java index ebe2a4c..41536db 100644 --- a/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneBasedDispatcher.java +++ b/eventuate-client-java-event-handling/src/main/java/io/eventuate/javaclient/domain/SwimlaneBasedDispatcher.java @@ -2,6 +2,8 @@ import io.eventuate.DispatchedEvent; import io.eventuate.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -10,6 +12,8 @@ public class SwimlaneBasedDispatcher { + private static Logger logger = LoggerFactory.getLogger(SwimlaneBasedDispatcher.class); + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); private Executor executor; private String subscriberId; @@ -23,10 +27,15 @@ public CompletableFuture dispatch(DispatchedEvent de, Function dispatch(DispatchedEvent de, Function event; private Function, CompletableFuture> target; @@ -54,49 +58,42 @@ public QueuedEvent(DispatchedEvent event, Function } - private class MyRunnable implements Runnable { - private Integer swimlane; - - public MyRunnable(Integer swimlane) { - this.swimlane = swimlane; - } - - @Override - public void run() { - logger.trace("Starting thread for {}", swimlane); - while (true) { - QueuedEvent qe = getNextEvent(); - if (qe == null) - break; - logger.trace("Processing event for {}", swimlane); - qe.target.apply(qe.event).handle((success, throwable) -> { - if (throwable == null) { - logger.info("Handler succeeded"); - boolean x = qe.future.complete(success); - logger.trace("Completed future success {}", x); - } else { - logger.error("handler for " + subscriberId + " for event " + qe.event + " failed: ", throwable); - boolean x = qe.future.completeExceptionally(throwable); - logger.trace("Completed future failed{}", x); - } - return null; - }); - } - logger.trace("Exiting thread for {}", swimlane); + public void processQueuedEvent() { + QueuedEvent qe = getNextEvent(); + if (qe == null) + logger.trace("No queued event for {} {}", subscriberId, swimlane); + else { + logger.trace("Invoking handler for event for {} {} {}", subscriberId, swimlane, qe.event); + qe.target.apply(qe.event).handle((success, throwable) -> { + if (throwable == null) { + logger.info("Handler succeeded for event for {} {} {}", subscriberId, swimlane, qe.event); + boolean x = qe.future.complete(success); + logger.trace("Completed future success {}", x); + logger.trace("Maybe processing next queued event {} {}", subscriberId, swimlane); + processNextQueuedEvent(); + } else { + logger.error("handler for {} {} {} failed: ", throwable, subscriberId, swimlane, qe.event); + boolean x = qe.future.completeExceptionally(throwable); + logger.trace("Completed future failed{}", x); + // TODO - what to do here??? + } + return null; + }); } + } - private QueuedEvent getNextEvent() { - QueuedEvent qe1 = queue.poll(); - if (qe1 != null) - return qe1; + private QueuedEvent getNextEvent() { + QueuedEvent qe1 = queue.poll(); + if (qe1 != null) + return qe1; - synchronized (queue) { - QueuedEvent qe = queue.poll(); - if (qe == null) { - running.compareAndSet(true, false); - } - return qe; + synchronized (queue) { + QueuedEvent qe = queue.poll(); + if (qe == null) { + running.compareAndSet(true, false); } + return qe; } } + } diff --git a/eventuate-client-java-http-stomp-spring/src/main/java/io/eventuate/javaclient/spring/httpstomp/EventuateHttpStompClientConfiguration.java b/eventuate-client-java-http-stomp-spring/src/main/java/io/eventuate/javaclient/spring/httpstomp/EventuateHttpStompClientConfiguration.java index 884f07d..f7cc7cb 100644 --- a/eventuate-client-java-http-stomp-spring/src/main/java/io/eventuate/javaclient/spring/httpstomp/EventuateHttpStompClientConfiguration.java +++ b/eventuate-client-java-http-stomp-spring/src/main/java/io/eventuate/javaclient/spring/httpstomp/EventuateHttpStompClientConfiguration.java @@ -3,10 +3,12 @@ import io.eventuate.javaclient.commonimpl.EventuateAggregateStoreImpl; import io.eventuate.javaclient.commonimpl.AggregateEvents; import io.eventuate.EventuateAggregateStore; +import io.eventuate.javaclient.commonimpl.SerializedEventDeserializer; import io.eventuate.javaclient.restclient.EventuateRESTClient; import io.eventuate.javaclient.commonimpl.AggregateCrud; import io.eventuate.javaclient.stompclient.EventuateSTOMPClient; import io.vertx.core.Vertx; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,9 +20,17 @@ @EnableConfigurationProperties(EventuateHttpStompClientConfigurationProperties.class) public class EventuateHttpStompClientConfiguration { + @Autowired(required=false) + private SerializedEventDeserializer serializedEventDeserializer; + @Bean public EventuateAggregateStore httpStompEventStore(AggregateCrud restClient, AggregateEvents stompClient) { - return new EventuateAggregateStoreImpl(restClient, stompClient); + EventuateAggregateStoreImpl eventuateAggregateStore = new EventuateAggregateStoreImpl(restClient, stompClient); + + if (serializedEventDeserializer != null) + eventuateAggregateStore.setSerializedEventDeserializer(serializedEventDeserializer); + + return eventuateAggregateStore; } @Bean diff --git a/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/AckOrderTracker.java b/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/AckOrderTracker.java index d5ce427..b1e15e3 100644 --- a/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/AckOrderTracker.java +++ b/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/AckOrderTracker.java @@ -2,6 +2,7 @@ import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; +import org.apache.commons.lang.builder.ToStringBuilder; import java.util.*; import java.util.stream.Collectors; @@ -36,10 +37,19 @@ synchronized public List ack(String ackHeader) { } } - private class PendingAckHeader { + public List getPendingHeaders() { + return pendingHeaders; + } + + public class PendingAckHeader { private String ackHeader; private boolean acked; + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + public PendingAckHeader(String ackHeader) { this.ackHeader = ackHeader; } diff --git a/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java b/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java index 849a206..437f339 100644 --- a/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java +++ b/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java @@ -145,6 +145,9 @@ private void frameHandler(Frame frame, Subscription sub) { private void handleEvent(Subscription sub, SerializedEvent serializedEvent, String ackHeader) { ackOrderTracker.add(ackHeader); vertx.executeBlocking(future -> { + + logger.trace("Invoking handler for subscription {} for event {}", sub.subscriberId, serializedEvent); + sub.handler.apply(serializedEvent).handle((result, e) -> { if (e != null) { future.fail(e); @@ -155,14 +158,17 @@ private void handleEvent(Subscription sub, SerializedEvent serializedEvent, Stri }); }, false, result -> { if (result.failed()) { - result.cause().printStackTrace(); + // TODO - what else to do here??? + logger.error("Failed handler for subscription {} for event {}", result.cause(), sub.subscriberId, serializedEvent); } else { + logger.trace("Successfully completed handler for subscription {} for event {}", sub.subscriberId, serializedEvent); context.runOnContext(ignored -> { for (String ah : ackOrderTracker.ack(ackHeader)) { if (logger.isTraceEnabled()) logger.trace("Sending acknowledgement: " + ah); state.connection.ack(ah); } + logger.trace("Pending ack headers {} {}", sub.subscriberId, ackOrderTracker.getPendingHeaders()); }); } });