Skip to content

Commit

Permalink
GH-4954 LMDB: Fix GC and record counting for deletions (#4955)
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed May 15, 2024
2 parents ceb8156 + 25a3df5 commit ccf6250
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,11 @@ public void flush() throws SailException {
}
if (activeTxn) {
if (!multiThreadingActive) {
tripleStore.commit();
filterUsedIdsInTripleStore();
}
handleRemovedIdsInValueStore();
valueStore.commit();
if (!multiThreadingActive) {
tripleStore.commit();
}
// do not set flag to false until _after_ commit is successfully completed.
storeTxnStarted.set(false);
}
Expand Down Expand Up @@ -604,9 +602,9 @@ private void startTransaction(boolean preferThreading) throws SailException {
Operation op = opQueue.remove();
if (op != null) {
if (op == COMMIT_TRANSACTION) {
tripleStore.commit();
filterUsedIdsInTripleStore();

tripleStore.commit();
nextTransactionAsync = false;
asyncTransactionFinished = true;
break;
Expand Down Expand Up @@ -712,23 +710,18 @@ private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit,

/**
 * Removes all statements matching the given ID pattern from each of the supplied contexts.
 * <p>
 * For every quad that the triple store reports as removed, the removal counter is incremented and each non-zero
 * ID of that quad is added to {@code unusedIds}. The ID {@code 0} is skipped.
 *
 * @param subj     ID of the subject to match
 * @param pred     ID of the predicate to match
 * @param obj      ID of the object to match
 * @param explicit whether explicit ({@code true}) or inferred ({@code false}) statements are removed
 * @param contexts IDs of the contexts to remove from; one removal pass is made per entry
 * @return the number of quads passed to the removal handler across all contexts
 * @throws IOException if the underlying triple store fails
 */
private long removeStatements(long subj, long pred, long obj, boolean explicit, long[] contexts)
        throws IOException {
    // single-element array so that the lambda below can update the counter
    long[] removeCount = { 0 };
    for (long contextId : contexts) {
        tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> {
            removeCount[0]++;
            for (long id : quad) {
                if (id != 0L) {
                    unusedIds.add(id);
                }
            }
        });
    }
    return removeCount[0];
}

private long removeStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ final class LmdbUtil {
private LmdbUtil() {
}

static void E(int rc) throws IOException {
static int E(int rc) throws IOException {
if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) {
throw new IOException(mdb_strerror(rc));
}
return rc;
}

static <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction;
import static org.eclipse.rdf4j.sail.lmdb.Varint.readListUnsigned;
import static org.eclipse.rdf4j.sail.lmdb.Varint.writeListUnsigned;
import static org.eclipse.rdf4j.sail.lmdb.Varint.writeUnsigned;
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
import static org.lwjgl.util.lmdb.LMDB.MDB_KEYEXIST;
import static org.lwjgl.util.lmdb.LMDB.MDB_LAST;
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
import static org.lwjgl.util.lmdb.LMDB.MDB_PREV;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
Expand All @@ -47,6 +51,7 @@
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare;
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
import static org.lwjgl.util.lmdb.LMDB.mdb_strerror;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;
Expand Down Expand Up @@ -92,7 +97,7 @@
import org.slf4j.LoggerFactory;

/**
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four integer
* LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four long
* IDs. Each ID represents an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's
* subject, predicate, object and context. The ID <tt>0</tt> is used to represent the "null" context and doesn't map to
* an actual RDF value.
Expand Down Expand Up @@ -539,7 +544,7 @@ protected void bucketStart(double fraction, long[] lowerValues, long[] upperValu
* @throws IOException
*/
protected void filterUsedIds(Collection<Long> ids) throws IOException {
try (MemoryStack stack = stackPush()) {
readTransaction(env, (stack, txn) -> {
MDBVal maxKey = MDBVal.malloc(stack);
ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
MDBVal keyData = MDBVal.malloc(stack);
Expand All @@ -559,7 +564,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
keyBuf.clear();
Varint.writeUnsigned(keyBuf, id);
keyData.mv_data(keyBuf.flip());
if (mdb_get(writeTxn, contextsDbi, keyData, valueData) == 0) {
if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) {
it.remove();
}
}
Expand All @@ -580,7 +585,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {

long cursor = 0;
try {
E(mdb_cursor_open(writeTxn, dbi, pp));
E(mdb_cursor_open(txn, dbi, pp));
cursor = pp.get(0);

if (fullScan) {
Expand Down Expand Up @@ -626,7 +631,7 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE);
boolean exists = false;
while (!exists && rc == 0) {
if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) {
if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) {
// id was not found
break;
} else if (!matcher.matches(keyData.mv_data())) {
Expand All @@ -649,7 +654,8 @@ protected void filterUsedIds(Collection<Long> ids) throws IOException {
}
}
}
}
return null;
});
}

protected double cardinality(long subj, long pred, long obj, long context) throws IOException {
Expand Down Expand Up @@ -842,6 +848,7 @@ private boolean requiresResize() {

public boolean storeTriple(long subj, long pred, long obj, long context, boolean explicit) throws IOException {
TripleIndex mainIndex = indexes.get(0);
boolean stAdded;
try (MemoryStack stack = MemoryStack.stackPush()) {
MDBVal keyVal = MDBVal.malloc(stack);
// use calloc to get an empty data value
Expand All @@ -851,33 +858,34 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
keyBuf.flip();
keyVal.mv_data(keyBuf);

boolean foundExplicit = mdb_get(writeTxn, mainIndex.getDB(true), keyVal, dataVal) == 0;
boolean foundImplicit = !foundExplicit && mdb_get(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == 0;

boolean stAdded = !(foundExplicit || foundImplicit);
if (stAdded || explicit && foundImplicit) {
if (recordCache == null) {
if (requiresResize()) {
// map is full, resize required
recordCache = new TxnRecordCache(dir);
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
}
if (recordCache == null) {
if (requiresResize()) {
// map is full, resize required
recordCache = new TxnRecordCache(dir);
logger.debug("resize of map size {} required while adding - initialize record cache", mapSize);
}
if (recordCache != null) {
long quad[] = new long[] { subj, pred, obj, context };
if (explicit && foundImplicit) {
// remove implicit statement
recordCache.removeRecord(quad, false);
}
// put record in cache and return immediately
return recordCache.storeRecord(quad, explicit);
}
if (recordCache != null) {
long quad[] = new long[] { subj, pred, obj, context };
if (explicit) {
// remove implicit statement
recordCache.removeRecord(quad, false);
}
// put record in cache and return immediately
return recordCache.storeRecord(quad, explicit);
}

if (explicit && foundImplicit) {
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
}
E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, 0));
int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE);
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
throw new IOException(mdb_strerror(rc));
}
stAdded = rc == MDB_SUCCESS;
boolean foundImplicit = false;
if (explicit && stAdded) {
foundImplicit = E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)) == MDB_SUCCESS;
}

if (stAdded) {
for (int i = 1; i < indexes.size(); i++) {
TripleIndex index = indexes.get(i);
keyBuf.clear();
Expand All @@ -887,7 +895,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
// update buffer positions in MDBVal
keyVal.mv_data(keyBuf);

if (explicit && foundImplicit) {
if (foundImplicit) {
E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal));
}
E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0));
Expand All @@ -897,9 +905,9 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
incrementContext(stack, context);
}
}

return stAdded;
}

return stAdded;
}

private void incrementContext(MemoryStack stack, long context) throws IOException {
Expand Down Expand Up @@ -991,6 +999,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
}
if (recordCache != null) {
recordCache.removeRecord(quad, explicit);
handler.accept(quad);
continue;
}

Expand All @@ -1005,6 +1014,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer<long[]>
}

