Coverage Summary for Class: TestBatchStreamTemporaryVertex (org.umlg.sqlg.test.batch)
Class |
Method, %
|
Branch, %
|
Line, %
|
TestBatchStreamTemporaryVertex |
100%
(5/5)
|
60%
(6/10)
|
100%
(32/32)
|
TestBatchStreamTemporaryVertex$1 |
100%
(1/1)
|
100%
(2/2)
|
TestBatchStreamTemporaryVertex$2 |
100%
(1/1)
|
100%
(2/2)
|
TestBatchStreamTemporaryVertex$3 |
100%
(2/2)
|
90%
(9/10)
|
TestBatchStreamTemporaryVertex$3$1 |
100%
(1/1)
|
100%
(2/2)
|
TestBatchStreamTemporaryVertex$3$2 |
100%
(1/1)
|
100%
(2/2)
|
TestBatchStreamTemporaryVertex$3$3 |
100%
(1/1)
|
100%
(2/2)
|
TestBatchStreamTemporaryVertex$4 |
100%
(2/2)
|
75%
(6/8)
|
Total |
100%
(14/14)
|
60%
(6/10)
|
95%
(57/60)
|
package org.umlg.sqlg.test.batch;
import org.apache.tinkerpop.gremlin.structure.T;
import org.junit.*;
import org.umlg.sqlg.structure.PropertyDefinition;
import org.umlg.sqlg.structure.PropertyType;
import org.umlg.sqlg.structure.topology.VertexLabel;
import org.umlg.sqlg.test.BaseTest;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Date: 2015/12/31
* Time: 9:14 AM
*/
public class TestBatchStreamTemporaryVertex extends BaseTest {
@BeforeClass
public static void beforeClass() {
BaseTest.beforeClass();
if (isPostgres()) {
configuration.addProperty("distributed", true);
}
}
@Before
public void beforeTest() {
Assume.assumeTrue(this.sqlgGraph.getSqlDialect().supportsStreamingBatchMode());
}
@Test
public void testTempBatch() throws SQLException {
this.sqlgGraph.tx().streamingBatchModeOn();
for (int i = 0; i < 1000; i++) {
this.sqlgGraph.streamTemporaryVertex("halo", new LinkedHashMap<String, Object>() {{
put("this", "that");
}});
}
this.sqlgGraph.tx().flush();
int count = 0;
Connection conn = this.sqlgGraph.tx().getConnection();
try (PreparedStatement s = conn.prepareStatement("select * from \"V_halo\"")) {
Assert.assertEquals("", s.getMetaData().getSchemaName(1));
ResultSet resultSet = s.executeQuery();
while (resultSet.next()) {
count++;
Assert.assertEquals("that", resultSet.getString(2));
}
}
Assert.assertEquals(1000, count);
this.sqlgGraph.tx().commit();
}
//Testing issue #226
@Test
public void testStreamTemporaryVertexMultipleThreads() throws InterruptedException {
VertexLabel haloVertexLabel = this.sqlgGraph.getTopology().ensureVertexLabelExist("halo");
haloVertexLabel.ensurePropertiesExist(new HashMap<>() {{
put("this", PropertyDefinition.of(PropertyType.STRING));
}});
this.sqlgGraph.getTopology().ensureVertexLabelExist("A");
this.sqlgGraph.tx().commit();
final CountDownLatch countDownLatch1 = new CountDownLatch(1);
final CountDownLatch countDownLatch2 = new CountDownLatch(1);
final Thread thread1 = new Thread("thread1") {
@Override
public void run() {
TestBatchStreamTemporaryVertex.this.sqlgGraph.tx().streamingBatchModeOn();
TestBatchStreamTemporaryVertex.this.sqlgGraph.streamTemporaryVertex("halo", new LinkedHashMap<>() {{
put("this", "that");
}});
countDownLatch1.countDown();
System.out.println("countDownLatch1 countDown");
TestBatchStreamTemporaryVertex.this.sqlgGraph.streamTemporaryVertex("halo", new LinkedHashMap<>() {{
put("this", "that");
}});
try {
countDownLatch2.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//swallow
}
//If Topology.temporaryTable has been cleared then the next line will block.
//It will block because it will try to create the temp table but the copy command is already in progress.
//The copy command needs to finish before the driver will allow any other command to execute.
TestBatchStreamTemporaryVertex.this.sqlgGraph.streamTemporaryVertex("halo", new LinkedHashMap<>() {{
put("this", "that");
}});
TestBatchStreamTemporaryVertex.this.sqlgGraph.tx().commit();
}
};
thread1.start();
final Thread thread2 = new Thread("thread2") {
@Override
public void run() {
try {
countDownLatch1.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("thread2 countDownLatch ");
TestBatchStreamTemporaryVertex.this.sqlgGraph.addVertex(T.label, "A");
TestBatchStreamTemporaryVertex.this.sqlgGraph.tx().commit();
countDownLatch2.countDown();
}
};
thread2.start();
thread1.join();
thread2.join();
}
}