Coverage Summary for Class: TestMultiThread (org.umlg.sqlg.test.schema)
Class |
Method, %
|
Branch, %
|
Line, %
|
TestMultiThread |
100%
(22/22)
|
78.6%
(55/70)
|
89.3%
(208/233)
|
TestMultiThread$1 |
100%
(2/2)
|
75%
(6/8)
|
TestMultiThread$10 |
100%
(1/1)
|
100%
(2/2)
|
TestMultiThread$11 |
100%
(1/1)
|
100%
(2/2)
|
TestMultiThread$2 |
100%
(2/2)
|
66.7%
(4/6)
|
TestMultiThread$3 |
100%
(1/1)
|
100%
(3/3)
|
TestMultiThread$4 |
100%
(1/1)
|
100%
(2/2)
|
TestMultiThread$5 |
100%
(2/2)
|
100%
(4/4)
|
90%
(27/30)
|
TestMultiThread$6 |
100%
(1/1)
|
100%
(2/2)
|
TestMultiThread$7 |
100%
(1/1)
|
100%
(2/2)
|
TestMultiThread$8 |
100%
(1/1)
|
100%
(2/2)
|
TestMultiThread$9 |
100%
(1/1)
|
100%
(2/2)
|
Total |
100%
(36/36)
|
79.7%
(59/74)
|
89.1%
(262/294)
|
package org.umlg.sqlg.test.schema;
import com.google.common.base.Preconditions;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umlg.sqlg.structure.PropertyDefinition;
import org.umlg.sqlg.structure.PropertyType;
import org.umlg.sqlg.structure.SqlgGraph;
import org.umlg.sqlg.structure.topology.VertexLabel;
import org.umlg.sqlg.test.BaseTest;
import java.net.URL;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* Date: 2014/09/24
* Time: 10:46 AM
*/
public class TestMultiThread extends BaseTest {
private static final Logger LOGGER = LoggerFactory.getLogger(TestMultiThread.class);
/**
* This test is a duplicate of TransactionTest.shouldSupportTransactionIsolationCommitCheck but with the schema created upfront else it deadlocks.
*/
@Test
public void shouldSupportTransactionIsolationCommitCheck() throws Exception {
Vertex v1 = this.sqlgGraph.addVertex();
this.sqlgGraph.tx().commit();
v1.remove();
this.sqlgGraph.tx().commit();
// the purpose of this test is to simulate gremlin server access to a graph instance, where one thread modifies
// the graph and a separate thread cannot affect the transaction of the first
final CountDownLatch latchCommittedInOtherThread = new CountDownLatch(1);
final CountDownLatch latchCommitInOtherThread = new CountDownLatch(1);
final AtomicBoolean noVerticesInFirstThread = new AtomicBoolean(false);
// this thread starts a transaction then waits while the second thread tries to commit it.
final Thread threadTxStarter = new Thread("thread1") {
@Override
public void run() {
TestMultiThread.this.sqlgGraph.addVertex();
latchCommitInOtherThread.countDown();
try {
latchCommittedInOtherThread.await();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
TestMultiThread.this.sqlgGraph.tx().rollback();
// there should be no vertices here
noVerticesInFirstThread.set(!TestMultiThread.this.sqlgGraph.vertices().hasNext());
}
};
threadTxStarter.start();
// this thread tries to commit the transaction started in the first thread above.
final Thread threadTryCommitTx = new Thread("thread2") {
@Override
public void run() {
try {
latchCommitInOtherThread.await();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
// try to commit the other transaction
TestMultiThread.this.sqlgGraph.tx().commit();
latchCommittedInOtherThread.countDown();
}
};
threadTryCommitTx.start();
threadTxStarter.join();
threadTryCommitTx.join();
Assert.assertTrue(noVerticesInFirstThread.get());
assertVertexEdgeCounts(sqlgGraph, 0, 0);
}
@Test
public void shouldExecuteWithCompetingThreads() throws InterruptedException {
//Create the schema upfront so that graphs (Hsqldb, H2, Mysql...) that do not support transactional schema's can succeed.
VertexLabel vertexLabel = this.sqlgGraph.getTopology().ensureVertexLabelExist("vertex", new HashMap<>() {{
put("test", PropertyDefinition.of(PropertyType.LONG));
put("blah", PropertyDefinition.of(PropertyType.DOUBLE));
}});
vertexLabel.ensureEdgeLabelExist("friend", vertexLabel, new HashMap<>() {{
put("bloop", PropertyDefinition.of(PropertyType.INTEGER));
}});
this.sqlgGraph.tx().commit();
final Graph graph = this.sqlgGraph;
int totalThreads = 250;
final AtomicInteger vertices = new AtomicInteger(0);
final AtomicInteger edges = new AtomicInteger(0);
final AtomicInteger completedThreads = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(totalThreads);
for (int i = 0; i < totalThreads; i++) {
new Thread() {
@Override
public void run() {
try {
final Random random = new Random();
if (random.nextBoolean()) {
final Vertex a = graph.addVertex();
final Vertex b = graph.addVertex();
final Edge e = a.addEdge("friend", b);
vertices.getAndAdd(2);
a.property("test", this.getId());
b.property("blah", random.nextDouble());
e.property("bloop", random.nextInt());
edges.getAndAdd(1);
graph.tx().commit();
} else {
final Vertex a = graph.addVertex();
final Vertex b = graph.addVertex();
final Edge e = a.addEdge("friend", b);
a.property("test", this.getId());
b.property("blah", random.nextDouble());
e.property("bloop", random.nextInt());
if (random.nextBoolean()) {
graph.tx().commit();
vertices.getAndAdd(2);
edges.getAndAdd(1);
} else {
graph.tx().rollback();
}
}
completedThreads.getAndAdd(1);
LOGGER.info("shouldExecuteWithCompetingThreads " + completedThreads.get());
} catch (Exception e) {
LOGGER.error("failure", e);
Assert.fail(e.getMessage());
} finally {
countDownLatch.countDown();
}
}
}.start();
}
boolean success = countDownLatch.await(5, TimeUnit.MINUTES);
Assert.assertTrue(success);
Assert.assertEquals(completedThreads.get(), totalThreads);
System.out.println(vertices.get());
assertVertexEdgeCounts(graph, vertices.get(), edges.get());
}
private static void assertVertexEdgeCounts(final Graph graph, final int expectedVertexCount, final int expectedEdgeCount) {
getAssertVertexEdgeCounts(expectedVertexCount, expectedEdgeCount).accept(graph);
}
private static Consumer<Graph> getAssertVertexEdgeCounts(final int expectedVertexCount, final int expectedEdgeCount) {
return (g) -> {
Assert.assertEquals(expectedVertexCount, IteratorUtils.count(g.vertices()));
Assert.assertEquals(expectedEdgeCount, IteratorUtils.count(g.edges()));
};
}
@Test
public void testMultiThreadVertices() throws InterruptedException {
Set<Integer> tables = new ConcurrentSkipListSet<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int j = 0; j < 20; j++) {
int finalJ = j;
executorService.submit(() -> {
try {
final Random random = new Random();
random.nextInt();
for (int i = 0; i < 10; i++) {
sqlgGraph.addVertex(T.label, "Person" + finalJ, "name", String.valueOf(finalJ));
}
sqlgGraph.tx().commit();
tables.add(finalJ);
} catch (Exception e) {
sqlgGraph.tx().rollback();
LOGGER.error(e.getMessage(), e);
Assert.fail(e.getMessage());
}
});
}
executorService.shutdown();
if (executorService.awaitTermination(6000, TimeUnit.SECONDS)) {
LOGGER.info("normal termination");
} else {
Assert.fail("failed to terminate executor service normally");
}
for (Integer i : tables) {
LOGGER.info(String.format("looking for 'Person%d'", i));
Assert.assertTrue(String.format("Person%d not found", i), this.sqlgGraph.getTopology().getVertexLabel(this.sqlgGraph.getSqlDialect().getPublicSchema(), "Person" + i).isPresent());
Assert.assertEquals(10, this.sqlgGraph.traversal().V().has(T.label, "Person" + i).has("name", String.valueOf(i)).count().next().intValue());
}
}
@Test
public void testMultiThreadEdges() throws InterruptedException {
//For some reason Maria don't like this one on teamcity
Assume.assumeFalse(isMariaDb());
Assume.assumeFalse(isH2());
Assume.assumeFalse(isHsqldb());
Vertex v1 = sqlgGraph.addVertex(T.label, "Person", "name", "0");
sqlgGraph.tx().commit();
Set<Integer> tables = new ConcurrentSkipListSet<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int j = 0; j < 100; j++) {
executorService.submit(() -> {
final Random random = new Random();
int randomInt = random.nextInt();
for (int i = 0; i < 10; i++) {
Vertex v2 = sqlgGraph.addVertex(T.label, "Person" + randomInt, "name", String.valueOf(randomInt));
v1.addEdge("test" + randomInt, v2);
tables.add(randomInt);
}
sqlgGraph.tx().commit();
});
}
executorService.shutdown();
boolean success = executorService.awaitTermination(60, TimeUnit.SECONDS);
Assert.assertTrue(success);
for (Integer i : tables) {
Assert.assertTrue(this.sqlgGraph.getTopology().getVertexLabel(this.sqlgGraph.getSqlDialect().getPublicSchema(), "Person" + i).isPresent());
Assert.assertEquals(10, this.sqlgGraph.traversal().V().has(T.label, "Person" + i).has("name", String.valueOf(i)).count().next().intValue());
Assert.assertEquals(10, vertexTraversal(this.sqlgGraph, v1).out("test" + i).count().next().intValue());
}
}
@Test
public void testMultiThreadCreateSchemas() throws InterruptedException, ExecutionException {
Set<Integer> schemas = new HashSet<>();
ExecutorService executorService = Executors.newFixedThreadPool(200);
for (int i = 0; i < 10_000; i++) {
Integer schema = new Random().nextInt(99);
schemas.add(schema);
Future<?> f = executorService.submit(() -> {
this.sqlgGraph.getTopology().ensureSchemaExist("schema_" + schema);
this.sqlgGraph.tx().commit();
});
f.get();
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
Assert.assertTrue(terminated);
//+ 1 for the public schema
Assert.assertEquals(schemas.size() + 1, this.sqlgGraph.getTopology().getSchemas().size());
}
/**
* test when each graph is created in its own thread, in distributed mode
*/
@Test
public void testMultipleGraphs() throws Exception {
URL sqlProperties = Thread.currentThread().getContextClassLoader().getResource("sqlg.properties");
try {
Configurations configs = new Configurations();
configuration = configs.properties(sqlProperties);
Assume.assumeTrue(isPostgres());
configuration.addProperty("distributed", true);
if (!configuration.containsKey("jdbc.url"))
throw new IllegalArgumentException(String.format("SqlGraph configuration requires that the %s be set", "jdbc.url"));
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
VertexLabel personTrue = this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("Person_True", new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
}});
VertexLabel addressTrue = this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("Address_True", new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
}});
personTrue.ensureEdgeLabelExist("address_True", addressTrue, new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
}});
VertexLabel personLabel = this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("Person", new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
}});
VertexLabel addressLabel = this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("Address", new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
}});
personLabel.ensureEdgeLabelExist("address", addressLabel, new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
}});
this.sqlgGraph.tx().commit();
ExecutorService executorService = Executors.newFixedThreadPool(50);
int loop = 400;
for (int i = 0; i < loop; i++) {
String n = "person" + i;
executorService.submit(() -> {
try {
try (SqlgGraph sqlgGraph1 = SqlgGraph.open(configuration)) {
final Random random = new Random();
if (random.nextBoolean()) {
Vertex person = sqlgGraph1.addVertex(T.label, "Person_True", "name", n);
Vertex address = sqlgGraph1.addVertex(T.label, "Address_True", "name", n);
person.addEdge("address_True", address, "name", n);
} else {
Vertex person = sqlgGraph1.addVertex(T.label, "Person", "name", n);
Vertex address = sqlgGraph1.addVertex(T.label, "Address", "name", n);
person.addEdge("address", address, "name", n);
}
sqlgGraph1.tx().commit();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
Assert.fail(e.getMessage());
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(100, TimeUnit.SECONDS);
Assert.assertTrue(terminated);
try (SqlgGraph sqlgGraph1 = SqlgGraph.open(configuration)) {
Assert.assertEquals(400, sqlgGraph1.traversal().V().hasLabel("Person_True").count().next() + sqlgGraph1.traversal().V().hasLabel("Person").count().next());
Assert.assertEquals(400, sqlgGraph1.traversal().V().hasLabel("Address_True").count().next() + sqlgGraph1.traversal().V().hasLabel("Address").count().next());
Assert.assertEquals(400, sqlgGraph1.traversal().E().hasLabel("address_True").count().next() + sqlgGraph1.traversal().E().hasLabel("address").count().next());
}
}
/**
* test when each graph is created in its own thread, in distributed mode
* each thread created a different label
*/
@Test
public void testMultipleGraphsMultipleLabels() throws Exception {
Assume.assumeTrue(isPostgres());
URL sqlProperties = Thread.currentThread().getContextClassLoader().getResource("sqlg.properties");
try {
Configurations configs = new Configurations();
configuration = configs.properties(sqlProperties);
configuration.addProperty("distributed", true);
configuration.addProperty("c3p0.maxPoolSize", 3);
configuration.addProperty("dataSource.maximumPoolSize", 3);
if (!configuration.containsKey("jdbc.url"))
throw new IllegalArgumentException(String.format("SqlGraph configuration requires that the %s be set", "jdbc.url"));
} catch (ConfigurationException e) {
Assert.fail(e.getMessage());
}
ExecutorService executorService = Executors.newFixedThreadPool(200);
int loop = 200;
// int loop = 2;
for (int i = 0; i < loop; i++) {
String n = "person" + i;
executorService.submit(() -> {
try {
try (SqlgGraph sqlgGraph2 = SqlgGraph.open(configuration)) {
sqlgGraph2.addVertex(T.label, "Person" + n, "name", n);
sqlgGraph2.tx().commit();
}
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
}
executorService.shutdown();
boolean terminatedNormally = executorService.awaitTermination(1, TimeUnit.MINUTES);
Preconditions.checkState(terminatedNormally);
try (SqlgGraph sqlgGraph2 = SqlgGraph.open(configuration)) {
for (int i = 0; i < loop; i++) {
String n = "person" + i;
Assert.assertEquals(1, sqlgGraph2.traversal().V().hasLabel("Person" + n).count().next().longValue());
}
}
}
@Test
public void testLoadsOfSchemaChanges() throws InterruptedException, ExecutionException {
Assume.assumeFalse(isH2());
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<Integer>> futureList = new ArrayList<>();
int loop = 1000;
for (int i = 0; i < loop; i++) {
String n = "person" + i;
String edge = "e" + i;
int current = i;
futureList.add(executorService.submit(() -> {
try {
Vertex v1 = this.sqlgGraph.addVertex(T.label, n, "name", n);
Vertex v2 = this.sqlgGraph.addVertex(T.label, n, "name", n);
final Random random = new Random();
if (random.nextBoolean()) {
v1.property("another" + n, "asd");
}
if (random.nextBoolean()) {
Edge e = v1.addEdge(edge, v2);
if (random.nextBoolean()) {
e.property("yetanother" + n, "asd");
}
}
this.sqlgGraph.tx().commit();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
Assert.fail(e.getMessage());
}
return current;
}));
}
executorService.shutdown();
for (Future<Integer> future : futureList) {
LOGGER.info("Completed " + future.get());
}
boolean terminated = executorService.awaitTermination(1, TimeUnit.SECONDS);
Preconditions.checkState(terminated, "executorService terminated via timeout");
for (int i = 0; i < loop; i++) {
String n = "person" + i;
Assert.assertEquals(n + " failed", 2, this.sqlgGraph.traversal().V().hasLabel(n).count().next().longValue());
}
}
@Test
public void simulateReadWriteChange() throws ExecutionException, InterruptedException {
//Sleep here, help with testing connections from previous test staying idle on postgres.
Thread.sleep(3_000);
List<String> labels = new ArrayList<>();
List<String> properties = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
labels.add("label" + i);
properties.add("property" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(1);
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
int count = i;
futureList.add(executorService.submit(() -> {
try {
for (int i1 = 0; i1 < 1000; i1++) {
sqlgGraph.addVertex(T.label, labels.get(i1), properties.get(i1), "asd");
sqlgGraph.tx().commit();
}
return count;
} catch (Exception e) {
sqlgGraph.tx().rollback();
throw new RuntimeException(e);
}
}));
}
for (int i = 0; i < 3; i++) {
int count = i;
futureList.add(executorService.submit(() -> {
try {
for (int i12 = 0; i12 < 1000; i12++) {
sqlgGraph.traversal().V().hasLabel(labels.get(i12)).iterate();
sqlgGraph.tx().rollback();
}
return count;
} catch (Exception e) {
sqlgGraph.tx().rollback();
throw new RuntimeException(e);
}
}));
}
executorService.shutdown();
for (Future<Integer> integerFuture : futureList) {
LOGGER.info("done " + integerFuture.get());
}
Assert.assertEquals(1000, this.sqlgGraph.getTopology().getPublicSchema().getVertexLabels().size());
}
}