diff --git a/.eslintrc.json b/.eslintrc.json index 5b0cebc2..41df1eab 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -13,6 +13,7 @@ "extends": ["eslint:recommended", "plugin:jsdoc/recommended", "plugin:@typescript-eslint/recommended"], "rules": { "@typescript-eslint/no-floating-promises": 1, + "@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_", "varsIgnorePattern": "^_" }], "jsdoc/require-description": 1, "jsdoc/require-throws": 1, diff --git a/integration/registry.test.ts b/integration/registry.test.ts index 8f1be556..6812e9d7 100644 --- a/integration/registry.test.ts +++ b/integration/registry.test.ts @@ -65,4 +65,20 @@ describe(`Registry end to end integration tests for portal '${portal}'`, () => { expect(returnedEntry).toEqual(entry); }); + + it("Should fail to set an entry with a revision number that's too low", async () => { + const { privateKey } = genKeyPairAndSeed(); + + const entry = { + dataKey, + data: new Uint8Array(), + revision: BigInt(1), + }; + + await client.registry.setEntry(privateKey, entry); + entry.revision--; + await expect(client.registry.setEntry(privateKey, entry)).rejects.toThrowError( + "Unable to update the registry: provided revision number is invalid" + ); + }); }); diff --git a/integration/skydb.test.ts b/integration/skydb.test.ts index f1e83859..fbc04b5b 100644 --- a/integration/skydb.test.ts +++ b/integration/skydb.test.ts @@ -1,10 +1,23 @@ import { client, dataKey, portal } from "."; -import { ExecuteRequestError, genKeyPairAndSeed, getEntryLink, URI_SKYNET_PREFIX } from "../src"; +import { + ExecuteRequestError, + genKeyPairAndSeed, + getEntryLink, + JsonData, + JSONResponse, + SkynetClient, + URI_SKYNET_PREFIX, +} from "../src"; import { hashDataKey } from "../src/crypto"; import { decodeSkylinkBase64 } from "../src/utils/encoding"; import { toHexString } from "../src/utils/string"; describe(`SkyDB end to end integration tests for portal '${portal}'`, () => { + // Sleep for a second before each test to try to avoid rate limiter. + beforeEach(async () => { + await new Promise((r) => setTimeout(r, 1000)); + }); + it("Should get existing SkyDB data", async () => { const publicKey = "89e5147864297b80f5ddf29711ba8c093e724213b0dcbefbc3860cc6d598cc35"; const dataKey = "dataKey1"; @@ -241,4 +254,260 @@ describe(`SkyDB end to end integration tests for portal '${portal}'`, () => { expect(data).toEqual(json); }); + + it("Should update the revision number cache", async () => { + const { publicKey, privateKey } = genKeyPairAndSeed(); + const json = { message: 1 }; + + await client.db.setJSON(privateKey, dataKey, json); + + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + expect(cachedRevisionEntry.revision.toString()).toEqual("0"); + + await client.db.setJSON(privateKey, dataKey, json); + + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + + await client.db.getJSON(publicKey, dataKey); + + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + }); + + // REGRESSION TESTS: By creating a gap between setJSON and getJSON, a user + // could call getJSON, get outdated data, then call setJSON, and overwrite + // more up to date data with outdated data, but still use a high enough + // revision number. + // + // The fix is that you cannot retrieve the revision number while calling + // setJSON. You have to use the same revision number that you had when you + // called getJSON. + describe("getJSON/setJSON data race regression integration tests", () => { + const jsonOld = { message: 1 }; + const jsonNew = { message: 2 }; + + const delays = [0, 10, 100, 500]; + + const concurrentAccessError = "Concurrent access prevented in SkyDB"; + const registryUpdateError = "Unable to update the registry"; + + const getJSONWithDelay = async function ( + client: SkynetClient, + delay: number, + publicKey: string, + dataKey: string + ): Promise { + await new Promise((r) => setTimeout(r, delay)); + return await client.db.getJSON(publicKey, dataKey); + }; + const setJSONWithDelay = async function ( + client: SkynetClient, + delay: number, + privateKey: string, + dataKey: string, + data: JsonData + ) { + await new Promise((r) => setTimeout(r, delay)); + return await client.db.setJSON(privateKey, dataKey, data); + }; + + it.each(delays)( + "should not get old data when getJSON is called after setJSON on a single client with a '%s' ms delay and getJSON doesn't fail", + async (delay) => { + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Set the data. + await client.db.setJSON(privateKey, dataKey, jsonOld); + + // Try to invoke the data race. + let receivedJson; + try { + // Get the data while also calling setJSON. + [{ data: receivedJson }] = await Promise.all([ + getJSONWithDelay(client, delay, publicKey, dataKey), + setJSONWithDelay(client, 0, privateKey, dataKey, jsonNew), + ]); + } catch (e) { + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition has been prevented and we received the + // expected error. Return from test early. + // + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + return; + } + + // Unexpected error, throw. + throw e; + } + + // Data race did not occur, getJSON should have latest JSON. + expect(receivedJson).toEqual(jsonNew); + } + ); + + // NOTE: We can't guarantee that data won't be lost if two (or more) actors + // write to the registry at the same time, but we can guarantee that the + // final state will be the desired final state by at least one of the + // actors. One of the two clients will lose, but the other will win and be + // consistent, so the data won't be corrupt, it'll just be missing one + // update. + it.each(delays)( + "should get either old or new data when getJSON is called after setJSON on two different clients with a '%s' ms delay", + async (delay) => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portal); + const client2 = new SkynetClient(portal); + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Get revision entry cache handles. + const cachedRevisionEntry1 = await client1.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + const cachedRevisionEntry2 = await client2.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + + // Set the initial data. + { + await client1.db.setJSON(privateKey, dataKey, jsonOld); + expect(cachedRevisionEntry1.revision.toString()).toEqual("0"); + expect(cachedRevisionEntry2.revision.toString()).toEqual("-1"); + } + + // Call getJSON and setJSON concurrently on different clients -- both + // should succeeed. + { + // Get the data while also calling setJSON. + const [_, { data: receivedJson }] = await Promise.all([ + setJSONWithDelay(client1, 0, privateKey, dataKey, jsonNew), + getJSONWithDelay(client2, delay, publicKey, dataKey), + ]); + + // See if we got the new or old data. + expect(receivedJson).not.toBeNull(); + expect(cachedRevisionEntry1.revision.toString()).toEqual("1"); + if (receivedJson?.message === jsonNew.message) { + expect(cachedRevisionEntry2.revision.toString()).toEqual("1"); + // Return if we got the new data -- both clients are in sync. + // + // NOTE: I've manually confirmed that both code paths (old data and + // new data) are hit. + return; + } + // client2 should have old data and cached revision at this point. + expect(receivedJson).toEqual(jsonOld); + expect(cachedRevisionEntry2.revision.toString()).toEqual("0"); + } + + // If we got old data and an old revision from getJSON, the client may + // still be able to write to that entry, overwriting the new data. + // + // Try to update the entry with client2 which has the old revision. + const updatedJson = { message: 3 }; + let expectedJson: JsonData; + try { + await client2.db.setJSON(privateKey, dataKey, updatedJson); + expectedJson = updatedJson; + } catch (e) { + // Catches both "doesn't have enough pow" and "provided revision number + // is already registered" errors. + if ((e as Error).message.includes(registryUpdateError)) { + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + expectedJson = jsonNew; + } else { + // Unexpected error, throw. + throw e; + } + } + + // The entry should have the overriden, updated data at this point. + await Promise.all([ + async () => { + const { data: receivedJson } = await client1.db.getJSON(publicKey, dataKey); + expect(cachedRevisionEntry1.revision.toString()).toEqual("1"); + expect(receivedJson).toEqual(expectedJson); + }, + async () => { + const { data: receivedJson } = await client2.db.getJSON(publicKey, dataKey); + expect(cachedRevisionEntry2.revision.toString()).toEqual("1"); + expect(receivedJson).toEqual(expectedJson); + }, + ]); + } + ); + + it.each(delays)( + "should make sure that two concurrent setJSON calls on a single client with a '%s' ms delay either fail with the right error or succeed ", + async (delay) => { + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Try to invoke two concurrent setJSON calls. + try { + await Promise.all([ + setJSONWithDelay(client, delay, privateKey, dataKey, jsonNew), + setJSONWithDelay(client, 0, privateKey, dataKey, jsonOld), + ]); + } catch (e) { + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition has been prevented and we received the + // expected error. Return from test early. + // + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + return; + } + + // Unexpected error, throw. + throw e; + } + + // Data race did not occur, getJSON should get latest JSON. + const { data: receivedJson } = await client.db.getJSON(publicKey, dataKey); + expect(receivedJson).toEqual(jsonNew); + } + ); + + it.each(delays)( + "should make sure that two concurrent setJSON calls on different clients with a '%s' ms delay fail with the right error or succeed", + async (delay) => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portal); + const client2 = new SkynetClient(portal); + const { publicKey, privateKey } = genKeyPairAndSeed(); + + // Try to invoke two concurrent setJSON calls. + try { + await Promise.all([ + setJSONWithDelay(client2, delay, privateKey, dataKey, jsonNew), + setJSONWithDelay(client1, 0, privateKey, dataKey, jsonOld), + ]); + } catch (e) { + if ((e as Error).message.includes(registryUpdateError)) { + // The data race condition has been prevented and we received the + // expected error. Return from test early. + // + // NOTE: I've manually confirmed that both code paths (no error, and + // return on expected error) are hit. + return; + } + + // Unexpected error, throw. + throw e; + } + + // Data race did not occur, getJSON should get one of the JSON values. + let client3; + if (Math.random() < 0.5) { + client3 = client1; + } else { + client3 = client2; + } + const { data: receivedJson } = await client3.db.getJSON(publicKey, dataKey); + expect([jsonOld, jsonNew]).toContainEqual(receivedJson); + } + ); + }); }); diff --git a/integration/upload_download.test.ts b/integration/upload_download.test.ts index b15d9952..009a653d 100644 --- a/integration/upload_download.test.ts +++ b/integration/upload_download.test.ts @@ -148,7 +148,7 @@ describe(`Upload and download end-to-end tests for portal '${portal}'`, () => { expect(contentType).toEqual("application/json"); }); - it("Should get file contents when content type is not specified", async () => { + it("should get file contents when content type is not specified", async () => { // Upload the data to acquire its skylink. Don't specify a content type. const file = new File([JSON.stringify(json)], dataKey); @@ -159,11 +159,24 @@ describe(`Upload and download end-to-end tests for portal '${portal}'`, () => { const { data, contentType } = await client.getFileContent(skylink); - expect(data).toEqual(expect.any(Object)); + expect(typeof data).toEqual("object"); expect(data).toEqual(json); expect(contentType).toEqual("application/octet-stream"); }); + it('should get binary data with responseType: "arraybuffer"', async () => { + // Hard-code skylink for a sqlite3 database. + const skylink = "DABchy1Q3tBUggIP9IF_7ha9vAfBZ1d2aYRxUnHSQg9QNA"; + + // Get file content and check returned values. + + const { data, contentType } = await client.getFileContent(skylink, { responseType: "arraybuffer" }); + + expect(typeof data).toEqual("object"); + expect(data instanceof ArrayBuffer).toBeTruthy(); + expect(contentType).toEqual("application/octet-stream"); + }); + it("Should upload and download a file with spaces in the filename", async () => { const filename = " foo bar "; diff --git a/jest.config.ts b/jest.config.ts index c1a69876..8ce02cf2 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -11,7 +11,7 @@ const config: Config.InitialOptions = { preset: "ts-jest", // From old package.json. - testTimeout: 90000, + testTimeout: 120000, // Automatically clear mock calls and instances between every test clearMocks: true, // An array of glob patterns indicating a set of files for which coverage information should be collected diff --git a/package.json b/package.json index 36c6f87a..080ba66e 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ }, "homepage": "https://github.com/SkynetLabs/skynet-js", "dependencies": { + "async-mutex": "^0.3.2", "axios": "^0.24.0", "base32-decode": "^1.0.0", "base32-encode": "^1.1.1", @@ -64,7 +65,7 @@ "randombytes": "^2.1.0", "sjcl": "^1.0.8", "skynet-mysky-utils": "^0.3.0", - "tus-js-client": "^2.2.0", + "@skynetlabs/tus-js-client": "^2.3.0", "tweetnacl": "^1.0.3", "url-join": "^4.0.1", "url-parse": "^1.5.1" diff --git a/src/client.test.ts b/src/client.test.ts index b51841fa..95c73c64 100644 --- a/src/client.test.ts +++ b/src/client.test.ts @@ -1,9 +1,9 @@ import axios from "axios"; import MockAdapter from "axios-mock-adapter"; -import { combineStrings } from "../utils/testing"; -import { buildRequestUrl } from "./client"; import { SkynetClient } from "./index"; +import { buildRequestUrl } from "./request"; +import { combineStrings } from "../utils/testing"; import { DEFAULT_SKYNET_PORTAL_URL } from "./utils/url"; const portalUrl = DEFAULT_SKYNET_PORTAL_URL; diff --git a/src/client.ts b/src/client.ts index 333b42b8..c5b41139 100644 --- a/src/client.ts +++ b/src/client.ts @@ -32,6 +32,7 @@ import { } from "./file"; import { pinSkylink } from "./pin"; import { getEntry, getEntryLinkAsync, getEntryUrl, setEntry, postSignedEntry } from "./registry"; +import { RevisionNumberCache } from "./revision_cache"; import { deleteJSON, getJSON, @@ -42,10 +43,10 @@ import { setEntryData, deleteEntryData, } from "./skydb"; -import { addSubdomain, addUrlQuery, defaultPortalUrl, ensureUrlPrefix, makeUrl } from "./utils/url"; +import { defaultPortalUrl } from "./utils/url"; import { loadMySky } from "./mysky"; import { extractDomain, getFullDomainUrl } from "./mysky/utils"; -import { ExecuteRequestError } from "./request"; +import { buildRequestHeaders, buildRequestUrl, ExecuteRequestError, Headers } from "./request"; /** * Custom client options. @@ -114,9 +115,13 @@ axios.interceptors.response.use( export class SkynetClient { customOptions: CustomClientOptions; - // The initial portal URL, either given to `new SkynetClient()` or if not, the value of `defaultPortalUrl()`. + // The initial portal URL, the value of `defaultPortalUrl()` if `new + // SkynetClient` is called without a given portal. This initial URL is used to + // resolve the final portal URL. protected initialPortalUrl: string; - // The resolved API portal URL. The request won't be made until needed, or `initPortalUrl()` is called. The request is only made once, for all Skynet Clients. + // The resolved API portal URL. The request won't be made until needed, or + // `initPortalUrl()` is called. The request is only made once, for all Skynet + // Clients. protected static resolvedPortalUrl?: Promise; // The custom portal URL, if one was passed in to `new SkynetClient()`. protected customPortalUrl?: string; @@ -178,6 +183,10 @@ export class SkynetClient { getEntryData: getEntryData.bind(this), setEntryData: setEntryData.bind(this), deleteEntryData: deleteEntryData.bind(this), + + // Holds the cached revision numbers, protected by mutexes to prevent + // concurrent access. + revisionNumberCache: new RevisionNumberCache(), }; // Registry @@ -342,80 +351,3 @@ export class SkynetClient { return portalUrl; } } - -// ======= -// Helpers -// ======= - -/** - * Helper function that builds the request URL. Ensures that the final URL - * always has a protocol prefix for consistency. - * - * @param client - The Skynet client. - * @param parts - The URL parts to use when constructing the URL. - * @param [parts.baseUrl] - The base URL to use, instead of the portal URL. - * @param [parts.endpointPath] - The endpoint to contact. - * @param [parts.subdomain] - An optional subdomain to add to the URL. - * @param [parts.extraPath] - An optional path to append to the URL. - * @param [parts.query] - Optional query parameters to append to the URL. - * @returns - The built URL. - */ -export async function buildRequestUrl( - client: SkynetClient, - parts: { - baseUrl?: string; - endpointPath?: string; - subdomain?: string; - extraPath?: string; - query?: { [key: string]: string | undefined }; - } -): Promise { - let url; - - // Get the base URL, if not passed in. - if (!parts.baseUrl) { - url = await client.portalUrl(); - } else { - url = parts.baseUrl; - } - - // Make sure the URL has a protocol. - url = ensureUrlPrefix(url); - - if (parts.endpointPath) { - url = makeUrl(url, parts.endpointPath); - } - if (parts.extraPath) { - url = makeUrl(url, parts.extraPath); - } - if (parts.subdomain) { - url = addSubdomain(url, parts.subdomain); - } - if (parts.query) { - url = addUrlQuery(url, parts.query); - } - - return url; -} - -export type Headers = { [key: string]: string }; - -/** - * Helper function that builds the request headers. - * - * @param [baseHeaders] - Any base headers. - * @param [customUserAgent] - A custom user agent to set. - * @param [customCookie] - A custom cookie. - * @returns - The built headers. - */ -export function buildRequestHeaders(baseHeaders?: Headers, customUserAgent?: string, customCookie?: string): Headers { - const returnHeaders = { ...baseHeaders }; - // Set some headers from common options. - if (customUserAgent) { - returnHeaders["User-Agent"] = customUserAgent; - } - if (customCookie) { - returnHeaders["Cookie"] = customCookie; - } - return returnHeaders; -} diff --git a/src/download.ts b/src/download.ts index 2fdf0f5f..1230f4b8 100644 --- a/src/download.ts +++ b/src/download.ts @@ -1,13 +1,14 @@ import { AxiosResponse, ResponseType } from "axios"; -import { Headers, SkynetClient } from "./client"; +import { SkynetClient } from "./client"; import { getEntryLink, validateRegistryProof } from "./registry"; +import { buildRequestUrl, Headers } from "./request"; import { convertSkylinkToBase32, formatSkylink } from "./skylink/format"; import { parseSkylink } from "./skylink/parse"; import { isSkylinkV1 } from "./skylink/sia"; import { BaseCustomOptions, DEFAULT_BASE_OPTIONS } from "./utils/options"; import { trimUriPrefix } from "./utils/string"; -import { addSubdomain, addUrlQuery, makeUrl, URI_HANDSHAKE_PREFIX } from "./utils/url"; +import { addUrlSubdomain, addUrlQuery, makeUrl, URI_HANDSHAKE_PREFIX } from "./utils/url"; import { throwValidationError, validateObject, @@ -253,7 +254,7 @@ export function getSkylinkUrlForPortal( } // Convert the skylink (without the path) to base32. skylink = convertSkylinkToBase32(skylink); - url = addSubdomain(portalUrl, skylink); + url = addUrlSubdomain(portalUrl, skylink); url = makeUrl(url, skylinkPath, path); } else { // Get the skylink including the path. @@ -292,12 +293,15 @@ export async function getHnsUrl( const query = buildQuery(opts.download); domain = trimUriPrefix(domain, URI_HANDSHAKE_PREFIX); - const portalUrl = await this.portalUrl(); - const url = opts.subdomain - ? addSubdomain(addSubdomain(portalUrl, opts.hnsSubdomain), domain) - : makeUrl(portalUrl, opts.endpointDownloadHns, domain); + let subdomain, endpointPath, extraPath; + if (opts.subdomain) { + subdomain = `${domain}.${opts.hnsSubdomain}`; + } else { + endpointPath = opts.endpointDownloadHns; + extraPath = domain; + } - return addUrlQuery(url, query); + return buildRequestUrl(this, { endpointPath, extraPath, subdomain, query }); } /** @@ -321,9 +325,8 @@ export async function getHnsresUrl( const opts = { ...DEFAULT_RESOLVE_HNS_OPTIONS, ...this.customOptions, ...customOptions }; domain = trimUriPrefix(domain, URI_HANDSHAKE_PREFIX); - const portalUrl = await this.portalUrl(); - return makeUrl(portalUrl, opts.endpointResolveHns, domain); + return buildRequestUrl(this, { endpointPath: opts.endpointResolveHns, extraPath: domain }); } /** diff --git a/src/mysky/index.ts b/src/mysky/index.ts index 3898ea88..395abfbb 100644 --- a/src/mysky/index.ts +++ b/src/mysky/index.ts @@ -29,14 +29,13 @@ import { DEFAULT_SET_JSON_OPTIONS, CustomGetJSONOptions, CustomSetJSONOptions, - getOrCreateRegistryEntry, + getOrCreateSkyDBRegistryEntry, JSONResponse, - getNextRegistryEntry, - getOrCreateRawBytesRegistryEntry, validateEntryData, CustomSetEntryDataOptions, DEFAULT_SET_ENTRY_DATA_OPTIONS, DELETION_ENTRY_DATA, + incrementRevision, } from "../skydb"; import { Signature } from "../crypto"; import { deriveDiscoverableFileTweak } from "./tweak"; @@ -358,6 +357,10 @@ export class MySky { return await this.connector.connection.remoteHandle().call("userID"); } + // ============= + // SkyDB methods + // ============= + /** * Gets Discoverable JSON at the given path through MySky, if the user has * given Discoverable Read permissions to do so. @@ -427,14 +430,33 @@ export class MySky { const dataKey = deriveDiscoverableFileTweak(path); opts.hashedDataKeyHex = true; // Do not hash the tweak anymore. - const [entry, dataLink] = await getOrCreateRegistryEntry(this.connector.client, publicKey, dataKey, json, opts); - - const signature = await this.signRegistryEntry(entry, path); - - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); - - return { data: json, dataLink }; + // Immediately fail if the mutex is not available. + return await this.connector.client.db.revisionNumberCache.withCachedEntryLock( + publicKey, + dataKey, + async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. + const newRevision = incrementRevision(cachedRevisionEntry.revision); + + // Call SkyDB helper to create the registry entry. We can't call SkyDB's + // setJSON here directly because we need MySky to sign the entry, instead of + // signing it ourselves with a given private key. + const [entry, dataLink] = await getOrCreateSkyDBRegistryEntry( + this.connector.client, + dataKey, + json, + newRevision, + opts + ); + + const signature = await this.signRegistryEntry(entry, path); + + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + + return { data: json, dataLink }; + } + ); } /** @@ -456,9 +478,15 @@ export class MySky { ...customOptions, }; + // We re-implement deleteJSON instead of calling SkyDB's deleteJSON so that + // MySky can do the signing. await this.setEntryData(path, DELETION_ENTRY_DATA, { ...opts, allowDeletionEntryData: true }); } + // ================== + // Entry Data Methods + // ================== + /** * Sets entry at the given path to point to the data link. Like setJSON, but it doesn't upload a file. * @@ -525,21 +553,33 @@ export class MySky { ...customOptions, }; + // We re-implement setEntryData instead of calling SkyDB's setEntryData so + // that MySky can do the signing. + validateEntryData(data, opts.allowDeletionEntryData); const publicKey = await this.userID(); const dataKey = deriveDiscoverableFileTweak(path); opts.hashedDataKeyHex = true; // Do not hash the tweak anymore. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getNextRegistryEntry(this.connector.client, publicKey, dataKey, data, getEntryOpts); + // Immediately fail if the mutex is not available. + return await this.connector.client.db.revisionNumberCache.withCachedEntryLock( + publicKey, + dataKey, + async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. + const newRevision = incrementRevision(cachedRevisionEntry.revision); + + const entry = { dataKey, data, revision: newRevision }; - const signature = await this.signRegistryEntry(entry, path); + const signature = await this.signRegistryEntry(entry, path); - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); - return { data: entry.data }; + return { data: entry.data }; + } + ); } /** @@ -656,20 +696,32 @@ export class MySky { const [publicKey, pathSeed] = await Promise.all([this.userID(), this.getEncryptedPathSeed(path, false)]); const dataKey = deriveEncryptedFileTweak(pathSeed); opts.hashedDataKeyHex = true; // Do not hash the tweak anymore. - const encryptionKey = deriveEncryptedFileKeyEntropy(pathSeed); - // Pad and encrypt json file. - const data = encryptJSONFile(json, { version: ENCRYPTED_JSON_RESPONSE_VERSION }, encryptionKey); + // Immediately fail if the mutex is not available. + return await this.connector.client.db.revisionNumberCache.withCachedEntryLock( + publicKey, + dataKey, + async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. + const newRevision = incrementRevision(cachedRevisionEntry.revision); - const entry = await getOrCreateRawBytesRegistryEntry(this.connector.client, publicKey, dataKey, data, opts); + // Derive the key. + const encryptionKey = deriveEncryptedFileKeyEntropy(pathSeed); - // Call MySky which checks for write permissions on the path. - const signature = await this.signEncryptedRegistryEntry(entry, path); + // Pad and encrypt json file. + const data = encryptJSONFile(json, { version: ENCRYPTED_JSON_RESPONSE_VERSION }, encryptionKey); - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + const [entry] = await getOrCreateSkyDBRegistryEntry(this.connector.client, dataKey, data, newRevision, opts); - return { data: json }; + // Call MySky which checks for write permissions on the path. + const signature = await this.signEncryptedRegistryEntry(entry, path); + + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.connector.client.registry.postSignedEntry(publicKey, entry, signature, setEntryOpts); + + return { data: json }; + } + ); } /** diff --git a/src/registry.test.ts b/src/registry.test.ts index 52da2782..e4385ad9 100644 --- a/src/registry.test.ts +++ b/src/registry.test.ts @@ -5,14 +5,15 @@ import { genKeyPairAndSeed, SIGNATURE_LENGTH } from "./crypto"; import { SkynetClient, defaultSkynetPortalUrl, genKeyPairFromSeed } from "./index"; import { getEntryLink, getEntryUrlForPortal, signEntry, validateRegistryProof } from "./registry"; import { uriSkynetPrefix } from "./utils/url"; -import { stringToUint8ArrayUtf8 } from "./utils/string"; +import { hexToUint8Array, stringToUint8ArrayUtf8 } from "./utils/string"; const { publicKey, privateKey } = genKeyPairFromSeed("insecure test seed"); const portalUrl = defaultSkynetPortalUrl; const client = new SkynetClient(portalUrl); const dataKey = "app"; -const registryLookupUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); +const registryPostUrl = `${portalUrl}/skynet/registry`; +const registryGetUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); describe("getEntry", () => { let mock: MockAdapter; @@ -23,13 +24,14 @@ describe("getEntry", () => { }); it("should throw if the response status is not in the 200s and not 404 and JSON is returned", async () => { - mock.onGet(registryLookupUrl).replyOnce(400, JSON.stringify({ message: "foo error" })); + mock.onGet(registryGetUrl).replyOnce(400, JSON.stringify({ message: "foo error" })); - await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrowError( - "Request failed with status code 400" + await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toEqual( + new Error("Request failed with status code 400: foo error") ); }); + // In the case of a 429 error due to rate limiting, all we get is HTML. it("should throw if the response status is not in the 200s and not 404 and HTML is returned", async () => { const responseHTML = ` 429 Too Many Requests @@ -39,10 +41,10 @@ describe("getEntry", () => { `; - mock.onGet(registryLookupUrl).replyOnce(429, responseHTML); + mock.onGet(registryGetUrl).replyOnce(429, responseHTML); - await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrowError( - "Request failed with status code 429" + await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toEqual( + new Error("Request failed with status code 429") ); }); @@ -55,7 +57,7 @@ describe("getEntry", () => { "33d14d2889cb292142614da0e0ff13a205c4867961276001471d13b779fc9032568ddd292d9e0dff69d7b1f28be07972cc9d86da3cecf3adecb6f9b7311af808", }; - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrow(); }); @@ -67,7 +69,7 @@ describe("getEntry", () => { }); it("Should throw on incomplete response from registry GET", async () => { - mock.onGet(registryLookupUrl).replyOnce(200, "{}"); + mock.onGet(registryGetUrl).replyOnce(200, "{}"); await expect(client.registry.getEntry(publicKey, dataKey)).rejects.toThrowError( "Did not get a complete entry response" @@ -125,6 +127,31 @@ describe("getEntryUrl", () => { }); describe("setEntry", () => { + let mock: MockAdapter; + + beforeEach(() => { + mock = new MockAdapter(axios); + mock.resetHistory(); + }); + + it("Should sign and set a valid registry entry", async () => { + // mock a successful registry update + mock.onPost(registryPostUrl).replyOnce(204); + + // Hex-encoded skylink. + const data = hexToUint8Array( + "43414241425f31447430464a73787173755f4a34546f644e4362434776744666315579735f3345677a4f6c546367" + ); + + const entry = { + data, + dataKey, + revision: BigInt(11), + }; + + await client.registry.setEntry(privateKey, entry); + }); + it("Should throw an error if the private key is not hex-encoded", async () => { // @ts-expect-error We pass an invalid private key on purpose. await expect(client.registry.setEntry("foo", {})).rejects.toThrowError( diff --git a/src/registry.ts b/src/registry.ts index 0172fcc0..f198160b 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -576,11 +576,11 @@ export function validateRegistryProof( } /** - * Handles error responses returned in getEntry. + * Handles error responses returned from getEntry endpoint. * * @param err - The error. * @returns - An empty signed registry entry if the status code is 404. - * @throws - Will throw if the status code is not 404. + * @throws - Will throw if the error response is malformed, or the error message otherwise if the error status code is not 404. */ function handleGetEntryErrResponse(err: ExecuteRequestError): SignedRegistryEntry { // Check if status was 404 "not found" and return null if so. @@ -588,6 +588,7 @@ function handleGetEntryErrResponse(err: ExecuteRequestError): SignedRegistryEntr return { entry: null, signature: null }; } + // Return the error message from skyd. throw err; } diff --git a/src/request.ts b/src/request.ts index e5c73380..acd9d574 100644 --- a/src/request.ts +++ b/src/request.ts @@ -1,5 +1,81 @@ import { AxiosError } from "axios"; +import { SkynetClient } from "./client"; +import { addUrlQuery, addUrlSubdomain, ensureUrlPrefix, makeUrl } from "./utils/url"; + +export type Headers = { [key: string]: string }; + +/** + * Helper function that builds the request headers. + * + * @param [baseHeaders] - Any base headers. + * @param [customUserAgent] - A custom user agent to set. + * @param [customCookie] - A custom cookie. + * @returns - The built headers. + */ +export function buildRequestHeaders(baseHeaders?: Headers, customUserAgent?: string, customCookie?: string): Headers { + const returnHeaders = { ...baseHeaders }; + // Set some headers from common options. + if (customUserAgent) { + returnHeaders["User-Agent"] = customUserAgent; + } + if (customCookie) { + returnHeaders["Cookie"] = customCookie; + } + return returnHeaders; +} + +/** + * Helper function that builds the request URL. Ensures that the final URL + * always has a protocol prefix for consistency. + * + * @param client - The Skynet client. + * @param parts - The URL parts to use when constructing the URL. + * @param [parts.baseUrl] - The base URL to use, instead of the portal URL. + * @param [parts.endpointPath] - The endpoint to contact. + * @param [parts.subdomain] - An optional subdomain to add to the URL. + * @param [parts.extraPath] - An optional path to append to the URL. + * @param [parts.query] - Optional query parameters to append to the URL. + * @returns - The built URL. + */ +export async function buildRequestUrl( + client: SkynetClient, + parts: { + baseUrl?: string; + endpointPath?: string; + subdomain?: string; + extraPath?: string; + query?: { [key: string]: string | undefined }; + } +): Promise { + let url; + + // Get the base URL, if not passed in. + if (!parts.baseUrl) { + url = await client.portalUrl(); + } else { + url = parts.baseUrl; + } + + // Make sure the URL has a protocol. + url = ensureUrlPrefix(url); + + if (parts.endpointPath) { + url = makeUrl(url, parts.endpointPath); + } + if (parts.extraPath) { + url = makeUrl(url, parts.extraPath); + } + if (parts.subdomain) { + url = addUrlSubdomain(url, parts.subdomain); + } + if (parts.query) { + url = addUrlQuery(url, parts.query); + } + + return url; +} + export class ExecuteRequestError extends Error { originalError: AxiosError; responseStatus: number | null; @@ -24,7 +100,7 @@ export class ExecuteRequestError extends Error { static From(err: AxiosError): ExecuteRequestError { /* istanbul ignore next */ if (!err.response) { - return new ExecuteRequestError(`Error repsonse did not contain expected field 'response'.`, err, null, null); + return new ExecuteRequestError(`Error response did not contain expected field 'response'.`, err, null, null); } /* istanbul ignore next */ if (!err.response.status) { @@ -41,11 +117,11 @@ export class ExecuteRequestError extends Error { // If we don't get an error message from skyd, just return the status code. /* istanbul ignore next */ if (!err.response.data) { - return new ExecuteRequestError(`Request failed with status code ${status}.`, err, status, null); + return new ExecuteRequestError(`Request failed with status code ${status}`, err, status, null); } /* istanbul ignore next */ if (!err.response.data.message) { - return new ExecuteRequestError(`Request failed with status code ${status}.`, err, status, null); + return new ExecuteRequestError(`Request failed with status code ${status}`, err, status, null); } // Return the error message from skyd. Pass along the original Axios error. diff --git a/src/revision_cache.ts b/src/revision_cache.ts new file mode 100644 index 00000000..6971889c --- /dev/null +++ b/src/revision_cache.ts @@ -0,0 +1,93 @@ +import { Mutex, tryAcquire } from "async-mutex"; + +/** + * An abstraction over the client's revision number cache. Provides a cache, + * keyed by public key and data key and protected by a mutex to guard against + * concurrent access to the cache. Each cache entry also has its own mutex, to + * protect against concurrent access to that entry. + */ +export class RevisionNumberCache { + private mutex: Mutex; + private cache: { [key: string]: CachedRevisionNumber }; + + constructor() { + this.mutex = new Mutex(); + this.cache = {}; + } + + /** + * Gets the revision cache key for the given public key and data key. + * + * @param publicKey - The given public key. + * @param dataKey - The given data key. + * @returns - The revision cache key. + */ + static getCacheKey(publicKey: string, dataKey: string): string { + return `${publicKey}/${dataKey}`; + } + + /** + * Gets an object containing the cached revision and the mutex for the entry. + * The revision and mutex will be initialized if the entry is not yet cached. + * + * @param publicKey - The given public key. + * @param dataKey - The given data key. + * @returns - The cached revision entry object. + */ + async getRevisionAndMutexForEntry(publicKey: string, dataKey: string): Promise { + const cacheKey = RevisionNumberCache.getCacheKey(publicKey, dataKey); + + // Block until the mutex is available for the cache. + return await this.mutex.runExclusive(async () => { + if (!this.cache[cacheKey]) { + this.cache[cacheKey] = new CachedRevisionNumber(); + } + return this.cache[cacheKey]; + }); + } + + /** + * Calls `exclusiveFn` with exclusive access to the given cached entry. The + * revision number of the entry can be safely updated in `exclusiveFn`. + * + * @param publicKey - The given public key. + * @param dataKey - The given data key. + * @param exclusiveFn - A function to call with exclusive access to the given cached entry. + * @returns - A promise containing the result of calling `exclusiveFn`. + */ + async withCachedEntryLock( + publicKey: string, + dataKey: string, + exclusiveFn: (cachedRevisionEntry: CachedRevisionNumber) => Promise + ): Promise { + // Safely get or create mutex for the requested entry. + const cachedRevisionEntry = await this.getRevisionAndMutexForEntry(publicKey, dataKey); + + try { + return await tryAcquire(cachedRevisionEntry.mutex).runExclusive(async () => exclusiveFn(cachedRevisionEntry)); + } catch (e) { + // Change mutex error to be more descriptive and user-friendly. + if ((e as Error).message.includes("mutex already locked")) { + throw new Error( + `Concurrent access prevented in SkyDB for entry { publicKey: ${publicKey}, dataKey: ${dataKey} }` + ); + } else { + throw e; + } + } + } +} + +/** + * An object containing a cached revision and a corresponding mutex. The + * revision can be internally updated and it will reflect in the client's cache. + */ +export class CachedRevisionNumber { + mutex: Mutex; + revision: bigint; + + constructor() { + this.mutex = new Mutex(); + this.revision = BigInt(-1); + } +} diff --git a/src/skydb.test.ts b/src/skydb.test.ts index 9e257702..851d5ca0 100644 --- a/src/skydb.test.ts +++ b/src/skydb.test.ts @@ -3,11 +3,15 @@ import MockAdapter from "axios-mock-adapter"; import { getSkylinkUrlForPortal } from "./download"; import { MAX_REVISION } from "./utils/number"; +import { stringToUint8ArrayUtf8, toHexString } from "./utils/string"; import { DEFAULT_SKYNET_PORTAL_URL, URI_SKYNET_PREFIX } from "./utils/url"; import { SkynetClient } from "./index"; -import { getEntryUrlForPortal, REGEX_REVISION_NO_QUOTES } from "./registry"; -import { checkCachedDataLink, DELETION_ENTRY_DATA } from "./skydb"; +import { getEntryUrlForPortal } from "./registry"; +import { checkCachedDataLink, DELETION_ENTRY_DATA, JSONResponse } from "./skydb"; import { MAX_ENTRY_LENGTH } from "./mysky"; +import { decodeSkylink } from "./skylink/sia"; +import { getSettledValues } from "../utils/testing"; +import { JsonData } from "./utils/types"; // Generated with genKeyPairFromSeed("insecure test seed") const [publicKey, privateKey] = [ @@ -25,14 +29,15 @@ const bitfield = 2048; const portalUrl = DEFAULT_SKYNET_PORTAL_URL; const client = new SkynetClient(portalUrl); -const registryUrl = `${portalUrl}/skynet/registry`; -const registryLookupUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); +const registryPostUrl = `${portalUrl}/skynet/registry`; +const registryGetUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); const uploadUrl = `${portalUrl}/skynet/skyfile`; const skylinkUrl = getSkylinkUrlForPortal(portalUrl, skylink); // Hex-encoded skylink. const data = "43414241425f31447430464a73787173755f4a34546f644e4362434776744666315579735f3345677a4f6c546367"; const revision = 11; +// Entry data for the data and revision. const entryData = { data, revision, @@ -40,6 +45,12 @@ const entryData = { "33d14d2889cb292142614da0e0ff13a205c4867961276001471d13b779fc9032568ddd292d9e0dff69d7b1f28be07972cc9d86da3cecf3adecb6f9b7311af809", }; +const headers = { + "skynet-portal-api": portalUrl, + "skynet-skylink": skylink, + "content-type": "application/json", +}; + describe("getJSON", () => { let mock: MockAdapter; @@ -49,15 +60,10 @@ describe("getJSON", () => { mock.resetHistory(); }); - const headers = { - "skynet-portal-api": portalUrl, - "skynet-skylink": skylink, - "content-type": "application/json", - }; - it("should perform a lookup and skylink GET", async () => { - // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + // Mock a successful registry lookup. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + // Mock a successful data download. mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey); @@ -66,9 +72,56 @@ describe("getJSON", () => { expect(mock.history.get.length).toBe(2); }); + it("should fail properly with a too low error", async () => { + // Use a custom data key for this test to get a fresh cache. + const dataKey = "testTooLowError"; + const registryGetUrl = getEntryUrlForPortal(portalUrl, publicKey, dataKey); + const skylinkData = toHexString(decodeSkylink(skylink)); + const entryData = { + data: skylinkData, + revision: 1, + signature: + "18d2b5f64042db39c4c591c21bd93015f7839eefab487ef8e27086cdb95b190732211b9a23d38c33f4f9a4e5219de55a80f75ff7e437713732ecdb4ccddb0804", + }; + const entryDataTooLow = { + data: skylinkData, + revision: 0, + signature: + "4d7b26923f4211794eaf5c13230e62618ea3bebcb3fa6511ec8772b1f1e1a675b5244e7c33f89daf31999aeabe46c3a1e324a04d2f35c6ba902c75d35ceba00d", + }; + + // Cache the revision. + + // Mock a successful registry lookup. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + // Mock a successful data download. + mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); + + const { data } = await client.db.getJSON(publicKey, dataKey); + expect(data).toEqual(jsonData); + + // The cache should contain revision 1. + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + + // Return a revision that's too low. + + // Mock a successful registry lookup. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataTooLow)); + // Mock a successful data download. + mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); + + await expect(client.db.getJSON(publicKey, dataKey)).rejects.toThrowError( + "Returned revision number too low. A higher revision number for this userID and path is already cached" + ); + + // The cache should still contain revision 1. + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + }); + it("should perform a lookup but not a skylink GET if the cachedDataLink is a hit", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey, { cachedDataLink: skylink }); expect(data).toBeNull(); @@ -80,7 +133,7 @@ describe("getJSON", () => { const skylinkNoHit = "XABvi7JtJbQSMAcDwnUnmp2FKDPjg8_tTTFP4BwMSxVdEg"; // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, headers); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey, { cachedDataLink: skylinkNoHit }); @@ -91,7 +144,7 @@ describe("getJSON", () => { it("should throw if the cachedDataLink is not a valid skylink", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); mock.onGet(skylinkUrl).replyOnce(200, fullJsonData, {}); await expect(client.db.getJSON(publicKey, dataKey, { cachedDataLink: "asdf" })).rejects.toThrowError( @@ -101,7 +154,7 @@ describe("getJSON", () => { it("should perform a lookup and skylink GET on legacy pre-v4 data", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); mock.onGet(skylinkUrl).replyOnce(200, legacyJsonData, headers); const jsonReturned = await client.db.getJSON(publicKey, dataKey); @@ -110,7 +163,7 @@ describe("getJSON", () => { }); it("should return null if no entry is found", async () => { - mock.onGet(registryLookupUrl).reply(404); + mock.onGet(registryGetUrl).replyOnce(404); const { data, dataLink } = await client.db.getJSON(publicKey, dataKey); expect(data).toBeNull(); @@ -119,8 +172,8 @@ describe("getJSON", () => { it("should throw if the returned file data is not JSON", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).reply(200, JSON.stringify(entryData)); - mock.onGet(skylinkUrl).reply(200, "thisistext", { ...headers, "content-type": "text/plain" }); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(skylinkUrl).replyOnce(200, "thisistext", { ...headers, "content-type": "text/plain" }); await expect(client.db.getJSON(publicKey, dataKey)).rejects.toThrowError( `File data for the entry at data key '${dataKey}' is not JSON.` @@ -129,8 +182,8 @@ describe("getJSON", () => { it("should throw if the returned _data field in the file data is not JSON", async () => { // mock a successful registry lookup - mock.onGet(registryLookupUrl).reply(200, JSON.stringify(entryData)); - mock.onGet(skylinkUrl).reply(200, { _data: "thisistext", _v: 1 }, headers); + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryData)); + mock.onGet(skylinkUrl).replyOnce(200, { _data: "thisistext", _v: 1 }, headers); await expect(client.db.getJSON(publicKey, dataKey)).rejects.toThrowError( "File data '_data' for the entry at data key 'app' is not JSON." @@ -160,11 +213,8 @@ describe("setJSON", () => { }); it("should perform an upload, lookup and registry update", async () => { - // mock a successful registry lookup - mock.onGet(registryLookupUrl).replyOnce(200, JSON.stringify(entryData)); - // mock a successful registry update - mock.onPost(registryUrl).replyOnce(204); + mock.onPost(registryPostUrl).replyOnce(204); // set data const { data: returnedData, dataLink: returnedSkylink } = await client.db.setJSON(privateKey, dataKey, jsonData); @@ -172,25 +222,23 @@ describe("setJSON", () => { expect(returnedSkylink).toEqual(sialink); // assert our request history contains the expected amount of requests - expect(mock.history.get.length).toBe(1); + expect(mock.history.get.length).toBe(0); expect(mock.history.post.length).toBe(2); const data = JSON.parse(mock.history.post[1].data); expect(data).toBeDefined(); - expect(data.revision).toEqual(revision + 1); + expect(data.revision).toBeGreaterThanOrEqual(revision + 1); }); - it("should use a revision number of 0 if the lookup failed", async () => { - mock.onGet(registryLookupUrl).reply(404); - + it("should use a revision number of 0 if the entry is not cached", async () => { // mock a successful registry update - mock.onPost(registryUrl).reply(204); + mock.onPost(registryPostUrl).replyOnce(204); // call `setJSON` on the client - await client.db.setJSON(privateKey, dataKey, jsonData); + await client.db.setJSON(privateKey, "inexistent entry", jsonData); // assert our request history contains the expected amount of requests - expect(mock.history.get.length).toBe(1); + expect(mock.history.get.length).toBe(0); expect(mock.history.post.length).toBe(2); const data = JSON.parse(mock.history.post[1].data); @@ -199,20 +247,12 @@ describe("setJSON", () => { }); it("should fail if the entry has the maximum allowed revision", async () => { - // mock a successful registry lookup - const entryData = { - data, - // String the bigint since JS doesn't support 64-bit numbers. - revision: MAX_REVISION.toString(), - signature: - "18c76e88141c7cc76d8a77abcd91b5d64d8fc3833eae407ab8a5339e5fcf7940e3fa5830a8ad9439a0c0cc72236ed7b096ae05772f81eee120cbd173bfd6600e", - }; - // Replace the quotes around the stringed bigint. - const json = JSON.stringify(entryData).replace(REGEX_REVISION_NO_QUOTES, '"revision":"$1"'); - mock.onGet(registryLookupUrl).reply(200, json); + const dataKey = "maximum revision"; + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + cachedRevisionEntry.revision = MAX_REVISION; // mock a successful registry update - mock.onPost(registryUrl).reply(204); + mock.onPost(registryPostUrl).replyOnce(204); // Try to set data, should fail. await expect(client.db.setJSON(privateKey, dataKey, entryData)).rejects.toThrowError( @@ -239,6 +279,30 @@ describe("setJSON", () => { "Expected parameter 'json' to be type 'object', was type 'undefined'" ); }); + + it("Should not update the cached revision if the registry update fails.", async () => { + const dataKey = "registry failure"; + const json = { foo: "bar" }; + + // mock a successful registry update + mock.onPost(registryPostUrl).replyOnce(204); + + await client.db.setJSON(privateKey, dataKey, json); + + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + const revision1 = cachedRevisionEntry.revision; + + // mock a failed registry update + mock.onPost(registryPostUrl).replyOnce(400, JSON.stringify({ message: "foo" })); + + await expect(client.db.setJSON(privateKey, dataKey, json)).rejects.toEqual( + new Error("Request failed with status code 400: foo") + ); + + const revision2 = cachedRevisionEntry.revision; + + expect(revision1.toString()).toEqual(revision2.toString()); + }); }); describe("setEntryData", () => { @@ -276,3 +340,309 @@ describe("checkCachedDataLink", () => { ); }); }); + +// REGRESSION TESTS: By creating a gap between setJSON and getJSON, a user +// could call getJSON, get outdated data, then call setJSON, and overwrite +// more up to date data with outdated data, but still use a high enough +// revision number. +// +// The fix is that you cannot retrieve the revision number while calling +// setJSON. You have to use the same revision number that you had when you +// called getJSON. +describe("getJSON/setJSON data race regression unit tests", () => { + let mock: MockAdapter; + + beforeEach(() => { + // Add a delay to responses to simulate actual calls that use the network. + mock = new MockAdapter(axios, { delayResponse: 100 }); + mock.reset(); + mock.onHead(portalUrl).replyOnce(200, {}, { "skynet-portal-api": portalUrl }); + }); + + const skylinkOld = "XABvi7JtJbQSMAcDwnUnmp2FKDPjg8_tTTFP4BwMSxVdEg"; + const skylinkOldUrl = getSkylinkUrlForPortal(portalUrl, skylinkOld); + const dataOld = toHexString(stringToUint8ArrayUtf8(skylinkOld)); // hex-encoded skylink + const revisionOld = 0; + const entryDataOld = { + data: dataOld, + revision: revisionOld, + signature: + "921d30e860d51f13d1065ea221b29fc8d11cfe7fa0e32b5d5b8e13bee6f91cfa86fe6b12ca4cef7a90ba52d2c50efb62b241f383e9d7bb264558280e564faa0f", + }; + const headersOld = { ...headers, "skynet-skylink": skylinkOld }; + + const skylinkNew = skylink; + const skylinkNewUrl = skylinkUrl; + const dataNew = data; // hex-encoded skylink + const revisionNew = 1; + const entryDataNew = { + data: dataNew, + revision: revisionNew, + signature: + "2a9889915f06d414e8cde51eb17db565410d20b2b50214e8297f7f4a0cb5c77e0edc62a319607dfaa042e0cc16ed0d7e549cca2abd11c2f86a335009936f150d", + }; + const headersNew = { ...headers, "skynet-skylink": skylinkNew }; + + const jsonOld = { message: 1 }; + const jsonNew = { message: 2 }; + const skynetJsonOld = { _data: jsonOld, _v: 2 }; + const skynetJsonNew = { _data: jsonNew, _v: 2 }; + + const concurrentAccessError = "Concurrent access prevented in SkyDB"; + const higherRevisionError = "A higher revision number for this userID and path is already cached"; + + it("should not get old data when getJSON and setJSON are called simultaneously on the same client and getJSON doesn't fail", async () => { + // Create a new client with a fresh revision cache. + const client = new SkynetClient(portalUrl); + + // Mock setJSON with the old skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Set the data. + await client.db.setJSON(privateKey, dataKey, jsonOld); + + // Mock getJSON with the new entry data and the new skylink. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headers); + + // Mock setJSON with the new skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Try to invoke the data race. + // Get the data while also calling setJSON. + // + // Use Promise.allSettled to wait for all promises to finish, or some mocked + // requests will hang around and interfere with the later tests. + const settledResults = await Promise.allSettled([ + client.db.getJSON(publicKey, dataKey), + client.db.setJSON(privateKey, dataKey, jsonNew), + ]); + + let data: JsonData | null; + try { + const values = getSettledValues(settledResults); + data = values[0].data; + } catch (e) { + // If any promises were rejected, check the error message. + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition was avoided and we received the expected + // error. Return from test early. + return; + } + + throw e; + } + + // Data race did not occur, getJSON should have latest JSON. + expect(data).toEqual(jsonNew); + + // assert our request history contains the expected amount of requests + expect(mock.history.get.length).toBe(2); + expect(mock.history.post.length).toBe(4); + }); + + it("should not get old data when getJSON and setJSON are called simultaneously on different clients and getJSON doesn't fail", async () => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portalUrl); + const client2 = new SkynetClient(portalUrl); + + // Mock setJSON with the old skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Set the data. + await client1.db.setJSON(privateKey, dataKey, jsonOld); + + // Mock getJSON with the new entry data and the new skylink. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + + // Mock setJSON with the new skylink. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Try to invoke the data race. + // Get the data while also calling setJSON. + // + // Use Promise.allSettled to wait for all promises to finish, or some mocked requests will hang around and interfere with the later tests. + const settledResults = await Promise.allSettled([ + client1.db.getJSON(publicKey, dataKey), + client2.db.setJSON(privateKey, dataKey, jsonNew), + ]); + + let data: JsonData | null; + try { + const values = getSettledValues(settledResults); + data = values[0].data; + } catch (e) { + // If any promises were rejected, check the error message. + if ((e as Error).message.includes(higherRevisionError)) { + // The data race condition was avoided and we received the expected + // error. Return from test early. + return; + } + + throw e; + } + + // Data race did not occur, getJSON should have latest JSON. + expect(data).toEqual(jsonNew); + + // assert our request history contains the expected amount of requests. + expect(mock.history.get.length).toBe(2); + expect(mock.history.post.length).toBe(4); + }); + + it("should not mess up cache when two setJSON calls are made simultaneously and one fails", async () => { + // Create a new client with a fresh revision cache. + const client = new SkynetClient(portalUrl); + + // Mock a successful setJSON. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + + // Use Promise.allSettled to wait for all promises to finish, or some mocked + // requests will hang around and interfere with the later tests. + const values = await Promise.allSettled([ + client.db.setJSON(privateKey, dataKey, jsonOld), + client.db.setJSON(privateKey, dataKey, jsonOld), + ]); + + try { + getSettledValues(values); + } catch (e) { + if ((e as Error).message.includes(concurrentAccessError)) { + // The data race condition was avoided and we received the expected + // error. Return from test early. + return; + } + + throw e; + } + + const cachedRevisionEntry = await client.db.revisionNumberCache.getRevisionAndMutexForEntry(publicKey, dataKey); + expect(cachedRevisionEntry.revision.toString()).toEqual("0"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataOld)); + mock.onGet(skylinkOldUrl).replyOnce(200, skynetJsonOld, headersOld); + const { data: receivedJson1 } = await client.db.getJSON(publicKey, dataKey); + + expect(receivedJson1).toEqual(jsonOld); + + // Make another setJSON call - it should still work. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + await client.db.setJSON(privateKey, dataKey, jsonNew); + + expect(cachedRevisionEntry.revision.toString()).toEqual("1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + const { data: receivedJson2 } = await client.db.getJSON(publicKey, dataKey); + + expect(receivedJson2).toEqual(jsonNew); + + expect(mock.history.get.length).toBe(4); + expect(mock.history.post.length).toBe(4); + }); + + it("should not mess up cache when two setJSON calls are made simultaneously on different clients and one fails", async () => { + // Create two new clients with a fresh revision cache. + const client1 = new SkynetClient(portalUrl); + const client2 = new SkynetClient(portalUrl); + + // Run two simultaneous setJSONs on two different clients - one should work, + // one should fail due to bad revision number. + + // Mock a successful setJSON. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + // Mock a failed setJSON (bad revision number). + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkOld, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(400); + + // Use Promise.allSettled to wait for all promises to finish, or some mocked + // requests will hang around and interfere with the later tests. + const values = await Promise.allSettled([ + client1.db.setJSON(privateKey, dataKey, jsonOld), + client2.db.setJSON(privateKey, dataKey, jsonOld), + ]); + + let successClient; + let failClient; + if (values[0].status === "rejected") { + successClient = client2; + failClient = client1; + } else { + successClient = client1; + failClient = client2; + } + + // Test that the client that succeeded has a consistent cache. + + const cachedRevisionEntrySuccess = await successClient.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + expect(cachedRevisionEntrySuccess.revision.toString()).toEqual("0"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataOld)); + mock.onGet(skylinkOldUrl).replyOnce(200, skynetJsonOld, headersOld); + const { data: receivedJson1 } = await successClient.db.getJSON(publicKey, dataKey); + + expect(receivedJson1).toEqual(jsonOld); + + // Make another setJSON call - it should still work. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + await successClient.db.setJSON(privateKey, dataKey, jsonNew); + + expect(cachedRevisionEntrySuccess.revision.toString()).toEqual("1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + const { data: receivedJson2 } = await successClient.db.getJSON(publicKey, dataKey); + + expect(receivedJson2).toEqual(jsonNew); + + // Test that the client that failed has a consistent cache. + + const cachedRevisionEntryFail = await failClient.db.revisionNumberCache.getRevisionAndMutexForEntry( + publicKey, + dataKey + ); + expect(cachedRevisionEntryFail.revision.toString()).toEqual("-1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataOld)); + mock.onGet(skylinkOldUrl).replyOnce(200, skynetJsonOld, headersOld); + const { data: receivedJsonFail1 } = await failClient.db.getJSON(publicKey, dataKey); + + expect(receivedJsonFail1).toEqual(jsonOld); + + // Make another setJSON call - it should still work. + mock.onPost(uploadUrl).replyOnce(200, { skylink: skylinkNew, merkleroot, bitfield }); + mock.onPost(registryPostUrl).replyOnce(204); + await failClient.db.setJSON(privateKey, dataKey, jsonNew); + + expect(cachedRevisionEntrySuccess.revision.toString()).toEqual("1"); + + // Make a getJSON call. + mock.onGet(registryGetUrl).replyOnce(200, JSON.stringify(entryDataNew)); + mock.onGet(skylinkNewUrl).replyOnce(200, skynetJsonNew, headersNew); + const { data: receivedJsonFail2 } = await failClient.db.getJSON(publicKey, dataKey); + + expect(receivedJsonFail2).toEqual(jsonNew); + + // Check final request counts. + + expect(mock.history.get.length).toBe(8); + expect(mock.history.post.length).toBe(8); + }); +}); diff --git a/src/skydb.ts b/src/skydb.ts index 596bf8d6..213b4442 100644 --- a/src/skydb.ts +++ b/src/skydb.ts @@ -7,10 +7,10 @@ import { DEFAULT_SET_ENTRY_OPTIONS, CustomGetEntryOptions, RegistryEntry, - SignedRegistryEntry, CustomSetEntryOptions, validatePublicKey, } from "./registry"; +import { CachedRevisionNumber } from "./revision_cache"; import { BASE64_ENCODED_SKYLINK_SIZE, decodeSkylink, EMPTY_SKYLINK, RAW_SKYLINK_SIZE } from "./skylink/sia"; import { MAX_REVISION } from "./utils/number"; import { URI_SKYNET_PREFIX } from "./utils/url"; @@ -22,7 +22,7 @@ import { uint8ArrayToStringUtf8, } from "./utils/string"; import { formatSkylink } from "./skylink/format"; -import { DEFAULT_UPLOAD_OPTIONS, CustomUploadOptions, UploadRequestResponse } from "./upload"; +import { DEFAULT_UPLOAD_OPTIONS, CustomUploadOptions } from "./upload"; import { areEqualUint8Arrays } from "./utils/array"; import { decodeSkylinkBase64, encodeSkylinkBase64 } from "./utils/encoding"; import { DEFAULT_BASE_OPTIONS, extractOptions } from "./utils/options"; @@ -40,7 +40,7 @@ import { import { ResponseType } from "axios"; import { EntryData, MAX_ENTRY_LENGTH } from "./mysky"; -export type JsonFullData = { +type SkynetJson = { _data: JsonData; _v: number; }; @@ -49,6 +49,8 @@ export const DELETION_ENTRY_DATA = new Uint8Array(RAW_SKYLINK_SIZE); const JSON_RESPONSE_VERSION = 2; +const UNCACHED_REVISION_NUMBER = BigInt(-1); + /** * Custom get JSON options. Includes the options for get entry, to get the * skylink; and download, to download the file from the skylink. @@ -121,7 +123,21 @@ export type RawBytesResponse = { // ==== /** - * Gets the JSON object corresponding to the publicKey and dataKey. + * Gets the JSON object corresponding to the publicKey and dataKey. If the data + * was found, we update the cached revision number for the entry. + * + * NOTE: The cached revision number will be updated only if the data is found + * (including deleted data). If there is a 404 or the entry contains deleted + * data, null will be returned. If there is an error, the error is returned + * without updating the cached revision number. + * + * Summary: + * - Data found: update cached revision + * - Parse error: don't update cached revision + * - Network error: don't update cached revision + * - Too low version error: don't update the cached revision + * - 404 (data not found): don't update the cached revision + * - Data deleted: update cached revision * * @param this - SkynetClient * @param publicKey - The user public key. @@ -146,44 +162,59 @@ export async function getJSON( ...customOptions, }; - // Lookup the registry entry. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getSkyDBRegistryEntry(this, publicKey, dataKey, getEntryOpts); - if (entry === null) { - return { data: null, dataLink: null }; - } - - // Determine the data link. - // TODO: Can this still be an entry link which hasn't yet resolved to a data link? - const { rawDataLink, dataLink } = parseDataLink(entry.data, true); - - // If a cached data link is provided and the data link hasn't changed, return. - if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { - return { data: null, dataLink }; - } - - // Download the data in the returned data link. - const downloadOpts = extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS); - const { data } = await this.getFileContent(dataLink, downloadOpts); - - if (typeof data !== "object" || data === null) { - throw new Error(`File data for the entry at data key '${dataKey}' is not JSON.`); - } - - if (!(data["_data"] && data["_v"])) { - // Legacy data prior to skynet-js v4, return as-is. - return { data, dataLink }; - } - - const actualData = data["_data"]; - if (typeof actualData !== "object" || data === null) { - throw new Error(`File data '_data' for the entry at data key '${dataKey}' is not JSON.`); - } - return { data: actualData as JsonData, dataLink }; + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Lookup the registry entry. + const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); + const entry: RegistryEntry | null = await getSkyDBRegistryEntryAndUpdateCache( + this, + publicKey, + dataKey, + cachedRevisionEntry, + getEntryOpts + ); + if (entry === null) { + return { data: null, dataLink: null }; + } + + // Determine the data link. + // TODO: Can this still be an entry link which hasn't yet resolved to a data link? + const { rawDataLink, dataLink } = parseDataLink(entry.data, true); + + // If a cached data link is provided and the data link hasn't changed, return. + if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { + return { data: null, dataLink }; + } + + // Download the data in the returned data link. + const downloadOpts = extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS); + const { data }: { data: JsonData | SkynetJson } = await this.getFileContent(dataLink, downloadOpts); + + // Validate that the returned data is JSON. + if (typeof data !== "object" || data === null) { + throw new Error(`File data for the entry at data key '${dataKey}' is not JSON.`); + } + + if (!(data["_data"] && data["_v"])) { + // Legacy data prior to skynet-js v4, return as-is. + return { data, dataLink }; + } + + // Extract the JSON from the returned SkynetJson. + const actualData = data["_data"]; + if (typeof actualData !== "object" || data === null) { + throw new Error(`File data '_data' for the entry at data key '${dataKey}' is not JSON.`); + } + return { data: actualData as JsonData, dataLink }; + }); } /** - * Sets a JSON object at the registry entry corresponding to the publicKey and dataKey. + * Sets a JSON object at the registry entry corresponding to the publicKey and + * dataKey. + * + * This will use the entry revision number from the cache, so getJSON must + * always be called first for existing entries. * * @param this - SkynetClient * @param privateKey - The user private key. @@ -212,18 +243,32 @@ export async function setJSON( }; const { publicKey: publicKeyArray } = sign.keyPair.fromSecretKey(hexToUint8Array(privateKey)); + const publicKey = toHexString(publicKeyArray); + + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Get the cached revision number before doing anything else. Increment it. + const newRevision = incrementRevision(cachedRevisionEntry.revision); + + const [entry, dataLink] = await getOrCreateSkyDBRegistryEntry(this, dataKey, json, newRevision, opts); - const [entry, dataLink] = await getOrCreateRegistryEntry(this, toHexString(publicKeyArray), dataKey, json, opts); + // Update the registry. + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.registry.setEntry(privateKey, entry, setEntryOpts); - // Update the registry. - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.registry.setEntry(privateKey, entry, setEntryOpts); + // Update the cached revision number. + cachedRevisionEntry.revision = newRevision; - return { data: json, dataLink: formatSkylink(dataLink) }; + return { data: json, dataLink: formatSkylink(dataLink) }; + }); } /** - * Deletes a JSON object at the registry entry corresponding to the publicKey and dataKey. + * Deletes a JSON object at the registry entry corresponding to the publicKey + * and dataKey. + * + * This will use the entry revision number from the cache, so getJSON must + * always be called first. * * @param this - SkynetClient * @param privateKey - The user private key. @@ -280,6 +325,9 @@ export async function setDataLink( /** * Gets the raw registry entry data at the given public key and data key. * + * If the data was found, we update the cached revision number for the entry. + * See getJSON for behavior in other cases. + * * @param this - SkynetClient * @param publicKey - The user public key. * @param dataKey - The data key. @@ -302,16 +350,22 @@ export async function getEntryData( ...customOptions, }; - const entry = await getSkyDBRegistryEntry(this, publicKey, dataKey, opts); - if (entry === null) { - return { data: null }; - } - return { data: entry.data }; + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + const entry = await getSkyDBRegistryEntryAndUpdateCache(this, publicKey, dataKey, cachedRevisionEntry, opts); + if (entry === null) { + return { data: null }; + } + return { data: entry.data }; + }); } /** * Sets the raw entry data at the given private key and data key. * + * This will use the entry revision number from the cache, so getEntryData must + * always be called first for existing entries. + * * @param this - SkynetClient * @param privateKey - The user private key. * @param dataKey - The data key. @@ -341,20 +395,32 @@ export async function setEntryData( validateEntryData(data, opts.allowDeletionEntryData); const { publicKey: publicKeyArray } = sign.keyPair.fromSecretKey(hexToUint8Array(privateKey)); + const publicKey = toHexString(publicKeyArray); + + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Get the cached revision number. + const newRevision = incrementRevision(cachedRevisionEntry.revision); - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getNextRegistryEntry(this, toHexString(publicKeyArray), dataKey, data, getEntryOpts); + const entry = { dataKey, data, revision: newRevision }; - const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); - await this.registry.setEntry(privateKey, entry, setEntryOpts); + const setEntryOpts = extractOptions(opts, DEFAULT_SET_ENTRY_OPTIONS); + await this.registry.setEntry(privateKey, entry, setEntryOpts); - return { data: entry.data }; + // Update the cached revision number. + cachedRevisionEntry.revision = newRevision; + + return { data: entry.data }; + }); } /** * Deletes the entry data at the given private key and data key. Trying to * access the data again with e.g. getEntryData will result in null. * + * This will use the entry revision number from the cache, so getEntryData must + * always be called first. + * * @param this - SkynetClient * @param privateKey - The user private key. * @param dataKey - The data key. @@ -382,6 +448,9 @@ export async function deleteEntryData( /** * Gets the raw bytes corresponding to the publicKey and dataKey. The caller is responsible for setting any metadata in the bytes. * + * If the data was found, we update the cached revision number for the entry. + * See getJSON for behavior in other cases. + * * @param this - SkynetClient * @param publicKey - The user public key. * @param dataKey - The key of the data to fetch for the given user. @@ -406,92 +475,39 @@ export async function getRawBytes( ...customOptions, }; - // Lookup the registry entry. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entry = await getSkyDBRegistryEntry(this, publicKey, dataKey, getEntryOpts); - if (entry === null) { - return { data: null, dataLink: null }; - } - - // Determine the data link. - // TODO: Can this still be an entry link which hasn't yet resolved to a data link? - const { rawDataLink, dataLink } = parseDataLink(entry.data, false); - - // If a cached data link is provided and the data link hasn't changed, return. - if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { - return { data: null, dataLink }; - } - - // Download the data in the returned data link. - const downloadOpts = { - ...extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS), - responseType: "arraybuffer" as ResponseType, - }; - const { data: buffer } = await this.getFileContent(dataLink, downloadOpts); - - return { data: new Uint8Array(buffer), dataLink }; -} - -/* istanbul ignore next */ -/** - * Gets the registry entry for the given raw bytes or creates the entry if it doesn't exist. - * - * @param client - The Skynet client. - * @param publicKey - The user public key. - * @param dataKey - The dat akey. - * @param data - The raw byte data to set. - * @param [customOptions] - Additional settings that can optionally be set. - * @returns - The registry entry and corresponding data link. - * @throws - Will throw if the revision is already the maximum value. - */ -// TODO: Rename & refactor after the SkyDB caching refactor. -export async function getOrCreateRawBytesRegistryEntry( - client: SkynetClient, - publicKey: string, - dataKey: string, - data: Uint8Array, - customOptions?: CustomSetJSONOptions -): Promise { - // Not publicly available, don't validate input. - - const opts = { - ...DEFAULT_SET_JSON_OPTIONS, - ...client.customOptions, - ...customOptions, - }; - - // Create the data to upload to acquire its skylink. - let dataKeyHex = dataKey; - if (!opts.hashedDataKeyHex) { - dataKeyHex = toHexString(stringToUint8ArrayUtf8(dataKey)); - } - const file = new File([data], `dk:${dataKeyHex}`, { type: "application/octet-stream" }); - - // Start file upload, do not block. - const uploadOpts = extractOptions(opts, DEFAULT_UPLOAD_OPTIONS); - const skyfilePromise: Promise = client.uploadFile(file, uploadOpts); - - // Fetch the current value to find out the revision. - // - // Start getEntry, do not block. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entryPromise: Promise = client.registry.getEntry(publicKey, dataKey, getEntryOpts); - - // Block until both getEntry and uploadFile are finished. - const [signedEntry, skyfile] = await Promise.all([entryPromise, skyfilePromise]); - - const revision = getNextRevisionFromEntry(signedEntry.entry); - - // Build the registry entry. - const dataLink = trimUriPrefix(skyfile.skylink, URI_SKYNET_PREFIX); - const rawDataLink = decodeSkylinkBase64(dataLink); - validateUint8ArrayLen("rawDataLink", rawDataLink, "skylink byte array", RAW_SKYLINK_SIZE); - const entry: RegistryEntry = { - dataKey, - data: rawDataLink, - revision, - }; - return entry; + // Immediately fail if the mutex is not available. + return await this.db.revisionNumberCache.withCachedEntryLock(publicKey, dataKey, async (cachedRevisionEntry) => { + // Lookup the registry entry. + const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); + const entry = await getSkyDBRegistryEntryAndUpdateCache( + this, + publicKey, + dataKey, + cachedRevisionEntry, + getEntryOpts + ); + if (entry === null) { + return { data: null, dataLink: null }; + } + + // Determine the data link. + // TODO: Can this still be an entry link which hasn't yet resolved to a data link? + const { rawDataLink, dataLink } = parseDataLink(entry.data, false); + + // If a cached data link is provided and the data link hasn't changed, return. + if (checkCachedDataLink(rawDataLink, opts.cachedDataLink)) { + return { data: null, dataLink }; + } + + // Download the data in the returned data link. + const downloadOpts = { + ...extractOptions(opts, DEFAULT_DOWNLOAD_OPTIONS), + responseType: "arraybuffer" as ResponseType, + }; + const { data: buffer } = await this.getFileContent(dataLink, downloadOpts); + + return { data: new Uint8Array(buffer), dataLink }; + }); } // ======= @@ -499,62 +515,23 @@ export async function getOrCreateRawBytesRegistryEntry( // ======= /** - * Gets the next entry for the given public key and data key, setting the data to be the given data and the revision number accordingly. - * - * @param client - The Skynet client. - * @param publicKey - The user public key. - * @param dataKey - The dat akey. - * @param data - The data to set. - * @param [customOptions] - Additional settings that can optionally be set. - * @returns - The registry entry and corresponding data link. - * @throws - Will throw if the revision is already the maximum value. - */ -export async function getNextRegistryEntry( - client: SkynetClient, - publicKey: string, - dataKey: string, - data: Uint8Array, - customOptions?: CustomGetEntryOptions -): Promise { - // Not publicly available, don't validate input. - - const opts = { - ...DEFAULT_GET_ENTRY_OPTIONS, - ...client.customOptions, - ...customOptions, - }; - - // Get the latest entry. - // TODO: Can remove this once we start caching the latest revision. - const signedEntry = await client.registry.getEntry(publicKey, dataKey, opts); - const revision = getNextRevisionFromEntry(signedEntry.entry); - - // Build the registry entry. - const entry: RegistryEntry = { - dataKey, - data, - revision, - }; - - return entry; -} - -/** - * Gets the registry entry and data link or creates the entry if it doesn't exist. + * Gets the registry entry and data link or creates the entry if it doesn't + * exist. Uses the cached revision number for the entry, or 0 if the entry has + * not been cached. * * @param client - The Skynet client. - * @param publicKey - The user public key. - * @param dataKey - The dat akey. - * @param json - The JSON to set. + * @param dataKey - The data key. + * @param data - The JSON or raw byte data to set. + * @param revision - The revision number to set. * @param [customOptions] - Additional settings that can optionally be set. * @returns - The registry entry and corresponding data link. * @throws - Will throw if the revision is already the maximum value. */ -export async function getOrCreateRegistryEntry( +export async function getOrCreateSkyDBRegistryEntry( client: SkynetClient, - publicKey: string, dataKey: string, - json: JsonData, + data: JsonData | Uint8Array, + revision: bigint, customOptions?: CustomSetJSONOptions ): Promise<[RegistryEntry, string]> { // Not publicly available, don't validate input. @@ -565,57 +542,49 @@ export async function getOrCreateRegistryEntry( ...customOptions, }; - // Set the hidden _data and _v fields. - const fullData: JsonFullData = { _data: json, _v: JSON_RESPONSE_VERSION }; + let fullData: string | Uint8Array; + if (!(data instanceof Uint8Array)) { + // Set the hidden _data and _v fields. + const skynetJson = buildSkynetJsonObject(data); + fullData = JSON.stringify(skynetJson); + } else { + /* istanbul ignore next - This case is only called by setJSONEncrypted which is not tested in this repo */ + fullData = data; + } // Create the data to upload to acquire its skylink. let dataKeyHex = dataKey; if (!opts.hashedDataKeyHex) { dataKeyHex = toHexString(stringToUint8ArrayUtf8(dataKey)); } - const file = new File([JSON.stringify(fullData)], `dk:${dataKeyHex}`, { type: "application/json" }); + const file = new File([fullData], `dk:${dataKeyHex}`, { type: "application/json" }); - // Start file upload, do not block. + // Do file upload. const uploadOpts = extractOptions(opts, DEFAULT_UPLOAD_OPTIONS); - const skyfilePromise: Promise = client.uploadFile(file, uploadOpts); - - // Fetch the current value to find out the revision. - // - // Start getEntry, do not block. - const getEntryOpts = extractOptions(opts, DEFAULT_GET_ENTRY_OPTIONS); - const entryPromise: Promise = client.registry.getEntry(publicKey, dataKey, getEntryOpts); - - // Block until both getEntry and uploadFile are finished. - const [signedEntry, skyfile] = await Promise.all([entryPromise, skyfilePromise]); - - const revision = getNextRevisionFromEntry(signedEntry.entry); + const skyfile = await client.uploadFile(file, uploadOpts); // Build the registry entry. const dataLink = trimUriPrefix(skyfile.skylink, URI_SKYNET_PREFIX); - const data = decodeSkylinkBase64(dataLink); - validateUint8ArrayLen("data", data, "skylink byte array", RAW_SKYLINK_SIZE); + const rawDataLink = decodeSkylinkBase64(dataLink); + validateUint8ArrayLen("rawDataLink", rawDataLink, "skylink byte array", RAW_SKYLINK_SIZE); const entry: RegistryEntry = { dataKey, - data, + data: rawDataLink, revision, }; return [entry, formatSkylink(dataLink)]; } /** - * Gets the next revision from a returned entry (or 0 if the entry was not found). + * Increments the given revision number and checks to make sure it is not + * greater than the maximum revision. * - * @param entry - The returned registry entry. - * @returns - The revision. - * @throws - Will throw if the next revision would be beyond the maximum allowed value. + * @param revision - The given revision number. + * @returns - The incremented revision number. + * @throws - Will throw if the incremented revision number is greater than the maximum revision. */ -export function getNextRevisionFromEntry(entry: RegistryEntry | null): bigint { - let revision: bigint; - if (entry === null) { - revision = BigInt(0); - } else { - revision = entry.revision + BigInt(1); - } +export function incrementRevision(revision: bigint): bigint { + revision = revision + BigInt(1); // Throw if the revision is already the maximum value. if (revision > MAX_REVISION) { @@ -668,27 +637,69 @@ export function validateEntryData(data: Uint8Array, allowDeletionEntryData: bool } /** - * Gets the registry entry, returning null if the entry contains an empty skylink (the deletion sentinel). + * Gets the registry entry, returning null if the entry was not found or if it + * contained a sentinel value indicating deletion. + * + * If the data was found, we update the cached revision number for the entry. + * See getJSON for behavior in other cases. * * @param client - The Skynet Client * @param publicKey - The user public key. * @param dataKey - The key of the data to fetch for the given user. + * @param cachedRevisionEntry - The cached revision entry object containing the revision number and the mutex. * @param opts - Additional settings. * @returns - The registry entry, or null if not found or deleted. */ -async function getSkyDBRegistryEntry( +async function getSkyDBRegistryEntryAndUpdateCache( client: SkynetClient, publicKey: string, dataKey: string, + cachedRevisionEntry: CachedRevisionNumber, opts: CustomGetEntryOptions ): Promise { + // If this throws due to a parse error or network error, exit early and do not + // update the cached revision number. const { entry } = await client.registry.getEntry(publicKey, dataKey, opts); - if (entry === null || areEqualUint8Arrays(entry.data, EMPTY_SKYLINK)) { + + // Don't update the cached revision number if the data was not found (404). Return null. + if (entry === null) { + return null; + } + + // Calculate the new revision. + const newRevision = entry?.revision ?? UNCACHED_REVISION_NUMBER + BigInt(1); + + // Don't update the cached revision number if the received version is too low. + // Throw error. + const cachedRevision = cachedRevisionEntry.revision; + if (cachedRevision && cachedRevision > newRevision) { + throw new Error( + "Returned revision number too low. A higher revision number for this userID and path is already cached" + ); + } + + // Update the cached revision. + cachedRevisionEntry.revision = newRevision; + + // Return null if the entry contained a sentinel value indicating deletion. + // We do this after updating the revision number cache. + if (wasRegistryEntryDeleted(entry)) { return null; } + return entry; } +/** + * Sets the hidden _data and _v fields on the given raw JSON data. + * + * @param data - The given JSON data. + * @returns - The Skynet JSON data. + */ +function buildSkynetJsonObject(data: JsonData): SkynetJson { + return { _data: data, _v: JSON_RESPONSE_VERSION }; +} + /** * Parses a data link out of the given registry entry data. * @@ -710,3 +721,13 @@ function parseDataLink(data: Uint8Array, legacy: boolean): { rawDataLink: string } return { rawDataLink, dataLink: formatSkylink(rawDataLink) }; } + +/** + * Returns whether the given registry entry indicates a past deletion. + * + * @param entry - The registry entry. + * @returns - Whether the registry entry data indicated a past deletion. + */ +function wasRegistryEntryDeleted(entry: RegistryEntry): boolean { + return areEqualUint8Arrays(entry.data, EMPTY_SKYLINK); +} diff --git a/src/upload.test.ts b/src/upload.test.ts index 9dfee2b9..f5535dad 100644 --- a/src/upload.test.ts +++ b/src/upload.test.ts @@ -5,6 +5,7 @@ import MockAdapter from "axios-mock-adapter"; import { SkynetClient, DEFAULT_SKYNET_PORTAL_URL, URI_SKYNET_PREFIX } from "./index"; import { compareFormData } from "../utils/testing"; +import { splitSizeIntoChunkAlignedParts } from "./upload"; const portalUrl = DEFAULT_SKYNET_PORTAL_URL; const client = new SkynetClient(portalUrl); @@ -256,3 +257,84 @@ describe("uploadDirectory", () => { ); }); }); + +describe("splitSizeIntoChunkAlignedParts", () => { + const mib = 1 << 20; + const sizesAndChunks: Array<[number, number, { start: number; end: number }[]]> = [ + [ + 40 * mib, + 2, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 40 * mib }, + ], + ], + [ + 41 * mib, + 2, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 41 * mib }, + ], + ], + [ + 80 * mib, + 2, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 80 * mib }, + ], + ], + [ + 50 * mib, + 2, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 50 * mib }, + ], + ], + [ + 100 * mib, + 2, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 100 * mib }, + ], + ], + [ + 50 * mib, + 3, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 50 * mib }, + { start: 50 * mib, end: 50 * mib }, + ], + ], + [ + 100 * mib, + 3, + [ + { start: 0, end: 40 * mib }, + { start: 40 * mib, end: 80 * mib }, + { start: 80 * mib, end: 100 * mib }, + ], + ], + [ + 500 * mib, + 6, + [ + { start: 0 * mib, end: 80 * mib }, + { start: 80 * mib, end: 160 * mib }, + { start: 160 * mib, end: 240 * mib }, + { start: 240 * mib, end: 320 * mib }, + { start: 320 * mib, end: 400 * mib }, + { start: 400 * mib, end: 500 * mib }, + ], + ], + ]; + + it.each(sizesAndChunks)("Should align size '%s' with '%s' parts", (totalSize, partCount, expectedParts) => { + const parts = splitSizeIntoChunkAlignedParts(totalSize, partCount); + expect(parts).toEqual(expectedParts); + }); +}); diff --git a/src/upload.ts b/src/upload.ts index d6ce2865..c1c96492 100644 --- a/src/upload.ts +++ b/src/upload.ts @@ -1,12 +1,13 @@ import { AxiosResponse } from "axios"; -import { HttpRequest, Upload } from "tus-js-client"; +import { DetailedError, HttpRequest, Upload } from "@skynetlabs/tus-js-client"; import { getFileMimeType } from "./utils/file"; import { BaseCustomOptions, DEFAULT_BASE_OPTIONS } from "./utils/options"; import { formatSkylink } from "./skylink/format"; -import { buildRequestHeaders, buildRequestUrl, SkynetClient } from "./client"; +import { SkynetClient } from "./client"; import { JsonData } from "./utils/types"; import { throwValidationError, validateObject, validateOptionalObject, validateString } from "./utils/validation"; +import { buildRequestHeaders, buildRequestUrl } from "./request"; /** * The tus chunk size is (4MiB - encryptionOverhead) * dataPieces, set in skyd. @@ -14,9 +15,10 @@ import { throwValidationError, validateObject, validateOptionalObject, validateS const TUS_CHUNK_SIZE = (1 << 22) * 10; /** - * A number indicating how many parts should be uploaded in parallel. + * A number indicating how many parts should be uploaded in parallel, by + * default. */ -const TUS_PARALLEL_UPLOADS = 1; +const TUS_PARALLEL_UPLOADS = 2; /** * The retry delays, in ms. Data is stored in skyd for up to 20 minutes, so the @@ -41,6 +43,7 @@ const PORTAL_DIRECTORY_FILE_FIELD_NAME = "files[]"; * @property [customFilename] - The custom filename to use when uploading files. * @property [largeFileSize=41943040] - The size at which files are considered "large" and will be uploaded using the tus resumable upload protocol. This is the size of one chunk by default (40 mib). * @property [errorPages] - Defines a mapping of error codes and subfiles which are to be served in case we are serving the respective error code. All subfiles referred like this must be defined with absolute paths and must exist. + * @property [numParallelUploads=2] - Used to override the default number of parallel uploads. Disable parallel uploads by setting to 1. Note that each parallel upload must be chunk-aligned so the number of parallel uploads may be limited if some parts would end up empty. * @property [retryDelays=[0, 5_000, 15_000, 60_000, 300_000, 600_000]] - An array or undefined, indicating how many milliseconds should pass before the next attempt to uploading will be started after the transfer has been interrupted. The array's length indicates the maximum number of attempts. * @property [tryFiles] - Allows us to set a list of potential subfiles to return in case the requested one does not exist or is a directory. Those subfiles might be listed with relative or absolute paths. If the path is absolute the file must exist. */ @@ -51,6 +54,7 @@ export type CustomUploadOptions = BaseCustomOptions & { customFilename?: string; errorPages?: JsonData; largeFileSize?: number; + numParallelUploads?: number; retryDelays?: number[]; tryFiles?: string[]; }; @@ -73,6 +77,7 @@ export const DEFAULT_UPLOAD_OPTIONS = { customFilename: "", errorPages: undefined, largeFileSize: TUS_CHUNK_SIZE, + numParallelUploads: TUS_PARALLEL_UPLOADS, retryDelays: DEFAULT_TUS_RETRY_DELAYS, tryFiles: undefined, }; @@ -245,9 +250,24 @@ export async function uploadLargeFileRequest( method: "options", }); + // If concatenation is enabled, set the number of parallel uploads as well as + // the part-split function. Note that each part has to be chunk-aligned, so we + // may limit the number of parallel uploads. let parallelUploads = 1; + let splitSizeIntoParts: + | ((totalSize: number, partCount: number) => Array<{ start: number; end: number }>) + | undefined = undefined; if (resp.headers["tus-extension"]?.includes("concatenation")) { - parallelUploads = TUS_PARALLEL_UPLOADS; + // Use a user-provided value, if given. + parallelUploads = opts.numParallelUploads; + // Limit the number of parallel uploads if some parts would end up empty, + // e.g. 50mib would be split into 1 chunk-aligned part, one unaligned part, + // and one empty part. + if (parallelUploads > Math.ceil(file.size / TUS_CHUNK_SIZE)) { + parallelUploads = Math.ceil(file.size / TUS_CHUNK_SIZE); + } + // Set the part-split function. + splitSizeIntoParts = splitSizeIntoChunkAlignedParts; } return new Promise((resolve, reject) => { @@ -260,16 +280,16 @@ export async function uploadLargeFileRequest( filetype: file.type, }, parallelUploads, + splitSizeIntoParts, headers, onProgress, onBeforeRequest: function (req: HttpRequest) { const xhr = req.getUnderlyingObject(); xhr.withCredentials = true; }, - onError: (error: Error) => { + onError: (error: Error | DetailedError) => { // Return error body rather than entire error. - // @ts-expect-error tus-client-js Error is not typed correctly. - const res = error.originalResponse; + const res = (error as DetailedError).originalResponse; const newError = res ? new Error(res.getBody().trim()) || error : error; reject(newError); }, @@ -286,7 +306,7 @@ export async function uploadLargeFileRequest( url: upload.url, endpointPath: opts.endpointLargeUpload, method: "head", - headers: { ...headers, "Tus-Resumable": "1.0.0" }, + headers: { ...headers, "tus-resumable": "1.0.0" }, }); resolve(resp); } catch (err) { @@ -377,6 +397,48 @@ export async function uploadDirectoryRequest( return response; } +/** + * Splits the size into the number of parts, aligning all but the last part on + * chunk boundaries. Called if parallel uploads are used. + * + * @param totalSize - The total size of the upload. + * @param partCount - The number of parts (equal to the value of `parallelUploads` used). + * @returns - An array of parts with start and end boundaries. + */ +export function splitSizeIntoChunkAlignedParts( + totalSize: number, + partCount: number +): Array<{ start: number; end: number }> { + const partSizes = new Array(partCount).fill(0); + // The leftover size that must go into the last part. + const leftover = totalSize % TUS_CHUNK_SIZE; + + // Assign chunks to parts in order, looping back to the beginning if we get to + // the end of the parts array. + let lastPart = 0; + for (let i = 0; i < Math.floor(totalSize / TUS_CHUNK_SIZE); i++) { + partSizes[i % partCount] += TUS_CHUNK_SIZE; + if (i > lastPart) lastPart = i; + } + + // Assign the leftover to the part after the last part that was visited, or + // the last part in the array if all parts were used. + partSizes[Math.min(lastPart + 1, partCount - 1)] += leftover; + + // Convert sizes into parts. + const parts = []; + let lastBoundary = 0; + for (let i = 0; i < partCount; i++) { + parts.push({ + start: lastBoundary, + end: lastBoundary + partSizes[i], + }); + lastBoundary = parts[i].end; + } + + return parts; +} + /** * Sometimes file object might have had the type property defined manually with * Object.defineProperty and some browsers (namely firefox) can have problems diff --git a/src/utils/encoding.ts b/src/utils/encoding.ts index 44ee0d92..08662e25 100644 --- a/src/utils/encoding.ts +++ b/src/utils/encoding.ts @@ -3,7 +3,9 @@ import base32Encode from "base32-encode"; import { fromByteArray, toByteArray } from "base64-js"; import { assertUint64 } from "./number"; +import { BASE32_ENCODED_SKYLINK_SIZE, BASE64_ENCODED_SKYLINK_SIZE } from "../skylink/sia"; import { stringToUint8ArrayUtf8 } from "./string"; +import { validateStringLen } from "./validation"; const BASE32_ENCODING_VARIANT = "RFC4648-HEX"; @@ -14,6 +16,7 @@ const BASE32_ENCODING_VARIANT = "RFC4648-HEX"; * @returns - The decoded bytes. */ export function decodeSkylinkBase32(skylink: string): Uint8Array { + validateStringLen("skylink", skylink, "parameter", BASE32_ENCODED_SKYLINK_SIZE); skylink = skylink.toUpperCase(); const bytes = base32Decode(skylink, BASE32_ENCODING_VARIANT); return new Uint8Array(bytes); @@ -36,6 +39,7 @@ export function encodeSkylinkBase32(bytes: Uint8Array): string { * @returns - The decoded bytes. */ export function decodeSkylinkBase64(skylink: string): Uint8Array { + validateStringLen("skylink", skylink, "parameter", BASE64_ENCODED_SKYLINK_SIZE); // Add padding. skylink = `${skylink}==`; // Convert from URL encoding. diff --git a/src/utils/file.ts b/src/utils/file.ts index b6583cce..f4b532b3 100644 --- a/src/utils/file.ts +++ b/src/utils/file.ts @@ -1,5 +1,6 @@ import mime from "mime/lite"; import path from "path-browserify"; + import { trimPrefix } from "./string"; /** diff --git a/src/utils/url.test.ts b/src/utils/url.test.ts index 85d1a57a..f3da2435 100644 --- a/src/utils/url.test.ts +++ b/src/utils/url.test.ts @@ -1,6 +1,7 @@ import { composeTestCases, combineStrings } from "../../utils/testing"; import { trimPrefix, trimSuffix } from "./string"; import { + addUrlSubdomain, addUrlQuery, DEFAULT_SKYNET_PORTAL_URL, getFullDomainUrlForPortal, @@ -12,14 +13,34 @@ const portalUrl = DEFAULT_SKYNET_PORTAL_URL; const skylink = "XABvi7JtJbQSMAcDwnUnmp2FKDPjg8_tTTFP4BwMSxVdEg"; const skylinkBase32 = "bg06v2tidkir84hg0s1s4t97jaeoaa1jse1svrad657u070c9calq4g"; +describe("addUrlSubdomain", () => { + const parts: Array<[string, string, string]> = [ + [portalUrl, "test", `https://test.siasky.net`], + [`${portalUrl}/`, "test", `https://test.siasky.net`], + [portalUrl, "foo.bar", `https://foo.bar.siasky.net`], + [`${portalUrl}/path`, "test", `https://test.siasky.net/path`], + [`${portalUrl}/path/`, "test", `https://test.siasky.net/path`], + [`${portalUrl}?foo=bar`, "test", `https://test.siasky.net/?foo=bar`], + [`${portalUrl}#foobar`, "test", `https://test.siasky.net/#foobar`], + ]; + + it.each(parts)( + "Should call addUrlSubdomain with URL %s and parameters %s and form URL %s", + (inputUrl, subdomain, expectedUrl) => { + const url = addUrlSubdomain(inputUrl, subdomain); + expect(url).toEqual(expectedUrl); + } + ); +}); + describe("addUrlQuery", () => { const parts: Array<[string, { [key: string]: string | undefined }, string]> = [ [portalUrl, { filename: "test" }, `${portalUrl}/?filename=test`], + [`${portalUrl}/`, { attachment: "true" }, `${portalUrl}/?attachment=true`], [portalUrl, { attachment: "true" }, `${portalUrl}/?attachment=true`], [`${portalUrl}/path`, { download: "true" }, `${portalUrl}/path?download=true`], [`${portalUrl}/path/`, { download: "true" }, `${portalUrl}/path/?download=true`], [`${portalUrl}/skynet/`, { foo: "1", bar: "2" }, `${portalUrl}/skynet/?foo=1&bar=2`], - [`${portalUrl}/`, { attachment: "true" }, `${portalUrl}/?attachment=true`], [`${portalUrl}?foo=bar`, { attachment: "true" }, `${portalUrl}/?foo=bar&attachment=true`], [`${portalUrl}/?attachment=true`, { foo: "bar" }, `${portalUrl}/?attachment=true&foo=bar`], [`${portalUrl}#foobar`, { foo: "bar" }, `${portalUrl}/?foo=bar#foobar`], diff --git a/src/utils/url.ts b/src/utils/url.ts index 2ef1eebe..97827c1d 100644 --- a/src/utils/url.ts +++ b/src/utils/url.ts @@ -71,7 +71,7 @@ export function addPath(url: string, path: string): string { * @param subdomain - The subdomain to add. * @returns - The final URL. */ -export function addSubdomain(url: string, subdomain: string): string { +export function addUrlSubdomain(url: string, subdomain: string): string { const urlObj = new URL(url); urlObj.hostname = `${subdomain}.${urlObj.hostname}`; const str = urlObj.toString(); @@ -153,7 +153,7 @@ export function getFullDomainUrlForPortal(portalUrl: string, domain: string): st // Special handling for localhost. url = "localhost"; } else { - url = addSubdomain(portalUrl, domain); + url = addUrlSubdomain(portalUrl, domain); } // Add back the path if there was one. if (path) { diff --git a/tsconfig.json b/tsconfig.json index a7b408a9..7981b1ab 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,7 @@ { "compilerOptions": { "downlevelIteration": true, + "lib": ["dom", "es2020"], "noEmit": true, "esModuleInterop": true, "strict": true, diff --git a/utils/testing.ts b/utils/testing.ts index 506ad520..dae8c7e2 100644 --- a/utils/testing.ts +++ b/utils/testing.ts @@ -97,6 +97,28 @@ export function extractNonSkylinkPath(url: string, skylink: string): string { return path; } +/** + * Gets the settled values from `Promise.allSettled`. Throws if an error is + * found. Returns all settled values if no errors were found. + * + * @param values - The settled values. + * @returns - The settled value if no errors were found. + * @throws - Will throw if an unexpected error occurred. + */ +export function getSettledValues(values: PromiseSettledResult[]): T[] { + const receivedValues = []; + + for (const value of values) { + if (value.status === "rejected") { + throw value.reason; + } else if (value.value) { + receivedValues.push(value.value); + } + } + + return receivedValues; +} + /** * Generates a random Unicode string using the code points between 0 and 65536. * diff --git a/yarn.lock b/yarn.lock index 6fa057b2..cd569412 100644 --- a/yarn.lock +++ b/yarn.lock @@ -586,6 +586,19 @@ dependencies: "@sinonjs/commons" "^1.7.0" +"@skynetlabs/tus-js-client@^2.3.0": + version "2.3.0" + resolved "https://registry.yarnpkg.com/@skynetlabs/tus-js-client/-/tus-js-client-2.3.0.tgz#a14fd4197e2bc4ce8be724967a0e4c17d937cb64" + integrity sha512-piGvPlJh+Bu3Qf08bDlc/TnFLXE81KnFoPgvnsddNwTSLyyspxPFxJmHO5ki6SYyOl3HmUtGPoix+r2M2UpFEA== + dependencies: + buffer-from "^0.1.1" + combine-errors "^3.0.3" + is-stream "^2.0.0" + js-base64 "^2.6.1" + lodash.throttle "^4.1.1" + proper-lockfile "^2.0.1" + url-parse "^1.4.3" + "@tsconfig/node10@^1.0.7": version "1.0.7" resolved "https://registry.yarnpkg.com/@tsconfig/node10/-/node10-1.0.7.tgz#1eb1de36c73478a2479cc661ef5af1c16d86d606" @@ -1011,6 +1024,13 @@ astral-regex@^2.0.0: resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-2.0.0.tgz#483143c567aeed4785759c0865786dc77d7d2e31" integrity sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ== +async-mutex@^0.3.2: + version "0.3.2" + resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.3.2.tgz#1485eda5bda1b0ec7c8df1ac2e815757ad1831df" + integrity sha512-HuTK7E7MT7jZEh1P9GtRW9+aTWiDWWi9InbZ5hjxrnRa39KS4BW04+xLBhYNS2aXhHUIKZSw3gj4Pn1pj+qGAA== + dependencies: + tslib "^2.3.1" + asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" @@ -1721,9 +1741,9 @@ escodegen@^1.14.1: source-map "~0.6.1" eslint-plugin-jsdoc@^37.0.3: - version "37.2.0" - resolved "https://registry.yarnpkg.com/eslint-plugin-jsdoc/-/eslint-plugin-jsdoc-37.2.0.tgz#233690a37fa2e2abbe1f4f28ab676641dc8b3710" - integrity sha512-ca7s/DD1mMObZQ2Y0n0DO/KnFV+FqCX6ztir8pcSuylg3GGCREIisn36P/0cRySuWW/7Y7MNCuUDqtKdgLPU7Q== + version "37.2.4" + resolved "https://registry.yarnpkg.com/eslint-plugin-jsdoc/-/eslint-plugin-jsdoc-37.2.4.tgz#7d6f2a2675cc7de48611d1e9aaa8c1dab1ec5b1e" + integrity sha512-TkB6LMdWlqB/4aeanUVoh2bOl43BMibzokd6+dYQ5yumc1aMx0kJyCgQmc+wbJnc6vahfCJELpfjxXRT3Ay6xA== dependencies: "@es-joy/jsdoccomment" "0.13.0" comment-parser "1.3.0" @@ -4591,6 +4611,11 @@ tslib@^1.8.1: resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== +tslib@^2.3.1: + version "2.3.1" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01" + integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw== + tslib@~2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.1.0.tgz#da60860f1c2ecaa5703ab7d39bc05b6bf988b97a" @@ -4610,19 +4635,6 @@ tunnel-agent@^0.6.0: dependencies: safe-buffer "^5.0.1" -tus-js-client@^2.2.0: - version "2.3.0" - resolved "https://registry.yarnpkg.com/tus-js-client/-/tus-js-client-2.3.0.tgz#5d76145476cea46a4e7c045a0054637cddf8dc39" - integrity sha512-I4cSwm6N5qxqCmBqenvutwSHe9ntf81lLrtf6BmLpG2v4wTl89atCQKqGgqvkodE6Lx+iKIjMbaXmfvStTg01g== - dependencies: - buffer-from "^0.1.1" - combine-errors "^3.0.3" - is-stream "^2.0.0" - js-base64 "^2.6.1" - lodash.throttle "^4.1.1" - proper-lockfile "^2.0.1" - url-parse "^1.4.3" - tweetnacl@^0.14.3, tweetnacl@~0.14.0: version "0.14.5" resolved "https://registry.yarnpkg.com/tweetnacl/-/tweetnacl-0.14.5.tgz#5ae68177f192d4456269d108afa93ff8743f4f64" @@ -4680,9 +4692,9 @@ typedarray-to-buffer@^3.1.5: is-typedarray "^1.0.0" typescript@^4.2.4: - version "4.5.3" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.5.3.tgz#afaa858e68c7103317d89eb90c5d8906268d353c" - integrity sha512-eVYaEHALSt+s9LbvgEv4Ef+Tdq7hBiIZgii12xXJnukryt3pMgJf6aKhoCZ3FWQsu6sydEnkg11fYXLzhLBjeQ== + version "4.5.4" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.5.4.tgz#a17d3a0263bf5c8723b9c52f43c5084edf13c2e8" + integrity sha512-VgYs2A2QIRuGphtzFV7aQJduJ2gyfTljngLzjpfW9FoYZF6xuw1W0vW9ghCKLfcWrCFxK81CSGRAvS1pn4fIUg== union-value@^1.0.0: version "1.0.1"