Coverage Summary for Class: TestBatchStreamEdge (org.umlg.sqlg.test.batch)

Class Class, % Method, % Branch, % Line, %
TestBatchStreamEdge 100% (1/1) 100% (32/32) 87.8% (86/98) 99.1% (426/430)


 package org.umlg.sqlg.test.batch;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.junit.*;
 import org.umlg.sqlg.structure.SqlgGraph;
 import org.umlg.sqlg.structure.SqlgVertex;
 import org.umlg.sqlg.structure.topology.Topology;
 import org.umlg.sqlg.test.BaseTest;
 
 import java.math.BigDecimal;
 import java.time.*;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Date: 2015/07/18
  * Time: 4:18 PM
  */
 public class TestBatchStreamEdge extends BaseTest {
 
     final private int NUMBER_OF_VERTICES = 1_000;
 
     @BeforeClass
     public static void beforeClass() {
         BaseTest.beforeClass();
         if (isPostgres()) {
             configuration.addProperty("distributed", true);
         }
     }
 
     @Before
     public void beforeTest() {
         Assume.assumeTrue(this.sqlgGraph.getSqlDialect().supportsStreamingBatchMode());
     }
 
     @Test(expected = IllegalStateException.class)
     public void testCanNotCreateBatchEdgeWhileBatchVertexInProgress() {
         SqlgVertex v1 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "Dog");
         SqlgVertex v2 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "House");
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "test");
         this.sqlgGraph.streamVertex("A", keyValues);
         this.sqlgGraph.streamVertex("A", keyValues);
         v1.streamEdge("a", v2);
         Assert.fail();
     }
 
     @Test(expected = IllegalStateException.class)
     public void testEdgeLabelRemainsTheSame() {
         SqlgVertex v1 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         SqlgVertex v2 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         this.sqlgGraph.tx().commit();
         this.sqlgGraph.tx().streamingBatchModeOn();
         v1.streamEdge("a", v2);
         v1.streamEdge("b", v2);
         Assert.fail();
     }
 
     @Test
     public void testEdgeFlushAndCloseStream() throws InterruptedException {
         SqlgVertex v1 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         SqlgVertex v2 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         this.sqlgGraph.tx().commit();
         this.sqlgGraph.tx().streamingBatchModeOn();
         v1.streamEdge("a", v2);
         this.sqlgGraph.tx().flush();
         this.sqlgGraph.tx().streamingBatchModeOn();
         v1.streamEdge("b", v2);
         this.sqlgGraph.tx().commit();
         testEdgeFlushCloseStream_assert(this.sqlgGraph);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testEdgeFlushCloseStream_assert(this.sqlgGraph1);
         }
     }
 
     private void testEdgeFlushCloseStream_assert(SqlgGraph sqlgGraph) {
         Assert.assertEquals(1, sqlgGraph.traversal().E().hasLabel("a").count().next(), 1);
         Assert.assertEquals(1, sqlgGraph.traversal().E().hasLabel("b").count().next(), 1);
     }
 
     @Test(expected = IllegalStateException.class)
     public void testEdgePropertiesRemainsTheSame() {
         SqlgVertex v1 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         SqlgVertex v2 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         this.sqlgGraph.tx().commit();
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         v1.streamEdge("a", v2, keyValues);
         keyValues.clear();
         keyValues.put("namea", "halo");
         v1.streamEdge("a", v2, keyValues);
         Assert.fail();
     }
 
     @Test(expected = IllegalStateException.class)
     public void testEdgePropertiesSameOrder() {
         SqlgVertex v1 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         SqlgVertex v2 = (SqlgVertex) this.sqlgGraph.addVertex(T.label, "A");
         this.sqlgGraph.tx().commit();
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "test");
         v1.streamEdge("a", v2, keyValues);
         keyValues.clear();
         keyValues.put("surname", "test");
         keyValues.put("name", "halo");
         v1.streamEdge("a", v2, keyValues);
         Assert.fail();
     }
 
 
     @Test
     public void testStreamingVerticesAndEdges() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 1000; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         this.sqlgGraph.tx().streamingBatchModeOn();
         for (int i = 0; i < 1000; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         this.sqlgGraph.tx().streamingBatchModeOn();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female);
         }
         this.sqlgGraph.tx().commit();
         testStreamingVerticesAndEdges_assert(this.sqlgGraph);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamingVerticesAndEdges_assert(this.sqlgGraph1);
         }
     }
 
     private void testStreamingVerticesAndEdges_assert(SqlgGraph sqlgGraph) {
         Assert.assertEquals(1000, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(1000, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
     }
 
     @Test
     public void testMilCompleteEdges() {
         ArrayList<SqlgVertex> persons = createMilPersonVertex();
         ArrayList<SqlgVertex> cars = createMilCarVertex();
         this.sqlgGraph.tx().commit();
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("name2", "halo");
         for (int i = 0; i < NUMBER_OF_VERTICES; i++) {
             SqlgVertex person = persons.get(0);
             SqlgVertex car = cars.get(i);
             person.streamEdge("person_car", car, keyValues);
         }
         this.sqlgGraph.tx().commit();
         int mb = 1024 * 1024;
 
         // get Runtime instance
         Runtime instance = Runtime.getRuntime();
 
         System.out.println("***** Heap utilization statistics [MB] *****\n");
 
         // available memory
         System.out.println("Total Memory: " + instance.totalMemory() / mb);
 
         // free memory
         System.out.println("Free Memory: " + instance.freeMemory() / mb);
 
         // used memory
         System.out.println("Used Memory: "
                 + (instance.totalMemory() - instance.freeMemory()) / mb);
 
         // Maximum available memory
         System.out.println("Max Memory: " + instance.maxMemory() / mb);
         Assert.assertEquals(NUMBER_OF_VERTICES, this.sqlgGraph.traversal().V(persons.get(0)).out("person_car").toList().size());
         stopWatch.stop();
         System.out.println("testMilCompleteEdges took " + stopWatch.toString());
     }
 
     @Test
     public void testEdgeWithProperties() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 1000; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 1000; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         edgeKeyValues.put("name", "halo");
         edgeKeyValues.put("surname", "halo");
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testEdgeWithProperties_assert(this.sqlgGraph);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testEdgeWithProperties_assert(this.sqlgGraph1);
         }
     }
 
     private void testEdgeWithProperties_assert(SqlgGraph sqlgGraph) {
         Assert.assertEquals(1000, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(1000, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("married").values("name").count().next(), 1);
         Assert.assertEquals(1000, sqlgGraph.traversal().E().hasLabel("married").values("surname").count().next(), 1);
         Assert.assertEquals("halo", sqlgGraph.traversal().E().hasLabel("married").values("name").next());
         Assert.assertEquals("halo", sqlgGraph.traversal().E().hasLabel("married").values("surname").next());
     }
 
     @Test
     public void testStreamLocalDateTime() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
         edgeKeyValues.put("localDateTime", now);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamLocalDateTime_assert(this.sqlgGraph, now);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamLocalDateTime_assert(this.sqlgGraph1, now);
         }
     }
 
     private void testStreamLocalDateTime_assert(SqlgGraph sqlgGraph, LocalDateTime now) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("localDateTime").count().next(), 1);
         Assert.assertEquals(now, sqlgGraph.traversal().E().hasLabel("married").values("localDateTime").next());
     }
 
     @Test
     public void testStreamLocalDate() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         LocalDate localDate = LocalDate.now();
         edgeKeyValues.put("localDate", localDate);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamLocalDate_assert(this.sqlgGraph, localDate);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamLocalDate_assert(this.sqlgGraph1, localDate);
         }
     }
 
     private void testStreamLocalDate_assert(SqlgGraph sqlgGraph, LocalDate localDate) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("localDate").count().next(), 1);
         Assert.assertEquals(localDate, sqlgGraph.traversal().E().hasLabel("married").values("localDate").next());
     }
 
     @Test
     public void testStreamLocalTime() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         LocalTime localTime = LocalTime.now();
         edgeKeyValues.put("localTime", localTime);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamLocalTime_assert(this.sqlgGraph, localTime);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamLocalTime_assert(this.sqlgGraph1, localTime);
         }
     }
 
     private void testStreamLocalTime_assert(SqlgGraph sqlgGraph, LocalTime localTime) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("localTime").count().next(), 1);
         Assert.assertEquals(localTime.toSecondOfDay(), sqlgGraph.traversal().E().hasLabel("married").<LocalTime>values("localTime").next().toSecondOfDay());
     }
 
     @Test
     public void testStreamZonedDateTime() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         ZonedDateTime zonedDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS);
         edgeKeyValues.put("zonedDateTime", zonedDateTime);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamZonedDateTime_assert(this.sqlgGraph, zonedDateTime);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamZonedDateTime_assert(this.sqlgGraph1, zonedDateTime);
         }
     }
 
     private void testStreamZonedDateTime_assert(SqlgGraph sqlgGraph, ZonedDateTime zonedDateTime) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("zonedDateTime").count().next(), 1);
         Assert.assertEquals(zonedDateTime, sqlgGraph.traversal().E().hasLabel("married").values("zonedDateTime").next());
     }
 
     @Test
     public void testStreamPeriod() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         Period period = Period.of(1,2,3);
         edgeKeyValues.put("period", period);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamPeriod_assert(this.sqlgGraph, period);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamPeriod_assert(this.sqlgGraph1, period);
         }
     }
 
     private void testStreamPeriod_assert(SqlgGraph sqlgGraph, Period period) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("period").count().next(), 1);
         Assert.assertEquals(period, sqlgGraph.traversal().E().hasLabel("married").values("period").next());
     }
 
     @Test
     public void testStreamDuration() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         Duration duration = Duration.ofHours(19);
         edgeKeyValues.put("duration", duration);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamDuration_assert(this.sqlgGraph, duration);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamDuration_assert(this.sqlgGraph1, duration);
         }
     }
 
     private void testStreamDuration_assert(SqlgGraph sqlgGraph, Duration duration) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("duration").count().next(), 1);
         Assert.assertEquals(duration, sqlgGraph.traversal().E().hasLabel("married").values("duration").next());
     }
 
     @Test
     public void testStreamJson() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
 
         ObjectNode json = Topology.OBJECT_MAPPER.createObjectNode();
         json.put("username", "john");
 
         edgeKeyValues.put("doc", json);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamJson_assert(this.sqlgGraph, json);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamJson_assert(this.sqlgGraph1, json);
         }
     }
 
     private void testStreamJson_assert(SqlgGraph sqlgGraph, ObjectNode json) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("doc").count().next(), 1);
         Assert.assertEquals(json, sqlgGraph.traversal().E().hasLabel("married").values("doc").next());
     }
 
     @Test
     public void testStreamBigDecimal() throws InterruptedException {
         this.sqlgGraph.tx().streamingBatchModeOn();
         LinkedHashMap<String, Object> keyValues = new LinkedHashMap<>();
         keyValues.put("name", "halo");
         keyValues.put("surname", "halo");
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Man", keyValues);
         }
         this.sqlgGraph.tx().flush();
         for (int i = 0; i < 10; i++) {
             keyValues.put("age", i);
             this.sqlgGraph.streamVertex("Female", keyValues);
         }
         this.sqlgGraph.tx().flush();
         int count = 0;
         List<Vertex> men = this.sqlgGraph.traversal().V().hasLabel("Man").toList();
         List<Vertex> females = this.sqlgGraph.traversal().V().hasLabel("Female").toList();
         LinkedHashMap<String, Object> edgeKeyValues = new LinkedHashMap<>();
         BigDecimal bigDecimal = BigDecimal.valueOf(1.1D);
         edgeKeyValues.put("bigDecimal", bigDecimal);
         for (Vertex man : men) {
             SqlgVertex female = (SqlgVertex) females.get(count++);
             ((SqlgVertex)man).streamEdge("married", female, edgeKeyValues);
         }
         this.sqlgGraph.tx().commit();
         testStreamBigDecimal_assert(this.sqlgGraph, bigDecimal);
         if (this.sqlgGraph1 != null) {
             Thread.sleep(SLEEP_TIME);
             testStreamBigDecimal_assert(this.sqlgGraph1, bigDecimal);
         }
     }
 
     private void testStreamBigDecimal_assert(SqlgGraph sqlgGraph, BigDecimal bigDecimal) {
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Man").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().V().hasLabel("Female").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").count().next(), 1);
         Assert.assertEquals(10, sqlgGraph.traversal().E().hasLabel("married").values("bigDecimal").count().next(), 1);
         Assert.assertEquals(bigDecimal, sqlgGraph.traversal().E().hasLabel("married").values("bigDecimal").next());
     }
 
     private ArrayList<SqlgVertex> createMilPersonVertex() {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         ArrayList<SqlgVertex> result = new ArrayList<>();
         this.sqlgGraph.tx().normalBatchModeOn();
         for (int i = 1; i < NUMBER_OF_VERTICES + 1; i++) {
             Map<String, Object> keyValue = new LinkedHashMap<>();
             for (int j = 0; j < 100; j++) {
                 keyValue.put("name" + j, "aaaaaaaaaa" + i);
             }
             SqlgVertex person = (SqlgVertex) this.sqlgGraph.addVertex("Person", keyValue);
             result.add(person);
             if (i % (NUMBER_OF_VERTICES / 10) == 0) {
                 this.sqlgGraph.tx().commit();
                 this.sqlgGraph.tx().normalBatchModeOn();
             }
         }
         this.sqlgGraph.tx().commit();
         stopWatch.stop();
         System.out.println("createMilPersonVertex took " + stopWatch.toString());
         return result;
     }
 
     private ArrayList<SqlgVertex> createMilCarVertex() {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         ArrayList<SqlgVertex> result = new ArrayList<>();
         this.sqlgGraph.tx().normalBatchModeOn();
         for (int i = 1; i < NUMBER_OF_VERTICES + 1; i++) {
             Map<String, Object> keyValue = new LinkedHashMap<>();
             for (int j = 0; j < 100; j++) {
                 keyValue.put("name" + j, "aaaaaaaaaa" + i);
             }
             SqlgVertex car = (SqlgVertex) this.sqlgGraph.addVertex("Car", keyValue);
             result.add(car);
             if (i % (NUMBER_OF_VERTICES / 10) == 0) {
                 this.sqlgGraph.tx().commit();
                 this.sqlgGraph.tx().normalBatchModeOn();
             }
         }
         this.sqlgGraph.tx().commit();
         stopWatch.stop();
         System.out.println("createMilCarVertex took " + stopWatch.toString());
         return result;
     }
 
 }