diff --git a/utils/pom.xml b/utils/pom.xml index be0fc9d..9d6ff23 100644 --- a/utils/pom.xml +++ b/utils/pom.xml @@ -26,6 +26,16 @@ org.slf4j slf4j-api + + org.slf4j + slf4j-log4j12 + test + + + log4j + log4j + test + org.apache.poi poi @@ -38,6 +48,20 @@ org.apache.poi poi-ooxml-schemas + + junit + junit + + + org.zeromq + jeromq + 0.3.4 + + + org.zeromq + jzmq + 3.1.0 + diff --git a/utils/src/main/java/ru/trader/emdn/EMDN.java b/utils/src/main/java/ru/trader/emdn/EMDN.java new file mode 100644 index 0000000..d2f687d --- /dev/null +++ b/utils/src/main/java/ru/trader/emdn/EMDN.java @@ -0,0 +1,63 @@ +package ru.trader.emdn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; +import org.zeromq.ZMQException; + +import java.io.UnsupportedEncodingException; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +public class EMDN { + private final static Logger LOG = LoggerFactory.getLogger(EMDN.class); + + private final String subServer; + + public EMDN(String subServer) { + this.subServer = subServer; + } + + + public void getData(){ + ZMQ.Context context = ZMQ.context(1); + try (ZMQ.Socket socket = context.socket(ZMQ.SUB)) { + socket.setReceiveTimeOut(10000); + LOG.debug("Connect to server {}", subServer); + socket.connect(subServer); + + LOG.trace("Subscribe"); + socket.subscribe(new byte[0]); + for (int i = 0; i < 5; i++) { + try { + byte[] receivedData = socket.recv(0); + LOG.trace("Recived data: {}", receivedData); + if (receivedData == null) continue; + String market_json = new String(decompress(receivedData), "UTF-8"); + LOG.trace("JSON: {}", market_json); + } catch (ZMQException | UnsupportedEncodingException ex) { + LOG.error("Error on get data from EMDN", ex); + } + } + } + context.term(); + } + + private byte[] decompress(byte[] input){ + byte[] decompressed = new byte[input.length * 16]; + int decompressedLength = 0; + Inflater inflater = new Inflater(); + try { + inflater.setInput(input); + decompressedLength = inflater.inflate(decompressed); + } catch (DataFormatException e) { + LOG.error("Error on decompress raw data {}", input); + LOG.error("",e); + } finally { + inflater.end(); + } + byte[] res = new byte[decompressedLength]; + System.arraycopy(decompressed, 0, res, 0, decompressedLength); + return res; + } +} diff --git a/utils/src/test/java/ru/trader/emdn/EMDNTest.java b/utils/src/test/java/ru/trader/emdn/EMDNTest.java new file mode 100644 index 0000000..06d1727 --- /dev/null +++ b/utils/src/test/java/ru/trader/emdn/EMDNTest.java @@ -0,0 +1,14 @@ +package ru.trader.emdn; + +import org.junit.Assert; +import org.junit.Test; + +public class EMDNTest extends Assert { + private final static EMDN markettool = new EMDN("tcp://firehose.elite-market-data.net:9050"); + + @Test + public void testGetData() throws Exception { + markettool.getData(); + + } +} diff --git a/utils/src/test/resources/log4j.properties b/utils/src/test/resources/log4j.properties new file mode 100644 index 0000000..6e9f687 --- /dev/null +++ b/utils/src/test/resources/log4j.properties @@ -0,0 +1,9 @@ +log4j.rootLogger = INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%p: %d{dd.MM.yyyy HH:mm:ss} (%F:%L) - %m%n + +log4j.logger.ru.trader.emdn.EMDN = TRACE + +log4j.logger.org.zeromq = TRACE \ No newline at end of file