Archived
0

modify emdn threads executor

This commit is contained in:
iMoHax
2014-09-04 14:12:11 +04:00
parent fb8dfeb42d
commit 9e104943f7
4 changed files with 93 additions and 91 deletions

View File

@@ -21,16 +21,20 @@ public class EMDNUpdater {
private static long interval; private static long interval;
public static void updateFromEMDN(VendorUpdater updater){ public static void updateFromEMDN(VendorUpdater updater){
Station emdnData = emdn.getVendor(updater.getName()); Station emdnData = emdn.get(updater.getName());
LOG.debug("Update {} from EMDN", updater.getName()); if (emdnData != null){
if (emdnData == null){ update(updater, emdnData);
} else {
LOG.trace("Not found in EMDN"); LOG.trace("Not found in EMDN");
return;
} }
}
private static void update(VendorUpdater updater, Station emdnData){
LOG.trace("Update {} from EMDN", updater.getName());
for (VendorUpdater.FakeOffer offer : updater.getOffers()) { for (VendorUpdater.FakeOffer offer : updater.getOffers()) {
if (offer.getItem().isMarketItem()){ if (offer.getItem().isMarketItem()){
ItemData data = emdnData.getData(offer.getItem().getId()); ItemData data = emdnData.getData(offer.getItem().getId());
LOG.debug("Update item {} to {}", offer.getItem().getName(), data); LOG.trace("Update item {} to {}", offer.getItem().getName(), data);
if (data != null){ if (data != null){
offer.setSprice(data.getBuy()); offer.setSprice(data.getBuy());
offer.setBprice(data.getSell()); offer.setBprice(data.getSell());
@@ -56,8 +60,8 @@ public class EMDNUpdater {
public static void shutdown(){ public static void shutdown(){
if (executor != null) { if (executor != null) {
LOG.debug("Shutdown auto update"); LOG.debug("Shutdown auto update");
autoupdate.cancel(true); if (autoupdate != null) autoupdate.cancel(true);
executor.shutdown(); executor.shutdownNow();
} }
emdn.shutdown(); emdn.shutdown();
} }
@@ -83,8 +87,8 @@ public class EMDNUpdater {
emdn.start(); emdn.start();
} }
else { else {
emdn.shutdown();
setInterval(0); setInterval(0);
emdn.stop();
} }
} }
@@ -94,21 +98,15 @@ public class EMDNUpdater {
public static void setInterval(long interval) { public static void setInterval(long interval) {
if (emdn.isActive()){ if (emdn.isActive()){
if (interval > 0) { if (autoupdate != null){
if (executor != null){ LOG.debug("Stop auto update");
LOG.debug("Cancel previous auto update");
autoupdate.cancel(true); autoupdate.cancel(true);
autoupdate = null;
} }
if (interval > 0) {
if (executor == null) executor = Executors.newSingleThreadScheduledExecutor(); if (executor == null) executor = Executors.newSingleThreadScheduledExecutor();
LOG.debug("Start auto update each {} sec", interval); LOG.debug("Start auto update each {} sec", interval);
autoupdate = executor.scheduleAtFixedRate(emdnUpdater, interval, interval, TimeUnit.SECONDS); autoupdate = executor.scheduleAtFixedRate(emdnUpdater, interval, interval, TimeUnit.SECONDS);
} else {
if (executor != null){
LOG.debug("Stop auto update");
autoupdate.cancel(true);
executor.shutdown();
executor = null;
}
} }
} }
EMDNUpdater.interval = interval; EMDNUpdater.interval = interval;
@@ -125,10 +123,15 @@ public class EMDNUpdater {
public void run() { public void run() {
market.vendorsProperty().get().forEach((vendor) -> { market.vendorsProperty().get().forEach((vendor) -> {
LOG.trace("Auto update {}", vendor); LOG.trace("Auto update {}", vendor);
updater.reset(); Station emdnData = emdn.pop(vendor.getName());
if (emdnData != null){
updater.init(vendor); updater.init(vendor);
updateFromEMDN(updater); update(updater, emdnData);
updater.commit(); updater.commit();
updater.reset();
} else {
LOG.trace("Not found in EMDN");
}
}); });
} }
} }

View File

@@ -6,74 +6,60 @@ import org.zeromq.ZMQ;
import org.zeromq.ZMQException; import org.zeromq.ZMQException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.concurrent.Executors; import java.util.concurrent.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Inflater; import java.util.zip.Inflater;
public class EMDN { public class EMDN {
private final static Logger LOG = LoggerFactory.getLogger(EMDN.class); private final static Logger LOG = LoggerFactory.getLogger(EMDN.class);
private final Market cache = new Market(); private final ConcurrentHashMap<String, Station> cache = new ConcurrentHashMap<>(40, 0.9f, 1);
private final ZMQ.Context context;
private ExecutorService executor;
private String subServer; private String subServer;
private ZMQ.Context context = null; private Future<?> receive;
private ZMQ.Socket subscriber = null;
private ScheduledExecutorService executor;
private ScheduledFuture<?> receive;
private boolean clear; private boolean clear;
public EMDN() { public EMDN() {
context = ZMQ.context(1);
} }
public EMDN(String subServer, boolean clearOnShutdown) { public EMDN(String subServer, boolean clearOnShutdown) {
this();
this.subServer = subServer; this.subServer = subServer;
clear = clearOnShutdown; clear = clearOnShutdown;
} }
private void init(){
context = ZMQ.context(1);
subscriber = context.socket(ZMQ.SUB);
}
public void start(){ public void start(){
if (isActive()) return; if (isActive()) return;
init(); if (executor == null) executor = Executors.newSingleThreadExecutor();
LOG.info("Connect to server {}", subServer); receive = executor.submit(new Receiver());
subscriber.connect(subServer);
LOG.trace("Subscribe");
subscriber.subscribe(new byte[0]);
executor = Executors.newSingleThreadScheduledExecutor();
receive = executor.scheduleWithFixedDelay(() -> {
try {
byte[] receivedData = subscriber.recv(0);
LOG.trace("Received data: {}", receivedData);
if (receivedData == null) return;
//receivedData = decompress(receivedData);
String market_csv = new String(receivedData, "UTF-8");
parseCSV(market_csv);
} catch (ZMQException | UnsupportedEncodingException ex) {
if (!executor.isShutdown())
LOG.error("Error on get data from EMDN", ex);
}
}, 0, 1, TimeUnit.MILLISECONDS);
} }
public void shutdown() {
public void stop() {
if (isActive()){ if (isActive()){
LOG.info("Shutdown EMDN client"); LOG.info("Stop EMDN client");
receive.cancel(false); receive.cancel(false);
executor.shutdown(); receive = null;
subscriber.close();
context.term();
}
subscriber = null;
context = null;
if (clear) if (clear)
cache.clear(); cache.clear();
} }
}
public void shutdown() {
LOG.info("Shutdown EMDN client");
stop();
if (executor != null) {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
}
context.term();
}
private void parseCSV(String csv) { private void parseCSV(String csv) {
LOG.debug("Parse csv: {}", csv); LOG.debug("Parse csv: {}", csv);
@@ -86,7 +72,7 @@ public class EMDN {
LOG.trace("Item: {}", item); LOG.trace("Item: {}", item);
String stName = flds[8].split("\\(")[0].trim(); String stName = flds[8].split("\\(")[0].trim();
LOG.trace("Station: {}", stName); LOG.trace("Station: {}", stName);
Station station = cache.getVendor(stName); Station station = cache.get(stName);
if (station != null){ if (station != null){
LOG.trace("Is old, update"); LOG.trace("Is old, update");
station.update(item); station.update(item);
@@ -94,7 +80,7 @@ public class EMDN {
LOG.trace("Is new, create"); LOG.trace("Is new, create");
station = new Station(stName); station = new Station(stName);
station.update(item); station.update(item);
cache.addVendor(station); cache.put(stName, station);
} }
} }
@@ -116,19 +102,51 @@ public class EMDN {
return res; return res;
} }
public Station getVendor(String name){ public Station get(String name){
return cache.getVendor(name); return cache.get(name);
}
public Station pop(String name) {
return cache.remove(name);
} }
public boolean isActive(){ public boolean isActive(){
return subscriber!=null; return receive!=null;
} }
public void connectTo(String subServer){ public void connectTo(String subServer){
if (subServer.equals(this.subServer)) return; if (subServer.equals(this.subServer)) return;
boolean active = isActive(); boolean active = isActive();
if (active) shutdown(); if (active) stop();
this.subServer = subServer; this.subServer = subServer;
if (active) start(); if (active) start();
} }
private class Receiver implements Runnable {
@Override
public void run() {
try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)){
subscriber.setReceiveTimeOut(10000);
LOG.info("Connect to server {}", subServer);
subscriber.connect(subServer);
LOG.trace("Subscribe");
subscriber.subscribe(new byte[0]);
while (!executor.isShutdown() && !receive.isCancelled()){
try {
byte[] receivedData = subscriber.recv(0);
LOG.trace("Received data: {}", receivedData);
if (receivedData == null) return;
//receivedData = decompress(receivedData);
String market_csv = new String(receivedData, "UTF-8");
parseCSV(market_csv);
} catch (ZMQException | UnsupportedEncodingException ex) {
LOG.error("Error on get data from EMDN", ex);
}
}
} catch (Exception ex){
LOG.error("Error on connect to EMDN", ex);
}
}
}
} }

View File

@@ -1,19 +0,0 @@
package ru.trader.emdn;
import java.util.concurrent.ConcurrentHashMap;
public class Market {
private final ConcurrentHashMap<String, Station> vendors = new ConcurrentHashMap<>(40, 0.9f, 1);
public Station getVendor(String name){
return vendors.get(name);
}
public void addVendor(Station vendor){
vendors.put(vendor.getName(), vendor);
}
public void clear(){
vendors.clear();
}
}

View File

@@ -20,7 +20,7 @@ public class EMDNTest extends Assert {
public void testGetData() throws Exception { public void testGetData() throws Exception {
// wait submit // wait submit
Thread.sleep(4000); Thread.sleep(4000);
Station station = markettool.getVendor("Eranin"); Station station = markettool.get("Eranin");
assertNotNull(station); assertNotNull(station);
ItemData itemData = station.getData("cropharvesters"); ItemData itemData = station.getData("cropharvesters");
assertNotNull(itemData); assertNotNull(itemData);