Archived
0

implement EMDN Receiver

This commit is contained in:
iMoHax
2016-09-16 16:41:28 +03:00
parent 53c7f09b54
commit 4e84beb99b
5 changed files with 147 additions and 125 deletions

View File

@@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory;
import ru.trader.controllers.MainController; import ru.trader.controllers.MainController;
import ru.trader.emdn.EMDN; import ru.trader.emdn.EMDN;
import ru.trader.emdn.ItemData; import ru.trader.emdn.ItemData;
import ru.trader.emdn.Station; import ru.trader.emdn.entities.Station;
import ru.trader.model.MarketModel; import ru.trader.model.MarketModel;
import ru.trader.model.support.StationUpdater; import ru.trader.model.support.StationUpdater;
@@ -21,16 +21,10 @@ public class EMDNUpdater {
private static long interval; private static long interval;
public static void updateFromEMDN(StationUpdater updater){ public static void updateFromEMDN(StationUpdater updater){
Station emdnData = emdn.get(updater.getName());
if (emdnData != null){
update(updater, emdnData);
} else {
LOG.trace("Not found in EMDN");
}
} }
private static void update(StationUpdater updater, Station emdnData){ private static void update(StationUpdater updater, Station emdnData){
LOG.trace("Update {} from EMDN", updater.getName()); /* LOG.trace("Update {} from EMDN", updater.getName());
for (StationUpdater.FakeOffer offer : updater.getOffers()) { for (StationUpdater.FakeOffer offer : updater.getOffers()) {
if (offer.getItem().isMarketItem()){ if (offer.getItem().isMarketItem()){
ItemData data = emdnData.getData(offer.getItem().getId()); ItemData data = emdnData.getData(offer.getItem().getId());
@@ -45,7 +39,7 @@ public class EMDNUpdater {
} else { } else {
LOG.trace("Is not market item, skip"); LOG.trace("Is not market item, skip");
} }
} }*/
} }
static void init(){ static void init(){
@@ -121,9 +115,8 @@ public class EMDNUpdater {
@Override @Override
public void run() { public void run() {
market.getSystemNames().forEach(system -> { /* market.getSystemNames().forEach(system -> {
LOG.trace("Auto update {}", system); LOG.trace("Auto update {}", system);
Station emdnData = emdn.pop(system);
if (emdnData != null){ if (emdnData != null){
//TODO: implement new model //TODO: implement new model
//updater.init(system); //updater.init(system);
@@ -133,7 +126,7 @@ public class EMDNUpdater {
} else { } else {
LOG.trace("Not found in EMDN"); LOG.trace("Not found in EMDN");
} }
}); });*/
} }
} }

View File

@@ -3,48 +3,45 @@ package ru.trader.emdn;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ; import org.zeromq.ZMQ;
import org.zeromq.ZMQException; import ru.trader.emdn.entities.Message;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.zip.DataFormatException; import java.util.function.Consumer;
import java.util.zip.Inflater;
public class EMDN { public class EMDN {
private final static Logger LOG = LoggerFactory.getLogger(EMDN.class); private final static Logger LOG = LoggerFactory.getLogger(EMDN.class);
private final ConcurrentHashMap<String, Station> cache = new ConcurrentHashMap<>(40, 0.9f, 1);
private final ZMQ.Context context; private final ZMQ.Context context;
private ExecutorService executor; private ExecutorService executor;
private String subServer; private Receiver receiver;
private Future<?> receive; private Future<?> receiverFuture;
private boolean clear;
public EMDN(){ public EMDN(){
context = ZMQ.context(1); this("tcp://eddn-relay.elite-markets.net:9500",null);
} }
public EMDN(String subServer, boolean clearOnShutdown) { public EMDN(String subServer, Consumer<Message> consumer) {
this(); context = ZMQ.context(1);
this.subServer = subServer; receiver = new Receiver(context, subServer);
clear = clearOnShutdown; receiver.setConsumer(consumer);
} }
public void start(){ public void start(){
if (isActive()) return; if (isActive()) return;
if (executor == null) executor = Executors.newSingleThreadExecutor(); if (executor == null) {
receive = executor.submit(new Receiver()); executor = Executors.newSingleThreadExecutor();
}
receiverFuture = executor.submit(receiver);
} }
public void stop() { public void stop() {
if (isActive()){ if (isActive()){
LOG.info("Stop EMDN client"); LOG.info("Stop EMDN client");
receive.cancel(false); receiver.cancel();
receive = null; receiverFuture.cancel(false);
if (clear) receiverFuture = null;
cache.clear();
} }
} }
@@ -61,96 +58,16 @@ public class EMDN {
context.term(); context.term();
} }
private byte[] decompress(byte[] input){
byte[] decompressed = new byte[input.length * 16];
int decompressedLength = 0;
Inflater inflater = new Inflater();
try {
inflater.setInput(input);
decompressedLength = inflater.inflate(decompressed);
} catch (DataFormatException e) {
LOG.error("Error on decompress raw data {}", input);
LOG.error("",e);
} finally {
inflater.end();
}
byte[] res = new byte[decompressedLength];
System.arraycopy(decompressed, 0, res, 0, decompressedLength);
return res;
}
public Station get(String name){
return cache.get(name);
}
public Station pop(String name) {
return cache.remove(name);
}
public boolean isActive(){ public boolean isActive(){
return receive!=null; return receiverFuture != null && receiver.isActive();
} }
public void connectTo(String subServer){ public void connectTo(String subServer){
if (subServer.equals(this.subServer)) return; if (subServer.equals(receiver.getServer())) return;
boolean active = isActive(); boolean active = isActive();
if (active) stop(); if (active) stop();
this.subServer = subServer; receiver.setServer(subServer);
if (active) start(); if (active) start();
} }
private class Receiver implements Runnable {
@Override
public void run() {
try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)){
subscriber.setReceiveTimeOut(10000);
LOG.info("Connect to server {}", subServer);
subscriber.connect(subServer);
LOG.trace("Subscribe");
subscriber.subscribe(new byte[0]);
Station station = null;
while (!executor.isShutdown() && !receive.isCancelled()){
try {
byte[] receivedData = subscriber.recv(0);
LOG.trace("Received data: {}", receivedData);
if (receivedData == null) continue;
//receivedData = decompress(receivedData);
String market_csv = new String(receivedData, "UTF-8");
station = parseCSV(market_csv, station);
if (!subscriber.hasReceiveMore()){
cache.put(station.getName(), station);
station = null;
}
} catch (ZMQException | UnsupportedEncodingException ex) {
LOG.error("Error on get data from EMDN", ex);
}
}
} catch (Exception ex){
LOG.error("Error on connect to EMDN", ex);
}
}
private Station parseCSV(String csv, Station station) {
LOG.debug("Parse csv: {}", csv);
if (!csv.isEmpty()) {
String[] flds = csv.split(",");
// buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp
ItemData item = new ItemData(flds[7]);
item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4]));
item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2]));
LOG.trace("Item: {}", item);
String stName = flds[8].split("\\(")[0].trim();
LOG.trace("Station: {}", stName);
if (station == null) {
LOG.trace("Is new, create");
station = new Station(stName);
}
assert stName.equals(station.getName());
station.update(item);
}
return station;
}
}
} }

