From 3b24c2fed4b42381a2cd59bae9ead24bd3d5aa70 Mon Sep 17 00:00:00 2001 From: Daniil Date: Sun, 5 Aug 2018 22:33:39 +0700 Subject: [PATCH] Fixed task scheduling --- .../mc/core/events/lock/LockObserveList.java | 1 + .../events/runner/AllInScheduleStrategy.java | 44 +++++++++++++++++++ .../events/runner/EventExecutorService.java | 8 +++- .../mc/core/events/runner/ExecutorThread.java | 6 +++ 4 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java diff --git a/event-loop/src/main/java/mc/core/events/lock/LockObserveList.java b/event-loop/src/main/java/mc/core/events/lock/LockObserveList.java index 6fe8efa..5dd2985 100644 --- a/event-loop/src/main/java/mc/core/events/lock/LockObserveList.java +++ b/event-loop/src/main/java/mc/core/events/lock/LockObserveList.java @@ -24,6 +24,7 @@ public class LockObserveList implements Consumer { } public void release() { + callback = null; for (PoorMansLock lock : locks) { lock.removeCallback(this); } diff --git a/event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java b/event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java new file mode 100644 index 0000000..6de1556 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java @@ -0,0 +1,44 @@ +package mc.core.events.runner; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; + +public class AllInScheduleStrategy implements ScheduleStrategy { + private BlockingQueue globalQueue; + private EventExecutorService eventExecutorService; + + public AllInScheduleStrategy(EventExecutorService eventExecutorService) { + this.globalQueue = eventExecutorService.queue; + this.eventExecutorService = eventExecutorService; + } + + + @Override + public synchronized ResourceRunnable getTask() throws InterruptedException { + + // Wait for last task to finish locking up + // the resources + synchronized (eventExecutorService.waitForLock) { + while (eventExecutorService.waitForLock.get()) { + eventExecutorService.wait(); + } + } + + // Wait for new task in queue + ResourceRunnable runnable = globalQueue.take(); + while (!runnable.getLocks().isReady()) { + CountDownLatch latch = new CountDownLatch(1); + runnable.getLocks().setCallback(latch::countDown); + // Prevent situations where dependencies were resolved + // while we were setting up the callback + if (runnable.getLocks().isReady()) + continue; + latch.await(); + } + + // Lock execution for the next thread + // (wait until resources for previous task will be blocked) + eventExecutorService.waitForLock.set(true); + return runnable; + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/EventExecutorService.java b/event-loop/src/main/java/mc/core/events/runner/EventExecutorService.java index 95f1eac..3f018ab 100644 --- a/event-loop/src/main/java/mc/core/events/runner/EventExecutorService.java +++ b/event-loop/src/main/java/mc/core/events/runner/EventExecutorService.java @@ -7,11 +7,15 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; public class EventExecutorService { private static final boolean WORKER_INSTANT_EXECUTE = false; - private BlockingQueue queue = new ArrayBlockingQueue<>(100); - private ScheduleStrategy strategy = new DefaultScheduleStrategy(); + BlockingQueue queue = new ArrayBlockingQueue<>(100); + // A synchronize aid, that prevents ScheduleStrategy from returning + // wrong tasks when executor is late in blocking resources + final AtomicBoolean waitForLock = new AtomicBoolean(false); + private ScheduleStrategy strategy = new AllInScheduleStrategy(this); private Set executorThreads = new HashSet<>(); private int threadCount; diff --git a/event-loop/src/main/java/mc/core/events/runner/ExecutorThread.java b/event-loop/src/main/java/mc/core/events/runner/ExecutorThread.java index 8ff52f0..3e1ecc2 100644 --- a/event-loop/src/main/java/mc/core/events/runner/ExecutorThread.java +++ b/event-loop/src/main/java/mc/core/events/runner/ExecutorThread.java @@ -24,6 +24,12 @@ public class ExecutorThread extends Thread { void executeTask(ResourceRunnable runnable) { runnable.getLocks().lockAll(); + synchronized (service.waitForLock) { + if (service.waitForLock.get()) { + service.waitForLock.set(false); + service.waitForLock.notifyAll(); + } + } try { runnable.run(); } finally {