Archived
0

clear file and reader on stop watcher

This commit is contained in:
iMoHax
2016-06-03 16:39:31 +03:00
parent 2e1571a3ed
commit fa3ff32587
3 changed files with 84 additions and 69 deletions

View File

@@ -19,7 +19,7 @@ public class LogReader implements LogHandler {
private void changeFile(File file){ private void changeFile(File file){
if (this.file != null && this.file.equals(file)) return; if (this.file != null && this.file.equals(file)) return;
LOG.trace("Watch new file {}", file); LOG.debug("Watch file {}", file);
try { try {
if (reader != null){ if (reader != null){
closeReader(); closeReader();
@@ -94,6 +94,8 @@ public class LogReader implements LogHandler {
if (reader == null) return; if (reader == null) return;
try { try {
reader.close(); reader.close();
reader = null;
file = null;
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Error on close old reader", e); LOG.warn("Error on close old reader", e);
} }

View File

@@ -11,38 +11,35 @@ import java.util.concurrent.TimeUnit;
public class LogWatcher { public class LogWatcher {
private final static Logger LOG = LoggerFactory.getLogger(LogWatcher.class); private final static Logger LOG = LoggerFactory.getLogger(LogWatcher.class);
private final Path dir; private Path dir;
private final LogHandler handler; private final LogHandler handler;
private WatchService watcher; private WatchService watcher;
private boolean run; private WatcherThread thread;
private Thread thread;
public LogWatcher(String dir, LogHandler handler) { public LogWatcher(LogHandler handler) {
this(FileSystems.getDefault().getPath(dir), handler);
}
public LogWatcher(Path dir, LogHandler handler) {
this.dir = dir;
this.handler = handler; this.handler = handler;
thread = new Thread(){ thread = null;
@Override
public void run() {
watch();
}
};
thread.setDaemon(true);
} }
public void start() throws IOException { public boolean isRun() {
return thread != null && thread.isAlive();
}
public void start(String dir) throws IOException {
start(FileSystems.getDefault().getPath(dir));
}
public void start(Path dir) throws IOException {
LOG.debug("Start log watch service, dir {}", dir); LOG.debug("Start log watch service, dir {}", dir);
if (watcher != null){ if (thread != null){
throw new IllegalStateException("Watch service already started"); throw new IllegalStateException("Watch service already started");
} }
this.dir = dir;
watcher = FileSystems.getDefault().newWatchService(); watcher = FileSystems.getDefault().newWatchService();
dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY); dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
Path last = getLastModify(); Path last = getLastModify();
if (last != null) handler.createFile(last); if (last != null) handler.createFile(last);
run = true; thread = new WatcherThread();
thread.start(); thread.start();
} }
@@ -58,53 +55,13 @@ public class LogWatcher {
return last != null ? last.toPath() : null; return last != null ? last.toPath() : null;
} }
private void watch(){
try {
while (run) {
WatchKey key = watcher.poll(5, TimeUnit.SECONDS);
if (key == null){
handler.notChanges();
continue;
}
for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
@SuppressWarnings("unchecked")
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path filename = ev.context();
Path file = dir.resolve(filename);
if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE){
LOG.trace("File {} was created", file);
handler.createFile(file);
} else
if (ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY){
LOG.trace("File {} was modified", file);
handler.updateFile(file);
}
}
boolean valid = key.reset();
if (!valid) {
break;
}
}
} catch (InterruptedException | ClosedWatchServiceException e) {
if (!Thread.currentThread().isInterrupted()){
Thread.currentThread().interrupt();
}
} finally {
close();
}
}
private void close() { private void close() {
if (watcher != null) try { if (watcher != null){
watcher.close(); try {
} catch (IOException e) { watcher.close();
LOG.warn("Error on close log watcher", e); } catch (IOException e) {
LOG.warn("Error on close log watcher", e);
}
} }
handler.close(); handler.close();
watcher = null; watcher = null;
@@ -112,6 +69,62 @@ public class LogWatcher {
public void stop(){ public void stop(){
LOG.debug("Stop log watch service"); LOG.debug("Stop log watch service");
run = false; if (thread != null){
thread.cancel();
thread = null;
}
close();
}
private class WatcherThread extends Thread {
private boolean run;
private WatcherThread() {
this.setDaemon(true);
}
public void cancel(){
run = false;
}
@Override
public void run() {
run = true;
try {
while (run) {
WatchKey key = watcher.poll(5, TimeUnit.SECONDS);
if (key == null){
handler.notChanges();
continue;
}
for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
@SuppressWarnings("unchecked")
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path filename = ev.context();
Path file = dir.resolve(filename);
if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE){
LOG.trace("File {} was created", file);
handler.createFile(file);
} else
if (ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY){
LOG.trace("File {} was modified", file);
handler.updateFile(file);
}
}
boolean valid = key.reset();
if (!valid) {
break;
}
}
} catch (InterruptedException | ClosedWatchServiceException e) {
cancel();
}
run = false;
}
} }
} }

View File

@@ -24,8 +24,8 @@ public class LogWatcherTest {
LOG.info("Test log watcher"); LOG.info("Test log watcher");
LogHandler handler = new LogReader(".+\\.log$"); LogHandler handler = new LogReader(".+\\.log$");
String path = readLine("Watch dir:"); String path = readLine("Watch dir:");
LogWatcher watcher = new LogWatcher(path, handler); LogWatcher watcher = new LogWatcher(handler);
watcher.start(); watcher.start(path);
Thread.sleep(5*60*1000); Thread.sleep(5*60*1000);
watcher.stop(); watcher.stop();
} }