diff --git a/.eslintrc.json b/.eslintrc.json index 5b0cebc2..41df1eab 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -13,6 +13,7 @@ "extends": ["eslint:recommended", "plugin:jsdoc/recommended", "plugin:@typescript-eslint/recommended"], "rules": { "@typescript-eslint/no-floating-promises": 1, + "@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_", "varsIgnorePattern": "^_" }], "jsdoc/require-description": 1, "jsdoc/require-throws": 1, diff --git a/integration/registry.test.ts b/integration/registry.test.ts index 8f1be556..6812e9d7 100644 --- a/integration/registry.test.ts +++ b/integration/registry.test.ts @@ -65,4 +65,20 @@ describe(`Registry end to end integration tests for portal '${portal}'`, () => { expect(returnedEntry).toEqual(entry); }); + + it("Should fail to set an entry with a revision number that's too low", async () => { + const { privateKey } = genKeyPairAndSeed(); + + const entry = { + dataKey, + data: new Uint8Array(), + revision: BigInt(1), + }; + + await client.registry.setEntry(privateKey, entry); + entry.revision--; + await expect(client.registry.setEntry(privateKey, entry)).rejects.toThrowError( + "Unable to update the registry: provided revision number is invalid" + ); + }); }); diff --git a/integration/skydb.test.ts b/integration/skydb.test.ts index f1e83859..fbc04b5b 100644 --- a/integration/skydb.test.ts +++ b/integration/skydb.test.ts @@ -1,10 +1,23 @@ import { client, dataKey, portal } from "."; -import { ExecuteRequestError, genKeyPairAndSeed, getEntryLink, URI_SKYNET_PREFIX } from "../src"; +import { + ExecuteRequestError, + genKeyPairAndSeed, + getEntryLink, + JsonData, + JSONResponse, + SkynetClient, + URI_SKYNET_PREFIX, +} from "../src"; import { hashDataKey } from "../src/crypto"; import { decodeSkylinkBase64 } from "../src/utils/encoding"; import { toHexString } from "../src/utils/string"; describe(`SkyDB end to end integration tests for portal '${portal}'`, () => { + // Sleep for a second before each test to try to avoid rate limiter. + beforeEach(async () => { + await new Promise((r) => setTimeout(r, 1000)); + }); + it("Should get existing SkyDB data", async () => { const publicKey = "89e5147864297b80f5ddf29711ba8c093e724213b0dcbefbc3860cc6d598cc35"; const dataKey = "dataKey1"; @@ -241,4 +254,260 @@ describe(`SkyDB end to end integration tests for portal '${portal}'`, () => { expect(data).toEqual(json); }); + + it("Should update the revision number cache", async () => { + const { publicKey, privateKey } = genKeyPairAndSeed(); + const json = { message: 1 }; + + await client.db.setJSON(privateKey, dataKey, json); + + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + expect(cachedRevisionEntry.revision.toString()).toEqual("0"); + + await client.db.setJSON(privateKey, dataKey, json); + + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + + await client.db.getJSON(publicKey, dataKey); + + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + }); + + // REGRESSION TESTS: By creating a gap between setJSON and getJSON, a user + // could call getJSON, get outdated data, then call setJSON, and overwrite + // more up to date data with outdated data, but still use a high enough + // revision number. + // + // The fix is that you cannot retrieve the revision number while calling + // setJSON. You have to use the same revision number that you had when you + // called getJSON. + describe("getJSON/setJSON data race regression integration tests", () => { + const jsonOld = { message: 1 }; + const jsonNew = { message: 2 }; + + const delays = [0, 10, 100, 500]; + + const concurrentAccessError = "Concurrent access prevented in SkyDB"; + const registryUpdateError = "Unable to update the registry"; + + const getJSONWithDelay = async function ( + client: SkynetClient, + delay: number, + publicKey: string, + dataKey: string + ): Promise { + await new Promise((r) => setTimeout(r, delay)); + return await client.db.getJSON(publicKey, dataKey); + }; + const setJSONWithDelay = async function ( + client: SkynetClient, + delay: number, + privateKey: string, + dataKey: string, + data: JsonData + ) { + await new Promise((r) => setTimeout(r, delay)); + return await client.db.setJSON(privateKey, dataKey, data); + }; + + it.each(delays)( + "should not get old data when getJSON is called after setJSON on a single client with a '%s' ms delay and getJSON doesn't fail", + async (delay) => { + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Set the data. + await client.db.setJSON(privateKey, dataKey, jsonOld); + + // Try to invoke the data race. + let receivedJson; + try { + // Get the data while also calling setJSON. + [{ data: receivedJson }] = await Promise.all([ + getJSONWithDelay(client, delay, publicKey, dataKey), + setJSONWithDelay(client, 0, privateKey, dataKey, jsonNew), + ]); + } catch (e) { + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition has been prevented and we received the + // expected error. Return from test early. + // + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + return; + } + + // Unexpected error, throw. + throw e; + } + + // Data race did not occur, getJSON should have latest JSON. + expect(receivedJson).toEqual(jsonNew); + } + ); + + // NOTE: We can't guarantee that data won't be lost if two (or more) actors + // write to the registry at the same time, but we can guarantee that the + // final state will be the desired final state by at least one of the + // actors. One of the two clients will lose, but the other will win and be + // consistent, so the data won't be corrupt, it'll just be missing one + // update. + it.each(delays)( + "should get either old or new data when getJSON is called after setJSON on two different clients with a '%s' ms delay", + async (delay) => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portal); + const client2 = new SkynetClient(portal); + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Get revision entry cache handles. + const cachedRevisionEntry1 = await client1.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + const cachedRevisionEntry2 = await client2.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + + // Set the initial data. + { + await client1.db.setJSON(privateKey, dataKey, jsonOld); + expect(cachedRevisionEntry1.revision.toString()).toEqual("0"); + expect(cachedRevisionEntry2.revision.toString()).toEqual("-1"); + } + + // Call getJSON and setJSON concurrently on different clients -- both + // should succeeed. + { + // Get the data while also calling setJSON. + const [_, { data: receivedJson }] = await Promise.all([ + setJSONWithDelay(client1, 0, privateKey, dataKey, jsonNew), + getJSONWithDelay(client2, delay, publicKey, dataKey), + ]); + + // See if we got the new or old data. + expect(receivedJson).not.toBeNull(); + expect(cachedRevisionEntry1.revision.toString()).toEqual("1"); + if (receivedJson?.message === jsonNew.message) { + expect(cachedRevisionEntry2.revision.toString()).toEqual("1"); + // Return if we got the new data -- both clients are in sync. + // + // NOTE: I've manually confirmed that both code paths (old data and + // new data) are hit. + return; + } + // client2 should have old data and cached revision at this point. + expect(receivedJson).toEqual(jsonOld); + expect(cachedRevisionEntry2.revision.toString()).toEqual("0"); + } + + // If we got old data and an old revision from getJSON, the client may + // still be able to write to that entry, overwriting the new data. + // + // Try to update the entry with client2 which has the old revision. + const updatedJson = { message: 3 }; + let expectedJson: JsonData; + try { + await client2.db.setJSON(privateKey, dataKey, updatedJson); + expectedJson = updatedJson; + } catch (e) { + // Catches both "doesn't have enough pow" and "provided revision number + // is already registered" errors. + if ((e as Error).message.includes(registryUpdateError)) { + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + expectedJson = jsonNew; + } else { + // Unexpected error, throw. + throw e; + } + } + + // The entry should have the overriden, updated data at this point. + await Promise.all([ + async () => { + const { data: receivedJson } = await client1.db.getJSON(publicKey, dataKey); + expect(cachedRevisionEntry1.revision.toString()).toEqual("1"); + expect(receivedJson).toEqual(expectedJson); + }, + async () => { + const { data: receivedJson } = await client2.db.getJSON(publicKey, dataKey); + expect(cachedRevisionEntry2.revision.toString()).toEqual("1"); + expect(receivedJson).toEqual(expectedJson); + }, + ]); + } + ); + + it.each(delays)( + "should make sure that two concurrent setJSON calls on a single client with a '%s' ms delay either fail with the right error or succeed ", + async (delay) => { + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Try to invoke two concurrent setJSON calls. + try { + await Promise.all([ + setJSONWithDelay(client, delay, privateKey, dataKey, jsonNew), + setJSONWithDelay(client, 0, privateKey, dataKey, jsonOld), + ]); + } catch (e) { + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition has been prevented and we received the + // expected error. Return from test early. + // + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + return; + } + + // Unexpected error, throw. + throw e; + } + + // Data race did not occur, getJSON should get latest JSON. + const { data: receivedJson } = await client.db.getJSON(publicKey, dataKey); + expect(receivedJson).toEqual(jsonNew); + } + ); + + it.each(delays)( + "should make sure that two concurrent setJSON calls on different clients with a '%s' ms delay fail with the right error or succeed", + async (delay) => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portal); + const client2 = new SkynetClient(portal); + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Try to invoke two concurrent setJSON calls. + try { + await Promise.all([ + setJSONWithDelay(client2, delay, privateKey, dataKey, jsonNew), + setJSONWithDelay(client1, 0, privateKey, dataKey, jsonOld), + ]); + } catch (e) { + if ((e as Error).message.includes(registryUpdateError)) { + // The data race condition has been prevented and we received the + // expected error. Return from test early. + // + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + return; + } + + // Unexpected error, throw. + throw e; + } + + // Data race did not occur, getJSON should get one of the JSON values. + let client3; + if (Math.random() < 0.5) { + client3 = client1; + } else { + client3 = client2; + } + const { data: receivedJson } = await client3.db.getJSON(publicKey, dataKey); + expect([jsonOld, jsonNew]).toContainEqual(receivedJson); + } + ); + }); }); diff --git a/jest.config.ts b/jest.config.ts index c1a69876..8ce02cf2 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -11,7 +11,7 @@ const config: Config.InitialOptions = { preset: "ts-jest", // From old package.json. - testTimeout: 90000, + testTimeout: 120000, // Automatically clear mock calls and instances between every test clearMocks: true, // An array of glob patterns indicating a set of files for which coverage information should be collected diff --git a/package.json b/package.json index 36c6f87a..653b6602 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ }, "homepage": "https://github.com/SkynetLabs/skynet-js", "dependencies": { + "async-mutex": "^0.3.2", "axios": "^0.24.0", "base32-decode": "^1.0.0", "base32-encode": "^1.1.1", diff --git a/src/client.ts b/src/client.ts index f6c87b25..c5b41139 100644 --- a/src/client.ts +++ b/src/client.ts @@ -32,6 +32,7 @@ import { } from "./file"; import { pinSkylink } from "./pin"; import { getEntry, getEntryLinkAsync, getEntryUrl, setEntry, postSignedEntry } from "./registry"; +import { RevisionNumberCache } from "./revision_cache"; import { deleteJSON, getJSON, @@ -182,6 +183,10 @@ export class SkynetClient { getEntryData: getEntryData.bind(this), setEntryData: setEntryData.bind(this), deleteEntryData: deleteEntryData.bind(this), + + // Holds the cached revision numbers, protected by mutexes to prevent + // concurrent access. + revisionNumberCache: new RevisionNumberCache(), }; // Registry diff --git a/src/mysky/index.ts b/src/mysky/index.ts index 5965a809..27601697 100644 --- a/src/mysky/index.ts +++ b/src/mysky/index.ts @@ -29,14 +29,13 @@ import { DEFAULT_SET_JSON_OPTIONS, CustomGetJSONOptions, CustomSetJSONOptions, - getOrCreateRegistryEntry, + getOrCreateSkyDBRegistryEntry, JSONResponse, - getNextRegistryEntry, - getOrCreateRawBytesRegistryEntry, validateEntryData, CustomSetEntryDataOptions, DEFAULT_SET_ENTRY_DATA_OPTIONS, DELETION_ENTRY_DATA, + incrementRevision, } from "../skydb"; import { Signature } from "../crypto"; import { deriveDiscoverableFileTweak } from "./tweak"; @@ -326,6 +325,10 @@ export class MySky { return await this.connector.connection.remoteHandle().call("userID"); } + // ============= + // SkyDB methods + // ============= + /** * Gets Discoverable JSON at the given path through MySky, if the user has * given Discoverable Read permissions to do so. @@ -395,14 +398,33 @@ export class MySky { const dataKey = deriveDiscoverableFileTweak(path); opts.hashedDataKeyHex = true; // Do not hash the tweak anymore. - const [entry, dataLink] = await getOrCreateRegistryEntry(this.connector.client, publicKey, dataKey, json, opts); - - const signature = await this.signRegistryEntry(entry, path); - - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); - - return { data: json, dataLink }; + // Immediately fail if the mutex is not available. + return await this.connector.client.db.revisionNumberCache.withCachedEntryLock( + publicKey, + dataKey, + async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. + const newRevision = incrementRevision(cachedRevisionEntry.revision); + + // Call SkyDB helper to create the registry entry. We can't call SkyDB's + // setJSON here directly because we need MySky to sign the entry, instead of + // signing it ourselves with a given private key. + const [entry, dataLink] = await getOrCreateSkyDBRegistryEntry( + this.connector.client, + dataKey, + json, + newRevision, + opts + ); + + const signature = await this.signRegistryEntry(entry, path); + + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + + return { data: json, dataLink }; + } + ); } /** @@ -424,9 +446,15 @@ export class MySky { ...customOptions, }; + // We re-implement deleteJSON instead of calling SkyDB's deleteJSON so that + // MySky can do the signing. await this.setEntryData(path, DELETION_ENTRY_DATA, { ...opts, allowDeletionEntryData: true }); } + // ================== + // Entry Data Methods + // ================== + /** * Sets entry at the given path to point to the data link. Like setJSON, but it doesn't upload a file. * @@ -493,21 +521,33 @@ export class MySky { ...customOptions, }; + // We re-implement setEntryData instead of calling SkyDB's setEntryData so + // that MySky can do the signing. + validateEntryData(data, opts.allowDeletionEntryData); const publicKey = await this.userID(); const dataKey = deriveDiscoverableFileTweak(path); opts.hashedDataKeyHex = true; // Do not hash the tweak anymore. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getNextRegistryEntry(this.connector.client, publicKey, dataKey, data, getEntryOpts); + // Immediately fail if the mutex is not available. + return await this.connector.client.db.revisionNumberCache.withCachedEntryLock( + publicKey, + dataKey, + async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. + const newRevision = incrementRevision(cachedRevisionEntry.revision); + + const entry = { dataKey, data, revision: newRevision }; - const signature = await this.signRegistryEntry(entry, path); + const signature = await this.signRegistryEntry(entry, path); - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); - return { data: entry.data }; + return { data: entry.data }; + } + ); } /** @@ -624,20 +664,32 @@ export class MySky { const [publicKey, pathSeed] = await Promise.all([this.userID(), this.getEncryptedPathSeed(path, false)]); const dataKey = deriveEncryptedFileTweak(pathSeed); opts.hashedDataKeyHex = true; // Do not hash the tweak anymore. - const encryptionKey = deriveEncryptedFileKeyEntropy(pathSeed); - // Pad and encrypt json file. - const data = encryptJSONFile(json, { version: ENCRYPTED_JSON_RESPONSE_VERSION }, encryptionKey); + // Immediately fail if the mutex is not available. + return await this.connector.client.db.revisionNumberCache.withCachedEntryLock( + publicKey, + dataKey, + async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. + const newRevision = incrementRevision(cachedRevisionEntry.revision); - const entry = await getOrCreateRawBytesRegistryEntry(this.connector.client, publicKey, dataKey, data, opts); + // Derive the key. + const encryptionKey = deriveEncryptedFileKeyEntropy(pathSeed); - // Call MySky which checks for write permissions on the path. - const signature = await this.signEncryptedRegistryEntry(entry, path); + // Pad and encrypt json file. + const data = encryptJSONFile(json, { version: ENCRYPTED_JSON_RESPONSE_VERSION }, encryptionKey); - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + const [entry] = await getOrCreateSkyDBRegistryEntry(this.connector.client, dataKey, data, newRevision, opts); - return { data: json }; + // Call MySky which checks for write permissions on the path. + const signature = await this.signEncryptedRegistryEntry(entry, path); + + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + + return { data: json }; + } + ); } /** diff --git a/src/registry.test.ts b/src/registry.test.ts index 52da2782..e4385ad9 100644 --- a/src/registry.test.ts +++ b/src/registry.test.ts @@ -5,14 +5,15 @@ import { genKeyPairAndSeed, SIGNATURE_LENGTH } from "./crypto"; import { SkynetClient, defaultSkynetPortalUrl, genKeyPairFromSeed } from "./index"; import { getEntryLink, getEntryUrlForPortal, signEntry, validateRegistryProof } from "./registry"; import { uriSkynetPrefix } from "./utils/url"; -import { stringToUint8ArrayUtf8 } from "./utils/string"; +import { hexToUint8Array, stringToUint8ArrayUtf8 } from "./utils/string"; const { publicKey, privateKey } = genKeyPairFromSeed("insecure test seed"); const portalUrl = defaultSkynetPortalUrl; const client = new SkynetClient(portalUrl); const dataKey = "app"; -const registryLookupUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); +const registryPostUrl = `${portalUrl}/skynet/registry`; +const registryGetUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); describe("getEntry", () => { let mock: MockAdapter; @@ -23,13 +24,14 @@ describe("getEntry", () => { }); it("should throw if the response status is not in the 200s and not 404 and JSON is returned", async () => { - mock.onGet(registryLookupUrl).replyOnce(400, JSON.stringify({ message: "foo error" })); + mock.onGet(registryGetUrl).replyOnce(400, JSON.stringify({ message: "foo error" })); - await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrowError( - "Request failed with status code 400" + await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toEqual( + new Error("Request failed with status code 400: foo error") ); }); + // In the case of a 429 error due to rate limiting, all we get is HTML. it("should throw if the response status is not in the 200s and not 404 and HTML is returned", async () => { const responseHTML = ` 429 Too Many Requests @@ -39,10 +41,10 @@ describe("getEntry", () => { `; - mock.onGet(registryLookupUrl).replyOnce(429, responseHTML); + mock.onGet(registryGetUrl).replyOnce(429, responseHTML); - await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrowError( - "Request failed with status code 429" + await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toEqual( + new Error("Request failed with status code 429") ); }); @@ -55,7 +57,7 @@ describe("getEntry", () => { "33d14d2889cb292142614da0e0ff13a205c4867961276001471d13b779fc9032568ddd292d9e0dff69d7b1f28be07972cc9d86da3cecf3adecb6f9b7311af808", }; - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrow(); }); @@ -67,7 +69,7 @@ describe("getEntry", () => { }); it("Should throw on incomplete response from registry GET", async () => { - mock.onGet(registryLookupUrl).replyOnce(200, "{}"); + mock.onGet(registryGetUrl).replyOnce(200, "{}"); await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrowError( "Did not get a complete entry response" @@ -125,6 +127,31 @@ describe("getEntryUrl", () => { }); describe("setEntry", () => { + let mock: MockAdapter; + + beforeEach(() => { + mock = new MockAdapter(axios); + mock.resetHistory(); + }); + + it("Should sign and set a valid registry entry", async () => { + // mock a successful registry update + mock.onPost(registryPostUrl).replyOnce(204); + + // Hex-encoded skylink. + const data = hexToUint8Array( + "43414241425f31447430464a73787173755f4a34546f644e4362434776744666315579735f3345677a4f6c546367" + ); + + const entry = { + data, + dataKey, + revision: BigInt(11), + }; + + await client.registry.setEntry(privateKey, entry); + }); + it("Should throw an error if the private key is not hex-encoded", async () => { // @ts-expect-error We pass an invalid private key on purpose. await expect(client.registry.setEntry("foo", {})).rejects.toThrowError( diff --git a/src/registry.ts b/src/registry.ts index 0172fcc0..f198160b 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -576,11 +576,11 @@ export function validateRegistryProof( } /** - * Handles error responses returned in getEntry. + * Handles error responses returned from getEntry endpoint. * * @param err - The error. * @returns - An empty signed registry entry if the status code is 404. - * @throws - Will throw if the status code is not 404. + * @throws - Will throw if the error response is malformed, or the error message otherwise if the error status code is not 404. */ function handleGetEntryErrResponse(err: ExecuteRequestError): SignedRegistryEntry { // Check if status was 404 "not found" and return null if so. @@ -588,6 +588,7 @@ function handleGetEntryErrResponse(err: ExecuteRequestError): SignedRegistryEntr return { entry: null, signature: null }; } + // Return the error message from skyd. throw err; } diff --git a/src/request.ts b/src/request.ts index 56ea4f6b..acd9d574 100644 --- a/src/request.ts +++ b/src/request.ts @@ -100,7 +100,7 @@ export class ExecuteRequestError extends Error { static From(err: AxiosError): ExecuteRequestError { /* istanbul ignore next */ if (!err.response) { - return new ExecuteRequestError(`Error repsonse did not contain expected field 'response'.`, err, null, null); + return new ExecuteRequestError(`Error response did not contain expected field 'response'.`, err, null, null); } /* istanbul ignore next */ if (!err.response.status) { @@ -117,11 +117,11 @@ export class ExecuteRequestError extends Error { // If we don't get an error message from skyd, just return the status code. /* istanbul ignore next */ if (!err.response.data) { - return new ExecuteRequestError(`Request failed with status code ${status}.`, err, status, null); + return new ExecuteRequestError(`Request failed with status code ${status}`, err, status, null); } /* istanbul ignore next */ if (!err.response.data.message) { - return new ExecuteRequestError(`Request failed with status code ${status}.`, err, status, null); + return new ExecuteRequestError(`Request failed with status code ${status}`, err, status, null); } // Return the error message from skyd. Pass along the original Axios error. diff --git a/src/revision_cache.ts b/src/revision_cache.ts new file mode 100644 index 00000000..6971889c --- /dev/null +++ b/src/revision_cache.ts @@ -0,0 +1,93 @@ +import { Mutex, tryAcquire } from "async-mutex"; + +/** + * An abstraction over the client's revision number cache. Provides a cache, + * keyed by public key and data key and protected by a mutex to guard against + * concurrent access to the cache. Each cache entry also has its own mutex, to + * protect against concurrent access to that entry. + */ +export class RevisionNumberCache { + private mutex: Mutex; + private cache: { [key: string]: CachedRevisionNumber }; + + constructor() { + this.mutex = new Mutex(); + this.cache = {}; + } + + /** + * Gets the revision cache key for the given public key and data key. + * + * @param publicKey - The given public key. + * @param dataKey - The given data key. + * @returns - The revision cache key. + */ + static getCacheKey(publicKey: string, dataKey: string): string { + return `${publicKey}/${dataKey}`; + } + + /** + * Gets an object containing the cached revision and the mutex for the entry. + * The revision and mutex will be initialized if the entry is not yet cached. + * + * @param publicKey - The given public key. + * @param dataKey - The given data key. + * @returns - The cached revision entry object. + */ + async getRevisionAndMutexForEntry(publicKey: string, dataKey: string): Promise { + const cacheKey = RevisionNumberCache.getCacheKey(publicKey, dataKey); + + // Block until the mutex is available for the cache. + return await this.mutex.runExclusive(async () => { + if (!this.cache[cacheKey]) { + this.cache[cacheKey] = new CachedRevisionNumber(); + } + return this.cache[cacheKey]; + }); + } + + /** + * Calls `exclusiveFn` with exclusive access to the given cached entry. The + * revision number of the entry can be safely updated in `exclusiveFn`. + * + * @param publicKey - The given public key. + * @param dataKey - The given data key. + * @param exclusiveFn - A function to call with exclusive access to the given cached entry. + * @returns - A promise containing the result of calling `exclusiveFn`. + */ + async withCachedEntryLock( + publicKey: string, + dataKey: string, + exclusiveFn: (cachedRevisionEntry: CachedRevisionNumber) => Promise + ): Promise { + // Safely get or create mutex for the requested entry. + const cachedRevisionEntry = await this.getRevisionAndMutexForEntry(publicKey, dataKey); + + try { + return await tryAcquire(cachedRevisionEntry.mutex).runExclusive(async () => exclusiveFn(cachedRevisionEntry)); + } catch (e) { + // Change mutex error to be more descriptive and user-friendly. + if ((e as Error).message.includes("mutex already locked")) { + throw new Error( + `Concurrent access prevented in SkyDB for entry { publicKey: ${publicKey}, dataKey: ${dataKey} }` + ); + } else { + throw e; + } + } + } +} + +/** + * An object containing a cached revision and a corresponding mutex. The + * revision can be internally updated and it will reflect in the client's cache. + */ +export class CachedRevisionNumber { + mutex: Mutex; + revision: bigint; + + constructor() { + this.mutex = new Mutex(); + this.revision = BigInt(-1); + } +} diff --git a/src/skydb.test.ts b/src/skydb.test.ts index 9e257702..851d5ca0 100644 --- a/src/skydb.test.ts +++ b/src/skydb.test.ts @@ -3,11 +3,15 @@ import MockAdapter from "axios-mock-adapter"; import { getSkylinkUrlForPortal } from "./download"; import { MAX_REVISION } from "./utils/number"; +import { stringToUint8ArrayUtf8, toHexString } from "./utils/string"; import { DEFAULT_SKYNET_PORTAL_URL, URI_SKYNET_PREFIX } from "./utils/url"; import { SkynetClient } from "./index"; -import { getEntryUrlForPortal, REGEX_REVISION_NO_QUOTES } from "./registry"; -import { checkCachedDataLink, DELETION_ENTRY_DATA } from "./skydb"; +import { getEntryUrlForPortal } from "./registry"; +import { checkCachedDataLink, DELETION_ENTRY_DATA, JSONResponse } from "./skydb"; import { MAX_ENTRY_LENGTH } from "./mysky"; +import { decodeSkylink } from "./skylink/sia"; +import { getSettledValues } from "../utils/testing"; +import { JsonData } from "./utils/types"; // Generated with genKeyPairFromSeed("insecure test seed") const [publicKey, privateKey] = [ @@ -25,14 +29,15 @@ const bitfield = 2048; const portalUrl = DEFAULT_SKYNET_PORTAL_URL; const client = new SkynetClient(portalUrl); -const registryUrl = `${portalUrl}/skynet/registry`; -const registryLookupUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); +const registryPostUrl = `${portalUrl}/skynet/registry`; +const registryGetUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); const uploadUrl = `${portalUrl}/skynet/skyfile`; const skylinkUrl = getSkylinkUrlForPortal(portalUrl, skylink); // Hex-encoded skylink. const data = "43414241425f31447430464a73787173755f4a34546f644e4362434776744666315579735f3345677a4f6c546367"; const revision = 11; +// Entry data for the data and revision. const entryData = { data, revision, @@ -40,6 +45,12 @@ const entryData = { "33d14d2889cb292142614da0e0ff13a205c4867961276001471d13b779fc9032568ddd292d9e0dff69d7b1f28be07972cc9d86da3cecf3adecb6f9b7311af809", }; +const headers = { + "skynet-portal-api": portalUrl, + "skynet-skylink": skylink, + "content-type": "application/json", +}; + describe("getJSON", () => { let mock: MockAdapter; @@ -49,15 +60,10 @@ describe("getJSON", () => { mock.resetHistory(); }); - const headers = { - "skynet-portal-api": portalUrl, - "skynet-skylink": skylink, - "content-type": "application/json", - }; - it("should perform a lookup and skylink GET", async () => { - // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + // Mock a successful registry lookup. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + // Mock a successful data download. mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey); @@ -66,9 +72,56 @@ describe("getJSON", () => { expect(mock.history.get.length).toBe(2); }); + it("should fail properly with a too low error", async () => { + // Use a custom data key for this test to get a fresh cache. + const dataKey = "testTooLowError"; + const registryGetUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); + const skylinkData = toHexString(decodeSkylink(skylink)); + const entryData = { + data: skylinkData, + revision: 1, + signature: + "18d2b5f64042db39c4c591c21bd93015f7839eefab487ef8e27086cdb95b190732211b9a23d38c33f4f9a4e5219de55a80f75ff7e437713732ecdb4ccddb0804", + }; + const entryDataTooLow = { + data: skylinkData, + revision: 0, + signature: + "4d7b26923f4211794eaf5c13230e62618ea3bebcb3fa6511ec8772b1f1e1a675b5244e7c33f89daf31999aeabe46c3a1e324a04d2f35c6ba902c75d35ceba00d", + }; + + // Cache the revision. + + // Mock a successful registry lookup. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + // Mock a successful data download. + mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); + + const { data } = await client.db.getJSON(publicKey, dataKey); + expect(data).toEqual(jsonData); + + // The cache should contain revision 1. + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + + // Return a revision that's too low. + + // Mock a successful registry lookup. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataTooLow)); + // Mock a successful data download. + mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); + + await expect(client.db.getJSON(publicKey, dataKey)).rejects.toThrowError( + "Returned revision number too low. A higher revision number for this userID and path is already cached" + ); + + // The cache should still contain revision 1. + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + }); + it("should perform a lookup but not a skylink GET if the cachedDataLink is a hit", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey, { cachedDataLink: skylink }); expect(data).toBeNull(); @@ -80,7 +133,7 @@ describe("getJSON", () => { const skylinkNoHit = "XABvi7JtJbQSMAcDwnUnmp2FKDPjg8_tTTFP4BwMSxVdEg"; // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey, { cachedDataLink: skylinkNoHit }); @@ -91,7 +144,7 @@ describe("getJSON", () => { it("should throw if the cachedDataLink is not a valid skylink", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, {}); await expect(client.db.getJSON(publicKey, dataKey, { cachedDataLink: "asdf" })).rejects.toThrowError( @@ -101,7 +154,7 @@ describe("getJSON", () => { it("should perform a lookup and skylink GET on legacy pre-v4 data", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); mock.onGet(skylinkUrl).replyOnce(200, legacyJsonData, headers); const jsonReturned = await client.db.getJSON(publicKey, dataKey); @@ -110,7 +163,7 @@ describe("getJSON", () => { }); it("should return null if no entry is found", async () => { - mock.onGet(registryLookupUrl).reply(404); + mock.onGet(registryGetUrl).replyOnce(404); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey); expect(data).toBeNull(); @@ -119,8 +172,8 @@ describe("getJSON", () => { it("should throw if the returned file data is not JSON", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).reply(200, JSON.stringify(entryData)); - mock.onGet(skylinkUrl).reply(200, "thisistext", { ...headers, "content-type": "text/plain" }); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(skylinkUrl).replyOnce(200, "thisistext", { ...headers, "content-type": "text/plain" }); await expect(client.db.getJSON(publicKey, dataKey)).rejects.toThrowError( `File data for the entry at data key '${dataKey}' is not JSON.` @@ -129,8 +182,8 @@ describe("getJSON", () => { it("should throw if the returned _data field in the file data is not JSON", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).reply(200, JSON.stringify(entryData)); - mock.onGet(skylinkUrl).reply(200, { _data: "thisistext", _v: 1 }, headers); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(skylinkUrl).replyOnce(200, { _data: "thisistext", _v: 1 }, headers); await expect(client.db.getJSON(publicKey, dataKey)).rejects.toThrowError( "File data '_data' for the entry at data key 'app' is not JSON." @@ -160,11 +213,8 @@ describe("setJSON", () => { }); it("should perform an upload, lookup and registry update", async () => { - // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); - // mock a successful registry update - mock.onPost(registryUrl).replyOnce(204); + mock.onPost(registryPostUrl).replyOnce(204); // set data const { data: returnedData, dataLink: returnedSkylink } = await client.db.setJSON(privateKey, dataKey, jsonData); @@ -172,25 +222,23 @@ describe("setJSON", () => { expect(returnedSkylink).toEqual(sialink); // assert our request history contains the expected amount of requests - expect(mock.history.get.length).toBe(1); + expect(mock.history.get.length).toBe(0); expect(mock.history.post.length).toBe(2); const data = JSON.parse(mock.history.post[1].data); expect(data).toBeDefined(); - expect(data.revision).toEqual(revision + 1); + expect(data.revision).toBeGreaterThanOrEqual(revision + 1); }); - it("should use a revision number of 0 if the lookup failed", async () => { - mock.onGet(registryLookupUrl).reply(404); - + it("should use a revision number of 0 if the entry is not cached", async () => { // mock a successful registry update - mock.onPost(registryUrl).reply(204); + mock.onPost(registryPostUrl).replyOnce(204); // call `setJSON` on the client - await client.db.setJSON(privateKey, dataKey, jsonData); + await client.db.setJSON(privateKey, "inexistent entry", jsonData); // assert our request history contains the expected amount of requests - expect(mock.history.get.length).toBe(1); + expect(mock.history.get.length).toBe(0); expect(mock.history.post.length).toBe(2); const data = JSON.parse(mock.history.post[1].data); @@ -199,20 +247,12 @@ describe("setJSON", () => { }); it("should fail if the entry has the maximum allowed revision", async () => { - // mock a successful registry lookup - const entryData = { - data, - // String the bigint since JS doesn't support 64-bit numbers. - revision: MAX_REVISION.toString(), - signature: - "18c76e88141c7cc76d8a77abcd91b5d64d8fc3833eae407ab8a5339e5fcf7940e3fa5830a8ad9439a0c0cc72236ed7b096ae05772f81eee120cbd173bfd6600e", - }; - // Replace the quotes around the stringed bigint. - const json = JSON.stringify(entryData).replace(REGEX_REVISION_NO_QUOTES, '"revision":"$1"'); - mock.onGet(registryLookupUrl).reply(200, json); + const dataKey = "maximum revision"; + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + cachedRevisionEntry.revision = MAX_REVISION; // mock a successful registry update - mock.onPost(registryUrl).reply(204); + mock.onPost(registryPostUrl).replyOnce(204); // Try to set data, should fail. await expect(client.db.setJSON(privateKey, dataKey, entryData)).rejects.toThrowError( @@ -239,6 +279,30 @@ describe("setJSON", () => { "Expected parameter 'json' to be type 'object', was type 'undefined'" ); }); + + it("Should not update the cached revision if the registry update fails.", async () => { + const dataKey = "registry failure"; + const json = { foo: "bar" }; + + // mock a successful registry update + mock.onPost(registryPostUrl).replyOnce(204); + + await client.db.setJSON(privateKey, dataKey, json); + + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + const revision1 = cachedRevisionEntry.revision; + + // mock a failed registry update + mock.onPost(registryPostUrl).replyOnce(400, JSON.stringify({ message: "foo" })); + + await expect(client.db.setJSON(privateKey, dataKey, json)).rejects.toEqual( + new Error("Request failed with status code 400: foo") + ); + + const revision2 = cachedRevisionEntry.revision; + + expect(revision1.toString()).toEqual(revision2.toString()); + }); }); describe("setEntryData", () => { @@ -276,3 +340,309 @@ describe("checkCachedDataLink", () => { ); }); }); + +// REGRESSION TESTS: By creating a gap between setJSON and getJSON, a user +// could call getJSON, get outdated data, then call setJSON, and overwrite +// more up to date data with outdated data, but still use a high enough +// revision number. +// +// The fix is that you cannot retrieve the revision number while calling +// setJSON. You have to use the same revision number that you had when you +// called getJSON. +describe("getJSON/setJSON data race regression unit tests", () => { + let mock: MockAdapter; + + beforeEach(() => { + // Add a delay to responses to simulate actual calls that use the network. + mock = new MockAdapter(axios, { delayResponse: 100 }); + mock.reset(); + mock.onHead(portalUrl).replyOnce(200, {}, { "skynet-portal-api": portalUrl }); + }); + + const skylinkOld = "XABvi7JtJbQSMAcDwnUnmp2FKDPjg8_tTTFP4BwMSxVdEg"; + const skylinkOldUrl = getSkylinkUrlForPortal(portalUrl, skylinkOld); + const dataOld = toHexString(stringToUint8ArrayUtf8(skylinkOld)); // hex-encoded skylink + const revisionOld = 0; + const entryDataOld = { + data: dataOld, + revision: revisionOld, + signature: + "921d30e860d51f13d1065ea221b29fc8d11cfe7fa0e32b5d5b8e13bee6f91cfa86fe6b12ca4cef7a90ba52d2c50efb62b241f383e9d7bb264558280e564faa0f", + }; + const headersOld = { ...headers, "skynet-skylink": skylinkOld }; + + const skylinkNew = skylink; + const skylinkNewUrl = skylinkUrl; + const dataNew = data; // hex-encoded skylink + const revisionNew = 1; + const entryDataNew = { + data: dataNew, + revision: revisionNew, + signature: + "2a9889915f06d414e8cde51eb17db565410d20b2b50214e8297f7f4a0cb5c77e0edc62a319607dfaa042e0cc16ed0d7e549cca2abd11c2f86a335009936f150d", + }; + const headersNew = { ...headers, "skynet-skylink": skylinkNew }; + + const jsonOld = { message: 1 }; + const jsonNew = { message: 2 }; + const skynetJsonOld = { _data: jsonOld, _v: 2 }; + const skynetJsonNew = { _data: jsonNew, _v: 2 }; + + const concurrentAccessError = "Concurrent access prevented in SkyDB"; + const higherRevisionError = "A higher revision number for this userID and path is already cached"; + + it("should not get old data when getJSON and setJSON are called simultaneously on the same client and getJSON doesn't fail", async () => { + // Create a new client with a fresh revision cache. + const client = new SkynetClient(portalUrl); + + // Mock setJSON with the old skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Set the data. + await client.db.setJSON(privateKey, dataKey, jsonOld); + + // Mock getJSON with the new entry data and the new skylink. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headers); + + // Mock setJSON with the new skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Try to invoke the data race. + // Get the data while also calling setJSON. + // + // Use Promise.allSettled to wait for all promises to finish, or some mocked + // requests will hang around and interfere with the later tests. + const settledResults = await Promise.allSettled([ + client.db.getJSON(publicKey, dataKey), + client.db.setJSON(privateKey, dataKey, jsonNew), + ]); + + let data: JsonData | null; + try { + const values = getSettledValues(settledResults); + data = values[0].data; + } catch (e) { + // If any promises were rejected, check the error message. + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition was avoided and we received the expected + // error. Return from test early. + return; + } + + throw e; + } + + // Data race did not occur, getJSON should have latest JSON. + expect(data).toEqual(jsonNew); + + // assert our request history contains the expected amount of requests + expect(mock.history.get.length).toBe(2); + expect(mock.history.post.length).toBe(4); + }); + + it("should not get old data when getJSON and setJSON are called simultaneously on different clients and getJSON doesn't fail", async () => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portalUrl); + const client2 = new SkynetClient(portalUrl); + + // Mock setJSON with the old skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Set the data. + await client1.db.setJSON(privateKey, dataKey, jsonOld); + + // Mock getJSON with the new entry data and the new skylink. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + + // Mock setJSON with the new skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Try to invoke the data race. + // Get the data while also calling setJSON. + // + // Use Promise.allSettled to wait for all promises to finish, or some mocked requests will hang around and interfere with the later tests. + const settledResults = await Promise.allSettled([ + client1.db.getJSON(publicKey, dataKey), + client2.db.setJSON(privateKey, dataKey, jsonNew), + ]); + + let data: JsonData | null; + try { + const values = getSettledValues(settledResults); + data = values[0].data; + } catch (e) { + // If any promises were rejected, check the error message. + if ((e as Error).message.includes(higherRevisionError)) { + // The data race condition was avoided and we received the expected + // error. Return from test early. + return; + } + + throw e; + } + + // Data race did not occur, getJSON should have latest JSON. + expect(data).toEqual(jsonNew); + + // assert our request history contains the expected amount of requests. + expect(mock.history.get.length).toBe(2); + expect(mock.history.post.length).toBe(4); + }); + + it("should not mess up cache when two setJSON calls are made simultaneously and one fails", async () => { + // Create a new client with a fresh revision cache. + const client = new SkynetClient(portalUrl); + + // Mock a successful setJSON. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Use Promise.allSettled to wait for all promises to finish, or some mocked + // requests will hang around and interfere with the later tests. + const values = await Promise.allSettled([ + client.db.setJSON(privateKey, dataKey, jsonOld), + client.db.setJSON(privateKey, dataKey, jsonOld), + ]); + + try { + getSettledValues(values); + } catch (e) { + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition was avoided and we received the expected + // error. Return from test early. + return; + } + + throw e; + } + + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + expect(cachedRevisionEntry.revision.toString()).toEqual("0"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataOld)); + mock.onGet(skylinkOldUrl).replyOnce(200, skynetJsonOld, headersOld); + const { data: receivedJson1 } = await client.db.getJSON(publicKey, dataKey); + + expect(receivedJson1).toEqual(jsonOld); + + // Make another setJSON call - it should still work. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + await client.db.setJSON(privateKey, dataKey, jsonNew); + + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + const { data: receivedJson2 } = await client.db.getJSON(publicKey, dataKey); + + expect(receivedJson2).toEqual(jsonNew); + + expect(mock.history.get.length).toBe(4); + expect(mock.history.post.length).toBe(4); + }); + + it("should not mess up cache when two setJSON calls are made simultaneously on different clients and one fails", async () => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portalUrl); + const client2 = new SkynetClient(portalUrl); + + // Run two simultaneous setJSONs on two different clients - one should work, + // one should fail due to bad revision number. + + // Mock a successful setJSON. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + // Mock a failed setJSON (bad revision number). + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(400); + + // Use Promise.allSettled to wait for all promises to finish, or some mocked + // requests will hang around and interfere with the later tests. + const values = await Promise.allSettled([ + client1.db.setJSON(privateKey, dataKey, jsonOld), + client2.db.setJSON(privateKey, dataKey, jsonOld), + ]); + + let successClient; + let failClient; + if (values[0].status === "rejected") { + successClient = client2; + failClient = client1; + } else { + successClient = client1; + failClient = client2; + } + + // Test that the client that succeeded has a consistent cache. + + const cachedRevisionEntrySuccess = await successClient.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + expect(cachedRevisionEntrySuccess.revision.toString()).toEqual("0"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataOld)); + mock.onGet(skylinkOldUrl).replyOnce(200, skynetJsonOld, headersOld); + const { data: receivedJson1 } = await successClient.db.getJSON(publicKey, dataKey); + + expect(receivedJson1).toEqual(jsonOld); + + // Make another setJSON call - it should still work. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + await successClient.db.setJSON(privateKey, dataKey, jsonNew); + + expect(cachedRevisionEntrySuccess.revision.toString()).toEqual("1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + const { data: receivedJson2 } = await successClient.db.getJSON(publicKey, dataKey); + + expect(receivedJson2).toEqual(jsonNew); + + // Test that the client that failed has a consistent cache. + + const cachedRevisionEntryFail = await failClient.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + expect(cachedRevisionEntryFail.revision.toString()).toEqual("-1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataOld)); + mock.onGet(skylinkOldUrl).replyOnce(200, skynetJsonOld, headersOld); + const { data: receivedJsonFail1 } = await failClient.db.getJSON(publicKey, dataKey); + + expect(receivedJsonFail1).toEqual(jsonOld); + + // Make another setJSON call - it should still work. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + await failClient.db.setJSON(privateKey, dataKey, jsonNew); + + expect(cachedRevisionEntrySuccess.revision.toString()).toEqual("1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + const { data: receivedJsonFail2 } = await failClient.db.getJSON(publicKey, dataKey); + + expect(receivedJsonFail2).toEqual(jsonNew); + + // Check final request counts. + + expect(mock.history.get.length).toBe(8); + expect(mock.history.post.length).toBe(8); + }); +}); diff --git a/src/skydb.ts b/src/skydb.ts index 596bf8d6..213b4442 100644 --- a/src/skydb.ts +++ b/src/skydb.ts @@ -7,10 +7,10 @@ import { DEFAULT_SET_ENTRY_OPTIONS, CustomGetEntryOptions, RegistryEntry, - SignedRegistryEntry, CustomSetEntryOptions, validatePublicKey, } from "./registry"; +import { CachedRevisionNumber } from "./revision_cache"; import { BASE64_ENCODED_SKYLINK_SIZE, decodeSkylink, EMPTY_SKYLINK, RAW_SKYLINK_SIZE } from "./skylink/sia"; import { MAX_REVISION } from "./utils/number"; import { URI_SKYNET_PREFIX } from "./utils/url"; @@ -22,7 +22,7 @@ import { uint8ArrayToStringUtf8, } from "./utils/string"; import { formatSkylink } from "./skylink/format"; -import { DEFAULT_UPLOAD_OPTIONS, CustomUploadOptions, UploadRequestResponse } from "./upload"; +import { DEFAULT_UPLOAD_OPTIONS, CustomUploadOptions } from "./upload"; import { areEqualUint8Arrays } from "./utils/array"; import { decodeSkylinkBase64, encodeSkylinkBase64 } from "./utils/encoding"; import { DEFAULT_BASE_OPTIONS, extractOptions } from "./utils/options"; @@ -40,7 +40,7 @@ import { import { ResponseType } from "axios"; import { EntryData, MAX_ENTRY_LENGTH } from "./mysky"; -export type JsonFullData = { +type SkynetJson = { _data: JsonData; _v: number; }; @@ -49,6 +49,8 @@ export const DELETION_ENTRY_DATA = new Uint8Array(RAW_SKYLINK_SIZE); const JSON_RESPONSE_VERSION = 2; +const UNCACHED_REVISION_NUMBER = BigInt(-1); + /** * Custom get JSON options. Includes the options for get entry, to get the * skylink; and download, to download the file from the skylink. @@ -121,7 +123,21 @@ export type RawBytesResponse = { // ==== /** - * Gets the JSON object corresponding to the publicKey and dataKey. + * Gets the JSON object corresponding to the publicKey and dataKey. If the data + * was found, we update the cached revision number for the entry. + * + * NOTE: The cached revision number will be updated only if the data is found + * (including deleted data). If there is a 404 or the entry contains deleted + * data, null will be returned. If there is an error, the error is returned + * without updating the cached revision number. + * + * Summary: + * - Data found: update cached revision + * - Parse error: don't update cached revision + * - Network error: don't update cached revision + * - Too low version error: don't update the cached revision + * - 404 (data not found): don't update the cached revision + * - Data deleted: update cached revision * * @param this - SkynetClient * @param publicKey - The user public key. @@ -146,44 +162,59 @@ export async function getJSON( ...customOptions, }; - // Lookup the registry entry. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getSkyDBRegistryEntry(this, publicKey, dataKey, getEntryOpts); - if (entry === null) { - return { data: null, dataLink: null }; - } - - // Determine the data link. - // TODO: Can this still be an entry link which hasn't yet resolved to a data link? - const { rawDataLink, dataLink } = parseDataLink(entry.data, true); - - // If a cached data link is provided and the data link hasn't changed, return. - if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { - return { data: null, dataLink }; - } - - // Download the data in the returned data link. - const downloadOpts = extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS); - const { data } = await this.getFileContent(dataLink, downloadOpts); - - if (typeof data !== "object" || data === null) { - throw new Error(`File data for the entry at data key '${dataKey}' is not JSON.`); - } - - if (!(data["_data"] && data["_v"])) { - // Legacy data prior to skynet-js v4, return as-is. - return { data, dataLink }; - } - - const actualData = data["_data"]; - if (typeof actualData !== "object" || data === null) { - throw new Error(`File data '_data' for the entry at data key '${dataKey}' is not JSON.`); - } - return { data: actualData as JsonData, dataLink }; + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Lookup the registry entry. + const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); + const entry: RegistryEntry | null = await getSkyDBRegistryEntryAndUpdateCache( + this, + publicKey, + dataKey, + cachedRevisionEntry, + getEntryOpts + ); + if (entry === null) { + return { data: null, dataLink: null }; + } + + // Determine the data link. + // TODO: Can this still be an entry link which hasn't yet resolved to a data link? + const { rawDataLink, dataLink } = parseDataLink(entry.data, true); + + // If a cached data link is provided and the data link hasn't changed, return. + if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { + return { data: null, dataLink }; + } + + // Download the data in the returned data link. + const downloadOpts = extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS); + const { data }: { data: JsonData | SkynetJson } = await this.getFileContent(dataLink, downloadOpts); + + // Validate that the returned data is JSON. + if (typeof data !== "object" || data === null) { + throw new Error(`File data for the entry at data key '${dataKey}' is not JSON.`); + } + + if (!(data["_data"] && data["_v"])) { + // Legacy data prior to skynet-js v4, return as-is. + return { data, dataLink }; + } + + // Extract the JSON from the returned SkynetJson. + const actualData = data["_data"]; + if (typeof actualData !== "object" || data === null) { + throw new Error(`File data '_data' for the entry at data key '${dataKey}' is not JSON.`); + } + return { data: actualData as JsonData, dataLink }; + }); } /** - * Sets a JSON object at the registry entry corresponding to the publicKey and dataKey. + * Sets a JSON object at the registry entry corresponding to the publicKey and + * dataKey. + * + * This will use the entry revision number from the cache, so getJSON must + * always be called first for existing entries. * * @param this - SkynetClient * @param privateKey - The user private key. @@ -212,18 +243,32 @@ export async function setJSON( }; const { publicKey: publicKeyArray } = sign.keyPair.fromSecretKey(hexToUint8Array(privateKey)); + const publicKey = toHexString(publicKeyArray); + + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. Increment it. + const newRevision = incrementRevision(cachedRevisionEntry.revision); + + const [entry, dataLink] = await getOrCreateSkyDBRegistryEntry(this, dataKey, json, newRevision, opts); - const [entry, dataLink] = await getOrCreateRegistryEntry(this, toHexString(publicKeyArray), dataKey, json, opts); + // Update the registry. + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.registry.setEntry(privateKey, entry, setEntryOpts); - // Update the registry. - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.registry.setEntry(privateKey, entry, setEntryOpts); + // Update the cached revision number. + cachedRevisionEntry.revision = newRevision; - return { data: json, dataLink: formatSkylink(dataLink) }; + return { data: json, dataLink: formatSkylink(dataLink) }; + }); } /** - * Deletes a JSON object at the registry entry corresponding to the publicKey and dataKey. + * Deletes a JSON object at the registry entry corresponding to the publicKey + * and dataKey. + * + * This will use the entry revision number from the cache, so getJSON must + * always be called first. * * @param this - SkynetClient * @param privateKey - The user private key. @@ -280,6 +325,9 @@ export async function setDataLink( /** * Gets the raw registry entry data at the given public key and data key. * + * If the data was found, we update the cached revision number for the entry. + * See getJSON for behavior in other cases. + * * @param this - SkynetClient * @param publicKey - The user public key. * @param dataKey - The data key. @@ -302,16 +350,22 @@ export async function getEntryData( ...customOptions, }; - const entry = await getSkyDBRegistryEntry(this, publicKey, dataKey, opts); - if (entry === null) { - return { data: null }; - } - return { data: entry.data }; + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + const entry = await getSkyDBRegistryEntryAndUpdateCache(this, publicKey, dataKey, cachedRevisionEntry, opts); + if (entry === null) { + return { data: null }; + } + return { data: entry.data }; + }); } /** * Sets the raw entry data at the given private key and data key. * + * This will use the entry revision number from the cache, so getEntryData must + * always be called first for existing entries. + * * @param this - SkynetClient * @param privateKey - The user private key. * @param dataKey - The data key. @@ -341,20 +395,32 @@ export async function setEntryData( validateEntryData(data, opts.allowDeletionEntryData); const { publicKey: publicKeyArray } = sign.keyPair.fromSecretKey(hexToUint8Array(privateKey)); + const publicKey = toHexString(publicKeyArray); + + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Get the cached revision number. + const newRevision = incrementRevision(cachedRevisionEntry.revision); - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getNextRegistryEntry(this, toHexString(publicKeyArray), dataKey, data, getEntryOpts); + const entry = { dataKey, data, revision: newRevision }; - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.registry.setEntry(privateKey, entry, setEntryOpts); + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.registry.setEntry(privateKey, entry, setEntryOpts); - return { data: entry.data }; + // Update the cached revision number. + cachedRevisionEntry.revision = newRevision; + + return { data: entry.data }; + }); } /** * Deletes the entry data at the given private key and data key. Trying to * access the data again with e.g. getEntryData will result in null. * + * This will use the entry revision number from the cache, so getEntryData must + * always be called first. + * * @param this - SkynetClient * @param privateKey - The user private key. * @param dataKey - The data key. @@ -382,6 +448,9 @@ export async function deleteEntryData( /** * Gets the raw bytes corresponding to the publicKey and dataKey. The caller is responsible for setting any metadata in the bytes. * + * If the data was found, we update the cached revision number for the entry. + * See getJSON for behavior in other cases. + * * @param this - SkynetClient * @param publicKey - The user public key. * @param dataKey - The key of the data to fetch for the given user. @@ -406,92 +475,39 @@ export async function getRawBytes( ...customOptions, }; - // Lookup the registry entry. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getSkyDBRegistryEntry(this, publicKey, dataKey, getEntryOpts); - if (entry === null) { - return { data: null, dataLink: null }; - } - - // Determine the data link. - // TODO: Can this still be an entry link which hasn't yet resolved to a data link? - const { rawDataLink, dataLink } = parseDataLink(entry.data, false); - - // If a cached data link is provided and the data link hasn't changed, return. - if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { - return { data: null, dataLink }; - } - - // Download the data in the returned data link. - const downloadOpts = { - ...extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS), - responseType: "arraybuffer" as ResponseType, - }; - const { data: buffer } = await this.getFileContent(dataLink, downloadOpts); - - return { data: new Uint8Array(buffer), dataLink }; -} - -/* istanbul ignore next */ -/** - * Gets the registry entry for the given raw bytes or creates the entry if it doesn't exist. - * - * @param client - The Skynet client. - * @param publicKey - The user public key. - * @param dataKey - The dat akey. - * @param data - The raw byte data to set. - * @param [customOptions] - Additional settings that can optionally be set. - * @returns - The registry entry and corresponding data link. - * @throws - Will throw if the revision is already the maximum value. - */ -// TODO: Rename & refactor after the SkyDB caching refactor. -export async function getOrCreateRawBytesRegistryEntry( - client: SkynetClient, - publicKey: string, - dataKey: string, - data: Uint8Array, - customOptions?: CustomSetJSONOptions -): Promise { - // Not publicly available, don't validate input. - - const opts = { - ...DEFAULT_SET_JSON_OPTIONS, - ...client.customOptions, - ...customOptions, - }; - - // Create the data to upload to acquire its skylink. - let dataKeyHex = dataKey; - if (!opts.hashedDataKeyHex) { - dataKeyHex = toHexString(stringToUint8ArrayUtf8(dataKey)); - } - const file = new File([data], `dk:${dataKeyHex}`, { type: "application/octet-stream" }); - - // Start file upload, do not block. - const uploadOpts = extractOptions(opts, DEFAULT_UPLOAD_OPTIONS); - const skyfilePromise: Promise = client.uploadFile(file, uploadOpts); - - // Fetch the current value to find out the revision. - // - // Start getEntry, do not block. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entryPromise: Promise = client.registry.getEntry(publicKey, dataKey, getEntryOpts); - - // Block until both getEntry and uploadFile are finished. - const [signedEntry, skyfile] = await Promise.all([entryPromise, skyfilePromise]); - - const revision = getNextRevisionFromEntry(signedEntry.entry); - - // Build the registry entry. - const dataLink = trimUriPrefix(skyfile.skylink, URI_SKYNET_PREFIX); - const rawDataLink = decodeSkylinkBase64(dataLink); - validateUint8ArrayLen("rawDataLink", rawDataLink, "skylink byte array", RAW_SKYLINK_SIZE); - const entry: RegistryEntry = { - dataKey, - data: rawDataLink, - revision, - }; - return entry; + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Lookup the registry entry. + const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); + const entry = await getSkyDBRegistryEntryAndUpdateCache( + this, + publicKey, + dataKey, + cachedRevisionEntry, + getEntryOpts + ); + if (entry === null) { + return { data: null, dataLink: null }; + } + + // Determine the data link. + // TODO: Can this still be an entry link which hasn't yet resolved to a data link? + const { rawDataLink, dataLink } = parseDataLink(entry.data, false); + + // If a cached data link is provided and the data link hasn't changed, return. + if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { + return { data: null, dataLink }; + } + + // Download the data in the returned data link. + const downloadOpts = { + ...extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS), + responseType: "arraybuffer" as ResponseType, + }; + const { data: buffer } = await this.getFileContent(dataLink, downloadOpts); + + return { data: new Uint8Array(buffer), dataLink }; + }); } // ======= @@ -499,62 +515,23 @@ export async function getOrCreateRawBytesRegistryEntry( // ======= /** - * Gets the next entry for the given public key and data key, setting the data to be the given data and the revision number accordingly. - * - * @param client - The Skynet client. - * @param publicKey - The user public key. - * @param dataKey - The dat akey. - * @param data - The data to set. - * @param [customOptions] - Additional settings that can optionally be set. - * @returns - The registry entry and corresponding data link. - * @throws - Will throw if the revision is already the maximum value. - */ -export async function getNextRegistryEntry( - client: SkynetClient, - publicKey: string, - dataKey: string, - data: Uint8Array, - customOptions?: CustomGetEntryOptions -): Promise { - // Not publicly available, don't validate input. - - const opts = { - ...DEFAULT_GET_ENTRY_OPTIONS, - ...client.customOptions, - ...customOptions, - }; - - // Get the latest entry. - // TODO: Can remove this once we start caching the latest revision. - const signedEntry = await client.registry.getEntry(publicKey, dataKey, opts); - const revision = getNextRevisionFromEntry(signedEntry.entry); - - // Build the registry entry. - const entry: RegistryEntry = { - dataKey, - data, - revision, - }; - - return entry; -} - -/** - * Gets the registry entry and data link or creates the entry if it doesn't exist. + * Gets the registry entry and data link or creates the entry if it doesn't + * exist. Uses the cached revision number for the entry, or 0 if the entry has + * not been cached. * * @param client - The Skynet client. - * @param publicKey - The user public key. - * @param dataKey - The dat akey. - * @param json - The JSON to set. + * @param dataKey - The data key. + * @param data - The JSON or raw byte data to set. + * @param revision - The revision number to set. * @param [customOptions] - Additional settings that can optionally be set. * @returns - The registry entry and corresponding data link. * @throws - Will throw if the revision is already the maximum value. */ -export async function getOrCreateRegistryEntry( +export async function getOrCreateSkyDBRegistryEntry( client: SkynetClient, - publicKey: string, dataKey: string, - json: JsonData, + data: JsonData | Uint8Array, + revision: bigint, customOptions?: CustomSetJSONOptions ): Promise<[RegistryEntry, string]> { // Not publicly available, don't validate input. @@ -565,57 +542,49 @@ export async function getOrCreateRegistryEntry( ...customOptions, }; - // Set the hidden _data and _v fields. - const fullData: JsonFullData = { _data: json, _v: JSON_RESPONSE_VERSION }; + let fullData: string | Uint8Array; + if (!(data instanceof Uint8Array)) { + // Set the hidden _data and _v fields. + const skynetJson = buildSkynetJsonObject(data); + fullData = JSON.stringify(skynetJson); + } else { + /* istanbul ignore next - This case is only called by setJSONEncrypted which is not tested in this repo */ + fullData = data; + } // Create the data to upload to acquire its skylink. let dataKeyHex = dataKey; if (!opts.hashedDataKeyHex) { dataKeyHex = toHexString(stringToUint8ArrayUtf8(dataKey)); } - const file = new File([JSON.stringify(fullData)], `dk:${dataKeyHex}`, { type: "application/json" }); + const file = new File([fullData], `dk:${dataKeyHex}`, { type: "application/json" }); - // Start file upload, do not block. + // Do file upload. const uploadOpts = extractOptions(opts, DEFAULT_UPLOAD_OPTIONS); - const skyfilePromise: Promise = client.uploadFile(file, uploadOpts); - - // Fetch the current value to find out the revision. - // - // Start getEntry, do not block. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entryPromise: Promise = client.registry.getEntry(publicKey, dataKey, getEntryOpts); - - // Block until both getEntry and uploadFile are finished. - const [signedEntry, skyfile] = await Promise.all([entryPromise, skyfilePromise]); - - const revision = getNextRevisionFromEntry(signedEntry.entry); + const skyfile = await client.uploadFile(file, uploadOpts); // Build the registry entry. const dataLink = trimUriPrefix(skyfile.skylink, URI_SKYNET_PREFIX); - const data = decodeSkylinkBase64(dataLink); - validateUint8ArrayLen("data", data, "skylink byte array", RAW_SKYLINK_SIZE); + const rawDataLink = decodeSkylinkBase64(dataLink); + validateUint8ArrayLen("rawDataLink", rawDataLink, "skylink byte array", RAW_SKYLINK_SIZE); const entry: RegistryEntry = { dataKey, - data, + data: rawDataLink, revision, }; return [entry, formatSkylink(dataLink)]; } /** - * Gets the next revision from a returned entry (or 0 if the entry was not found). + * Increments the given revision number and checks to make sure it is not + * greater than the maximum revision. * - * @param entry - The returned registry entry. - * @returns - The revision. - * @throws - Will throw if the next revision would be beyond the maximum allowed value. + * @param revision - The given revision number. + * @returns - The incremented revision number. + * @throws - Will throw if the incremented revision number is greater than the maximum revision. */ -export function getNextRevisionFromEntry(entry: RegistryEntry | null): bigint { - let revision: bigint; - if (entry === null) { - revision = BigInt(0); - } else { - revision = entry.revision + BigInt(1); - } +export function incrementRevision(revision: bigint): bigint { + revision = revision + BigInt(1); // Throw if the revision is already the maximum value. if (revision > MAX_REVISION) { @@ -668,27 +637,69 @@ export function validateEntryData(data: Uint8Array, allowDeletionEntryData: bool } /** - * Gets the registry entry, returning null if the entry contains an empty skylink (the deletion sentinel). + * Gets the registry entry, returning null if the entry was not found or if it + * contained a sentinel value indicating deletion. + * + * If the data was found, we update the cached revision number for the entry. + * See getJSON for behavior in other cases. * * @param client - The Skynet Client * @param publicKey - The user public key. * @param dataKey - The key of the data to fetch for the given user. + * @param cachedRevisionEntry - The cached revision entry object containing the revision number and the mutex. * @param opts - Additional settings. * @returns - The registry entry, or null if not found or deleted. */ -async function getSkyDBRegistryEntry( +async function getSkyDBRegistryEntryAndUpdateCache( client: SkynetClient, publicKey: string, dataKey: string, + cachedRevisionEntry: CachedRevisionNumber, opts: CustomGetEntryOptions ): Promise { + // If this throws due to a parse error or network error, exit early and do not + // update the cached revision number. const { entry } = await client.registry.getEntry(publicKey, dataKey, opts); - if (entry === null || areEqualUint8Arrays(entry.data, EMPTY_SKYLINK)) { + + // Don't update the cached revision number if the data was not found (404). Return null. + if (entry === null) { + return null; + } + + // Calculate the new revision. + const newRevision = entry?.revision ?? UNCACHED_REVISION_NUMBER + BigInt(1); + + // Don't update the cached revision number if the received version is too low. + // Throw error. + const cachedRevision = cachedRevisionEntry.revision; + if (cachedRevision && cachedRevision > newRevision) { + throw new Error( + "Returned revision number too low. A higher revision number for this userID and path is already cached" + ); + } + + // Update the cached revision. + cachedRevisionEntry.revision = newRevision; + + // Return null if the entry contained a sentinel value indicating deletion. + // We do this after updating the revision number cache. + if (wasRegistryEntryDeleted(entry)) { return null; } + return entry; } +/** + * Sets the hidden _data and _v fields on the given raw JSON data. + * + * @param data - The given JSON data. + * @returns - The Skynet JSON data. + */ +function buildSkynetJsonObject(data: JsonData): SkynetJson { + return { _data: data, _v: JSON_RESPONSE_VERSION }; +} + /** * Parses a data link out of the given registry entry data. * @@ -710,3 +721,13 @@ function parseDataLink(data: Uint8Array, legacy: boolean): { rawDataLink: string } return { rawDataLink, dataLink: formatSkylink(rawDataLink) }; } + +/** + * Returns whether the given registry entry indicates a past deletion. + * + * @param entry - The registry entry. + * @returns - Whether the registry entry data indicated a past deletion. + */ +function wasRegistryEntryDeleted(entry: RegistryEntry): boolean { + return areEqualUint8Arrays(entry.data, EMPTY_SKYLINK); +} diff --git a/src/utils/encoding.ts b/src/utils/encoding.ts index 44ee0d92..08662e25 100644 --- a/src/utils/encoding.ts +++ b/src/utils/encoding.ts @@ -3,7 +3,9 @@ import base32Encode from "base32-encode"; import { fromByteArray, toByteArray } from "base64-js"; import { assertUint64 } from "./number"; +import { BASE32_ENCODED_SKYLINK_SIZE, BASE64_ENCODED_SKYLINK_SIZE } from "../skylink/sia"; import { stringToUint8ArrayUtf8 } from "./string"; +import { validateStringLen } from "./validation"; const BASE32_ENCODING_VARIANT = "RFC4648-HEX"; @@ -14,6 +16,7 @@ const BASE32_ENCODING_VARIANT = "RFC4648-HEX"; * @returns - The decoded bytes. */ export function decodeSkylinkBase32(skylink: string): Uint8Array { + validateStringLen("skylink", skylink, "parameter", BASE32_ENCODED_SKYLINK_SIZE); skylink = skylink.toUpperCase(); const bytes = base32Decode(skylink, BASE32_ENCODING_VARIANT); return new Uint8Array(bytes); @@ -36,6 +39,7 @@ export function encodeSkylinkBase32(bytes: Uint8Array): string { * @returns - The decoded bytes. */ export function decodeSkylinkBase64(skylink: string): Uint8Array { + validateStringLen("skylink", skylink, "parameter", BASE64_ENCODED_SKYLINK_SIZE); // Add padding. skylink = `${skylink}==`; // Convert from URL encoding. diff --git a/tsconfig.json b/tsconfig.json index b5bc863b..c1be2556 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,7 @@ { "compilerOptions": { "downlevelIteration": true, + "lib": ["dom", "es2020"], "noEmit": true, "esModuleInterop": true, "strict": true, diff --git a/utils/testing.ts b/utils/testing.ts index 3d062e4b..651133bf 100644 --- a/utils/testing.ts +++ b/utils/testing.ts @@ -73,6 +73,28 @@ export function extractNonSkylinkPath(url: string, skylink: string): string { return path; } +/** + * Gets the settled values from `Promise.allSettled`. Throws if an error is + * found. Returns all settled values if no errors were found. + * + * @param values - The settled values. + * @returns - The settled value if no errors were found. + * @throws - Will throw if an unexpected error occurred. + */ +export function getSettledValues(values: PromiseSettledResult[]): T[] { + const receivedValues = []; + + for (const value of values) { + if (value.status === "rejected") { + throw value.reason; + } else if (value.value) { + receivedValues.push(value.value); + } + } + + return receivedValues; +} + /** * Generates a random Unicode string using the code points between 0 and 65536. * diff --git a/yarn.lock b/yarn.lock index 6fa057b2..f3585ca4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1011,6 +1011,13 @@ astral-regex@^2.0.0: resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-2.0.0.tgz#483143c567aeed4785759c0865786dc77d7d2e31" integrity sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ== +async-mutex@^0.3.2: + version "0.3.2" + resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.3.2.tgz#1485eda5bda1b0ec7c8df1ac2e815757ad1831df" + integrity sha512-HuTK7E7MT7jZEh1P9GtRW9+aTWiDWWi9InbZ5hjxrnRa39KS4BW04+xLBhYNS2aXhHUIKZSw3gj4Pn1pj+qGAA== + dependencies: + tslib "^2.3.1" + asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" @@ -4591,6 +4598,11 @@ tslib@^1.8.1: resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== +tslib@^2.3.1: + version "2.3.1" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01" + integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw== + tslib@~2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.1.0.tgz#da60860f1c2ecaa5703ab7d39bc05b6bf988b97a"