Skip to content

Commit

Permalink
WorkerSource per-source-type -> per-source
Browse files Browse the repository at this point in the history
- Instead of making WorkerSource implementations responsible for tracking which source's messages they're processing, instantiate a different WorkerSource for each source, and make Worker responsible for forwarding messages to the correct instance.

- Simplifies GeoJSONWorkerSource's coalescing logic.

- Introduces a somewhat subtle requirement for Source implementers: any Source that sends a 'removeSource' message is responsible for sending no further messages. To re-add a Source with the same ID, create a new Source object and start sending messages. A new WorkerSource will be instantiated with matching state.
  • Loading branch information
ChrisLoer committed Jan 29, 2018
1 parent e5497ca commit 373f5c4
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 213 deletions.
13 changes: 8 additions & 5 deletions src/source/geojson_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class GeoJSONSource extends Evented implements Source {
_loaded: boolean;
_collectResourceTiming: boolean;
_resourceTiming: Array<PerformanceResourceTiming>;
_removed: boolean;

constructor(id: string, options: GeojsonSourceSpecification & {workerOptions?: any, collectResourceTiming: boolean}, dispatcher: Dispatcher, eventedParent: Evented) {
super();
Expand All @@ -94,6 +95,7 @@ class GeoJSONSource extends Evented implements Source {
this.tileSize = 512;
this.isTileClipped = true;
this.reparseOverscaled = true;
this._removed = false;

this.dispatcher = dispatcher;
this.setEventedParent(eventedParent);
Expand Down Expand Up @@ -198,11 +200,11 @@ class GeoJSONSource extends Evented implements Source {
options.data = JSON.stringify(data);
}

// target {this.type}.loadData rather than literally geojson.loadData,
// target {this.type}.{options.source}.loadData rather than literally geojson.loadData,
// so that other geojson-like source types can easily reuse this
// implementation
this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, abandoned) => {
if (!abandoned) {
this.workerID = this.dispatcher.send(`${this.type}.${options.source}.loadData`, options, (err, abandoned) => {
if (!abandoned && !this._removed) {
this._loaded = true;

if (result && result.resourceTiming && result.resourceTiming[this.id])
Expand All @@ -214,7 +216,7 @@ class GeoJSONSource extends Evented implements Source {
// message queue. Waiting instead for the 'coalesce' to round-trip
// through the foreground just means we're throttling the worker
// to run at a little less than full-throttle.
this.dispatcher.send(`${this.type}.coalesce`, this.workerOptions, null, this.workerID);
this.dispatcher.send(`${this.type}.${options.source}.coalesce`, null, null, this.workerID);
callback(err);
}
}, this.workerID);
Expand Down Expand Up @@ -262,7 +264,8 @@ class GeoJSONSource extends Evented implements Source {
}

onRemove() {
this.dispatcher.broadcast('removeSource', { type: this.type, source: this.id });
this._removed = true;
this.dispatcher.send('removeSource', { type: this.type, source: this.id }, null, this.workerID);
}

serialize() {
Expand Down
92 changes: 34 additions & 58 deletions src/source/geojson_worker_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,19 @@ export type LoadGeoJSONParameters = {
geojsonVtOptions?: Object
};

export type CoalesceParameters = {
source: string
};

export type LoadGeoJSON = (params: LoadGeoJSONParameters, callback: Callback<mixed>) => void;

export interface GeoJSONIndex {
}

function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataCallback) {
const source = params.source,
canonical = params.tileID.canonical;
const canonical = params.tileID.canonical;

if (!this._sources[source] || !this._sources[source].geoJSONIndex) {
if (!this._geoJSONIndex) {
return callback(null, null); // we couldn't load the file
}

const geoJSONTile = this._sources[source].geoJSONIndex.getTile(canonical.z, canonical.x, canonical.y);
const geoJSONTile = this._geoJSONIndex.getTile(canonical.z, canonical.x, canonical.y);
if (!geoJSONTile) {
return callback(null, null); // nothing in the given tile
}
Expand Down Expand Up @@ -89,12 +84,10 @@ export type SourceState =
*/
class GeoJSONWorkerSource extends VectorTileWorkerSource {
loadGeoJSON: LoadGeoJSON;
_sources: { [string]: {
state?: SourceState,
pendingCallback?: Callback<boolean>,
pendingLoadDataParams?: LoadGeoJSONParameters,
geoJSONIndex?: GeoJSONIndex // object mapping source ids to geojson-vt-like tile indexes
}};
_state: SourceState;
_pendingCallback: Callback<boolean>;
_pendingLoadDataParams: LoadGeoJSONParameters;
_geoJSONIndex: GeoJSONIndex

/**
* @param [loadGeoJSON] Optional method for custom loading/parsing of
Expand All @@ -106,7 +99,6 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
if (loadGeoJSON) {
this.loadGeoJSON = loadGeoJSON;
}
this._sources = {};
}

/**
Expand All @@ -123,45 +115,38 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
* See {@link GeoJSONWorkerSource#coalesce}
*
* @param params
* @param params.source The id of the source.
* @param callback
*/
loadData(params: LoadGeoJSONParameters, callback: Callback<boolean>) {
if (!this._sources[params.source]) {
this._sources[params.source] = {};
}
const source = this._sources[params.source];

if (source.pendingCallback) {
if (this._pendingCallback) {
// Tell the foreground the previous call has been abandoned
source.pendingCallback(null, true);
this._pendingCallback(null, true);
}
source.pendingCallback = callback;
source.pendingLoadDataParams = params;
this._pendingCallback = callback;
this._pendingLoadDataParams = params;

if (source.state &&
source.state !== 'Idle') {
source.state = 'NeedsLoadData';
if (this._state &&
this._state !== 'Idle') {
this._state = 'NeedsLoadData';
} else {
source.state = 'Coalescing';
this._loadData(params.source);
this._state = 'Coalescing';
this._loadData();
}
}

/**
* Internal implementation: called directly by `loadData`
* or by `coalesce` using stored parameters.
*/
_loadData(sourceId: string) {
const source = this._sources[sourceId];
if (!source.pendingCallback || !source.pendingLoadDataParams) {
_loadData() {
if (!this._pendingCallback || !this._pendingLoadDataParams) {
assert(false);
return;
}
const callback = source.pendingCallback;
const params = source.pendingLoadDataParams;
delete source.pendingCallback;
delete source.pendingLoadDataParams;
const callback = this._pendingCallback;
const params = this._pendingLoadDataParams;
delete this._pendingCallback;
delete this._pendingLoadDataParams;
this.loadGeoJSON(params, (err, data) => {
if (err || !data) {
return callback(err);
Expand All @@ -171,14 +156,14 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
rewind(data, true);

try {
source.geoJSONIndex = params.cluster ?
this._geoJSONIndex = params.cluster ?
supercluster(params.superclusterOptions).load(data.features) :
geojsonvt(data, params.geojsonVtOptions);
} catch (err) {
return callback(err);
}

this.loaded[params.source] = {};
this.loaded = {};

const result = {};
if (params.request && params.request.collectResourceTiming) {
Expand Down Expand Up @@ -215,16 +200,12 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
* | ↓
* State: NeedsLoadData
*/
coalesce(params: CoalesceParameters) {
const source = this._sources[params.source];
if (!source) {
return; // coalesce queued after removeSource
}
if (source.state === 'Coalescing') {
source.state = 'Idle';
} else if (source.state === 'NeedsLoadData') {
source.state = 'Coalescing';
this._loadData(params.source);
coalesce() {
if (this._state === 'Coalescing') {
this._state = 'Idle';
} else if (this._state === 'NeedsLoadData') {
this._state = 'Coalescing';
this._loadData();
}
}

Expand All @@ -235,11 +216,10 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
* Otherwise, such as after a setData() call, we load the tile fresh.
*
* @param params
* @param params.source The id of the source for which we're loading this tile.
* @param params.uid The UID for this tile.
*/
reloadTile(params: WorkerTileParameters, callback: WorkerTileCallback) {
const loaded = this.loaded[params.source],
const loaded = this.loaded,
uid = params.uid;

if (loaded && loaded[uid]) {
Expand Down Expand Up @@ -279,13 +259,9 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
}

removeSource(params: {source: string}, callback: Callback<mixed>) {
const removedSource = this._sources[params.source];
if (removedSource) {
if (removedSource.pendingCallback) {
// Don't leak callbacks
removedSource.pendingCallback(null, true);
}
delete this._sources[params.source];
if (this._pendingCallback) {
// Don't leak callbacks
this._pendingCallback(null, true);
}
callback();
}
Expand Down
20 changes: 8 additions & 12 deletions src/source/raster_dem_tile_worker_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,29 @@ import type {

class RasterDEMTileWorkerSource {
actor: Actor;
loading: {[string]: {[string]: DEMData}};
loaded: {[string]: {[string]: DEMData}};
loading: {[string]: DEMData};
loaded: {[string]: DEMData};

constructor() {
this.loading = {};
this.loaded = {};
}

loadTile(params: WorkerDEMTileParameters, callback: WorkerDEMTileCallback) {
const source = params.source,
uid = params.uid;

if (!this.loading[source])
this.loading[source] = {};
const uid = params.uid;

const dem = new DEMData(uid);
this.loading[source][uid] = dem;
this.loading[uid] = dem;
dem.loadFromImage(params.rawImageData);
delete this.loading[source][uid];
delete this.loading[uid];

this.loaded[source] = this.loaded[source] || {};
this.loaded[source][uid] = dem;
this.loaded = this.loaded || {};
this.loaded[uid] = dem;
callback(null, dem);
}

removeTile(params: TileParameters) {
const loaded = this.loaded[params.source],
const loaded = this.loaded,
uid = params.uid;
if (loaded && loaded[uid]) {
delete loaded[uid];
Expand Down
27 changes: 12 additions & 15 deletions src/source/vector_tile_worker_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class VectorTileWorkerSource implements WorkerSource {
actor: Actor;
layerIndex: StyleLayerIndex;
loadVectorData: LoadVectorData;
loading: { [string]: { [string]: WorkerTile } };
loaded: { [string]: { [string]: WorkerTile } };
loading: { [string]: WorkerTile };
loaded: { [string]: WorkerTile };

/**
* @param [loadVectorData] Optional method for custom loading of a VectorTile
Expand All @@ -96,15 +96,14 @@ class VectorTileWorkerSource implements WorkerSource {
* a `params.url` property) for fetching and producing a VectorTile object.
*/
loadTile(params: WorkerTileParameters, callback: WorkerTileCallback) {
const source = params.source,
uid = params.uid;
const uid = params.uid;

if (!this.loading[source])
this.loading[source] = {};
if (!this.loading)
this.loading = {};

const workerTile = this.loading[source][uid] = new WorkerTile(params);
const workerTile = this.loading[uid] = new WorkerTile(params);
workerTile.abort = this.loadVectorData(params, (err, response) => {
delete this.loading[source][uid];
delete this.loading[uid];

if (err || !response) {
return callback(err);
Expand All @@ -131,16 +130,16 @@ class VectorTileWorkerSource implements WorkerSource {
callback(null, util.extend({rawTileData: rawTileData.slice(0)}, result, cacheControl, resourceTiming));
});

this.loaded[source] = this.loaded[source] || {};
this.loaded[source][uid] = workerTile;
this.loaded = this.loaded || {};
this.loaded[uid] = workerTile;
});
}

/**
* Implements {@link WorkerSource#reloadTile}.
*/
reloadTile(params: WorkerTileParameters, callback: WorkerTileCallback) {
const loaded = this.loaded[params.source],
const loaded = this.loaded,
uid = params.uid,
vtSource = this;
if (loaded && loaded[uid]) {
Expand Down Expand Up @@ -170,11 +169,10 @@ class VectorTileWorkerSource implements WorkerSource {
* Implements {@link WorkerSource#abortTile}.
*
* @param params
* @param params.source The id of the source for which we're loading this tile.
* @param params.uid The UID for this tile.
*/
abortTile(params: TileParameters, callback: WorkerTileCallback) {
const loading = this.loading[params.source],
const loading = this.loading,
uid = params.uid;
if (loading && loading[uid] && loading[uid].abort) {
loading[uid].abort();
Expand All @@ -187,11 +185,10 @@ class VectorTileWorkerSource implements WorkerSource {
* Implements {@link WorkerSource#removeTile}.
*
* @param params
* @param params.source The id of the source for which we're loading this tile.
* @param params.uid The UID for this tile.
*/
removeTile(params: TileParameters, callback: WorkerTileCallback) {
const loaded = this.loaded[params.source],
const loaded = this.loaded,
uid = params.uid;
if (loaded && loaded[uid]) {
delete loaded[uid];
Expand Down
Loading

0 comments on commit 373f5c4

Please sign in to comment.