diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 51fa96b86..83e5700dd 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -90,6 +90,8 @@ def create_push_subscription(project, def delete_subscription(project, subscription_name): """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -138,6 +140,8 @@ def receive_messages(project, subscription_name): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -160,6 +164,8 @@ def callback(message): def receive_messages_with_custom_attributes(project, subscription_name): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -186,6 +192,8 @@ def callback(message): def receive_messages_with_flow_control(project, subscription_name): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -207,9 +215,38 @@ def callback(message): # [END pubsub_subscriber_flow_settings] +def receive_messages_synchronously(project, subscription_name): + """Pulling messages synchronously.""" + # [START pubsub_subscriber_sync_pull] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) + + # Builds a pull request with a specific number of messages to return. + # `return_immediately` is set to False so that the system waits (for a + # bounded amount of time) until at lease one message is available. + response = subscriber.pull( + subscription_path, + max_messages=3, + return_immediately=False) + + ack_ids = [] + for received_message in response.received_messages: + print("Received: {}".format(received_message.message.data)) + ack_ids.append(received_message.ack_id) + + # Acknowledges the received messages so they will not be sent again. + subscriber.acknowledge(subscription_path, ack_ids) + # [END pubsub_subscriber_sync_pull] + + def listen_for_errors(project, subscription_name): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] + # project = "Your Google Cloud Project ID" + # subscription_name = "Your Pubsub subscription name" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) @@ -281,6 +318,11 @@ def callback(message): help=receive_messages_with_flow_control.__doc__) receive_with_flow_control_parser.add_argument('subscription_name') + receive_messages_synchronously_parser = subparsers.add_parser( + 'receive-synchronously', + help=receive_messages_synchronously.__doc__) + receive_messages_synchronously_parser.add_argument('subscription_name') + listen_for_errors_parser = subparsers.add_parser( 'listen_for_errors', help=listen_for_errors.__doc__) listen_for_errors_parser.add_argument('subscription_name') @@ -314,5 +356,8 @@ def callback(message): elif args.command == 'receive-flow-control': receive_messages_with_flow_control( args.project, args.subscription_name) + elif args.command == 'receive-synchronously': + receive_messages_synchronously( + args.project, args.subscription_name) elif args.command == 'listen_for_errors': listen_for_errors(args.project, args.subscription_name) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index adbc44e84..728f971f2 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -188,3 +188,18 @@ def test_receive_with_flow_control( assert 'Listening' in out assert subscription in out assert 'Message 1' in out + + +def test_receive_synchronously( + publisher_client, topic, subscription, capsys): + _publish_messages(publisher_client, topic) + + with _make_sleep_patch(): + with pytest.raises(RuntimeError, match='sigil'): + subscriber.receive_messages_with_flow_control( + PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert 'Message 1' in out + assert 'Message 2' in out + assert 'Message 3' in out