Fixed task scheduling
This commit is contained in:
@@ -24,6 +24,7 @@ public class LockObserveList implements Consumer<PoorMansLock> {
|
||||
}
|
||||
|
||||
public void release() {
|
||||
callback = null;
|
||||
for (PoorMansLock lock : locks) {
|
||||
lock.removeCallback(this);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package mc.core.events.runner;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class AllInScheduleStrategy implements ScheduleStrategy {
|
||||
private BlockingQueue<ResourceRunnable> globalQueue;
|
||||
private EventExecutorService eventExecutorService;
|
||||
|
||||
public AllInScheduleStrategy(EventExecutorService eventExecutorService) {
|
||||
this.globalQueue = eventExecutorService.queue;
|
||||
this.eventExecutorService = eventExecutorService;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized ResourceRunnable getTask() throws InterruptedException {
|
||||
|
||||
// Wait for last task to finish locking up
|
||||
// the resources
|
||||
synchronized (eventExecutorService.waitForLock) {
|
||||
while (eventExecutorService.waitForLock.get()) {
|
||||
eventExecutorService.wait();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for new task in queue
|
||||
ResourceRunnable runnable = globalQueue.take();
|
||||
while (!runnable.getLocks().isReady()) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
runnable.getLocks().setCallback(latch::countDown);
|
||||
// Prevent situations where dependencies were resolved
|
||||
// while we were setting up the callback
|
||||
if (runnable.getLocks().isReady())
|
||||
continue;
|
||||
latch.await();
|
||||
}
|
||||
|
||||
// Lock execution for the next thread
|
||||
// (wait until resources for previous task will be blocked)
|
||||
eventExecutorService.waitForLock.set(true);
|
||||
return runnable;
|
||||
}
|
||||
}
|
||||
@@ -7,11 +7,15 @@ import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class EventExecutorService {
|
||||
private static final boolean WORKER_INSTANT_EXECUTE = false;
|
||||
private BlockingQueue<ResourceRunnable> queue = new ArrayBlockingQueue<>(100);
|
||||
private ScheduleStrategy strategy = new DefaultScheduleStrategy();
|
||||
BlockingQueue<ResourceRunnable> queue = new ArrayBlockingQueue<>(100);
|
||||
// A synchronize aid, that prevents ScheduleStrategy from returning
|
||||
// wrong tasks when executor is late in blocking resources
|
||||
final AtomicBoolean waitForLock = new AtomicBoolean(false);
|
||||
private ScheduleStrategy strategy = new AllInScheduleStrategy(this);
|
||||
private Set<Thread> executorThreads = new HashSet<>();
|
||||
private int threadCount;
|
||||
|
||||
|
||||
@@ -24,6 +24,12 @@ public class ExecutorThread extends Thread {
|
||||
|
||||
void executeTask(ResourceRunnable runnable) {
|
||||
runnable.getLocks().lockAll();
|
||||
synchronized (service.waitForLock) {
|
||||
if (service.waitForLock.get()) {
|
||||
service.waitForLock.set(false);
|
||||
service.waitForLock.notifyAll();
|
||||
}
|
||||
}
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
|
||||
Reference in New Issue
Block a user