Archived
0

disable multi threading in graph builder

This commit is contained in:
iMoHax
2015-07-27 17:50:18 +03:00
parent 7f0b90e19e
commit 011158edf1
5 changed files with 86 additions and 93 deletions

View File

@@ -8,14 +8,12 @@ import ru.trader.core.*;
import java.util.*;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class VendorsGraph extends ConnectibleGraph<Vendor> {
private final static Logger LOG = LoggerFactory.getLogger(VendorsGraph.class);
private final static int THRESHOLD = 8;
private final Scorer scorer;
private final List<VendorsGraphBuilder> deferredTasks = new ArrayList<>();
@@ -85,7 +83,6 @@ public class VendorsGraph extends ConnectibleGraph<Vendor> {
}
private class VendorsGraphBuilder extends ConnectibleGraphBuilder {
private final ArrayList<RecursiveAction> subTasks = new ArrayList<>(THRESHOLD);
private final VendorsGraphBuilder head;
private final BuildEdge edge;
private boolean isAdding;
@@ -107,50 +104,25 @@ public class VendorsGraph extends ConnectibleGraph<Vendor> {
}
@Override
protected void compute() {
LOG.trace("Build graph from {}, limit {}, deep {}", vertex, limit, deep);
protected void build() {
if (isAdding){
if (!vertex.locker().tryLock()){
throw new ConcurrentModificationException("Adding must do in single thread");
} else {
try {
addAlreadyCheckedEdges();
} finally {
vertex.locker().unlock();
}
}
} else {
checkVertex();
}
if (!subTasks.isEmpty()){
joinSubTasks();
}
LOG.trace("End build graph from {} on deep {}", vertex, deep);
}
private void checkVertex(){
Iterator<Vendor> iterator = set.iterator();
while (iterator.hasNext()) {
if (callback.isCancel()) break;
Vendor entry = iterator.next();
LOG.trace("Check {}", entry);
if (entry == vertex.getEntry()) continue;
double nextLimit = onConnect(entry);
if (nextLimit >= 0) {
LOG.trace("Connect {} to {}", entry, vertex);
Vertex<Vendor> next = getInstance(entry, 0, deep);
BuildEdge e;
if (entry instanceof TransitVendor){
e = super.createEdge(next);
} else {
e = createEdge(next);
vertex.connect(e);
}
addSubTask(e, nextLimit);
} else {
LOG.trace("Vertex {} is far away", entry);
}
if (subTasks.size() == THRESHOLD || !iterator.hasNext()){
joinSubTasks();
}
super.build();
}
}
@Override
protected double onConnect(Vendor buyer) {
double nextlimit = super.onConnect(buyer);
protected double checkConnect(Vendor buyer) {
double nextlimit = super.checkConnect(buyer);
Vendor seller = vertex.getEntry();
if (nextlimit > 0){
if (buyer instanceof TransitVendor && (deep == 0 || seller.getPlace().equals(buyer.getPlace()))){
@@ -165,6 +137,18 @@ public class VendorsGraph extends ConnectibleGraph<Vendor> {
return nextlimit;
}
@Override
protected void connect(Vertex<Vendor> next, double nextLimit) {
BuildEdge e;
if (next.getEntry() instanceof TransitVendor){
e = super.createEdge(next);
} else {
e = createEdge(next);
vertex.connect(e);
}
addSubTask(e, nextLimit);
}
@Override
protected VendorsBuildEdge createEdge(Vertex<Vendor> target) {
BuildEdge cEdge = super.createEdge(target);
@@ -268,9 +252,6 @@ public class VendorsGraph extends ConnectibleGraph<Vendor> {
// If level > deep when vertex already added on upper deep
if (next.getLevel() < deep || next.getEntry() instanceof TransitVendor) {
boolean adding = next.getLevel() >= deep;
if (!adding){
next.setLevel(vertex.getLevel() - 1);
}
if (deep > 0 || adding) {
//Recursive build
VendorsGraphBuilder task = new VendorsGraphBuilder(this, e, set, deep - 1, nextLimit);
@@ -286,18 +267,6 @@ public class VendorsGraph extends ConnectibleGraph<Vendor> {
LOG.trace("Vertex {} already check", next);
}
}
private void joinSubTasks(){
for (RecursiveAction subTask : subTasks) {
if (callback.isCancel()){
subTask.cancel(true);
} else {
subTask.join();
}
}
subTasks.clear();
}
}
public class VendorsBuildEdge extends BuildEdge {

View File

@@ -11,7 +11,8 @@ import java.util.concurrent.RecursiveAction;
public abstract class AbstractGraph<T> implements Graph<T> {
private final static ForkJoinPool POOL = new ForkJoinPool();
private final static int THRESHOLD = 4;
//TODO: make it worked in multi thread
private final static int THRESHOLD = 1;
private final static Logger LOG = LoggerFactory.getLogger(AbstractGraph.class);
@@ -82,6 +83,11 @@ public abstract class AbstractGraph<T> implements Graph<T> {
return root;
}
@Override
public Collection<Vertex<T>> vertexes() {
return vertexes;
}
@Override
public int getMinJumps() {
return minJumps;
@@ -98,6 +104,7 @@ public abstract class AbstractGraph<T> implements Graph<T> {
}
protected abstract class GraphBuilder extends RecursiveAction {
protected final List<RecursiveAction> subTasks = new ArrayList<>(THRESHOLD);
protected final Vertex<T> vertex;
protected final Collection<T> set;
protected final int deep;
@@ -110,29 +117,51 @@ public abstract class AbstractGraph<T> implements Graph<T> {
this.limit = limit;
}
protected abstract double onConnect(T entry);
protected abstract double checkConnect(T entry);
protected abstract Edge<T> createEdge(Vertex<T> target);
protected RecursiveAction createSubTask(Vertex<T> vertex, Collection<T> set, int deep, double limit){
return createGraphBuilder(vertex, set, deep, limit);
}
@Override
protected void compute() {
protected final void compute() {
vertex.locker().lock();
try {
if (vertex.getLevel() <= deep){
vertex.setLevel(deep+1);
}
} finally {
vertex.locker().unlock();
}
build();
}
protected void build(){
LOG.trace("Build graph from {}, limit {}, deep {}", vertex, limit, deep);
ArrayList<RecursiveAction> subTasks = new ArrayList<>(THRESHOLD);
Iterator<T> iterator = set.iterator();
while (iterator.hasNext()) {
for (T entry : set) {
if (callback.isCancel()) break;
T entry = iterator.next();
if (entry == vertex.getEntry()) continue;
double nextLimit = onConnect(entry);
double nextLimit = checkConnect(entry);
if (nextLimit >= 0) {
LOG.trace("Connect {} to {}", vertex, entry);
Vertex<T> next = getInstance(entry, 0, deep);
connect(next, nextLimit);
} else {
LOG.trace("Vertex {} is far away", entry);
}
if (subTasks.size() >= THRESHOLD) {
joinSubTasks();
}
}
if (!subTasks.isEmpty()){
joinSubTasks();
}
LOG.trace("End build graph from {} on deep {}", vertex, deep);
}
protected void connect(Vertex<T> next, double nextLimit){
vertex.connect(createEdge(next));
// If level > deep when vertex already added on upper deep
if (next.getLevel() < deep) {
next.setLevel(vertex.getLevel() - 1);
if (deep > 0) {
//Recursive build
RecursiveAction task = createSubTask(next, set, deep - 1, nextLimit);
@@ -140,10 +169,9 @@ public abstract class AbstractGraph<T> implements Graph<T> {
subTasks.add(task);
}
}
} else {
LOG.trace("Vertex {} is far away", entry);
}
if (subTasks.size() == THRESHOLD || !iterator.hasNext()){
protected void joinSubTasks(){
for (RecursiveAction subTask : subTasks) {
if (callback.isCancel()){
subTask.cancel(true);
@@ -153,18 +181,6 @@ public abstract class AbstractGraph<T> implements Graph<T> {
}
subTasks.clear();
}
}
if (!subTasks.isEmpty()){
for (RecursiveAction subTask : subTasks) {
if (callback.isCancel()){
subTask.cancel(true);
} else {
subTask.join();
}
}
subTasks.clear();
}
LOG.trace("End build graph from {} on deep {}", vertex, deep);
}
}
}

View File

@@ -51,7 +51,7 @@ public class ConnectibleGraph<T extends Connectable<T>> extends AbstractGraph<T>
}
@Override
protected double onConnect(T entry) {
protected double checkConnect(T entry) {
distance = vertex.getEntry().getDistance(entry);
if (distance > getShip().getMaxJumpRange()){
LOG.trace("Vertex {} is far away, {}", entry, distance);

View File

@@ -1,5 +1,6 @@
package ru.trader.analysis.graph;
import java.util.Collection;
import java.util.Optional;
public interface Graph<T> {
@@ -9,6 +10,8 @@ public interface Graph<T> {
Vertex<T> getRoot();
Collection<Vertex<T>> vertexes();
int getMinJumps();
int getMinLevel();

View File

@@ -1,14 +1,15 @@
package ru.trader.analysis.graph;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
public class Vertex<T> {
private final ArrayList<Edge<T>> edges = new ArrayList<>();
private final T entry;
private final int index;
private final ReentrantLock lock = new ReentrantLock();
private volatile int level = -1;
public Vertex(T entry, int index) {
@@ -36,6 +37,10 @@ public class Vertex<T> {
return level;
}
public ReentrantLock locker(){
return lock;
}
public void connect(Edge<T> edge){
assert this == edge.getSource();
synchronized (edges){