Archived
0

Custom Execution Service for event loop basic implementation

This commit is contained in:
Daniil
2018-08-03 18:12:49 +07:00
parent c0341fd273
commit 4e6f834a7a
5 changed files with 150 additions and 0 deletions

View File

@@ -0,0 +1,62 @@
package mc.core.events.v3.runner;
import sun.plugin.dom.exception.InvalidStateException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class EventExecutorService {
private BlockingQueue<ResourceRunnable> queue = new ArrayBlockingQueue<>(100);
private ScheduleStrategy strategy = new DefaultScheduleStrategy();
private Set<Thread> executorThreads = new HashSet<>();
private int threadCount;
public EventExecutorService(int threadCount) {
this.threadCount = threadCount;
}
public void start() {
if (executorThreads.size() > 0)
throw new InvalidStateException("This executor service was already started.");
for (int i = 0; i < threadCount; i++) {
Thread thread = new ExecutorThread("Event Loop #" + i, this);
executorThreads.add(thread);
thread.start();
}
}
public void stop() {
if (executorThreads.size() == 0)
throw new InvalidStateException("This executor service was not initialized yet.");
Iterator<Thread> iterator = executorThreads.iterator();
while (iterator.hasNext()) {
Thread thread = iterator.next();
thread.interrupt();
iterator.remove();
}
}
public void addTask(ResourceRunnable task) {
if (Thread.currentThread() instanceof ExecutorThread) {
// TODO: Do we really need instant execution?
((ExecutorThread) Thread.currentThread()).executeTask(task);
} else
queue.offer(task);
}
public ScheduleStrategy getStrategy() {
return strategy;
}
private class DefaultScheduleStrategy implements ScheduleStrategy {
public ResourceRunnable getTask() throws InterruptedException {
return queue.take();
}
}
}

View File

@@ -0,0 +1,33 @@
package mc.core.events.v3.runner;
public class ExecutorThread extends Thread {
private EventExecutorService service;
public ExecutorThread(String name, EventExecutorService service) {
super(name);
this.service = service;
}
@Override
public void run() {
while (!isInterrupted() && isAlive()) {
ResourceRunnable runnable;
try {
runnable = service.getStrategy().getTask();
} catch (InterruptedException e) {
return;
}
executeTask(runnable);
}
}
void executeTask(ResourceRunnable runnable) {
runnable.lock();
try {
runnable.run();
} finally {
runnable.unlock();
}
}
}

View File

@@ -0,0 +1,7 @@
package mc.core.events.v3.runner;
public interface ResourceRunnable extends Runnable {
void lock();
void unlock();
}

View File

@@ -0,0 +1,5 @@
package mc.core.events.v3.runner;
public interface ScheduleStrategy {
ResourceRunnable getTask() throws InterruptedException;
}

View File

@@ -0,0 +1,43 @@
package ru.core.events.v3;
import mc.core.events.v3.runner.EventExecutorService;
import mc.core.events.v3.runner.ResourceRunnable;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class EventExecutorTest {
@Test
public void basicTest() throws InterruptedException {
AtomicBoolean testVariable = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
EventExecutorService service = new EventExecutorService(1);
service.start();
service.addTask(new ResourceRunnable() {
@Override
public void lock() {
}
@Override
public void unlock() {
}
@Override
public void run() {
testVariable.set(true);
latch.countDown();
}
});
latch.await(1, TimeUnit.SECONDS);
service.stop();
Assert.assertTrue("Scheduled task was not executed", testVariable.get());
}
}