Archived
0

Merged in anarok/merge/loop-3 (pull request #7)

Anarok/merge/loop 3
This commit is contained in:
Daniil Glazko
2018-08-09 20:49:46 +00:00
committed by Forwolk
43 changed files with 1580 additions and 107 deletions

View File

@@ -35,6 +35,8 @@ subprojects {
/* Components */
compile (group: 'org.projectlombok', name: 'lombok', version: '1.16.16')
compile 'com.flowpowered:flow-nbt:1.0.0' //Named Binary Tags
testCompile 'junit:junit:4.12'
}
task copyDep(type: Copy) {

View File

@@ -1,90 +1,14 @@
package mc.core;
import mc.core.player.ILook;
import lombok.AllArgsConstructor;
import lombok.Data;
import mc.core.player.Look;
import mc.core.world.World;
import java.io.Serializable;
import java.util.Objects;
public class WarpPosition extends Location implements Serializable, ILook {
private ILook look;
public WarpPosition(double x, double y, double z, World world) {
super(x, y, z, world);
}
public WarpPosition(double x, double y, double z, float yaw, float pitch, World world) {
super(x, y, z, world);
this.look = new Look(yaw, pitch);
}
public WarpPosition(double x, double y, double z) {
super(x, y, z);
}
public WarpPosition(double x, double y, double z, float yaw, float pitch) {
super(x, y, z);
this.look = new Look(yaw, pitch);
}
public WarpPosition(long compactValue) {
super(compactValue);
}
public WarpPosition(Location location) {
super(location.getX(), location.getY(), location.getZ());
}
public WarpPosition(Location location, Look look) {
super(location.getX(), location.getY(), location.getZ());
this.look = look;
}
public WarpPosition(long compactValue, World world) {
super(compactValue, world);
}
@Override
public void set(Look look) {
this.look = look;
}
@Override
public float getYaw() {
return this.look.getYaw();
}
@Override
public void setYaw(float yaw) {
this.look.setYaw(yaw);
}
@Override
public float getPitch() {
return this.look.getPitch();
}
@Override
public void setPitch(float pitch) {
this.look.setPitch(pitch);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
WarpPosition that = (WarpPosition) o;
return Objects.equals(look, that.look);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), look);
}
public boolean hasLook() {
return look != null;
}
@Data
@AllArgsConstructor
public class WarpPosition implements Serializable {
private Location location;
private Look look;
}

View File

@@ -1,15 +0,0 @@
package mc.core.player;
import java.io.Serializable;
public interface ILook extends Serializable {
void set(Look look);
float getYaw();
float getPitch();
void setYaw(float yaw);
void setPitch(float pitch);
}

View File

@@ -7,12 +7,13 @@ package mc.core.player;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class Look implements ILook {
public class Look implements Serializable{
private float yaw, pitch;
@Override
public void set(Look look) {
this.yaw = look.yaw;
this.pitch = look.pitch;

7
event-loop/TODO Normal file
View File

@@ -0,0 +1,7 @@
- Система иерархических блокировок ресурсов (чанки в мире)
- Нужно что-то делать с подгрузкой отсутсвующих чанков для таких блокировок
- Возможность вызвать событие из EventHandler
- Performance Monitor
- Возможная проблема с переполнением очереди при спаме пакетами от игрока
- Добавить поля с замками для ресурсов (Player, World, Chunk)
- Time Scheduler

15
event-loop/build.gradle Normal file
View File

@@ -0,0 +1,15 @@
group 'mc'
version '1.0-SNAPSHOT'
dependencies {
/* Core */
compile_excludeCopy project(':core')
testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.6.1'
testCompile group: 'com.carrotsearch', name: 'junit-benchmarks', version: '0.7.0'
}
test {
exclude "ru/core/events/*Benchmark.class"
}

View File

@@ -0,0 +1,113 @@
package mc.core.events;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import mc.core.events.api.EventQueueOwner;
import mc.core.events.api.LockableResource;
import mc.core.events.runner.lock.LockObserveList;
import mc.core.events.runner.ResourceAwareExecutorService;
import mc.core.events.runner.ResourceAwareRunnable;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
/**
* Holds processing pipeline for every event
* that enters {@link FullAsyncEventLoop}.
* <p>
* Ensures that EventHandlers will never be called in a wrong
* order by feeding only one task at a time to the {@link ResourceAwareExecutorService}
*/
@RequiredArgsConstructor
@Getter
@Slf4j
public class EventPipelineTask {
private final ResourceAwareExecutorService service;
private final List<RegisteredEventHandler> handlers;
private final FullAsyncEventLoop manager;
private final Event event;
private final EventQueueOwner owner;
private int currentIndex = 0;
@Setter
private PipelineState state = PipelineState.IDLE;
public void next() {
if (updatePipelineState()) return;
RegisteredEventHandler handler = handlers.get(currentIndex);
// If event has been already cancelled and current handler
// ignores cancelled events
if (event.isCanceled() && handler.isIgnoreCancelled()) {
// Just skip current event handler
currentIndex++;
next();
} else {
feedTask(handler);
}
}
/**
* Update current pipeline status
*
* @return true if pipeline has been completed
*/
private boolean updatePipelineState() {
if (state == PipelineState.IDLE) {
state = PipelineState.WORKING;
}
if (currentIndex >= handlers.size() && state == PipelineState.WORKING) {
state = PipelineState.FINISHED;
manager.update(owner);
return true;
}
if (state == PipelineState.FINISHED) {
throw new IllegalStateException("Attempted to call next step on a FINISHED pipeline");
}
return false;
}
private void feedTask(RegisteredEventHandler handler) {
LockObserveList locks = getLocks(handler);
service.addTask(new ResourceAwareRunnable() {
@Override
public void run() {
try {
handler.getMethod().invoke(handler.getObject(), event);
} catch (IllegalAccessException | InvocationTargetException e) {
log.error("Unable to dispatch event " + event.getClass().getSimpleName() + " to handler " + event.getClass().getName(), e);
}
}
@Override
public void after() {
currentIndex++;
next();
}
@Override
public LockObserveList getLocks() {
return locks;
}
});
}
private LockObserveList getLocks(RegisteredEventHandler handler) {
LockObserveList locks = new LockObserveList();
if (handler.isPluginSynchronize())
locks.add(manager.getResourceManager().getPluginLock(handler.getPlugin()));
for (LockableResource resource : handler.getLock()) {
locks.addAll(manager.getResourceManager().getAnnotationLocks(resource, event));
}
return locks;
}
public enum PipelineState {
IDLE, WORKING, FINISHED
}
}

View File

@@ -0,0 +1,121 @@
package mc.core.events;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import mc.core.events.api.EventHandler;
import mc.core.events.api.EventQueueOwner;
import mc.core.events.api.Plugin;
import mc.core.events.runner.ResourceAwareExecutorService;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Event loop core. Manages event handler registration process,
* maintains event queues.
* <p>
* This event loop guarantees that events, assigned to the {@link EventQueueOwner}
* will be handler in order of scheduling
*/
@Slf4j
public class FullAsyncEventLoop {
// Item leaves this queue only when EventPipeline is fully executed
private Map<EventQueueOwner, Queue<EventPipelineTask>> eventQueue = new ConcurrentHashMap<>();
private Map<Class<? extends Event>, List<RegisteredEventHandler>> registeredHandlers = new HashMap<>();
@SuppressWarnings("SpringJavaAutowiredMembersInspection")
@Autowired
@Setter
private ResourceAwareExecutorService resourceAwareExecutorService;
@Getter
private SharedResourceManager resourceManager = new SharedResourceManager();
public void addEventHandler(Plugin plugin, Object object) {
Map<Method, EventHandler> candidates = getEventHandlerCandidates(object);
for (Map.Entry<Method, EventHandler> pair : candidates.entrySet()) {
@SuppressWarnings("unchecked") Class<? extends Event> eventType = (Class<? extends Event>) pair.getKey().getParameterTypes()[0];
List<RegisteredEventHandler> handlers = this.registeredHandlers.computeIfAbsent(eventType, e -> new ArrayList<>());
handlers.add(new RegisteredEventHandler(plugin, object, pair.getKey(), pair.getValue().lock(), pair.getValue().pluginSynchronize(), pair.getValue().priority().getValue(), pair.getValue().ignoreCancelled()));
handlers.sort(Comparator.comparingInt(RegisteredEventHandler::getPriority));
}
}
public List<RegisteredEventHandler> getPipelineForEvent(Event event) {
return registeredHandlers.get(event.getClass());
}
private Map<Method, EventHandler> getEventHandlerCandidates(Object object) {
Map<Method, EventHandler> candidates;
candidates = new HashMap<>();
for (Method method : object.getClass().getDeclaredMethods()) {
EventHandler annotation = method.getAnnotation(EventHandler.class);
if (annotation == null)
continue;
if (!Modifier.isPublic(method.getModifiers())) {
log.error("Unable to register {} as an EventHandler. Method must have a 'public' access modifier.", method.toString());
continue;
}
if (method.getParameterCount() != 1) {
log.error("Unable to register {} as an EventHandler. Method must have exactly one argument.", method.toString());
continue;
}
Class<?> firstParamType = method.getParameterTypes()[0];
if (!Event.class.isAssignableFrom(firstParamType)) {
log.error("Unable to register {} as an EventHandler. First parameter type must implement 'Event' interface.", method.toString());
continue;
}
method.setAccessible(true);
candidates.put(method, annotation);
}
return candidates;
}
public void asyncFireEvent(EventQueueOwner owner, Event event) {
List<RegisteredEventHandler> handlers = getPipelineForEvent(event);
if (handlers == null)
return;
Queue<EventPipelineTask> queue = eventQueue.computeIfAbsent(owner, s -> new ArrayDeque<>());
queue.add(new EventPipelineTask(resourceAwareExecutorService, handlers, this, event, owner));
update(owner);
}
/**
* Updates queue state for a given owner:
* <p>
* - Removes first element of a queue if it is marked as FINISHED
* - Starts executing first pipeline from the queue if it is marked with IDLE
*
* @param owner queue owner
*/
public synchronized void update(EventQueueOwner owner) {
if (!eventQueue.containsKey(owner)) {
log.warn("Unable to update pipeline executor: unable to find queue");
return;
}
Queue<EventPipelineTask> queue = eventQueue.get(owner);
if (queue.isEmpty()) {
log.warn("Unable to update pipeline executor: queue is empty");
return;
}
if (queue.peek().getState() == EventPipelineTask.PipelineState.FINISHED) {
queue.poll();
}
EventPipelineTask pipeline;
if ((pipeline = queue.peek()) != null
&& pipeline.getState() == EventPipelineTask.PipelineState.IDLE) {
pipeline.next();
}
}
}

View File

@@ -0,0 +1,24 @@
package mc.core.events;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import mc.core.events.api.LockableResource;
import mc.core.events.api.Plugin;
import java.lang.reflect.Method;
/**
* Holds all the information necessary to register an
* event handler in an event loop
*/
@RequiredArgsConstructor
@Getter
public class RegisteredEventHandler {
private final Plugin plugin;
private final Object object;
private final Method method;
private final LockableResource[] lock;
private final boolean pluginSynchronize;
private final int priority;
private final boolean ignoreCancelled;
}

View File

@@ -0,0 +1,64 @@
package mc.core.events;
import lombok.extern.slf4j.Slf4j;
import mc.core.Location;
import mc.core.events.api.LockableResource;
import mc.core.events.api.Plugin;
import mc.core.events.api.interfaces.LocationProvidingEvent;
import mc.core.events.api.interfaces.PlayerProvidingEvent;
import mc.core.events.api.interfaces.WorldProvidingEvent;
import mc.core.events.runner.lock.PoorMansLock;
import mc.core.player.Player;
import mc.core.world.World;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
public class SharedResourceManager {
private Map<Plugin, PoorMansLock> pluginLocks = new ConcurrentHashMap<>();
// TODO: Memory leak HERE. Fix with introducing field to Player class
private Map<Player, PoorMansLock> playerLocks = new ConcurrentHashMap<>();
// TODO: Memory leak HERE. Fix with introducing field to World class
private Map<World, PoorMansLock> worldLocks = new ConcurrentHashMap<>();
public PoorMansLock getPluginLock(Plugin plugin) {
return pluginLocks.computeIfAbsent(plugin, s -> new PoorMansLock());
}
public PoorMansLock getPlayerLock(Player player) {
return playerLocks.computeIfAbsent(player, s -> new PoorMansLock());
}
public PoorMansLock getWorldLock(World world) {
return worldLocks.computeIfAbsent(world, s -> new PoorMansLock());
}
private <T> T require(LockableResource resource, Event event, Class<T> inter) {
if (inter.isInstance(event)) {
//noinspection unchecked
return (T) event;
} else
throw new IllegalArgumentException("Unable to lock " + resource + " while attempting to process event. Event " + event.getClass().getSimpleName() + " must implement " + inter);
}
public Collection<PoorMansLock> getAnnotationLocks(LockableResource resource, Event event) {
switch (resource) {
case PLAYER:
return require(resource, event, PlayerProvidingEvent.class).getAssociatedPlayers().stream().map(this::getPlayerLock).collect(Collectors.toList());
case PLAYER_WORLD:
return require(resource, event, PlayerProvidingEvent.class).getAssociatedPlayers().stream().map(s -> s.getLocation().getWorld()).map(this::getWorldLock).collect(Collectors.toList());
case EVENT_LOCATION_WORLD:
return require(resource, event, LocationProvidingEvent.class).getAssociatedLocations().stream().map(Location::getWorld).map(this::getWorldLock).collect(Collectors.toList());
case EVENT_WORLD:
return require(resource, event, WorldProvidingEvent.class).getAssociatedWorlds().stream().map(this::getWorldLock).collect(Collectors.toList());
default:
log.warn("Unable to find action for " + resource + " resource definition.");
return Collections.emptyList();
}
}
}

View File

@@ -0,0 +1,17 @@
package mc.core.events.api;
import java.lang.annotation.*;
@Documented
@Target(ElementType.METHOD)
@Inherited
@Retention(RetentionPolicy.RUNTIME)
public @interface EventHandler {
EventPriority priority() default EventPriority.NORMAL;
boolean ignoreCancelled() default false;
boolean pluginSynchronize() default true;
LockableResource[] lock() default {};
}

View File

@@ -0,0 +1,19 @@
package mc.core.events.api;
import lombok.Getter;
public enum EventPriority {
LOWEST(0),
LOW(1),
NORMAL(2),
HIGH(3),
HIGHEST(4),
MONITOR(5);
@Getter
private int value;
EventPriority(int value) {
this.value = value;
}
}

View File

@@ -0,0 +1,4 @@
package mc.core.events.api;
public interface EventQueueOwner {
}

View File

@@ -0,0 +1,10 @@
package mc.core.events.api;
public enum LockableResource {
PLAYER,
PLAYER_WORLD,
EVENT_LOCATION_WORLD,
EVENT_WORLD
// TODO: Add entity-related constants
}

View File

@@ -0,0 +1,4 @@
package mc.core.events.api;
public interface Plugin {
}

View File

@@ -0,0 +1,9 @@
package mc.core.events.api.interfaces;
import mc.core.Location;
import java.util.Collection;
public interface LocationProvidingEvent {
Collection<Location> getAssociatedLocations();
}

View File

@@ -0,0 +1,21 @@
package mc.core.events.api.interfaces;
import mc.core.Location;
import mc.core.player.Player;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public interface PlayerProvidingEvent extends LocationProvidingEvent {
List<Player> getAssociatedPlayers();
@Override
default Collection<Location> getAssociatedLocations() {
List<Player> players = getAssociatedPlayers();
if (players.size() == 1)
return Collections.singletonList(players.get(0).getLocation());
else
throw new RuntimeException("This method is not implemented.");
}
}

View File

@@ -0,0 +1,10 @@
package mc.core.events.api.interfaces;
import mc.core.world.World;
import java.util.Collection;
public interface WorldProvidingEvent {
Collection<World> getAssociatedWorlds();
}

View File

@@ -0,0 +1,31 @@
package mc.core.events.api.samples;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import mc.core.Location;
import mc.core.events.EventBase;
import mc.core.events.api.interfaces.LocationProvidingEvent;
import mc.core.events.api.interfaces.PlayerProvidingEvent;
import mc.core.player.Player;
import mc.core.world.block.Block;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@RequiredArgsConstructor
@Getter
public class BlockBreakEvent extends EventBase implements PlayerProvidingEvent, LocationProvidingEvent {
private final Player player;
private final Block block;
@Override
public List<Player> getAssociatedPlayers() {
return Collections.singletonList(player);
}
@Override
public Collection<Location> getAssociatedLocations() {
return Collections.singletonList(block.getLocation());
}
}

View File

@@ -0,0 +1,59 @@
package mc.core.events.runner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
* Simple scheduling strategy.
* <p>
* We wait until the first task in a queue will be able to acquire all
* the necessary resources and then we schedule it for execution
*/
public class AllInScheduleStrategy implements ScheduleStrategy {
private BlockingQueue<ResourceAwareRunnable> globalQueue;
private ResourceAwareExecutorService resourceAwareExecutorService;
public AllInScheduleStrategy(ResourceAwareExecutorService resourceAwareExecutorService) {
this.globalQueue = resourceAwareExecutorService.queue;
this.resourceAwareExecutorService = resourceAwareExecutorService;
}
@Override
public synchronized ResourceAwareRunnable getTask() throws InterruptedException {
waitForResourceLockComplete();
// Wait for new task in queue
ResourceAwareRunnable runnable = globalQueue.take();
while (!runnable.getLocks().isReady()) {
CountDownLatch latch = new CountDownLatch(1);
runnable.getLocks().setCallback(latch::countDown);
// Prevent situations where dependencies were resolved
// while we were setting up the callback
if (runnable.getLocks().isReady())
continue;
latch.await();
}
// Lock execution for the next thread
// (wait until resources for previous task will be blocked)
resourceAwareExecutorService.waitForLock.set(true);
return runnable;
}
/**
* Waits until the last scheduled task will lock all the necessary resources.
* <p>
* It is required to avoid race-condition when an execution candidate task (first task in a queue)
* skips lock-await procedure due to the last scheduled task not having locked necessary resources yet.
*
* @throws InterruptedException if current thread is interrupted
*/
private void waitForResourceLockComplete() throws InterruptedException {
synchronized (resourceAwareExecutorService.waitForLock) {
while (resourceAwareExecutorService.waitForLock.get()) {
resourceAwareExecutorService.wait();
}
}
}
}

View File

@@ -0,0 +1,55 @@
package mc.core.events.runner;
/**
* Worker thread for {@link ResourceAwareExecutorService}.
* <p>
* - Awaits for tasks from {@link ScheduleStrategy}
* - Locks up resources for this task
* - Notifies {@link ScheduleStrategy} when resource-locking procedure is complete
* - Executes the runnable in this thread
* - Unlocks all the resources
* - Calls {@link ResourceAwareRunnable#after()} callback
*/
public class ExecutorWorkerThread extends Thread {
private ResourceAwareExecutorService service;
public ExecutorWorkerThread(String name, ResourceAwareExecutorService service) {
super(name);
this.service = service;
}
@Override
public void run() {
while (!isInterrupted() && isAlive()) {
ResourceAwareRunnable runnable;
try {
runnable = service.getStrategy().getTask();
} catch (InterruptedException e) {
return;
}
executeTask(runnable);
}
}
void executeTask(ResourceAwareRunnable runnable) {
runnable.getLocks().lockAll();
notifyLockingDone();
try {
runnable.run();
} finally {
runnable.getLocks().unlockAll();
runnable.getLocks().release();
}
runnable.after();
}
private void notifyLockingDone() {
synchronized (service.waitForLock) {
if (service.waitForLock.get()) {
service.waitForLock.set(false);
service.waitForLock.notifyAll();
}
}
}
}

View File

@@ -0,0 +1,74 @@
package mc.core.events.runner;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Custom implementation of an ExecutorService.
*
* Holds a queue of {@link ResourceAwareRunnable} and executes them in a thread pool.
*
* Warning! This class doesn't guarantee, that tasks will be executed in any specific order.
* In fact, it's up to {@link ScheduleStrategy} to decide which task will be scheduled for
* execution next.
*/
public class ResourceAwareExecutorService {
private static final boolean WORKER_INSTANT_EXECUTE = false;
BlockingQueue<ResourceAwareRunnable> queue = new ArrayBlockingQueue<>(100);
// A synchronize aid, that prevents ScheduleStrategy from returning
// wrong tasks when executor is late in blocking resources
final AtomicBoolean waitForLock = new AtomicBoolean(false);
private ScheduleStrategy strategy = new AllInScheduleStrategy(this);
private Set<Thread> executorThreads = new HashSet<>();
private int threadCount;
public ResourceAwareExecutorService(int threadCount) {
this.threadCount = threadCount;
}
public void start() {
if (executorThreads.size() > 0)
throw new RuntimeException("This executor service was already started.");
for (int i = 0; i < threadCount; i++) {
Thread thread = new ExecutorWorkerThread("Event Loop #" + i, this);
executorThreads.add(thread);
thread.start();
}
}
public void stop() {
if (executorThreads.size() == 0)
throw new RuntimeException("This executor service was not initialized yet.");
Iterator<Thread> iterator = executorThreads.iterator();
while (iterator.hasNext()) {
Thread thread = iterator.next();
thread.interrupt();
iterator.remove();
}
}
public void addTask(ResourceAwareRunnable task) {
if (WORKER_INSTANT_EXECUTE && Thread.currentThread() instanceof ExecutorWorkerThread) {
((ExecutorWorkerThread) Thread.currentThread()).executeTask(task);
} else
queue.offer(task);
}
public ScheduleStrategy getStrategy() {
return strategy;
}
private class DefaultScheduleStrategy implements ScheduleStrategy {
public ResourceAwareRunnable getTask() throws InterruptedException {
return queue.take();
}
}
}

View File

@@ -0,0 +1,13 @@
package mc.core.events.runner;
import mc.core.events.runner.lock.LockObserveList;
public interface ResourceAwareRunnable extends Runnable {
default LockObserveList getLocks() {
return LockObserveList.EMPTY_LIST;
}
default void after() {
}
}

View File

@@ -0,0 +1,5 @@
package mc.core.events.runner;
public interface ScheduleStrategy {
ResourceAwareRunnable getTask() throws InterruptedException;
}

View File

@@ -0,0 +1,61 @@
package mc.core.events.runner.lock;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class LockObserveList implements Consumer<PoorMansLock> {
public static LockObserveList EMPTY_LIST = new LockObserveList();
private List<PoorMansLock> locks = new ArrayList<>();
private Runnable callback;
public void setCallback(Runnable callback) {
this.callback = callback;
}
public void add(PoorMansLock lock) {
locks.add(lock);
lock.addCallback(this);
}
public void addAll(Iterable<PoorMansLock> locks) {
for (PoorMansLock lock : locks)
add(lock);
}
public void release() {
callback = null;
for (PoorMansLock lock : locks) {
lock.removeCallback(this);
}
locks.clear();
}
public boolean isReady() {
for (PoorMansLock lock : locks) {
if (lock.isLocked())
return false;
}
return true;
}
public void lockAll() {
for (PoorMansLock lock : locks)
lock.lock();
}
public void unlockAll() {
for (PoorMansLock lock : locks)
lock.unlock();
}
@Override
public void accept(PoorMansLock lock) {
if (!lock.isLocked()) {
if (isReady()) {
if (callback != null)
callback.run();
}
}
}
}

View File

@@ -0,0 +1,52 @@
package mc.core.events.runner.lock;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
public class PoorMansLock {
private Thread owner = null;
private Set<Consumer<PoorMansLock>> callbacks = new CopyOnWriteArraySet<>();
public void addCallback(Consumer<PoorMansLock> callback) {
callbacks.add(callback);
}
public void removeCallback(Consumer<PoorMansLock> callback) {
callbacks.remove(callback);
}
public boolean isLocked() {
return owner != null;
}
private void triggerUpdate() {
for (Consumer<PoorMansLock> consumer : callbacks)
consumer.accept(this);
}
public synchronized void lock() {
if(owner == Thread.currentThread())
return;
if (owner != null) {
throw new RuntimeException("Unable to lock this resource: already in use");
}
owner = Thread.currentThread();
triggerUpdate();
}
public synchronized void unlock() {
if (owner == null)
return;
if (owner != Thread.currentThread()) {
throw new RuntimeException("Attempt to unlock resource from non-owning thread");
}
owner = null;
triggerUpdate();
}
}

View File

@@ -0,0 +1,45 @@
package mc.core.timings;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTimings {
private static AtomicInteger IDS = new AtomicInteger();
private int threadId;
private Stack<Timings> stack = new Stack<>();
public ThreadTimings() {
this.threadId = IDS.getAndIncrement();
}
public Stack<Timings> getStack() {
return stack;
}
public int getThreadId() {
return threadId;
}
public Timings start() {
Timings timings = new Timings(this, stack.size());
getTimingsManager().waitForTimingsInitialize();
stack.push(timings);
getTimingsManager().notifyTimings(this, timings, true);
return timings;
}
private TimingsManager getTimingsManager() {
return Timings.getTimingsManager();
}
public void end(Timings finished) {
Timings timings = null;
while (!stack.isEmpty() && timings != finished) {
getTimingsManager().waitForTimingsInitialize();
timings = stack.pop();
if (!timings.hasFinished())
timings.finish();
getTimingsManager().notifyTimings(this, timings, false);
}
}
}

View File

@@ -0,0 +1,51 @@
package mc.core.timings;
public class Timings implements AutoCloseable {
private ThreadTimings threadTimings;
private long acquireTime;
@SuppressWarnings("FieldCanBeLocal")
private long endTime = -1;
private int id;
public Timings(ThreadTimings threadTimings, int id) {
this.id = id;
this.threadTimings = threadTimings;
this.acquireTime = System.nanoTime();
}
public static Timings start() {
return TimingsStaticAccessor.getTimingsManager().getCurrentThreadTimings().start();
}
public static TimingsManager getTimingsManager() {
return TimingsStaticAccessor.getTimingsManager();
}
public int getId() {
return id;
}
public long getEndTime() {
return endTime;
}
public long getAcquireTime() {
return acquireTime;
}
public boolean hasFinished() {
return endTime != -1;
}
public void finish() {
if (hasFinished())
throw new IllegalStateException("This timing was already finished");
this.endTime = System.nanoTime();
}
@Override
public void close() {
finish();
this.threadTimings.end(this);
}
}

View File

@@ -0,0 +1,17 @@
package mc.core.timings;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public enum TimingsEventType {
TIMINGS_START((short) 0),
TIMINGS_END((short) 1),
TIMINGS_FILE_INITIALIZING((short) 2),
TIMINGS_FILE_INITIALIZED((short) 3),
TIMINGS_CHANGE_THREAD_OPTIONS((short) 4),
TIMINGS_FILE_END((short) 5);
@Getter
private final short id;
}

View File

@@ -0,0 +1,161 @@
package mc.core.timings;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import mc.core.timings.io.DefaultWriterFactory;
import mc.core.timings.io.TimingsWriter;
import mc.core.timings.io.TimingsWriterFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class TimingsManager {
private final Map<Thread, ThreadTimings> threadTimings = new ConcurrentHashMap<>();
// These variables are essential in Timings thread synchronization
private final AtomicBoolean waitForFile = new AtomicBoolean(false);
private TimingsWriter writer;
private Thread timingsIoThread;
private CountDownLatch ioThreadStopMutex;
private BlockingQueue<TimingsRecord> queue;
private ReentrantLock queueAccessLock = new ReentrantLock();
// For modularity purposes
@Autowired
@Setter
private TimingsWriterFactory writerFactory = new DefaultWriterFactory();
public TimingsManager() {
TimingsStaticAccessor.TIMINGS_MANAGER = this;
}
public void startRecording(File file) {
synchronized (waitForFile) {
waitForFile.set(true);
}
try {
writer = writerFactory.newInstance(file);
writer.writeEvent(0, 0, System.nanoTime(), TimingsEventType.TIMINGS_FILE_INITIALIZING);
// Synchronize current thread state
for (Map.Entry<Thread, ThreadTimings> pair : threadTimings.entrySet()) {
writer.writeEvent(pair.getValue().getThreadId(), 0, System.nanoTime(), TimingsEventType.TIMINGS_CHANGE_THREAD_OPTIONS, "name: " + pair.getKey().getName());
for (Timings timings : pair.getValue().getStack()) {
writer.writeEvent(pair.getValue().getThreadId(), timings.getId(), timings.getAcquireTime(), TimingsEventType.TIMINGS_START);
}
}
writer.writeEvent(0, 0, System.nanoTime(), TimingsEventType.TIMINGS_FILE_INITIALIZED);
queue = new ArrayBlockingQueue<>(200);
ioThreadStopMutex = new CountDownLatch(1);
timingsIoThread = new Thread() {
@Override
public void run() {
try {
while (!isInterrupted() && isAlive()) {
TimingsRecord record;
try {
if (queue == null)
return;
record = queue.take();
} catch (InterruptedException e) {
return;
}
record.writeToFile(writer);
}
} finally {
ioThreadStopMutex.countDown();
}
}
};
timingsIoThread.setName("Timings IO thread");
timingsIoThread.start();
} catch (Exception e) {
log.error("Unable to start timings recording", e);
}
synchronized (waitForFile) {
waitForFile.set(false);
waitForFile.notifyAll();
}
}
public void stopRecording() {
// Disable write queue
queueAccessLock.lock();
queue = null;
queueAccessLock.unlock();
// Interrupt thread and wait until in finished writing the last task
timingsIoThread.interrupt();
try {
ioThreadStopMutex.await();
} catch (InterruptedException e) {
log.error("Unable to wait until last record would be written to file", e);
}
// Write EOF event
writer.writeEvent(0, 0, System.nanoTime(), TimingsEventType.TIMINGS_FILE_END);
// Unload file
try {
writer.close();
} catch (IOException e) {
log.error("Unable to close timings file", e);
}
writer = null;
}
void notifyTimings(ThreadTimings thread, Timings timings, boolean start) {
if (queue == null)
return;
queueAccessLock.lock();
try {
if (queue != null)
queue.offer(
new TimingsRecord(thread.getThreadId(),
timings.getId(),
start ? timings.getAcquireTime() : timings.getEndTime(),
start ? TimingsEventType.TIMINGS_START : TimingsEventType.TIMINGS_END
)
);
} finally {
queueAccessLock.unlock();
}
}
void waitForTimingsInitialize() {
synchronized (waitForFile) {
while (waitForFile.get()) {
try {
waitForFile.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public ThreadTimings getCurrentThreadTimings() {
synchronized (this.threadTimings) {
if (this.threadTimings.containsKey(Thread.currentThread())) {
return this.threadTimings.get(Thread.currentThread());
} else {
ThreadTimings timings = new ThreadTimings();
this.threadTimings.put(Thread.currentThread(), timings);
if (queue != null) {
try {
writer.writeEvent(timings.getThreadId(), 0, System.nanoTime(), TimingsEventType.TIMINGS_CHANGE_THREAD_OPTIONS, "name: " + Thread.currentThread().getName());
} catch (NullPointerException ignored) {
// It means that there the file recording was stopped
// we don't actually care about it
}
}
return timings;
}
}
}
}

View File

@@ -0,0 +1,33 @@
package mc.core.timings;
import mc.core.timings.io.TimingsWriter;
class TimingsRecord {
private int threadId;
private int stackId;
private long time;
private TimingsEventType eventType;
private String data;
public TimingsRecord(int threadId, int stackId, long time, TimingsEventType eventType) {
this.threadId = threadId;
this.stackId = stackId;
this.time = time;
this.eventType = eventType;
}
public TimingsRecord(int threadId, int stackId, long time, TimingsEventType eventType, String data) {
this.threadId = threadId;
this.stackId = stackId;
this.time = time;
this.eventType = eventType;
this.data = data;
}
public void writeToFile(TimingsWriter fileWriter) {
if (data == null)
fileWriter.writeEvent(threadId, stackId, time, eventType);
else
fileWriter.writeEvent(threadId, stackId, time, eventType, data);
}
}

View File

@@ -0,0 +1,9 @@
package mc.core.timings;
public class TimingsStaticAccessor {
static TimingsManager TIMINGS_MANAGER;
public static TimingsManager getTimingsManager() {
return TIMINGS_MANAGER != null ? TIMINGS_MANAGER : (TIMINGS_MANAGER = new TimingsManager());
}
}

View File

@@ -0,0 +1,11 @@
package mc.core.timings.io;
import java.io.File;
import java.io.IOException;
public class DefaultWriterFactory implements TimingsWriterFactory {
@Override
public TimingsWriter newInstance(File file) throws IOException {
return new TimingsFileWriter(file);
}
}

View File

@@ -0,0 +1,63 @@
package mc.core.timings.io;
import lombok.extern.slf4j.Slf4j;
import mc.core.timings.TimingsEventType;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("Duplicates")
@Slf4j
public class TimingsFileWriter implements TimingsWriter {
private FileOutputStream fileOutputStream;
private ObjectOutputStream writer;
private ReentrantLock lock = new ReentrantLock();
public TimingsFileWriter(File saveFile) throws IOException {
fileOutputStream = new FileOutputStream(saveFile);
writer = new ObjectOutputStream(fileOutputStream);
}
@Override
public void writeEvent(int threadId, int stackId, long time, TimingsEventType type) {
lock.lock();
try {
writer.writeInt(threadId);
writer.writeInt(stackId);
writer.writeLong(time);
writer.writeShort(type.getId());
writer.writeBoolean(false);
} catch (IOException e) {
log.error("Unable to write timings record", e);
} finally {
lock.unlock();
}
}
@Override
public void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data) {
lock.lock();
try {
writer.writeInt(threadId);
writer.writeInt(stackId);
writer.writeLong(time);
writer.writeShort(type.getId());
writer.writeBoolean(true);
writer.writeUTF(data);
} catch (IOException e) {
log.error("Unable to write timings record", e);
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
writer.close();
fileOutputStream.close();
}
}

View File

@@ -0,0 +1,24 @@
package mc.core.timings.io;
import lombok.extern.slf4j.Slf4j;
import mc.core.timings.TimingsEventType;
import java.io.IOException;
@Slf4j
public class TimingsLogWriter implements TimingsWriter {
@Override
public void writeEvent(int threadId, int stackId, long time, TimingsEventType type) {
log.info("[{}] Thread #{}, Stack #{}: {}", time, threadId, stackId, type.toString());
}
@Override
public void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data) {
log.info("[{}] Thread #{}, Stack #{}: {} ({})", time, threadId, stackId, type.toString(), data);
}
@Override
public void close() throws IOException {
}
}

View File

@@ -0,0 +1,13 @@
package mc.core.timings.io;
import mc.core.timings.TimingsEventType;
import java.io.IOException;
public interface TimingsWriter {
void writeEvent(int threadId, int stackId, long time, TimingsEventType type);
void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data);
void close() throws IOException;
}

View File

@@ -0,0 +1,8 @@
package mc.core.timings.io;
import java.io.File;
import java.io.IOException;
public interface TimingsWriterFactory {
TimingsWriter newInstance(File file) throws IOException;
}

View File

@@ -0,0 +1,29 @@
package mc.core.events;
import mc.core.events.runner.ResourceAwareExecutorService;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class EventExecutorTest {
@Test
public void basicTest() throws InterruptedException {
AtomicBoolean testVariable = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
service.start();
service.addTask(() -> {
testVariable.set(true);
latch.countDown();
});
latch.await(1, TimeUnit.SECONDS);
service.stop();
Assert.assertTrue("Scheduled task was not executed", testVariable.get());
}
}

View File

@@ -0,0 +1,169 @@
package mc.core.events;
import mc.core.events.api.EventHandler;
import mc.core.events.api.EventPriority;
import mc.core.events.api.EventQueueOwner;
import mc.core.events.api.Plugin;
import mc.core.events.runner.ResourceAwareExecutorService;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("Duplicates")
public class EventLoopTest {
@Test
public void basicTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(1);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler
public void onLoginEvent(LoginEvent event) {
latch.countDown();
}
});
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
service.start();
eventLoop.setResourceAwareExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Event was not called", 0, latch.getCount());
}
@Test
public void consecutiveExecutionTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(2);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler
public void onLoginEvent(LoginEvent event) {
latch.countDown();
}
});
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
service.start();
eventLoop.setResourceAwareExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Event was not called", 0, latch.getCount());
}
@Test
public void prioritySystemTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(3);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
List<Integer> priorities = new ArrayList<>(3);
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler(priority = EventPriority.NORMAL)
public void login1(LoginEvent event) {
priorities.add(0);
latch.countDown();
}
@EventHandler(priority = EventPriority.HIGHEST)
public void login2(LoginEvent event) {
priorities.add(1);
latch.countDown();
}
@EventHandler(priority = EventPriority.LOWEST)
public void login3(LoginEvent event) {
priorities.add(2);
latch.countDown();
}
});
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
service.start();
eventLoop.setResourceAwareExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Incorrect call sequence", "[2, 0, 1]", priorities.toString());
}
@Test
public void ignoreCancelledTest() throws InterruptedException {
Plugin plugin = new Plugin() {
};
EventQueueOwner queueOwner = new EventQueueOwner() {
};
CountDownLatch latch = new CountDownLatch(1);
FullAsyncEventLoop eventLoop = new FullAsyncEventLoop();
List<Integer> priorities = new ArrayList<>(2);
eventLoop.addEventHandler(plugin, new Object() {
@EventHandler(priority = EventPriority.NORMAL, ignoreCancelled = true)
public void login1(LoginEvent event) {
priorities.add(0);
event.setCanceled(true);
}
@EventHandler(priority = EventPriority.HIGHEST, ignoreCancelled = true)
public void login2(LoginEvent event) {
priorities.add(1);
}
@EventHandler(priority = EventPriority.LOWEST, ignoreCancelled = true)
public void login3(LoginEvent event) {
priorities.add(2);
}
@EventHandler(priority = EventPriority.MONITOR)
public void monitor(LoginEvent event) {
latch.countDown();
}
});
ResourceAwareExecutorService service = new ResourceAwareExecutorService(1);
service.start();
eventLoop.setResourceAwareExecutorService(service);
eventLoop.asyncFireEvent(queueOwner, new LoginEvent(null));
latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals("Incorrect call sequence", "[2, 0]", priorities.toString());
}
}

