diff --git a/core/src/main/java/ru/trader/analysis/VendorsGraph.java b/core/src/main/java/ru/trader/analysis/VendorsGraph.java index cbca98b..4a34a2e 100644 --- a/core/src/main/java/ru/trader/analysis/VendorsGraph.java +++ b/core/src/main/java/ru/trader/analysis/VendorsGraph.java @@ -8,14 +8,12 @@ import ru.trader.core.*; import java.util.*; import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.RecursiveAction; import java.util.function.Predicate; import java.util.stream.Collectors; public class VendorsGraph extends ConnectibleGraph { private final static Logger LOG = LoggerFactory.getLogger(VendorsGraph.class); - private final static int THRESHOLD = 8; private final Scorer scorer; private final List deferredTasks = new ArrayList<>(); @@ -85,7 +83,6 @@ public class VendorsGraph extends ConnectibleGraph { } private class VendorsGraphBuilder extends ConnectibleGraphBuilder { - private final ArrayList subTasks = new ArrayList<>(THRESHOLD); private final VendorsGraphBuilder head; private final BuildEdge edge; private boolean isAdding; @@ -107,50 +104,25 @@ public class VendorsGraph extends ConnectibleGraph { } @Override - protected void compute() { - LOG.trace("Build graph from {}, limit {}, deep {}", vertex, limit, deep); + protected void build() { if (isAdding){ - addAlreadyCheckedEdges(); - } else { - checkVertex(); - } - if (!subTasks.isEmpty()){ - joinSubTasks(); - } - LOG.trace("End build graph from {} on deep {}", vertex, deep); - } - - private void checkVertex(){ - Iterator iterator = set.iterator(); - while (iterator.hasNext()) { - if (callback.isCancel()) break; - Vendor entry = iterator.next(); - LOG.trace("Check {}", entry); - if (entry == vertex.getEntry()) continue; - double nextLimit = onConnect(entry); - if (nextLimit >= 0) { - LOG.trace("Connect {} to {}", entry, vertex); - Vertex next = getInstance(entry, 0, deep); - BuildEdge e; - if (entry instanceof TransitVendor){ - e = super.createEdge(next); - } else { - e = createEdge(next); - vertex.connect(e); - } - addSubTask(e, nextLimit); + if (!vertex.locker().tryLock()){ + throw new ConcurrentModificationException("Adding must do in single thread"); } else { - LOG.trace("Vertex {} is far away", entry); - } - if (subTasks.size() == THRESHOLD || !iterator.hasNext()){ - joinSubTasks(); + try { + addAlreadyCheckedEdges(); + } finally { + vertex.locker().unlock(); + } } + } else { + super.build(); } } @Override - protected double onConnect(Vendor buyer) { - double nextlimit = super.onConnect(buyer); + protected double checkConnect(Vendor buyer) { + double nextlimit = super.checkConnect(buyer); Vendor seller = vertex.getEntry(); if (nextlimit > 0){ if (buyer instanceof TransitVendor && (deep == 0 || seller.getPlace().equals(buyer.getPlace()))){ @@ -165,6 +137,18 @@ public class VendorsGraph extends ConnectibleGraph { return nextlimit; } + @Override + protected void connect(Vertex next, double nextLimit) { + BuildEdge e; + if (next.getEntry() instanceof TransitVendor){ + e = super.createEdge(next); + } else { + e = createEdge(next); + vertex.connect(e); + } + addSubTask(e, nextLimit); + } + @Override protected VendorsBuildEdge createEdge(Vertex target) { BuildEdge cEdge = super.createEdge(target); @@ -268,9 +252,6 @@ public class VendorsGraph extends ConnectibleGraph { // If level > deep when vertex already added on upper deep if (next.getLevel() < deep || next.getEntry() instanceof TransitVendor) { boolean adding = next.getLevel() >= deep; - if (!adding){ - next.setLevel(vertex.getLevel() - 1); - } if (deep > 0 || adding) { //Recursive build VendorsGraphBuilder task = new VendorsGraphBuilder(this, e, set, deep - 1, nextLimit); @@ -286,18 +267,6 @@ public class VendorsGraph extends ConnectibleGraph { LOG.trace("Vertex {} already check", next); } } - - private void joinSubTasks(){ - for (RecursiveAction subTask : subTasks) { - if (callback.isCancel()){ - subTask.cancel(true); - } else { - subTask.join(); - } - } - subTasks.clear(); - } - } public class VendorsBuildEdge extends BuildEdge { diff --git a/core/src/main/java/ru/trader/analysis/graph/AbstractGraph.java b/core/src/main/java/ru/trader/analysis/graph/AbstractGraph.java index 6b3d457..1e4e094 100644 --- a/core/src/main/java/ru/trader/analysis/graph/AbstractGraph.java +++ b/core/src/main/java/ru/trader/analysis/graph/AbstractGraph.java @@ -11,7 +11,8 @@ import java.util.concurrent.RecursiveAction; public abstract class AbstractGraph implements Graph { private final static ForkJoinPool POOL = new ForkJoinPool(); - private final static int THRESHOLD = 4; + //TODO: make it worked in multi thread + private final static int THRESHOLD = 1; private final static Logger LOG = LoggerFactory.getLogger(AbstractGraph.class); @@ -82,6 +83,11 @@ public abstract class AbstractGraph implements Graph { return root; } + @Override + public Collection> vertexes() { + return vertexes; + } + @Override public int getMinJumps() { return minJumps; @@ -98,6 +104,7 @@ public abstract class AbstractGraph implements Graph { } protected abstract class GraphBuilder extends RecursiveAction { + protected final List subTasks = new ArrayList<>(THRESHOLD); protected final Vertex vertex; protected final Collection set; protected final int deep; @@ -110,61 +117,70 @@ public abstract class AbstractGraph implements Graph { this.limit = limit; } - protected abstract double onConnect(T entry); + protected abstract double checkConnect(T entry); protected abstract Edge createEdge(Vertex target); protected RecursiveAction createSubTask(Vertex vertex, Collection set, int deep, double limit){ return createGraphBuilder(vertex, set, deep, limit); } @Override - protected void compute() { + protected final void compute() { + vertex.locker().lock(); + try { + if (vertex.getLevel() <= deep){ + vertex.setLevel(deep+1); + } + } finally { + vertex.locker().unlock(); + } + build(); + } + + protected void build(){ LOG.trace("Build graph from {}, limit {}, deep {}", vertex, limit, deep); - ArrayList subTasks = new ArrayList<>(THRESHOLD); - Iterator iterator = set.iterator(); - while (iterator.hasNext()) { + for (T entry : set) { if (callback.isCancel()) break; - T entry = iterator.next(); if (entry == vertex.getEntry()) continue; - double nextLimit = onConnect(entry); + double nextLimit = checkConnect(entry); if (nextLimit >= 0) { LOG.trace("Connect {} to {}", vertex, entry); Vertex next = getInstance(entry, 0, deep); - vertex.connect(createEdge(next)); - // If level > deep when vertex already added on upper deep - if (next.getLevel() < deep) { - next.setLevel(vertex.getLevel() - 1); - if (deep > 0) { - //Recursive build - RecursiveAction task = createSubTask(next, set, deep - 1, nextLimit); - task.fork(); - subTasks.add(task); - } - } + connect(next, nextLimit); } else { LOG.trace("Vertex {} is far away", entry); } - if (subTasks.size() == THRESHOLD || !iterator.hasNext()){ - for (RecursiveAction subTask : subTasks) { - if (callback.isCancel()){ - subTask.cancel(true); - } else { - subTask.join(); - } - } - subTasks.clear(); + if (subTasks.size() >= THRESHOLD) { + joinSubTasks(); } } if (!subTasks.isEmpty()){ - for (RecursiveAction subTask : subTasks) { - if (callback.isCancel()){ - subTask.cancel(true); - } else { - subTask.join(); - } - } - subTasks.clear(); + joinSubTasks(); } LOG.trace("End build graph from {} on deep {}", vertex, deep); } + + protected void connect(Vertex next, double nextLimit){ + vertex.connect(createEdge(next)); + if (next.getLevel() < deep) { + if (deep > 0) { + //Recursive build + RecursiveAction task = createSubTask(next, set, deep - 1, nextLimit); + task.fork(); + subTasks.add(task); + } + } + } + + protected void joinSubTasks(){ + for (RecursiveAction subTask : subTasks) { + if (callback.isCancel()){ + subTask.cancel(true); + } else { + subTask.join(); + } + } + subTasks.clear(); + } + } } diff --git a/core/src/main/java/ru/trader/analysis/graph/ConnectibleGraph.java b/core/src/main/java/ru/trader/analysis/graph/ConnectibleGraph.java index 9e44570..558016a 100644 --- a/core/src/main/java/ru/trader/analysis/graph/ConnectibleGraph.java +++ b/core/src/main/java/ru/trader/analysis/graph/ConnectibleGraph.java @@ -51,7 +51,7 @@ public class ConnectibleGraph> extends AbstractGraph } @Override - protected double onConnect(T entry) { + protected double checkConnect(T entry) { distance = vertex.getEntry().getDistance(entry); if (distance > getShip().getMaxJumpRange()){ LOG.trace("Vertex {} is far away, {}", entry, distance); diff --git a/core/src/main/java/ru/trader/analysis/graph/Graph.java b/core/src/main/java/ru/trader/analysis/graph/Graph.java index 4f94b28..b500cd5 100644 --- a/core/src/main/java/ru/trader/analysis/graph/Graph.java +++ b/core/src/main/java/ru/trader/analysis/graph/Graph.java @@ -1,5 +1,6 @@ package ru.trader.analysis.graph; +import java.util.Collection; import java.util.Optional; public interface Graph { @@ -9,6 +10,8 @@ public interface Graph { Vertex getRoot(); + Collection> vertexes(); + int getMinJumps(); int getMinLevel(); diff --git a/core/src/main/java/ru/trader/analysis/graph/Vertex.java b/core/src/main/java/ru/trader/analysis/graph/Vertex.java index 29f723c..2134eb1 100644 --- a/core/src/main/java/ru/trader/analysis/graph/Vertex.java +++ b/core/src/main/java/ru/trader/analysis/graph/Vertex.java @@ -1,14 +1,15 @@ package ru.trader.analysis.graph; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; public class Vertex { private final ArrayList> edges = new ArrayList<>(); private final T entry; private final int index; + private final ReentrantLock lock = new ReentrantLock(); private volatile int level = -1; public Vertex(T entry, int index) { @@ -36,6 +37,10 @@ public class Vertex { return level; } + public ReentrantLock locker(){ + return lock; + } + public void connect(Edge edge){ assert this == edge.getSource(); synchronized (edges){