Coverage Summary for Class: TestBatchedStreaming (org.umlg.sqlg.test.batch)
Class |
Method, %
|
Branch, %
|
Line, %
|
TestBatchedStreaming |
100%
(19/19)
|
84.8%
(39/46)
|
100%
(201/201)
|
TestBatchedStreaming$1 |
100%
(1/1)
|
100%
(3/3)
|
TestBatchedStreaming$2 |
100%
(1/1)
|
100%
(4/4)
|
Total |
100%
(21/21)
|
84.8%
(39/46)
|
100%
(208/208)
|
package org.umlg.sqlg.test.batch;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umlg.sqlg.structure.PropertyDefinition;
import org.umlg.sqlg.structure.PropertyType;
import org.umlg.sqlg.structure.SqlgExceptions;
import org.umlg.sqlg.structure.SqlgGraph;
import org.umlg.sqlg.test.BaseTest;
import java.util.*;
/**
* Date: 2015/10/03
* Time: 8:53 PM
*/
public class TestBatchedStreaming extends BaseTest {
private static final Logger LOGGER = LoggerFactory.getLogger(TestBatchedStreaming.class);
@BeforeClass
public static void beforeClass() {
BaseTest.beforeClass();
if (isPostgres()) {
configuration.addProperty("distributed", true);
}
}
@Before
public void beforeTest() {
Assume.assumeTrue(this.sqlgGraph.getSqlDialect().supportsStreamingBatchMode());
}
@Test
public void testWithoutLock() {
this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("A", new HashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
put("age", PropertyDefinition.of(PropertyType.INTEGER));
}});
this.sqlgGraph.tx().commit();
this.sqlgGraph.tx().streamingBatchModeOn();
StopWatch stopWatch = StopWatch.createStarted();
for (int i = 0; i < 1_000_000; i++) {
this.sqlgGraph.streamVertex(T.label, "A", "name", "John", "age", 1);
}
this.sqlgGraph.tx().commit();
Assert.assertEquals(1_000_000, this.sqlgGraph.traversal().V().hasLabel("A").count().next(), 0L);
stopWatch.stop();
LOGGER.info(stopWatch.toString());
}
@Test
public void testWithLock() {
Assume.assumeTrue(sqlgGraph.getSqlDialect().supportsStreamingBatchMode());
int size = 1_000_000;
Map<String, PropertyDefinition> properties = new HashMap<>();
for (int i = 0; i < 100; i++) {
properties.put("a" + i, PropertyDefinition.of(PropertyType.STRING));
}
this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("A", properties);
this.sqlgGraph.tx().commit();
this.sqlgGraph.getTopology().lock();
this.sqlgGraph.tx().streamingBatchModeOn();
LinkedHashMap<String, Object> row = new LinkedHashMap<>();
for (int j = 0; j < 100; j++) {
row.put("a" + j, "v_" + j);
}
StopWatch stopWatch = StopWatch.createStarted();
for (int i = 0; i < size; i++) {
this.sqlgGraph.streamVertex("A", row);
}
this.sqlgGraph.tx().commit();
stopWatch.stop();
LOGGER.info(stopWatch.toString());
Assert.assertEquals(size, this.sqlgGraph.traversal().V().hasLabel("A").count().next(), 0L);
}
@Test
public void testNullProperties() throws InterruptedException {
this.sqlgGraph.tx().streamingBatchModeOn();
//surname is null, that bad-ass means null
try {
this.sqlgGraph.streamVertex(T.label, "Person", "name", "John1", "surname", null, "age", 1);
} catch (SqlgExceptions.InvalidPropertyTypeException e) {
Assert.assertEquals("Property of type NULL is not supported", e.getMessage());
}
try {
this.sqlgGraph.streamVertex(T.label, "Person", "name", "John2", "surname", "Smith", "age", null);
} catch (SqlgExceptions.InvalidPropertyTypeException e) {
Assert.assertEquals("Property of type NULL is not supported", e.getMessage());
}
this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist(
"Person",
new LinkedHashMap<>() {{
put("name", PropertyDefinition.of(PropertyType.STRING));
put("surname", PropertyDefinition.of(PropertyType.STRING));
put("age", PropertyDefinition.of(PropertyType.INTEGER));
}});
this.sqlgGraph.streamVertex(T.label, "Person", "name", "John1", "surname", null, "age", 1);
this.sqlgGraph.streamVertex(T.label, "Person", "name", "John2", "surname", "Smith", "age", null);
this.sqlgGraph.streamVertex(T.label, "Person", "name", "John3", "surname", "", "age", 1);
this.sqlgGraph.tx().commit();
testNullProperties_assert(this.sqlgGraph);
if (this.sqlgGraph1 != null) {
Thread.sleep(1000);
testNullProperties_assert(this.sqlgGraph1);
}
}
private void testNullProperties_assert(SqlgGraph sqlgGraph) {
Vertex john1 = sqlgGraph.traversal().V().hasLabel("Person").has("name", "John1").next();
Assert.assertNull(john1.property("surname").value());
Assert.assertNotNull(john1.property("age").value());
Vertex john2 = sqlgGraph.traversal().V().hasLabel("Person").has("name", "John2").next();
Assert.assertNotNull(john2.property("surname").value());
Assert.assertNull(john2.property("age").value());
Vertex john3 = sqlgGraph.traversal().V().hasLabel("Person").has("name", "John3").next();
Assert.assertNotNull(john3.property("surname").value());
Assert.assertEquals("", john3.value("surname"));
}
@Test
public void testStreamingWithBatchSize() throws InterruptedException {
int BATCH_SIZE = 100;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
this.sqlgGraph.tx().streamingWithLockBatchModeOn();
List<Pair<Vertex, Vertex>> uids = new ArrayList<>();
String uuidCache1 = null;
String uuidCache2 = null;
for (int i = 1; i <= 1000; i++) {
String uuid1 = UUID.randomUUID().toString();
String uuid2 = UUID.randomUUID().toString();
if (i == 50) {
uuidCache1 = uuid1;
uuidCache2 = uuid2;
}
properties.put("id", uuid1);
Vertex v1 = this.sqlgGraph.addVertex("Person", properties);
properties.put("id", uuid2);
Vertex v2 = this.sqlgGraph.addVertex("Person", properties);
uids.add(Pair.of(v1, v2));
if (i % (BATCH_SIZE / 2) == 0) {
this.sqlgGraph.tx().flush();
this.sqlgGraph.tx().streamingWithLockBatchModeOn();
for (Pair<Vertex, Vertex> uid : uids) {
uid.getLeft().addEdge("friend", uid.getRight());
}
//This is needed because the number of edges are less than the batch size so it will not be auto flushed
this.sqlgGraph.tx().flush();
uids.clear();
this.sqlgGraph.tx().streamingWithLockBatchModeOn();
}
}
this.sqlgGraph.tx().commit();
stopWatch.stop();
testStreamingWithBatchSize(this.sqlgGraph, uuidCache1, uuidCache2);
if (this.sqlgGraph1 != null) {
Thread.sleep(1000);
testStreamingWithBatchSize(this.sqlgGraph1, uuidCache1, uuidCache2);
}
}
private void testStreamingWithBatchSize(SqlgGraph sqlgGraph, String uuidCache1, String uuidCache2) {
Assert.assertEquals(2000, sqlgGraph.traversal().V().hasLabel("Person").count().next(), 0);
Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("friend").count().next(), 0);
GraphTraversal<Vertex, Vertex> has = sqlgGraph.traversal().V().hasLabel("Person").has("id", uuidCache1);
Assert.assertTrue(has.hasNext());
Vertex person50 = has.next();
GraphTraversal<Vertex, Vertex> has1 = sqlgGraph.traversal().V().hasLabel("Person").has("id", uuidCache2);
Assert.assertTrue(has1.hasNext());
Vertex person250 = has1.next();
Assert.assertTrue(sqlgGraph.traversal().V(person50.id()).out().hasNext());
Vertex person250Please = sqlgGraph.traversal().V(person50.id()).out().next();
Assert.assertEquals(person250, person250Please);
}
@Test
public void testStreamingWithBatchSizeNonDefaultSchema() throws InterruptedException {
final int BATCH_SIZE = 1000;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
this.sqlgGraph.tx().streamingWithLockBatchModeOn();
List<Pair<Vertex, Vertex>> uids = new ArrayList<>();
String uuidCache1 = null;
String uuidCache2 = null;
for (int i = 1; i <= 1000; i++) {
String uuid1 = UUID.randomUUID().toString();
String uuid2 = UUID.randomUUID().toString();
if (i == 50) {
uuidCache1 = uuid1;
uuidCache2 = uuid2;
}
properties.put("id", uuid1);
Vertex v1 = this.sqlgGraph.addVertex("A.Person", properties);
properties.put("id", uuid2);
Vertex v2 = this.sqlgGraph.addVertex("A.Person", properties);
uids.add(Pair.of(v1, v2));
if (i % (BATCH_SIZE / 2) == 0) {
this.sqlgGraph.tx().flush();
for (Pair<Vertex, Vertex> uid : uids) {
uid.getLeft().addEdge("friend", uid.getRight());
}
//This is needed because the number of edges are less than the batch size so it will not be auto flushed
this.sqlgGraph.tx().flush();
uids.clear();
}
}
this.sqlgGraph.tx().commit();
stopWatch.stop();
testStreamingWithBatchSizeNonDefaultSchema_assert(this.sqlgGraph, uuidCache1, uuidCache2);
if (this.sqlgGraph1 != null) {
Thread.sleep(1000);
testStreamingWithBatchSizeNonDefaultSchema_assert(this.sqlgGraph1, uuidCache1, uuidCache2);
}
}
private void testStreamingWithBatchSizeNonDefaultSchema_assert(SqlgGraph sqlgGraph, String uuidCache1, String uuidCache2) {
Assert.assertEquals(2000, sqlgGraph.traversal().V().hasLabel("A.Person").count().next(), 0);
Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("A.friend").count().next(), 0);
GraphTraversal<Vertex, Vertex> has = sqlgGraph.traversal().V().hasLabel("A.Person").has("id", uuidCache1);
Assert.assertTrue(has.hasNext());
Vertex person50 = has.next();
GraphTraversal<Vertex, Vertex> has1 = sqlgGraph.traversal().V().hasLabel("A.Person").has("id", uuidCache2);
Assert.assertTrue(has1.hasNext());
Vertex person250 = has1.next();
Assert.assertTrue(sqlgGraph.traversal().V(person50.id()).out().hasNext());
Vertex person250Please = sqlgGraph.traversal().V(person50.id()).out().next();
Assert.assertEquals(person250, person250Please);
}
@Test
public void testStreamingWithBatchSizeWithCallBack() throws InterruptedException {
LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
List<Vertex> persons = new ArrayList<>();
this.sqlgGraph.tx().streamingWithLockBatchModeOn();
for (int i = 1; i <= 10; i++) {
String uuid1 = UUID.randomUUID().toString();
properties.put("id", uuid1);
persons.add(this.sqlgGraph.addVertex("Person", properties));
}
this.sqlgGraph.tx().flush();
Vertex previous = null;
for (Vertex person : persons) {
if (previous == null) {
previous = person;
} else {
previous.addEdge("friend", person);
}
}
this.sqlgGraph.tx().commit();
testStreamingWithBatchSizeWithCallBack_assert(this.sqlgGraph);
if (this.sqlgGraph1 != null) {
Thread.sleep(1000);
testStreamingWithBatchSizeWithCallBack_assert(this.sqlgGraph1);
}
}
private void testStreamingWithBatchSizeWithCallBack_assert(SqlgGraph sqlgGraph) {
Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Person").count().next(), 0);
Assert.assertEquals(9, sqlgGraph.traversal().E().hasLabel("friend").count().next(), 0);
}
@Test
public void streamJava8StyleWithSchema() throws InterruptedException {
List<String> uids = Arrays.asList("1", "2", "3", "4", "5");
this.sqlgGraph.tx().streamingBatchModeOn();
uids.forEach(u -> this.sqlgGraph.streamVertex(T.label, "R_HG.Person", "name", u));
this.sqlgGraph.tx().commit();
streamJava8StyleWithSchema_assert(this.sqlgGraph);
if (this.sqlgGraph1 != null) {
Thread.sleep(1000);
streamJava8StyleWithSchema_assert(this.sqlgGraph1);
}
}
private void streamJava8StyleWithSchema_assert(SqlgGraph sqlgGraph) {
Assert.assertEquals(5, sqlgGraph.traversal().V().hasLabel("R_HG.Person").count().next(), 0L);
}
@Test
public void testBatchContinuations() throws InterruptedException {
this.sqlgGraph.tx().normalBatchModeOn();
Vertex v1 = this.sqlgGraph.addVertex(T.label, "Person");
Vertex v2 = this.sqlgGraph.addVertex(T.label, "Dog");
v1.addEdge("pet", v2);
this.sqlgGraph.tx().flush();
this.sqlgGraph.tx().streamingWithLockBatchModeOn();
for (int i = 1; i <= 100; i++) {
this.sqlgGraph.addVertex("Person", new LinkedHashMap<>());
}
this.sqlgGraph.tx().flush();
this.sqlgGraph.tx().streamingBatchModeOn();
this.sqlgGraph.streamVertex("Person", new LinkedHashMap<>());
this.sqlgGraph.tx().commit();
testBatchContinuations_assert(this.sqlgGraph);
if (this.sqlgGraph1 != null) {
Thread.sleep(1000);
testBatchContinuations_assert(this.sqlgGraph1);
}
}
private void testBatchContinuations_assert(SqlgGraph sqlgGraph) {
Assert.assertEquals(102, sqlgGraph.traversal().V().hasLabel("Person").count().next(), 0L);
Assert.assertEquals(1, sqlgGraph.traversal().V().hasLabel("Dog").count().next(), 0L);
}
@Test
public void testBatchWithAttributeWithBackSlashAsLastChar() {
this.sqlgGraph.tx().streamingBatchModeOn();
this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
this.sqlgGraph.tx().commit();
}
}