Coverage Summary for Class: TestMultiThreadedBatch (org.umlg.sqlg.test.batch)
Class |
Class, %
|
Method, %
|
Branch, %
|
Line, %
|
TestMultiThreadedBatch |
100%
(1/1)
|
100%
(10/10)
|
50%
(23/46)
|
85.4%
(76/89)
|
package org.umlg.sqlg.test.batch;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umlg.sqlg.structure.SqlgGraph;
import org.umlg.sqlg.test.BaseTest;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Date: 2014/11/06
* Time: 5:48 AM
*/
public class TestMultiThreadedBatch extends BaseTest {
private static final Logger logger = LoggerFactory.getLogger(TestMultiThreadedBatch.class.getName());
@BeforeClass
public static void beforeClass() {
BaseTest.beforeClass();
if (isPostgres()) {
configuration.addProperty("distributed", true);
}
}
@Before
public void beforeTest() {
Assume.assumeTrue(this.sqlgGraph.getSqlDialect().supportsBatchMode());
}
@Test
public void testMultiThreadAddVertex() throws InterruptedException {
sqlgGraph.tx().rollback();
Set<Integer> tables = new ConcurrentSkipListSet<>();
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int j = 0; j < 50; j++) {
executorService.submit(() -> {
sqlgGraph.tx().rollback();
sqlgGraph.tx().normalBatchModeOn();
final Random random = new Random();
int randomInt = random.nextInt();
try {
for (int i = 0; i < 1000; i++) {
Vertex v1 = sqlgGraph.addVertex(T.label, "Person" + randomInt, "name", randomInt);
Vertex v2 = sqlgGraph.addVertex(T.label, "Person" + randomInt, "name", randomInt);
v1.addEdge(String.valueOf(randomInt), v2, "name", randomInt);
}
tables.add(randomInt);
sqlgGraph.tx().commit();
sqlgGraph.tx().normalBatchModeOn();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(60000, TimeUnit.SECONDS);
Assert.assertTrue(terminated);
Assert.assertEquals(50, tables.size());
testMultiThreadAddVertex_assert(this.sqlgGraph, tables);
if (this.sqlgGraph1 != null) {
Thread.sleep(SLEEP_TIME);
testMultiThreadAddVertex_assert(this.sqlgGraph1, tables);
}
}
private void testMultiThreadAddVertex_assert(SqlgGraph sqlgGraph, Set<Integer> tables) {
for (Integer i : tables) {
Assert.assertTrue(sqlgGraph.getTopology().getVertexLabel(sqlgGraph.getSqlDialect().getPublicSchema(), "Person" + i).isPresent());
Assert.assertEquals(2000, sqlgGraph.traversal().V().has(T.label, "Person" + i).has("name", i).count().next().intValue());
List<Vertex> persons = sqlgGraph.traversal().V().has(T.label, "Person" + i).toList();
for (Vertex v : persons) {
Assert.assertEquals(i, v.value("name"));
}
}
}
@Test
public void testMultiThreadAddVertexSameLabel() throws InterruptedException {
sqlgGraph.tx().rollback();
Set<Integer> tables = new ConcurrentSkipListSet<>();
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int j = 0; j < 50; j++) {
executorService.submit(() -> {
sqlgGraph.tx().rollback();
sqlgGraph.tx().normalBatchModeOn();
final Random random = new Random();
int randomInt = random.nextInt();
while (tables.contains(randomInt)) {
randomInt = random.nextInt();
}
try {
for (int k = 0; k < 3; k++) {
boolean failed = false;
for (int i = 0; i < 1000; i++) {
try {
Vertex v1 = sqlgGraph.addVertex(T.label, "House", "name", randomInt);
Vertex v2 = sqlgGraph.addVertex(T.label, "Street", "name", randomInt);
v1.addEdge("Overlooks", v2, "name", randomInt);
tables.add(randomInt);
} catch (Exception e) {
failed = true;
tables.remove(randomInt);
sqlgGraph.tx().rollback();
if ((isPostgres() && e.getCause().getClass().getSimpleName().equals("PSQLException")) ||
(isHsqldb() && e.getCause().getClass().getSimpleName().equals("SQLSyntaxErrorException")) ||
(isMariaDb() && e.getCause().getClass().getSimpleName().equals("SQLSyntaxErrorException")) ||
(isMysql() && e.getCause().getClass().getSimpleName().equals("SQLSyntaxErrorException")) ||
(isH2() && e.getCause().getClass().getSimpleName().equals("JdbcSQLSyntaxErrorException"))) {
//swallow
logger.warn("Rollback transaction due to schema creation failure.", e);
Thread.sleep(1000);
break;
} else {
logger.error(String.format("got exception %s", e.getCause().getClass().getSimpleName()), e);
throw new RuntimeException(e);
}
}
}
if (!failed) {
sqlgGraph.tx().commit();
break;
}
}
sqlgGraph.tx().normalBatchModeOn();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(60000, TimeUnit.SECONDS);
Assert.assertTrue(terminated);
Assert.assertEquals(50, tables.size());
testMultiThreadAddVertexSameLabel_assert(this.sqlgGraph, tables);
if (this.sqlgGraph1 != null) {
Thread.sleep(SLEEP_TIME);
testMultiThreadAddVertexSameLabel_assert(this.sqlgGraph1, tables);
}
}
private void testMultiThreadAddVertexSameLabel_assert(SqlgGraph sqlgGraph, Set<Integer> tables) {
Assert.assertTrue(sqlgGraph.getTopology().getVertexLabel(sqlgGraph.getSqlDialect().getPublicSchema(), "House").isPresent());
Assert.assertTrue(sqlgGraph.getTopology().getVertexLabel(sqlgGraph.getSqlDialect().getPublicSchema(), "Street").isPresent());
for (Integer i : tables) {
Assert.assertEquals(String.valueOf(i), 1000, sqlgGraph.traversal().V().has(T.label, "House").has("name", i).count().next().intValue());
Assert.assertEquals(String.valueOf(i), 1000, sqlgGraph.traversal().V().has(T.label, "House").has("name", i).out("Overlooks").has("name", i).count().next().intValue());
}
}
}