View File

@@ -0,0 +1,114 @@
package ru.trader.emdn;
import com.fasterxml.jackson.core.JsonParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import ru.trader.emdn.entities.Message;
import java.io.UnsupportedEncodingException;
import java.util.function.Consumer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
public class Receiver implements Runnable {
private final static Logger LOG = LoggerFactory.getLogger(Receiver.class);
private final ZMQ.Context context;
private final EMDNParser parser;
private String server;
private boolean run;
private Consumer<Message> consumer;
public Receiver(ZMQ.Context context, String server) {
this.context = context;
this.server = server;
parser = new EMDNParser();
run = false;
consumer = null;
}
public String getServer() {
return server;
}
public void setServer(String server) {
this.server = server;
}
public void cancel(){
run = false;
}
public boolean isActive(){
return run;
}
protected void onMessage(Message message){
if (consumer != null){
consumer.accept(message);
}
}
public void setConsumer(Consumer<Message> consumer) {
this.consumer = consumer;
}
@Override
public void run() {
run = true;
try (ZMQ.Socket subscriber = context.socket(ZMQ.SUB)){
LOG.info("Connect to server {}", server);
subscriber.connect(server);
LOG.trace("Subscribe");
subscriber.subscribe("".getBytes());
StringBuilder builder = new StringBuilder();
ZMQ.Poller poller = new ZMQ.Poller(1);
poller.register(subscriber, ZMQ.Poller.POLLIN);
while (run){
try {
poller.poll(10000);
if (poller.pollin(0)){
byte[] receivedData = subscriber.recv(ZMQ.PAIR);
LOG.trace("Received data: {}", receivedData);
if (receivedData == null) continue;
// receivedData = decompress(receivedData);
String marketJson = new String(receivedData, "UTF-8");
builder.append(marketJson);
if (!subscriber.hasReceiveMore()){
Message message = parser.parse(builder.toString());
LOG.trace("Parsed message: {}", message);
builder.setLength(0);
onMessage(message);
}
}
} catch (ZMQException | JsonParseException | UnsupportedEncodingException ex) {
LOG.error("Error on get data from EMDN", ex);
}
}
} catch (Exception ex){
LOG.error("Error on connect to EMDN", ex);
}
}
private byte[] decompress(byte[] input){
byte[] decompressed = new byte[input.length * 16];
int decompressedLength = 0;
Inflater inflater = new Inflater();
try {
inflater.setInput(input);
decompressedLength = inflater.inflate(decompressed);
} catch (DataFormatException e) {
LOG.error("Error on decompress raw data {}", input);
LOG.error("",e);
} finally {
inflater.end();
}
byte[] res = new byte[decompressedLength];
System.arraycopy(decompressed, 0, res, 0, decompressedLength);
return res;
}
}