View File

@@ -0,0 +1,90 @@
package mc.core.events;
import mc.core.events.runner.lock.LockObserveList;
import mc.core.events.runner.lock.PoorMansLock;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class LockTest {
@Test
public void basicTest() throws InterruptedException {
AtomicBoolean engageCallbackCalled = new AtomicBoolean(false);
AtomicBoolean disengageCallbackCalled = new AtomicBoolean(false);
PoorMansLock lock = new PoorMansLock();
lock.addCallback(lock1 -> {
if (lock1.isLocked())
engageCallbackCalled.set(true);
else
disengageCallbackCalled.set(true);
});
lock.lock();
Assert.assertTrue("Lock is not locked", lock.isLocked());
Assert.assertTrue("Engage callback was not called", engageCallbackCalled.get());
engageCallbackCalled.set(false);
try {
lock.lock();
Assert.assertFalse("Engage callback was called from attempt to block from the same thread", engageCallbackCalled.get());
} catch (Exception ex) {
Assert.fail("Exception fired while attempting to lock from the same thread");
return;
}
Assert.assertFalse("Disengage callback was called while not actually disengaging [x1]", disengageCallbackCalled.get());
AtomicBoolean lockExceptionFired = new AtomicBoolean(false);
AtomicBoolean unlockExceptionFired = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
lock.lock();
} catch (Exception ex) {
lockExceptionFired.set(true);
}
try {
lock.unlock();
} catch (Exception ex) {
unlockExceptionFired.set(true);
}
latch.countDown();
}).start();
latch.await();
Assert.assertTrue("Exception was not fired on concurrent lock attempt", lockExceptionFired.get());
Assert.assertTrue("Exception was not fired on non-owner unlock attempt", unlockExceptionFired.get());
Assert.assertFalse("Disengage callback was called while not actually disengaging [x2]", disengageCallbackCalled.get());
lock.unlock();
Assert.assertTrue("Disengage callback was on called on lock disengage", disengageCallbackCalled.get());
}
@Test
public void observeListTest() {
PoorMansLock lock1 = new PoorMansLock();
PoorMansLock lock2 = new PoorMansLock();
LockObserveList list = new LockObserveList();
list.add(lock1);
list.add(lock2);
Assert.assertTrue("LockObserveList was no able to correctly identify lock states for unlocked locks", list.isReady());
lock1.lock();
Assert.assertFalse("LockObserveList was no able to correctly identify lock states for list with one locked lock", list.isReady());
AtomicBoolean listReadyCallbackCalled = new AtomicBoolean(false);
list.setCallback(() -> listReadyCallbackCalled.set(true));
lock2.lock();
Assert.assertFalse("Callback was called when another lock got engaged", listReadyCallbackCalled.get());
lock1.unlock();
Assert.assertFalse("Callback was called while one lock is still locked", listReadyCallbackCalled.get());
lock2.unlock();
Assert.assertTrue("Callback was not called when both locks are actually free", listReadyCallbackCalled.get());
}
}

