Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.eventuate.javaclient.commonimpl;

import io.eventuate.DispatchedEvent;
import io.eventuate.EndOfCurrentEventsReachedEvent;
import io.eventuate.Event;

import java.util.Optional;

public class DefaultSerializedEventDeserializer implements SerializedEventDeserializer {

@Override
public Optional<DispatchedEvent<Event>> toDispatchedEvent(SerializedEvent se) {
String eventType = se.getEventType();
Class<Event> eventClass = toEventClass(eventType);

Event event = JSonMapper.fromJson(se.getEventData(), eventClass);
return Optional.of(new DispatchedEvent<>(se.getEntityId(),
se.getId(),
event,
se.getSwimLane(),
se.getOffset(), se.getEventContext()));
}

private Class<Event> toEventClass(String eventType) {
if ("net.chrisrichardson.eventstore.subscriptions.EndOfCurrentEventsReachedEvent".equals(eventType)) {
eventType = EndOfCurrentEventsReachedEvent.class.getName();
}
try {
return (Class<Event>) Class.forName(eventType);
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ public class EventuateAggregateStoreImpl implements EventuateAggregateStore {

private AggregateCrud aggregateCrud;
private AggregateEvents aggregateEvents;
private SerializedEventDeserializer serializedEventDeserializer = new DefaultSerializedEventDeserializer();

public EventuateAggregateStoreImpl(AggregateCrud aggregateCrud, AggregateEvents aggregateEvents) {
this.aggregateCrud = aggregateCrud;
this.aggregateEvents = aggregateEvents;
}

public void setSerializedEventDeserializer(SerializedEventDeserializer serializedEventDeserializer) {
this.serializedEventDeserializer = serializedEventDeserializer;
}

@Override
public <T extends Aggregate<T>> CompletableFuture<EntityIdAndVersion> save(Class<T> clasz, List<Event> events) {
return save(clasz, events, Optional.empty());
Expand Down Expand Up @@ -89,30 +94,8 @@ public <T extends Aggregate<T>> CompletableFuture<EntityIdAndVersion> update(Cla

@Override
public CompletableFuture<?> subscribe(String subscriberId, Map<String, Set<String>> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function<DispatchedEvent<Event>, CompletableFuture<?>> handler) {
return aggregateEvents.subscribe(subscriberId, aggregatesAndEvents, subscriberOptions, se -> handler.apply(toDispatchedEvent(se)));
}

private DispatchedEvent<Event> toDispatchedEvent(SerializedEvent se) {
String eventType = se.getEventType();
Class<Event> eventClass = toEventClass(eventType);

Event event = JSonMapper.fromJson(se.getEventData(), eventClass);
return new DispatchedEvent<>(se.getEntityId(),
se.getId(),
event,
se.getSwimLane(),
se.getOffset(), se.getEventContext());
return aggregateEvents.subscribe(subscriberId, aggregatesAndEvents, subscriberOptions,
se -> serializedEventDeserializer.toDispatchedEvent(se).map(handler::apply).orElse(CompletableFuture.completedFuture(null)));
}

private Class<Event> toEventClass(String eventType) {
if ("net.chrisrichardson.eventstore.subscriptions.EndOfCurrentEventsReachedEvent".equals(eventType)) {
eventType = EndOfCurrentEventsReachedEvent.class.getName();
}
try {
return (Class<Event>) Class.forName(eventType);
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.eventuate.javaclient.commonimpl;

import io.eventuate.DispatchedEvent;
import io.eventuate.Event;

import java.util.Optional;

public interface SerializedEventDeserializer {
Optional<DispatchedEvent<Event>> toDispatchedEvent(SerializedEvent se);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.eventuate.DispatchedEvent;
import io.eventuate.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -10,6 +12,8 @@

public class SwimlaneBasedDispatcher {

private static Logger logger = LoggerFactory.getLogger(SwimlaneBasedDispatcher.class);

private final ConcurrentHashMap<Integer, SwimlaneDispatcher> map = new ConcurrentHashMap<>();
private Executor executor;
private String subscriberId;
Expand All @@ -23,10 +27,15 @@ public CompletableFuture<?> dispatch(DispatchedEvent<Event> de, Function<Dispatc
Integer swimlane = de.getSwimlane();
SwimlaneDispatcher stuff = map.get(swimlane);
if (stuff == null) {
logger.trace("No dispatcher for {} {}. Attempting to create", subscriberId, swimlane);
stuff = new SwimlaneDispatcher(subscriberId, swimlane, executor);
SwimlaneDispatcher r = map.putIfAbsent(swimlane, stuff);
if (r != null)
if (r != null) {
logger.trace("Using concurrently created SwimlaneDispatcher for {} {}", subscriberId, swimlane);
stuff = r;
} else {
logger.trace("Using newly created SwimlaneDispatcher for {} {}", subscriberId, swimlane);
}
}
return stuff.dispatch(de, target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.function.Function;

public class SwimlaneDispatcher {

private static Logger logger = LoggerFactory.getLogger(SwimlaneDispatcher.class);

private String subscriberId;
Expand All @@ -32,16 +32,20 @@ public CompletableFuture<?> dispatch(DispatchedEvent<Event> de, Function<Dispatc
synchronized (queue) {
QueuedEvent qe = new QueuedEvent(de, target);
queue.add(qe);
logger.trace("added event to queue: {}", swimlane);
logger.trace("added event to queue: {} {} {}", subscriberId, swimlane, de);
if (running.compareAndSet(false, true)) {
logger.trace("Started thread: {}", swimlane);
executor.execute(new MyRunnable(swimlane));
logger.trace("Stopped - attempting to process newly queued event: {} {}", subscriberId, swimlane);
processNextQueuedEvent();
} else
logger.trace("Not started thread: {}", swimlane);
logger.trace("Running - Not attempting to process newly queued event: {} {}", subscriberId, swimlane);
return qe.future;
}
}

private void processNextQueuedEvent() {
executor.execute(this::processQueuedEvent);
}

class QueuedEvent {
DispatchedEvent<Event> event;
private Function<DispatchedEvent<Event>, CompletableFuture<?>> target;
Expand All @@ -54,49 +58,42 @@ public QueuedEvent(DispatchedEvent<Event> event, Function<DispatchedEvent<Event>
}


private class MyRunnable implements Runnable {
private Integer swimlane;

public MyRunnable(Integer swimlane) {
this.swimlane = swimlane;
}

@Override
public void run() {
logger.trace("Starting thread for {}", swimlane);
while (true) {
QueuedEvent qe = getNextEvent();
if (qe == null)
break;
logger.trace("Processing event for {}", swimlane);
qe.target.apply(qe.event).handle((success, throwable) -> {
if (throwable == null) {
logger.info("Handler succeeded");
boolean x = qe.future.complete(success);
logger.trace("Completed future success {}", x);
} else {
logger.error("handler for " + subscriberId + " for event " + qe.event + " failed: ", throwable);
boolean x = qe.future.completeExceptionally(throwable);
logger.trace("Completed future failed{}", x);
}
return null;
});
}
logger.trace("Exiting thread for {}", swimlane);
public void processQueuedEvent() {
QueuedEvent qe = getNextEvent();
if (qe == null)
logger.trace("No queued event for {} {}", subscriberId, swimlane);
else {
logger.trace("Invoking handler for event for {} {} {}", subscriberId, swimlane, qe.event);
qe.target.apply(qe.event).handle((success, throwable) -> {
if (throwable == null) {
logger.info("Handler succeeded for event for {} {} {}", subscriberId, swimlane, qe.event);
boolean x = qe.future.complete(success);
logger.trace("Completed future success {}", x);
logger.trace("Maybe processing next queued event {} {}", subscriberId, swimlane);
processNextQueuedEvent();
} else {
logger.error("handler for {} {} {} failed: ", throwable, subscriberId, swimlane, qe.event);
boolean x = qe.future.completeExceptionally(throwable);
logger.trace("Completed future failed{}", x);
// TODO - what to do here???
}
return null;
});
}
}

private QueuedEvent getNextEvent() {
QueuedEvent qe1 = queue.poll();
if (qe1 != null)
return qe1;
private QueuedEvent getNextEvent() {
QueuedEvent qe1 = queue.poll();
if (qe1 != null)
return qe1;

synchronized (queue) {
QueuedEvent qe = queue.poll();
if (qe == null) {
running.compareAndSet(true, false);
}
return qe;
synchronized (queue) {
QueuedEvent qe = queue.poll();
if (qe == null) {
running.compareAndSet(true, false);
}
return qe;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import io.eventuate.javaclient.commonimpl.EventuateAggregateStoreImpl;
import io.eventuate.javaclient.commonimpl.AggregateEvents;
import io.eventuate.EventuateAggregateStore;
import io.eventuate.javaclient.commonimpl.SerializedEventDeserializer;
import io.eventuate.javaclient.restclient.EventuateRESTClient;
import io.eventuate.javaclient.commonimpl.AggregateCrud;
import io.eventuate.javaclient.stompclient.EventuateSTOMPClient;
import io.vertx.core.Vertx;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -18,9 +20,17 @@
@EnableConfigurationProperties(EventuateHttpStompClientConfigurationProperties.class)
public class EventuateHttpStompClientConfiguration {

@Autowired(required=false)
private SerializedEventDeserializer serializedEventDeserializer;

@Bean
public EventuateAggregateStore httpStompEventStore(AggregateCrud restClient, AggregateEvents stompClient) {
return new EventuateAggregateStoreImpl(restClient, stompClient);
EventuateAggregateStoreImpl eventuateAggregateStore = new EventuateAggregateStoreImpl(restClient, stompClient);

if (serializedEventDeserializer != null)
eventuateAggregateStore.setSerializedEventDeserializer(serializedEventDeserializer);

return eventuateAggregateStore;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import org.apache.commons.lang.builder.ToStringBuilder;

import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -36,10 +37,19 @@ synchronized public List<String> ack(String ackHeader) {
}
}

private class PendingAckHeader {
public List<PendingAckHeader> getPendingHeaders() {
return pendingHeaders;
}

public class PendingAckHeader {
private String ackHeader;
private boolean acked;

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}

public PendingAckHeader(String ackHeader) {
this.ackHeader = ackHeader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ private void frameHandler(Frame frame, Subscription sub) {
private void handleEvent(Subscription sub, SerializedEvent serializedEvent, String ackHeader) {
ackOrderTracker.add(ackHeader);
vertx.executeBlocking(future -> {

logger.trace("Invoking handler for subscription {} for event {}", sub.subscriberId, serializedEvent);

sub.handler.apply(serializedEvent).handle((result, e) -> {
if (e != null) {
future.fail(e);
Expand All @@ -155,14 +158,17 @@ private void handleEvent(Subscription sub, SerializedEvent serializedEvent, Stri
});
}, false, result -> {
if (result.failed()) {
result.cause().printStackTrace();
// TODO - what else to do here???
logger.error("Failed handler for subscription {} for event {}", result.cause(), sub.subscriberId, serializedEvent);
} else {
logger.trace("Successfully completed handler for subscription {} for event {}", sub.subscriberId, serializedEvent);
context.runOnContext(ignored -> {
for (String ah : ackOrderTracker.ack(ackHeader)) {
if (logger.isTraceEnabled())
logger.trace("Sending acknowledgement: " + ah);
state.connection.ack(ah);
}
logger.trace("Pending ack headers {} {}", sub.subscriberId, ackOrderTracker.getPendingHeaders());
});
}
});
Expand Down