Coverage Summary for Class: TestMultipleThreadMultipleJvm (org.umlg.sqlg.test.schema)

Class                         | Class, %   | Method, %     | Branch, %    | Line, %
TestMultipleThreadMultipleJvm | 100% (1/1) | 86.7% (13/15) | 75% (87/116) | 83.5% (218/261)

package org.umlg.sqlg.test.schema;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umlg.sqlg.sql.dialect.SqlSchemaChangeDialect;
import org.umlg.sqlg.structure.PropertyDefinition;
import org.umlg.sqlg.structure.PropertyType;
import org.umlg.sqlg.structure.SqlgGraph;
import org.umlg.sqlg.structure.topology.EdgeLabel;
import org.umlg.sqlg.structure.topology.Schema;
import org.umlg.sqlg.structure.topology.VertexLabel;
import org.umlg.sqlg.test.BaseTest;
import java.net.URL;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
/**
* Date: 2016/10/29
* Time: 7:24 PM
*/
public class TestMultipleThreadMultipleJvm extends BaseTest {
private static final Logger logger = LoggerFactory.getLogger(TestMultipleThreadMultipleJvm.class.getName());
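//Load sqlg.properties from the classpath and only run these tests against Postgres.
//The "distributed" property puts Sqlg's topology into distributed mode so that schema
//changes made through one graph are propagated to the others; the pool sizes are kept
//small because many graphs are opened against the same database.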
@BeforeClass
public static void beforeClass() {
URL sqlProperties = Thread.currentThread().getContextClassLoader().getResource("sqlg.properties");
try {
Configurations configs = new Configurations();
configuration = configs.properties(sqlProperties);
Assume.assumeTrue(isPostgres());
configuration.addProperty("distributed", true);
configuration.addProperty("c3p0.maxPoolSize", 3);
configuration.addProperty("dataSource.maximumPoolSize", 3);
if (!configuration.containsKey("jdbc.url"))
throw new IllegalArgumentException(String.format("SqlGraph configuration requires that the %s be set", "jdbc.url"));
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
}
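//The @Test annotation below is commented out, so this test is currently disabled.
//It opens several graphs and has each one take the schema-change lock via
//SqlSchemaChangeDialect.lock(...) and then roll back, asserting that every lock
//attempt completes within the timeout.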
// @Test
public void testMultiThreadedLocking() throws Exception {
//number of graphs, pretending each is a separate jvm
int NUMBER_OF_GRAPHS = 10;
ExecutorService sqlgGraphsExecutorService = Executors.newFixedThreadPool(100);
CompletionService<Boolean> sqlgGraphsExecutorCompletionService = new ExecutorCompletionService<>(sqlgGraphsExecutorService);
List<SqlgGraph> graphs = new ArrayList<>();
try {
//Pre-create all the graphs
for (int i = 0; i < NUMBER_OF_GRAPHS; i++) {
graphs.add(SqlgGraph.open(configuration));
}
List<Future<Boolean>> results = new ArrayList<>();
for (SqlgGraph sqlgGraphAsync : graphs) {
results.add(sqlgGraphsExecutorCompletionService.submit(() -> {
((SqlSchemaChangeDialect) sqlgGraphAsync.getSqlDialect()).lock(sqlgGraphAsync);
sqlgGraphAsync.tx().rollback();
return true;
}));
}
sqlgGraphsExecutorService.shutdown();
for (Future<Boolean> result : results) {
result.get(10, TimeUnit.SECONDS);
}
} finally {
for (SqlgGraph graph : graphs) {
graph.close();
}
}
}
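//Each graph concurrently calls ensureSchemaExist for the same set of schema names,
//randomly committing or rolling back. Once all tasks complete, every graph's topology
//is expected to equal this.sqlgGraph's topology and every schema to be committed.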
@Test
public void testMultiThreadedSchemaCreation() throws Exception {
//number of graphs, pretending each is a separate jvm
int NUMBER_OF_GRAPHS = 10;
int NUMBER_OF_SCHEMAS = 200;
//Pre-create all the graphs
List<SqlgGraph> graphs = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_GRAPHS; i++) {
graphs.add(SqlgGraph.open(configuration));
}
logger.info(String.format("Done firing up %d graphs", NUMBER_OF_GRAPHS));
ExecutorService poolPerGraph = Executors.newFixedThreadPool(NUMBER_OF_GRAPHS);
CompletionService<SqlgGraph> poolPerGraphsExecutorCompletionService = new ExecutorCompletionService<>(poolPerGraph);
try {
List<Future<SqlgGraph>> results = new ArrayList<>();
for (final SqlgGraph sqlgGraphAsync : graphs) {
results.add(
poolPerGraphsExecutorCompletionService.submit(() -> {
for (int i = 0; i < NUMBER_OF_SCHEMAS; i++) {
//noinspection Duplicates
try {
sqlgGraphAsync.getTopology().ensureSchemaExist("schema_" + i);
final Random random = new Random();
if (random.nextBoolean()) {
logger.info("ensureSchemaExist " + "schema_" + i);
sqlgGraphAsync.tx().commit();
} else {
sqlgGraphAsync.tx().rollback();
}
} catch (Exception e) {
sqlgGraphAsync.tx().rollback();
if (e.getCause().getClass().getSimpleName().equals("PSQLException")) {
//swallow
logger.warn("Rollback transaction due to schema creation failure.", e);
} else {
logger.error(String.format("got exception %s", e.getCause().getClass().getSimpleName()), e);
throw new RuntimeException(e);
}
}
}
return sqlgGraphAsync;
}
)
);
}
poolPerGraph.shutdown();
for (Future<SqlgGraph> result : results) {
result.get(1, TimeUnit.MINUTES);
}
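//Give the distributed topology notifications time to reach the other graphs before comparing.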
Thread.sleep(20_000);
for (SqlgGraph graph : graphs) {
assertEquals(this.sqlgGraph.getTopology(), graph.getTopology());
for (Schema schema : graph.getTopology().getSchemas()) {
Assert.assertTrue(schema.isCommitted());
}
}
} finally {
for (SqlgGraph graph : graphs) {
graph.close();
}
}
}
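//Same scenario as above, but each ensureSchemaExist call is submitted as its own task,
//so individual schema creations race across graphs and threads instead of one loop per graph.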
@Test
public void testMultiThreadedSchemaCreation2() throws Exception {
//number of graphs, pretending each is a separate jvm
int NUMBER_OF_GRAPHS = 5;
int NUMBER_OF_SCHEMAS = 100;
//Pre-create all the graphs
List<SqlgGraph> graphs = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_GRAPHS; i++) {
graphs.add(SqlgGraph.open(configuration));
}
logger.info(String.format("Done firing up %d graphs", NUMBER_OF_GRAPHS));
ExecutorService poolPerGraph = Executors.newFixedThreadPool(NUMBER_OF_GRAPHS);
CompletionService<SqlgGraph> poolPerGraphsExecutorCompletionService = new ExecutorCompletionService<>(poolPerGraph);
try {
List<Future<SqlgGraph>> results = new ArrayList<>();
for (final SqlgGraph sqlgGraphAsync : graphs) {
for (int i = 0; i < NUMBER_OF_SCHEMAS; i++) {
final int count = i;
results.add(
poolPerGraphsExecutorCompletionService.submit(() -> {
//noinspection Duplicates
try {
sqlgGraphAsync.getTopology().ensureSchemaExist("schema_" + count);
final Random random = new Random();
if (random.nextBoolean()) {
sqlgGraphAsync.tx().commit();
} else {
sqlgGraphAsync.tx().rollback();
}
} catch (Exception e) {
sqlgGraphAsync.tx().rollback();
if (e.getCause().getClass().getSimpleName().equals("PSQLException")) {
//swallow
logger.warn("Rollback transaction due to schema creation failure.", e);
} else {
logger.error(String.format("got exception %s", e.getCause().getClass().getSimpleName()), e);
throw new RuntimeException(e);
}
}
return sqlgGraphAsync;
}
)
);
}
}
poolPerGraph.shutdown();
for (Future<SqlgGraph> result : results) {
result.get(1, TimeUnit.MINUTES);
}
Thread.sleep(20_000);
for (SqlgGraph graph : graphs) {
assertEquals(this.sqlgGraph.getTopology(), graph.getTopology());
}
} finally {
for (SqlgGraph graph : graphs) {
graph.close();
}
}
}
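//Concurrently creates vertex labels, edge labels and their properties from all graphs,
//tracks which schemas were actually committed, then inserts vertices and edges into the
//committed schemas and checks that every graph ends up with the same topology and data.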
@Test
public void testMultiThreadedVertexLabelCreation() throws Exception {
//number of graphs, pretending they are in separate jvms
int NUMBER_OF_GRAPHS = 5;
int NUMBER_OF_SCHEMAS = 100;
Set<Integer> successfulSchemas = new HashSet<>();
//Pre-create all the graphs
List<SqlgGraph> graphs = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_GRAPHS; i++) {
graphs.add(SqlgGraph.open(configuration));
}
logger.info(String.format("Done firing up %d graphs", NUMBER_OF_GRAPHS));
ExecutorService poolPerGraph = Executors.newFixedThreadPool(NUMBER_OF_GRAPHS);
CompletionService<SqlgGraph> poolPerGraphsExecutorCompletionService = new ExecutorCompletionService<>(poolPerGraph);
try {
Map<String, PropertyDefinition> properties = new HashMap<>();
properties.put("name", PropertyDefinition.of(PropertyType.STRING));
properties.put("age", PropertyDefinition.of(PropertyType.INTEGER));
List<Future<SqlgGraph>> results = new ArrayList<>();
for (final SqlgGraph sqlgGraphAsync : graphs) {
for (int i = 0; i < NUMBER_OF_SCHEMAS; i++) {
final int count = i;
results.add(
poolPerGraphsExecutorCompletionService.submit(() -> {
//noinspection Duplicates
for (int j = 0; j < 3; j++) {
VertexLabel outVertexLabel;
VertexLabel inVertexLabel;
EdgeLabel edgeLabel;
try {
outVertexLabel = sqlgGraphAsync.getTopology().ensureVertexLabelExist("schema_" + count, "tableOut_" + count, properties);
logger.info(String.format("created %s.%s", "schema_" + count, "tableOut_" + count));
inVertexLabel = sqlgGraphAsync.getTopology().ensureVertexLabelExist("schema_" + count, "tableIn_" + count, properties);
logger.info(String.format("created %s.%s", "schema_" + count, "tableIn_" + count));
edgeLabel = sqlgGraphAsync.getTopology().ensureEdgeLabelExist("edge_" + count, outVertexLabel, inVertexLabel, properties);
logger.info(String.format("created %s", "edge_" + count));
Assert.assertNotNull(outVertexLabel);
Assert.assertNotNull(inVertexLabel);
Assert.assertNotNull(edgeLabel);
final Random random = new Random();
if (random.nextBoolean()) {
sqlgGraphAsync.tx().commit();
successfulSchemas.add(count);
} else {
sqlgGraphAsync.tx().rollback();
}
break;
} catch (Exception e) {
sqlgGraphAsync.tx().rollback();
if (e.getCause().getClass().getSimpleName().equals("PSQLException")) {
//swallow
logger.warn("Rollback transaction due to schema creation failure.", e);
} else {
logger.error(String.format("got exception %s", e.getCause().getClass().getSimpleName()), e);
Assert.fail(e.getMessage());
}
}
}
return sqlgGraphAsync;
}
)
);
}
}
for (Future<SqlgGraph> result : results) {
result.get(5, TimeUnit.MINUTES);
}
Thread.sleep(20_000);
for (SqlgGraph graph : graphs) {
assertEquals(this.sqlgGraph.getTopology(), graph.getTopology());
logger.info(graph.getTopology().toJson().toString());
assertEquals("this.sqlGraph schema sizes mismatch", successfulSchemas.size() + 1, this.sqlgGraph.getTopology().getSchemas().size());
assertEquals("graph schema sizes mismatch", successfulSchemas.size() + 1, graph.getTopology().getSchemas().size());
if (!this.sqlgGraph.getTopology().toJson().equals(graph.getTopology().toJson())) {
for (Schema schema : this.sqlgGraph.getTopology().getSchemas()) {
Optional<Schema> otherSchema = graph.getTopology().getSchema(schema.getName());
Assert.assertTrue(otherSchema.isPresent());
if (!schema.toJson().equals(otherSchema.get().toJson())) {
logger.debug(schema.toJson().toString());
logger.debug(otherSchema.get().toJson().toString());
}
}
Assert.fail("json not the same");
}
}
logger.info("starting inserting data");
for (final SqlgGraph sqlgGraphAsync : graphs) {
for (int i = 0; i < NUMBER_OF_SCHEMAS; i++) {
if (successfulSchemas.contains(i)) {
final int count = i;
results.add(
poolPerGraphsExecutorCompletionService.submit(() -> {
//noinspection Duplicates
try {
Vertex v1 = sqlgGraphAsync.addVertex(T.label, "schema_" + count + "." + "tableOut_" + count, "name", "asdasd", "age", 1);
Vertex v2 = sqlgGraphAsync.addVertex(T.label, "schema_" + count + "." + "tableIn_" + count, "name", "asdasd", "age", 1);
v1.addEdge("edge_" + count, v2, "name", "asdasd", "age", 1);
final Random random = new Random();
if (random.nextBoolean()) {
sqlgGraphAsync.tx().rollback();
} else {
sqlgGraphAsync.tx().commit();
}
} catch (Exception e) {
sqlgGraphAsync.tx().rollback();
throw new RuntimeException(e);
}
return sqlgGraphAsync;
}
)
);
}
}
}
poolPerGraph.shutdown();
for (Future<SqlgGraph> result : results) {
result.get(30, TimeUnit.SECONDS);
}
//Because of the rollback logic the insert code may also create topology elements, so sleep a bit for the notify mechanism to do its thing.
Thread.sleep(20_000);
logger.info("starting querying data");
Set<Vertex> vertices = this.sqlgGraph.traversal().V().out().toSet();
this.sqlgGraph.tx().rollback();
for (SqlgGraph graph : graphs) {
logger.info("assert querying data");
Set<Vertex> actual = graph.traversal().V().out().toSet();
logger.info("vertices.size = " + vertices.size() + " actual.size = " + actual.size());
assertEquals(vertices, actual);
graph.tx().rollback();
}
} finally {
for (SqlgGraph graph : graphs) {
graph.close();
}
}
}
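//Writers insert vertices and edges (letting Sqlg create the schemas and labels on the fly)
//while readers on other threads continuously walk the topology maps; the test passes if no
//ConcurrentModificationException or other failure surfaces and all graphs read back the same data.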
@Test
public void testConcurrentModificationException() throws Exception {
//number of graphs, pretending each is a separate jvm
int NUMBER_OF_GRAPHS = 3;
int NUMBER_OF_SCHEMAS = 100;
//Pre-create all the graphs
List<SqlgGraph> graphs = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_GRAPHS; i++) {
graphs.add(SqlgGraph.open(configuration));
}
logger.info(String.format("Done firing up %d graphs", NUMBER_OF_GRAPHS));
try {
ExecutorService insertPoolPerGraph = Executors.newFixedThreadPool(NUMBER_OF_GRAPHS);
CompletionService<SqlgGraph> insertPoolPerGraphsExecutorCompletionService = new ExecutorCompletionService<>(insertPoolPerGraph);
List<Future<SqlgGraph>> results = new ArrayList<>();
logger.info("starting inserting data");
for (final SqlgGraph sqlgGraphAsync : graphs) {
for (int i = 0; i < NUMBER_OF_SCHEMAS; i++) {
final int count = i;
results.add(
insertPoolPerGraphsExecutorCompletionService.submit(() -> {
//noinspection Duplicates
try {
for (int j = 0; j < 10; j++) {
Vertex v1 = sqlgGraphAsync.addVertex(T.label, "schema_" + count + "." + "tableOut_" + count, "name", "asdasd", "age", 1);
Vertex v2 = sqlgGraphAsync.addVertex(T.label, "schema_" + count + "." + "tableIn_" + count, "name", "asdasd", "age", 1);
v1.addEdge("edge_" + count, v2, "name", "asdasd", "age", 1);
sqlgGraphAsync.tx().commit();
}
} catch (Exception e) {
sqlgGraphAsync.tx().rollback();
if (e.getCause() == null) {
logger.error("got exception without a cause", e);
Assert.fail(e.getMessage());
} else if (e.getCause().getClass().getSimpleName().equals("PSQLException")) {
//swallow
logger.warn("Rollback transaction due to schema creation failure.", e);
} else {
logger.error(String.format("got exception %s", e.getCause().getClass().getSimpleName()), e);
Assert.fail(e.getMessage());
}
}
return sqlgGraphAsync;
}
)
);
}
}
insertPoolPerGraph.shutdown();
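//Start reader tasks that repeatedly poll the topology while the insert tasks submitted above
//may still be running; they keep going until keepReading is flipped to false once all insert
//futures have completed.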
AtomicBoolean keepReading = new AtomicBoolean(true);
ExecutorService readPoolPerGraph = Executors.newFixedThreadPool(NUMBER_OF_GRAPHS);
CompletionService<SqlgGraph> readPoolPerGraphsExecutorCompletionService = new ExecutorCompletionService<>(readPoolPerGraph);
List<Future<SqlgGraph>> readResults = new ArrayList<>();
logger.info("starting reading data");
for (final SqlgGraph sqlgGraphAsync : graphs) {
for (int i = 0; i < NUMBER_OF_SCHEMAS; i++) {
readResults.add(
readPoolPerGraphsExecutorCompletionService.submit(() -> {
try {
while (keepReading.get()) {
sqlgGraphAsync.getTopology().getAllTables();
sqlgGraphAsync.getTopology().getEdgeForeignKeys();
//noinspection BusyWait
Thread.sleep(100);
}
} catch (Exception e) {
sqlgGraphAsync.tx().rollback();
throw new RuntimeException(e);
}
return sqlgGraphAsync;
}
)
);
}
}
readPoolPerGraph.shutdown();
for (Future<SqlgGraph> result : results) {
result.get(30, TimeUnit.SECONDS);
logger.info("graph results returned");
}
keepReading.set(false);
for (Future<SqlgGraph> result : readResults) {
result.get(30, TimeUnit.SECONDS);
logger.info("graph readResults returned");
}
Thread.sleep(10_000);
logger.info("starting querying data");
Set<Vertex> vertices = this.sqlgGraph.traversal().V().out().toSet();
this.sqlgGraph.tx().rollback();
for (SqlgGraph graph : graphs) {
logger.info("assert querying data");
Set<Vertex> other = graph.traversal().V().out().toSet();
Assert.assertEquals(vertices.size(), other.size());
Assert.assertEquals(vertices, other);
graph.tx().rollback();
}
} finally {
for (SqlgGraph graph : graphs) {
graph.close();
}
}
}
}