View File

@@ -0,0 +1,51 @@
package mc.core.timings;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class TimingsTest {
@Test
public void basicTest() {
try (Timings timings = Timings.start()) {
System.out.println("Test code");
}
}
@Test
public void brokenTimingTest() {
try (Timings timings = Timings.start()) {
Timings t1 = Timings.start();
Timings.start();
System.out.println("Pre Close t1");
t1.close();
System.out.println("Finished");
}
}
@Test
public void fileRecording() throws IOException {
Timings.getTimingsManager().startRecording(new File("test.timings"));
try (Timings t1 = Timings.start()) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
try (Timings t2 = Timings.start()) {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
Timings.getTimingsManager().stopRecording();
}
}

View File

@@ -10,7 +10,6 @@ import mc.core.network.proto_1_12_2.State;
import mc.core.network.proto_1_12_2.TeleportManager;
import mc.core.network.proto_1_12_2.netty.wrappers.WrapperNetChannel;
import mc.core.network.proto_1_12_2.packets.*;
import mc.core.player.Look;
import mc.core.player.Player;
import mc.core.player.PlayerManager;
import mc.core.player.PlayerMode;
@@ -48,8 +47,8 @@ public class LoginHandler extends AbstractStateHandler implements LoginStateHand
Player player = playerManager.getPlayer(packet.getPlayerName())
.orElseGet(() -> playerManager.createPlayer(
packet.getPlayerName(),
world.getSpawn(),
new Look(0f, 0f)));
world.getSpawn().getLocation(),
world.getSpawn().getLook()));
channel.writeAndFlush(new LoginSuccessPacket(
player.getUUID(),
@@ -68,7 +67,7 @@ public class LoginHandler extends AbstractStateHandler implements LoginStateHand
// Spawn Position
SpawnPositionPacket pkt2 = new SpawnPositionPacket();
pkt2.setLocation(world.getSpawn());
pkt2.setLocation(world.getSpawn().getLocation());
channel.write(pkt2);
// Player Abilities

View File

@@ -6,4 +6,4 @@ include('vanilla_commands')
include('proto_1.12.2') // Protocol 1.12.2
include('proto_1.12.2_netty') // Protocol 1.12.2 (Netty impl.)
include('generated_world')
include('event-loop')