Coverage Summary for Class: TestBatchedStreaming (org.umlg.sqlg.test.batch)

Class Method, % Branch, % Line, %
TestBatchedStreaming 100% (19/19) 84.8% (39/46) 100% (201/201)
TestBatchedStreaming$1 100% (1/1) 100% (3/3)
TestBatchedStreaming$2 100% (1/1) 100% (4/4)
Total 100% (21/21) 84.8% (39/46) 100% (208/208)


 package org.umlg.sqlg.test.batch;
 
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.umlg.sqlg.structure.PropertyDefinition;
 import org.umlg.sqlg.structure.PropertyType;
 import org.umlg.sqlg.structure.SqlgExceptions;
 import org.umlg.sqlg.structure.SqlgGraph;
 import org.umlg.sqlg.test.BaseTest;
 
 import java.util.*;
 
 /**
  * Date: 2015/10/03
  * Time: 8:53 PM
  */
 public class TestBatchedStreaming extends BaseTest {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(TestBatchedStreaming.class);
 
  @BeforeClass
  public static void beforeClass() {
  BaseTest.beforeClass();
  if (isPostgres()) {
  configuration.addProperty("distributed", true);
  }
  }
 
  @Before
  public void beforeTest() {
  Assume.assumeTrue(this.sqlgGraph.getSqlDialect().supportsStreamingBatchMode());
  }
 
  @Test
  public void testWithoutLock() {
  this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("A", new HashMap<>() {{
  put("name", PropertyDefinition.of(PropertyType.STRING));
  put("age", PropertyDefinition.of(PropertyType.INTEGER));
  }});
  this.sqlgGraph.tx().commit();
  this.sqlgGraph.tx().streamingBatchModeOn();
  StopWatch stopWatch = StopWatch.createStarted();
  for (int i = 0; i < 1_000_000; i++) {
  this.sqlgGraph.streamVertex(T.label, "A", "name", "John", "age", 1);
  }
  this.sqlgGraph.tx().commit();
  Assert.assertEquals(1_000_000, this.sqlgGraph.traversal().V().hasLabel("A").count().next(), 0L);
  stopWatch.stop();
  LOGGER.info(stopWatch.toString());
  }
 
  @Test
  public void testWithLock() {
  Assume.assumeTrue(sqlgGraph.getSqlDialect().supportsStreamingBatchMode());
  int size = 1_000_000;
 
  Map<String, PropertyDefinition> properties = new HashMap<>();
  for (int i = 0; i < 100; i++) {
  properties.put("a" + i, PropertyDefinition.of(PropertyType.STRING));
  }
  this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("A", properties);
  this.sqlgGraph.tx().commit();
  this.sqlgGraph.getTopology().lock();
  this.sqlgGraph.tx().streamingBatchModeOn();
  LinkedHashMap<String, Object> row = new LinkedHashMap<>();
  for (int j = 0; j < 100; j++) {
  row.put("a" + j, "v_" + j);
  }
  StopWatch stopWatch = StopWatch.createStarted();
  for (int i = 0; i < size; i++) {
  this.sqlgGraph.streamVertex("A", row);
  }
  this.sqlgGraph.tx().commit();
  stopWatch.stop();
  LOGGER.info(stopWatch.toString());
  Assert.assertEquals(size, this.sqlgGraph.traversal().V().hasLabel("A").count().next(), 0L);
  }
 
  @Test
  public void testNullProperties() throws InterruptedException {
  this.sqlgGraph.tx().streamingBatchModeOn();
  //surname is null, that bad-ass means null
  try {
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "John1", "surname", null, "age", 1);
  } catch (SqlgExceptions.InvalidPropertyTypeException e) {
  Assert.assertEquals("Property of type NULL is not supported", e.getMessage());
  }
  try {
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "John2", "surname", "Smith", "age", null);
  } catch (SqlgExceptions.InvalidPropertyTypeException e) {
  Assert.assertEquals("Property of type NULL is not supported", e.getMessage());
  }
  this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist(
  "Person",
  new LinkedHashMap<>() {{
  put("name", PropertyDefinition.of(PropertyType.STRING));
  put("surname", PropertyDefinition.of(PropertyType.STRING));
  put("age", PropertyDefinition.of(PropertyType.INTEGER));
  }});
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "John1", "surname", null, "age", 1);
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "John2", "surname", "Smith", "age", null);
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "John3", "surname", "", "age", 1);
  this.sqlgGraph.tx().commit();
  testNullProperties_assert(this.sqlgGraph);
  if (this.sqlgGraph1 != null) {
  Thread.sleep(1000);
  testNullProperties_assert(this.sqlgGraph1);
  }
  }
 
  private void testNullProperties_assert(SqlgGraph sqlgGraph) {
  Vertex john1 = sqlgGraph.traversal().V().hasLabel("Person").has("name", "John1").next();
  Assert.assertNull(john1.property("surname").value());
  Assert.assertNotNull(john1.property("age").value());
  Vertex john2 = sqlgGraph.traversal().V().hasLabel("Person").has("name", "John2").next();
  Assert.assertNotNull(john2.property("surname").value());
  Assert.assertNull(john2.property("age").value());
  Vertex john3 = sqlgGraph.traversal().V().hasLabel("Person").has("name", "John3").next();
  Assert.assertNotNull(john3.property("surname").value());
  Assert.assertEquals("", john3.value("surname"));
  }
 
  @Test
  public void testStreamingWithBatchSize() throws InterruptedException {
  int BATCH_SIZE = 100;
  StopWatch stopWatch = new StopWatch();
  stopWatch.start();
  LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
  this.sqlgGraph.tx().streamingWithLockBatchModeOn();
  List<Pair<Vertex, Vertex>> uids = new ArrayList<>();
  String uuidCache1 = null;
  String uuidCache2 = null;
  for (int i = 1; i <= 1000; i++) {
  String uuid1 = UUID.randomUUID().toString();
  String uuid2 = UUID.randomUUID().toString();
  if (i == 50) {
  uuidCache1 = uuid1;
  uuidCache2 = uuid2;
  }
  properties.put("id", uuid1);
  Vertex v1 = this.sqlgGraph.addVertex("Person", properties);
  properties.put("id", uuid2);
  Vertex v2 = this.sqlgGraph.addVertex("Person", properties);
  uids.add(Pair.of(v1, v2));
  if (i % (BATCH_SIZE / 2) == 0) {
  this.sqlgGraph.tx().flush();
  this.sqlgGraph.tx().streamingWithLockBatchModeOn();
  for (Pair<Vertex, Vertex> uid : uids) {
  uid.getLeft().addEdge("friend", uid.getRight());
  }
  //This is needed because the number of edges are less than the batch size so it will not be auto flushed
  this.sqlgGraph.tx().flush();
  uids.clear();
  this.sqlgGraph.tx().streamingWithLockBatchModeOn();
  }
  }
  this.sqlgGraph.tx().commit();
  stopWatch.stop();
 
  testStreamingWithBatchSize(this.sqlgGraph, uuidCache1, uuidCache2);
  if (this.sqlgGraph1 != null) {
  Thread.sleep(1000);
  testStreamingWithBatchSize(this.sqlgGraph1, uuidCache1, uuidCache2);
  }
  }
 
  private void testStreamingWithBatchSize(SqlgGraph sqlgGraph, String uuidCache1, String uuidCache2) {
 
  Assert.assertEquals(2000, sqlgGraph.traversal().V().hasLabel("Person").count().next(), 0);
  Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("friend").count().next(), 0);
 
  GraphTraversal<Vertex, Vertex> has = sqlgGraph.traversal().V().hasLabel("Person").has("id", uuidCache1);
  Assert.assertTrue(has.hasNext());
  Vertex person50 = has.next();
 
  GraphTraversal<Vertex, Vertex> has1 = sqlgGraph.traversal().V().hasLabel("Person").has("id", uuidCache2);
  Assert.assertTrue(has1.hasNext());
  Vertex person250 = has1.next();
  Assert.assertTrue(sqlgGraph.traversal().V(person50.id()).out().hasNext());
  Vertex person250Please = sqlgGraph.traversal().V(person50.id()).out().next();
  Assert.assertEquals(person250, person250Please);
  }
 
  @Test
  public void testStreamingWithBatchSizeNonDefaultSchema() throws InterruptedException {
  final int BATCH_SIZE = 1000;
  StopWatch stopWatch = new StopWatch();
  stopWatch.start();
  LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
  this.sqlgGraph.tx().streamingWithLockBatchModeOn();
  List<Pair<Vertex, Vertex>> uids = new ArrayList<>();
  String uuidCache1 = null;
  String uuidCache2 = null;
  for (int i = 1; i <= 1000; i++) {
  String uuid1 = UUID.randomUUID().toString();
  String uuid2 = UUID.randomUUID().toString();
  if (i == 50) {
  uuidCache1 = uuid1;
  uuidCache2 = uuid2;
  }
  properties.put("id", uuid1);
  Vertex v1 = this.sqlgGraph.addVertex("A.Person", properties);
  properties.put("id", uuid2);
  Vertex v2 = this.sqlgGraph.addVertex("A.Person", properties);
  uids.add(Pair.of(v1, v2));
  if (i % (BATCH_SIZE / 2) == 0) {
  this.sqlgGraph.tx().flush();
  for (Pair<Vertex, Vertex> uid : uids) {
  uid.getLeft().addEdge("friend", uid.getRight());
  }
  //This is needed because the number of edges are less than the batch size so it will not be auto flushed
  this.sqlgGraph.tx().flush();
  uids.clear();
  }
  }
  this.sqlgGraph.tx().commit();
  stopWatch.stop();
 
  testStreamingWithBatchSizeNonDefaultSchema_assert(this.sqlgGraph, uuidCache1, uuidCache2);
  if (this.sqlgGraph1 != null) {
  Thread.sleep(1000);
  testStreamingWithBatchSizeNonDefaultSchema_assert(this.sqlgGraph1, uuidCache1, uuidCache2);
  }
  }
 
  private void testStreamingWithBatchSizeNonDefaultSchema_assert(SqlgGraph sqlgGraph, String uuidCache1, String uuidCache2) {
  Assert.assertEquals(2000, sqlgGraph.traversal().V().hasLabel("A.Person").count().next(), 0);
  Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("A.friend").count().next(), 0);
 
  GraphTraversal<Vertex, Vertex> has = sqlgGraph.traversal().V().hasLabel("A.Person").has("id", uuidCache1);
  Assert.assertTrue(has.hasNext());
  Vertex person50 = has.next();
 
  GraphTraversal<Vertex, Vertex> has1 = sqlgGraph.traversal().V().hasLabel("A.Person").has("id", uuidCache2);
  Assert.assertTrue(has1.hasNext());
  Vertex person250 = has1.next();
  Assert.assertTrue(sqlgGraph.traversal().V(person50.id()).out().hasNext());
  Vertex person250Please = sqlgGraph.traversal().V(person50.id()).out().next();
  Assert.assertEquals(person250, person250Please);
  }
 
  @Test
  public void testStreamingWithBatchSizeWithCallBack() throws InterruptedException {
  LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
  List<Vertex> persons = new ArrayList<>();
  this.sqlgGraph.tx().streamingWithLockBatchModeOn();
  for (int i = 1; i <= 10; i++) {
  String uuid1 = UUID.randomUUID().toString();
  properties.put("id", uuid1);
  persons.add(this.sqlgGraph.addVertex("Person", properties));
  }
  this.sqlgGraph.tx().flush();
  Vertex previous = null;
  for (Vertex person : persons) {
  if (previous == null) {
  previous = person;
  } else {
  previous.addEdge("friend", person);
  }
  }
  this.sqlgGraph.tx().commit();
  testStreamingWithBatchSizeWithCallBack_assert(this.sqlgGraph);
  if (this.sqlgGraph1 != null) {
  Thread.sleep(1000);
  testStreamingWithBatchSizeWithCallBack_assert(this.sqlgGraph1);
  }
  }
 
  private void testStreamingWithBatchSizeWithCallBack_assert(SqlgGraph sqlgGraph) {
  Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Person").count().next(), 0);
  Assert.assertEquals(9, sqlgGraph.traversal().E().hasLabel("friend").count().next(), 0);
  }
 
  @Test
  public void streamJava8StyleWithSchema() throws InterruptedException {
  List<String> uids = Arrays.asList("1", "2", "3", "4", "5");
  this.sqlgGraph.tx().streamingBatchModeOn();
  uids.forEach(u -> this.sqlgGraph.streamVertex(T.label, "R_HG.Person", "name", u));
  this.sqlgGraph.tx().commit();
  streamJava8StyleWithSchema_assert(this.sqlgGraph);
  if (this.sqlgGraph1 != null) {
  Thread.sleep(1000);
  streamJava8StyleWithSchema_assert(this.sqlgGraph1);
  }
  }
 
  private void streamJava8StyleWithSchema_assert(SqlgGraph sqlgGraph) {
  Assert.assertEquals(5, sqlgGraph.traversal().V().hasLabel("R_HG.Person").count().next(), 0L);
  }
 
  @Test
  public void testBatchContinuations() throws InterruptedException {
  this.sqlgGraph.tx().normalBatchModeOn();
  Vertex v1 = this.sqlgGraph.addVertex(T.label, "Person");
  Vertex v2 = this.sqlgGraph.addVertex(T.label, "Dog");
  v1.addEdge("pet", v2);
  this.sqlgGraph.tx().flush();
  this.sqlgGraph.tx().streamingWithLockBatchModeOn();
  for (int i = 1; i <= 100; i++) {
  this.sqlgGraph.addVertex("Person", new LinkedHashMap<>());
  }
  this.sqlgGraph.tx().flush();
  this.sqlgGraph.tx().streamingBatchModeOn();
  this.sqlgGraph.streamVertex("Person", new LinkedHashMap<>());
  this.sqlgGraph.tx().commit();
  testBatchContinuations_assert(this.sqlgGraph);
  if (this.sqlgGraph1 != null) {
  Thread.sleep(1000);
  testBatchContinuations_assert(this.sqlgGraph1);
  }
  }
 
  private void testBatchContinuations_assert(SqlgGraph sqlgGraph) {
  Assert.assertEquals(102, sqlgGraph.traversal().V().hasLabel("Person").count().next(), 0L);
  Assert.assertEquals(1, sqlgGraph.traversal().V().hasLabel("Dog").count().next(), 0L);
  }
 
  @Test
  public void testBatchWithAttributeWithBackSlashAsLastChar() {
  this.sqlgGraph.tx().streamingBatchModeOn();
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
  this.sqlgGraph.streamVertex(T.label, "Person", "name", "a\\", "test", "b\\");
  this.sqlgGraph.tx().commit();
  }
 
 }