Skip to content

Commit

Permalink
feat(cardano-services): implements rabbitmq new interface
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci authored and rhyslbw committed Jun 28, 2022
1 parent 27aa81d commit a880367
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 82 deletions.
2 changes: 1 addition & 1 deletion packages/cardano-services/src/Program/loadHttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const serviceMapFactory = (args: ProgramArgs, logger: Logger, cache: InMemoryCac
logger,
txSubmitProvider:
args.options?.useQueue && args.options?.rabbitmqUrl
? new RabbitMqTxSubmitProvider(args.options.rabbitmqUrl)
? new RabbitMqTxSubmitProvider({ rabbitmqUrl: args.options.rabbitmqUrl })
: ogmiosTxSubmitProvider(urlToConnectionConfig(args.options?.ogmiosUrl))
}),
[ServiceNames.Rewards]: async () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ export const OGMIOS_URL_DEFAULT = (() => {
})();

export const RABBITMQ_URL_DEFAULT = 'amqp://localhost:5672';

export const USE_QUEUE_DEFAULT = false;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { CustomError } from 'ts-custom-error';
import { Programs } from '../programs';

export class WrongOption extends CustomError {
public constructor(program: Programs, option: string, expected: string[]) {
super();
this.message = `${program} requires a valid ${option} program option. Expected: ${expected.join(', ')}`;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './WrongOption';
2 changes: 2 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './CommonOptionDescriptions';
export * from './defaults';
export * from './errors';
export * from './options';
export * from './programs';
7 changes: 7 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/programs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* cardano-services programs
*/
export enum Programs {
HttpServer = 'HTTP server',
RabbitmqWorker = 'RabbitMQ worker'
}

This file was deleted.

1 change: 0 additions & 1 deletion packages/cardano-services/src/TxWorker/errors/index.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/cardano-services/src/TxWorker/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from './defaults';
export * from './errors';
export * from './loadTxWorker';
export * from './TxWorkerOptionDescriptions';
19 changes: 10 additions & 9 deletions packages/cardano-services/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from './Program';
import { CACHE_TTL_DEFAULT } from './InMemoryCache';
import { Command } from 'commander';
import { CommonOptionDescriptions } from './ProgramsCommon';
import { CommonOptionDescriptions, Programs, USE_QUEUE_DEFAULT, WrongOption } from './ProgramsCommon';
import { DB_POLL_INTERVAL_DEFAULT } from './NetworkInfo';
import { InvalidLoggerLevel } from './errors';
import {
Expand All @@ -19,7 +19,6 @@ import {
POLLING_CYCLE_DEFAULT,
TxWorkerOptionDescriptions,
TxWorkerOptions,
WrongProgramOption,
loadTxWorker
} from './TxWorker';
import { URL } from 'url';
Expand All @@ -41,6 +40,13 @@ clear();
// eslint-disable-next-line no-console
console.log('Cardano Services CLI');

const stringToBoolean = (value: string, program: Programs, option: string) => {
// for compatibility: accepting same values as envalid in startWorker.ts
if (['0', 'f', 'false'].includes(value)) return false;
if (['1', 't', 'true'].includes(value)) return true;
throw new WrongOption(program, option, ['false', 'true']);
};

const commonOptions = (command: Command) =>
command
.option(
Expand Down Expand Up @@ -95,7 +101,7 @@ commonOptions(
(interval) => Number.parseInt(interval, 10),
DB_POLL_INTERVAL_DEFAULT
)
.option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, false)
.option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, USE_QUEUE_DEFAULT)
.action(async (serviceNames: ServiceNames[], options: { apiUrl: URL } & HttpServerOptions) => {
const { apiUrl, ...rest } = options;
const server = await loadHttpServer({ apiUrl: apiUrl || API_URL_DEFAULT, options: rest, serviceNames });
Expand All @@ -111,12 +117,7 @@ commonOptions(program.command('start-worker').description('Start RabbitMQ worker
.option(
'--parallel [parallel]',
TxWorkerOptionDescriptions.Parallel,
(parallel) => {
// for compatibility: accepting same values as envalid in startWorker.ts
if (['0', 'f', 'false'].includes(parallel)) return false;
if (['1', 't', 'true'].includes(parallel)) return true;
throw new WrongProgramOption(ServiceNames.TxSubmit, TxWorkerOptionDescriptions.Parallel, ['false', 'true']);
},
(parallel) => stringToBoolean(parallel, Programs.RabbitmqWorker, TxWorkerOptionDescriptions.Parallel),
PARALLEL_MODE_DEFAULT
)
.option(
Expand Down
3 changes: 2 additions & 1 deletion packages/cardano-services/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CACHE_TTL_DEFAULT } from './InMemoryCache';
import { DB_POLL_INTERVAL_DEFAULT } from './NetworkInfo';
import { LogLevel } from 'bunyan';
import { URL } from 'url';
import { USE_QUEUE_DEFAULT } from './ProgramsCommon';
import { cacheTtlValidator } from './util/validators';
import { config } from 'dotenv';
import { loggerMethodNames } from './util';
Expand All @@ -20,7 +21,7 @@ const envSpecs = {
OGMIOS_URL: envalid.url({ default: OGMIOS_URL_DEFAULT }),
RABBITMQ_URL: envalid.url({ default: RABBITMQ_URL_DEFAULT }),
SERVICE_NAMES: envalid.str({ example: Object.values(ServiceNames).toString() }),
USE_QUEUE: envalid.bool({ default: false })
USE_QUEUE: envalid.bool({ default: USE_QUEUE_DEFAULT })
};

void (async () => {
Expand Down
42 changes: 17 additions & 25 deletions packages/cardano-services/test/entrypoints.txWorker.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { BAD_CONNECTION_URL, enqueueFakeTx, removeAllMessagesFromQueue } from '../../rabbitmq/test/utils';
import { BAD_CONNECTION_URL, enqueueFakeTx, removeAllQueues } from '../../rabbitmq/test/utils';
import { ChildProcess, fork } from 'child_process';
import { createConnectionObject } from '@cardano-ogmios/client';
import { createHealthyMockOgmiosServer, ogmiosServerReady } from './util';
Expand Down Expand Up @@ -52,48 +52,34 @@ describe('tx-worker entrypoints', () => {
afterAll(async () => await serverClosePromise(ogmiosServer));

beforeEach(async () => {
await removeAllMessagesFromQueue();
await enqueueFakeTx();
await enqueueFakeTx();
await removeAllQueues();
hookLogs = [];
loggerHookCounter = 0;
});

afterEach((done) => {
resetHook();
if (proc?.kill()) proc.on('close', done);
if (proc?.kill()) proc.on('close', () => done());
else done();
});

// Tests without any assertion fail if they get timeout
// eslint-disable-next-line sonarjs/cognitive-complexity
describe('with a working RabbitMQ server', () => {
describe('worker starts', () => {
it('cli:start-worker', (done) => {
describe('transaction are actually submitted', () => {
it('cli:start-worker submits transactions', async () => {
hookPromise = new Promise((resolve) => (hook = resolve));
proc = fork(exePath('cli'), commonArgs, { stdio: 'pipe' });
proc.stdout!.on('data', (data) => (data.toString().match('RabbitMQ transactions worker') ? done() : null));
await Promise.all([hookPromise, enqueueFakeTx()]);
});

it('startWorker', (done) => {
hook = done;
it('startWorker submits transactions', async () => {
hookPromise = new Promise((resolve) => (hook = resolve));
proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' });
await Promise.all([hookPromise, enqueueFakeTx()]);
});
});

describe('transaction are actually submitted', () => {
it('cli:start-worker submits transactions', () =>
new Promise<void>(async (resolve) => {
hook = resolve;
proc = fork(exePath('cli'), commonArgs, { stdio: 'pipe' });
}));

it('startWorker submits transactions', () =>
new Promise<void>(async (resolve) => {
hook = resolve;
proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' });
}));
});

describe('parallel option', () => {
describe('without parallel option', () => {
it('cli:start-worker starts in serial mode', (done) => {
Expand All @@ -104,7 +90,9 @@ describe('tx-worker entrypoints', () => {
it('startWorker starts in serial mode', async () => {
[hook, hookPromise] = loggerHook();
proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' });
const txPromises = [enqueueFakeTx(0), enqueueFakeTx(1)];
await hookPromise;
await Promise.all(txPromises);
expect(hookLogs).toEqual(['Processing tx 1', 'Processed tx 1', 'Processing tx 2', 'Processed tx 2']);
});
});
Expand All @@ -114,7 +102,7 @@ describe('tx-worker entrypoints', () => {
expect.assertions(2);
proc = fork(exePath('cli'), [...commonArgs, '--parallel', 'test'], { stdio: 'pipe' });
proc.stderr!.on('data', (data) =>
expect(data.toString()).toMatch('tx-submit requires a valid Parallel mode')
expect(data.toString()).toMatch('RabbitMQ worker requires a valid Parallel mode')
);
proc.on('exit', (code) => {
expect(code).toBe(1);
Expand All @@ -141,7 +129,9 @@ describe('tx-worker entrypoints', () => {
it('startWorker starts in serial mode', async () => {
[hook, hookPromise] = loggerHook();
proc = fork(exePath('startWorker'), { env: { ...commonEnv, PARALLEL: 'false' }, stdio: 'pipe' });
const txPromises = [enqueueFakeTx(0), enqueueFakeTx(1)];
await hookPromise;
await Promise.all(txPromises);
expect(hookLogs).toEqual(['Processing tx 1', 'Processed tx 1', 'Processing tx 2', 'Processed tx 2']);
});
});
Expand All @@ -155,7 +145,9 @@ describe('tx-worker entrypoints', () => {
it('startWorker starts in parallel mode', async () => {
[hook, hookPromise] = loggerHook();
proc = fork(exePath('startWorker'), { env: { ...commonEnv, PARALLEL: 'true' }, stdio: 'pipe' });
const txPromises = [enqueueFakeTx(0), enqueueFakeTx(1)];
await hookPromise;
await Promise.all(txPromises);
expect(hookLogs).toEqual(['Processing tx 1', 'Processing tx 2', 'Processed tx 1', 'Processed tx 2']);
});
});
Expand Down
91 changes: 57 additions & 34 deletions packages/cardano-services/test/load/load.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as envalid from 'envalid';
import { Cardano } from '@cardano-sdk/core';
import { ChildProcess, fork } from 'child_process';
import { ObservableWallet, SingleAddressWallet } from '@cardano-sdk/wallet';
import { InitializeTxResult, ObservableWallet, SingleAddressWallet } from '@cardano-sdk/wallet';
import { ServiceNames } from '../../src';
import {
assetProvider,
Expand All @@ -17,6 +17,7 @@ import {
} from '../../../wallet/test/e2e/config';
import { filter, firstValueFrom } from 'rxjs';
import { removeRabbitMQContainer, setupRabbitMQContainer } from '../../../rabbitmq/test/jest-setup/docker';
import JSONBig from 'json-bigint';
import path from 'path';

interface TestOptions {
Expand All @@ -28,7 +29,7 @@ interface TestOptions {
interface TestReport extends TestOptions {
timeBeforeSubmitTxs: number;
timeAfterWorkerStarted: number;
timeAfterTxsInMempool: number; // TODO: will work after https://input-output.atlassian.net/browse/ADP-1823
timeAfterTxsInMempool: number;
timeAfterTxsInBlockchain: number;
}

Expand Down Expand Up @@ -141,7 +142,9 @@ describe('load', () => {
wallet = await getWallet();
({ address } = (await firstValueFrom(wallet.addresses$))[0]);

logger.debug('Waiting to settle wallet status');
await firstValueFrom(wallet.syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)));
logger.debug('Wallet status settled');
};

const waitForTxInBlockchain = (txId: Cardano.TransactionId) =>
Expand All @@ -164,9 +167,9 @@ describe('load', () => {
});

logger.info(`Fragmentation tx: ${tx.hash}`);

await wallet.submitTx(await wallet.finalizeTx(tx));
await waitForTxInBlockchain(tx.hash);
logger.info('Fragmentation completed');
};

if (options.directlyToOgmios) await fragment();
Expand Down Expand Up @@ -220,50 +223,70 @@ describe('load', () => {
afterEach(stopWorker);

const performTest = async (options: TestOptions) => {
const { directlyToOgmios, withRunningWorker } = options;
const { directlyToOgmios, parallel, withRunningWorker } = options;
const submitPromises: Promise<void>[] = [];
const txIds: Cardano.TransactionId[] = [];
let timeAfterWorkerStarted = 0;
let timeBeforeSubmitTxs = 0;

await fragmentWhenRequired(options);
try {
logger.debug(`Starting test with options: ${JSON.stringify({ directlyToOgmios, parallel, withRunningWorker })}`);

const startWorkerForTest = async () => {
if (!directlyToOgmios) await startWorker(options);
timeAfterWorkerStarted = Date.now();
};
await fragmentWhenRequired(options);

const startWorkerForTest = async () => {
if (!directlyToOgmios) await startWorker(options);
timeAfterWorkerStarted = Date.now();
};

const finalizeAndSubmit = async (tx: InitializeTxResult) => {
try {
await wallet.submitTx(await wallet.finalizeTx(tx));
} catch (error) {
logger.error(JSONBig.stringify(tx), error);
throw error;
}
};

const submitTransactions = async () => {
timeBeforeSubmitTxs = Date.now();
for (let i = 0; i < env.TRANSACTIONS_NUMBER; ++i) {
const coins = 1_000_000n + 1000n * BigInt(i);
const tx = await wallet.initializeTx({ outputs: new Set([{ address, value: { coins } }]) });
const submitTransactions = async () => {
timeBeforeSubmitTxs = Date.now();
for (let i = 0; i < env.TRANSACTIONS_NUMBER; ++i) {
const coins = 1_000_000n + 1000n * BigInt(i);
const tx = await wallet.initializeTx({ outputs: new Set([{ address, value: { coins } }]) });

submitPromises.push(wallet.submitTx(await wallet.finalizeTx(tx)));
txIds.push(tx.hash);
submitPromises.push(finalizeAndSubmit(tx));
txIds.push(tx.hash);
}
};

if (withRunningWorker) {
await startWorkerForTest();
await submitTransactions();
} else {
await submitTransactions();
await startWorkerForTest();
}
};

if (withRunningWorker) {
await startWorkerForTest();
await submitTransactions();
} else {
await submitTransactions();
await startWorkerForTest();
}
await expect(Promise.all(submitPromises)).resolves.not.toThrow();
const timeAfterTxsInMempool = Date.now();

await Promise.all(submitPromises);
const timeAfterTxsInMempool = Date.now();
await Promise.all(txIds.map((txId) => waitForTxInBlockchain(txId)));

await Promise.all(txIds.map((txId) => waitForTxInBlockchain(txId)));
testReports.push({
...options,
timeAfterTxsInBlockchain: Date.now(),
timeAfterTxsInMempool,
timeAfterWorkerStarted,
timeBeforeSubmitTxs
});

testReports.push({
...options,
timeAfterTxsInBlockchain: Date.now(),
timeAfterTxsInMempool,
timeAfterWorkerStarted,
timeBeforeSubmitTxs
});
logger.debug(`Completed test with options: ${JSON.stringify({ directlyToOgmios, parallel, withRunningWorker })}`);
} catch (error) {
logger.error(
`Failed test with options: ${JSON.stringify({ directlyToOgmios, parallel, withRunningWorker })}`,
error
);
}
};

describe('directly to ogmios', () => {
Expand Down

0 comments on commit a880367

Please sign in to comment.