Archived
0

add update from EMDN button

This commit is contained in:
iMoHax
2014-08-28 17:21:37 +04:00
parent f426a2d34e
commit 98c6df4546
12 changed files with 396 additions and 109 deletions

View File

@@ -6,6 +6,9 @@ import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
@@ -13,35 +16,80 @@ public class EMDN {
private final static Logger LOG = LoggerFactory.getLogger(EMDN.class);
private final String subServer;
private final Market cache = new Market();
private ZMQ.Context context = null;
private ZMQ.Socket subscriber = null;
private ScheduledExecutorService executor;
private boolean clear;
public EMDN(String subServer) {
public EMDN(String subServer, boolean clearOnShutdown) {
this.subServer = subServer;
clear = clearOnShutdown;
}
private void init(){
context = ZMQ.context(1);
subscriber = context.socket(ZMQ.SUB);
}
public void start(){
if (subscriber!=null) shutdown();
init();
LOG.info("Connect to server {}", subServer);
subscriber.connect(subServer);
LOG.trace("Subscribe");
subscriber.subscribe(new byte[0]);
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(() -> {
try {
byte[] receivedData = subscriber.recv(0);
LOG.trace("Received data: {}", receivedData);
if (receivedData == null) return;
//receivedData = decompress(receivedData);
String market_csv = new String(receivedData, "UTF-8");
parseCSV(market_csv);
} catch (ZMQException | UnsupportedEncodingException ex) {
if (!executor.isShutdown())
LOG.error("Error on get data from EMDN", ex);
}
}, 0, 1, TimeUnit.MILLISECONDS);
}
public void shutdown() {
if (subscriber!=null){
LOG.info("Shutdown EMDN client");
executor.shutdown();
subscriber.close();
context.term();
}
subscriber = null;
context = null;
if (clear)
cache.clear();
}
public void getData(){
ZMQ.Context context = ZMQ.context(1);
try (ZMQ.Socket socket = context.socket(ZMQ.SUB)) {
socket.setReceiveTimeOut(10000);
LOG.debug("Connect to server {}", subServer);
socket.connect(subServer);
LOG.trace("Subscribe");
socket.subscribe(new byte[0]);
for (int i = 0; i < 5; i++) {
try {
byte[] receivedData = socket.recv(0);
LOG.trace("Recived data: {}", receivedData);
if (receivedData == null) continue;
//receivedData = decompress(receivedData);
String market_json = new String(receivedData, "UTF-8");
LOG.trace("JSON: {}", market_json);
} catch (ZMQException | UnsupportedEncodingException ex) {
LOG.error("Error on get data from EMDN", ex);
}
}
private void parseCSV(String csv) {
LOG.debug("Parse csv: {}", csv);
if (csv.isEmpty()) return;
String[] flds = csv.split(",");
// buyPrice,sellPrice,demand,demandLevel,stationStock,stationStockLevel,categoryName,itemName,stationName,timestamp
ItemData item = new ItemData(flds[7]);
item.setBuy(Double.valueOf(flds[0]), Long.valueOf(flds[4]));
item.setSell(Double.valueOf(flds[1]), Long.valueOf(flds[2]));
LOG.trace("Item: {}", item);
String stName = flds[8].split("\\(")[0].trim();
LOG.trace("Station: {}", stName);
Station station = cache.getVendor(stName);
if (station != null){
LOG.trace("Is old, update");
station.update(item);
} else {
LOG.trace("Is new, create");
station = new Station(stName);
station.update(item);
cache.addVendor(station);
}
context.term();
}
private byte[] decompress(byte[] input){
@@ -61,4 +109,9 @@ public class EMDN {
System.arraycopy(decompressed, 0, res, 0, decompressedLength);
return res;
}
public Station getVendor(String name){
return cache.getVendor(name);
}
}

View File

@@ -0,0 +1,69 @@
package ru.trader.emdn;
public class ItemData {
private final String name;
private double buy;
private double sell;
private long demand;
private long stock;
public ItemData(String name) {
this.name = name;
}
public void setSell(double price, long count){
sell = price;
demand = count;
}
public void setBuy(double price, long count){
buy = price;
stock = count;
}
public double getBuy() {
return buy;
}
public double getSell() {
return sell;
}
public long getDemand() {
return demand;
}
public long getStock() {
return stock;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ItemData)) return false;
ItemData itemData = (ItemData) o;
return !(name != null ? !name.equals(itemData.name) : itemData.name != null);
}
@Override
public int hashCode() {
return name != null ? name.hashCode() : 0;
}
public String getName() {
return name;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append(name);
sb.append(" buy=").append(buy);
sb.append(" (").append(stock).append(")");
sb.append(" sell=").append(sell);
sb.append(" (").append(demand).append(")");
sb.append('}');
return sb.toString();
}
}

View File

@@ -0,0 +1,19 @@
package ru.trader.emdn;
import java.util.concurrent.ConcurrentHashMap;
public class Market {
private final ConcurrentHashMap<String, Station> vendors = new ConcurrentHashMap<>(40, 0.9f, 1);
public Station getVendor(String name){
return vendors.get(name);
}
public void addVendor(Station vendor){
vendors.put(vendor.getName(), vendor);
}
public void clear(){
vendors.clear();
}
}

View File

@@ -0,0 +1,37 @@
package ru.trader.emdn;
import java.util.concurrent.ConcurrentHashMap;
public class Station {
private final String name;
private final ConcurrentHashMap<String, ItemData> items = new ConcurrentHashMap<>(15, 0.9f, 1);
public Station(String name) {
this.name = name;
}
public ItemData getData(String name){
return items.get(name);
}
void update(ItemData item){
items.put(item.getName(), item);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Station)) return false;
Station station = (Station) o;
return name.equals(station.name);
}
@Override
public int hashCode() {
return name.hashCode();
}
public String getName() {
return name;
}
}