Archived
0

Timings IO foundation

This commit is contained in:
Daniil
2018-08-07 16:06:34 +07:00
parent 000816f110
commit b2d3792384
5 changed files with 218 additions and 5 deletions

View File

@@ -8,6 +8,10 @@ public class ThreadTimings {
private int threadId; private int threadId;
private Stack<Timings> stack = new Stack<>(); private Stack<Timings> stack = new Stack<>();
public Stack<Timings> getStack() {
return stack;
}
public ThreadTimings() { public ThreadTimings() {
this.threadId = IDS.getAndIncrement(); this.threadId = IDS.getAndIncrement();
} }
@@ -18,6 +22,7 @@ public class ThreadTimings {
public Timings start() { public Timings start() {
Timings timings = new Timings(this, stack.size()); Timings timings = new Timings(this, stack.size());
Timings.getTimingsManager().waitForTimingsInitialize();
stack.push(timings); stack.push(timings);
return timings; return timings;
} }
@@ -25,6 +30,7 @@ public class ThreadTimings {
public void end(Timings finished) { public void end(Timings finished) {
Timings timings = null; Timings timings = null;
while (!stack.isEmpty() && timings != finished) { while (!stack.isEmpty() && timings != finished) {
Timings.getTimingsManager().waitForTimingsInitialize();
timings = stack.pop(); timings = stack.pop();
if (!timings.hasFinished()) if (!timings.hasFinished())
timings.finish(); timings.finish();

View File

@@ -14,7 +14,7 @@ public class Timings implements AutoCloseable {
} }
public static Timings start() { public static Timings start() {
return TimingsManager.TIMINGS_MANAGER.start(); return TimingsManager.TIMINGS_MANAGER.getCurrentThreadTimings().start();
} }
public static TimingsManager getTimingsManager() { public static TimingsManager getTimingsManager() {

View File

@@ -0,0 +1,72 @@
package mc.core.timings;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("Duplicates")
@Slf4j
public class TimingsFileWriter {
private FileOutputStream fileOutputStream;
private ObjectOutputStream writer;
private ReentrantLock lock = new ReentrantLock();
public TimingsFileWriter(File saveFile) throws IOException {
fileOutputStream = new FileOutputStream(saveFile);
writer = new ObjectOutputStream(fileOutputStream);
}
public void writeEvent(int threadId, int stackId, long time, TimingsEventType type) {
lock.lock();
try {
writer.write(threadId);
writer.write(stackId);
writer.writeLong(time);
writer.writeShort(type.getId());
} catch (IOException e) {
log.error("Unable to write timings record", e);
} finally {
lock.unlock();
}
}
public void writeEvent(int threadId, int stackId, long time, TimingsEventType type, String data) {
lock.lock();
try {
writer.write(threadId);
writer.write(stackId);
writer.writeLong(time);
writer.writeShort(type.getId());
writer.writeUTF(data);
} catch (IOException e) {
log.error("Unable to write timings record", e);
} finally {
lock.unlock();
}
}
public void close() throws IOException {
writer.close();
fileOutputStream.close();
}
@RequiredArgsConstructor
public enum TimingsEventType {
TIMINGS_START((short) 0),
TIMINGS_END((short) 1),
TIMINGS_FILE_INITIALIZING((short) 2),
TIMINGS_FILE_INITIALIZED((short) 3),
TIMINGS_CHANGE_THREAD_OPTIONS((short) 4),
TIMINGS_FILE_END((short) 5);
@Getter
private final short id;
}
}

View File

@@ -1,14 +1,118 @@
package mc.core.timings; package mc.core.timings;
import java.util.Map; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class TimingsManager { public class TimingsManager {
static TimingsManager TIMINGS_MANAGER = new TimingsManager(); static TimingsManager TIMINGS_MANAGER = new TimingsManager();
private final AtomicBoolean waitForFile = new AtomicBoolean(false);
private Map<Thread, ThreadTimings> threadTimings = new ConcurrentHashMap<>(); private Map<Thread, ThreadTimings> threadTimings = new ConcurrentHashMap<>();
private TimingsFileWriter writer;
private Thread timingsIoThread;
private CountDownLatch ioThreadStopMutex;
private BlockingQueue<TimingsRecord> queue;
private ReentrantLock queueAccessLock = new ReentrantLock();
public void startRecording(File file) {
synchronized (waitForFile) {
waitForFile.set(true);
}
try {
writer = new TimingsFileWriter(file);
writer.writeEvent(0, 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_FILE_INITIALIZING);
// Synchronize current thread state
for (Map.Entry<Thread, ThreadTimings> pair : threadTimings.entrySet()) {
writer.writeEvent(pair.getValue().getThreadId(), 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_CHANGE_THREAD_OPTIONS, "name: " + pair.getKey().getName());
for (Timings timings : pair.getValue().getStack()) {
writer.writeEvent(pair.getValue().getThreadId(), timings.getId(), timings.getAcquireTime(), TimingsFileWriter.TimingsEventType.TIMINGS_START);
}
}
writer.writeEvent(0, 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_FILE_INITIALIZED);
queue = new ArrayBlockingQueue<>(200);
ioThreadStopMutex = new CountDownLatch(1);
timingsIoThread = new Thread() {
@Override
public void run() {
try {
while (!isInterrupted() && isAlive()) {
TimingsRecord record;
try {
record = queue.take();
} catch (InterruptedException e) {
return;
}
record.writeToFile(writer);
}
} finally {
ioThreadStopMutex.countDown();
}
}
};
} catch (IOException e) {
log.error("Unable to start timings recording", e);
}
synchronized (waitForFile) {
waitForFile.set(false);
waitForFile.notifyAll();
}
}
public void stopRecording() {
// Disable write queue
queueAccessLock.lock();
queue = null;
queueAccessLock.unlock();
// Interrupt thread and wait until in finished writing the last task
timingsIoThread.interrupt();
try {
ioThreadStopMutex.await();
} catch (InterruptedException e) {
log.error("Unable to wait until last record would be written to file", e);
}
// Write EOF event
writer.writeEvent(0, 0, System.nanoTime(), TimingsFileWriter.TimingsEventType.TIMINGS_FILE_END);
// Unload file
try {
writer.close();
} catch (IOException e) {
log.error("Unable to close timings file", e);
}
writer = null;
}
void addToQueue(TimingsRecord record) {
if (queue == null)
return;
queueAccessLock.lock();
try {
if (queue != null)
queue.offer(record);
} finally {
queueAccessLock.unlock();
}
}
void waitForTimingsInitialize() {
synchronized (waitForFile) {
while (waitForFile.get()) {
try {
waitForFile.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Timings start() {
return getCurrentThreadTimings().start();
} }
public ThreadTimings getCurrentThreadTimings() { public ThreadTimings getCurrentThreadTimings() {

View File

@@ -0,0 +1,31 @@
package mc.core.timings;
public class TimingsRecord {
private int threadId;
private int stackId;
private long time;
private TimingsFileWriter.TimingsEventType eventType;
private String data;
public TimingsRecord(int threadId, int stackId, long time, TimingsFileWriter.TimingsEventType eventType) {
this.threadId = threadId;
this.stackId = stackId;
this.time = time;
this.eventType = eventType;
}
public TimingsRecord(int threadId, int stackId, long time, TimingsFileWriter.TimingsEventType eventType, String data) {
this.threadId = threadId;
this.stackId = stackId;
this.time = time;
this.eventType = eventType;
this.data = data;
}
public void writeToFile(TimingsFileWriter fileWriter) {
if (data == null)
fileWriter.writeEvent(threadId, stackId, time, eventType);
else
fileWriter.writeEvent(threadId, stackId, time, eventType, data);
}
}