From 3acb4465c2f3ac7aafd704f2351b121a7e439923 Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Mon, 6 Oct 2014 16:09:05 -0400 Subject: [PATCH] (pubsub) refactor api. --- docs/components/docs/docs-values.js | 17 +- docs/components/docs/docs.html | 18 + docs/components/docs/docs.js | 3 +- docs/css/main.css | 5 + lib/common/util.js | 3 + lib/index.js | 29 +- lib/pubsub/index.js | 532 ++++++++++------------------ lib/pubsub/subscription.js | 329 +++++++++++++++++ lib/pubsub/topic.js | 270 ++++++++++++++ package.json | 8 +- regression/pubsub.js | 200 ++++++----- scripts/docs.sh | 29 ++ test/pubsub/index.js | 219 ++++++++---- test/pubsub/subscription.js | 430 ++++++++++++++++++++++ test/pubsub/topic.js | 341 ++++++++++++++++++ 15 files changed, 1918 insertions(+), 515 deletions(-) create mode 100644 lib/pubsub/subscription.js create mode 100644 lib/pubsub/topic.js create mode 100755 scripts/docs.sh create mode 100644 test/pubsub/subscription.js create mode 100644 test/pubsub/topic.js diff --git a/docs/components/docs/docs-values.js b/docs/components/docs/docs-values.js index fc3f9952daa..c785015e671 100644 --- a/docs/components/docs/docs-values.js +++ b/docs/components/docs/docs-values.js @@ -39,6 +39,21 @@ angular.module('gcloud.docs') ] }, + pubsub: { + title: 'PubSub', + _url: '{baseUrl}/pubsub', + pages: [ + { + title: 'Topic', + url: '/topic' + }, + { + title: 'Subscription', + url: '/subscription' + } + ] + }, + storage: { title: 'Storage', _url: '{baseUrl}/storage' @@ -49,6 +64,6 @@ angular.module('gcloud.docs') // https://github.com/npm/node-semver#versions // List should be in ascending order. '<=0.7.1': ['gcloud', 'datastore', 'storage'], - '>0.7.1': ['gcloud', 'datastoreWithTransaction', 'storage'] + '>0.7.1': ['gcloud', 'datastoreWithTransaction', 'pubsub', 'storage'] } }); diff --git a/docs/components/docs/docs.html b/docs/components/docs/docs.html index eb6ead22cab..fba03068e2b 100644 --- a/docs/components/docs/docs.html +++ b/docs/components/docs/docs.html @@ -73,6 +73,24 @@

Datastore Overview

+
+

Pub/Sub Overview

+

+ Google Cloud Pub/Sub is in Alpha status. As a result, it might be changed in backward-incompatible ways and is not recommended for production use. It is not subject to any SLA or deprecation policy. +

+

+ The gcloud.pubsub method will return a pubsub object, allowing you to create topics, publish messages, subscribe to topics, and more. See the Google Cloud Pub/Sub overview for more information. +

+
+var pubsub = gcloud.pubsub({ + projectId: 'myProject', + keyFilename: '/path/to/keyfile.json' +});
+

+ See the examples below, which demonstrate everything from creating a topic to subscribing to messages on a topic. +

+
+

Storage Overview

diff --git a/docs/components/docs/docs.js b/docs/components/docs/docs.js index 98d3dc89ca7..96af4591334 100644 --- a/docs/components/docs/docs.js +++ b/docs/components/docs/docs.js @@ -172,7 +172,8 @@ angular return function(mixInData) { return mixInData .reduce(function(acc, mixInMethods) { - return acc = acc.concat(mixInMethods); + acc = acc.concat(mixInMethods); + return acc; }, data) .sort(function(a, b) { return a.name > b.name; diff --git a/docs/css/main.css b/docs/css/main.css index 32a20d55194..e8d4d001dcb 100755 --- a/docs/css/main.css +++ b/docs/css/main.css @@ -617,6 +617,11 @@ h2, h3 { position: relative; } +.notice { + background-color: #e5ecf9; + padding: 8px; +} + .permalink { display: none; position: absolute; diff --git a/lib/common/util.js b/lib/common/util.js index 8158afce99d..ec27a6d09e7 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -183,6 +183,9 @@ module.exports.handleResp = handleResp; * @return {string} */ function getType(value) { + if (value instanceof Buffer) { + return 'buffer'; + } return Object.prototype.toString.call(value).match(/\s(\w+)\]/)[1]; } diff --git a/lib/index.js b/lib/index.js index 530f42352d9..3d01d7ca23c 100644 --- a/lib/index.js +++ b/lib/index.js @@ -20,6 +20,12 @@ 'use strict'; +/** + * @type module:common/util + * @private + */ +var util = require('./common/util.js'); + /** * @type {module:datastore} * @private @@ -30,7 +36,7 @@ var Datastore = require('./datastore'); * @type {module:pubsub} * @private */ -var pubsub = require('./pubsub'); +var PubSub = require('./pubsub'); /** * @type {module:storage} @@ -86,13 +92,17 @@ var Storage = require('./storage'); * * var bucket = gcloud.storage.bucket({ * bucketName: 'PhotosBucket', - * // properties may be overriden: + * // properties may be overridden: * keyFilename: '/path/to/other/keyfile.json' * }); */ function gcloud(config) { return { datastore: new Datastore(config), + pubsub: function(options) { + options = options || {}; + return new PubSub(util.extendGlobalConfig(config, options)); + }, storage: new Storage(config) }; } @@ -130,24 +140,25 @@ gcloud.datastore = Datastore; * Note: Google Cloud Pub/Sub API is available as a Limited Preview and the * client library we provide is currently experimental. The API and/or the * client might be changed in backward-incompatible ways. This API is not - * subject to any SLA or deprecation policy. Request to be whitelisted to use - * it by filling the + * subject to any SLA or deprecation policy. Request to be whitelisted to use it + * by filling the * [Limited Preview application form]{@link http://goo.gl/sO0wTu}. * * @type {module:pubsub} * - * @return {object} + * @return {module:pubsub} * * @example * var gcloud = require('gcloud'); - * var pubsub = gcloud.pubsub; * - * var conn = new pubsub.Connection({ + * var pubsub = gcloud.pubsub({ * projectId: YOUR_PROJECT_ID, * keyFilename: '/path/to/the/key.json' * }); */ -gcloud.pubsub = pubsub; +gcloud.pubsub = function(config) { + return new PubSub(config); +}; /** * Google Cloud Storage allows you to store data on Google infrastructure. @@ -171,7 +182,7 @@ gcloud.pubsub = pubsub; * * // storage: * // { - * // Bucket: function() {} + * // bucket: function() {} * // } */ gcloud.storage = Storage; diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 31d67eec1e3..d6646b381b2 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -1,4 +1,4 @@ -/** +/*! * Copyright 2014 Google Inc. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,447 +14,285 @@ * limitations under the License. */ -/** +/*! * @module pubsub */ 'use strict'; -var events = require('events'); -var extend = require('extend'); -var nodeutil = require('util'); - -/** @type {module:common/connection} */ -var conn = require('../common/connection.js'); - -/** @type {module:common/util} */ -var util = require('../common/util.js'); - -/** @private @const {string} Base URL for Pub/Sub API. */ -var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1'; - -/** @const {array} Required scopes for Pub/Sub API. */ -var SCOPES = [ - 'https://www.googleapis.com/auth/pubsub', - 'https://www.googleapis.com/auth/cloud-platform' -]; - -function Subscription(conn, name) { - this.conn = conn; - this.name = name; - - this.autoAck = false; - this.pullIntervalInMs = 10; - this.closed = false; -} - -nodeutil.inherits(Subscription, events.EventEmitter); - -/** - * Acknowledges the backend that message is retrieved. - * @param {Array} ids A list of message IDs. - * @param {Function} callback Callback function. - */ -Subscription.prototype.ack = function(ids, callback) { - ids = util.arrayize(ids); - var body = { - subscription: this.name, - ackId: ids - }; - this.conn.makeReq('POST', 'subscriptions/acknowledge', null, body, callback); -}; - -/** - * Pulls from the subscribed topic. - * @param {Boolean} opts.returnImmediately If set, the system will respond - * immediately. Otherwise, wait - * until new messages are available. - * Returns if timeout is reached. - * @param {Function} callback Callback. - */ -Subscription.prototype.pull = function(opts, callback) { - var that = this; - // TODO(jbd): Should not be racing with other pull. - // TOOD(jbd): Make opts optional. - var body = { - subscription: this.name, - returnImmediately: !!opts.returnImmediately - }; - this.conn.makeReq( - 'POST', 'subscriptions/pull', null, body, function(err, message) { - // TODO(jbd): Fix API to return a list of messages. - if (err) { - callback(err); - return; - } - if (!that.autoAck) { - that.emitMessage_(message); - callback(); - return; - } - that.ack(message.ackId, function(err) { - if (err) { - callback(err); - return; - } - that.emitMessage_(message); - callback(); - }); - }); -}; - -/** - * Polls the backend for new messages. - */ -Subscription.prototype.startPulling_ = function() { - var that = this; - var pullFn = function() { - if (that.closed) { - return; - } - that.pull({ returnImmediately: false }, function(err) { - // TODO(jbd): Fix API to return a more explicit error code or message. - if (err && err.message.indexOf('has no more messages') < 0) { - that.emitError_(err); - } - setTimeout(function() { - pullFn(); - }, that.pullIntervalInMs); - }); - }; - pullFn(); -}; - /** - * Deletes the current subscription. Pull requests from the current - * subscription will be errored once unsubscription is done. - * @param {Function} callback Optional callback. + * @type module:common/connection + * @private */ -Subscription.prototype.del = function(callback) { - callback = callback || util.noop; - var that = this; - var path = util.format('subscriptions/{fullName}', { - fullName: this.name - }); - this.conn.makeReq('DELETE', path, null, true, function(err) { - if (err) { - callback(err); - return; - } - that.closed = true; - callback(null); - }); -}; - -/** - * Closes the subscription. - */ -Subscription.prototype.close = function() { - this.closed = true; -}; - -/** - * Emits a 'message' event with the provided message. - */ -Subscription.prototype.emitMessage_ = function(msg) { - if (msg.pubsubEvent && msg.pubsubEvent.message) { - var data = msg.pubsubEvent.message.data; - msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8'); - } - this.emit('message', msg); -}; +var conn = require('../common/connection.js'); /** - * Emits an error with the provided error. + * @type module:pubsub/subscription + * @private */ -Subscription.prototype.emitError_ = function(err) { - this.emit('error', err); -}; +var Subscription = require('./subscription.js'); /** - * Represents a Google Cloud Pub/Sub API topic. - * @param {Connection} conn Authorized connection. - * @param {string} name Full name of the topic. + * @type module:pubsub/topic + * @private */ -function Topic(conn, name) { - this.conn = conn; - this.name = name; -} +var Topic = require('./topic.js'); /** - * Publishes the provided string message. - * @param {string} data String message to publish. - * @param {Function} callback Optional callback. + * @type module:common/util + * @private */ -Topic.prototype.publish = function(data, callback) { - callback = callback || util.noop; - this.publishMessage({ - topic: this.name, - message: { - data: new Buffer(data).toString('base64') - } - }, callback); -}; +var util = require('../common/util.js'); /** - * Publishes a raw message. - * @param {message} message Raw message to publish. - * @param {Function} callback Optional callback. + * @const {string} Base URL for Pub/Sub API. + * @private */ -Topic.prototype.publishMessage = function(message, callback) { - callback = callback || util.noop; - message.topic = this.name; - this.conn.makeReq('POST', 'topics/publish', null, message, callback); -}; +var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1'; /** - * Deletes a topic. - * @param {Function} callback Optional callback. + * @const {array} Required scopes for Pub/Sub API. + * @private */ -Topic.prototype.del = function(callback) { - callback = callback || util.noop; - var path = 'topics/' + this.name; - this.conn.makeReq('DELETE', path, null, true, callback); -}; +var SCOPES = [ + 'https://www.googleapis.com/auth/pubsub', + 'https://www.googleapis.com/auth/cloud-platform' +]; /** - * Represents connection to Google Cloud Pub/Sub API. - * @param {string} opts.projectId Google Developers Console Project ID. - * @param {string} opts.email Service account email. + * [Google Cloud Pub/Sub]{@link https://developers.google.com/pubsub/overview} + * is a reliable, many-to-many, asynchronous messaging service from Google + * Cloud Platform. + * + * Note: Google Cloud Pub/Sub API is available as a Limited Preview and the + * client library we provide is currently experimental. The API and/or the + * client might be changed in backward-incompatible ways. This API is not + * subject to any SLA or deprecation policy. Request to be whitelisted to use + * it by filling the + * [Limited Preview application form]{@link http://goo.gl/sO0wTu}. + * + * @constructor + * @alias module:pubsub + * + * @param {object=} options - Configuration object. + * @param {string=} options.projectId - Google Developers Console Project ID. * @param {string=} options.keyFilename - Full path to the JSON key downloaded * from the Google Developers Console. Alternatively, you may provide a * `credentials` object. * @param {object=} options.credentials - Credentials object, used in place of * a `keyFilename`. + * + * @example + * var gcloud = require('gcloud'); + * + * // From Google Compute Engine and Google App Engine: + * + * // Access `pubsub` through the `gcloud` module directly. + * var pubsub = gcloud.pubsub(); + * + * // Elsewhere: + * + * // Provide configuration details up-front. + * var myProject = gcloud({ + * keyFilename: '/path/to/keyfile.json', + * projectId: 'my-project' + * }); + * + * var pubsub = myProject.pubsub(); + * + * + * // Override default configuration details. + * var anotherPubsubConnection = myProject.pubsub({ + * keyFilename: '/path/to/another/keyfile.json', + * }); + * + * + * // Specify all options at instantiation. + * var pubsub = gcloud.pubsub({ + * keyFilename: '/path/to/keyfile.json', + * projectId: 'my-project' + * }); */ -function Connection(opts) { - opts = opts || {}; - var id = opts.projectId; +function PubSub(options) { + options = options || {}; - this.id = id; - this.conn = new conn.Connection({ - credentials: opts.credentials, - keyFilename: opts.keyFilename, + this.connection = new conn.Connection({ + credentials: options.credentials, + keyFilename: options.keyFilename, scopes: SCOPES }); + this.projectId = options.projectId; + this.projectName = '/projects/' + this.projectId; } /** - * Lists subscriptions. - * @param {string} query.filterByTopic Returns subscriptions that are - * subscribed to the topic provided. - * @param {string} query.pageToken Page token. - * @param {Number} query.maxResults Max number of results to return. - * @param {Function} callback Callback function. + * Get a list of the topics registered to your project. You may optionally + * provide a query object as the first argument to customize the response. + * + * @param {object=} query - Query object. + * @param {string=} query.pageToken - Page token. + * @param {number=} query.maxResults - Maximum number of results to return. + * @param {function} callback - The callback function. + * + * @example + * // Get all topics. + * pubsub.getTopics(function(err, topics, nextQuery) { + * // If `nextQuery` is non-null, there may be more results to fetch. To do + * // so, run `pubsub.getTopics(nextQuery, callback);`. + * }); + * + * // Customize the query. + * pubsub.getTopics({ + * maxResults: 3 + * }, function(err, topics, nextQuery) {}); */ -Connection.prototype.listSubscriptions = function(query, callback) { +PubSub.prototype.getTopics = function(query, callback) { var that = this; - if (arguments.length < 2) { + if (!callback) { callback = query; query = {}; } - var q = extend({}, query); - if (q.filterByTopic) { - q.query = - 'pubsub.googleapis.com/topic in (' + - this.fullTopicName_(q.filterByTopic) + ')'; - } else { - q.query = - 'cloud.googleapis.com/project in (' + this.fullProjectName_() + ')'; - } - delete q.filterByTopic; - - this.makeReq('GET', 'subscriptions', q, true, function(err, result) { + query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')'; + this.makeReq_('GET', 'topics', query, true, function(err, result) { if (err) { callback(err); return; } - var items = result.subscription || []; - var subscriptions = items.map(function(item) { - return new Subscription(that, item.name); + var topics = (result.topic || []).map(function(item) { + return new Topic(that, { + name: item.name + }); }); var nextQuery = null; if (result.nextPageToken) { - nextQuery = q; + nextQuery = query; nextQuery.pageToken = result.nextPageToken; } - callback(null, subscriptions, nextQuery); + callback(null, topics, nextQuery); }); }; /** - * Gets a subscription. - * @param {string} name Name of the subscription. - * @param {Function} callback Callback. + * Create a topic with the given name. + * + * @param {string} name - Name of the topic. + * @param {function=} callback - The callback function. + * + * @example + * pubsub.createTopic('my-new-topic', function(err, topic) { + * topic.publish('New message!', function(err) {}); + * }); */ -Connection.prototype.getSubscription = function(name, callback) { - var that = this; - var fullName = '/subscriptions/' + this.id + '/' + name; - this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) { - if (err) { - callback(err); - return; - } - callback(null, new Subscription(that, fullName)); - }); -}; - -Connection.prototype.createSubscription = function(opts, callback) { - var that = this; - var subscription = { - topic:'/topics/' + this.id + '/' + opts.topic, - name: '/subscriptions/' + this.id + '/' + opts.name, - ackDeadlineSeconds: opts.ackDeadlineSeconds +PubSub.prototype.createTopic = function(name, callback) { + callback = callback || util.noop; + var topic = this.topic(name); + var req = { + name: topic.name }; - this.makeReq('POST', 'subscriptions', null, subscription, function(err) { + this.makeReq_('POST', 'topics', null, req, function(err) { if (err) { callback(err); return; } - callback(null, new Subscription(that, subscription.name)); - }); + callback(null, topic); + }.bind(this)); }; /** - * Subscribe with the provided options. - * @param {string} name Name of the subscription. - * @param {Boolean} opts.autoAck Automatically acknowledges the - * message once it's pulled. - * @return {Subscription} + * Create a Topic object to reference an existing topic. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - The name of the topic. + * @return {module:pubsub/topic} + * + * @example + * var topic = pubsub.topic('my-existing-topic'); + * topic.publish('New message!'); */ -Connection.prototype.subscribe = function(name, opts) { - opts = opts || {}; - - var fullName = '/subscriptions/' + this.id + '/' + name; - var sub = new Subscription(this, fullName); - sub.autoAck = !!opts.autoAck; - this.getSubscription(name, function(err) { - if (err) { - sub.emitError_(err); - return; - } - sub.emit('ready'); - sub.startPulling_(); +PubSub.prototype.topic = function(name) { + if (!name) { + throw new Error('A name must be specified for a new topic.'); + } + return new Topic(this, { + name: name }); - return sub; }; /** - * Lists topics. - * @param {string} query.pageToken Page token. - * @param {Number} query.maxResults Max number of results to return. - * @param {Function} callback Callback function. + * Get a list of the subscriptions registered to all of your project's topics. + * You may optionally provide a query object as the first argument to customize + * the response. + * + * @param {object=} query - Query object. + * @param {string=} query.pageToken - Page token. + * @param {number=} query.maxResults - Maximum number of results to return. + * @param {function} callback - The callback function. + * + * @example + * // Get all subscriptions. + * pubsub.getSubscriptions(function(err, subscriptions, nextQuery) { + * // If `nextQuery` is non-null, there may be more results to fetch. To do + * // so, run `pubsub.getSubscriptions(nextQuery, callback);`. + * }); + * + * // Customize the query. + * pubsub.getSubscriptions({ + * maxResults: 10 + * }, function(err, subscriptions, nextQuery) {}); */ -Connection.prototype.listTopics = function(query, callback) { +PubSub.prototype.getSubscriptions = function(query, callback) { var that = this; - if (arguments.length < 2) { + if (!callback) { callback = query; query = {}; } - var q = extend({}, query); - q.query = 'cloud.googleapis.com/project in (' + this.fullProjectName_() + ')'; - this.makeReq('GET', 'topics', q, true, function(err, result) { + if (!query.query) { + query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')'; + } + this.makeReq_('GET', 'subscriptions', query, true, function(err, result) { if (err) { callback(err); return; } - var items = result.topic || []; - var topics = items.map(function(item) { - return new Topic(that, item.name); + var subscriptions = (result.subscription || []).map(function(item) { + return new Subscription(that, { + name: item.name + }); }); var nextQuery = null; if (result.nextPageToken) { - nextQuery = q; + nextQuery = query; nextQuery.pageToken = result.nextPageToken; } - callback(null, topics, nextQuery); - }); -}; - -/** - * Gets a topic. - * @param {string} name Name of the topic to get. - * @param {Function} callback Optional callback. - */ -Connection.prototype.getTopic = function(name, callback) { - var that = this; - callback = callback || util.noop; - var fullName = this.fullTopicName_(name); - this.makeReq('GET', 'topics/' + fullName, null, true, function(err) { - if (err) { - callback(err); - return; - } - callback(null, new Topic(that, fullName)); - }); -}; - -/** - * Creates a topic with the given name. - * @param {string} name Name of the topic. - * @param {Function} callback Optional callback. - */ -Connection.prototype.createTopic = function(name, callback) { - var that = this; - callback = callback || util.noop; - var fullName = this.fullTopicName_(name); - this.makeReq('POST', 'topics', null, { name: fullName }, function(err) { - if (err) { - callback(err); - return; - } - callback(null, new Topic(that, fullName)); - }); -}; - -/** - * Returns the full name of a topic. - * Full name is in /topics// form. - */ -Connection.prototype.fullTopicName_ = function(name) { - return util.format('/topics/{projectId}/{name}', { - projectId: this.id, name: name + callback(null, subscriptions, nextQuery); }); }; /** - * Returns the fully qualified project name. - * Full name is in /projects/ form. + * Make a new request object from the provided arguments and wrap the callback + * to intercept non-successful responses. + * + * @private + * + * @param {string} method - Action. + * @param {string} path - Request path. + * @param {*} query - Request query object. + * @param {*} body - Request body contents. + * @param {function} callback - The callback function. */ -Connection.prototype.fullProjectName_ = function() { - return util.format('/projects/{projectId}', { - projectId: this.id - }); -}; - -Connection.prototype.makeReq = function(method, path, q, body, callback) { +PubSub.prototype.makeReq_ = function(method, path, q, body, callback) { var reqOpts = { method: method, qs: q, - uri: util.format('{base}/{path}', { - base: PUBSUB_BASE_URL, - path: path - }) + uri: PUBSUB_BASE_URL + '/' + path }; if (body) { reqOpts.json = body; } - this.conn.req(reqOpts, function(err, res, body) { + this.connection.req(reqOpts, function(err, res, body) { util.handleResp(err, res, body, callback); }); }; -/** - * Exports Connection. - */ -module.exports.Connection = Connection; - -/** - * Exports Topic. - */ -module.exports.Topic = Topic; - -/** - * Exports Subscription. - */ -module.exports.Subscription = Subscription; +module.exports = PubSub; diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js new file mode 100644 index 00000000000..aedaa0eb552 --- /dev/null +++ b/lib/pubsub/subscription.js @@ -0,0 +1,329 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! + * @module pubsub/subscription + */ + +'use strict'; + +var events = require('events'); +var nodeutil = require('util'); + +/** + * @type module:common/util + * @private + */ +var util = require('../common/util.js'); + +/*! Developer Documentation + * + * @param {module:pubsub} pubsub - PubSub object. + * @param {object} options - Configuration object. + * @param {boolean} options.autoAck - Automatically acknowledge the message + * once it's pulled. (default: false) + * @param {number} options.interval - Interval in milliseconds to check for new + * messages. (default: 10) + * @param {string} options.name - Name of the subscription. + */ +/** + * A Subscription object will give you access to your Google Cloud Pub/Sub + * subscription. + * + * Subscriptions are sometimes retrieved when using various methods: + * + * - {@linkcode module:pubsub#getSubscriptions} + * - {@linkcode module:pubsub/topic#getSubscriptions} + * - {@linkcode module:pubsub/topic#subscribe} + * + * Subscription objects may be created directly with: + * + * - {@linkcode module:pubsub/topic#subscription} + * + * All Subscription objects are instances of an + * [EventEmitter]{@link http://nodejs.org/api/events.html}. The subscription + * will pull for messages automatically as long as there is at least one + * listener assigned for the `message` event. + * + * @alias pubsub/subscription + * @constructor + * + * @example + * //- + * // From {@linkcode module:pubsub#getSubscriptions}: + * //- + * pubsub.getSubscriptions(function(err, subscriptions) { + * // `subscriptions` is an array of Subscription objects. + * }); + * + * //- + * // From {@linkcode module:pubsub/topic#getSubscriptions}: + * //- + * var topic = pubsub.topic('my-existing-topic'); + * topic.getSubscriptions(function(err, subscriptions) { + * // `subscriptions` is an array of Subscription objects. + * }); + * + * //- + * // From {@linkcode module:pubsub/topic#subscribe}: + * //- + * var topic = pubsub.topic('my-existing-topic'); + * topic.subscribe('new-subscription', function(err, subscription) { + * // `subscription` is a Subscription object. + * }); + * + * //- + * // From {@linkcode module:pubsub/topic#subscription}: + * //- + * var topic = pubsub.topic('my-existing-topic'); + * var subscription = topic.subscription('my-existing-subscription'); + * // `subscription` is a Subscription object. + * + * //- + * // Once you have obtained a subscription object, you may begin to register + * // listeners. This will automatically trigger pulling for messages. + * //- + * // Register an error handler. + * subscription.on('error', function(err) {}); + * + * // Register a listener for `message` events. + * function onMessage(message) { + * // Called every time a message is received. + * // message.id = ID used to acknowledge its receival. + * // message.data = Contents of the message. + * } + * subscription.on('message', onMessage); + * + * // Remove the listener from receiving `message` events. + * subscription.removeListener('message', onMessage); + */ +function Subscription(pubsub, options) { + events.EventEmitter.call(this); + + this.name = Subscription.formatName_(pubsub.projectId, options.name); + this.makeReq_ = pubsub.makeReq_.bind(pubsub); + + this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false; + this.closed = false; + this.interval = util.is(options.interval, 'number') ? options.interval : 10; + + this.listenForEvents_(); +} + +nodeutil.inherits(Subscription, events.EventEmitter); + +/** + * Format the name of a subscription. A subscription's full name is in the + * format of /subscription/{projectId}/{name}. + * + * @private + */ +Subscription.formatName_ = function(projectId, name) { + // Simple check if the name is already formatted. + if (name.indexOf('/') > -1) { + return name; + } + return '/subscriptions/' + projectId + '/' + name; +}; + +/** + * Simplify a message from an API response to have two properties, `id` and + * `data`. `data` is always converted to a string. + * + * @private + */ +Subscription.formatMessage_ = function(msg) { + var message = { + id: msg.ackId + }; + if (msg.pubsubEvent && msg.pubsubEvent.message) { + message.data = + new Buffer(msg.pubsubEvent.message.data, 'base64').toString('utf-8'); + try { + message.data = JSON.parse(message.data); + } catch(e) {} + } + return message; +}; + +/** + * Begin listening for events on the subscription. This method keeps track of + * how many message listeners are assigned, and then removed, making sure + * polling is handled automatically. + * + * As long as there is one active message listener, the connection is open. As + * soon as there are no more message listeners, the connection is closed. + * + * @private + * + * @example + * this.listenForEvents_(); + */ +Subscription.prototype.listenForEvents_ = function() { + var that = this; + var messageListeners = 0; + + this.on('newListener', function(event) { + if (event === 'message') { + messageListeners++; + if (that.closed) { + that.closed = false; + } + that.startPulling_(); + } + }); + + this.on('removeListener', function(event) { + if (event === 'message' && --messageListeners === 0) { + that.closed = true; + } + }); +}; + +/** + * Poll the backend for new messages. This runs a loop to ping the API at the + * provided interval from the subscription's instantiation. If one wasn't + * provided, the default value is 10 milliseconds. + * + * If messages are received, they are emitted on the `message` event. + * + * Note: This method is automatically called once a message event handler is + * assigned to the description. + * + * To stop pulling, see {@linkcode module:pubsub/subscription#close}. + * + * @private + * + * @example + * subscription.startPulling_(); + */ +Subscription.prototype.startPulling_ = function() { + var that = this; + if (this.closed) { + return; + } + this.pull({ + returnImmediately: false + }, function(err, message) { + if (err) { + that.emit('error', err); + } + if (message) { + that.emit('message', message); + } + setTimeout(that.startPulling_.bind(that), that.interval); + }); +}; + +/** + * Acknowledge to the backend that the message was retrieved. You must provide + * either a single ID, or an array of IDs. + * + * @throws {Error} If at least one id is not provided. + * + * @param {string|string[]} ids - An ID or array of message IDs. + * @param {function} callback - The callback function. + * + * @example + * subscription.ack('ePHEESyhuE8e...', function(err) {}); + */ +Subscription.prototype.ack = function(ids, callback) { + if (!ids || ids.length === 0) { + throw new Error( + 'At least one ID must be specified before it can be acknowledged'); + } + ids = util.arrayize(ids); + var body = { + subscription: this.name, + ackId: ids + }; + this.makeReq_('POST', 'subscriptions/acknowledge', null, body, callback); +}; + +/** + * Delete the subscription. Pull requests from the current subscription will be + * errored once unsubscription is complete. + * + * @param {function=} callback - The callback function. + * + * @example + * subscription.delete(function(err) {}); + */ +Subscription.prototype.delete = function(callback) { + callback = callback || util.noop; + this.makeReq_( + 'DELETE', 'subscriptions/' + this.name, null, true, function(err) { + if (err) { + callback(err); + return; + } + this.closed = true; + this.removeAllListeners(); + callback(null); + }.bind(this)); +}; + +/** + * Pull messages from the subscribed topic. If messages were found, your + * callback is executed with the message object. + * + * Note that messages are pulled automatically once you register your first + * event listener to the subscription, thus the call to `pull` is handled for + * you. If you don't want to start pulling, simply don't register a + * `subscription.on('message', function() {})` event handler. + * + * @param {object=} options - Configuration object. + * @param {boolean=} options.returnImmediately - If set, the system will respond + * immediately. Otherwise, wait until new messages are available. Returns if + * timeout is reached. + * @param {function} callback - The callback function. + * + * @example + * subscription.pull(function(err, message) { + * // message.id = ID used to acknowledge its receival. + * // message.data = Contents of the message. + * }); + */ +Subscription.prototype.pull = function(options, callback) { + var that = this; + // TODO(jbd): Should not be racing with other pull. + if (!callback) { + callback = options; + options = {}; + } + var body = { + subscription: this.name, + returnImmediately: !!options.returnImmediately + }; + this.makeReq_( + 'POST', 'subscriptions/pull', null, body, function(err, message) { + // TODO(jbd): Fix API to return a list of messages. + if (err) { + callback(err); + return; + } + message = Subscription.formatMessage_(message); + if (that.autoAck) { + that.ack(message.id, function(err) { + callback(err, message); + }); + } else { + callback(null, message); + } + }); +}; + +module.exports = Subscription; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js new file mode 100644 index 00000000000..53ced5ac507 --- /dev/null +++ b/lib/pubsub/topic.js @@ -0,0 +1,270 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! + * @module pubsub/topic + */ + +'use strict'; + +/** + * @type module:common/util + * @private + */ +var util = require('../common/util.js'); + +/** + * @type module:pubsub/subscription + * @private + */ +var Subscription = require('./subscription.js'); + +/*! Developer Documentation + * + * @param {module:pubsub} pubsub - PubSub object. + * @param {object} options - Configuration object. + * @param {string} options.name - Name of the topic. + */ +/** + * A Topic object allows you to interact with a Google Cloud Pub/Sub topic. To + * get this object, you will use the methods on the `pubsub` object, + * {@linkcode module:pubsub#topic} and {@linkcode module:pubsub#createTopic}. + * + * @constructor + * @alias module:pubsub/topic + * + * @example + * // From pubsub.topic: + * var topic = pubsub.topic('my-existing-topic'); + * + * // From pubsub.createTopic: + * pubsub.createTopic('my-new-topic', function(err, topic) { + * // `topic` is a Topic object. + * }); + */ +function Topic(pubsub, options) { + this.makeReq_ = pubsub.makeReq_.bind(pubsub); + this.name = Topic.formatName_(pubsub.projectId, options.name); + this.projectId = pubsub.projectId; + this.pubsub = pubsub; +} + +/** + * Format the name of a topic. A Topic's full name is in the format of + * /topics/{projectId}/{name}. + * + * @private + * + * @return {string} + */ +Topic.formatName_ = function(projectId, name) { + // Simple check if the name is already formatted. + if (name.indexOf('/') > -1) { + return name; + } + return '/topics/' + projectId + '/' + name; +}; + +/** + * Publish the provided message. A message can be of any type. + * + * @throws {Error} If no message is provided. + * + * @param {*} message - The message to publish. + * @param {function=} callback - The callback function. + * + * @example + * topic.publish('New message!', function(err) {}); + * + * topic.publish({ + * user_id: 3, + * name: 'Stephen', + * message: 'Hello from me!' + * }, function(err) {}); + */ +Topic.prototype.publish = function(message, callback) { + if (!message) { + throw new Error('Cannot publish an empty message.'); + } + callback = callback || util.noop; + if (!util.is(message, 'string') && !util.is(message, 'buffer')) { + message = JSON.stringify(message); + } + this.publishRaw({ + data: new Buffer(message).toString('base64') + }, callback); +}; + +/** + * Publish a raw message. + * + * @throws {Error} If no message is provided. + * + * @param {object} message - Raw message to publish. + * @param {array=} message.label - List of labels for the message. + * @param {*} message.data - The contents of the message. + * @param {function=} callback - The callback function. + * + * @example + * topic.publishRaw({ + * data: new Buffer('New message!').toString('base64') + * }, function(err) {}); + */ +Topic.prototype.publishRaw = function(message, callback) { + if (!message) { + throw new Error('Cannot publish an empty message.'); + } + callback = callback || util.noop; + if (!util.is(message.data, 'string') && !util.is(message.data, 'buffer')) { + message.data = new Buffer(JSON.stringify(message.data)).toString('base64'); + } + message.topic = this.name; + this.makeReq_('POST', 'topics/publish', null, message, callback); +}; + +/** + * Delete the topic. + * + * @param {function=} callback - The callback function. + * + * @example + * topic.delete(function(err) {}); + */ +Topic.prototype.delete = function(callback) { + callback = callback || util.noop; + this.makeReq_('DELETE', 'topics/' + this.name, null, true, callback); +}; + +/** + * Get a list of the subscriptions registered to this topic. You may optionally + * provide a query object as the first argument to customize the response. + * + * Your provided callback will either be invoked with an error object, if an API + * error occurred, or an array of {@linkcode module:pubsub/subscription} + * objects. + * + * @param {object=} query - Query object. + * @param {string=} query.pageToken - Page token. + * @param {number=} query.maxResults - Maximum number of results to return. + * @param {function} callback - The callback function. + * + * @example + * // Get all subscriptions. + * topic.getSubscriptions(function(err, subscriptions, nextQuery) { + * // If `nextQuery` is non-null, there may be more results to fetch. To do + * // so, run `topic.getSubscriptions(nextQuery, callback);`. + * }); + * + * // Customize the query. + * topic.getSubscriptions({ + * maxResults: 3 + * }, function(err, subscriptions, nextQuery) {}); + */ +Topic.prototype.getSubscriptions = function(query, callback) { + query.query = 'pubsub.googleapis.com/topic in (' + this.name + ')'; + this.pubsub.getSubscriptions(query, callback); +}; + +/** + * Create a subscription to this topic. You may optionally provide an object to + * customize the subscription. + * + * Your provided callback will either be invoked with an error object, if an API + * error occurred, or a {@linkcode module:pubsub/subscription} object. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - The name of the subscription. + * @param {object=} options - Configuration object. + * @param {number=} options.ackDeadlineSeconds - The maximum time after + * receiving a message that you must ack a message before it is redelivered. + * @param {boolean=} options.autoAck - Automatically acknowledge the message + * once it's pulled. (default: false) + * @param {number=} options.interval - Interval in milliseconds to check for new + * messages. (default: 10) + * @param {function} callback - The callback function. + * + * @example + * // Without specifying any options. + * topic.subscribe('new-subscription', function(err, subscription) {}); + * + * // With options. + * topic.subscribe('new-subscription', { + * ackDeadlineSeconds: 90, + * autoAck: true, + * interval: 30 + * }, function(err, subscription) {}); + */ +Topic.prototype.subscribe = function(name, options, callback) { + if (!name) { + throw new Error('A name is required for a new subscription.'); + } + if (!callback) { + callback = options; + options = {}; + } + var body = { + topic: this.name, + name: Subscription.formatName_(this.projectId, name) + }; + if (options.ackDeadlineSeconds) { + body.ackDeadlineSeconds = options.ackDeadlineSeconds; + } + this.makeReq_('POST', 'subscriptions', null, body, function(err) { + if (err) { + callback(err); + return; + } + callback(null, this.subscription(name, options)); + }.bind(this)); +}; + +/** + * Create a Subscription object in reference to an existing subscription. This + * command by itself will not run any API requests. You will receive a + * {@linkcode module:pubsub/subscription} object, which will allow you to + * interact with your subscription. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - Name of the subscription. + * @param {object=} options - Configuration object. + * @param {boolean=} options.autoAck - Automatically acknowledge the message + * once it's pulled. + * @param {number=} options.interval - Interval in milliseconds to check for new + * messages. + * @return {module:pubsub/subscription} + * + * @example + * var subscription = topic.subscription('my-existing-subscription'); + * + * // Register a listener for `message` events. + * subscription.on('message', function(message) { + * // Called every time a message is received. + * // message.id = ID used to acknowledge its receival. + * // message.data = Contents of the message. + * }); + */ +Topic.prototype.subscription = function(name, options) { + if (!name) { + throw new Error('The name of a subscription is required.'); + } + options = options || {}; + options.name = name; + return new Subscription(this.pubsub, options); +}; + +module.exports = Topic; diff --git a/package.json b/package.json index 9534ae9d758..ede031b2c7c 100644 --- a/package.json +++ b/package.json @@ -63,12 +63,12 @@ "tmp": "0.0.24" }, "scripts": { - "docs": "dox < lib/index.js > docs/json/master/index.json & dox < lib/datastore/dataset.js > docs/json/master/datastore/dataset.json & dox < lib/datastore/transaction.js > docs/json/master/datastore/transaction.json & dox < lib/datastore/index.js > docs/json/master/datastore/index.json & dox < lib/datastore/request.js > docs/json/master/datastore/request.json & dox < lib/datastore/query.js > docs/json/master/datastore/query.json & dox < lib/storage/index.js > docs/json/master/storage/index.json", + "docs": "./scripts/docs.sh", "lint": "jshint lib/ regression/ test/", "test": "mocha --recursive --reporter spec", - "regression-test": "mocha regression/datastore.js regression/storage.js --reporter spec --timeout 15000", - "cover": "istanbul cover -x 'regression/* lib/pubsub/*' _mocha -- --timeout 15000 test/* regression/datastore.js regression/storage.js", - "coveralls": "istanbul cover -x 'regression/* lib/pubsub/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/datastore.js regression/storage.js -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" + "regression-test": "mocha regression/datastore.js regression/pubsub.js regression/storage.js --reporter spec --timeout 15000", + "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 15000 test/* regression/datastore.js regression/pubsub.js regression/storage.js", + "coveralls": "istanbul cover -x 'regression/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/datastore.js regression/pubsub.js regression/storage.js -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" }, "license": "Apache 2" } diff --git a/regression/pubsub.js b/regression/pubsub.js index d0dea40566d..c230fe97bb5 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -14,7 +14,7 @@ * limitations under the License. */ -/*global describe, it, before */ +/*global describe, it, before, after */ 'use strict'; @@ -22,148 +22,168 @@ var assert = require('assert'); var async = require('async'); var env = require('./env.js'); -var gcloud = require('../lib'); - -var topicNames = ['topic1', 'topic2', 'topic3']; -var subscriptions = [ - { - name: 'sub1', - ackDeadlineSeconds: 30 - }, - { - name: 'sub2', - ackDeadlineSeconds: 60 - } -]; +var gcloud = require('../lib')(env); + +var Subscription = require('../lib/pubsub/subscription.js'); -var conn = new gcloud.pubsub.Connection(env); +var pubsub = gcloud.pubsub(); describe('pubsub', function() { + var topicNames = ['topic1', 'topic2', 'topic3']; - before(function(done) { + function deleteAllTopics(callback) { // TODO: Handle pagination. - var createFn = function(name, callback) { - conn.createTopic(name, callback); - }; - conn.listTopics(function(err, topics) { + pubsub.getTopics(function(err, topics) { + if (err) { + callback(err); + return; + } + async.parallel(topics.map(function(topic) { + return topic.delete.bind(topic); + }), callback); + }); + } + + before(function(done) { + deleteAllTopics(function(err) { assert.ifError(err); - var fns = topics.map(function(t) { - return function(cb) { - t.del(cb); - }; - }); - async.parallel(fns, function(err) { - assert.ifError(err); - async.map(topicNames, createFn, done); - }); + // Create new topics. + async.map(topicNames, pubsub.createTopic.bind(pubsub), done); }); }); + after(deleteAllTopics); + describe('Topic', function() { it('should be listed', function(done) { - conn.listTopics(function(err, topics) { - assert(topics.length, 3); - done(err); + pubsub.getTopics(function(err, topics) { + assert.ifError(err); + assert(topics.length, topicNames.length); + done(); }); }); it('should return a nextQuery if there are more results', function(done) { - conn.listTopics({ maxResults: 2 }, function(err, topics, next) { - assert(topics.length, 2); - assert(next.maxResults, 2); + pubsub.getTopics({ + maxResults: topicNames.length - 1 + }, function(err, topics, next) { + assert.ifError(err); + assert(topics.length, topicNames.length - 1); + assert(next.maxResults, topicNames.length - 1); assert(!!next.pageToken, true); - done(err); + done(); }); }); it('should be created', function(done) { - conn.createTopic('topic-new', done); - }); - - it('should be gettable', function(done) { - conn.getTopic('topic1', done); + pubsub.createTopic('new-topic-name', done); }); it('should publish a message', function(done) { - conn.getTopic('topic1', function(err, topic) { - topic.publish('message from me', done); - }); + pubsub.topic(topicNames[0]) + .publish('message from me', done); }); it('should be deleted', function(done) { - conn.getTopic('topic3', function(err, topic) { - topic.del(done); - }); + pubsub.topic(topicNames[0]) + .delete(done); }); }); describe('Subscription', function() { + var TOPIC_NAME = 'test-topic'; + var subscriptions = [ + { + name: 'sub1', + options: { ackDeadlineSeconds: 30 } + }, + { + name: 'sub2', + options: { ackDeadlineSeconds: 60 } + } + ]; + var topic; + + function deleteAllTopics(callback) { + pubsub.getTopics(function(err, topics) { + if (err) { + callback(err); + return; + } + async.parallel(topics.map(function(topic) { + return topic.delete.bind(topic); + }), callback); + }); + } + + function deleteAllSubscriptions(callback) { + pubsub.getSubscriptions(function(err, subs) { + if (err) { + callback(err); + return; + } + async.parallel(subs.map(function(sub) { + return sub.delete.bind(sub); + }), callback); + }); + } + before(function(done) { - var createFn = function(item, callback) { - conn.createSubscription({ - name: item.name, - topic: 'topic1', - ackDeadlineSeconds: item.ackDeadlineSeconds - }, callback); - }; - conn.listSubscriptions(function(err, subs) { + async.parallel([deleteAllTopics, deleteAllSubscriptions], function(err) { assert.ifError(err); - var fns = subs.map(function(sub) { - return function(cb) { - sub.del(cb); - }; - }); - async.series(fns, function(err) { + // Create a new test topic. + pubsub.createTopic(TOPIC_NAME, function(err, newTopic) { assert.ifError(err); - async.map(subscriptions, createFn, done); + topic = newTopic; + // Create subscriptions. + async.parallel(subscriptions.map(function(sub) { + return topic.subscribe.bind(topic, sub.name, sub.options); + }), done); }); }); }); - it('should be listed', function(done) { - conn.listSubscriptions(function(err, subs) { - assert.strictEqual(subs.length, 2); - done(err); - }); + after(function(done) { + topic.delete(done); }); - it('should be gettable', function(done) { - conn.getSubscription('sub1', function(err, sub) { + it('should list all subscriptions registered to the topic', function(done) { + topic.getSubscriptions(function(err, subs) { assert.ifError(err); - assert.strictEqual(sub.name, '/subscriptions/' + env.projectId + - '/sub1'); + assert(subs[0] instanceof Subscription); + assert.equal(subs.length, subscriptions.length); done(); }); }); - it('should error while getting a non-existent subscription', function(done){ - conn.getSubscription('sub-nothing-is-here', function(err) { - assert.strictEqual(err.code, 404); + it('should allow creation of a topic', function(done) { + topic.subscribe('new-subscription', function(err, sub) { + assert.ifError(err); + assert(sub instanceof Subscription); done(); }); }); - it('should be created', function(done) { - conn.createSubscription({ - topic: 'topic1', - name: 'new-sub' - }, done); + it('should error when using a non-existent subscription', function(done) { + var subscription = topic.subscription('non-existent-subscription'); + subscription.pull(function(err) { + assert.equal(err.code, 404); + done(); + }); }); it('should be able to pull and ack', function(done) { - conn.getTopic('topic1', function(err, topic) { - assert.ifError(err); - topic.publish('hello', function(err) { - assert.ifError(err); - }); - }); - conn.getSubscription('sub1', function(err, sub) { + var subscription = topic.subscription(subscriptions[0].name); + subscription.pull({ returnImmediately: true }, function(err, msg) { assert.ifError(err); - sub.on('message', function(msg) { - sub.ack(msg.ackId, done); - }); - sub.pull({}, function() {}); + subscription.ack(msg.id, done); }); + topic.publish('hello', assert.ifError); + topic.publish('hello', assert.ifError); + topic.publish('hello', assert.ifError); + topic.publish('hello', assert.ifError); + topic.publish('hello', assert.ifError); + topic.publish('hello', assert.ifError); }); }); }); diff --git a/scripts/docs.sh b/scripts/docs.sh new file mode 100755 index 00000000000..ac7617c0ab3 --- /dev/null +++ b/scripts/docs.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +./node_modules/.bin/dox < lib/index.js > docs/json/master/index.json & + +./node_modules/.bin/dox < lib/datastore/dataset.js > docs/json/master/datastore/dataset.json & +./node_modules/.bin/dox < lib/datastore/index.js > docs/json/master/datastore/index.json & +./node_modules/.bin/dox < lib/datastore/query.js > docs/json/master/datastore/query.json & +./node_modules/.bin/dox < lib/datastore/request.js > docs/json/master/datastore/request.json & +./node_modules/.bin/dox < lib/datastore/transaction.js > docs/json/master/datastore/transaction.json & + +./node_modules/.bin/dox < lib/pubsub/index.js > docs/json/master/pubsub/index.json & +./node_modules/.bin/dox < lib/pubsub/subscription.js > docs/json/master/pubsub/subscription.json & +./node_modules/.bin/dox < lib/pubsub/topic.js > docs/json/master/pubsub/topic.json & + +./node_modules/.bin/dox < lib/storage/index.js > docs/json/master/storage/index.json diff --git a/test/pubsub/index.js b/test/pubsub/index.js index d249ed97379..da93518d77b 100644 --- a/test/pubsub/index.js +++ b/test/pubsub/index.js @@ -14,81 +14,174 @@ * limitations under the License. */ -/*global describe, it */ +/*global describe, it, beforeEach */ 'use strict'; var assert = require('assert'); -var pubsub = require('../../lib/pubsub'); - -describe('Subscription', function() { - it('should ack messages if autoAck is set', function(done) { - var sub = new pubsub.Subscription({}, 'sub1'); - sub.autoAck = true; - sub.conn.makeReq = function(method, path, qs, body, callback) { - if (path === 'subscriptions/pull') { - callback(null, { ackId: 'ackd-id' }); - return; - } - if (path === 'subscriptions/acknowledge') { - done(); - } +var PubSub = require('../../lib/pubsub/index.js'); +var Subscription = require('../../lib/pubsub/subscription.js'); +var Topic = require('../../lib/pubsub/topic.js'); + +describe('PubSub', function() { + var PROJECT_ID = 'test-project'; + var pubsub; + + beforeEach(function() { + pubsub = new PubSub({ projectId: PROJECT_ID }); + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(); }; - sub.pull({}, function() {}); }); - it('should be closed', function(done) { - var sub = new pubsub.Subscription({}, 'sub1'); - sub.close(); - assert.strictEqual(sub.closed, true); - done(); + describe('getTopics', function() { + beforeEach(function() { + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(null, { topic: [{ name: 'fake-topic' }] }); + }; + }); + + it('should accept a query and a callback', function(done) { + pubsub.getTopics({}, done); + }); + + it('should accept just a callback', function(done) { + pubsub.getTopics(done); + }); + + it('should build a project-wide query', function() { + pubsub.makeReq_ = function(method, path, q) { + var query = + 'cloud.googleapis.com/project in (/projects/' + PROJECT_ID + ')'; + assert.equal(method, 'GET'); + assert.equal(path, 'topics'); + assert.equal(q.query, query); + }; + pubsub.getTopics(function() {}); + }); + + it('should return Topic instances', function() { + pubsub.getTopics(function(err, topics) { + assert.ifError(err); + assert(topics[0] instanceof Topic); + }); + }); + + it('should return a query if more results exist', function() { + var token = 'next-page-token'; + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(null, { nextPageToken: token }); + }; + var query = { maxResults: 1 }; + pubsub.getTopics(query, function(err, topics, nextQuery) { + assert.ifError(err); + assert.strictEqual(query.maxResults, nextQuery.maxResults); + assert.equal(query.pageToken, token); + }); + }); + + it('should pass error if api returns an error', function() { + var error = new Error('Error'); + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(error); + }; + pubsub.getTopics(function(err) { + assert.equal(err, error); + }); + }); }); - it('should pull messages', function(done) { - var conn = new pubsub.Connection({ - projectId: 'test-project' - }); - conn.makeReq = function(method, path, qs, body, callback) { - if (path === 'subscriptions//subscriptions/test-project/sub1') { - callback(null, {}); - return; - } - if (path === 'subscriptions/pull') { - callback(null, { ackId: 123 }); - return; - } - }; - var sub = conn.subscribe('sub1', { autoAck: false }); - sub.once('message', function() { - sub.close(); - done(); + describe('createTopic', function() { + it('should create a topic', function() { + var topicName = 'new-topic-name'; + pubsub.makeReq_ = function(method, path, q, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'topics'); + assert.equal(body.name, '/topics/' + PROJECT_ID + '/' + topicName); + }; + pubsub.createTopic(topicName, function() {}); + }); + + it('should return a Topic object', function() { + pubsub.createTopic('new-topic', function(err, topic) { + assert.ifError(err); + assert(topic instanceof Topic); + }); }); }); - it('should pull and ack messages', function(done) { - var conn = new pubsub.Connection({ - projectId: 'test-project' - }); - conn.makeReq = function(method, path, qs, body, callback) { - if (path === 'subscriptions//subscriptions/test-project/sub1') { - callback(null, {}); - return; - } - if (path === 'subscriptions/pull') { - setImmediate(function() { - callback(null, { ackId: 123 }); - }); - return; - } - if (path === 'subscriptions/acknowledge') { - callback(null, true); - return; - } - }; - var sub = conn.subscribe('sub1', { autoAck: true }); - sub.once('message', function() { - sub.close(); - done(); + describe('topic', function() { + it('should throw if a name is not provided', function() { + assert.throws(function() { + pubsub.topic(); + }, /name must be specified/); }); + + it('should return a Topic object', function() { + assert(pubsub.topic('new-topic') instanceof Topic); + }); + }); + + describe('getSubscriptions', function() { + beforeEach(function() { + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(null, { subscription: [{ name: 'fake-subscription' }] }); + }; + }); + + it('should accept a query and a callback', function(done) { + pubsub.getSubscriptions({}, done); + }); + + it('should accept just a callback', function(done) { + pubsub.getSubscriptions(done); + }); + + it('should build a project-wide query', function() { + pubsub.makeReq_ = function(method, path, q) { + var query = + 'cloud.googleapis.com/project in (/projects/' + PROJECT_ID + ')'; + assert.equal(method, 'GET'); + assert.equal(path, 'subscriptions'); + assert.equal(q.query, query); + }; + pubsub.getSubscriptions(function() {}); + }); + + it('should return Subscription instances', function() { + pubsub.getSubscriptions(function(err, subscriptions) { + assert.ifError(err); + assert(subscriptions[0] instanceof Subscription); + }); + }); + + it('should return a query if more results exist', function() { + var token = 'next-page-token'; + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(null, { nextPageToken: token }); + }; + var query = { maxResults: 1 }; + pubsub.getSubscriptions(query, function(err, subscriptions, nextQuery) { + assert.ifError(err); + assert.strictEqual(query.maxResults, nextQuery.maxResults); + assert.equal(query.pageToken, token); + }); + }); + + it('should pass error if api returns an error', function() { + var error = new Error('Error'); + pubsub.makeReq_ = function(method, path, q, body, callback) { + callback(error); + }; + pubsub.getSubscriptions(function(err) { + assert.equal(err, error); + }); + }); + }); + + it('should pass network requests to the connection object', function(done) { + var pubsub = new PubSub(); + pubsub.connection.req = done.bind(null, null); + pubsub.makeReq_(); }); }); diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js new file mode 100644 index 00000000000..2d1ed3206e7 --- /dev/null +++ b/test/pubsub/subscription.js @@ -0,0 +1,430 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*global describe, it, beforeEach, afterEach */ + +'use strict'; + +var assert = require('assert'); +var util = require('../../lib/common/util.js'); +var Subscription = require('../../lib/pubsub/subscription.js'); + +describe('Subscription', function() { + var PROJECT_ID = 'test-project'; + var SUB_NAME = 'test-subscription'; + var SUB_FULL_NAME = '/subscriptions/' + PROJECT_ID + '/' + SUB_NAME; + var pubsubMock = { + projectId: PROJECT_ID, + makeReq_: util.noop + }; + var message = 'howdy'; + var messageBuffer = new Buffer(message).toString('base64'); + var messageObj = { + ackId: 3, + pubsubEvent: { + message: { + data: messageBuffer + } + } + }; + var expectedMessage = { id: 3, data: message }; + var subscription; + + beforeEach(function() { + subscription = new Subscription(pubsubMock, { name: SUB_NAME }); + }); + + describe('initialization', function() { + it('should format name', function(done) { + var formatName_ = Subscription.formatName_; + Subscription.formatName_ = function() { + Subscription.formatName_ = formatName_; + done(); + }; + new Subscription(pubsubMock, { name: SUB_NAME }); + }); + + it('should honor configuration settings', function() { + var CONFIG = { + name: SUB_NAME, + autoAck: true, + interval: 100 + }; + var sub = new Subscription(pubsubMock, CONFIG); + assert.strictEqual(sub.autoAck, CONFIG.autoAck); + assert.strictEqual(sub.interval, CONFIG.interval); + }); + + it('should not be closed', function() { + assert.strictEqual(subscription.closed, false); + }); + + it('should default autoAck to false if not specified', function() { + var sub = new Subscription(pubsubMock, { name: SUB_NAME }); + assert.strictEqual(sub.autoAck, false); + }); + + it('should set default interval if one is not specified', function() { + var sub = new Subscription(pubsubMock, { name: SUB_NAME }); + assert.equal(sub.interval, 10); + }); + }); + + describe('formatName_', function() { + it('should format name', function() { + var formattedName = Subscription.formatName_(PROJECT_ID, SUB_NAME); + assert.equal(formattedName, SUB_FULL_NAME); + }); + + it('should format name when given a complete name', function() { + var formattedName = Subscription.formatName_(PROJECT_ID, SUB_FULL_NAME); + assert.equal(formattedName, SUB_FULL_NAME); + }); + }); + + describe('listenForEvents_', function() { + afterEach(function() { + subscription.removeAllListeners(); + }); + + it('should start pulling once a message listener is bound', function(done) { + subscription.startPulling_ = function() { + done(); + }; + subscription.on('message', util.noop); + }); + + it('should close when no more message listeners are bound', function() { + subscription.startPulling_ = util.noop; + subscription.on('message', util.noop); + subscription.on('message', util.noop); + // 2 listeners: sub should be open. + assert.strictEqual(subscription.closed, false); + subscription.removeListener('message', util.noop); + // 1 listener: sub should be open. + assert.strictEqual(subscription.closed, false); + subscription.removeListener('message', util.noop); + // 0 listeners: sub should be closed. + assert.strictEqual(subscription.closed, true); + }); + }); + + describe('ack', function() { + it('should throw if no IDs are provided', function() { + assert.throws(function() { + subscription.ack(); + }, /At least one ID/); + assert.throws(function() { + subscription.ack([]); + }, /At least one ID/); + }); + + it('should accept a single id', function() { + assert.doesNotThrow(function() { + subscription.ack(1, util.noop); + }); + }); + + it('should accept an array of ids', function() { + assert.doesNotThrow(function() { + subscription.ack([1], util.noop); + }); + }); + + it('should make an array out of ids', function(done) { + var ID = 1; + subscription.makeReq_ = function(method, path, qs, body) { + assert.equal(body.subscription, SUB_FULL_NAME); + assert.deepEqual(body.ackId, [ID]); + done(); + }; + subscription.ack(ID, assert.ifError); + }); + + it('should make correct api request', function(done) { + var IDS = [1, 2, 3]; + subscription.makeReq_ = function(method, path, qs, body) { + assert.equal(body.subscription, SUB_FULL_NAME); + assert.deepEqual(body.ackId, IDS); + done(); + }; + subscription.ack(IDS, assert.ifError); + }); + + it('should pass callback to request', function(done) { + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + subscription.ack(1, done); + }); + }); + + describe('pull', function() { + beforeEach(function() { + subscription.ack = util.noop; + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(null, messageObj); + }; + }); + + it('should make correct api request', function(done) { + subscription.makeReq_ = function(method, path, qs, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'subscriptions/pull'); + assert.equal(body.subscription, SUB_FULL_NAME); + assert.strictEqual(body.returnImmediately, false); + done(); + }; + subscription.pull({}, assert.ifError); + }); + + it('should not require configuration options', function(done) { + subscription.pull(done); + }); + + it('should default returnImmediately to false', function(done) { + subscription.makeReq_ = function(method, path, qs, body) { + assert.strictEqual(body.returnImmediately, false); + done(); + }; + subscription.pull({}, assert.ifError); + }); + + it('should honor options', function(done) { + subscription.makeReq_ = function(method, path, qs, body) { + assert.strictEqual(body.returnImmediately, true); + done(); + }; + subscription.pull({ returnImmediately: true }, assert.ifError); + }); + + it('should pass error to callback', function(done) { + var error = new Error('Error.'); + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(error); + }; + subscription.pull(function(err) { + assert.equal(err, error); + done(); + }); + }); + + describe('autoAck false', function() { + beforeEach(function() { + subscription.autoAck = false; + }); + + it('should not ack', function() { + subscription.ack = function() { + throw new Error('Should not have acked.'); + }; + subscription.pull({}, assert.ifError); + }); + + it('should execute callback with message', function(done) { + subscription.pull({}, function(err, msg) { + assert.deepEqual(msg, expectedMessage); + done(); + }); + }); + }); + + describe('autoAck true', function() { + beforeEach(function() { + subscription.autoAck = true; + subscription.ack = function(id, callback) { + callback(); + }; + }); + + it('should ack', function(done) { + subscription.ack = function() { + done(); + }; + subscription.pull({}, assert.ifError); + }); + + it('should pass id to ack', function(done) { + subscription.ack = function(id) { + assert.equal(id, expectedMessage.id); + done(); + }; + subscription.pull({}, assert.ifError); + }); + + it('should pass callback to ack', function(done) { + subscription.pull({}, done); + }); + + it('should invoke callback with error from ack', function(done) { + var error = new Error('Error.'); + subscription.ack = function(id, callback) { + callback(error); + }; + subscription.pull({}, function(err) { + assert.equal(err, error); + done(); + }); + }); + + it('should execute callback', function(done) { + subscription.pull({}, done); + }); + }); + }); + + describe('startPulling_', function() { + beforeEach(function() { + subscription.pull = util.noop; + }); + + it('should pull at specified interval', function(done) { + var INTERVAL = 5; + subscription.pull = function(options, callback) { + // After pull is called once, overwrite with `done`. + // This is to override the function passed to `setTimeout`, so we are + // sure it's the same pull function when we execute it. + subscription.pull = function() { + done(); + }; + callback(); + }; + var setTimeout = global.setTimeout; + global.setTimeout = function(fn, interval) { + global.setTimeout = setTimeout; + assert.equal(interval, INTERVAL); + // This should execute the `done` function from when we overrided it + // above. + fn(); + }; + subscription.interval = INTERVAL; + subscription.startPulling_(); + }); + + it('should stop pulling if subscription is closed', function() { + var pulledCount = 0; + subscription.pull = function() { + if (++pulledCount === 3) { + subscription.pull = function() { + throw Error('Should have stopped pulling.'); + }; + subscription.close(); + } + }; + subscription.startPulling_(); + }); + + it('should set returnImmediately to false when pulling', function(done) { + subscription.pull = function(options) { + assert.strictEqual(options.returnImmediately, false); + done(); + }; + subscription.startPulling_(); + }); + + it('should emit an error event if one is encountered', function(done) { + var error = new Error('Error.'); + subscription.pull = function(options, callback) { + subscription.pull = function() {}; + setImmediate(function() { + callback(error); + }); + }; + subscription + .once('error', function(err) { + assert.equal(err, error); + done(); + }) + .startPulling_(); + }); + + it('should emit a message event', function(done) { + subscription.pull = function(options, callback) { + callback(null, { hi: 'there' }); + }; + subscription + .once('message', function(msg) { + assert.deepEqual(msg, { hi: 'there' }); + done(); + }); + }); + }); + + describe('delete', function() { + it('should delete a subscription', function(done) { + subscription.makeReq_ = function(method, path) { + assert.equal(method, 'DELETE'); + assert.equal(path, 'subscriptions/' + subscription.name); + done(); + }; + subscription.delete(); + }); + + it('should close a subscription once deleted', function() { + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + subscription.closed = false; + subscription.delete(); + assert.strictEqual(subscription.closed, true); + }); + + it('should remove all listeners', function(done) { + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + subscription.removeAllListeners = function() { + done(); + }; + subscription.delete(); + }); + + it('should execute callback when deleted', function(done) { + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + subscription.delete(done); + }); + + it('should execute callback with an api error', function(done) { + var error = new Error('Error.'); + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(error); + }; + subscription.delete(function(err) { + assert.equal(err, error); + done(); + }); + }); + }); + + describe('formatMessage_', function() { + it('should decode stringified JSON to object', function() { + var obj = { hi: 'there' }; + var stringified = new Buffer(JSON.stringify(obj)).toString('base64'); + var msg = Subscription.formatMessage_({ + ackId: 3, + pubsubEvent: { message: { data: stringified } } + }); + assert.deepEqual(msg, { id: 3, data: obj }); + }); + + it('should decode buffer to string', function() { + var msg = Subscription.formatMessage_(messageObj); + assert.deepEqual(msg, expectedMessage); + }); + }); +}); diff --git a/test/pubsub/topic.js b/test/pubsub/topic.js new file mode 100644 index 00000000000..800076b0a52 --- /dev/null +++ b/test/pubsub/topic.js @@ -0,0 +1,341 @@ +/** + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*global describe, it, beforeEach, afterEach */ + +'use strict'; + +var assert = require('assert'); +var util = require('../../lib/common/util.js'); + +var SubscriptionCached = require('../../lib/pubsub/subscription.js'); +var formatName_Cached = SubscriptionCached.formatName_; +var SubscriptionOverride; +var formatName_Override; + +function Subscription(a, b) { + var OverrideFn = SubscriptionOverride || SubscriptionCached; + return new OverrideFn(a, b); +} + +Subscription.formatName_ = function() { + var args = [].slice.apply(arguments); + return (formatName_Override || formatName_Cached).apply(null, args); +}; + +var Topic = require('sandboxed-module') + .require('../../lib/pubsub/topic.js', { + requires: { + './subscription.js': Subscription + } + }); + +describe('Topic', function() { + var PROJECT_ID = 'test-project'; + var TOPIC_NAME = 'test-topic'; + var pubsubMock = { + projectId: PROJECT_ID, + makeReq_: util.noop + }; + var topic; + + beforeEach(function() { + topic = new Topic(pubsubMock, { name: TOPIC_NAME }); + }); + + afterEach(function() { + SubscriptionOverride = null; + formatName_Override = null; + }); + + describe('initialization', function() { + it('should format name', function(done) { + var formatName_ = Topic.formatName_; + Topic.formatName_ = function() { + Topic.formatName_ = formatName_; + done(); + }; + new Topic(pubsubMock, { name: TOPIC_NAME }); + }); + + it('should assign projectId to `this`', function() { + assert.equal(topic.projectId, PROJECT_ID); + }); + + it('should assign pubsub object to `this`', function() { + assert.deepEqual(topic.pubsub, pubsubMock); + }); + }); + + describe('formatName_', function() { + var fullName = '/topics/' + PROJECT_ID + '/' + TOPIC_NAME; + + it('should format name', function() { + var formattedName = Topic.formatName_(PROJECT_ID, TOPIC_NAME); + assert.equal(formattedName, fullName); + }); + + it('should format name when given a complete name', function() { + var formattedName = Topic.formatName_(PROJECT_ID, fullName); + assert.equal(formattedName, fullName); + }); + }); + + describe('publishing', function() { + var message = 'howdy'; + var messageBuffer = new Buffer(message); + var messageRaw = { data: messageBuffer.toString('base64') }; + var messageObj = { test: 'object' }; + var messageObjDecoded = + new Buffer(JSON.stringify(messageObj)).toString('base64'); + + describe('publish', function() { + it('should throw if no message is provided', function() { + assert.throws(function() { + topic.publish(); + }, /empty message/); + }); + + it('should convert string to raw message', function(done) { + topic.publishRaw = function(msg) { + assert.deepEqual(msg, messageRaw); + done(); + }; + topic.publish(message, assert.ifError); + }); + + it('should convert buffer to raw message', function(done) { + topic.publishRaw = function(msg) { + assert.deepEqual(msg, messageRaw); + done(); + }; + topic.publish(messageBuffer, assert.ifError); + }); + + it('should stringify non-strings & non-buffers', function(done) { + topic.publishRaw = function(msg) { + assert.deepEqual(msg.data, messageObjDecoded); + done(); + }; + topic.publish(messageObj, assert.ifError); + }); + + it('should pass callback', function(done) { + topic.publishRaw = function(msg, callback) { + callback(); + }; + topic.publish(message, done); + }); + }); + + describe('publishRaw', function() { + it('should throw if no message is provided', function() { + assert.throws(function() { + topic.publishRaw(); + }, /empty message/); + }); + + it('should stringify non-strings & non-buffers', function(done) { + topic.makeReq_ = function(method, path, qs, body) { + assert.deepEqual(body.data, messageObjDecoded); + done(); + }; + topic.publishRaw({ data: messageObj }, assert.ifError); + }); + + it('should post raw messages to the api', function(done) { + topic.makeReq_ = function(method, path, qs, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'topics/publish'); + assert.deepEqual(body.message, messageRaw.message); + done(); + }; + topic.publishRaw(messageRaw, assert.ifError); + }); + + it('should attach topic name to the request', function(done) { + topic.makeReq_ = function(method, path, qs, body) { + assert.equal(body.topic, topic.name); + done(); + }; + topic.publishRaw(messageRaw, assert.ifError); + }); + }); + }); + + describe('delete', function() { + it('should delete a topic', function(done) { + topic.makeReq_ = function(method, path) { + assert.equal(method, 'DELETE'); + assert.equal(path, 'topics/' + topic.name); + done(); + }; + topic.delete(); + }); + }); + + describe('subscriptions', function() { + var SUB_NAME = 'new-sub-name'; + var SUB_FULL_NAME = '/subscriptions/' + PROJECT_ID + '/' + SUB_NAME; + var CONFIG = { autoAck: true, interval: 90 }; + + describe('getSubscriptions', function() { + it('should call parent getSubscriptions', function(done) { + topic.pubsub.getSubscriptions = function() { + done(); + }; + topic.getSubscriptions(assert.ifError); + }); + + it('should pass query', function(done) { + var query = { pageToken: 1, maxResults: 3 }; + topic.pubsub.getSubscriptions = function(q) { + assert.strictEqual(q.pageToken, query.pageToken); + assert.strictEqual(q.maxResults, query.maxResults); + done(); + }; + topic.getSubscriptions(query, assert.ifError); + }); + + it('should pass callback', function(done) { + topic.pubsub.getSubscriptions = function(q, callback) { + callback(); + }; + topic.getSubscriptions({}, done); + }); + + it('should attach scoped topic query to query object', function(done) { + topic.pubsub.getSubscriptions = function(q) { + assert.equal( + q.query, 'pubsub.googleapis.com/topic in (' + topic.name + ')'); + done(); + }; + topic.getSubscriptions({}, assert.ifError); + }); + }); + + describe('subscribe', function() { + it('should throw if no name is provided', function() { + assert.throws(function() { + topic.subscribe(); + }, /name.*required/); + }); + + it('should not require configuration options', function(done) { + topic.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + topic.subscribe(SUB_NAME, done); + }); + + it('should format provided name', function(done) { + formatName_Override = function() { + done(); + }; + topic.subscribe(SUB_NAME, assert.ifError); + }); + + it('should send correct request', function(done) { + topic.makeReq_ = function(method, path, qs, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'subscriptions'); + assert.equal(body.topic, topic.name); + assert.equal(body.name, SUB_FULL_NAME); + done(); + }; + topic.subscribe(SUB_NAME, assert.ifError); + }); + + it('should return an api error to the callback', function(done) { + var error = new Error('Error.'); + topic.makeReq_ = function(method, path, qs, body, callback) { + callback(error); + }; + topic.subscribe(SUB_NAME, function(err) { + assert.equal(err, error); + done(); + }); + }); + + it('should create a new subscription', function(done) { + topic.subscription = function(name) { + assert.equal(name, SUB_NAME); + done(); + }; + topic.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + topic.subscribe(SUB_NAME, assert.ifError); + }); + + it('should honor settings on the api request', function(done) { + var SEC = 90; + topic.makeReq_ = function(method, path, qs, body) { + assert.strictEqual(body.ackDeadlineSeconds, SEC); + done(); + }; + topic.subscribe(SUB_NAME, { ackDeadlineSeconds: SEC }, assert.ifError); + }); + + it('should honor settings on the subscription object', function(done) { + topic.subscription = function(name, options) { + assert.deepEqual(options, CONFIG); + done(); + }; + topic.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + topic.subscribe(SUB_NAME, CONFIG, assert.ifError); + }); + }); + + describe('subscription', function() { + it('should throw if no name is provided', function() { + assert.throws(function() { + topic.subscription(); + }, /name.*required/); + }); + + it('should return a Subscription object', function() { + SubscriptionOverride = function() {}; + var subscription = topic.subscription(SUB_NAME, {}); + assert(subscription instanceof SubscriptionOverride); + }); + + it('should honor settings', function(done) { + SubscriptionOverride = function(pubsub, options) { + assert.deepEqual(options, CONFIG); + done(); + }; + topic.subscription(SUB_NAME, CONFIG); + }); + + it('should pass specified name to the Subscription', function(done) { + SubscriptionOverride = function(pubsub, options) { + assert.equal(options.name, SUB_NAME); + done(); + }; + topic.subscription(SUB_NAME, {}); + }); + + it('should not require options', function() { + assert.doesNotThrow(function() { + topic.subscription(SUB_NAME); + }); + }); + }); + }); +});