Skip to content

Commit

Permalink
Merge pull request #5 from pabloem/standard-coders
Browse files Browse the repository at this point in the history
[WIP] Standard coders with their test suite
  • Loading branch information
robertwb authored Jan 5, 2022
2 parents 91cbe75 + 3ba60a2 commit 6753374
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 12 deletions.
10 changes: 8 additions & 2 deletions sdks/node-ts/package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
{
"devDependencies": {
"mocha": "^9.1.3",
"typescript": "^4.5.4"
"typescript": "^4.5.4",
"js-yaml": "^4.1.0"
},
"files": [
"dist"
],
"scripts": {
"build": "tsc --build",
"clean": "tsc --build --clean",
Expand All @@ -13,5 +17,7 @@
"@grpc/grpc-js": "^1.4.6",
"@protobuf-ts/plugin": "^2.1.0",
"typescript-formatter": "^7.2.2"
}
},
"main": "./dist/apache_beam/index.js",
"exports": "./dist/apache_beam/index.js"
}
47 changes: 47 additions & 0 deletions sdks/node-ts/src/apache_beam/coders/standard_coders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
interface Class<T> {
new(...args: any[]): T;
}

class CoderRegistry {
internal_registry = {};
get(urn: string): Coder {
const constructor: Class<Coder> = this.internal_registry[urn];
if (constructor === undefined) {
return null!;
}
return new constructor();
}

register(urn: string, coderClass: Class<Coder>) {
this.internal_registry[urn] = coderClass;
}
}

export const CODER_REGISTRY = new CoderRegistry();


export class Coder {
encode(element: any): Uint8Array {
throw new Error('Not implemented!');
}

decode(bytes: Uint8Array): any {
throw new Error('Not implemented!');
}
}

export class BytesCoder extends Coder {
static URN: string = "beam:coder:bytes:v1";
constructor() {
super();
}

encode(element: Uint8Array): Uint8Array {
return element;
}

decode(element: Uint8Array): Uint8Array {
return element;
}
}
CODER_REGISTRY.register(BytesCoder.URN, BytesCoder);
154 changes: 144 additions & 10 deletions sdks/node-ts/src/apache_beam/core.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,173 @@
const runnerApi = require('./proto/beam_runner_api');
const translations = require('./internal/translations')


// TODO(pabloem): Use something better, hah.
var _pcollection_counter = 0;