View File

@@ -4,9 +4,13 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EMDNTest extends Assert { public class EMDNTest extends Assert {
private final static EMDN markettool = new EMDN("tcp://localhost:9050", true); private final static Logger LOG = LoggerFactory.getLogger(EMDNTest.class);
private final static EMDN markettool = new EMDN("tcp://localhost:9050", (m)->{LOG.debug("Receive message: {}", m);});
private final static EMDNEmul server = new EMDNEmul("tcp://localhost:9050"); private final static EMDNEmul server = new EMDNEmul("tcp://localhost:9050");
@@ -20,14 +24,6 @@ public class EMDNTest extends Assert {
public void testGetData() throws Exception { public void testGetData() throws Exception {
// wait submit // wait submit
Thread.sleep(4000); Thread.sleep(4000);
Station station = markettool.get("Eranin");
assertNotNull(station);
ItemData itemData = station.getData("cropharvesters");
assertNotNull(itemData);
assertEquals(0,itemData.getBuy(), 0.0001);
assertEquals(0,itemData.getStock());
assertEquals(2318,itemData.getSell(), 0.0001);
assertEquals(16472,itemData.getDemand());
} }
@After @After

View File

@@ -7,3 +7,5 @@ log4j.appender.stdout.layout.ConversionPattern=%p: %d{dd.MM.yyyy HH:mm:ss} (%F:%
log4j.logger.ru.trader.edce = DEBUG log4j.logger.ru.trader.edce = DEBUG
log4j.logger.ru.trader.edlog = DEBUG log4j.logger.ru.trader.edlog = DEBUG
log4j.logger.ru.trader.emdn = TRACE