diff --git a/build.gradle b/build.gradle index f7f6e99..4497056 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,8 @@ subprojects { /* Components */ compile (group: 'org.projectlombok', name: 'lombok', version: '1.16.16') + compile 'com.flowpowered:flow-nbt:1.0.0' //Named Binary Tags + testCompile 'junit:junit:4.12' } task copyDep(type: Copy) { diff --git a/core/src/main/java/mc/core/WarpPosition.java b/core/src/main/java/mc/core/WarpPosition.java index 4a0e501..6e1aacf 100644 --- a/core/src/main/java/mc/core/WarpPosition.java +++ b/core/src/main/java/mc/core/WarpPosition.java @@ -1,90 +1,14 @@ package mc.core; -import mc.core.player.ILook; +import lombok.AllArgsConstructor; +import lombok.Data; import mc.core.player.Look; -import mc.core.world.World; import java.io.Serializable; -import java.util.Objects; -public class WarpPosition extends Location implements Serializable, ILook { - private ILook look; - - public WarpPosition(double x, double y, double z, World world) { - super(x, y, z, world); - } - - public WarpPosition(double x, double y, double z, float yaw, float pitch, World world) { - super(x, y, z, world); - this.look = new Look(yaw, pitch); - } - - public WarpPosition(double x, double y, double z) { - super(x, y, z); - } - - public WarpPosition(double x, double y, double z, float yaw, float pitch) { - super(x, y, z); - this.look = new Look(yaw, pitch); - } - - public WarpPosition(long compactValue) { - super(compactValue); - } - - public WarpPosition(Location location) { - super(location.getX(), location.getY(), location.getZ()); - } - - public WarpPosition(Location location, Look look) { - super(location.getX(), location.getY(), location.getZ()); - this.look = look; - } - - public WarpPosition(long compactValue, World world) { - super(compactValue, world); - } - - @Override - public void set(Look look) { - this.look = look; - } - - @Override - public float getYaw() { - return this.look.getYaw(); - } - - @Override - public void setYaw(float yaw) { - this.look.setYaw(yaw); - } - - @Override - public float getPitch() { - return this.look.getPitch(); - } - - @Override - public void setPitch(float pitch) { - this.look.setPitch(pitch); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - WarpPosition that = (WarpPosition) o; - return Objects.equals(look, that.look); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), look); - } - - public boolean hasLook() { - return look != null; - } +@Data +@AllArgsConstructor +public class WarpPosition implements Serializable { + private Location location; + private Look look; } diff --git a/core/src/main/java/mc/core/player/ILook.java b/core/src/main/java/mc/core/player/ILook.java deleted file mode 100644 index fdebd51..0000000 --- a/core/src/main/java/mc/core/player/ILook.java +++ /dev/null @@ -1,15 +0,0 @@ -package mc.core.player; - -import java.io.Serializable; - -public interface ILook extends Serializable { - void set(Look look); - - float getYaw(); - - float getPitch(); - - void setYaw(float yaw); - - void setPitch(float pitch); -} diff --git a/core/src/main/java/mc/core/player/Look.java b/core/src/main/java/mc/core/player/Look.java index f15f6f7..1c0f7f4 100644 --- a/core/src/main/java/mc/core/player/Look.java +++ b/core/src/main/java/mc/core/player/Look.java @@ -7,12 +7,13 @@ package mc.core.player; import lombok.AllArgsConstructor; import lombok.Data; +import java.io.Serializable; + @Data @AllArgsConstructor -public class Look implements ILook { +public class Look implements Serializable{ private float yaw, pitch; - @Override public void set(Look look) { this.yaw = look.yaw; this.pitch = look.pitch; diff --git a/event-loop/TODO b/event-loop/TODO new file mode 100644 index 0000000..373999f --- /dev/null +++ b/event-loop/TODO @@ -0,0 +1,7 @@ +- Система иерархических блокировок ресурсов (чанки в мире) + - Нужно что-то делать с подгрузкой отсутсвующих чанков для таких блокировок +- Возможность вызвать событие из EventHandler +- Performance Monitor +- Возможная проблема с переполнением очереди при спаме пакетами от игрока +- Добавить поля с замками для ресурсов (Player, World, Chunk) +- Time Scheduler \ No newline at end of file diff --git a/event-loop/build.gradle b/event-loop/build.gradle new file mode 100644 index 0000000..0a1c7d0 --- /dev/null +++ b/event-loop/build.gradle @@ -0,0 +1,15 @@ +group 'mc' +version '1.0-SNAPSHOT' + +dependencies { + /* Core */ + compile_excludeCopy project(':core') + + testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.6.1' + testCompile group: 'com.carrotsearch', name: 'junit-benchmarks', version: '0.7.0' +} + + +test { + exclude "ru/core/events/*Benchmark.class" +} \ No newline at end of file diff --git a/event-loop/src/main/java/mc/core/events/EventPipelineTask.java b/event-loop/src/main/java/mc/core/events/EventPipelineTask.java new file mode 100644 index 0000000..f79634f --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/EventPipelineTask.java @@ -0,0 +1,113 @@ +package mc.core.events; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import mc.core.events.api.EventQueueOwner; +import mc.core.events.api.LockableResource; +import mc.core.events.runner.lock.LockObserveList; +import mc.core.events.runner.ResourceAwareExecutorService; +import mc.core.events.runner.ResourceAwareRunnable; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +/** + * Holds processing pipeline for every event + * that enters {@link FullAsyncEventLoop}. + *

+ * Ensures that EventHandlers will never be called in a wrong + * order by feeding only one task at a time to the {@link ResourceAwareExecutorService} + */ +@RequiredArgsConstructor +@Getter +@Slf4j +public class EventPipelineTask { + private final ResourceAwareExecutorService service; + private final List handlers; + private final FullAsyncEventLoop manager; + private final Event event; + private final EventQueueOwner owner; + private int currentIndex = 0; + @Setter + private PipelineState state = PipelineState.IDLE; + + public void next() { + if (updatePipelineState()) return; + + RegisteredEventHandler handler = handlers.get(currentIndex); + // If event has been already cancelled and current handler + // ignores cancelled events + if (event.isCanceled() && handler.isIgnoreCancelled()) { + // Just skip current event handler + currentIndex++; + next(); + } else { + feedTask(handler); + } + } + + /** + * Update current pipeline status + * + * @return true if pipeline has been completed + */ + private boolean updatePipelineState() { + if (state == PipelineState.IDLE) { + state = PipelineState.WORKING; + } + if (currentIndex >= handlers.size() && state == PipelineState.WORKING) { + state = PipelineState.FINISHED; + manager.update(owner); + return true; + } + + if (state == PipelineState.FINISHED) { + throw new IllegalStateException("Attempted to call next step on a FINISHED pipeline"); + } + return false; + } + + private void feedTask(RegisteredEventHandler handler) { + LockObserveList locks = getLocks(handler); + service.addTask(new ResourceAwareRunnable() { + @Override + public void run() { + try { + handler.getMethod().invoke(handler.getObject(), event); + } catch (IllegalAccessException | InvocationTargetException e) { + log.error("Unable to dispatch event " + event.getClass().getSimpleName() + " to handler " + event.getClass().getName(), e); + } + } + + @Override + public void after() { + currentIndex++; + next(); + } + + @Override + public LockObserveList getLocks() { + return locks; + } + }); + } + + private LockObserveList getLocks(RegisteredEventHandler handler) { + LockObserveList locks = new LockObserveList(); + + if (handler.isPluginSynchronize()) + locks.add(manager.getResourceManager().getPluginLock(handler.getPlugin())); + + for (LockableResource resource : handler.getLock()) { + locks.addAll(manager.getResourceManager().getAnnotationLocks(resource, event)); + } + + return locks; + } + + public enum PipelineState { + IDLE, WORKING, FINISHED + } +} diff --git a/event-loop/src/main/java/mc/core/events/FullAsyncEventLoop.java b/event-loop/src/main/java/mc/core/events/FullAsyncEventLoop.java new file mode 100644 index 0000000..ec8fa85 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/FullAsyncEventLoop.java @@ -0,0 +1,121 @@ +package mc.core.events; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import mc.core.events.api.EventHandler; +import mc.core.events.api.EventQueueOwner; +import mc.core.events.api.Plugin; +import mc.core.events.runner.ResourceAwareExecutorService; +import org.springframework.beans.factory.annotation.Autowired; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Event loop core. Manages event handler registration process, + * maintains event queues. + *

+ * This event loop guarantees that events, assigned to the {@link EventQueueOwner} + * will be handler in order of scheduling + */ +@Slf4j +public class FullAsyncEventLoop { + // Item leaves this queue only when EventPipeline is fully executed + private Map> eventQueue = new ConcurrentHashMap<>(); + private Map, List> registeredHandlers = new HashMap<>(); + @SuppressWarnings("SpringJavaAutowiredMembersInspection") + @Autowired + @Setter + private ResourceAwareExecutorService resourceAwareExecutorService; + @Getter + private SharedResourceManager resourceManager = new SharedResourceManager(); + + public void addEventHandler(Plugin plugin, Object object) { + Map candidates = getEventHandlerCandidates(object); + + for (Map.Entry pair : candidates.entrySet()) { + @SuppressWarnings("unchecked") Class eventType = (Class) pair.getKey().getParameterTypes()[0]; + List handlers = this.registeredHandlers.computeIfAbsent(eventType, e -> new ArrayList<>()); + handlers.add(new RegisteredEventHandler(plugin, object, pair.getKey(), pair.getValue().lock(), pair.getValue().pluginSynchronize(), pair.getValue().priority().getValue(), pair.getValue().ignoreCancelled())); + handlers.sort(Comparator.comparingInt(RegisteredEventHandler::getPriority)); + } + } + + public List getPipelineForEvent(Event event) { + return registeredHandlers.get(event.getClass()); + } + + private Map getEventHandlerCandidates(Object object) { + Map candidates; + candidates = new HashMap<>(); + for (Method method : object.getClass().getDeclaredMethods()) { + EventHandler annotation = method.getAnnotation(EventHandler.class); + if (annotation == null) + continue; + + if (!Modifier.isPublic(method.getModifiers())) { + log.error("Unable to register {} as an EventHandler. Method must have a 'public' access modifier.", method.toString()); + continue; + + } + + if (method.getParameterCount() != 1) { + log.error("Unable to register {} as an EventHandler. Method must have exactly one argument.", method.toString()); + continue; + } + + Class firstParamType = method.getParameterTypes()[0]; + if (!Event.class.isAssignableFrom(firstParamType)) { + log.error("Unable to register {} as an EventHandler. First parameter type must implement 'Event' interface.", method.toString()); + continue; + } + + method.setAccessible(true); + candidates.put(method, annotation); + } + return candidates; + } + + public void asyncFireEvent(EventQueueOwner owner, Event event) { + List handlers = getPipelineForEvent(event); + if (handlers == null) + return; + + Queue queue = eventQueue.computeIfAbsent(owner, s -> new ArrayDeque<>()); + queue.add(new EventPipelineTask(resourceAwareExecutorService, handlers, this, event, owner)); + update(owner); + } + + /** + * Updates queue state for a given owner: + *

+ * - Removes first element of a queue if it is marked as FINISHED + * - Starts executing first pipeline from the queue if it is marked with IDLE + * + * @param owner queue owner + */ + public synchronized void update(EventQueueOwner owner) { + if (!eventQueue.containsKey(owner)) { + log.warn("Unable to update pipeline executor: unable to find queue"); + return; + } + Queue queue = eventQueue.get(owner); + if (queue.isEmpty()) { + log.warn("Unable to update pipeline executor: queue is empty"); + return; + } + + if (queue.peek().getState() == EventPipelineTask.PipelineState.FINISHED) { + queue.poll(); + } + + EventPipelineTask pipeline; + if ((pipeline = queue.peek()) != null + && pipeline.getState() == EventPipelineTask.PipelineState.IDLE) { + pipeline.next(); + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/RegisteredEventHandler.java b/event-loop/src/main/java/mc/core/events/RegisteredEventHandler.java new file mode 100644 index 0000000..f0333f7 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/RegisteredEventHandler.java @@ -0,0 +1,24 @@ +package mc.core.events; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import mc.core.events.api.LockableResource; +import mc.core.events.api.Plugin; + +import java.lang.reflect.Method; + +/** + * Holds all the information necessary to register an + * event handler in an event loop + */ +@RequiredArgsConstructor +@Getter +public class RegisteredEventHandler { + private final Plugin plugin; + private final Object object; + private final Method method; + private final LockableResource[] lock; + private final boolean pluginSynchronize; + private final int priority; + private final boolean ignoreCancelled; +} diff --git a/event-loop/src/main/java/mc/core/events/SharedResourceManager.java b/event-loop/src/main/java/mc/core/events/SharedResourceManager.java new file mode 100644 index 0000000..ad9f55e --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/SharedResourceManager.java @@ -0,0 +1,64 @@ +package mc.core.events; + +import lombok.extern.slf4j.Slf4j; +import mc.core.Location; +import mc.core.events.api.LockableResource; +import mc.core.events.api.Plugin; +import mc.core.events.api.interfaces.LocationProvidingEvent; +import mc.core.events.api.interfaces.PlayerProvidingEvent; +import mc.core.events.api.interfaces.WorldProvidingEvent; +import mc.core.events.runner.lock.PoorMansLock; +import mc.core.player.Player; +import mc.core.world.World; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class SharedResourceManager { + private Map pluginLocks = new ConcurrentHashMap<>(); + // TODO: Memory leak HERE. Fix with introducing field to Player class + private Map playerLocks = new ConcurrentHashMap<>(); + // TODO: Memory leak HERE. Fix with introducing field to World class + private Map worldLocks = new ConcurrentHashMap<>(); + + public PoorMansLock getPluginLock(Plugin plugin) { + return pluginLocks.computeIfAbsent(plugin, s -> new PoorMansLock()); + } + + public PoorMansLock getPlayerLock(Player player) { + return playerLocks.computeIfAbsent(player, s -> new PoorMansLock()); + } + + public PoorMansLock getWorldLock(World world) { + return worldLocks.computeIfAbsent(world, s -> new PoorMansLock()); + } + + private T require(LockableResource resource, Event event, Class inter) { + if (inter.isInstance(event)) { + //noinspection unchecked + return (T) event; + } else + throw new IllegalArgumentException("Unable to lock " + resource + " while attempting to process event. Event " + event.getClass().getSimpleName() + " must implement " + inter); + } + + public Collection getAnnotationLocks(LockableResource resource, Event event) { + switch (resource) { + case PLAYER: + return require(resource, event, PlayerProvidingEvent.class).getAssociatedPlayers().stream().map(this::getPlayerLock).collect(Collectors.toList()); + case PLAYER_WORLD: + return require(resource, event, PlayerProvidingEvent.class).getAssociatedPlayers().stream().map(s -> s.getLocation().getWorld()).map(this::getWorldLock).collect(Collectors.toList()); + case EVENT_LOCATION_WORLD: + return require(resource, event, LocationProvidingEvent.class).getAssociatedLocations().stream().map(Location::getWorld).map(this::getWorldLock).collect(Collectors.toList()); + case EVENT_WORLD: + return require(resource, event, WorldProvidingEvent.class).getAssociatedWorlds().stream().map(this::getWorldLock).collect(Collectors.toList()); + default: + log.warn("Unable to find action for " + resource + " resource definition."); + return Collections.emptyList(); + } + } + +} diff --git a/event-loop/src/main/java/mc/core/events/api/EventHandler.java b/event-loop/src/main/java/mc/core/events/api/EventHandler.java new file mode 100644 index 0000000..dbf255d --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/EventHandler.java @@ -0,0 +1,17 @@ +package mc.core.events.api; + +import java.lang.annotation.*; + +@Documented +@Target(ElementType.METHOD) +@Inherited +@Retention(RetentionPolicy.RUNTIME) +public @interface EventHandler { + EventPriority priority() default EventPriority.NORMAL; + + boolean ignoreCancelled() default false; + + boolean pluginSynchronize() default true; + + LockableResource[] lock() default {}; +} diff --git a/event-loop/src/main/java/mc/core/events/api/EventPriority.java b/event-loop/src/main/java/mc/core/events/api/EventPriority.java new file mode 100644 index 0000000..0c6fdce --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/EventPriority.java @@ -0,0 +1,19 @@ +package mc.core.events.api; + +import lombok.Getter; + +public enum EventPriority { + LOWEST(0), + LOW(1), + NORMAL(2), + HIGH(3), + HIGHEST(4), + MONITOR(5); + + @Getter + private int value; + + EventPriority(int value) { + this.value = value; + } +} diff --git a/event-loop/src/main/java/mc/core/events/api/EventQueueOwner.java b/event-loop/src/main/java/mc/core/events/api/EventQueueOwner.java new file mode 100644 index 0000000..49f4fd4 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/EventQueueOwner.java @@ -0,0 +1,4 @@ +package mc.core.events.api; + +public interface EventQueueOwner { +} diff --git a/event-loop/src/main/java/mc/core/events/api/LockableResource.java b/event-loop/src/main/java/mc/core/events/api/LockableResource.java new file mode 100644 index 0000000..5b86b0a --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/LockableResource.java @@ -0,0 +1,10 @@ +package mc.core.events.api; + +public enum LockableResource { + PLAYER, + PLAYER_WORLD, + EVENT_LOCATION_WORLD, + EVENT_WORLD + + // TODO: Add entity-related constants +} diff --git a/event-loop/src/main/java/mc/core/events/api/Plugin.java b/event-loop/src/main/java/mc/core/events/api/Plugin.java new file mode 100644 index 0000000..040ab60 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/Plugin.java @@ -0,0 +1,4 @@ +package mc.core.events.api; + +public interface Plugin { +} diff --git a/event-loop/src/main/java/mc/core/events/api/interfaces/LocationProvidingEvent.java b/event-loop/src/main/java/mc/core/events/api/interfaces/LocationProvidingEvent.java new file mode 100644 index 0000000..bec7040 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/interfaces/LocationProvidingEvent.java @@ -0,0 +1,9 @@ +package mc.core.events.api.interfaces; + +import mc.core.Location; + +import java.util.Collection; + +public interface LocationProvidingEvent { + Collection getAssociatedLocations(); +} diff --git a/event-loop/src/main/java/mc/core/events/api/interfaces/PlayerProvidingEvent.java b/event-loop/src/main/java/mc/core/events/api/interfaces/PlayerProvidingEvent.java new file mode 100644 index 0000000..d05af39 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/interfaces/PlayerProvidingEvent.java @@ -0,0 +1,21 @@ +package mc.core.events.api.interfaces; + +import mc.core.Location; +import mc.core.player.Player; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public interface PlayerProvidingEvent extends LocationProvidingEvent { + List getAssociatedPlayers(); + + @Override + default Collection getAssociatedLocations() { + List players = getAssociatedPlayers(); + if (players.size() == 1) + return Collections.singletonList(players.get(0).getLocation()); + else + throw new RuntimeException("This method is not implemented."); + } +} diff --git a/event-loop/src/main/java/mc/core/events/api/interfaces/WorldProvidingEvent.java b/event-loop/src/main/java/mc/core/events/api/interfaces/WorldProvidingEvent.java new file mode 100644 index 0000000..9f561e5 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/interfaces/WorldProvidingEvent.java @@ -0,0 +1,10 @@ +package mc.core.events.api.interfaces; + +import mc.core.world.World; + +import java.util.Collection; + +public interface WorldProvidingEvent { + Collection getAssociatedWorlds(); + +} diff --git a/event-loop/src/main/java/mc/core/events/api/samples/BlockBreakEvent.java b/event-loop/src/main/java/mc/core/events/api/samples/BlockBreakEvent.java new file mode 100644 index 0000000..a515022 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/api/samples/BlockBreakEvent.java @@ -0,0 +1,31 @@ +package mc.core.events.api.samples; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import mc.core.Location; +import mc.core.events.EventBase; +import mc.core.events.api.interfaces.LocationProvidingEvent; +import mc.core.events.api.interfaces.PlayerProvidingEvent; +import mc.core.player.Player; +import mc.core.world.block.Block; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +@RequiredArgsConstructor +@Getter +public class BlockBreakEvent extends EventBase implements PlayerProvidingEvent, LocationProvidingEvent { + private final Player player; + private final Block block; + + @Override + public List getAssociatedPlayers() { + return Collections.singletonList(player); + } + + @Override + public Collection getAssociatedLocations() { + return Collections.singletonList(block.getLocation()); + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java b/event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java new file mode 100644 index 0000000..ce275bd --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/AllInScheduleStrategy.java @@ -0,0 +1,59 @@ +package mc.core.events.runner; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; + +/** + * Simple scheduling strategy. + *

+ * We wait until the first task in a queue will be able to acquire all + * the necessary resources and then we schedule it for execution + */ +public class AllInScheduleStrategy implements ScheduleStrategy { + private BlockingQueue globalQueue; + private ResourceAwareExecutorService resourceAwareExecutorService; + + public AllInScheduleStrategy(ResourceAwareExecutorService resourceAwareExecutorService) { + this.globalQueue = resourceAwareExecutorService.queue; + this.resourceAwareExecutorService = resourceAwareExecutorService; + } + + + @Override + public synchronized ResourceAwareRunnable getTask() throws InterruptedException { + waitForResourceLockComplete(); + + // Wait for new task in queue + ResourceAwareRunnable runnable = globalQueue.take(); + while (!runnable.getLocks().isReady()) { + CountDownLatch latch = new CountDownLatch(1); + runnable.getLocks().setCallback(latch::countDown); + // Prevent situations where dependencies were resolved + // while we were setting up the callback + if (runnable.getLocks().isReady()) + continue; + latch.await(); + } + + // Lock execution for the next thread + // (wait until resources for previous task will be blocked) + resourceAwareExecutorService.waitForLock.set(true); + return runnable; + } + + /** + * Waits until the last scheduled task will lock all the necessary resources. + *

+ * It is required to avoid race-condition when an execution candidate task (first task in a queue) + * skips lock-await procedure due to the last scheduled task not having locked necessary resources yet. + * + * @throws InterruptedException if current thread is interrupted + */ + private void waitForResourceLockComplete() throws InterruptedException { + synchronized (resourceAwareExecutorService.waitForLock) { + while (resourceAwareExecutorService.waitForLock.get()) { + resourceAwareExecutorService.wait(); + } + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/ExecutorWorkerThread.java b/event-loop/src/main/java/mc/core/events/runner/ExecutorWorkerThread.java new file mode 100644 index 0000000..e1e5f7b --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/ExecutorWorkerThread.java @@ -0,0 +1,55 @@ +package mc.core.events.runner; + +/** + * Worker thread for {@link ResourceAwareExecutorService}. + *

+ * - Awaits for tasks from {@link ScheduleStrategy} + * - Locks up resources for this task + * - Notifies {@link ScheduleStrategy} when resource-locking procedure is complete + * - Executes the runnable in this thread + * - Unlocks all the resources + * - Calls {@link ResourceAwareRunnable#after()} callback + */ +public class ExecutorWorkerThread extends Thread { + private ResourceAwareExecutorService service; + + public ExecutorWorkerThread(String name, ResourceAwareExecutorService service) { + super(name); + this.service = service; + } + + @Override + public void run() { + while (!isInterrupted() && isAlive()) { + ResourceAwareRunnable runnable; + try { + runnable = service.getStrategy().getTask(); + } catch (InterruptedException e) { + return; + } + + executeTask(runnable); + } + } + + void executeTask(ResourceAwareRunnable runnable) { + runnable.getLocks().lockAll(); + notifyLockingDone(); + try { + runnable.run(); + } finally { + runnable.getLocks().unlockAll(); + runnable.getLocks().release(); + } + runnable.after(); + } + + private void notifyLockingDone() { + synchronized (service.waitForLock) { + if (service.waitForLock.get()) { + service.waitForLock.set(false); + service.waitForLock.notifyAll(); + } + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/ResourceAwareExecutorService.java b/event-loop/src/main/java/mc/core/events/runner/ResourceAwareExecutorService.java new file mode 100644 index 0000000..0ec99ca --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/ResourceAwareExecutorService.java @@ -0,0 +1,74 @@ +package mc.core.events.runner; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Custom implementation of an ExecutorService. + * + * Holds a queue of {@link ResourceAwareRunnable} and executes them in a thread pool. + * + * Warning! This class doesn't guarantee, that tasks will be executed in any specific order. + * In fact, it's up to {@link ScheduleStrategy} to decide which task will be scheduled for + * execution next. + */ +public class ResourceAwareExecutorService { + private static final boolean WORKER_INSTANT_EXECUTE = false; + BlockingQueue queue = new ArrayBlockingQueue<>(100); + // A synchronize aid, that prevents ScheduleStrategy from returning + // wrong tasks when executor is late in blocking resources + final AtomicBoolean waitForLock = new AtomicBoolean(false); + private ScheduleStrategy strategy = new AllInScheduleStrategy(this); + private Set executorThreads = new HashSet<>(); + private int threadCount; + + public ResourceAwareExecutorService(int threadCount) { + this.threadCount = threadCount; + } + + public void start() { + if (executorThreads.size() > 0) + throw new RuntimeException("This executor service was already started."); + + for (int i = 0; i < threadCount; i++) { + Thread thread = new ExecutorWorkerThread("Event Loop #" + i, this); + executorThreads.add(thread); + thread.start(); + } + } + + public void stop() { + if (executorThreads.size() == 0) + throw new RuntimeException("This executor service was not initialized yet."); + + Iterator iterator = executorThreads.iterator(); + while (iterator.hasNext()) { + Thread thread = iterator.next(); + thread.interrupt(); + iterator.remove(); + } + } + + public void addTask(ResourceAwareRunnable task) { + if (WORKER_INSTANT_EXECUTE && Thread.currentThread() instanceof ExecutorWorkerThread) { + ((ExecutorWorkerThread) Thread.currentThread()).executeTask(task); + } else + queue.offer(task); + } + + + public ScheduleStrategy getStrategy() { + return strategy; + } + + private class DefaultScheduleStrategy implements ScheduleStrategy { + public ResourceAwareRunnable getTask() throws InterruptedException { + return queue.take(); + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/ResourceAwareRunnable.java b/event-loop/src/main/java/mc/core/events/runner/ResourceAwareRunnable.java new file mode 100644 index 0000000..6f88476 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/ResourceAwareRunnable.java @@ -0,0 +1,13 @@ +package mc.core.events.runner; + +import mc.core.events.runner.lock.LockObserveList; + +public interface ResourceAwareRunnable extends Runnable { + default LockObserveList getLocks() { + return LockObserveList.EMPTY_LIST; + } + + default void after() { + + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/ScheduleStrategy.java b/event-loop/src/main/java/mc/core/events/runner/ScheduleStrategy.java new file mode 100644 index 0000000..10cee55 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/ScheduleStrategy.java @@ -0,0 +1,5 @@ +package mc.core.events.runner; + +public interface ScheduleStrategy { + ResourceAwareRunnable getTask() throws InterruptedException; +} diff --git a/event-loop/src/main/java/mc/core/events/runner/lock/LockObserveList.java b/event-loop/src/main/java/mc/core/events/runner/lock/LockObserveList.java new file mode 100644 index 0000000..157d325 --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/lock/LockObserveList.java @@ -0,0 +1,61 @@ +package mc.core.events.runner.lock; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class LockObserveList implements Consumer { + public static LockObserveList EMPTY_LIST = new LockObserveList(); + private List locks = new ArrayList<>(); + private Runnable callback; + + public void setCallback(Runnable callback) { + this.callback = callback; + } + + public void add(PoorMansLock lock) { + locks.add(lock); + lock.addCallback(this); + } + + public void addAll(Iterable locks) { + for (PoorMansLock lock : locks) + add(lock); + } + + public void release() { + callback = null; + for (PoorMansLock lock : locks) { + lock.removeCallback(this); + } + locks.clear(); + } + + public boolean isReady() { + for (PoorMansLock lock : locks) { + if (lock.isLocked()) + return false; + } + return true; + } + + public void lockAll() { + for (PoorMansLock lock : locks) + lock.lock(); + } + + public void unlockAll() { + for (PoorMansLock lock : locks) + lock.unlock(); + } + + @Override + public void accept(PoorMansLock lock) { + if (!lock.isLocked()) { + if (isReady()) { + if (callback != null) + callback.run(); + } + } + } +} diff --git a/event-loop/src/main/java/mc/core/events/runner/lock/PoorMansLock.java b/event-loop/src/main/java/mc/core/events/runner/lock/PoorMansLock.java new file mode 100644 index 0000000..ace2edc --- /dev/null +++ b/event-loop/src/main/java/mc/core/events/runner/lock/PoorMansLock.java @@ -0,0 +1,52 @@ +package mc.core.events.runner.lock; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Consumer; + +public class PoorMansLock { + private Thread owner = null; + private Set> callbacks = new CopyOnWriteArraySet<>(); + + public void addCallback(Consumer callback) { + callbacks.add(callback); + } + + public void removeCallback(Consumer callback) { + callbacks.remove(callback); + } + + + public boolean isLocked() { + return owner != null; + } + + private void triggerUpdate() { + for (Consumer consumer : callbacks) + consumer.accept(this); + } + + public synchronized void lock() { + if(owner == Thread.currentThread()) + return; + + if (owner != null) { + throw new RuntimeException("Unable to lock this resource: already in use"); + } + + owner = Thread.currentThread(); + triggerUpdate(); + } + + public synchronized void unlock() { + if (owner == null) + return; + + if (owner != Thread.currentThread()) { + throw new RuntimeException("Attempt to unlock resource from non-owning thread"); + } + + owner = null; + triggerUpdate(); + } +} diff --git a/event-loop/src/main/java/mc/core/timings/ThreadTimings.java b/event-loop/src/main/java/mc/core/timings/ThreadTimings.java new file mode 100644 index 0000000..408842b --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/ThreadTimings.java @@ -0,0 +1,45 @@ +package mc.core.timings; + +import java.util.Stack; +import java.util.concurrent.atomic.AtomicInteger; + +public class ThreadTimings { + private static AtomicInteger IDS = new AtomicInteger(); + private int threadId; + private Stack stack = new Stack<>(); + + public ThreadTimings() { + this.threadId = IDS.getAndIncrement(); + } + + public Stack getStack() { + return stack; + } + + public int getThreadId() { + return threadId; + } + + public Timings start() { + Timings timings = new Timings(this, stack.size()); + getTimingsManager().waitForTimingsInitialize(); + stack.push(timings); + getTimingsManager().notifyTimings(this, timings, true); + return timings; + } + + private TimingsManager getTimingsManager() { + return Timings.getTimingsManager(); + } + + public void end(Timings finished) { + Timings timings = null; + while (!stack.isEmpty() && timings != finished) { + getTimingsManager().waitForTimingsInitialize(); + timings = stack.pop(); + if (!timings.hasFinished()) + timings.finish(); + getTimingsManager().notifyTimings(this, timings, false); + } + } +} diff --git a/event-loop/src/main/java/mc/core/timings/Timings.java b/event-loop/src/main/java/mc/core/timings/Timings.java new file mode 100644 index 0000000..b9d1252 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/Timings.java @@ -0,0 +1,51 @@ +package mc.core.timings; + +public class Timings implements AutoCloseable { + private ThreadTimings threadTimings; + private long acquireTime; + @SuppressWarnings("FieldCanBeLocal") + private long endTime = -1; + private int id; + + public Timings(ThreadTimings threadTimings, int id) { + this.id = id; + this.threadTimings = threadTimings; + this.acquireTime = System.nanoTime(); + } + + public static Timings start() { + return TimingsStaticAccessor.getTimingsManager().getCurrentThreadTimings().start(); + } + + public static TimingsManager getTimingsManager() { + return TimingsStaticAccessor.getTimingsManager(); + } + + public int getId() { + return id; + } + + public long getEndTime() { + return endTime; + } + + public long getAcquireTime() { + return acquireTime; + } + + public boolean hasFinished() { + return endTime != -1; + } + + public void finish() { + if (hasFinished()) + throw new IllegalStateException("This timing was already finished"); + this.endTime = System.nanoTime(); + } + + @Override + public void close() { + finish(); + this.threadTimings.end(this); + } +} diff --git a/event-loop/src/main/java/mc/core/timings/TimingsEventType.java b/event-loop/src/main/java/mc/core/timings/TimingsEventType.java new file mode 100644 index 0000000..181643a --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/TimingsEventType.java @@ -0,0 +1,17 @@ +package mc.core.timings; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public enum TimingsEventType { + TIMINGS_START((short) 0), + TIMINGS_END((short) 1), + TIMINGS_FILE_INITIALIZING((short) 2), + TIMINGS_FILE_INITIALIZED((short) 3), + TIMINGS_CHANGE_THREAD_OPTIONS((short) 4), + TIMINGS_FILE_END((short) 5); + + @Getter + private final short id; +} \ No newline at end of file diff --git a/event-loop/src/main/java/mc/core/timings/TimingsManager.java b/event-loop/src/main/java/mc/core/timings/TimingsManager.java new file mode 100644 index 0000000..3e8ea66 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/TimingsManager.java @@ -0,0 +1,161 @@ +package mc.core.timings; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import mc.core.timings.io.DefaultWriterFactory; +import mc.core.timings.io.TimingsWriter; +import mc.core.timings.io.TimingsWriterFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public class TimingsManager { + private final Map threadTimings = new ConcurrentHashMap<>(); + // These variables are essential in Timings thread synchronization + private final AtomicBoolean waitForFile = new AtomicBoolean(false); + private TimingsWriter writer; + private Thread timingsIoThread; + private CountDownLatch ioThreadStopMutex; + private BlockingQueue queue; + private ReentrantLock queueAccessLock = new ReentrantLock(); + // For modularity purposes + @Autowired + @Setter + private TimingsWriterFactory writerFactory = new DefaultWriterFactory(); + + public TimingsManager() { + TimingsStaticAccessor.TIMINGS_MANAGER = this; + } + + public void startRecording(File file) { + synchronized (waitForFile) { + waitForFile.set(true); + } + try { + writer = writerFactory.newInstance(file); + writer.writeEvent(0, 0, System.nanoTime(), TimingsEventType.TIMINGS_FILE_INITIALIZING); + // Synchronize current thread state + for (Map.Entry pair : threadTimings.entrySet()) { + writer.writeEvent(pair.getValue().getThreadId(), 0, System.nanoTime(), TimingsEventType.TIMINGS_CHANGE_THREAD_OPTIONS, "name: " + pair.getKey().getName()); + for (Timings timings : pair.getValue().getStack()) { + writer.writeEvent(pair.getValue().getThreadId(), timings.getId(), timings.getAcquireTime(), TimingsEventType.TIMINGS_START); + } + } + writer.writeEvent(0, 0, System.nanoTime(), TimingsEventType.TIMINGS_FILE_INITIALIZED); + queue = new ArrayBlockingQueue<>(200); + ioThreadStopMutex = new CountDownLatch(1); + timingsIoThread = new Thread() { + @Override + public void run() { + try { + while (!isInterrupted() && isAlive()) { + TimingsRecord record; + try { + if (queue == null) + return; + record = queue.take(); + } catch (InterruptedException e) { + return; + } + record.writeToFile(writer); + } + } finally { + ioThreadStopMutex.countDown(); + } + } + }; + timingsIoThread.setName("Timings IO thread"); + timingsIoThread.start(); + } catch (Exception e) { + log.error("Unable to start timings recording", e); + } + synchronized (waitForFile) { + waitForFile.set(false); + waitForFile.notifyAll(); + } + } + + public void stopRecording() { + // Disable write queue + queueAccessLock.lock(); + queue = null; + queueAccessLock.unlock(); + // Interrupt thread and wait until in finished writing the last task + timingsIoThread.interrupt(); + try { + ioThreadStopMutex.await(); + } catch (InterruptedException e) { + log.error("Unable to wait until last record would be written to file", e); + } + // Write EOF event + writer.writeEvent(0, 0, System.nanoTime(), TimingsEventType.TIMINGS_FILE_END); + // Unload file + try { + writer.close(); + } catch (IOException e) { + log.error("Unable to close timings file", e); + } + writer = null; + } + + void notifyTimings(ThreadTimings thread, Timings timings, boolean start) { + if (queue == null) + return; + queueAccessLock.lock(); + try { + if (queue != null) + queue.offer( + new TimingsRecord(thread.getThreadId(), + timings.getId(), + start ? timings.getAcquireTime() : timings.getEndTime(), + start ? TimingsEventType.TIMINGS_START : TimingsEventType.TIMINGS_END + ) + ); + } finally { + queueAccessLock.unlock(); + } + } + + void waitForTimingsInitialize() { + synchronized (waitForFile) { + while (waitForFile.get()) { + try { + waitForFile.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + } + + public ThreadTimings getCurrentThreadTimings() { + + synchronized (this.threadTimings) { + if (this.threadTimings.containsKey(Thread.currentThread())) { + return this.threadTimings.get(Thread.currentThread()); + } else { + ThreadTimings timings = new ThreadTimings(); + this.threadTimings.put(Thread.currentThread(), timings); + if (queue != null) { + try { + writer.writeEvent(timings.getThreadId(), 0, System.nanoTime(), TimingsEventType.TIMINGS_CHANGE_THREAD_OPTIONS, "name: " + Thread.currentThread().getName()); + } catch (NullPointerException ignored) { + // It means that there the file recording was stopped + // we don't actually care about it + } + } + return timings; + } + } + } +} diff --git a/event-loop/src/main/java/mc/core/timings/TimingsRecord.java b/event-loop/src/main/java/mc/core/timings/TimingsRecord.java new file mode 100644 index 0000000..3d3e3f6 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/TimingsRecord.java @@ -0,0 +1,33 @@ +package mc.core.timings; + +import mc.core.timings.io.TimingsWriter; + +class TimingsRecord { + private int threadId; + private int stackId; + private long time; + private TimingsEventType eventType; + private String data; + + public TimingsRecord(int threadId, int stackId, long time, TimingsEventType eventType) { + this.threadId = threadId; + this.stackId = stackId; + this.time = time; + this.eventType = eventType; + } + + public TimingsRecord(int threadId, int stackId, long time, TimingsEventType eventType, String data) { + this.threadId = threadId; + this.stackId = stackId; + this.time = time; + this.eventType = eventType; + this.data = data; + } + + public void writeToFile(TimingsWriter fileWriter) { + if (data == null) + fileWriter.writeEvent(threadId, stackId, time, eventType); + else + fileWriter.writeEvent(threadId, stackId, time, eventType, data); + } +} diff --git a/event-loop/src/main/java/mc/core/timings/TimingsStaticAccessor.java b/event-loop/src/main/java/mc/core/timings/TimingsStaticAccessor.java new file mode 100644 index 0000000..35d0ed4 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/TimingsStaticAccessor.java @@ -0,0 +1,9 @@ +package mc.core.timings; + +public class TimingsStaticAccessor { + static TimingsManager TIMINGS_MANAGER; + + public static TimingsManager getTimingsManager() { + return TIMINGS_MANAGER != null ? TIMINGS_MANAGER : (TIMINGS_MANAGER = new TimingsManager()); + } +} diff --git a/event-loop/src/main/java/mc/core/timings/io/DefaultWriterFactory.java b/event-loop/src/main/java/mc/core/timings/io/DefaultWriterFactory.java new file mode 100644 index 0000000..192ee03 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/io/DefaultWriterFactory.java @@ -0,0 +1,11 @@ +package mc.core.timings.io; + +import java.io.File; +import java.io.IOException; + +public class DefaultWriterFactory implements TimingsWriterFactory { + @Override + public TimingsWriter newInstance(File file) throws IOException { + return new TimingsFileWriter(file); + } +} diff --git a/event-loop/src/main/java/mc/core/timings/io/TimingsFileWriter.java b/event-loop/src/main/java/mc/core/timings/io/TimingsFileWriter.java new file mode 100644 index 0000000..e2b593c --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/io/TimingsFileWriter.java @@ -0,0 +1,63 @@ +package mc.core.timings.io; + + +import lombok.extern.slf4j.Slf4j; +import mc.core.timings.TimingsEventType; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.concurrent.locks.ReentrantLock; + +@SuppressWarnings("Duplicates") +@Slf4j +public class TimingsFileWriter implements TimingsWriter { + private FileOutputStream fileOutputStream; + private ObjectOutputStream writer; + private ReentrantLock lock = new ReentrantLock(); + + public TimingsFileWriter(File saveFile) throws IOException { + fileOutputStream = new FileOutputStream(saveFile); + writer = new ObjectOutputStream(fileOutputStream); + } + + @Override + public void writeEvent(int threadId, int stackId, long time, TimingsEventType type) { + lock.lock(); + try { + writer.writeInt(threadId); + writer.writeInt(stackId); + writer.writeLong(time); + writer.writeShort(type.getId()); + writer.writeBoolean(false); + } catch (IOException e) { + log.error("Unable to write timings record", e); + } finally { + lock.unlock(); + } + } + + @Override + public void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data) { + lock.lock(); + try { + writer.writeInt(threadId); + writer.writeInt(stackId); + writer.writeLong(time); + writer.writeShort(type.getId()); + writer.writeBoolean(true); + writer.writeUTF(data); + } catch (IOException e) { + log.error("Unable to write timings record", e); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + writer.close(); + fileOutputStream.close(); + } +} diff --git a/event-loop/src/main/java/mc/core/timings/io/TimingsLogWriter.java b/event-loop/src/main/java/mc/core/timings/io/TimingsLogWriter.java new file mode 100644 index 0000000..0fa5fa3 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/io/TimingsLogWriter.java @@ -0,0 +1,24 @@ +package mc.core.timings.io; + +import lombok.extern.slf4j.Slf4j; +import mc.core.timings.TimingsEventType; + +import java.io.IOException; + +@Slf4j +public class TimingsLogWriter implements TimingsWriter { + @Override + public void writeEvent(int threadId, int stackId, long time, TimingsEventType type) { + log.info("[{}] Thread #{}, Stack #{}: {}", time, threadId, stackId, type.toString()); + } + + @Override + public void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data) { + log.info("[{}] Thread #{}, Stack #{}: {} ({})", time, threadId, stackId, type.toString(), data); + } + + @Override + public void close() throws IOException { + + } +} diff --git a/event-loop/src/main/java/mc/core/timings/io/TimingsWriter.java b/event-loop/src/main/java/mc/core/timings/io/TimingsWriter.java new file mode 100644 index 0000000..640bdd5 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/io/TimingsWriter.java @@ -0,0 +1,13 @@ +package mc.core.timings.io; + +import mc.core.timings.TimingsEventType; + +import java.io.IOException; + +public interface TimingsWriter { + void writeEvent(int threadId, int stackId, long time, TimingsEventType type); + + void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data); + + void close() throws IOException; +} diff --git a/event-loop/src/main/java/mc/core/timings/io/TimingsWriterFactory.java b/event-loop/src/main/java/mc/core/timings/io/TimingsWriterFactory.java new file mode 100644 index 0000000..db12e6a --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/io/TimingsWriterFactory.java @@ -0,0 +1,8 @@ +package mc.core.timings.io; + +import java.io.File; +import java.io.IOException; + +public interface TimingsWriterFactory { + TimingsWriter newInstance(File file) throws IOException; +} diff --git a/event-loop/src/test/java/mc/core/events/EventExecutorTest.java b/event-loop/src/test/java/mc/core/events/EventExecutorTest.java new file mode 100644 index 0000000..a6c6c14 --- /dev/null +++ b/event-loop/src/test/java/mc/core/events/EventExecutorTest.java @@ -0,0 +1,29 @@ +package mc.core.events; + +import mc.core.events.runner.ResourceAwareExecutorService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class EventExecutorTest { + + @Test + public void basicTest() throws InterruptedException { + AtomicBoolean testVariable = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + ResourceAwareExecutorService service = new ResourceAwareExecutorService(1); + service.start(); + service.addTask(() -> { + testVariable.set(true); + latch.countDown(); + }); + + latch.await(1, TimeUnit.SECONDS); + service.stop(); + Assert.assertTrue("Scheduled task was not executed", testVariable.get()); + + } +} diff --git a/event-loop/src/test/java/mc/core/events/EventLoopTest.java b/event-loop/src/test/java/mc/core/events/EventLoopTest.java new file mode 100644 index 0000000..a92e738 --- /dev/null +++ b/event-loop/src/test/java/mc/core/events/EventLoopTest.java @@ -0,0 +1,169 @@ +package mc.core.events; + +import mc.core.events.api.EventHandler; +import mc.core.events.api.EventPriority; +import mc.core.events.api.EventQueueOwner; +import mc.core.events.api.Plugin; +import mc.core.events.runner.ResourceAwareExecutorService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("Duplicates") +public class EventLoopTest { + + @Test + public void basicTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(1); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler + public void onLoginEvent(LoginEvent event) { + + latch.countDown(); + } + }); + + ResourceAwareExecutorService service = new ResourceAwareExecutorService(1); + service.start(); + + eventLoop.setResourceAwareExecutorService(service); + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Event was not called", 0, latch.getCount()); + } + + @Test + public void consecutiveExecutionTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(2); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler + public void onLoginEvent(LoginEvent event) { + + latch.countDown(); + } + }); + + ResourceAwareExecutorService service = new ResourceAwareExecutorService(1); + service.start(); + + eventLoop.setResourceAwareExecutorService(service); + + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Event was not called", 0, latch.getCount()); + } + + @Test + public void prioritySystemTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(3); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + List priorities = new ArrayList<>(3); + + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler(priority = EventPriority.NORMAL) + public void login1(LoginEvent event) { + priorities.add(0); + latch.countDown(); + } + + @EventHandler(priority = EventPriority.HIGHEST) + public void login2(LoginEvent event) { + priorities.add(1); + latch.countDown(); + } + + @EventHandler(priority = EventPriority.LOWEST) + public void login3(LoginEvent event) { + priorities.add(2); + latch.countDown(); + } + }); + + ResourceAwareExecutorService service = new ResourceAwareExecutorService(1); + service.start(); + + eventLoop.setResourceAwareExecutorService(service); + + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Incorrect call sequence", "[2, 0, 1]", priorities.toString()); + } + + @Test + public void ignoreCancelledTest() throws InterruptedException { + Plugin plugin = new Plugin() { + }; + + EventQueueOwner queueOwner = new EventQueueOwner() { + }; + + + CountDownLatch latch = new CountDownLatch(1); + FullAsyncEventLoop eventLoop = new FullAsyncEventLoop(); + List priorities = new ArrayList<>(2); + + eventLoop.addEventHandler(plugin, new Object() { + @EventHandler(priority = EventPriority.NORMAL, ignoreCancelled = true) + public void login1(LoginEvent event) { + priorities.add(0); + event.setCanceled(true); + } + + @EventHandler(priority = EventPriority.HIGHEST, ignoreCancelled = true) + public void login2(LoginEvent event) { + priorities.add(1); + } + + @EventHandler(priority = EventPriority.LOWEST, ignoreCancelled = true) + public void login3(LoginEvent event) { + priorities.add(2); + } + + @EventHandler(priority = EventPriority.MONITOR) + public void monitor(LoginEvent event) { + latch.countDown(); + } + }); + + ResourceAwareExecutorService service = new ResourceAwareExecutorService(1); + service.start(); + + eventLoop.setResourceAwareExecutorService(service); + + eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null)); + + latch.await(1, TimeUnit.SECONDS); + Assert.assertEquals("Incorrect call sequence", "[2, 0]", priorities.toString()); + } +} diff --git a/event-loop/src/test/java/mc/core/events/LockTest.java b/event-loop/src/test/java/mc/core/events/LockTest.java new file mode 100644 index 0000000..e1e65a2 --- /dev/null +++ b/event-loop/src/test/java/mc/core/events/LockTest.java @@ -0,0 +1,90 @@ +package mc.core.events; + +import mc.core.events.runner.lock.LockObserveList; +import mc.core.events.runner.lock.PoorMansLock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class LockTest { + @Test + public void basicTest() throws InterruptedException { + AtomicBoolean engageCallbackCalled = new AtomicBoolean(false); + AtomicBoolean disengageCallbackCalled = new AtomicBoolean(false); + + PoorMansLock lock = new PoorMansLock(); + lock.addCallback(lock1 -> { + if (lock1.isLocked()) + engageCallbackCalled.set(true); + else + disengageCallbackCalled.set(true); + }); + lock.lock(); + Assert.assertTrue("Lock is not locked", lock.isLocked()); + Assert.assertTrue("Engage callback was not called", engageCallbackCalled.get()); + + engageCallbackCalled.set(false); + try { + lock.lock(); + Assert.assertFalse("Engage callback was called from attempt to block from the same thread", engageCallbackCalled.get()); + } catch (Exception ex) { + Assert.fail("Exception fired while attempting to lock from the same thread"); + return; + } + + Assert.assertFalse("Disengage callback was called while not actually disengaging [x1]", disengageCallbackCalled.get()); + + AtomicBoolean lockExceptionFired = new AtomicBoolean(false); + AtomicBoolean unlockExceptionFired = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + lock.lock(); + } catch (Exception ex) { + lockExceptionFired.set(true); + } + try { + lock.unlock(); + } catch (Exception ex) { + unlockExceptionFired.set(true); + } + latch.countDown(); + }).start(); + + latch.await(); + Assert.assertTrue("Exception was not fired on concurrent lock attempt", lockExceptionFired.get()); + Assert.assertTrue("Exception was not fired on non-owner unlock attempt", unlockExceptionFired.get()); + Assert.assertFalse("Disengage callback was called while not actually disengaging [x2]", disengageCallbackCalled.get()); + + lock.unlock(); + Assert.assertTrue("Disengage callback was on called on lock disengage", disengageCallbackCalled.get()); + } + + @Test + public void observeListTest() { + PoorMansLock lock1 = new PoorMansLock(); + PoorMansLock lock2 = new PoorMansLock(); + + LockObserveList list = new LockObserveList(); + list.add(lock1); + list.add(lock2); + + Assert.assertTrue("LockObserveList was no able to correctly identify lock states for unlocked locks", list.isReady()); + lock1.lock(); + Assert.assertFalse("LockObserveList was no able to correctly identify lock states for list with one locked lock", list.isReady()); + + + AtomicBoolean listReadyCallbackCalled = new AtomicBoolean(false); + list.setCallback(() -> listReadyCallbackCalled.set(true)); + lock2.lock(); + + Assert.assertFalse("Callback was called when another lock got engaged", listReadyCallbackCalled.get()); + lock1.unlock(); + Assert.assertFalse("Callback was called while one lock is still locked", listReadyCallbackCalled.get()); + lock2.unlock(); + Assert.assertTrue("Callback was not called when both locks are actually free", listReadyCallbackCalled.get()); + + } +} diff --git a/event-loop/src/test/java/mc/core/timings/TimingsTest.java b/event-loop/src/test/java/mc/core/timings/TimingsTest.java new file mode 100644 index 0000000..a3e6200 --- /dev/null +++ b/event-loop/src/test/java/mc/core/timings/TimingsTest.java @@ -0,0 +1,51 @@ +package mc.core.timings; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +public class TimingsTest { + @Test + public void basicTest() { + try (Timings timings = Timings.start()) { + System.out.println("Test code"); + } + } + + @Test + public void brokenTimingTest() { + try (Timings timings = Timings.start()) { + Timings t1 = Timings.start(); + Timings.start(); + System.out.println("Pre Close t1"); + t1.close(); + System.out.println("Finished"); + } + } + + @Test + public void fileRecording() throws IOException { + Timings.getTimingsManager().startRecording(new File("test.timings")); + + try (Timings t1 = Timings.start()) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + e.printStackTrace(); + } + try (Timings t2 = Timings.start()) { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + Thread.sleep(5); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + + Timings.getTimingsManager().stopRecording(); + } +} diff --git a/proto_1.12.2_netty/src/main/java/mc/core/network/proto_1_12_2/netty/handlers/LoginHandler.java b/proto_1.12.2_netty/src/main/java/mc/core/network/proto_1_12_2/netty/handlers/LoginHandler.java index 6e712ee..d8a2cbc 100644 --- a/proto_1.12.2_netty/src/main/java/mc/core/network/proto_1_12_2/netty/handlers/LoginHandler.java +++ b/proto_1.12.2_netty/src/main/java/mc/core/network/proto_1_12_2/netty/handlers/LoginHandler.java @@ -10,7 +10,6 @@ import mc.core.network.proto_1_12_2.State; import mc.core.network.proto_1_12_2.TeleportManager; import mc.core.network.proto_1_12_2.netty.wrappers.WrapperNetChannel; import mc.core.network.proto_1_12_2.packets.*; -import mc.core.player.Look; import mc.core.player.Player; import mc.core.player.PlayerManager; import mc.core.player.PlayerMode; @@ -48,8 +47,8 @@ public class LoginHandler extends AbstractStateHandler implements LoginStateHand Player player = playerManager.getPlayer(packet.getPlayerName()) .orElseGet(() -> playerManager.createPlayer( packet.getPlayerName(), - world.getSpawn(), - new Look(0f, 0f))); + world.getSpawn().getLocation(), + world.getSpawn().getLook())); channel.writeAndFlush(new LoginSuccessPacket( player.getUUID(), @@ -68,7 +67,7 @@ public class LoginHandler extends AbstractStateHandler implements LoginStateHand // Spawn Position SpawnPositionPacket pkt2 = new SpawnPositionPacket(); - pkt2.setLocation(world.getSpawn()); + pkt2.setLocation(world.getSpawn().getLocation()); channel.write(pkt2); // Player Abilities @@ -84,7 +83,7 @@ public class LoginHandler extends AbstractStateHandler implements LoginStateHand ChunkDataPacket pkt8 = new ChunkDataPacket(); pkt8.setX(0); pkt8.setZ(0); - pkt8.getChunks().add(world.getChunk(0, 0,0)); + pkt8.getChunks().add(world.getChunk(0, 0, 0)); pkt8.setInitChunk(true); channel.writeAndFlush(pkt8); @@ -107,7 +106,7 @@ public class LoginHandler extends AbstractStateHandler implements LoginStateHand playerData.setPing(0); playerData.setHasDisplayName(true); playerData.setDisplayName(Text.builder() - .append(Text.of(TextColor.RED, TextStyle.BOLD, player.getName().substring(0,1))) + .append(Text.of(TextColor.RED, TextStyle.BOLD, player.getName().substring(0, 1))) .append(Text.of(TextColor.WHITE, player.getName().substring(1))) .build() ); diff --git a/settings.gradle b/settings.gradle index b865638..bc4f7bf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,4 +6,4 @@ include('vanilla_commands') include('proto_1.12.2') // Protocol 1.12.2 include('proto_1.12.2_netty') // Protocol 1.12.2 (Netty impl.) include('generated_world') - +include('event-loop') \ No newline at end of file