From 4e6f834a7a102614c0f8b3a1ef0f70078386dc2d Mon Sep 17 00:00:00 2001 From: Daniil Date: Fri, 3 Aug 2018 18:12:49 +0700 Subject: [PATCH] Custom Execution Service for event loop basic implementation --- .../v3/runner/EventExecutorService.java | 62 +++++++++++++++++++ .../core/events/v3/runner/ExecutorThread.java | 33 ++++++++++ .../events/v3/runner/ResourceRunnable.java | 7 +++ .../events/v3/runner/ScheduleStrategy.java | 5 ++ .../ru/core/events/v3/EventExecutorTest.java | 43 +++++++++++++ 5 files changed, 150 insertions(+) create mode 100644 event-loop/src/main/java/mc/core/events/v3/runner/EventExecutorService.java create mode 100644 event-loop/src/main/java/mc/core/events/v3/runner/ExecutorThread.java create mode 100644 event-loop/src/main/java/mc/core/events/v3/runner/ResourceRunnable.java create mode 100644 event-loop/src/main/java/mc/core/events/v3/runner/ScheduleStrategy.java create mode 100644 event-loop/src/test/java/ru/core/events/v3/EventExecutorTest.java diff --git a/event-loop/src/main/java/mc/core/events/v3/runner/EventExecutorService.java b/event-loop/src/main/java/mc/core/events/v3/runner/EventExecutorService.java new file mode 100644 index 0000000..0a5a852 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/v3/runner/EventExecutorService.java @@ -0,0 +1,62 @@ +package mc.core.events.v3.runner; + +import sun.plugin.dom.exception.InvalidStateException; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +public class EventExecutorService { + private BlockingQueue queue = new ArrayBlockingQueue<>(100); + private ScheduleStrategy strategy = new DefaultScheduleStrategy(); + private Set executorThreads = new HashSet<>(); + private int threadCount; + + public EventExecutorService(int threadCount) { + this.threadCount = threadCount; + } + + public void start() { + if (executorThreads.size() > 0) + throw new InvalidStateException("This executor service was already started."); + + for (int i = 0; i < threadCount; i++) { + Thread thread = new ExecutorThread("Event Loop #" + i, this); + executorThreads.add(thread); + thread.start(); + } + } + + public void stop() { + if (executorThreads.size() == 0) + throw new InvalidStateException("This executor service was not initialized yet."); + + Iterator iterator = executorThreads.iterator(); + while (iterator.hasNext()) { + Thread thread = iterator.next(); + thread.interrupt(); + iterator.remove(); + } + } + + public void addTask(ResourceRunnable task) { + if (Thread.currentThread() instanceof ExecutorThread) { + // TODO: Do we really need instant execution? + ((ExecutorThread) Thread.currentThread()).executeTask(task); + } else + queue.offer(task); + } + + + public ScheduleStrategy getStrategy() { + return strategy; + } + + private class DefaultScheduleStrategy implements ScheduleStrategy { + public ResourceRunnable getTask() throws InterruptedException { + return queue.take(); + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/v3/runner/ExecutorThread.java b/event-loop/src/main/java/mc/core/events/v3/runner/ExecutorThread.java new file mode 100644 index 0000000..271ea7f --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/v3/runner/ExecutorThread.java @@ -0,0 +1,33 @@ +package mc.core.events.v3.runner; + +public class ExecutorThread extends Thread { + private EventExecutorService service; + + public ExecutorThread(String name, EventExecutorService service) { + super(name); + this.service = service; + } + + @Override + public void run() { + while (!isInterrupted() && isAlive()) { + ResourceRunnable runnable; + try { + runnable = service.getStrategy().getTask(); + } catch (InterruptedException e) { + return; + } + + executeTask(runnable); + } + } + + void executeTask(ResourceRunnable runnable) { + runnable.lock(); + try { + runnable.run(); + } finally { + runnable.unlock(); + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/v3/runner/ResourceRunnable.java b/event-loop/src/main/java/mc/core/events/v3/runner/ResourceRunnable.java new file mode 100644 index 0000000..9fd30f8 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/v3/runner/ResourceRunnable.java @@ -0,0 +1,7 @@ +package mc.core.events.v3.runner; + +public interface ResourceRunnable extends Runnable { + void lock(); + + void unlock(); +} diff --git a/event-loop/src/main/java/mc/core/events/v3/runner/ScheduleStrategy.java b/event-loop/src/main/java/mc/core/events/v3/runner/ScheduleStrategy.java new file mode 100644 index 0000000..f901d85 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/v3/runner/ScheduleStrategy.java @@ -0,0 +1,5 @@ +package mc.core.events.v3.runner; + +public interface ScheduleStrategy { + ResourceRunnable getTask() throws InterruptedException; +} diff --git a/event-loop/src/test/java/ru/core/events/v3/EventExecutorTest.java b/event-loop/src/test/java/ru/core/events/v3/EventExecutorTest.java new file mode 100644 index 0000000..a478f49 --- /dev/null +++ b/event-loop/src/test/java/ru/core/events/v3/EventExecutorTest.java @@ -0,0 +1,43 @@ +package ru.core.events.v3; + +import mc.core.events.v3.runner.EventExecutorService; +import mc.core.events.v3.runner.ResourceRunnable; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class EventExecutorTest { + + @Test + public void basicTest() throws InterruptedException { + AtomicBoolean testVariable = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + EventExecutorService service = new EventExecutorService(1); + service.start(); + service.addTask(new ResourceRunnable() { + @Override + public void lock() { + + } + + @Override + public void unlock() { + + } + + @Override + public void run() { + testVariable.set(true); + latch.countDown(); + } + }); + + latch.await(1, TimeUnit.SECONDS); + service.stop(); + Assert.assertTrue("Scheduled task was not executed", testVariable.get()); + + } +}