[BEAM-12272] Python - Backport Firestore connector's ramp-up throttling to Datastore connector (#14723)

* Initial Python implementation of ramp-up throttler.

* Finish implementation, add tests.

* Add f-string.

* Fix Beam user agent, bump Datastore version

* Fix formatting

* Add throttling counter, update var name in comment.

* Add missing license

* Fix timestamp test on Windows.

* Remove custom to_runner_api_parameter
danthev authored Jun 11, 2021
1 parent a8a8b3a commit cd2117b
Showing 8 changed files with 218 additions and 18 deletions.
@@ -116,7 +116,7 @@ def run(argv=None):
| 'Input' >> beam.Create(list(range(num_entities)))
| 'To String' >> beam.Map(str)
| 'To Entity' >> beam.Map(EntityWrapper(kind, ancestor_key).make_entity)
-| 'Write to Datastore' >> WriteToDatastore(project))
+| 'Write to Datastore' >> WriteToDatastore(project, hint_num_workers=1))
p.run()

query = Query(kind=kind, project=project, ancestor=ancestor_key)
@@ -153,7 +153,7 @@ def run(argv=None):
_ = (
entities
| 'To Keys' >> beam.Map(lambda entity: entity.key)
-| 'delete entities' >> DeleteFromDatastore(project))
+| 'delete entities' >> DeleteFromDatastore(project, hint_num_workers=1))

p.run()

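A note on the hint_num_workers=1 arguments above: per the ramp-up formula introduced in rampup_throttling_fn.py below, each worker's starting budget is roughly _BASE_BUDGET / hint_num_workers ops/sec, so this single-worker pipeline would crawl at about 1 op/sec under the default hint. A quick sketch of the arithmetic (not part of the commit):

BASE_BUDGET = 500
print(BASE_BUDGET / 500)  # default hint of 500 workers: ~1 op/sec per worker at start
print(BASE_BUDGET / 1)    # hint_num_workers=1: 500 ops/sec at start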
45 changes: 40 additions & 5 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
@@ -32,6 +32,7 @@
from apache_beam.io.gcp.datastore.v1new import types
from apache_beam.io.gcp.datastore.v1new import util
from apache_beam.io.gcp.datastore.v1new.adaptive_throttler import AdaptiveThrottler
+from apache_beam.io.gcp.datastore.v1new.rampup_throttling_fn import RampupThrottlingFn
from apache_beam.metrics.metric import Metrics
from apache_beam.transforms import Create
from apache_beam.transforms import DoFn
@@ -276,15 +277,33 @@ class _Mutate(PTransform):
   Only idempotent Datastore mutation operations (upsert and delete) are
   supported, as the commits are retried when failures occur.
   """
-  def __init__(self, mutate_fn):
+
+  # Default hint for the expected number of workers in the ramp-up throttling
+  # step for write or delete operations.
+  _DEFAULT_HINT_NUM_WORKERS = 500
+
+  def __init__(
+      self,
+      mutate_fn,
+      throttle_rampup=True,
+      hint_num_workers=_DEFAULT_HINT_NUM_WORKERS):
     """Initializes a Mutate transform.
     Args:
       mutate_fn: Instance of `DatastoreMutateFn` to use.
+      throttle_rampup: Whether to enforce a gradual ramp-up.
+      hint_num_workers: A hint for the expected number of workers, used to
+        estimate appropriate limits during ramp-up throttling.
     """
     self._mutate_fn = mutate_fn
+    self._throttle_rampup = throttle_rampup
+    self._hint_num_workers = hint_num_workers

   def expand(self, pcoll):
+    if self._throttle_rampup:
+      throttling_fn = RampupThrottlingFn(self._hint_num_workers)
+      pcoll = (
+          pcoll | 'Enforce throttling during ramp-up' >> ParDo(throttling_fn))
     return pcoll | 'Write Batch to Datastore' >> ParDo(self._mutate_fn)

   class DatastoreMutateFn(DoFn):
@@ -440,14 +459,22 @@ class WriteToDatastore(_Mutate):
   property key is empty then it is filled with the project ID passed to this
   transform.
   """
-  def __init__(self, project):
+  def __init__(
+      self,
+      project,
+      throttle_rampup=True,
+      hint_num_workers=_Mutate._DEFAULT_HINT_NUM_WORKERS):
     """Initialize the `WriteToDatastore` transform.
     Args:
       project: (:class:`str`) The ID of the project to write entities to.
+      throttle_rampup: Whether to enforce a gradual ramp-up.
+      hint_num_workers: A hint for the expected number of workers, used to
+        estimate appropriate limits during ramp-up throttling.
     """
     mutate_fn = WriteToDatastore._DatastoreWriteFn(project)
-    super(WriteToDatastore, self).__init__(mutate_fn)
+    super(WriteToDatastore,
+          self).__init__(mutate_fn, throttle_rampup, hint_num_workers)

   class _DatastoreWriteFn(_Mutate.DatastoreMutateFn):
     def element_to_client_batch_item(self, element):
@@ -485,15 +512,23 @@ class DeleteFromDatastore(_Mutate):
   project ID passed to this transform. If ``project`` field in key is empty then
   it is filled with the project ID passed to this transform.
   """
-  def __init__(self, project):
+  def __init__(
+      self,
+      project,
+      throttle_rampup=True,
+      hint_num_workers=_Mutate._DEFAULT_HINT_NUM_WORKERS):
     """Initialize the `DeleteFromDatastore` transform.
     Args:
       project: (:class:`str`) The ID of the project from which the entities will
         be deleted.
+      throttle_rampup: Whether to enforce a gradual ramp-up.
+      hint_num_workers: A hint for the expected number of workers, used to
+        estimate appropriate limits during ramp-up throttling.
     """
     mutate_fn = DeleteFromDatastore._DatastoreDeleteFn(project)
-    super(DeleteFromDatastore, self).__init__(mutate_fn)
+    super(DeleteFromDatastore,
+          self).__init__(mutate_fn, throttle_rampup, hint_num_workers)

   class _DatastoreDeleteFn(_Mutate.DatastoreMutateFn):
     def element_to_client_batch_item(self, element):
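For callers, the new keyword arguments are optional and default to ramp-up throttling with a 500-worker hint. A minimal usage sketch (not from this commit; the project ID and entity are placeholders):

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new import types
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore

project = 'my-project'  # placeholder; a real GCP project is needed to run this
key = types.Key(['kind_name', 'entity_name'], project=project)
entity = types.Entity(key)

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([entity])
      # Defaults: throttle_rampup=True, hint_num_workers=500. Pass
      # throttle_rampup=False to restore the previous, unthrottled behavior.
      | WriteToDatastore(project, hint_num_workers=10))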
10 changes: 9 additions & 1 deletion sdks/python/apache_beam/io/gcp/datastore/v1new/helper.py
@@ -29,10 +29,13 @@
from typing import Union

from google.api_core import exceptions
+from google.api_core.gapic_v1 import client_info
from google.cloud import environment_vars
+from google.cloud.datastore import __version__
from google.cloud.datastore import client

from apache_beam.io.gcp.datastore.v1new import types
+from apache_beam.version import __version__ as beam_version
from cachetools.func import ttl_cache

# https://cloud.google.com/datastore/docs/concepts/errors#error_codes
@@ -47,7 +50,12 @@
@ttl_cache(maxsize=128, ttl=3600)
def get_client(project, namespace):
"""Returns a Cloud Datastore client."""
-  _client = client.Client(project=project, namespace=namespace)
+  _client_info = client_info.ClientInfo(
+      client_library_version=__version__,
+      gapic_version=__version__,
+      user_agent=f'beam-python-sdk/{beam_version}')
+  _client = client.Client(
+      project=project, namespace=namespace, client_info=_client_info)
# Avoid overwriting user setting. BEAM-7608
if not os.environ.get(environment_vars.GCD_HOST, None):
_client.base_url = 'https://batch-datastore.googleapis.com' # BEAM-1387
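The client_info change (the "Fix Beam user agent" commit above) makes Beam traffic identifiable in Datastore-side telemetry. A rough sketch of what the resulting user agent contains (version strings are illustrative, not from this commit):

from google.api_core.gapic_v1 import client_info

info = client_info.ClientInfo(
    client_library_version='1.8.0',  # illustrative google-cloud-datastore version
    gapic_version='1.8.0',
    user_agent='beam-python-sdk/2.31.0')  # illustrative Beam SDK version
# ClientInfo.to_user_agent() joins these fields into the request's
# user-agent header, e.g. 'beam-python-sdk/2.31.0 gl-python/... gapic/1.8.0'.
print(info.to_user_agent())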
95 changes: 95 additions & 0 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py
@@ -0,0 +1,95 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import logging
import time
from typing import TypeVar

from apache_beam import typehints
from apache_beam.io.gcp.datastore.v1new import util
from apache_beam.metrics.metric import Metrics
from apache_beam.transforms import DoFn
from apache_beam.utils.retry import FuzzedExponentialIntervals

T = TypeVar('T')

_LOG = logging.getLogger(__name__)


@typehints.with_input_types(T)
@typehints.with_output_types(T)
class RampupThrottlingFn(DoFn):
"""A ``DoFn`` that throttles ramp-up following an exponential function.
An implementation of a client-side throttler that enforces a gradual ramp-up,
broadly in line with Datastore best practices. See also
https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.
"""

_BASE_BUDGET = 500
_RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)

def __init__(self, num_workers, *unused_args, **unused_kwargs):
"""Initializes a ramp-up throttler transform.
Args:
num_workers: A hint for the expected number of workers, used to derive
the local rate limit.
"""
super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs)
self._num_workers = num_workers
self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000)
self._first_instant = datetime.datetime.now()
self._throttled_secs = Metrics.counter(
RampupThrottlingFn, "cumulativeThrottlingSeconds")

def _calc_max_ops_budget(
self,
first_instant: datetime.datetime,
current_instant: datetime.datetime):
"""Function that returns per-second budget according to best practices.
The exact function is `500 / num_workers * 1.5^max(0, (x-5)/5)`, where x is
the number of minutes since start time.
"""
timedelta_since_first = current_instant - first_instant
growth = max(
0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) /
self._RAMP_UP_INTERVAL)
max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))
return max(1, max_ops_budget)

def process(self, element, **kwargs):
backoff = iter(
FuzzedExponentialIntervals(initial_delay_secs=1, num_retries=10000))

while True:
instant = datetime.datetime.now()
max_ops_budget = self._calc_max_ops_budget(self._first_instant, instant)
current_op_count = self._successful_ops.sum(instant.timestamp() * 1000)
available_ops = max_ops_budget - current_op_count

if available_ops > 0:
self._successful_ops.add(instant.timestamp() * 1000, 1)
yield element
break
else:
        backoff_secs = next(backoff)  # FuzzedExponentialIntervals yields seconds
        _LOG.info(
            'Delaying by %s seconds to conform to gradual ramp-up.',
            backoff_secs)
        time.sleep(backoff_secs)
        self._throttled_secs.inc(backoff_secs)
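To sanity-check the schedule asserted in the unit test below, here is the budget function pulled out as a standalone sketch (same constants as the DoFn; x is minutes since start):

def max_ops_budget(minutes_since_start, num_workers=1, base_budget=500):
  # 500 / num_workers * 1.5^max(0, (x - 5) / 5), floored and clamped to >= 1.
  growth = max(0.0, (minutes_since_start - 5) / 5)
  return max(1, int(base_budget / num_workers * 1.5**growth))

assert max_ops_budget(0) == 500
assert max_ops_budget(10) == 750
assert max_ops_budget(15) == 1125
assert max_ops_budget(30) == 3796
assert max_ops_budget(60) == 43248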
62 changes: 62 additions & 0 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn_test.py
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import unittest

from mock import patch

from apache_beam.io.gcp.datastore.v1new.rampup_throttling_fn import RampupThrottlingFn

DATE_ZERO = datetime.datetime(
year=1970, month=1, day=1, tzinfo=datetime.timezone.utc)


class _RampupDelayException(Exception):
pass


class RampupThrottlerTransformTest(unittest.TestCase):
@patch('datetime.datetime')
@patch('time.sleep')
def test_rampup_throttling(self, mock_sleep, mock_datetime):
mock_datetime.now.return_value = DATE_ZERO
throttling_fn = RampupThrottlingFn(num_workers=1)
rampup_schedule = [
(DATE_ZERO + datetime.timedelta(seconds=0), 500),
(DATE_ZERO + datetime.timedelta(milliseconds=1), 0),
(DATE_ZERO + datetime.timedelta(seconds=1), 500),
(DATE_ZERO + datetime.timedelta(seconds=1, milliseconds=1), 0),
(DATE_ZERO + datetime.timedelta(minutes=5), 500),
(DATE_ZERO + datetime.timedelta(minutes=10), 750),
(DATE_ZERO + datetime.timedelta(minutes=15), 1125),
(DATE_ZERO + datetime.timedelta(minutes=30), 3796),
(DATE_ZERO + datetime.timedelta(minutes=60), 43248),
]

mock_sleep.side_effect = _RampupDelayException()
for date, expected_budget in rampup_schedule:
mock_datetime.now.return_value = date
for _ in range(expected_budget):
next(throttling_fn.process(None))
# Delay after budget is exhausted
with self.assertRaises(_RampupDelayException):
next(throttling_fn.process(None))


if __name__ == '__main__':
unittest.main()
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/util.py
@@ -24,16 +24,16 @@
import math

# Constants used in batched mutation RPCs:
-WRITE_BATCH_INITIAL_SIZE = 200
+WRITE_BATCH_INITIAL_SIZE = 50
# Max allowed Datastore writes per batch, and max bytes per batch.
# Note that the max bytes per batch set here is lower than the 10MB limit
# actually enforced by the API, to leave space for the CommitRequest wrapper
# around the mutations.
# https://cloud.google.com/datastore/docs/concepts/limits
WRITE_BATCH_MAX_SIZE = 500
WRITE_BATCH_MAX_BYTES_SIZE = 9000000
-WRITE_BATCH_MIN_SIZE = 10
-WRITE_BATCH_TARGET_LATENCY_MS = 5000
+WRITE_BATCH_MIN_SIZE = 5
+WRITE_BATCH_TARGET_LATENCY_MS = 6000


class MovingSum(object):
@@ -47,8 +47,8 @@ class MovingSum(object):
moving average tracker.
"""
def __init__(self, window_ms, bucket_ms):
-    if window_ms <= bucket_ms or bucket_ms <= 0:
-      raise ValueError("window_ms > bucket_ms > 0 please")
+    if window_ms < bucket_ms or bucket_ms <= 0:
+      raise ValueError("window_ms >= bucket_ms > 0 please")
self._num_buckets = int(math.ceil(window_ms / bucket_ms))
self._bucket_ms = bucket_ms
self._Reset(now=0) # initialize the moving window members
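The relaxed check matters because RampupThrottlingFn above constructs a single-bucket window, MovingSum(window_ms=1000, bucket_ms=1000), which the old window_ms > bucket_ms validation rejected. A small sketch of the now-valid usage:

import time

from apache_beam.io.gcp.datastore.v1new.util import MovingSum

ops = MovingSum(window_ms=1000, bucket_ms=1000)  # raised ValueError before this change
now_ms = time.time() * 1000
ops.add(now_ms, 1)          # record one operation at the current timestamp
print(ops.sum(now_ms))      # -> 1, operations seen within the last second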
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/util_test.py
@@ -83,18 +83,18 @@ def test_fast_queries(self):
def test_slow_queries(self):
self._batcher.report_latency(0, 10000, 200)
self._batcher.report_latency(0, 10000, 200)
-    self.assertEqual(100, self._batcher.get_batch_size(0))
+    self.assertEqual(120, self._batcher.get_batch_size(0))

def test_size_not_below_minimum(self):
-    self._batcher.report_latency(0, 30000, 50)
-    self._batcher.report_latency(0, 30000, 50)
+    self._batcher.report_latency(0, 75000, 50)
+    self._batcher.report_latency(0, 75000, 50)
self.assertEqual(util.WRITE_BATCH_MIN_SIZE, self._batcher.get_batch_size(0))

def test_sliding_window(self):
self._batcher.report_latency(0, 30000, 50)
self._batcher.report_latency(50000, 5000, 200)
self._batcher.report_latency(100000, 5000, 200)
-    self.assertEqual(200, self._batcher.get_batch_size(150000))
+    self.assertEqual(240, self._batcher.get_batch_size(150000))


if __name__ == '__main__':
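The updated expectations follow from the new constants, assuming the batcher scales batch size as target latency over observed per-item latency, clamped to the minimum (a back-of-envelope sketch, not the exact implementation):

TARGET_MS = 6000  # new WRITE_BATCH_TARGET_LATENCY_MS
MIN_SIZE = 5      # new WRITE_BATCH_MIN_SIZE

def expected_batch_size(latency_ms, num_items):
  per_item_ms = latency_ms / num_items
  return max(MIN_SIZE, int(TARGET_MS / per_item_ms))

print(expected_batch_size(10000, 200))  # 120, as in test_slow_queries
print(expected_batch_size(5000, 200))   # 240, as in test_sliding_window
print(expected_batch_size(75000, 50))   # 5, clamped in test_size_not_below_minimum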
2 changes: 1 addition & 1 deletion sdks/python/setup.py
@@ -184,7 +184,7 @@ def get_version():
'cachetools>=3.1.0,<5',
'google-apitools>=0.5.31,<0.5.32',
'google-auth>=1.18.0,<2',
-    'google-cloud-datastore>=1.7.1,<2',
+    'google-cloud-datastore>=1.8.0,<2',
'google-cloud-pubsub>=0.39.0,<2',
# GCP packages required by tests
'google-cloud-bigquery>=1.6.0,<3',
