Event-loop module refactoring and commenting
This commit is contained in:
@@ -6,17 +6,25 @@ import lombok.Setter;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import mc.core.events.api.EventQueueOwner;
|
import mc.core.events.api.EventQueueOwner;
|
||||||
import mc.core.events.api.LockableResource;
|
import mc.core.events.api.LockableResource;
|
||||||
import mc.core.events.lock.LockObserveList;
|
import mc.core.events.runner.lock.LockObserveList;
|
||||||
import mc.core.events.runner.EventExecutorService;
|
import mc.core.events.runner.ResourceAwareExecutorService;
|
||||||
import mc.core.events.runner.ResourceRunnable;
|
import mc.core.events.runner.ResourceAwareRunnable;
|
||||||
|
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds processing pipeline for every event
|
||||||
|
* that enters {@link FullAsyncEventLoop}.
|
||||||
|
* <p>
|
||||||
|
* Ensures that EventHandlers will never be called in a wrong
|
||||||
|
* order by feeding only one task at a time to the {@link ResourceAwareExecutorService}
|
||||||
|
*/
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Getter
|
@Getter
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class EventPipelineTask {
|
public class EventPipelineTask {
|
||||||
|
private final ResourceAwareExecutorService service;
|
||||||
private final List<RegisteredEventHandler> handlers;
|
private final List<RegisteredEventHandler> handlers;
|
||||||
private final FullAsyncEventLoop manager;
|
private final FullAsyncEventLoop manager;
|
||||||
private final Event event;
|
private final Event event;
|
||||||
@@ -25,25 +33,45 @@ public class EventPipelineTask {
|
|||||||
@Setter
|
@Setter
|
||||||
private PipelineState state = PipelineState.IDLE;
|
private PipelineState state = PipelineState.IDLE;
|
||||||
|
|
||||||
public void next(EventExecutorService service) {
|
public void next() {
|
||||||
|
if (updatePipelineState()) return;
|
||||||
|
|
||||||
|
RegisteredEventHandler handler = handlers.get(currentIndex);
|
||||||
|
// If event has been already cancelled and current handler
|
||||||
|
// ignores cancelled events
|
||||||
|
if (event.isCanceled() && handler.isIgnoreCancelled()) {
|
||||||
|
// Just skip current event handler
|
||||||
|
currentIndex++;
|
||||||
|
next();
|
||||||
|
} else {
|
||||||
|
feedTask(handler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update current pipeline status
|
||||||
|
*
|
||||||
|
* @return true if pipeline has been completed
|
||||||
|
*/
|
||||||
|
private boolean updatePipelineState() {
|
||||||
if (state == PipelineState.IDLE) {
|
if (state == PipelineState.IDLE) {
|
||||||
state = PipelineState.WORKING;
|
state = PipelineState.WORKING;
|
||||||
}
|
}
|
||||||
if (currentIndex >= handlers.size() && state == PipelineState.WORKING) {
|
if (currentIndex >= handlers.size() && state == PipelineState.WORKING) {
|
||||||
state = PipelineState.FINISHED;
|
state = PipelineState.FINISHED;
|
||||||
manager.update(owner);
|
manager.update(owner);
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == PipelineState.FINISHED) {
|
if (state == PipelineState.FINISHED) {
|
||||||
throw new IllegalStateException("Attempted to call next step on a FINISHED pipeline");
|
throw new IllegalStateException("Attempted to call next step on a FINISHED pipeline");
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
RegisteredEventHandler handler = handlers.get(currentIndex);
|
private void feedTask(RegisteredEventHandler handler) {
|
||||||
if (!event.isCanceled() || !handler.isIgnoreCancelled()) {
|
|
||||||
LockObserveList locks = getLocks(handler);
|
LockObserveList locks = getLocks(handler);
|
||||||
|
service.addTask(new ResourceAwareRunnable() {
|
||||||
service.addTask(new ResourceRunnable() {
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
@@ -56,7 +84,7 @@ public class EventPipelineTask {
|
|||||||
@Override
|
@Override
|
||||||
public void after() {
|
public void after() {
|
||||||
currentIndex++;
|
currentIndex++;
|
||||||
next(service);
|
next();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -64,10 +92,6 @@ public class EventPipelineTask {
|
|||||||
return locks;
|
return locks;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
|
||||||
currentIndex++;
|
|
||||||
next(service);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private LockObserveList getLocks(RegisteredEventHandler handler) {
|
private LockObserveList getLocks(RegisteredEventHandler handler) {
|
||||||
|
|||||||
@@ -1,11 +1,12 @@
|
|||||||
package mc.core.events;
|
package mc.core.events;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import mc.core.events.api.EventHandler;
|
import mc.core.events.api.EventHandler;
|
||||||
import mc.core.events.api.EventQueueOwner;
|
import mc.core.events.api.EventQueueOwner;
|
||||||
import mc.core.events.api.Plugin;
|
import mc.core.events.api.Plugin;
|
||||||
import mc.core.events.runner.EventExecutorService;
|
import mc.core.events.runner.ResourceAwareExecutorService;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
@@ -13,14 +14,23 @@ import java.lang.reflect.Modifier;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event loop core. Manages event handler registration process,
|
||||||
|
* maintains event queues.
|
||||||
|
* <p>
|
||||||
|
* This event loop guarantees that events, assigned to the {@link EventQueueOwner}
|
||||||
|
* will be handler in order of scheduling
|
||||||
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class FullAsyncEventLoop {
|
public class FullAsyncEventLoop {
|
||||||
Map<Class<? extends Event>, List<RegisteredEventHandler>> handlers = new HashMap<>();
|
|
||||||
// Item leaves this queue only when EventPipeline is fully executed
|
// Item leaves this queue only when EventPipeline is fully executed
|
||||||
private Map<EventQueueOwner, Queue<EventPipelineTask>> eventQueue = new ConcurrentHashMap<>();
|
private Map<EventQueueOwner, Queue<EventPipelineTask>> eventQueue = new ConcurrentHashMap<>();
|
||||||
|
private Map<Class<? extends Event>, List<RegisteredEventHandler>> registeredHandlers = new HashMap<>();
|
||||||
|
@SuppressWarnings("SpringJavaAutowiredMembersInspection")
|
||||||
@Autowired
|
@Autowired
|
||||||
@Setter
|
@Setter
|
||||||
private EventExecutorService eventExecutorService;
|
private ResourceAwareExecutorService resourceAwareExecutorService;
|
||||||
|
@Getter
|
||||||
private SharedResourceManager resourceManager = new SharedResourceManager();
|
private SharedResourceManager resourceManager = new SharedResourceManager();
|
||||||
|
|
||||||
public void addEventHandler(Plugin plugin, Object object) {
|
public void addEventHandler(Plugin plugin, Object object) {
|
||||||
@@ -28,14 +38,14 @@ public class FullAsyncEventLoop {
|
|||||||
|
|
||||||
for (Map.Entry<Method, EventHandler> pair : candidates.entrySet()) {
|
for (Map.Entry<Method, EventHandler> pair : candidates.entrySet()) {
|
||||||
@SuppressWarnings("unchecked") Class<? extends Event> eventType = (Class<? extends Event>) pair.getKey().getParameterTypes()[0];
|
@SuppressWarnings("unchecked") Class<? extends Event> eventType = (Class<? extends Event>) pair.getKey().getParameterTypes()[0];
|
||||||
List<RegisteredEventHandler> handlers = this.handlers.computeIfAbsent(eventType, e -> new ArrayList<>());
|
List<RegisteredEventHandler> handlers = this.registeredHandlers.computeIfAbsent(eventType, e -> new ArrayList<>());
|
||||||
handlers.add(new RegisteredEventHandler(plugin, object, pair.getKey(), pair.getValue().lock(), pair.getValue().pluginSynchronize(), pair.getValue().priority().getValue(), pair.getValue().ignoreCancelled()));
|
handlers.add(new RegisteredEventHandler(plugin, object, pair.getKey(), pair.getValue().lock(), pair.getValue().pluginSynchronize(), pair.getValue().priority().getValue(), pair.getValue().ignoreCancelled()));
|
||||||
handlers.sort(Comparator.comparingInt(RegisteredEventHandler::getPriority));
|
handlers.sort(Comparator.comparingInt(RegisteredEventHandler::getPriority));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<RegisteredEventHandler> getPipelineForEvent(Event event) {
|
public List<RegisteredEventHandler> getPipelineForEvent(Event event) {
|
||||||
return handlers.get(event.getClass());
|
return registeredHandlers.get(event.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<Method, EventHandler> getEventHandlerCandidates(Object object) {
|
private Map<Method, EventHandler> getEventHandlerCandidates(Object object) {
|
||||||
@@ -75,7 +85,7 @@ public class FullAsyncEventLoop {
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
Queue<EventPipelineTask> queue = eventQueue.computeIfAbsent(owner, s -> new ArrayDeque<>());
|
Queue<EventPipelineTask> queue = eventQueue.computeIfAbsent(owner, s -> new ArrayDeque<>());
|
||||||
queue.add(new EventPipelineTask(handlers, this, event, owner));
|
queue.add(new EventPipelineTask(resourceAwareExecutorService, handlers, this, event, owner));
|
||||||
update(owner);
|
update(owner);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -105,11 +115,7 @@ public class FullAsyncEventLoop {
|
|||||||
EventPipelineTask pipeline;
|
EventPipelineTask pipeline;
|
||||||
if ((pipeline = queue.peek()) != null
|
if ((pipeline = queue.peek()) != null
|
||||||
&& pipeline.getState() == EventPipelineTask.PipelineState.IDLE) {
|
&& pipeline.getState() == EventPipelineTask.PipelineState.IDLE) {
|
||||||
pipeline.next(eventExecutorService);
|
pipeline.next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public SharedResourceManager getResourceManager() {
|
|
||||||
return resourceManager;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,10 @@ import mc.core.events.api.Plugin;
|
|||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holds all the information necessary to register an
|
||||||
|
* event handler in an event loop
|
||||||
|
*/
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Getter
|
@Getter
|
||||||
public class RegisteredEventHandler {
|
public class RegisteredEventHandler {
|
||||||
@@ -17,5 +21,4 @@ public class RegisteredEventHandler {
|
|||||||
private final boolean pluginSynchronize;
|
private final boolean pluginSynchronize;
|
||||||
private final int priority;
|
private final int priority;
|
||||||
private final boolean ignoreCancelled;
|
private final boolean ignoreCancelled;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import mc.core.events.api.Plugin;
|
|||||||
import mc.core.events.api.interfaces.LocationProvidingEvent;
|
import mc.core.events.api.interfaces.LocationProvidingEvent;
|
||||||
import mc.core.events.api.interfaces.PlayerProvidingEvent;
|
import mc.core.events.api.interfaces.PlayerProvidingEvent;
|
||||||
import mc.core.events.api.interfaces.WorldProvidingEvent;
|
import mc.core.events.api.interfaces.WorldProvidingEvent;
|
||||||
import mc.core.events.lock.PoorMansLock;
|
import mc.core.events.runner.lock.PoorMansLock;
|
||||||
import mc.core.player.Player;
|
import mc.core.player.Player;
|
||||||
import mc.core.world.World;
|
import mc.core.world.World;
|
||||||
|
|
||||||
|
|||||||
@@ -1,43 +0,0 @@
|
|||||||
package mc.core.events.lock;
|
|
||||||
|
|
||||||
import mc.core.events.runner.ExecutorThread;
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
public class CustomReentrantLock extends ReentrantLock {
|
|
||||||
private void checkThread() {
|
|
||||||
if (!(Thread.currentThread() instanceof ExecutorThread))
|
|
||||||
throw new RuntimeException("Unable to obtain this resource outside Async Executor");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void lock() {
|
|
||||||
checkThread();
|
|
||||||
super.lock();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void lockInterruptibly() throws InterruptedException {
|
|
||||||
checkThread();
|
|
||||||
super.lockInterruptibly();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean tryLock() {
|
|
||||||
checkThread();
|
|
||||||
return super.tryLock();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
|
|
||||||
checkThread();
|
|
||||||
return super.tryLock(timeout, unit);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unlock() {
|
|
||||||
checkThread();
|
|
||||||
super.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -3,29 +3,28 @@ package mc.core.events.runner;
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple scheduling strategy.
|
||||||
|
* <p>
|
||||||
|
* We wait until the first task in a queue will be able to acquire all
|
||||||
|
* the necessary resources and then we schedule it for execution
|
||||||
|
*/
|
||||||
public class AllInScheduleStrategy implements ScheduleStrategy {
|
public class AllInScheduleStrategy implements ScheduleStrategy {
|
||||||
private BlockingQueue<ResourceRunnable> globalQueue;
|
private BlockingQueue<ResourceAwareRunnable> globalQueue;
|
||||||
private EventExecutorService eventExecutorService;
|
private ResourceAwareExecutorService resourceAwareExecutorService;
|
||||||
|
|
||||||
public AllInScheduleStrategy(EventExecutorService eventExecutorService) {
|
public AllInScheduleStrategy(ResourceAwareExecutorService resourceAwareExecutorService) {
|
||||||
this.globalQueue = eventExecutorService.queue;
|
this.globalQueue = resourceAwareExecutorService.queue;
|
||||||
this.eventExecutorService = eventExecutorService;
|
this.resourceAwareExecutorService = resourceAwareExecutorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized ResourceRunnable getTask() throws InterruptedException {
|
public synchronized ResourceAwareRunnable getTask() throws InterruptedException {
|
||||||
|
waitForResourceLockComplete();
|
||||||
// Wait for last task to finish locking up
|
|
||||||
// the resources
|
|
||||||
synchronized (eventExecutorService.waitForLock) {
|
|
||||||
while (eventExecutorService.waitForLock.get()) {
|
|
||||||
eventExecutorService.wait();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for new task in queue
|
// Wait for new task in queue
|
||||||
ResourceRunnable runnable = globalQueue.take();
|
ResourceAwareRunnable runnable = globalQueue.take();
|
||||||
while (!runnable.getLocks().isReady()) {
|
while (!runnable.getLocks().isReady()) {
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
runnable.getLocks().setCallback(latch::countDown);
|
runnable.getLocks().setCallback(latch::countDown);
|
||||||
@@ -38,7 +37,23 @@ public class AllInScheduleStrategy implements ScheduleStrategy {
|
|||||||
|
|
||||||
// Lock execution for the next thread
|
// Lock execution for the next thread
|
||||||
// (wait until resources for previous task will be blocked)
|
// (wait until resources for previous task will be blocked)
|
||||||
eventExecutorService.waitForLock.set(true);
|
resourceAwareExecutorService.waitForLock.set(true);
|
||||||
return runnable;
|
return runnable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until the last scheduled task will lock all the necessary resources.
|
||||||
|
* <p>
|
||||||
|
* It is required to avoid race-condition when an execution candidate task (first task in a queue)
|
||||||
|
* skips lock-await procedure due to the last scheduled task not having locked necessary resources yet.
|
||||||
|
*
|
||||||
|
* @throws InterruptedException if current thread is interrupted
|
||||||
|
*/
|
||||||
|
private void waitForResourceLockComplete() throws InterruptedException {
|
||||||
|
synchronized (resourceAwareExecutorService.waitForLock) {
|
||||||
|
while (resourceAwareExecutorService.waitForLock.get()) {
|
||||||
|
resourceAwareExecutorService.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,19 @@
|
|||||||
package mc.core.events.runner;
|
package mc.core.events.runner;
|
||||||
|
|
||||||
public class ExecutorThread extends Thread {
|
/**
|
||||||
private EventExecutorService service;
|
* Worker thread for {@link ResourceAwareExecutorService}.
|
||||||
|
* <p>
|
||||||
|
* - Awaits for tasks from {@link ScheduleStrategy}
|
||||||
|
* - Locks up resources for this task
|
||||||
|
* - Notifies {@link ScheduleStrategy} when resource-locking procedure is complete
|
||||||
|
* - Executes the runnable in this thread
|
||||||
|
* - Unlocks all the resources
|
||||||
|
* - Calls {@link ResourceAwareRunnable#after()} callback
|
||||||
|
*/
|
||||||
|
public class ExecutorWorkerThread extends Thread {
|
||||||
|
private ResourceAwareExecutorService service;
|
||||||
|
|
||||||
public ExecutorThread(String name, EventExecutorService service) {
|
public ExecutorWorkerThread(String name, ResourceAwareExecutorService service) {
|
||||||
super(name);
|
super(name);
|
||||||
this.service = service;
|
this.service = service;
|
||||||
}
|
}
|
||||||
@@ -11,7 +21,7 @@ public class ExecutorThread extends Thread {
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (!isInterrupted() && isAlive()) {
|
while (!isInterrupted() && isAlive()) {
|
||||||
ResourceRunnable runnable;
|
ResourceAwareRunnable runnable;
|
||||||
try {
|
try {
|
||||||
runnable = service.getStrategy().getTask();
|
runnable = service.getStrategy().getTask();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
@@ -22,14 +32,9 @@ public class ExecutorThread extends Thread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void executeTask(ResourceRunnable runnable) {
|
void executeTask(ResourceAwareRunnable runnable) {
|
||||||
runnable.getLocks().lockAll();
|
runnable.getLocks().lockAll();
|
||||||
synchronized (service.waitForLock) {
|
notifyLockingDone();
|
||||||
if (service.waitForLock.get()) {
|
|
||||||
service.waitForLock.set(false);
|
|
||||||
service.waitForLock.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
runnable.run();
|
runnable.run();
|
||||||
} finally {
|
} finally {
|
||||||
@@ -38,4 +43,13 @@ public class ExecutorThread extends Thread {
|
|||||||
}
|
}
|
||||||
runnable.after();
|
runnable.after();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void notifyLockingDone() {
|
||||||
|
synchronized (service.waitForLock) {
|
||||||
|
if (service.waitForLock.get()) {
|
||||||
|
service.waitForLock.set(false);
|
||||||
|
service.waitForLock.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -9,9 +9,19 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public class EventExecutorService {
|
|
||||||
|
/**
|
||||||
|
* Custom implementation of an ExecutorService.
|
||||||
|
*
|
||||||
|
* Holds a queue of {@link ResourceAwareRunnable} and executes them in a thread pool.
|
||||||
|
*
|
||||||
|
* Warning! This class doesn't guarantee, that tasks will be executed in any specific order.
|
||||||
|
* In fact, it's up to {@link ScheduleStrategy} to decide which task will be scheduled for
|
||||||
|
* execution next.
|
||||||
|
*/
|
||||||
|
public class ResourceAwareExecutorService {
|
||||||
private static final boolean WORKER_INSTANT_EXECUTE = false;
|
private static final boolean WORKER_INSTANT_EXECUTE = false;
|
||||||
BlockingQueue<ResourceRunnable> queue = new ArrayBlockingQueue<>(100);
|
BlockingQueue<ResourceAwareRunnable> queue = new ArrayBlockingQueue<>(100);
|
||||||
// A synchronize aid, that prevents ScheduleStrategy from returning
|
// A synchronize aid, that prevents ScheduleStrategy from returning
|
||||||
// wrong tasks when executor is late in blocking resources
|
// wrong tasks when executor is late in blocking resources
|
||||||
final AtomicBoolean waitForLock = new AtomicBoolean(false);
|
final AtomicBoolean waitForLock = new AtomicBoolean(false);
|
||||||
@@ -19,7 +29,7 @@ public class EventExecutorService {
|
|||||||
private Set<Thread> executorThreads = new HashSet<>();
|
private Set<Thread> executorThreads = new HashSet<>();
|
||||||
private int threadCount;
|
private int threadCount;
|
||||||
|
|
||||||
public EventExecutorService(int threadCount) {
|
public ResourceAwareExecutorService(int threadCount) {
|
||||||
this.threadCount = threadCount;
|
this.threadCount = threadCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -28,7 +38,7 @@ public class EventExecutorService {
|
|||||||
throw new InvalidStateException("This executor service was already started.");
|
throw new InvalidStateException("This executor service was already started.");
|
||||||
|
|
||||||
for (int i = 0; i < threadCount; i++) {
|
for (int i = 0; i < threadCount; i++) {
|
||||||
Thread thread = new ExecutorThread("Event Loop #" + i, this);
|
Thread thread = new ExecutorWorkerThread("Event Loop #" + i, this);
|
||||||
executorThreads.add(thread);
|
executorThreads.add(thread);
|
||||||
thread.start();
|
thread.start();
|
||||||
}
|
}
|
||||||
@@ -46,9 +56,9 @@ public class EventExecutorService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addTask(ResourceRunnable task) {
|
public void addTask(ResourceAwareRunnable task) {
|
||||||
if (WORKER_INSTANT_EXECUTE && Thread.currentThread() instanceof ExecutorThread) {
|
if (WORKER_INSTANT_EXECUTE && Thread.currentThread() instanceof ExecutorWorkerThread) {
|
||||||
((ExecutorThread) Thread.currentThread()).executeTask(task);
|
((ExecutorWorkerThread) Thread.currentThread()).executeTask(task);
|
||||||
} else
|
} else
|
||||||
queue.offer(task);
|
queue.offer(task);
|
||||||
}
|
}
|
||||||
@@ -59,7 +69,7 @@ public class EventExecutorService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private class DefaultScheduleStrategy implements ScheduleStrategy {
|
private class DefaultScheduleStrategy implements ScheduleStrategy {
|
||||||
public ResourceRunnable getTask() throws InterruptedException {
|
public ResourceAwareRunnable getTask() throws InterruptedException {
|
||||||
return queue.take();
|
return queue.take();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
package mc.core.events.runner;
|
package mc.core.events.runner;
|
||||||
|
|
||||||
import mc.core.events.lock.LockObserveList;
|
import mc.core.events.runner.lock.LockObserveList;
|
||||||
|
|
||||||
public interface ResourceRunnable extends Runnable {
|
public interface ResourceAwareRunnable extends Runnable {
|
||||||
default LockObserveList getLocks() {
|
default LockObserveList getLocks() {
|
||||||
return LockObserveList.EMPTY_LIST;
|
return LockObserveList.EMPTY_LIST;
|
||||||
}
|
}
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
package mc.core.events.runner;
|
package mc.core.events.runner;
|
||||||
|
|
||||||
public interface ScheduleStrategy {
|
public interface ScheduleStrategy {
|
||||||
ResourceRunnable getTask() throws InterruptedException;
|
ResourceAwareRunnable getTask() throws InterruptedException;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package mc.core.events.lock;
|
package mc.core.events.runner.lock;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package mc.core.events.lock;
|
package mc.core.events.runner.lock;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
package mc.core.events;
|
package mc.core.events;
|
||||||
|
|
||||||
import mc.core.events.runner.EventExecutorService;
|
import mc.core.events.runner.ResourceAwareExecutorService;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@@ -14,7 +14,7 @@ public class EventExecutorTest {
|
|||||||
public void basicTest() throws InterruptedException {
|
public void basicTest() throws InterruptedException {
|
||||||
AtomicBoolean testVariable = new AtomicBoolean(false);
|
AtomicBoolean testVariable = new AtomicBoolean(false);
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
EventExecutorService service = new EventExecutorService(1);
|
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
|
||||||
service.start();
|
service.start();
|
||||||
service.addTask(() -> {
|
service.addTask(() -> {
|
||||||
testVariable.set(true);
|
testVariable.set(true);
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import mc.core.events.api.EventHandler;
|
|||||||
import mc.core.events.api.EventPriority;
|
import mc.core.events.api.EventPriority;
|
||||||
import mc.core.events.api.EventQueueOwner;
|
import mc.core.events.api.EventQueueOwner;
|
||||||
import mc.core.events.api.Plugin;
|
import mc.core.events.api.Plugin;
|
||||||
import mc.core.events.runner.EventExecutorService;
|
import mc.core.events.runner.ResourceAwareExecutorService;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@@ -35,10 +35,10 @@ public class EventLoopTest {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
EventExecutorService service = new EventExecutorService(1);
|
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
|
||||||
service.start();
|
service.start();
|
||||||
|
|
||||||
eventLoop.setEventExecutorService(service);
|
eventLoop.setResourceAwareExecutorService(service);
|
||||||
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
||||||
|
|
||||||
latch.await(1, TimeUnit.SECONDS);
|
latch.await(1, TimeUnit.SECONDS);
|
||||||
@@ -64,10 +64,10 @@ public class EventLoopTest {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
EventExecutorService service = new EventExecutorService(1);
|
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
|
||||||
service.start();
|
service.start();
|
||||||
|
|
||||||
eventLoop.setEventExecutorService(service);
|
eventLoop.setResourceAwareExecutorService(service);
|
||||||
|
|
||||||
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
||||||
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
||||||
@@ -109,10 +109,10 @@ public class EventLoopTest {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
EventExecutorService service = new EventExecutorService(1);
|
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
|
||||||
service.start();
|
service.start();
|
||||||
|
|
||||||
eventLoop.setEventExecutorService(service);
|
eventLoop.setResourceAwareExecutorService(service);
|
||||||
|
|
||||||
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
||||||
|
|
||||||
@@ -156,10 +156,10 @@ public class EventLoopTest {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
EventExecutorService service = new EventExecutorService(1);
|
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
|
||||||
service.start();
|
service.start();
|
||||||
|
|
||||||
eventLoop.setEventExecutorService(service);
|
eventLoop.setResourceAwareExecutorService(service);
|
||||||
|
|
||||||
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
package mc.core.events;
|
package mc.core.events;
|
||||||
|
|
||||||
import mc.core.events.lock.LockObserveList;
|
import mc.core.events.runner.lock.LockObserveList;
|
||||||
import mc.core.events.lock.PoorMansLock;
|
import mc.core.events.runner.lock.PoorMansLock;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user