Coverage Summary for Class: TestBatchStreamVertex (org.umlg.sqlg.test.batch)

| Class | Method, % | Branch, % | Line, % |
|---|---|---|---|
| TestBatchStreamVertex | 100% (72/72) | 78.8% (115/146) | 95% (553/582) |
| TestBatchStreamVertex$1 | 100% (1/1) | | 100% (2/2) |
| TestBatchStreamVertex$2 | 100% (1/1) | | 100% (2/2) |
| TestBatchStreamVertex$3 | 100% (1/1) | | 100% (2/2) |
| TestBatchStreamVertex$4 | 100% (1/1) | | 100% (2/2) |
| TestBatchStreamVertex$5 | 100% (1/1) | | 100% (2/2) |
| Total | 100% (77/77) | 78.8% (115/146) | 95.1% (563/592) |

package org.umlg.sqlg.test.batch;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.*;
import org.umlg.sqlg.structure.*;
import org.umlg.sqlg.structure.topology.Schema;
import org.umlg.sqlg.structure.topology.Topology;
import org.umlg.sqlg.structure.topology.VertexLabel;
import org.umlg.sqlg.test.BaseTest;
import java.math.BigDecimal;
import java.time.*;
import java.time.temporal.ChronoUnit;
import java.util.*;
/**
* Date: 2015/05/19
* Time: 9:34 PM
*/
@SuppressWarnings({"DuplicatedCode", "SpellCheckingInspection"})
public class TestBatchStreamVertex extends BaseTest {
/**
 * Runs the base class setup and, when {@code isPostgres()} reports true, adds the
 * {@code distributed} property to the shared test configuration.
 */
@BeforeClass
public static void beforeClass() {
    BaseTest.beforeClass();
    if (!isPostgres()) {
        return;
    }
    configuration.addProperty("distributed", true);
}
/**
 * Skips every test in this class unless the SQL dialect reports support for streaming batch mode.
 */
@Before
public void beforeTest() {
    final boolean streamingSupported = this.sqlgGraph.getSqlDialect().supportsStreamingBatchMode();
    Assume.assumeTrue(streamingSupported);
}
/**
 * Streams ten {@code Person} vertices whose {@code names} property holds the {@link BigDecimal}
 * values 0.0 through 9.0 and checks that each value is read back exactly once.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the check against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamBigDecimal() throws InterruptedException {
    this.sqlgGraph.tx().streamingBatchModeOn();
    for (int i = 0; i < 10; i++) {
        // Go through valueOf(double) on purpose: it keeps the scale of "0.0".."9.0" the original test used.
        this.sqlgGraph.streamVertex(T.label, "Person", "names", BigDecimal.valueOf((double) i));
    }
    this.sqlgGraph.tx().commit();
    testStreamBigDecimal_assert(this.sqlgGraph);
    if (this.sqlgGraph1 != null) {
        Thread.sleep(SLEEP_TIME);
        testStreamBigDecimal_assert(this.sqlgGraph1);
    }
}

/**
 * Asserts that each of the values 0.0 through 9.0 occurs exactly once among the {@code names}
 * values of the {@code Person} vertices in the given graph.
 *
 * @param sqlgGraph the graph to query
 */
private void testStreamBigDecimal_assert(SqlgGraph sqlgGraph) {
    List<BigDecimal> storedValues = sqlgGraph.traversal().V().hasLabel("Person").<BigDecimal>values("names").toList();
    for (int i = 0; i < 10; i++) {
        BigDecimal expected = BigDecimal.valueOf((double) i);
        Assert.assertEquals(1L, storedValues.stream().filter(bd -> bd.equals(expected)).count());
    }
}
/**
 * Declares the edge label {@code abc} from {@code A} to both {@code B} and {@code C}, creates one
 * edge of each kind, and checks that {@code out("abc")} filtered on label {@code B} yields
 * exactly one vertex.
 */
@Test
public void testDuplicateLabel() {
    Schema publicSchema = this.sqlgGraph.getTopology().getPublicSchema();

    // Plain maps replace double-brace initialisation, which creates an anonymous HashMap subclass
    // (holding a reference to this test instance) at every call site.
    Map<String, PropertyDefinition> aProperties = new HashMap<>();
    aProperties.put("name", PropertyDefinition.of(PropertyType.STRING));
    VertexLabel aVertexLabel = publicSchema.ensureVertexLabelExist("A", aProperties);

    Map<String, PropertyDefinition> bProperties = new HashMap<>();
    bProperties.put("name", PropertyDefinition.of(PropertyType.STRING));
    VertexLabel bVertexLabel = publicSchema.ensureVertexLabelExist("B", bProperties);

    Map<String, PropertyDefinition> cProperties = new HashMap<>();
    cProperties.put("name", PropertyDefinition.of(PropertyType.STRING));
    VertexLabel cVertexLabel = publicSchema.ensureVertexLabelExist("C", cProperties);

    aVertexLabel.ensureEdgeLabelExist("abc", bVertexLabel);
    aVertexLabel.ensureEdgeLabelExist("abc", cVertexLabel);
    this.sqlgGraph.tx().commit();

    Vertex a1 = this.sqlgGraph.addVertex(T.label, "A", "name", "a1");
    Vertex a2 = this.sqlgGraph.addVertex(T.label, "A", "name", "a1");
    Vertex b1 = this.sqlgGraph.addVertex(T.label, "B", "name", "b1");
    Vertex c1 = this.sqlgGraph.addVertex(T.label, "C", "name", "c1");
    a1.addEdge("abc", b1);
    a2.addEdge("abc", c1);
    this.sqlgGraph.tx().commit();

    List<Vertex> vertexList = this.sqlgGraph.traversal().V().hasLabel("A")
            .out("abc").hasLabel("B")
            .toList();
    Assert.assertEquals(1, vertexList.size());
}
/**
 * Streams a vertex whose label is 68 characters long and expects
 * {@link SqlgExceptions.InvalidTableException}.
 */
@Test(expected = SqlgExceptions.InvalidTableException.class)
public void testStreamTooLongLabelName() {
    final String tooLongLabel = "AAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFFFFABCDEFGH";
    sqlgGraph.tx().streamingBatchModeOn();
    sqlgGraph.streamVertex(T.label, tooLongLabel, "name", "halo");
    sqlgGraph.tx().commit();
}
/**
 * Declares a vertex label that is 68 characters long through the topology API, then streams a
 * vertex with that label, and expects {@link SqlgExceptions.InvalidTableException}.
 */
@Test(expected = SqlgExceptions.InvalidTableException.class)
public void testStreamTooLongLabelNameEnsureMethods() {
    final String tooLongLabel = "AAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFFFFABCDEFGH";
    // A plain LinkedHashMap replaces double-brace initialisation, which creates an anonymous subclass
    // that captures this test instance.
    Map<String, PropertyDefinition> columns = new LinkedHashMap<>();
    columns.put("name", PropertyDefinition.of(PropertyType.STRING));
    this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist(tooLongLabel, columns);
    this.sqlgGraph.tx().commit();
    this.sqlgGraph.tx().streamingBatchModeOn();
    this.sqlgGraph.streamVertex(T.label, tooLongLabel, "name", "halo");
    this.sqlgGraph.tx().commit();
}
/**
 * Declares label {@code AAAAAAAAAA} with a property whose name is 68 characters long, then streams
 * a vertex using that property, and expects {@link SqlgExceptions.InvalidColumnException}.
 */
@Test(expected = SqlgExceptions.InvalidColumnException.class)
public void testStreamColumnTooLongLabelNameEnsureMethods() {
    final String tooLongColumn = "AAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFFFFABCDEFGH";
    // A plain LinkedHashMap replaces double-brace initialisation, which creates an anonymous subclass
    // that captures this test instance.
    Map<String, PropertyDefinition> columns = new LinkedHashMap<>();
    columns.put(tooLongColumn, PropertyDefinition.of(PropertyType.STRING));
    this.sqlgGraph.getTopology().getPublicSchema().ensureVertexLabelExist("AAAAAAAAAA", columns);
    this.sqlgGraph.tx().commit();
    this.sqlgGraph.tx().streamingBatchModeOn();
    this.sqlgGraph.streamVertex(T.label, "AAAAAAAAAA", tooLongColumn, "halo");
    this.sqlgGraph.tx().commit();
}
/**
 * Commits two {@code Person} vertices joined by a {@code friend} edge, switches to streaming
 * batch mode, streams one hundred more vertices, then looks the edge up by id and reads a
 * property from it. The test expects an {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testAccessPropertyFromEdgeWhileStreaming() {
    Vertex first = sqlgGraph.addVertex(T.label, "Person", "name", "a1");
    Vertex second = sqlgGraph.addVertex(T.label, "Person", "name", "a2");
    Edge friendship = first.addEdge("friend", second);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    LinkedHashMap<String, Object> streamed = new LinkedHashMap<>();
    int index = 0;
    while (index < 100) {
        streamed.put("name", "aa" + index);
        sqlgGraph.streamVertex("Person", streamed);
        streamed.clear();
        index++;
    }

    RecordId edgeId = (RecordId) friendship.id();
    Object name = sqlgGraph.traversal().E(edgeId).next().value("name");
    Assert.assertEquals("a1", name);
    sqlgGraph.tx().commit();
}
/**
 * Commits two {@code Person} vertices joined by a {@code friend} edge, switches to streaming
 * batch mode, streams one hundred more vertices, then rebuilds the first vertex from its record id
 * via {@code SqlgVertex.of} and reads its {@code name}. The test expects an
 * {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testAccessPropertyFromVertexWhileStreaming() {
    Vertex first = sqlgGraph.addVertex(T.label, "Person", "name", "a1");
    Vertex second = sqlgGraph.addVertex(T.label, "Person", "name", "a2");
    first.addEdge("friend", second);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    LinkedHashMap<String, Object> streamed = new LinkedHashMap<>();
    int index = 0;
    while (index < 100) {
        streamed.put("name", "aa" + index);
        sqlgGraph.streamVertex("Person", streamed);
        streamed.clear();
        index++;
    }

    RecordId vertexId = (RecordId) first.id();
    Object name = SqlgVertex.of(
            sqlgGraph,
            vertexId.sequenceId(),
            vertexId.getSchemaTable().getSchema(),
            vertexId.getSchemaTable().getTable()).value("name");
    Assert.assertEquals("a1", name);
    sqlgGraph.tx().commit();
}
/**
 * Streams one hundred {@code Person} vertices and then runs a count traversal before committing.
 * The test expects an {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotQueryWhileStreaming() {
    sqlgGraph.tx().streamingBatchModeOn();
    for (int remaining = 100; remaining > 0; remaining--) {
        sqlgGraph.streamVertex("Person");
    }
    Long personCount = sqlgGraph.traversal().V().hasLabel("Person").count().next();
    Assert.assertEquals(100, personCount, 1);
    sqlgGraph.tx().commit();
}
/**
 * Commits two {@code Person} vertices joined by a {@code friend} edge, streams one hundred more
 * vertices, then counts {@code out("friend")} from the first vertex via a traversal. The test
 * expects an {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotQueryFromVertexWhileStreaming() {
    Vertex source = sqlgGraph.addVertex(T.label, "Person");
    Vertex target = sqlgGraph.addVertex(T.label, "Person");
    source.addEdge("friend", target);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    for (int remaining = 100; remaining > 0; remaining--) {
        sqlgGraph.streamVertex("Person");
    }
    Long friendCount = sqlgGraph.traversal().V(source).out("friend").count().next();
    Assert.assertEquals(100, friendCount, 1);
    sqlgGraph.tx().commit();
}
/**
 * Same setup as the traversal variant, but the query goes through
 * {@code Vertex.edges(Direction.OUT, "friend")}. The test expects an
 * {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotQueryFromVertexWhileStreaming2() {
    Vertex source = sqlgGraph.addVertex(T.label, "Person");
    Vertex target = sqlgGraph.addVertex(T.label, "Person");
    source.addEdge("friend", target);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    int streamed = 0;
    while (streamed < 100) {
        sqlgGraph.streamVertex("Person");
        streamed++;
    }
    long friendEdges = IteratorUtils.count(source.edges(Direction.OUT, "friend"));
    Assert.assertEquals(100, friendEdges, 1);
    sqlgGraph.tx().commit();
}
/**
 * Same setup as the traversal variant, but the query goes through
 * {@code Vertex.vertices(Direction.OUT, "friend")}. The test expects an
 * {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotQueryFromVertexWhileStreaming3() {
    Vertex source = sqlgGraph.addVertex(T.label, "Person");
    Vertex target = sqlgGraph.addVertex(T.label, "Person");
    source.addEdge("friend", target);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    int streamed = 0;
    while (streamed < 100) {
        sqlgGraph.streamVertex("Person");
        streamed++;
    }
    long friendVertices = IteratorUtils.count(source.vertices(Direction.OUT, "friend"));
    Assert.assertEquals(100, friendVertices, 1);
    sqlgGraph.tx().commit();
}
/**
 * Commits two connected {@code Person} vertices, streams one hundred more, then iterates
 * {@code sqlgGraph.vertices()} before committing. The test expects an
 * {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotQueryFromGraphVerticesWhileStreaming() {
    Vertex source = sqlgGraph.addVertex(T.label, "Person");
    Vertex target = sqlgGraph.addVertex(T.label, "Person");
    source.addEdge("friend", target);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    for (int remaining = 100; remaining > 0; remaining--) {
        sqlgGraph.streamVertex("Person");
    }
    long vertexCount = IteratorUtils.count(sqlgGraph.vertices());
    Assert.assertEquals(102, vertexCount, 1);
    sqlgGraph.tx().commit();
}
/**
 * Commits two connected {@code Person} vertices, streams one hundred more, then iterates
 * {@code sqlgGraph.edges()} before committing. The test expects an
 * {@link IllegalStateException}.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotQueryFromGraphEdgesWhileStreaming() {
    Vertex source = sqlgGraph.addVertex(T.label, "Person");
    Vertex target = sqlgGraph.addVertex(T.label, "Person");
    source.addEdge("friend", target);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    for (int remaining = 100; remaining > 0; remaining--) {
        sqlgGraph.streamVertex("Person");
    }
    long edgeCount = IteratorUtils.count(sqlgGraph.edges());
    Assert.assertEquals(102, edgeCount, 1);
    sqlgGraph.tx().commit();
}
/**
 * Streams one hundred property-less {@code Person} vertices and checks that exactly one hundred
 * are counted after the commit.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the count against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testVertexWithNoProperties() throws InterruptedException {
    this.sqlgGraph.tx().streamingBatchModeOn();
    for (int i = 0; i < 100; i++) {
        this.sqlgGraph.streamVertex("Person");
    }
    this.sqlgGraph.tx().commit();
    // Compare as longs: the former three-argument form used a tolerance of 1, so 99 or 101 would have passed.
    Assert.assertEquals(100L, this.sqlgGraph.traversal().V().hasLabel("Person").count().next().longValue());
    if (this.sqlgGraph1 != null) {
        Thread.sleep(SLEEP_TIME);
        Assert.assertEquals(100L, this.sqlgGraph1.traversal().V().hasLabel("Person").count().next().longValue());
    }
}
/**
 * Calls {@code addVertex} after streaming batch mode has been switched on and expects an
 * {@link IllegalStateException}; {@code Assert.fail()} guards against the call returning normally.
 */
@Test(expected = IllegalStateException.class)
public void testCanNotAddVertexOnceStreaming() {
    sqlgGraph.tx().streamingBatchModeOn();
    LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
    properties.put("name", "test");
    sqlgGraph.addVertex("A", properties);
    Assert.fail();
}
/**
 * Streams a vertex to {@code Person} and then one to {@code Persons} without flushing in between.
 * The test expects an {@link IllegalStateException}; {@code Assert.fail()} guards against the
 * commit succeeding.
 */
@Test(expected = IllegalStateException.class)
public void testCompleteVertexChecksSingleLabelOnly() {
    sqlgGraph.tx().streamingBatchModeOn();
    LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
    properties.put("name", "a");
    properties.put("surname", "b");
    sqlgGraph.streamVertex("Person", properties);
    sqlgGraph.streamVertex("Persons", properties);
    sqlgGraph.tx().commit();
    Assert.fail();
}
/**
 * Streams one vertex to {@code Person}, flushes, re-enables streaming batch mode, streams one
 * vertex to {@code Persons}, commits, and verifies both vertices.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testCompleteVertexFlushAndCloseStream() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
    properties.put("name", "a");
    properties.put("surname", "b");
    sqlgGraph.streamVertex("Person", properties);
    sqlgGraph.tx().flush();

    sqlgGraph.tx().streamingBatchModeOn();
    sqlgGraph.streamVertex("Persons", properties);
    sqlgGraph.tx().commit();

    testCompleteVertexFlushAndCloseStream_assert(sqlgGraph);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testCompleteVertexFlushAndCloseStream_assert(sqlgGraph1);
}
/**
 * Asserts that the given graph holds exactly one {@code Person} and one {@code Persons} vertex,
 * each with {@code name} "a" and {@code surname} "b".
 *
 * @param sqlgGraph the graph to query
 */
private void testCompleteVertexFlushAndCloseStream_assert(SqlgGraph sqlgGraph) {
    Long personCount = sqlgGraph.traversal().V().hasLabel("Person").count().next();
    Assert.assertEquals(1, personCount, 0L);
    Long personsCount = sqlgGraph.traversal().V().hasLabel("Persons").count().next();
    Assert.assertEquals(1, personsCount, 0L);

    String personName = sqlgGraph.traversal().V().hasLabel("Person").next().<String>value("name");
    Assert.assertEquals("a", personName);
    String personSurname = sqlgGraph.traversal().V().hasLabel("Person").next().<String>value("surname");
    Assert.assertEquals("b", personSurname);

    String personsName = sqlgGraph.traversal().V().hasLabel("Persons").next().<String>value("name");
    Assert.assertEquals("a", personsName);
    String personsSurname = sqlgGraph.traversal().V().hasLabel("Persons").next().<String>value("surname");
    Assert.assertEquals("b", personsSurname);
}
/**
 * Streams two {@code Person} vertices whose property keys differ ({@code name} versus
 * {@code namea}). The test expects an {@link IllegalStateException}; {@code Assert.fail()} guards
 * against the commit succeeding.
 */
@Test(expected = IllegalStateException.class)
public void testCompleteVertexChecksSameKeys() {
    sqlgGraph.tx().streamingBatchModeOn();

    LinkedHashMap<String, Object> firstKeys = new LinkedHashMap<>();
    firstKeys.put("name", "a");
    firstKeys.put("surname", "b");
    sqlgGraph.streamVertex("Person", firstKeys);

    LinkedHashMap<String, Object> differentKeys = new LinkedHashMap<>();
    differentKeys.put("namea", "a");
    differentKeys.put("surname", "b");
    sqlgGraph.streamVertex("Person", differentKeys);

    sqlgGraph.tx().commit();
    Assert.fail();
}
/**
 * Streams two {@code Person} vertices with the same property keys inserted in opposite order.
 * The test expects an {@link IllegalStateException}; {@code Assert.fail()} guards against the
 * commit succeeding.
 */
@Test(expected = IllegalStateException.class)
public void testStreamingVertexKeysSameOrder() {
    sqlgGraph.tx().streamingBatchModeOn();

    LinkedHashMap<String, Object> nameFirst = new LinkedHashMap<>();
    nameFirst.put("name", "a");
    nameFirst.put("surname", "b");
    sqlgGraph.streamVertex("Person", nameFirst);

    LinkedHashMap<String, Object> surnameFirst = new LinkedHashMap<>();
    surnameFirst.put("surname", "b");
    surnameFirst.put("name", "a");
    sqlgGraph.streamVertex("Person", surnameFirst);

    sqlgGraph.tx().commit();
    Assert.fail();
}
/**
 * Streams two vertices to the schema-qualified label {@code R_HG.Person} and checks that two are
 * counted after the commit.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the count against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamingVertexDifferentSchema() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();

    LinkedHashMap<String, Object> firstPerson = new LinkedHashMap<>();
    firstPerson.put("name", "a");
    firstPerson.put("surname", "b");
    sqlgGraph.streamVertex("R_HG.Person", firstPerson);

    LinkedHashMap<String, Object> secondPerson = new LinkedHashMap<>();
    secondPerson.put("name", "a");
    secondPerson.put("surname", "b");
    sqlgGraph.streamVertex("R_HG.Person", secondPerson);

    sqlgGraph.tx().commit();

    Long localCount = sqlgGraph.traversal().V().hasLabel("R_HG.Person").count().next();
    Assert.assertEquals(2, localCount, 0L);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    Long otherCount = sqlgGraph1.traversal().V().hasLabel("R_HG.Person").count().next();
    Assert.assertEquals(2, otherCount, 0L);
}
/**
 * Streams 100,000 {@code Person} vertices with properties {@code name0} and {@code name1},
 * committing and re-entering streaming batch mode after every 25,000, and checks the final count.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the count against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testUsingConnectionDuringResultSetIter() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    for (int i = 1; i <= 100_000; i++) {
        LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
        properties.put("name0", "a" + i);
        properties.put("name1", "a" + i);
        sqlgGraph.streamVertex("Person", properties);
        if (i % 25_000 != 0) {
            continue;
        }
        // Batch boundary: commit what has been streamed and start a new streaming batch.
        sqlgGraph.tx().commit();
        sqlgGraph.tx().streamingBatchModeOn();
        System.out.println(i);
    }
    sqlgGraph.tx().commit();

    int localCount = sqlgGraph.traversal().V().has(T.label, "Person").count().next().intValue();
    Assert.assertEquals(100_000, localCount);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    int otherCount = sqlgGraph1.traversal().V().has(T.label, "Person").count().next().intValue();
    Assert.assertEquals(100_000, otherCount);
}
/**
 * Streams 100,000 {@code Person} vertices in batches of 25,000, printing the elapsed time for the
 * inserts and for the subsequent count query, and checks the final count.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the count against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testMilCompleteVertex() throws InterruptedException {
    StopWatch timer = new StopWatch();
    timer.start();
    sqlgGraph.tx().streamingBatchModeOn();
    for (int i = 1; i <= 100_000; i++) {
        LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
        properties.put("name0", "a" + i);
        properties.put("name1", "a" + i);
        sqlgGraph.streamVertex("Person", properties);
        if (i % 25_000 != 0) {
            continue;
        }
        // Batch boundary: commit what has been streamed and start a new streaming batch.
        sqlgGraph.tx().commit();
        sqlgGraph.tx().streamingBatchModeOn();
        System.out.println(i);
    }
    sqlgGraph.tx().commit();
    timer.stop();
    System.out.println(timer);

    timer.reset();
    timer.start();
    long localCount = sqlgGraph.traversal().V().has(T.label, "Person").count().next().longValue();
    Assert.assertEquals(100_000L, localCount);
    timer.stop();
    System.out.println(timer);

    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    long otherCount = sqlgGraph1.traversal().V().has(T.label, "Person").count().next().longValue();
    Assert.assertEquals(100_000L, otherCount);
}
/**
 * Streams one thousand {@code Man} vertices, flushes, streams one thousand {@code Female}
 * vertices, rolls the transaction back, and then runs the rollback assertions.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the assertions against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamingRollback() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
    properties.put("name", "halo");
    properties.put("surname", "halo");

    int age = 0;
    while (age < 1000) {
        properties.put("age", age);
        sqlgGraph.streamVertex("Man", properties);
        age++;
    }
    sqlgGraph.tx().flush();

    sqlgGraph.tx().streamingBatchModeOn();
    age = 0;
    while (age < 1000) {
        properties.put("age", age);
        sqlgGraph.streamVertex("Female", properties);
        age++;
    }
    sqlgGraph.tx().rollback();

    testStreamingRollback_assert(sqlgGraph);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamingRollback_assert(sqlgGraph1);
}
/**
 * Asserts that the given graph holds no {@code Man} and no {@code Female} vertices.
 * <p>
 * The counts are compared exactly. The earlier three-argument {@code assertEquals(0, count, 1)}
 * allowed a tolerance of 1, so a single vertex surviving the rollback would have gone unnoticed.
 *
 * @param sqlgGraph the graph to query
 */
private void testStreamingRollback_assert(SqlgGraph sqlgGraph) {
    Assert.assertEquals(0L, sqlgGraph.traversal().V().hasLabel("Man").count().next().longValue());
    Assert.assertEquals(0L, sqlgGraph.traversal().V().hasLabel("Female").count().next().longValue());
}
/**
 * Streams one {@code Person} vertex per entry of a five-element id list and checks that five are
 * counted after the commit.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the count against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void streamJava8Style() throws InterruptedException {
    List<String> uids = Arrays.asList("1", "2", "3", "4", "5");
    sqlgGraph.tx().streamingBatchModeOn();
    for (String uid : uids) {
        sqlgGraph.streamVertex(T.label, "Person", "name", uid);
    }
    sqlgGraph.tx().commit();

    Long localCount = sqlgGraph.traversal().V().hasLabel("Person").count().next();
    Assert.assertEquals(5, localCount, 0L);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    Long otherCount = sqlgGraph1.traversal().V().hasLabel("Person").count().next();
    Assert.assertEquals(5, otherCount, 0L);
}
/**
 * Streams ten {@code Person} vertices carrying the same millisecond-truncated
 * {@link LocalDateTime} in {@code createOn} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamLocalDateTime() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final LocalDateTime createdOn = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "createOn", createdOn);
    }
    sqlgGraph.tx().commit();

    testStreamLocalDateTime_assert(sqlgGraph, createdOn);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamLocalDateTime_assert(sqlgGraph1, createdOn);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has {@code createOn} equal to {@code now}.
 *
 * @param sqlgGraph the graph to query
 * @param now       the expected {@code createOn} value
 */
private void testStreamLocalDateTime_assert(SqlgGraph sqlgGraph, LocalDateTime now) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    Object storedCreateOn = persons.get(0).value("createOn");
    Assert.assertEquals(now, storedCreateOn);
}
/**
 * Streams ten {@code Person} vertices carrying today's {@link LocalDate} in {@code createOn} and
 * verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamLocalDate() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final LocalDate today = LocalDate.now();
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "createOn", today);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamLocalDate_assert(sqlgGraph, today);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamLocalDate_assert(sqlgGraph1, today);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has {@code createOn} equal to {@code now}.
 *
 * @param sqlgGraph the graph to query
 * @param now       the expected {@code createOn} value
 */
