From 4e84beb99b49d1d074c554ea11bc9345212fb105 Mon Sep 17 00:00:00 2001 From: iMoHax Date: Fri, 16 Sep 2016 16:41:28 +0300 Subject: [PATCH] implement EMDN Receiver --- .../src/main/java/ru/trader/EMDNUpdater.java | 17 +-- utils/src/main/java/ru/trader/emdn/EMDN.java | 123 +++--------------- .../main/java/ru/trader/emdn/Receiver.java | 114 ++++++++++++++++ .../test/java/ru/trader/emdn/EMDNTest.java | 14 +- utils/src/test/resources/log4j.properties | 4 +- 5 files changed, 147 insertions(+), 125 deletions(-) create mode 100644 utils/src/main/java/ru/trader/emdn/Receiver.java diff --git a/client/src/main/java/ru/trader/EMDNUpdater.java b/client/src/main/java/ru/trader/EMDNUpdater.java index 4fee2d2..e0f491b 100644 --- a/client/src/main/java/ru/trader/EMDNUpdater.java +++ b/client/src/main/java/ru/trader/EMDNUpdater.java @@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory; import ru.trader.controllers.MainController; import ru.trader.emdn.EMDN; import ru.trader.emdn.ItemData; -import ru.trader.emdn.Station; +import ru.trader.emdn.entities.Station; import ru.trader.model.MarketModel; import ru.trader.model.support.StationUpdater; @@ -21,16 +21,10 @@ public class EMDNUpdater { private static long interval; public static void updateFromEMDN(StationUpdater updater){ - Station emdnData = emdn.get(updater.getName()); - if (emdnData != null){ - update(updater, emdnData); - } else { - LOG.trace("Not found in EMDN"); - } } private static void update(StationUpdater updater, Station emdnData){ - LOG.trace("Update {} from EMDN", updater.getName()); +/* LOG.trace("Update {} from EMDN", updater.getName()); for (StationUpdater.FakeOffer offer : updater.getOffers()) { if (offer.getItem().isMarketItem()){ ItemData data = emdnData.getData(offer.getItem().getId()); @@ -45,7 +39,7 @@ public class EMDNUpdater { } else { LOG.trace("Is not market item, skip"); } - } + }*/ } static void init(){ @@ -121,9 +115,8 @@ public class EMDNUpdater { @Override public void run() { - market.getSystemNames().forEach(system -> { +/* market.getSystemNames().forEach(system -> { LOG.trace("Auto update {}", system); - Station emdnData = emdn.pop(system); if (emdnData != null){ //TODO: implement new model //updater.init(system); @@ -133,7 +126,7 @@ public class EMDNUpdater { } else { LOG.trace("Not found in EMDN"); } - }); + });*/ } } diff --git a/utils/src/main/java/ru/trader/emdn/EMDN.java b/utils/src/main/java/ru/trader/emdn/EMDN.java index 7b0416a..153996a 100644 --- a/utils/src/main/java/ru/trader/emdn/EMDN.java +++ b/utils/src/main/java/ru/trader/emdn/EMDN.java @@ -3,48 +3,45 @@ package ru.trader.emdn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import org.zeromq.ZMQException; +import ru.trader.emdn.entities.Message; -import java.io.UnsupportedEncodingException; import java.util.concurrent.*; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; +import java.util.function.Consumer; public class EMDN { private final static Logger LOG = LoggerFactory.getLogger(EMDN.class); - private final ConcurrentHashMap cache = new ConcurrentHashMap<>(40, 0.9f, 1); private final ZMQ.Context context; private ExecutorService executor; - private String subServer; - private Future receive; - private boolean clear; + private Receiver receiver; + private Future receiverFuture; - public EMDN() { - context = ZMQ.context(1); + public EMDN(){ + this("tcp://eddn-relay.elite-markets.net:9500",null); } - public EMDN(String subServer, boolean clearOnShutdown) { - this(); - this.subServer = subServer; - clear = clearOnShutdown; + public EMDN(String subServer, Consumer consumer) { + context = ZMQ.context(1); + receiver = new Receiver(context, subServer); + receiver.setConsumer(consumer); } public void start(){ if (isActive()) return; - if (executor == null) executor = Executors.newSingleThreadExecutor(); - receive = executor.submit(new Receiver()); + if (executor == null) { + executor = Executors.newSingleThreadExecutor(); + } + receiverFuture = executor.submit(receiver); } public void stop() { if (isActive()){ LOG.info("Stop EMDN client"); - receive.cancel(false); - receive = null; - if (clear) - cache.clear(); + receiver.cancel(); + receiverFuture.cancel(false); + receiverFuture = null; } } @@ -61,96 +58,16 @@ public class EMDN { context.term(); } - private byte[] decompress(byte[] input){ - byte[] decompressed = new byte[input.length * 16]; - int decompressedLength = 0; - Inflater inflater = new Inflater(); - try { - inflater.setInput(input); - decompressedLength = inflater.inflate(decompressed); - } catch (DataFormatException e) { - LOG.error("Error on decompress raw data {}", input); - LOG.error("",e); - } finally { - inflater.end(); - } - byte[] res = new byte[decompressedLength]; - System.arraycopy(decompressed, 0, res, 0, decompressedLength); - return res; - } - - public Station get(String name){ - return cache.get(name); - } - - public Station pop(String name) { - return cache.remove(name); - } - public boolean isActive(){ - return receive!=null; + return receiverFuture != null && receiver.isActive(); } public void connectTo(String subServer){ - if (subServer.equals(this.subServer)) return; + if (subServer.equals(receiver.getServer())) return; boolean active = isActive(); if (active) stop(); - this.subServer = subServer; + receiver.setServer(subServer); if (active) start(); } - private class Receiver implements Runnable { - - @Override - public void run() { - try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)){ - subscriber.setReceiveTimeOut(10000); - LOG.info("Connect to server {}", subServer); - subscriber.connect(subServer); - LOG.trace("Subscribe"); - subscriber.subscribe(new byte[0]); - Station station = null; - while (!executor.isShutdown() && !receive.isCancelled()){ - try { - byte[] receivedData = subscriber.recv(0); - LOG.trace("Received data: {}", receivedData); - if (receivedData == null) continue; - //receivedData = decompress(receivedData); - String market_csv = new String(receivedData, "UTF-8"); - station = parseCSV(market_csv, station); - if (!subscriber.hasReceiveMore()){ - cache.put(station.getName(), station); - station = null; - } - } catch (ZMQException | UnsupportedEncodingException ex) { - LOG.error("Error on get data from EMDN", ex); - } - } - } catch (Exception ex){ - LOG.error("Error on connect to EMDN", ex); - } - } - - private Station parseCSV(String csv, Station station) { - LOG.debug("Parse csv: {}", csv); - if (!csv.isEmpty()) { - String[] flds = csv.split(","); - // buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp - ItemData item = new ItemData(flds[7]); - item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4])); - item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2])); - LOG.trace("Item: {}", item); - String stName = flds[8].split("\\(")[0].trim(); - LOG.trace("Station: {}", stName); - if (station == null) { - LOG.trace("Is new, create"); - station = new Station(stName); - } - assert stName.equals(station.getName()); - station.update(item); - } - return station; - } - - } } diff --git a/utils/src/main/java/ru/trader/emdn/Receiver.java b/utils/src/main/java/ru/trader/emdn/Receiver.java new file mode 100644 index 0000000..feed401 --- /dev/null +++ b/utils/src/main/java/ru/trader/emdn/Receiver.java @@ -0,0 +1,114 @@ +package ru.trader.emdn; + +import com.fasterxml.jackson.core.JsonParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; +import org.zeromq.ZMQException; +import ru.trader.emdn.entities.Message; + +import java.io.UnsupportedEncodingException; +import java.util.function.Consumer; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +public class Receiver implements Runnable { + private final static Logger LOG = LoggerFactory.getLogger(Receiver.class); + private final ZMQ.Context context; + private final EMDNParser parser; + private String server; + private boolean run; + private Consumer consumer; + + public Receiver(ZMQ.Context context, String server) { + this.context = context; + this.server = server; + parser = new EMDNParser(); + run = false; + consumer = null; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } + + public void cancel(){ + run = false; + } + + public boolean isActive(){ + return run; + } + + protected void onMessage(Message message){ + if (consumer != null){ + consumer.accept(message); + } + } + + public void setConsumer(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public void run() { + run = true; + try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)){ + LOG.info("Connect to server {}", server); + subscriber.connect(server); + LOG.trace("Subscribe"); + subscriber.subscribe("".getBytes()); + StringBuilder builder = new StringBuilder(); + ZMQ.Poller poller = new ZMQ.Poller(1); + poller.register(subscriber, ZMQ.Poller.POLLIN); + + while (run){ + try { + poller.poll(10000); + if (poller.pollin(0)){ + byte[] receivedData = subscriber.recv(ZMQ.PAIR); + LOG.trace("Received data: {}", receivedData); + if (receivedData == null) continue; + // receivedData = decompress(receivedData); + String marketJson = new String(receivedData, "UTF-8"); + builder.append(marketJson); + if (!subscriber.hasReceiveMore()){ + Message message = parser.parse(builder.toString()); + LOG.trace("Parsed message: {}", message); + builder.setLength(0); + onMessage(message); + } + } + } catch (ZMQException | JsonParseException | UnsupportedEncodingException ex) { + LOG.error("Error on get data from EMDN", ex); + } + } + } catch (Exception ex){ + LOG.error("Error on connect to EMDN", ex); + } + } + + + private byte[] decompress(byte[] input){ + byte[] decompressed = new byte[input.length * 16]; + int decompressedLength = 0; + Inflater inflater = new Inflater(); + try { + inflater.setInput(input); + decompressedLength = inflater.inflate(decompressed); + } catch (DataFormatException e) { + LOG.error("Error on decompress raw data {}", input); + LOG.error("",e); + } finally { + inflater.end(); + } + byte[] res = new byte[decompressedLength]; + System.arraycopy(decompressed, 0, res, 0, decompressedLength); + return res; + } + +} diff --git a/utils/src/test/java/ru/trader/emdn/EMDNTest.java b/utils/src/test/java/ru/trader/emdn/EMDNTest.java index 923c099..cfadb2d 100644 --- a/utils/src/test/java/ru/trader/emdn/EMDNTest.java +++ b/utils/src/test/java/ru/trader/emdn/EMDNTest.java @@ -4,9 +4,13 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EMDNTest extends Assert { - private final static EMDN markettool = new EMDN("tcp://localhost:9050", true); + private final static Logger LOG = LoggerFactory.getLogger(EMDNTest.class); + + private final static EMDN markettool = new EMDN("tcp://localhost:9050", (m)->{LOG.debug("Receive message: {}", m);}); private final static EMDNEmul server = new EMDNEmul("tcp://localhost:9050"); @@ -20,14 +24,6 @@ public class EMDNTest extends Assert { public void testGetData() throws Exception { // wait submit Thread.sleep(4000); - Station station = markettool.get("Eranin"); - assertNotNull(station); - ItemData itemData = station.getData("cropharvesters"); - assertNotNull(itemData); - assertEquals(0,itemData.getBuy(), 0.0001); - assertEquals(0,itemData.getStock()); - assertEquals(2318,itemData.getSell(), 0.0001); - assertEquals(16472,itemData.getDemand()); } @After diff --git a/utils/src/test/resources/log4j.properties b/utils/src/test/resources/log4j.properties index 0231244..a72d80f 100644 --- a/utils/src/test/resources/log4j.properties +++ b/utils/src/test/resources/log4j.properties @@ -6,4 +6,6 @@ log4j.appender.stdout.layout.ConversionPattern=%p: %d{dd.MM.yyyy HH:mm:ss} (%F:% log4j.logger.ru.trader.edce = DEBUG -log4j.logger.ru.trader.edlog = DEBUG \ No newline at end of file +log4j.logger.ru.trader.edlog = DEBUG + +log4j.logger.ru.trader.emdn = TRACE \ No newline at end of file