Archived
0

fix broken receive procedure

This commit is contained in:
iMoHax
2014-09-04 20:23:34 +04:00
parent 1e8dce9477
commit df73bdf764

View File

@@ -61,29 +61,6 @@ public class EMDN {
context.term();
}
private void parseCSV(String csv) {
LOG.debug("Parse csv: {}", csv);
if (csv.isEmpty()) return;
String[] flds = csv.split(",");
// buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp
ItemData item = new ItemData(flds[7]);
item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4]));
item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2]));
LOG.trace("Item: {}", item);
String stName = flds[8].split("\\(")[0].trim();
LOG.trace("Station: {}", stName);
Station station = cache.get(stName);
if (station != null){
LOG.trace("Is old, update");
station.update(item);
} else {
LOG.trace("Is new, create");
station = new Station(stName);
station.update(item);
cache.put(stName, station);
}
}
private byte[] decompress(byte[] input){
byte[] decompressed = new byte[input.length * 16];
int decompressedLength = 0;
@@ -132,14 +109,19 @@ public class EMDN {
subscriber.connect(subServer);
LOG.trace("Subscribe");
subscriber.subscribe(new byte[0]);
Station station = null;
while (!executor.isShutdown() && !receive.isCancelled()){
try {
byte[] receivedData = subscriber.recv(0);
LOG.trace("Received data: {}", receivedData);
if (receivedData == null) return;
if (receivedData == null) continue;
//receivedData = decompress(receivedData);
String market_csv = new String(receivedData, "UTF-8");
parseCSV(market_csv);
station = parseCSV(market_csv, station);
if (!subscriber.hasReceiveMore()){
cache.put(station.getName(), station);
station = null;
}
} catch (ZMQException | UnsupportedEncodingException ex) {
LOG.error("Error on get data from EMDN", ex);
}
@@ -148,5 +130,27 @@ public class EMDN {
LOG.error("Error on connect to EMDN", ex);
}
}
private Station parseCSV(String csv, Station station) {
LOG.debug("Parse csv: {}", csv);
if (!csv.isEmpty()) {
String[] flds = csv.split(",");
// buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp
ItemData item = new ItemData(flds[7]);
item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4]));
item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2]));
LOG.trace("Item: {}", item);
String stName = flds[8].split("\\(")[0].trim();
LOG.trace("Station: {}", stName);
if (station == null) {
LOG.trace("Is new, create");
station = new Station(stName);
}
assert stName.equals(station.getName());
station.update(item);
}
return station;
}
}
}