private void testStreamLocalDate_assert(SqlgGraph sqlgGraph, LocalDate now) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    Object storedCreateOn = persons.get(0).value("createOn");
    Assert.assertEquals(now, storedCreateOn);
}
/**
 * Streams ten {@code Person} vertices carrying the current {@link LocalTime} in {@code createOn}
 * and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamLocalTime() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final LocalTime timeOfDay = LocalTime.now();
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "createOn", timeOfDay);
    }
    sqlgGraph.tx().commit();

    testStreamLocalTime_assert(sqlgGraph, timeOfDay);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamLocalTime_assert(sqlgGraph1, timeOfDay);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code createOn} time matching {@code now} to the second (sub-second parts are ignored).
 *
 * @param sqlgGraph the graph to query
 * @param now       the expected {@code createOn} value
 */
private void testStreamLocalTime_assert(SqlgGraph sqlgGraph, LocalTime now) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    int expectedSecondOfDay = now.toSecondOfDay();
    LocalTime storedCreateOn = persons.get(0).<LocalTime>value("createOn");
    Assert.assertEquals(expectedSecondOfDay, storedCreateOn.toSecondOfDay());
}
/**
 * Streams ten {@code Person} vertices carrying the same millisecond-truncated
 * {@link ZonedDateTime} in {@code createOn} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamZonedDateTime() throws InterruptedException {
    final ZonedDateTime createdOn = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS);
    sqlgGraph.tx().streamingBatchModeOn();
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "createOn", createdOn);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamZonedDateTime_assert(sqlgGraph, createdOn);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamZonedDateTime_assert(sqlgGraph1, createdOn);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has {@code createOn} equal to {@code zonedDateTime}.
 *
 * @param sqlgGraph     the graph to query
 * @param zonedDateTime the expected {@code createOn} value
 */
