From 9e104943f7ef7fea5e058e28cf4ec2669b60e1c7 Mon Sep 17 00:00:00 2001 From: iMoHax Date: Thu, 4 Sep 2014 14:12:11 +0400 Subject: [PATCH] modify emdn threads executor --- .../src/main/java/ru/trader/EMDNUpdater.java | 49 ++++---- utils/src/main/java/ru/trader/emdn/EMDN.java | 114 ++++++++++-------- .../src/main/java/ru/trader/emdn/Market.java | 19 --- .../test/java/ru/trader/emdn/EMDNTest.java | 2 +- 4 files changed, 93 insertions(+), 91 deletions(-) delete mode 100644 utils/src/main/java/ru/trader/emdn/Market.java diff --git a/client/src/main/java/ru/trader/EMDNUpdater.java b/client/src/main/java/ru/trader/EMDNUpdater.java index 09e10ef..8896235 100644 --- a/client/src/main/java/ru/trader/EMDNUpdater.java +++ b/client/src/main/java/ru/trader/EMDNUpdater.java @@ -21,16 +21,20 @@ public class EMDNUpdater { private static long interval; public static void updateFromEMDN(VendorUpdater updater){ - Station emdnData = emdn.getVendor(updater.getName()); - LOG.debug("Update {} from EMDN", updater.getName()); - if (emdnData == null){ + Station emdnData = emdn.get(updater.getName()); + if (emdnData != null){ + update(updater, emdnData); + } else { LOG.trace("Not found in EMDN"); - return; } + } + + private static void update(VendorUpdater updater, Station emdnData){ + LOG.trace("Update {} from EMDN", updater.getName()); for (VendorUpdater.FakeOffer offer : updater.getOffers()) { if (offer.getItem().isMarketItem()){ ItemData data = emdnData.getData(offer.getItem().getId()); - LOG.debug("Update item {} to {}", offer.getItem().getName(), data); + LOG.trace("Update item {} to {}", offer.getItem().getName(), data); if (data != null){ offer.setSprice(data.getBuy()); offer.setBprice(data.getSell()); @@ -56,8 +60,8 @@ public class EMDNUpdater { public static void shutdown(){ if (executor != null) { LOG.debug("Shutdown auto update"); - autoupdate.cancel(true); - executor.shutdown(); + if (autoupdate != null) autoupdate.cancel(true); + executor.shutdownNow(); } emdn.shutdown(); } @@ -83,8 +87,8 @@ public class EMDNUpdater { emdn.start(); } else { - emdn.shutdown(); setInterval(0); + emdn.stop(); } } @@ -94,21 +98,15 @@ public class EMDNUpdater { public static void setInterval(long interval) { if (emdn.isActive()){ + if (autoupdate != null){ + LOG.debug("Stop auto update"); + autoupdate.cancel(true); + autoupdate = null; + } if (interval > 0) { - if (executor != null){ - LOG.debug("Cancel previous auto update"); - autoupdate.cancel(true); - } if (executor == null) executor = Executors.newSingleThreadScheduledExecutor(); LOG.debug("Start auto update each {} sec", interval); autoupdate = executor.scheduleAtFixedRate(emdnUpdater, interval, interval, TimeUnit.SECONDS); - } else { - if (executor != null){ - LOG.debug("Stop auto update"); - autoupdate.cancel(true); - executor.shutdown(); - executor = null; - } } } EMDNUpdater.interval = interval; @@ -125,10 +123,15 @@ public class EMDNUpdater { public void run() { market.vendorsProperty().get().forEach((vendor) -> { LOG.trace("Auto update {}", vendor); - updater.reset(); - updater.init(vendor); - updateFromEMDN(updater); - updater.commit(); + Station emdnData = emdn.pop(vendor.getName()); + if (emdnData != null){ + updater.init(vendor); + update(updater, emdnData); + updater.commit(); + updater.reset(); + } else { + LOG.trace("Not found in EMDN"); + } }); } } diff --git a/utils/src/main/java/ru/trader/emdn/EMDN.java b/utils/src/main/java/ru/trader/emdn/EMDN.java index 1c202e1..46e6949 100644 --- a/utils/src/main/java/ru/trader/emdn/EMDN.java +++ b/utils/src/main/java/ru/trader/emdn/EMDN.java @@ -6,75 +6,61 @@ import org.zeromq.ZMQ; import org.zeromq.ZMQException; import java.io.UnsupportedEncodingException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.zip.DataFormatException; import java.util.zip.Inflater; public class EMDN { private final static Logger LOG = LoggerFactory.getLogger(EMDN.class); - private final Market cache = new Market(); + private final ConcurrentHashMap cache = new ConcurrentHashMap<>(40, 0.9f, 1); + private final ZMQ.Context context; + private ExecutorService executor; private String subServer; - private ZMQ.Context context = null; - private ZMQ.Socket subscriber = null; - private ScheduledExecutorService executor; - private ScheduledFuture receive; + private Future receive; private boolean clear; public EMDN() { + context = ZMQ.context(1); } public EMDN(String subServer, boolean clearOnShutdown) { + this(); this.subServer = subServer; clear = clearOnShutdown; } - private void init(){ - context = ZMQ.context(1); - subscriber = context.socket(ZMQ.SUB); - } public void start(){ if (isActive()) return; - init(); - LOG.info("Connect to server {}", subServer); - subscriber.connect(subServer); - LOG.trace("Subscribe"); - subscriber.subscribe(new byte[0]); - executor = Executors.newSingleThreadScheduledExecutor(); - receive = executor.scheduleWithFixedDelay(() -> { - try { - byte[] receivedData = subscriber.recv(0); - LOG.trace("Received data: {}", receivedData); - if (receivedData == null) return; - //receivedData = decompress(receivedData); - String market_csv = new String(receivedData, "UTF-8"); - parseCSV(market_csv); - } catch (ZMQException | UnsupportedEncodingException ex) { - if (!executor.isShutdown()) - LOG.error("Error on get data from EMDN", ex); - } - }, 0, 1, TimeUnit.MILLISECONDS); + if (executor == null) executor = Executors.newSingleThreadExecutor(); + receive = executor.submit(new Receiver()); + } + + + public void stop() { + if (isActive()){ + LOG.info("Stop EMDN client"); + receive.cancel(false); + receive = null; + if (clear) + cache.clear(); + } } public void shutdown() { - if (isActive()){ - LOG.info("Shutdown EMDN client"); - receive.cancel(false); + LOG.info("Shutdown EMDN client"); + stop(); + if (executor != null) { executor.shutdown(); - subscriber.close(); - context.term(); + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + } } - subscriber = null; - context = null; - if (clear) - cache.clear(); + context.term(); } - private void parseCSV(String csv) { LOG.debug("Parse csv: {}", csv); if (csv.isEmpty()) return; @@ -86,7 +72,7 @@ public class EMDN { LOG.trace("Item: {}", item); String stName = flds[8].split("\\(")[0].trim(); LOG.trace("Station: {}", stName); - Station station = cache.getVendor(stName); + Station station = cache.get(stName); if (station != null){ LOG.trace("Is old, update"); station.update(item); @@ -94,7 +80,7 @@ public class EMDN { LOG.trace("Is new, create"); station = new Station(stName); station.update(item); - cache.addVendor(station); + cache.put(stName, station); } } @@ -116,19 +102,51 @@ public class EMDN { return res; } - public Station getVendor(String name){ - return cache.getVendor(name); + public Station get(String name){ + return cache.get(name); + } + + public Station pop(String name) { + return cache.remove(name); } public boolean isActive(){ - return subscriber!=null; + return receive!=null; } public void connectTo(String subServer){ if (subServer.equals(this.subServer)) return; boolean active = isActive(); - if (active) shutdown(); + if (active) stop(); this.subServer = subServer; if (active) start(); } + + private class Receiver implements Runnable { + + @Override + public void run() { + try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)){ + subscriber.setReceiveTimeOut(10000); + LOG.info("Connect to server {}", subServer); + subscriber.connect(subServer); + LOG.trace("Subscribe"); + subscriber.subscribe(new byte[0]); + while (!executor.isShutdown() && !receive.isCancelled()){ + try { + byte[] receivedData = subscriber.recv(0); + LOG.trace("Received data: {}", receivedData); + if (receivedData == null) return; + //receivedData = decompress(receivedData); + String market_csv = new String(receivedData, "UTF-8"); + parseCSV(market_csv); + } catch (ZMQException | UnsupportedEncodingException ex) { + LOG.error("Error on get data from EMDN", ex); + } + } + } catch (Exception ex){ + LOG.error("Error on connect to EMDN", ex); + } + } + } } diff --git a/utils/src/main/java/ru/trader/emdn/Market.java b/utils/src/main/java/ru/trader/emdn/Market.java deleted file mode 100644 index 71cf9d9..0000000 --- a/utils/src/main/java/ru/trader/emdn/Market.java +++ /dev/null @@ -1,19 +0,0 @@ -package ru.trader.emdn; - -import java.util.concurrent.ConcurrentHashMap; - -public class Market { - private final ConcurrentHashMap vendors = new ConcurrentHashMap<>(40, 0.9f, 1); - - public Station getVendor(String name){ - return vendors.get(name); - } - - public void addVendor(Station vendor){ - vendors.put(vendor.getName(), vendor); - } - - public void clear(){ - vendors.clear(); - } -} diff --git a/utils/src/test/java/ru/trader/emdn/EMDNTest.java b/utils/src/test/java/ru/trader/emdn/EMDNTest.java index 359ad42..923c099 100644 --- a/utils/src/test/java/ru/trader/emdn/EMDNTest.java +++ b/utils/src/test/java/ru/trader/emdn/EMDNTest.java @@ -20,7 +20,7 @@ public class EMDNTest extends Assert { public void testGetData() throws Exception { // wait submit Thread.sleep(4000); - Station station = markettool.getVendor("Eranin"); + Station station = markettool.get("Eranin"); assertNotNull(station); ItemData itemData = station.getData("cropharvesters"); assertNotNull(itemData);