diff --git a/utils/src/main/java/ru/trader/emdn/EMDN.java b/utils/src/main/java/ru/trader/emdn/EMDN.java index 46e6949..7b0416a 100644 --- a/utils/src/main/java/ru/trader/emdn/EMDN.java +++ b/utils/src/main/java/ru/trader/emdn/EMDN.java @@ -61,29 +61,6 @@ public class EMDN { context.term(); } - private void parseCSV(String csv) { - LOG.debug("Parse csv: {}", csv); - if (csv.isEmpty()) return; - String[] flds = csv.split(","); - // buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp - ItemData item = new ItemData(flds[7]); - item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4])); - item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2])); - LOG.trace("Item: {}", item); - String stName = flds[8].split("\\(")[0].trim(); - LOG.trace("Station: {}", stName); - Station station = cache.get(stName); - if (station != null){ - LOG.trace("Is old, update"); - station.update(item); - } else { - LOG.trace("Is new, create"); - station = new Station(stName); - station.update(item); - cache.put(stName, station); - } - } - private byte[] decompress(byte[] input){ byte[] decompressed = new byte[input.length * 16]; int decompressedLength = 0; @@ -132,14 +109,19 @@ public class EMDN { subscriber.connect(subServer); LOG.trace("Subscribe"); subscriber.subscribe(new byte[0]); + Station station = null; while (!executor.isShutdown() && !receive.isCancelled()){ try { byte[] receivedData = subscriber.recv(0); LOG.trace("Received data: {}", receivedData); - if (receivedData == null) return; + if (receivedData == null) continue; //receivedData = decompress(receivedData); String market_csv = new String(receivedData, "UTF-8"); - parseCSV(market_csv); + station = parseCSV(market_csv, station); + if (!subscriber.hasReceiveMore()){ + cache.put(station.getName(), station); + station = null; + } } catch (ZMQException | UnsupportedEncodingException ex) { LOG.error("Error on get data from EMDN", ex); } @@ -148,5 +130,27 @@ public class EMDN { LOG.error("Error on connect to EMDN", ex); } } + + private Station parseCSV(String csv, Station station) { + LOG.debug("Parse csv: {}", csv); + if (!csv.isEmpty()) { + String[] flds = csv.split(","); + // buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp + ItemData item = new ItemData(flds[7]); + item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4])); + item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2])); + LOG.trace("Item: {}", item); + String stName = flds[8].split("\\(")[0].trim(); + LOG.trace("Station: {}", stName); + if (station == null) { + LOG.trace("Is new, create"); + station = new Station(stName); + } + assert stName.equals(station.getName()); + station.update(item); + } + return station; + } + } }