private void testStreamZonedDateTime_assert(SqlgGraph sqlgGraph, ZonedDateTime zonedDateTime) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    ZonedDateTime storedCreateOn = persons.get(0).<ZonedDateTime>value("createOn");
    Assert.assertEquals(zonedDateTime, storedCreateOn);
}
/**
 * Streams ten {@code Person} vertices carrying {@code Period.of(1, 2, 3)} in {@code period} and
 * verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamPeriod() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final Period expected = Period.of(1, 2, 3);
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "period", expected);
    }
    sqlgGraph.tx().commit();

    testSteamPeriod_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testSteamPeriod_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has {@code period} equal to the expected value.
 *
 * @param sqlgGraph the graph to query
 * @param period    the expected {@code period} value
 */
private void testSteamPeriod_assert(SqlgGraph sqlgGraph, Period period) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    Period storedPeriod = persons.get(0).<Period>value("period");
    Assert.assertEquals(period, storedPeriod);
}
/**
 * Streams ten {@code Person} vertices carrying a nineteen-hour {@link Duration} in
 * {@code duration} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamDuration() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final Duration expected = Duration.ofHours(19);
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "duration", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamDuration_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamDuration_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has {@code duration} equal to the expected value.
 *
 * @param sqlgGraph the graph to query
 * @param duration  the expected {@code duration} value
 */
