From c5c5acee8f5a02c90965cd2f14fe6f67301f2a50 Mon Sep 17 00:00:00 2001 From: Daniil Date: Fri, 3 Aug 2018 20:59:47 +0700 Subject: [PATCH] First working implementation of fully async event loop --- .../java/mc/core/events/v3/EventPipeline.java | 65 +++++++ .../mc/core/events/v3/EventQueueOwner.java | 4 + .../mc/core/events/v3/FullAsyncEventLoop.java | 56 +++++- .../java/mc/core/events/v3/QueueManager.java | 4 - .../java/ru/core/events/v3/EventLoopTest.java | 171 ++++++++++++++++++ 5 files changed, 295 insertions(+), 5 deletions(-) create mode 100644 event-loop/src/main/java/mc/core/events/v3/EventPipeline.java create mode 100644 event-loop/src/main/java/mc/core/events/v3/EventQueueOwner.java delete mode 100644 event-loop/src/main/java/mc/core/events/v3/QueueManager.java create mode 100644 event-loop/src/test/java/ru/core/events/v3/EventLoopTest.java diff --git a/event-loop/src/main/java/mc/core/events/v3/EventPipeline.java b/event-loop/src/main/java/mc/core/events/v3/EventPipeline.java new file mode 100644 index 0000000..d0773a3 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/v3/EventPipeline.java @@ -0,0 +1,65 @@ +package mc.core.events.v3; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import mc.core.events.Event; +import mc.core.events.v3.runner.EventExecutorService; +import mc.core.events.v3.runner.ResourceRunnable; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +@RequiredArgsConstructor +@Getter +@Slf4j +public class EventPipeline { + private final List handlers; + private final FullAsyncEventLoop manager; + private final Event event; + private final EventQueueOwner owner; + private int currentIndex = 0; + @Setter + private PipelineState state = PipelineState.IDLE; + + public void next(EventExecutorService service) { + if (state == PipelineState.IDLE) { + state = PipelineState.WORKING; + } + if (currentIndex >= handlers.size() && state == PipelineState.WORKING) { + state = PipelineState.FINISHED; + manager.update(owner); + return; + } + + if (state == PipelineState.FINISHED) { + throw new IllegalStateException("Attempted to call next step on a FINISHED pipeline"); + } + + RegisteredEventHandler handler = handlers.get(currentIndex); + service.addTask(new ResourceRunnable() { + @Override + public void run() { + // TODO: Do we really need to process this in an async thread? + if (!event.isCanceled() || !handler.isIgnoreCancelled()) { + try { + handler.getMethod().invoke(handler.getObject(), event); + } catch (IllegalAccessException | InvocationTargetException e) { + log.error("Unable to dispatch event " + event.getClass().getSimpleName() + " to handler " + event.getClass().getName(), e); + } + } + } + + @Override + public void after() { + currentIndex++; + next(service); + } + }); + } + + public enum PipelineState { + IDLE, WORKING, FINISHED + } +} diff --git a/event-loop/src/main/java/mc/core/events/v3/EventQueueOwner.java b/event-loop/src/main/java/mc/core/events/v3/EventQueueOwner.java new file mode 100644 index 0000000..996b159 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/v3/EventQueueOwner.java @@ -0,0 +1,4 @@ +package mc.core.events.v3; + +public interface EventQueueOwner { +} diff --git a/event-loop/src/main/java/mc/core/events/v3/FullAsyncEventLoop.java b/event-loop/src/main/java/mc/core/events/v3/FullAsyncEventLoop.java index 4b01936..4001869 100644 --- a/event-loop/src/main/java/mc/core/events/v3/FullAsyncEventLoop.java +++ b/event-loop/src/main/java/mc/core/events/v3/FullAsyncEventLoop.java @@ -1,16 +1,25 @@ package mc.core.events.v3; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import mc.core.events.Event; import mc.core.events.EventHandler; +import mc.core.events.v3.runner.EventExecutorService; +import org.springframework.beans.factory.annotation.Autowired; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; @Slf4j public class FullAsyncEventLoop { Map, List> handlers = new HashMap<>(); + // Item leaves this queue only when EventPipeline is fully executed + private Map> eventQueue = new ConcurrentHashMap<>(); + @Autowired + @Setter + private EventExecutorService eventExecutorService; public void addEventHandler(Plugin plugin, Object object) { Map candidates = getEventHandlerCandidates(object); @@ -23,7 +32,9 @@ public class FullAsyncEventLoop { } } - + public List getPipelineForEvent(Event event) { + return handlers.get(event.getClass()); + } private Map getEventHandlerCandidates(Object object) { Map candidates; @@ -50,9 +61,52 @@ public class FullAsyncEventLoop { continue; } + method.setAccessible(true); candidates.put(method, annotation); } return candidates; } + public void asyncFireEvent(EventQueueOwner owner, Event event) { + List handlers = getPipelineForEvent(event); + if (handlers == null) + return; + + Queue queue = eventQueue.computeIfAbsent(owner, s -> new ArrayDeque<>()); + queue.add(new EventPipeline(handlers, this, event, owner)); + update(owner); + } + + /** + * Updates queue state for a given owner: + *

+ * - Removes first element of a queue if it is marked as FINISHED + * - Starts executing first pipeline from the queue if it is marked with IDLE + * + * @param owner queue owner + */ + public synchronized void update(EventQueueOwner owner) { + if (!eventQueue.containsKey(owner)) { + log.warn("Unable to update pipeline executor: unable to find queue"); + return; + } + Queue queue = eventQueue.get(owner); + if (queue.isEmpty()) { + log.warn("Unable to update pipeline executor: queue is empty"); + return; + } + + if (queue.peek().getState() == EventPipeline.PipelineState.FINISHED) { + // TODO: Post-event callback? + queue.poll(); + } + + EventPipeline pipeline; + if ((pipeline = queue.peek()) != null + && pipeline.getState() == EventPipeline.PipelineState.IDLE) { + pipeline.next(eventExecutorService); + } + } + + } diff --git a/event-loop/src/main/java/mc/core/events/v3/QueueManager.java b/event-loop/src/main/java/mc/core/events/v3/QueueManager.java deleted file mode 100644 index b395844..0000000 --- a/event-loop/src/main/java/mc/core/events/v3/QueueManager.java +++ /dev/null @@ -1,4 +0,0 @@ -package mc.core.events.v3; - -public class QueueManager { -} diff --git a/event-loop/src/test/java/ru/core/events/v3/EventLoopTest.java b/event-loop/src/test/java/ru/core/events/v3/EventLoopTest.java new file mode 100644 index 0000000..8dc75d9 --- /dev/null +++ b/event-loop/src/test/java/ru/core/events/v3/EventLoopTest.java @@ -0,0 +1,171 @@ +package ru.core.events.v3; + +import mc.core.events.EventHandler; +import mc.core.events.EventPriority; +import mc.core.events.LoginEvent; +import mc.core.events.v3.EventQueueOwner; +import mc.core.events.v3.FullAsyncEventLoop; +import mc.core.events.v3.Plugin; +import mc.core.events.v3.runner.EventExecutorService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("Duplicates") +public class EventLoopTest { + + @Test + public void basicTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(1); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler + public void onLoginEvent(LoginEvent event) { + + latch.countDown(); + } + }); + + EventExecutorService service = new EventExecutorService(1); + service.start(); + + eventLoop.setEventExecutorService(service); + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Event was not called", 0, latch.getCount()); + } + + @Test + public void consecutiveExecutionTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(2); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler + public void onLoginEvent(LoginEvent event) { + + latch.countDown(); + } + }); + + EventExecutorService service = new EventExecutorService(1); + service.start(); + + eventLoop.setEventExecutorService(service); + + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Event was not called", 0, latch.getCount()); + } + + @Test + public void prioritySystemTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(3); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + List priorities = new ArrayList<>(3); + + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler(priority = EventPriority.NORMAL) + public void login1(LoginEvent event) { + priorities.add(0); + latch.countDown(); + } + + @EventHandler(priority = EventPriority.HIGHEST) + public void login2(LoginEvent event) { + priorities.add(1); + latch.countDown(); + } + + @EventHandler(priority = EventPriority.LOWEST) + public void login3(LoginEvent event) { + priorities.add(2); + latch.countDown(); + } + }); + + EventExecutorService service = new EventExecutorService(1); + service.start(); + + eventLoop.setEventExecutorService(service); + + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Incorrect call sequence", "[2, 0, 1]", priorities.toString()); + } + + @Test + public void ignoreCancelledTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(1); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + List priorities = new ArrayList<>(2); + + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler(priority = EventPriority.NORMAL, ignoreCancelled = true) + public void login1(LoginEvent event) { + priorities.add(0); + event.setCanceled(true); + } + + @EventHandler(priority = EventPriority.HIGHEST, ignoreCancelled = true) + public void login2(LoginEvent event) { + priorities.add(1); + } + + @EventHandler(priority = EventPriority.LOWEST, ignoreCancelled = true) + public void login3(LoginEvent event) { + priorities.add(2); + } + + @EventHandler(priority = EventPriority.MONITOR) + public void monitor(LoginEvent event) { + latch.countDown(); + } + }); + + EventExecutorService service = new EventExecutorService(1); + service.start(); + + eventLoop.setEventExecutorService(service); + + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Incorrect call sequence", "[2, 0]", priorities.toString()); + } +}