diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index 0d437cabee..e0ea0ab7c0 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -500,13 +500,11 @@ public void flush() throws SailException { } if (activeTxn) { if (!multiThreadingActive) { + tripleStore.commit(); filterUsedIdsInTripleStore(); } handleRemovedIdsInValueStore(); valueStore.commit(); - if (!multiThreadingActive) { - tripleStore.commit(); - } // do not set flag to false until _after_ commit is successfully completed. storeTxnStarted.set(false); } @@ -604,9 +602,9 @@ private void startTransaction(boolean preferThreading) throws SailException { Operation op = opQueue.remove(); if (op != null) { if (op == COMMIT_TRANSACTION) { + tripleStore.commit(); filterUsedIdsInTripleStore(); - tripleStore.commit(); nextTransactionAsync = false; asyncTransactionFinished = true; break; @@ -712,23 +710,18 @@ private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit, private long removeStatements(long subj, long pred, long obj, boolean explicit, long[] contexts) throws IOException { - long removeCount = 0; + long[] removeCount = { 0 }; for (long contextId : contexts) { - final Map perContextCounts = new HashMap<>(); tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> { - perContextCounts.merge(quad[3], 1L, (c, one) -> c + one); + removeCount[0]++; for (long id : quad) { if (id != 0L) { unusedIds.add(id); } } }); - - for (Entry entry : perContextCounts.entrySet()) { - removeCount += entry.getValue(); - } } - return removeCount; + return removeCount[0]; } private long removeStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts) diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java index 6f9603a6da..349683628a 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbUtil.java @@ -51,10 +51,11 @@ final class LmdbUtil { private LmdbUtil() { } - static void E(int rc) throws IOException { + static int E(int rc) throws IOException { if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) { throw new IOException(mdb_strerror(rc)); } + return rc; } static T readTransaction(long env, Transaction transaction) throws IOException { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java index b1ea7ad759..c352415fb3 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java @@ -16,18 +16,22 @@ import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction; import static org.eclipse.rdf4j.sail.lmdb.Varint.readListUnsigned; import static org.eclipse.rdf4j.sail.lmdb.Varint.writeListUnsigned; +import static org.eclipse.rdf4j.sail.lmdb.Varint.writeUnsigned; import static org.lwjgl.system.MemoryStack.stackPush; import static org.lwjgl.system.MemoryUtil.NULL; import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE; import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST; +import static org.lwjgl.util.lmdb.LMDB.MDB_KEYEXIST; import static org.lwjgl.util.lmdb.LMDB.MDB_LAST; import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT; import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC; +import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE; import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC; import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND; import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS; import static org.lwjgl.util.lmdb.LMDB.MDB_PREV; import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE; +import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS; import static org.lwjgl.util.lmdb.LMDB.mdb_cmp; import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close; import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get; @@ -47,6 +51,7 @@ import static org.lwjgl.util.lmdb.LMDB.mdb_put; import static org.lwjgl.util.lmdb.LMDB.mdb_set_compare; import static org.lwjgl.util.lmdb.LMDB.mdb_stat; +import static org.lwjgl.util.lmdb.LMDB.mdb_strerror; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin; import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit; @@ -92,7 +97,7 @@ import org.slf4j.LoggerFactory; /** - * LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four integer + * LMDB-based indexed storage and retrieval of RDF statements. TripleStore stores statements in the form of four long * IDs. Each ID represent an RDF value that is stored in a {@link ValueStore}. The four IDs refer to the statement's * subject, predicate, object and context. The ID 0 is used to represent the "null" context and doesn't map to * an actual RDF value. @@ -539,7 +544,7 @@ protected void bucketStart(double fraction, long[] lowerValues, long[] upperValu * @throws IOException */ protected void filterUsedIds(Collection ids) throws IOException { - try (MemoryStack stack = stackPush()) { + readTransaction(env, (stack, txn) -> { MDBVal maxKey = MDBVal.malloc(stack); ByteBuffer maxKeyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH); MDBVal keyData = MDBVal.malloc(stack); @@ -559,7 +564,7 @@ protected void filterUsedIds(Collection ids) throws IOException { keyBuf.clear(); Varint.writeUnsigned(keyBuf, id); keyData.mv_data(keyBuf.flip()); - if (mdb_get(writeTxn, contextsDbi, keyData, valueData) == 0) { + if (mdb_get(txn, contextsDbi, keyData, valueData) == 0) { it.remove(); } } @@ -580,7 +585,7 @@ protected void filterUsedIds(Collection ids) throws IOException { long cursor = 0; try { - E(mdb_cursor_open(writeTxn, dbi, pp)); + E(mdb_cursor_open(txn, dbi, pp)); cursor = pp.get(0); if (fullScan) { @@ -626,7 +631,7 @@ protected void filterUsedIds(Collection ids) throws IOException { int rc = mdb_cursor_get(cursor, keyData, valueData, MDB_SET_RANGE); boolean exists = false; while (!exists && rc == 0) { - if (mdb_cmp(writeTxn, dbi, keyData, maxKey) > 0) { + if (mdb_cmp(txn, dbi, keyData, maxKey) > 0) { // id was not found break; } else if (!matcher.matches(keyData.mv_data())) { @@ -649,7 +654,8 @@ protected void filterUsedIds(Collection ids) throws IOException { } } } - } + return null; + }); } protected double cardinality(long subj, long pred, long obj, long context) throws IOException { @@ -842,6 +848,7 @@ private boolean requiresResize() { public boolean storeTriple(long subj, long pred, long obj, long context, boolean explicit) throws IOException { TripleIndex mainIndex = indexes.get(0); + boolean stAdded; try (MemoryStack stack = MemoryStack.stackPush()) { MDBVal keyVal = MDBVal.malloc(stack); // use calloc to get an empty data value @@ -851,33 +858,34 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean keyBuf.flip(); keyVal.mv_data(keyBuf); - boolean foundExplicit = mdb_get(writeTxn, mainIndex.getDB(true), keyVal, dataVal) == 0; - boolean foundImplicit = !foundExplicit && mdb_get(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == 0; - - boolean stAdded = !(foundExplicit || foundImplicit); - if (stAdded || explicit && foundImplicit) { - if (recordCache == null) { - if (requiresResize()) { - // map is full, resize required - recordCache = new TxnRecordCache(dir); - logger.debug("resize of map size {} required while adding - initialize record cache", mapSize); - } + if (recordCache == null) { + if (requiresResize()) { + // map is full, resize required + recordCache = new TxnRecordCache(dir); + logger.debug("resize of map size {} required while adding - initialize record cache", mapSize); } - if (recordCache != null) { - long quad[] = new long[] { subj, pred, obj, context }; - if (explicit && foundImplicit) { - // remove implicit statement - recordCache.removeRecord(quad, false); - } - // put record in cache and return immediately - return recordCache.storeRecord(quad, explicit); + } + if (recordCache != null) { + long quad[] = new long[] { subj, pred, obj, context }; + if (explicit) { + // remove implicit statement + recordCache.removeRecord(quad, false); } + // put record in cache and return immediately + return recordCache.storeRecord(quad, explicit); + } - if (explicit && foundImplicit) { - E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)); - } - E(mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, 0)); + int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NOOVERWRITE); + if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) { + throw new IOException(mdb_strerror(rc)); + } + stAdded = rc == MDB_SUCCESS; + boolean foundImplicit = false; + if (explicit && stAdded) { + foundImplicit = E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)) == MDB_SUCCESS; + } + if (stAdded) { for (int i = 1; i < indexes.size(); i++) { TripleIndex index = indexes.get(i); keyBuf.clear(); @@ -887,7 +895,7 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean // update buffer positions in MDBVal keyVal.mv_data(keyBuf); - if (explicit && foundImplicit) { + if (foundImplicit) { E(mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal)); } E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0)); @@ -897,9 +905,9 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean incrementContext(stack, context); } } - - return stAdded; } + + return stAdded; } private void incrementContext(MemoryStack stack, long context) throws IOException { @@ -991,6 +999,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer } if (recordCache != null) { recordCache.removeRecord(quad, explicit); + handler.accept(quad); continue; } @@ -1005,6 +1014,7 @@ public void removeTriples(RecordIterator it, boolean explicit, Consumer } decrementContext(stack, quad[CONTEXT_IDX]); + handler.accept(quad); } } finally { it.close(); @@ -1284,24 +1294,22 @@ GroupMatcher createMatcher(long subj, long pred, long obj, long context) { } void toKey(ByteBuffer bb, long subj, long pred, long obj, long context) { - long[] values = new long[4]; for (int i = 0; i < fieldSeq.length; i++) { switch (fieldSeq[i]) { case 's': - values[i] = subj; + writeUnsigned(bb, subj); break; case 'p': - values[i] = pred; + writeUnsigned(bb, pred); break; case 'o': - values[i] = obj; + writeUnsigned(bb, obj); break; case 'c': - values[i] = context; + writeUnsigned(bb, context); break; } } - writeListUnsigned(bb, values); } void keyToQuad(ByteBuffer key, long[] quad) { diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TripleStoreTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TripleStoreTest.java index 6a159eecd5..32e20f2e76 100644 --- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TripleStoreTest.java +++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/TripleStoreTest.java @@ -11,8 +11,13 @@ package org.eclipse.rdf4j.sail.lmdb; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig; @@ -65,6 +70,25 @@ public void testInferredStmts() throws Exception { } } + @Test + public void testGc() throws Exception { + tripleStore.startTransaction(); + tripleStore.storeTriple(1, 2, 3, 1, true); + tripleStore.storeTriple(1, 2, 4, 1, true); + tripleStore.storeTriple(1, 2, 5, 1, true); + tripleStore.storeTriple(1, 6, 7, 1, true); + tripleStore.storeTriple(1, 6, 7, 8, true); + Set removed = new HashSet<>(); + tripleStore.removeTriplesByContext(1, 6, -1, -1, true, quad -> { + for (Long c : quad) { + removed.add(c); + } + }); + tripleStore.commit(); + tripleStore.filterUsedIds(removed); + assertEquals(Arrays.asList(6L, 7L, 8L), removed.stream().sorted().collect(Collectors.toList())); + } + @AfterEach public void after() throws Exception { tripleStore.close();