/**
* Represents an 'edge' in a graph. These may be PCollections, PCollection views,
* and Pipelines themselves.
*/
class PValue {
constructor() {
// TODO: Have a reference to its graph representation
type: string = "unknown";
name: string;

constructor(name: string) {
this.name = name;
}

isPipeline(): boolean {
return this.type === "pipeline";
}

apply(transform: PTransform): PValue {
return transform.expand(this);
}

map(callable: any): PValue {
map(callable: DoFn | GenericCallable): PValue {
return this.apply(new ParDo(callable));
}

// Top-level functions:
// - flatMap
// - filter?
}

class Pipeline extends PValue {

/**
* A Pipeline is the base object to start building a Beam DAG. It is the
* first object that a user creates, and then they may start applying
* transformations to it to build a DAG.
*/
export class Pipeline extends PValue {
type: string = "pipeline";
portablePipeline: any;
}

class PCollection extends PValue {
export class PCollection extends PValue {
type: string = "pcollection";
portablePcollection: any;

constructor(name: string, portablePcollection: any) {
super(name)
this.portablePcollection = portablePcollection;
}
}

class PTransform {
export class PTransform {
expand(input: PValue): PValue {
throw new Error('Method expand has not been implemented.');
}
}

class ParDo extends PTransform {

/**
* @returns true if the input is a `DoFn`.
*
* Since type information is lost at runtime, we check the object's attributes
* to determine whether it's a DoFn or not.
*
* @example
* Prints "true" for a new `DoFn` but "false" for a function:
* ```ts
* console.log(new DoFn());
* console.log(in => in * 2));
* ```
* @param callableOrDoFn
* @returns
*/
function isDoFn(callableOrDoFn: DoFn | GenericCallable) {
const df = (callableOrDoFn as DoFn)
if (df.type !== undefined && df.type === "dofn") {
return true;
} else {
return false;
}
}

export class ParDo extends PTransform {
private doFn;
constructor(callableOrDoFn: any) {
constructor(callableOrDoFn: DoFn | GenericCallable) {
super()
this.doFn = callableOrDoFn;
if (isDoFn(callableOrDoFn)) {
this.doFn = callableOrDoFn;
} else {
this.doFn = new _CallableWrapperDoFn(callableOrDoFn as GenericCallable);
}
}

expand(input: PValue): PValue {
console.log(runnerApi.StandardPTransforms_Primitives.PAR_DO.urn);

const pardoProto = runnerApi.PTransform.create({
// TODO(pabloem): Get the name for the PTransform
'uniqueName': 'todouniquename',
'spec': runnerApi.FunctionSpec.create({
// TODO(pabloem): URNS ARE DISAPPEARING!
'urn': runnerApi.StandardPTransforms_Primitives.PAR_DO.urn,
'payload': runnerApi.ParDoPayload.create({
'doFn': runnerApi.FunctionSpec.create()
})
}),
// TODO(pabloem): Add inputs
'inputs': {},
'outputs': {'out': 'ref_PCollection_' + _pcollection_counter}
});
_pcollection_counter += 1;

// TODO(pablom): Do this properly
return new PCollection(pardoProto.outputs.out, pardoProto);
}
}

interface GenericCallable {
(input: any): any
}

export class DoFn {
type: string = "dofn";
process(element: any) {
throw new Error('Method process has not been implemented!');
}

startBundle(element: any) {
throw new Error('Method process has not been implemented!');
}

finishBundle(element: any) {
throw new Error('Method process has not been implemented!');
}
}

class _CallableWrapperDoFn extends DoFn {
private fn;
constructor(fn: GenericCallable) {
super();
this.fn = fn;
}
process(element: any) {
return this.fn(element);
}
}

class DoFn {
export class Impulse extends PTransform {
expand(input: PValue): PValue {
if (!input.isPipeline()) {
throw new Error("User is attempting to apply Impulse transform to a non-pipeline object.");
}

const impulseProto = runnerApi.PTransform.create({
// TODO(pabloem): Get the name for the PTransform
'uniqueName': 'todouniquename',
'spec': runnerApi.FunctionSpec.create({
'urn': translations.DATA_INPUT_URN,
'payload': translations.IMPULSE_BUFFER
}),
'outputs': {'out': 'ref_PCollection_' + _pcollection_counter}
});
_pcollection_counter += 1;

return new PCollection(impulseProto.outputs.out, impulseProto);
}
}
1 change: 1 addition & 0 deletions sdks/node-ts/src/apache_beam/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './core'
6 changes: 6 additions & 0 deletions sdks/node-ts/src/apache_beam/internal/translations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

export const IMPULSE_BUFFER = new TextEncoder().encode("impulse");

export const DATA_INPUT_URN = 'beam:runner:source:v1';
export const DATA_OUTPUT_URN = 'beam:runner:sink:v1';
export const IDENTITY_DOFN_URN = 'beam:dofn:identity:0.1';
13 changes: 13 additions & 0 deletions sdks/node-ts/test/core_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const beam = require("../dist/apache_beam");
// TODO(pabloem): Fix installation.

describe("core module", function() {
describe("basic ptransform", function() {
it("runs a basic expansion", function() {
var p = new beam.Pipeline();
var res2 = p.apply(new beam.Impulse())
.apply(new beam.ParDo(function(v) {return v*2;}));
console.log(res2);
});
});
});
Loading

0 comments on commit 6753374

Please sign in to comment.