private void testStreamDuration_assert(SqlgGraph sqlgGraph, Duration duration) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    Duration storedDuration = persons.get(0).<Duration>value("duration");
    Assert.assertEquals(duration, storedDuration);
}
/**
 * Streams ten {@code Person} vertices carrying the JSON object {@code {"username": "john"}} in
 * {@code doc} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamJson() throws InterruptedException {
    ObjectNode document = Topology.OBJECT_MAPPER.createObjectNode();
    document.put("username", "john");

    sqlgGraph.tx().streamingBatchModeOn();
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "doc", document);
    }
    sqlgGraph.tx().commit();

    testStreamJson_assert(sqlgGraph, document);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamJson_assert(sqlgGraph1, document);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code doc} value equal to {@code json}.
 *
 * @param sqlgGraph the graph to query
 * @param json      the expected {@code doc} value
 */
private void testStreamJson_assert(SqlgGraph sqlgGraph, ObjectNode json) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    Assert.assertEquals(json, persons.get(0).<JsonNode>value("doc"));
}
/**
 * Streams ten {@code Person} vertices carrying the string array {@code {"a", "b"}} in
 * {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamStringArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final String[] expected = {"a", "b"};
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamStringArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamStringArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code stringArray}.
 *
 * @param sqlgGraph   the graph to query
 * @param stringArray the expected {@code names} value
 */
