From b2d3792384d8f76bf401696b8734f67c976bfd1d Mon Sep 17 00:00:00 2001 From: Daniil Date: Tue, 7 Aug 2018 16:06:34 +0700 Subject: [PATCH] Timings IO foundation --- .../java/mc/core/timings/ThreadTimings.java | 6 + .../main/java/mc/core/timings/Timings.java | 2 +- .../mc/core/timings/TimingsFileWriter.java | 72 +++++++++++ .../java/mc/core/timings/TimingsManager.java | 112 +++++++++++++++++- .../java/mc/core/timings/TimingsRecord.java | 31 +++++ 5 files changed, 218 insertions(+), 5 deletions(-) create mode 100644 event-loop/src/main/java/mc/core/timings/TimingsFileWriter.java create mode 100644 event-loop/src/main/java/mc/core/timings/TimingsRecord.java diff --git a/event-loop/src/main/java/mc/core/timings/ThreadTimings.java b/event-loop/src/main/java/mc/core/timings/ThreadTimings.java index f05ad22..2973839 100644 --- a/event-loop/src/main/java/mc/core/timings/ThreadTimings.java +++ b/event-loop/src/main/java/mc/core/timings/ThreadTimings.java @@ -8,6 +8,10 @@ public class ThreadTimings { private int threadId; private Stack stack = new Stack<>(); + public Stack getStack() { + return stack; + } + public ThreadTimings() { this.threadId = IDS.getAndIncrement(); } @@ -18,6 +22,7 @@ public class ThreadTimings { public Timings start() { Timings timings = new Timings(this, stack.size()); + Timings.getTimingsManager().waitForTimingsInitialize(); stack.push(timings); return timings; } @@ -25,6 +30,7 @@ public class ThreadTimings { public void end(Timings finished) { Timings timings = null; while (!stack.isEmpty() && timings != finished) { + Timings.getTimingsManager().waitForTimingsInitialize(); timings = stack.pop(); if (!timings.hasFinished()) timings.finish(); diff --git a/event-loop/src/main/java/mc/core/timings/Timings.java b/event-loop/src/main/java/mc/core/timings/Timings.java index 8c537cd..3824b3c 100644 --- a/event-loop/src/main/java/mc/core/timings/Timings.java +++ b/event-loop/src/main/java/mc/core/timings/Timings.java @@ -14,7 +14,7 @@ public class Timings implements AutoCloseable { } public static Timings start() { - return TimingsManager.TIMINGS_MANAGER.start(); + return TimingsManager.TIMINGS_MANAGER.getCurrentThreadTimings().start(); } public static TimingsManager getTimingsManager() { diff --git a/event-loop/src/main/java/mc/core/timings/TimingsFileWriter.java b/event-loop/src/main/java/mc/core/timings/TimingsFileWriter.java new file mode 100644 index 0000000..451fb88 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/TimingsFileWriter.java @@ -0,0 +1,72 @@ +package mc.core.timings; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.concurrent.locks.ReentrantLock; + +@SuppressWarnings("Duplicates") +@Slf4j +public class TimingsFileWriter { + private FileOutputStream fileOutputStream; + private ObjectOutputStream writer; + private ReentrantLock lock = new ReentrantLock(); + + public TimingsFileWriter(File saveFile) throws IOException { + fileOutputStream = new FileOutputStream(saveFile); + writer = new ObjectOutputStream(fileOutputStream); + } + + public void writeEvent(int threadId, int stackId, long time, TimingsEventType type) { + lock.lock(); + try { + writer.write(threadId); + writer.write(stackId); + writer.writeLong(time); + writer.writeShort(type.getId()); + } catch (IOException e) { + log.error("Unable to write timings record", e); + } finally { + lock.unlock(); + } + } + + public void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data) { + lock.lock(); + try { + writer.write(threadId); + writer.write(stackId); + writer.writeLong(time); + writer.writeShort(type.getId()); + writer.writeUTF(data); + } catch (IOException e) { + log.error("Unable to write timings record", e); + } finally { + lock.unlock(); + } + } + + public void close() throws IOException { + writer.close(); + fileOutputStream.close(); + } + + @RequiredArgsConstructor + public enum TimingsEventType { + TIMINGS_START((short) 0), + TIMINGS_END((short) 1), + TIMINGS_FILE_INITIALIZING((short) 2), + TIMINGS_FILE_INITIALIZED((short) 3), + TIMINGS_CHANGE_THREAD_OPTIONS((short) 4), + TIMINGS_FILE_END((short) 5); + + @Getter + private final short id; + } +} diff --git a/event-loop/src/main/java/mc/core/timings/TimingsManager.java b/event-loop/src/main/java/mc/core/timings/TimingsManager.java index 05a49d5..f5105ec 100644 --- a/event-loop/src/main/java/mc/core/timings/TimingsManager.java +++ b/event-loop/src/main/java/mc/core/timings/TimingsManager.java @@ -1,14 +1,118 @@ package mc.core.timings; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j public class TimingsManager { static TimingsManager TIMINGS_MANAGER = new TimingsManager(); + private final AtomicBoolean waitForFile = new AtomicBoolean(false); private Map threadTimings = new ConcurrentHashMap<>(); + private TimingsFileWriter writer; + private Thread timingsIoThread; + private CountDownLatch ioThreadStopMutex; + private BlockingQueue queue; + private ReentrantLock queueAccessLock = new ReentrantLock(); + + public void startRecording(File file) { + synchronized (waitForFile) { + waitForFile.set(true); + } + try { + writer = new TimingsFileWriter(file); + writer.writeEvent(0, 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_FILE_INITIALIZING); + // Synchronize current thread state + for (Map.Entry pair : threadTimings.entrySet()) { + writer.writeEvent(pair.getValue().getThreadId(), 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_CHANGE_THREAD_OPTIONS, "name: " + pair.getKey().getName()); + for (Timings timings : pair.getValue().getStack()) { + writer.writeEvent(pair.getValue().getThreadId(), timings.getId(), timings.getAcquireTime(), TimingsFileWriter.TimingsEventType.TIMINGS_START); + } + } + writer.writeEvent(0, 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_FILE_INITIALIZED); + queue = new ArrayBlockingQueue<>(200); + ioThreadStopMutex = new CountDownLatch(1); + timingsIoThread = new Thread() { + @Override + public void run() { + try { + while (!isInterrupted() && isAlive()) { + TimingsRecord record; + try { + record = queue.take(); + } catch (InterruptedException e) { + return; + } + record.writeToFile(writer); + } + } finally { + ioThreadStopMutex.countDown(); + } + } + }; + } catch (IOException e) { + log.error("Unable to start timings recording", e); + } + synchronized (waitForFile) { + waitForFile.set(false); + waitForFile.notifyAll(); + } + } + + public void stopRecording() { + // Disable write queue + queueAccessLock.lock(); + queue = null; + queueAccessLock.unlock(); + // Interrupt thread and wait until in finished writing the last task + timingsIoThread.interrupt(); + try { + ioThreadStopMutex.await(); + } catch (InterruptedException e) { + log.error("Unable to wait until last record would be written to file", e); + } + // Write EOF event + writer.writeEvent(0, 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_FILE_END); + // Unload file + try { + writer.close(); + } catch (IOException e) { + log.error("Unable to close timings file", e); + } + writer = null; + } + + void addToQueue(TimingsRecord record) { + if (queue == null) + return; + queueAccessLock.lock(); + try { + if (queue != null) + queue.offer(record); + } finally { + queueAccessLock.unlock(); + } + } + + void waitForTimingsInitialize() { + synchronized (waitForFile) { + while (waitForFile.get()) { + try { + waitForFile.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } - public Timings start() { - return getCurrentThreadTimings().start(); } public ThreadTimings getCurrentThreadTimings() { diff --git a/event-loop/src/main/java/mc/core/timings/TimingsRecord.java b/event-loop/src/main/java/mc/core/timings/TimingsRecord.java new file mode 100644 index 0000000..38eef49 --- /dev/null +++ b/event-loop/src/main/java/mc/core/timings/TimingsRecord.java @@ -0,0 +1,31 @@ +package mc.core.timings; + +public class TimingsRecord { + private int threadId; + private int stackId; + private long time; + private TimingsFileWriter.TimingsEventType eventType; + private String data; + + public TimingsRecord(int threadId, int stackId, long time, TimingsFileWriter.TimingsEventType eventType) { + this.threadId = threadId; + this.stackId = stackId; + this.time = time; + this.eventType = eventType; + } + + public TimingsRecord(int threadId, int stackId, long time, TimingsFileWriter.TimingsEventType eventType, String data) { + this.threadId = threadId; + this.stackId = stackId; + this.time = time; + this.eventType = eventType; + this.data = data; + } + + public void writeToFile(TimingsFileWriter fileWriter) { + if (data == null) + fileWriter.writeEvent(threadId, stackId, time, eventType); + else + fileWriter.writeEvent(threadId, stackId, time, eventType, data); + } +}