decrementContext(stack, quad[CONTEXT_IDX]);
handler.accept(quad);
}
} finally {
it.close();
Expand Down Expand Up @@ -1284,24 +1294,22 @@ GroupMatcher createMatcher(long subj, long pred, long obj, long context) {
}

/**
 * Writes the key of a quad for this index into the given buffer.
 * <p>
 * The four IDs are written with {@code writeUnsigned}, one after the other, in the order given by
 * {@code fieldSeq} ({@code 's'} = subject, {@code 'p'} = predicate, {@code 'o'} = object, {@code 'c'} = context).
 * Each ID is written exactly once and straight into the buffer, without an intermediate array.
 *
 * @param bb      buffer that receives the key bytes at its current position
 * @param subj    ID of the subject
 * @param pred    ID of the predicate
 * @param obj     ID of the object
 * @param context ID of the context
 */
void toKey(ByteBuffer bb, long subj, long pred, long obj, long context) {
    for (int i = 0; i < fieldSeq.length; i++) {
        switch (fieldSeq[i]) {
        case 's':
            writeUnsigned(bb, subj);
            break;
        case 'p':
            writeUnsigned(bb, pred);
            break;
        case 'o':
            writeUnsigned(bb, obj);
            break;
        case 'c':
            writeUnsigned(bb, context);
            break;
        }
    }
}

void keyToQuad(ByteBuffer key, long[] quad) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
package org.eclipse.rdf4j.sail.lmdb;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
Expand Down Expand Up @@ -65,6 +70,25 @@ public void testInferredStmts() throws Exception {
}
}

/**
 * Stores five explicit quads, removes those matching {@code (1, 6, -1, -1)} while collecting every ID of the
 * removed quads, and then checks that {@code filterUsedIds} leaves exactly the IDs 6, 7 and 8 in the collection.
 * ID 1 is also part of the removed quads but still occurs in the three quads that stay in the store.
 */
@Test
public void testGc() throws Exception {
    tripleStore.startTransaction();

    long[][] quads = {
            { 1, 2, 3, 1 },
            { 1, 2, 4, 1 },
            { 1, 2, 5, 1 },
            { 1, 6, 7, 1 },
            { 1, 6, 7, 8 }
    };
    for (long[] q : quads) {
        tripleStore.storeTriple(q[0], q[1], q[2], q[3], true);
    }

    // gather the IDs of every quad handed to the removal callback
    Set<Long> candidateIds = new HashSet<>();
    tripleStore.removeTriplesByContext(1, 6, -1, -1, true, quad -> {
        for (long id : quad) {
            candidateIds.add(id);
        }
    });
    tripleStore.commit();

    tripleStore.filterUsedIds(candidateIds);

    assertEquals(Arrays.asList(6L, 7L, 8L), candidateIds.stream().sorted().collect(Collectors.toList()));
}

@AfterEach
public void after() throws Exception {
tripleStore.close();
Expand Down

0 comments on commit ccf6250

Please sign in to comment.