private void testStreamStringArray_assert(SqlgGraph sqlgGraph, String[] stringArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    String[] storedNames = persons.get(0).<String[]>value("names");
    Assert.assertArrayEquals(stringArray, storedNames);
}
/**
 * Streams ten {@code Person} vertices carrying the boolean array {@code {true, false}} in
 * {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamBooleanArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final boolean[] expected = {true, false};
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
    }
    sqlgGraph.tx().commit();

    testSteamBooleanArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testSteamBooleanArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code booleanArray}.
 *
 * @param sqlgGraph    the graph to query
 * @param booleanArray the expected {@code names} value
 */
private void testSteamBooleanArray_assert(SqlgGraph sqlgGraph, boolean[] booleanArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    boolean[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(booleanArray, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying the int array {@code {11, 22}} in {@code names}
 * and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamIntArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final int[] expected = {11, 22};
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamIntArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamIntArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code intArray}.
 *
 * @param sqlgGraph the graph to query
 * @param intArray  the expected {@code names} value
 */
private void testStreamIntArray_assert(SqlgGraph sqlgGraph, int[] intArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    int[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(intArray, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying the long array {@code {11, 22}} in {@code names}
 * and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamLongArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final long[] expected = {11, 22};
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
    }
    sqlgGraph.tx().commit();

    testStreamLongArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamLongArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code longArray}.
 *
 * @param sqlgGraph the graph to query
 * @param longArray the expected {@code names} value
 */
private void testStreamLongArray_assert(SqlgGraph sqlgGraph, long[] longArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    long[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(longArray, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying the float array {@code {11.11f, 22.22f}} in
 * {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamFloatArray() throws InterruptedException {
    this.sqlgGraph.tx().streamingBatchModeOn();
    // Was {11,11f, 22.22f}: a comma typed for the decimal point produced three elements and never
    // exercised a fractional first value, unlike the matching double-array test.
    float[] floatArray = new float[]{11.11f, 22.22f};
    for (int i = 0; i < 10; i++) {
        this.sqlgGraph.streamVertex(T.label, "Person", "names", floatArray);
    }
    this.sqlgGraph.tx().commit();
    testStreamFloatArray_assert(this.sqlgGraph, floatArray);
    if (this.sqlgGraph1 != null) {
        Thread.sleep(SLEEP_TIME);
        testStreamFloatArray_assert(this.sqlgGraph1, floatArray);
    }
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code floatArray}, with zero tolerance.
 *
 * @param sqlgGraph  the graph to query
 * @param floatArray the expected {@code names} value
 */
private void testStreamFloatArray_assert(SqlgGraph sqlgGraph, float[] floatArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    float[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(floatArray, storedValues, 0f);
}
/**
 * Streams ten {@code Person} vertices carrying the double array {@code {11.11d, 22.22d}} in
 * {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamDoubleArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final double[] expected = {11.11d, 22.22d};
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamDoubleArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamDoubleArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code doubleArray}, with zero tolerance.
 *
 * @param sqlgGraph   the graph to query
 * @param doubleArray the expected {@code names} value
 */
private void testStreamDoubleArray_assert(SqlgGraph sqlgGraph, double[] doubleArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    double[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(doubleArray, storedValues, 0d);
}
/**
 * Streams ten {@code Person} vertices carrying the short array {@code {11, 22}} in {@code names}
 * and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamShortArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final short[] expected = {11, 22};
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
    }
    sqlgGraph.tx().commit();

    testStreamShortArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamShortArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code shortArray}.
 *
 * @param sqlgGraph  the graph to query
 * @param shortArray the expected {@code names} value
 */
private void testStreamShortArray_assert(SqlgGraph sqlgGraph, short[] shortArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    short[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(shortArray, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying the byte array {@code {1, 2}} in {@code names}
 * and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testStreamByteArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final byte[] expected = {1, 2};
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testStreamByteArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testStreamByteArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code byteArray}.
 *
 * @param sqlgGraph the graph to query
 * @param byteArray the expected {@code names} value
 */
private void testStreamByteArray_assert(SqlgGraph sqlgGraph, byte[] byteArray) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    byte[] storedValues = persons.get(0).value("names");
    Assert.assertArrayEquals(byteArray, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying a two-element {@link LocalDateTime} array
 * (yesterday and now, both truncated to milliseconds) in {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testLocalDateTimeArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final LocalDateTime yesterday = LocalDateTime.now().minusDays(1).truncatedTo(ChronoUnit.MILLIS);
    final LocalDateTime current = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
    final LocalDateTime[] expected = {yesterday, current};
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
    }
    sqlgGraph.tx().commit();

    testLocalDateTimeArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testLocalDateTimeArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code localDateTimes}.
 *
 * @param sqlgGraph      the graph to query
 * @param localDateTimes the expected {@code names} value
 */
private void testLocalDateTimeArray_assert(SqlgGraph sqlgGraph, LocalDateTime[] localDateTimes) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    LocalDateTime[] storedValues = persons.get(0).<LocalDateTime[]>value("names");
    Assert.assertArrayEquals(localDateTimes, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying a two-element {@link LocalDate} array (yesterday
 * and today) in {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testLocalDateArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final LocalDate yesterday = LocalDate.now().minusDays(1);
    final LocalDate today = LocalDate.now();
    final LocalDate[] expected = {yesterday, today};
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testLocalDateArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testLocalDateArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code localDates}.
 *
 * @param sqlgGraph  the graph to query
 * @param localDates the expected {@code names} value
 */
private void testLocalDateArray_assert(SqlgGraph sqlgGraph, LocalDate[] localDates) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    LocalDate[] storedValues = persons.get(0).<LocalDate[]>value("names");
    Assert.assertArrayEquals(localDates, storedValues);
}
/**
 * Streams ten {@code Person} vertices carrying a two-element {@link LocalTime} array (one hour
 * ago and now) in {@code names} and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testLocalTimeArray() throws InterruptedException {
    sqlgGraph.tx().streamingBatchModeOn();
    final LocalTime anHourAgo = LocalTime.now().minusHours(1);
    final LocalTime current = LocalTime.now();
    final LocalTime[] expected = {anHourAgo, current};
    for (int remaining = 10; remaining > 0; remaining--) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
    }
    sqlgGraph.tx().commit();

    testLocalTimeArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testLocalTimeArray_assert(sqlgGraph1, expected);
}
/**
 * Asserts that the given graph holds ten {@code Person} vertices and that the first one returned
 * has a {@code names} array equal to {@code localTimes} with the nanosecond part of each element
 * zeroed.
 *
 * @param sqlgGraph  the graph to query
 * @param localTimes the values that were streamed, before sub-second truncation
 */
private void testLocalTimeArray_assert(SqlgGraph sqlgGraph, LocalTime[] localTimes) {
    List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, persons.size());
    // Expected values are compared at whole-second precision.
    Object[] expectedToTheSecond = new Object[localTimes.length];
    for (int index = 0; index < localTimes.length; index++) {
        expectedToTheSecond[index] = localTimes[index].withNano(0);
    }
    Assert.assertArrayEquals(expectedToTheSecond, persons.get(0).<LocalTime[]>value("names"));
}
/**
 * Adds one {@code Person} vertex through {@code addVertex} and commits, then streams ten more,
 * all carrying the same two-element {@link ZonedDateTime} array (one hour ago and now, truncated
 * to milliseconds) in {@code names}, and verifies the stored data.
 * <p>
 * When {@code sqlgGraph1} is set, waits {@code SLEEP_TIME} and repeats the verification against it.
 *
 * @throws InterruptedException if the wait before checking {@code sqlgGraph1} is interrupted
 */
@Test
public void testZonedDateTimeArray() throws InterruptedException {
    final ZonedDateTime anHourAgo = ZonedDateTime.now().minusHours(1).truncatedTo(ChronoUnit.MILLIS);
    final ZonedDateTime current = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS);
    final ZonedDateTime[] expected = {anHourAgo, current};

    sqlgGraph.addVertex(T.label, "Person", "names", expected);
    sqlgGraph.tx().commit();

    sqlgGraph.tx().streamingBatchModeOn();
    int streamed = 0;
    while (streamed < 10) {
        sqlgGraph.streamVertex(T.label, "Person", "names", expected);
        streamed++;
    }
    sqlgGraph.tx().commit();

    testZonedDateTimeArray_assert(sqlgGraph, expected);
    if (sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testZonedDateTimeArray_assert(sqlgGraph1, expected);
}
/**
 * Checks that the given graph holds eleven "Person" vertices and that the "names"
 * property of every one of them equals the expected {@code ZonedDateTime} array.
 *
 * @param sqlgGraph      graph to query
 * @param zonedDateTimes expected content of the "names" property
 */
private void testZonedDateTimeArray_assert(SqlgGraph sqlgGraph, ZonedDateTime[] zonedDateTimes) {
    final List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(11, persons.size());

    final Object[] expected = Arrays.asList(zonedDateTimes).toArray();
    // The size check above guarantees this visits exactly indices 0..10, in order.
    for (Vertex person : persons) {
        Assert.assertArrayEquals(expected, person.<ZonedDateTime[]>value("names"));
    }
}
/**
 * Adds one "Person" vertex with a {@code Duration[]} "names" property via
 * {@code addVertex}, commits, then streams ten more in streaming batch mode and
 * commits again. The result is verified on {@code sqlgGraph} and, when
 * {@code sqlgGraph1} is set, on that graph too after sleeping for {@code SLEEP_TIME}.
 */
@Test
public void testDurationArray() throws InterruptedException {
    final Duration[] spans = {Duration.ofHours(5), Duration.ofHours(10)};

    this.sqlgGraph.addVertex(T.label, "Person", "names", spans);
    this.sqlgGraph.tx().commit();

    this.sqlgGraph.tx().streamingBatchModeOn();
    for (int streamed = 0; streamed < 10; streamed++) {
        this.sqlgGraph.streamVertex(T.label, "Person", "names", spans);
    }
    this.sqlgGraph.tx().commit();

    testDurationArray_assert(this.sqlgGraph, spans);
    if (this.sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testDurationArray_assert(this.sqlgGraph1, spans);
}
/**
 * Checks that the given graph holds eleven "Person" vertices and that the "names"
 * property of every one of them equals the expected {@code Duration} array.
 *
 * @param sqlgGraph graph to query
 * @param durations expected content of the "names" property
 */
private void testDurationArray_assert(SqlgGraph sqlgGraph, Duration[] durations) {
    final List<Vertex> persons = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(11, persons.size());

    final Object[] expected = Arrays.asList(durations).toArray();
    // The size check above guarantees this visits exactly indices 0..10, in order.
    for (Vertex person : persons) {
        Assert.assertArrayEquals(expected, person.<Duration[]>value("names"));
    }
}
/**
 * Adds one "Person" vertex with a {@code Period[]} "names" property via
 * {@code addVertex}, commits, then streams ten more in streaming batch mode and
 * commits again. The result is verified on {@code sqlgGraph} and, when
 * {@code sqlgGraph1} is set, on that graph too after sleeping for {@code SLEEP_TIME}.
 */
@Test
public void testPeriodArray() throws InterruptedException {
    final Period[] spans = {Period.of(2016, 1, 1), Period.of(2017, 2, 2)};

    this.sqlgGraph.addVertex(T.label, "Person", "names", spans);
    this.sqlgGraph.tx().commit();

    this.sqlgGraph.tx().streamingBatchModeOn();
    for (int streamed = 0; streamed < 10; streamed++) {
        this.sqlgGraph.streamVertex(T.label, "Person", "names", spans);
    }
    this.sqlgGraph.tx().commit();

    testPeriodArray_assert(this.sqlgGraph, spans);
    if (this.sqlgGraph1 == null) {
        return;
    }
    Thread.sleep(SLEEP_TIME);
    testPeriodArray_assert(this.sqlgGraph1, spans);
}
/**
 * Checks that the given graph holds eleven "Person" vertices and that the "names"
 * property of every one of them equals the expected {@code Period} array.
 *
 * @param sqlgGraph graph to query
 * @param periods   expected content of the "names" property
 */
private void testPeriodArray_assert(SqlgGraph sqlgGraph, Period[] periods) {
    List<Vertex> vertices = sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(11, vertices.size());

    Object[] expected = Arrays.asList(periods).toArray();
    // The property holds Period values, so the type witness must be Period[]
    // (not Duration[]). The size check above guarantees the loop covers
    // exactly indices 0..10.
    for (Vertex vertex : vertices) {
        Assert.assertArrayEquals(expected, vertex.<Period[]>value("names"));
    }
}
/**
 * Streams "Person" vertices whose "docs" property is a {@code JsonNode[]} and expects
 * {@link SqlgExceptions.InvalidPropertyTypeException} to be raised.
 * <p>
 * The assertions after the commit are only reached if no exception is thrown earlier;
 * in that case the test still fails because the expected exception never occurred.
 */
@Test(expected = SqlgExceptions.InvalidPropertyTypeException.class)
public void testStreamJsonAsArray() {
    ObjectNode json = Topology.OBJECT_MAPPER.createObjectNode();
    json.put("username", "john1");
    JsonNode[] jsonNodes = new JsonNode[]{json};

    this.sqlgGraph.tx().streamingBatchModeOn();
    for (int i = 0; i < 10; i++) {
        this.sqlgGraph.streamVertex(T.label, "Person", "docs", jsonNodes);
    }
    this.sqlgGraph.tx().commit();

    List<Vertex> vertices = this.sqlgGraph.traversal().V().hasLabel("Person").toList();
    Assert.assertEquals(10, vertices.size());
    JsonNode[] value = vertices.get(0).value("docs");
    Assert.assertArrayEquals(jsonNodes, value);
}
}