Archived
0

First working implementation of fully async event loop

This commit is contained in:
Daniil
2018-08-03 20:59:47 +07:00
parent 969b2bbb44
commit c5c5acee8f
5 changed files with 295 additions and 5 deletions

View File

@@ -0,0 +1,65 @@
package mc.core.events.v3;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import mc.core.events.Event;
import mc.core.events.v3.runner.EventExecutorService;
import mc.core.events.v3.runner.ResourceRunnable;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
@RequiredArgsConstructor
@Getter
@Slf4j
public class EventPipeline {
private final List<RegisteredEventHandler> handlers;
private final FullAsyncEventLoop manager;
private final Event event;
private final EventQueueOwner owner;
private int currentIndex = 0;
@Setter
private PipelineState state = PipelineState.IDLE;
public void next(EventExecutorService service) {
if (state == PipelineState.IDLE) {
state = PipelineState.WORKING;
}
if (currentIndex >= handlers.size() && state == PipelineState.WORKING) {
state = PipelineState.FINISHED;
manager.update(owner);
return;
}
if (state == PipelineState.FINISHED) {
throw new IllegalStateException("Attempted to call next step on a FINISHED pipeline");
}
RegisteredEventHandler handler = handlers.get(currentIndex);
service.addTask(new ResourceRunnable() {
@Override
public void run() {
// TODO: Do we really need to process this in an async thread?
if (!event.isCanceled() || !handler.isIgnoreCancelled()) {
try {
handler.getMethod().invoke(handler.getObject(), event);
} catch (IllegalAccessException | InvocationTargetException e) {
log.error("Unable to dispatch event " + event.getClass().getSimpleName() + " to handler " + event.getClass().getName(), e);
}
}
}
@Override
public void after() {
currentIndex++;
next(service);
}
});
}
public enum PipelineState {
IDLE, WORKING, FINISHED
}
}

View File

@@ -0,0 +1,4 @@
package mc.core.events.v3;
public interface EventQueueOwner {
}

View File

@@ -1,16 +1,25 @@
package mc.core.events.v3; package mc.core.events.v3;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import mc.core.events.Event; import mc.core.events.Event;
import mc.core.events.EventHandler; import mc.core.events.EventHandler;
import mc.core.events.v3.runner.EventExecutorService;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
public class FullAsyncEventLoop { public class FullAsyncEventLoop {
Map<Class<? extends Event>, List<RegisteredEventHandler>> handlers = new HashMap<>(); Map<Class<? extends Event>, List<RegisteredEventHandler>> handlers = new HashMap<>();
// Item leaves this queue only when EventPipeline is fully executed
private Map<EventQueueOwner, Queue<EventPipeline>> eventQueue = new ConcurrentHashMap<>();
@Autowired
@Setter
private EventExecutorService eventExecutorService;
public void addEventHandler(Plugin plugin, Object object) { public void addEventHandler(Plugin plugin, Object object) {
Map<Method, EventHandler> candidates = getEventHandlerCandidates(object); Map<Method, EventHandler> candidates = getEventHandlerCandidates(object);
@@ -23,7 +32,9 @@ public class FullAsyncEventLoop {
} }
} }
public List<RegisteredEventHandler> getPipelineForEvent(Event event) {
return handlers.get(event.getClass());
}
private Map<Method, EventHandler> getEventHandlerCandidates(Object object) { private Map<Method, EventHandler> getEventHandlerCandidates(Object object) {
Map<Method, EventHandler> candidates; Map<Method, EventHandler> candidates;
@@ -50,9 +61,52 @@ public class FullAsyncEventLoop {
continue; continue;
} }
method.setAccessible(true);
candidates.put(method, annotation); candidates.put(method, annotation);
} }
return candidates; return candidates;
} }
public void asyncFireEvent(EventQueueOwner owner, Event event) {
List<RegisteredEventHandler> handlers = getPipelineForEvent(event);
if (handlers == null)
return;
Queue<EventPipeline> queue = eventQueue.computeIfAbsent(owner, s -> new ArrayDeque<>());
queue.add(new EventPipeline(handlers, this, event, owner));
update(owner);
}
/**
* Updates queue state for a given owner:
* <p>
* - Removes first element of a queue if it is marked as FINISHED
* - Starts executing first pipeline from the queue if it is marked with IDLE
*
* @param owner queue owner
*/
public synchronized void update(EventQueueOwner owner) {
if (!eventQueue.containsKey(owner)) {
log.warn("Unable to update pipeline executor: unable to find queue");
return;
}
Queue<EventPipeline> queue = eventQueue.get(owner);
if (queue.isEmpty()) {
log.warn("Unable to update pipeline executor: queue is empty");
return;
}
if (queue.peek().getState() == EventPipeline.PipelineState.FINISHED) {
// TODO: Post-event callback?
queue.poll();
}
EventPipeline pipeline;
if ((pipeline = queue.peek()) != null
&& pipeline.getState() == EventPipeline.PipelineState.IDLE) {
pipeline.next(eventExecutorService);
}
}
} }

View File

@@ -1,4 +0,0 @@
package mc.core.events.v3;
public class QueueManager {
}

View File

@@ -0,0 +1,171 @@
package ru.core.events.v3;
import mc.core.events.EventHandler;
import mc.core.events.EventPriority;
import mc.core.events.LoginEvent;
import mc.core.events.v3.EventQueueOwner;
import mc.core.events.v3.FullAsyncEventLoop;
import mc.core.events.v3.Plugin;
import mc.core.events.v3.runner.EventExecutorService;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("Duplicates")
public class EventLoopTest {
@Test
public void basicTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(1);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler
public void onLoginEvent(LoginEvent event) {
latch.countDown();
}
});
EventExecutorService service = new EventExecutorService(1);
service.start();
eventLoop.setEventExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Event was not called", 0, latch.getCount());
}
@Test
public void consecutiveExecutionTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(2);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler
public void onLoginEvent(LoginEvent event) {
latch.countDown();
}
});
EventExecutorService service = new EventExecutorService(1);
service.start();
eventLoop.setEventExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Event was not called", 0, latch.getCount());
}
@Test
public void prioritySystemTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(3);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
List<Integer> priorities = new ArrayList<>(3);
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler(priority = EventPriority.NORMAL)
public void login1(LoginEvent event) {
priorities.add(0);
latch.countDown();
}
@EventHandler(priority = EventPriority.HIGHEST)
public void login2(LoginEvent event) {
priorities.add(1);
latch.countDown();
}
@EventHandler(priority = EventPriority.LOWEST)
public void login3(LoginEvent event) {
priorities.add(2);
latch.countDown();
}
});
EventExecutorService service = new EventExecutorService(1);
service.start();
eventLoop.setEventExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Incorrect call sequence", "[2, 0, 1]", priorities.toString());
}
@Test
public void ignoreCancelledTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(1);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
List<Integer> priorities = new ArrayList<>(2);
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler(priority = EventPriority.NORMAL, ignoreCancelled = true)
public void login1(LoginEvent event) {
priorities.add(0);
event.setCanceled(true);
}
@EventHandler(priority = EventPriority.HIGHEST, ignoreCancelled = true)
public void login2(LoginEvent event) {
priorities.add(1);
}
@EventHandler(priority = EventPriority.LOWEST, ignoreCancelled = true)
public void login3(LoginEvent event) {
priorities.add(2);
}
@EventHandler(priority = EventPriority.MONITOR)
public void monitor(LoginEvent event) {
latch.countDown();
}
});
EventExecutorService service = new EventExecutorService(1);
service.start();
eventLoop.setEventExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Incorrect call sequence", "[2, 0]", priorities.toString());
}
}