Coverage Summary for Class: TestDeadLock (org.umlg.sqlg.test.topology)
Class |
Class, %
|
Method, %
|
Branch, %
|
Line, %
|
TestDeadLock |
100%
(1/1)
|
100%
(18/18)
|
45.5%
(10/22)
|
89.6%
(173/193)
|
package org.umlg.sqlg.test.topology;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umlg.sqlg.structure.SqlgGraph;
import org.umlg.sqlg.test.BaseTest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author <a href="https://github.com/pietermartin">Pieter Martin</a>
* Date: 2017/06/19
*/
@SuppressWarnings("Duplicates")
public class TestDeadLock extends BaseTest {
private final static Logger LOGGER = LoggerFactory.getLogger(TestDeadLock.class);
/**
 * Runs the shared {@link BaseTest} class-level setup and then applies a JUnit assumption
 * that the tests are not running against H2; when the assumption does not hold, JUnit
 * skips the tests of this class.
 */
@BeforeClass
public static void beforeClass() {
    // The base setup runs first; the H2 check is evaluated only afterwards.
    BaseTest.beforeClass();
    final boolean runningOnH2 = isH2();
    Assume.assumeFalse(runningOnH2);
}
/**
 * Runs two writers that touch the "A" and "B" labels in opposite order and verifies that
 * both of them end up committed.
 * <p>
 * "First writer" inserts a new "A" vertex and then updates the existing "B" vertex;
 * "Second writer" inserts a new "B" vertex and then updates the existing "A" vertex.
 * Each writer makes at most three attempts and rolls its transaction back after a failed
 * attempt.
 *
 * @throws InterruptedException if the test thread is interrupted while joining the writers
 */
@Test
public void testDeadLock4() throws InterruptedException {
    Vertex a1 = this.sqlgGraph.addVertex(T.label, "A", "nameA1", "haloA1");
    Vertex b1 = this.sqlgGraph.addVertex(T.label, "B", "nameB1", "haloB1");
    this.sqlgGraph.tx().commit();
    Thread t1 = new Thread(() -> {
        //Lock table A
        for (int i = 0; i < 3; i++) {
            try {
                this.sqlgGraph.addVertex(T.label, "A", "nameA2", "haloA2");
                b1.property("nameB1", "haloAgainB2");
                this.sqlgGraph.tx().commit();
                break;
            } catch (Exception e) {
                // Presumably the database aborted this side of a deadlock; record the
                // failure instead of swallowing it silently, then retry.
                LOGGER.debug("First writer attempt {} failed, rolling back", i + 1, e);
                this.sqlgGraph.tx().rollback();
            }
        }
    }, "First writer");
    Thread t2 = new Thread(() -> {
        //Lock table B
        for (int i = 0; i < 3; i++) {
            try {
                this.sqlgGraph.addVertex(T.label, "B", "nameB2", "haloB2");
                a1.property("nameA1", "haloAgainA1");
                this.sqlgGraph.tx().commit();
                break;
            } catch (Exception e) {
                // Presumably the database aborted this side of a deadlock; record the
                // failure instead of swallowing it silently, then retry.
                LOGGER.debug("Second writer attempt {} failed, rolling back", i + 1, e);
                this.sqlgGraph.tx().rollback();
            }
        }
    }, "Second writer");
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // Counts are Longs: compare them as longs rather than through the floating-point
    // assertEquals(expected, actual, delta) overload.
    Assert.assertEquals(2L, this.sqlgGraph.traversal().V().hasLabel("A").count().next().longValue());
    Assert.assertEquals(2L, this.sqlgGraph.traversal().V().hasLabel("B").count().next().longValue());
    Assert.assertEquals(1L, this.sqlgGraph.traversal().V().hasLabel("B").has("nameB1", "haloAgainB2").count().next().longValue());
    Assert.assertEquals(1L, this.sqlgGraph.traversal().V().hasLabel("A").has("nameA1", "haloAgainA1").count().next().longValue());
}
/**
 * Two writers add "s1.v1" vertices concurrently, each introducing a property the other
 * does not use ("att1" and "att2"), while the first writer still has uncommitted work.
 * <p>
 * "First writer" adds a vertex, signals {@code t1Wrote}, waits up to 10 seconds for
 * {@code t2Wrote}, then adds a vertex carrying "att1" and commits. "Second writer" waits
 * for {@code t1Wrote}, adds a vertex carrying "att2", signals {@code t2Wrote} and commits.
 * The final vertex count is asserted on Postgres only.
 *
 * @throws InterruptedException if the test thread is interrupted while joining the writers
 */
@Test
public void testDeadLock3() throws InterruptedException {
    SqlgGraph g = this.sqlgGraph;
    Map<String, Object> m1 = new HashMap<>();
    m1.put("name", "name1");
    g.addVertex("s1.v1", m1);
    g.tx().commit();
    CountDownLatch t1Wrote = new CountDownLatch(1);
    CountDownLatch t2Wrote = new CountDownLatch(1);
    // Assert.fail() on a worker thread only terminates that thread and cannot fail the
    // test, so interruptions are counted here and asserted on the test thread below.
    AtomicInteger interruptedWriters = new AtomicInteger(0);
    Thread t1 = new Thread(() -> {
        Map<String, Object> m11 = new HashMap<>();
        m11.put("name", "name2");
        g.addVertex("s1.v1", m11);
        t1Wrote.countDown();
        try {
            // A timeout is not treated as a failure: the first writer simply carries on.
            if (!t2Wrote.await(10, TimeUnit.SECONDS)) {
                LOGGER.debug("Second writer did not signal within 10 seconds, First writer continues");
            }
            Map<String, Object> m2 = new HashMap<>();
            m2.put("name", "name3");
            m2.put("att1", "val1");
            g.addVertex("s1.v1", m2);
            g.tx().commit();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            interruptedWriters.incrementAndGet();
        }
    }, "First writer");
    Thread t2 = new Thread(() -> {
        try {
            t1Wrote.await();
            Map<String, Object> m112 = new HashMap<>();
            m112.put("name", "name4");
            m112.put("att2", "val2");
            g.addVertex("s1.v1", m112);
            t2Wrote.countDown();
            g.tx().commit();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            interruptedWriters.incrementAndGet();
        }
    }, "Second writer");
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    Assert.assertEquals("a writer thread was interrupted", 0, interruptedWriters.get());
    if (isPostgres()) {
        Assert.assertEquals(4L, g.traversal().V().hasLabel("s1.v1").count().next().longValue());
    }
    // No count is asserted for HSQLDB or H2: the former expectation of 3 vertices for
    // those databases had been commented out.
}
/**
 * Interleaves a reader whose transaction stays open with a writer that adds a "name"
 * property to label "A". The test has no explicit assertions; it completes once both
 * threads have been joined.
 *
 * @throws InterruptedException if the test thread is interrupted while sleeping or joining
 */
@Test
public void testDeadLock2() throws InterruptedException {
    this.sqlgGraph.addVertex(T.label, "A");
    this.sqlgGraph.tx().commit();
    final CountDownLatch writerStarted = new CountDownLatch(1);

    final Runnable reader = () -> {
        // Step 1: the first read opens a transaction on this thread.
        this.sqlgGraph.traversal().V().hasLabel("A").next();
        try {
            LOGGER.debug("await");
            writerStarted.await();
            // Pause so that the writer gets to take the topology lock first.
            Thread.sleep(1000);
            LOGGER.debug("thread1 wakeup");
            // The second read needs a read lock; this is the step that used to dead lock.
            this.sqlgGraph.traversal().V().hasLabel("A").next();
            LOGGER.debug("thread1 complete");
            this.sqlgGraph.tx().commit();
        } catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
    };

    final Runnable writer = () -> {
        // Step 2: adding the "name" field locks the topology. Per the original author this
        // cannot complete while the reader's transaction is still in progress, and it
        // locks in Postgres.
        writerStarted.countDown();
        this.sqlgGraph.addVertex(T.label, "A", "name", "a");
        this.sqlgGraph.tx().commit();
        LOGGER.debug("thread2 fini");
    };

    final Thread readerThread = new Thread(reader, "thread1");
    final Thread writerThread = new Thread(writer, "thread2");
    readerThread.start();
    // Let the reader open its transaction before the writer is started.
    Thread.sleep(1000);
    writerThread.start();
    readerThread.join();
    writerThread.join();
}
/**
 * Interleaves a reader that iterates the "e1" edges of the "t1" vertices with a writer
 * that adds an "e1" edge to a vertex of another label ("t2"), on the same graph, and
 * verifies that both threads commit.
 * <p>
 * Coordination uses latches rather than bare {@code wait()}/{@code notify()}, so a signal
 * that is sent before the other thread starts waiting is not lost:
 * <ol>
 * <li>the test thread releases the reader ({@code startReader});</li>
 * <li>the reader fetches its first result and releases the writer ({@code firstEdgeRead});</li>
 * <li>the writer adds the edge and signals {@code edgeAdded}; the reader waits at most one
 * second for that signal before it continues iterating.</li>
 * </ol>
 *
 * @throws Exception if the test thread is interrupted while sleeping or joining
 */
@Test
public void testDeadlock1() throws Exception {
    final Vertex v1 = sqlgGraph.addVertex(T.label, "t1", "name", "n1");
    Vertex v2 = sqlgGraph.addVertex(T.label, "t1", "name", "n2");
    Vertex v3 = sqlgGraph.addVertex(T.label, "t2", "name", "n3");
    v1.addEdge("e1", v2);
    sqlgGraph.tx().commit();
    CountDownLatch startReader = new CountDownLatch(1);
    CountDownLatch firstEdgeRead = new CountDownLatch(1);
    CountDownLatch edgeAdded = new CountDownLatch(1);
    // Incremented once by each thread that commits successfully.
    AtomicInteger ok = new AtomicInteger(0);
    Thread t1 = new Thread(() -> {
        try {
            startReader.await();
            GraphTraversal<Vertex, Vertex> gt = sqlgGraph.traversal().V().hasLabel("t1").out("e1");
            int cnt = 0;
            // this locks the E_e1 table and then requests the topology read lock
            while (gt.hasNext()) {
                gt.next();
                firstEdgeRead.countDown();
                if (cnt == 0) {
                    // A timeout is not an error: the reader continues either way.
                    if (!edgeAdded.await(1000, TimeUnit.MILLISECONDS)) {
                        LOGGER.debug("thread-1 continues without the writer's signal");
                    }
                }
                cnt++;
            }
            sqlgGraph.tx().commit();
            ok.incrementAndGet();
        } catch (Exception e) {
            // Keep the stack trace; the assertion on 'ok' below fails the test.
            LOGGER.error("thread-1 failed", e);
        }
    }, "thread-1");
    t1.start();
    Thread t2 = new Thread(() -> {
        try {
            firstEdgeRead.await();
            // this locks the topology and then tries to modify the E_e1 table
            v1.addEdge("e1", v3);
            edgeAdded.countDown();
            sqlgGraph.tx().commit();
            ok.incrementAndGet();
        } catch (Exception e) {
            // Keep the stack trace; the assertion on 'ok' below fails the test.
            LOGGER.error("thread-2 failed", e);
        }
    }, "thread-2");
    t2.start();
    // Retained from the original timing: give both threads a moment to start.
    Thread.sleep(1000);
    startReader.countDown();
    t1.join();
    t2.join();
    Assert.assertEquals(2, ok.get());
}
/**
 * Same interleaving as {@code testDeadlock1}, except that the writer works through a
 * second {@link SqlgGraph} opened from {@code getConfigurationClone()}: a reader iterates
 * the "e1" edges of the "t1" vertices on the shared graph while the writer adds an "e1"
 * edge to a "t2" vertex through its own graph. Both threads must commit.
 * <p>
 * Coordination uses latches rather than bare {@code wait()}/{@code notify()}. The writer
 * opens its graph before it starts waiting, so with {@code notify()} the reader's single
 * signal could be sent before the writer was waiting and be lost; a latch remembers it.
 *
 * @throws Exception if the test thread is interrupted while sleeping or joining
 */
@Test
public void testDeadlock1DifferentGraphs() throws Exception {
    Vertex v1 = sqlgGraph.addVertex(T.label, "t1", "name", "n1");
    Vertex v2 = sqlgGraph.addVertex(T.label, "t1", "name", "n2");
    Vertex v3 = sqlgGraph.addVertex(T.label, "t2", "name", "n3");
    v1.addEdge("e1", v2);
    sqlgGraph.tx().commit();
    CountDownLatch startReader = new CountDownLatch(1);
    CountDownLatch firstEdgeRead = new CountDownLatch(1);
    CountDownLatch edgeAdded = new CountDownLatch(1);
    // Incremented once by each thread that commits successfully.
    AtomicInteger ok = new AtomicInteger(0);
    Thread t1 = new Thread(() -> {
        try {
            startReader.await();
            GraphTraversal<Vertex, Vertex> gt = sqlgGraph.traversal().V().hasLabel("t1").out("e1");
            int cnt = 0;
            // this locks the E_e1 table and then requests the topology read lock
            while (gt.hasNext()) {
                gt.next();
                firstEdgeRead.countDown();
                if (cnt == 0) {
                    // A timeout is not an error: the reader continues either way.
                    if (!edgeAdded.await(1000, TimeUnit.MILLISECONDS)) {
                        LOGGER.debug("thread-1 continues without the writer's signal");
                    }
                }
                cnt++;
            }
            sqlgGraph.tx().commit();
            ok.incrementAndGet();
        } catch (Exception e) {
            // Assert.fail() here would only kill this thread and drop the cause; log the
            // exception and let the assertion on 'ok' below fail the test.
            LOGGER.error("thread-1 failed", e);
        }
    }, "thread-1");
    t1.start();
    Thread t2 = new Thread(() -> {
        try (SqlgGraph sqlgGraph1 = SqlgGraph.open(getConfigurationClone())) {
            Vertex v1b = sqlgGraph1.vertices(v1.id()).next();
            Vertex v3b = sqlgGraph1.vertices(v3.id()).next();
            firstEdgeRead.await();
            // this locks the topology and then tries to modify the E_e1 table
            v1b.addEdge("e1", v3b);
            edgeAdded.countDown();
            sqlgGraph1.tx().commit();
            ok.incrementAndGet();
        } catch (Exception e) {
            // Assert.fail() here would only kill this thread and drop the cause; log the
            // exception and let the assertion on 'ok' below fail the test.
            LOGGER.error("thread-2 failed", e);
        }
    }, "thread-2");
    t2.start();
    // Retained from the original timing: gives the writer a moment to open its graph
    // before the reader is released.
    Thread.sleep(1000);
    startReader.countDown();
    t1.join();
    t2.join();
    Assert.assertEquals(2, ok.get());
}
}