diff --git a/activesupport/CHANGELOG.md b/activesupport/CHANGELOG.md index 19bc05993c8a4..824c4859dce58 100644 --- a/activesupport/CHANGELOG.md +++ b/activesupport/CHANGELOG.md @@ -1,3 +1,21 @@ +* Introduce `ActiveSupport::ReplicationCoordinator` to support application behavior across multiple availability zones. + + For applications that replicate the database across multiple availability zones, an abstract + base class `ActiveSupport::ReplicationCoordinator::Base` is introduced to simplify how + applications discover whether they are in the active zone, and react to changes like + failovers. It manages a background thread to periodically poll and cache the active zone state, + and invokes callbacks when there are active/passive state changes. + + By default, Rails applications will be configured to use + `ActiveSupport::ReplicationCoordinator::SingleZone` which always indicates that the app is in an + "active" zone. + + Applications can build a custom replication coordinator and configure their app to use it by + setting `config.replication_coordinator`. + + A test helper class `ActiveSupport::Testing::ReplicationCoordinator` is also introduced to + simplify testing application behavior during active/passive state changes. + * Add public API for `before_fork_hook` in parallel testing. Introduces a public API for calling the before fork hooks implemented by parallel testing. 
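To illustrate the poll-and-cache behavior that the `ActiveSupport::ReplicationCoordinator` changelog entry describes, here is a minimal standalone sketch in plain Ruby. It is not the implementation in this patch: `TinyCoordinator`, `poll_once`, and the block-based fetcher are hypothetical names, polling is driven by hand rather than by a background thread, and hooks are not invoked at registration time as they are in the patch.

```ruby
# Hypothetical, simplified stand-in for a replication coordinator.
# It caches the last fetched state and fires hooks only when that state changes.
class TinyCoordinator
  def initialize(&fetcher)
    @fetcher = fetcher   # block returning a truthy value when the local zone is active
    @active = nil        # cached state; nil until the first poll
    @hooks = { active: [], passive: [] }
  end

  def on_active_zone(&block)
    @hooks[:active] << block
  end

  def on_passive_zone(&block)
    @hooks[:passive] << block
  end

  # Returns the cached state, fetching once if nothing has been cached yet.
  def active_zone?
    poll_once if @active.nil?
    @active
  end

  # One polling step; a real coordinator would run this on a timer.
  def poll_once
    previous = @active
    @active = @fetcher.call ? true : false
    return if previous == @active

    @hooks[@active ? :active : :passive].each { |hook| hook.call(self) }
  end
end

zone_state = true
events = []

coordinator = TinyCoordinator.new { zone_state }
coordinator.on_active_zone { events << :active }
coordinator.on_passive_zone { events << :passive }

coordinator.poll_once   # first fetch: nil -> active, runs the active hook
coordinator.poll_once   # state unchanged, no hook runs
zone_state = false      # simulate a failover away from this zone
coordinator.poll_once   # active -> passive, runs the passive hook
```

The point of the sketch is the comparison in `poll_once`: callbacks run on transitions only, so a steady state can be polled as often as needed without re-triggering them.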
diff --git a/activesupport/lib/active_support.rb b/activesupport/lib/active_support.rb index 5f8b61f32c405..073e1d8e7fd40 100644 --- a/activesupport/lib/active_support.rb +++ b/activesupport/lib/active_support.rb @@ -54,6 +54,7 @@ module ActiveSupport autoload :IsolatedExecutionState autoload :Notifications autoload :Reloader + autoload :ReplicationCoordinator autoload :SecureCompareRotator eager_autoload do diff --git a/activesupport/lib/active_support/replication_coordinator.rb b/activesupport/lib/active_support/replication_coordinator.rb new file mode 100644 index 0000000000000..8ff916569b140 --- /dev/null +++ b/activesupport/lib/active_support/replication_coordinator.rb @@ -0,0 +1,331 @@ +# frozen_string_literal: true + +require "active_support/concurrency/share_lock" + +module ActiveSupport + # = Active Support Replication Coordinator + # + # Provides an interface for responding to changes in active/passive state across multiple + # availability zones. + # + # == Replication, Availability Zones, and Active-Passive State + # + # A common deployment topology for Rails applications is to have application servers running in + # multiple availability zones, with a single database that is replicated across these zones. + # + # In such a deployment, application code may need to determine whether it is running in an "active" + # zone and is responsible for writing to the database, or in a "passive" or "standby" zone that + # primarily reads from the zone-local database replica. And, in case of a zone failure, the + # application may need to dynamically promote a passive zone to become the active zone. + # + # The term "passive" here is intended to include deployments in which the non-active zones are + # handling read requests, and potentially even performing occasional writes back to the active + # zone over an inter-AZ network link. The exact interpretation depends on the nature of the + # replication strategy and your deployment topology. 
+ # + # Some example scenarios where knowing the replication state is important: + # + # - Custom database selector middleware + # - Controlling background jobs that should only run in an active zone + # - Deciding whether to preheat fragment caches for "next page" paginated results (which may not + # be cached in time if relying on an inter-AZ network link and replication lag). + # + # The two classes provided by this module are: + # + # - ReplicationCoordinator::Base: An abstract base class that provides a monitoring + # mechanism to fetch and cache the replication state on a configurable time interval and notify + # when that state changes. + # - ReplicationCoordinator::SingleZone: A concrete implementation that always + # indicates an active zone, and so it represents the default behavior for a single-zone + # deployment that does not use database replication. + # + # == Custom Replication Coordinators + # + # By default, every Rails application is configured to use the SingleZone replication + # coordinator. To configure Rails to use your own replication coordinator, first create a class + # that subclasses ActiveSupport::ReplicationCoordinator::Base: + # + # class CustomReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base + # def fetch_active_zone + # # Custom logic to determine if the local zone is active and return a boolean + # end + # end + # + # Then configure Rails with an initializer: + # + # Rails.application.configure do + # config.before_initialize do + # config.replication_coordinator = CustomReplicationCoordinator.new(polling_interval: 2.seconds) + # end + # end + # + # == Development and Auto-reloading + # + # The replication coordinator is loaded once and is not monitored for changes. You will have to + # restart the server for changes to be reflected in a running application. 
+ # + # For testing the behavior of code during active/passive state changes, please see the test helper + # class ActiveSupport::Testing::ReplicationCoordinator. + module ReplicationCoordinator + # = Replication Coordinator Abstract Base Class + # + # An abstract base class that provides a monitoring mechanism to fetch and cache the replication + # state on a configurable time interval and notify when that state changes. + # + # Subclasses need only implement #fetch_active_zone, which returns a boolean indicating whether + # the caller is in an active zone. This method may be expensive, so the class uses a + # {Concurrent::TimerTask}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/TimerTask.html] + # to manage a background thread to periodically check (and cache) this value. The current cached + # status can cheaply be inspected with #active_zone?. The refresh interval can be set by passing + # a +polling_interval+ option to the constructor. + # + # The background thread will be implicitly started the first time any of these methods is + # called: + # + # - #active_zone? + # - #on_active_zone + # - #on_passive_zone + # + # or it can be explicitly started by calling #start_monitoring. + # + # Note: After a fork, the background thread will not be running, but it will be restarted + # implicitly once any of the above methods is called. + # + # When monitoring is running, registered callbacks are invoked whenever an active zone change is + # detected. + # + # == Basic usage + # + # class CustomReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base + # def fetch_active_zone + # # Custom logic to determine if the local zone is active + # end + # end + # + # coordinator = CustomReplicationCoordinator.new(polling_interval: 10.seconds) + # + # coordinator.active_zone? 
# Immediately returns the cached value + # + # coordinator.on_active_zone do |coordinator| + # puts "This zone is now active" + # # Start processes or threads that should only run in the active zone + # end + # + # coordinator.on_passive_zone do |coordinator| + # puts "This zone is now passive" + # # Stop processes or threads that should only run in the active zone + # end + # + # # Start a background thread to monitor the active zone status and invoke the callbacks on changes + # coordinator.start_monitoring + # + # coordinator.updated_at # Returns the last time the active zone status was checked + # + # Subclasses must implement #fetch_active_zone + class Base + attr_reader :state_change_hooks, :polling_interval, :executor, :logger + + # Initialize a new coordinator instance. + # + # [+polling_interval+] How often to refresh active zone status (default: 5 seconds) + def initialize(polling_interval: 5, executor: ActiveSupport::Executor, logger: nil) + @polling_interval = polling_interval + @executor = executor + @logger = logger || (defined?(Rails.logger) && Rails.logger) + @state_change_hooks = { active: [], passive: [] } + + @timer_task = nil + @active_zone = nil + @active_zone_updated_at = nil + @lock = ActiveSupport::Concurrency::ShareLock.new + end + + # Determine if the local zone is active. + # + # This method must be implemented by subclasses to define the logic for determining if the + # local zone is active. The return value is used to trigger state change hooks when the active + # zone changes. + # + # It's assumed that this method may be slow, so ReplicationCoordinator has a background thread + # that calls this method every +polling_interval+ seconds, and caches the result which is + # returned by #active_zone? + # + # Returns +true+ if the local zone is active, +false+ otherwise. + def fetch_active_zone + raise NotImplementedError + end + + # Returns +true+ if the local zone is active, +false+ otherwise. 
+ # Also starts monitoring if it has not already been started. + # + # This always returns a cached value. + def active_zone? + start_monitoring + @active_zone # No need to use a read lock + end + + # Returns the time at which the current value of #active_zone? was fetched, or +nil+ if no + # value has yet been fetched. + # + # This always returns a cached value. + def updated_at + @active_zone_updated_at # No need to use a read lock + end + + # Start monitoring for active zone changes. + # + # This starts a Concurrent::TimerTask to periodically refresh the active zone status. If a + # change is detected, then the appropriate state change callbacks will be invoked. + def start_monitoring + check_active_zone(skip_when_set: true) + timer_task&.execute unless @timer_task&.running? + end + + # Stop monitoring for active zone changes. + # + # This stops the Concurrent::TimerTask, if it is running. + def stop_monitoring + @timer_task&.shutdown + end + + # Register a callback to be executed when the local zone becomes active. + # Also starts monitoring if it has not already been started. + # + # The callback will be immediately executed if this zone is currently active. + # + # [+block+] callback to execute when zone becomes active + # + # Yields the coordinator instance to the block. + def on_active_zone(&block) + start_monitoring + state_change_hooks[:active] << block + block.call(self) if active_zone? + end + + # Register a callback to be executed when the local zone becomes passive. + # Also starts monitoring if it has not already been started. + # + # The callback will be immediately executed if this zone is not currently active. + # + # [+block+] callback to execute when zone becomes passive + # + # Yields the coordinator instance to the block. + def on_passive_zone(&block) + start_monitoring + state_change_hooks[:passive] << block + block.call(self) if !active_zone? + end + + # Clear all registered state_change hooks. 
+ def clear_hooks + state_change_hooks[:active] = [] + state_change_hooks[:passive] = [] + end + + private + def check_active_zone(skip_when_set: false) + return if skip_when_set && !@active_zone.nil? + + # Acquire an exclusive lock to mitigate a thundering herd problem when multiple threads + # might all call active_zone? for the first time at the same time. + if @lock.start_exclusive(no_wait: true) + begin + old_active_zone = @active_zone + @active_zone = executor_wrap { fetch_active_zone } + @active_zone_updated_at = Time.now + ensure + @lock.stop_exclusive + end + + if old_active_zone.nil? || old_active_zone != @active_zone + if @active_zone + logger&.info "#{self.class}: pid #{$$}: switching to active" + run_active_zone_hooks + else + logger&.info "#{self.class}: pid #{$$}: switching to passive" + run_passive_zone_hooks + end + end + else + @lock.sharing { } + end + end + + def executor_wrap(&block) + if @executor + @executor.wrap(&block) + else + yield + end + end + + def run_active_zone_hooks + run_hooks_for(:active) + end + + def run_passive_zone_hooks + run_hooks_for(:passive) + end + + def run_hooks_for(event) + state_change_hooks.fetch(event, []).each do |block| + block.call(self) + rescue Exception => exception + handle_thread_error(exception) + end + end + + def timer_task + @timer_task ||= begin + task = Concurrent::TimerTask.new(execution_interval: polling_interval) do + check_active_zone + end + + task.add_observer do |_, _, error| + if error + executor.error_reporter&.report(error, handled: false, source: "replication_coordinator.active_support") + logger&.error("#{error.detailed_message}: could not check #{self.class} active zone") + end + end + + # The thread-based timer task needs to be recreated after a fork. + # FIXME: this callback is keeping a reference on the instance, + # but only on active instances, and there should only be one of those. 
+ ActiveSupport::ForkTracker.after_fork { @timer_task = nil } + + task + end + end + end + + # = "Single Zone" Replication Coordinator + # + # A concrete implementation that always indicates an active zone, and so it represents the + # default behavior for a single-zone deployment that does not use database replication. + # + # This is a simple implementation that always returns +true+ from #active_zone? + # + # Note that this class does not use a background thread, since there is no need to monitor the + # constant +true+ value. + # + # == Basic usage + # + # rc = ActiveSupport::ReplicationCoordinator::SingleZone.new + # rc.active_zone? #=> true + # rc.on_active_zone { puts "Will always be called" } + # rc.on_passive_zone { puts "Will never be called" } + class SingleZone < Base + # Always returns true, indicating this zone is active. + # + # Returns true. + def fetch_active_zone + true + end + + private + def timer_task + # No-op implementation since no monitoring is needed. + end + end + end +end diff --git a/activesupport/lib/active_support/testing/replication_coordinator.rb b/activesupport/lib/active_support/testing/replication_coordinator.rb new file mode 100644 index 0000000000000..66b82d7ba8d32 --- /dev/null +++ b/activesupport/lib/active_support/testing/replication_coordinator.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +module ActiveSupport + module Testing + # ReplicationCoordinator is a test helper implementing + # ActiveSupport::ReplicationCoordinator::Base that can be used to test the behavior of objects + # that depend on replication state. + class ReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base + # Returns the number of times #fetch_active_zone has been called. + attr_reader :fetch_count + + # Initializes the replication coordinator with an initial active zone state. 
+ # + # The replication coordinator can be initialized with an initial active zone state using the + # optional +active_zone+ parameter, which defaults to +true+. + def initialize(active_zone = true, **options) + @next_active_zone = active_zone + @fetch_count = 0 + super(**options) + end + + # Sets the value that will next be returned by #fetch_active_zone, simulating an external + # replication state change. + def set_next_active_zone(active_zone) + @next_active_zone = active_zone + end + + def fetch_active_zone # :nodoc: + @fetch_count += 1 + @next_active_zone + end + end + end +end diff --git a/activesupport/test/replication_coordinator_test.rb b/activesupport/test/replication_coordinator_test.rb new file mode 100644 index 0000000000000..d228db350c3f3 --- /dev/null +++ b/activesupport/test/replication_coordinator_test.rb @@ -0,0 +1,232 @@ +# frozen_string_literal: true + +require "active_support/replication_coordinator" +require "active_support/testing/replication_coordinator" +require "active_support/error_reporter/test_helper" + +class ReplicationCoordinatorTest < ActiveSupport::TestCase + test "polling_interval can be set and has a good default" do + klass = Class.new(ActiveSupport::ReplicationCoordinator::Base) do + def fetch_active_zone + true + end + end + + assert_equal 5, klass.new.polling_interval + assert_equal 1, klass.new(polling_interval: 1).polling_interval + end + + test "fetch_active_zone is cached" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(polling_interval: 9999) + rc.start_monitoring + + 10.times { rc.active_zone? } + 10.times { rc.on_active_zone { } } + 10.times { rc.on_passive_zone { } } + + assert_equal 1, rc.fetch_count + end + + test "updated_at is set whenever monitoring calls fetch_active_zone" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true, polling_interval: 0.01) + rc.active_zone? 
+ original_time = rc.updated_at + + rc.start_monitoring + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 2 } + rc.stop_monitoring + + assert rc.updated_at > original_time + end + + test "the initial fetch is guarded against a thundering herd" do + rc = Class.new(ActiveSupport::ReplicationCoordinator::Base) do + attr_reader :fetch_count + + def initialize(...) + @fetch_count = 0 + super + end + + def fetch_active_zone + @fetch_count += 1 + sleep 0.1 + true + end + end.new + + 10.times.map { Thread.new { rc.active_zone? } }.map(&:join) + + assert_equal 1, rc.fetch_count + end + + test "the initial active_zone? fetches once" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + assert_equal 0, rc.fetch_count + assert_nil rc.updated_at + + freeze_time do + rc.active_zone? + assert_equal 1, rc.fetch_count + assert_equal Time.now, rc.updated_at + end + end + + test "active_zone? starts monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true, polling_interval: 0.01) + + rc.active_zone? 
+ + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 3 } + assert rc.fetch_count >= 3 + end + + test "the initial on_active_zone fetches once" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + assert_equal 0, rc.fetch_count + assert_nil rc.updated_at + + freeze_time do + rc.on_active_zone { } + assert_equal 1, rc.fetch_count + assert_equal Time.now, rc.updated_at + end + end + + test "on_active_zone hooks are called once, immediately upon registration" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + active_cb_count = passive_cb_count = 0 + + rc.on_active_zone { active_cb_count += 1 } + rc.on_passive_zone { passive_cb_count += 1 } + + assert_equal 1, active_cb_count + assert_equal 0, passive_cb_count + end + + test "on_active_zone starts monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true, polling_interval: 0.01) + + rc.on_active_zone { } + + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 3 } + assert rc.fetch_count >= 3 + end + + test "the initial on_passive_zone fetches once" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + assert_equal 0, rc.fetch_count + assert_nil rc.updated_at + + freeze_time do + rc.on_passive_zone { } + assert_equal 1, rc.fetch_count + assert_equal Time.now, rc.updated_at + end + end + + test "on_passive_zone hooks are called once, immediately upon registration" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(false) + active_cb_count = passive_cb_count = 0 + + rc.on_passive_zone { passive_cb_count += 1 } + rc.on_active_zone { active_cb_count += 1 } + + assert_equal 0, active_cb_count + assert_equal 1, passive_cb_count + end + + test "on_passive_zone starts monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(false, polling_interval: 0.01) + + rc.on_passive_zone { } + + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 3 } + assert rc.fetch_count >= 3 + end + + test "hooks are called upon transition 
while monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(false, polling_interval: 0.01) + active_cb_count = passive_cb_count = 0 + rc.on_active_zone { active_cb_count += 1 } + rc.on_passive_zone { passive_cb_count += 1 } + + active_cb_count = passive_cb_count = 0 + rc.set_next_active_zone(true) + + Timeout.timeout(0.1) { sleep 0.01 while active_cb_count == 0 } + + assert_equal 1, active_cb_count + assert_equal 0, passive_cb_count + + active_cb_count = passive_cb_count = 0 + rc.set_next_active_zone(false) + + Timeout.timeout(0.1) { sleep 0.01 while passive_cb_count == 0 } + + assert_equal 0, active_cb_count + assert_equal 1, passive_cb_count + ensure + rc&.stop_monitoring + end + + test "errors are logged when raised calling fetch_active_zone from the timer task" do + klass = Class.new(ActiveSupport::ReplicationCoordinator::Base) do + attr_reader :fetch_count + + def initialize(...) + @fetch_count = 0 + super + end + + def fetch_active_zone + @fetch_count += 1 + raise "Simulated exception resolving active zone" if @fetch_count == 3 + true + end + end + + rc = klass.new(polling_interval: 0.01) + + subscriber = ActiveSupport::ErrorReporter::TestHelper::ErrorSubscriber.new + ActiveSupport.error_reporter.subscribe(subscriber) + + rc.start_monitoring + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 6 } + + assert rc.fetch_count >= 6 # the timer task continues running after an error + + assert_equal 1, subscriber.events.count + assert_equal "Simulated exception resolving active zone", subscriber.events[0][0].message + ensure + rc&.stop_monitoring + end + + test "monitoring will be organically restarted after forking" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(polling_interval: 0.01) + rc.active_zone? + + out, _ = capture_subprocess_io do + child = Process.fork do + initial_fetch_count = rc.fetch_count + rc.active_zone? 
+ + begin + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < initial_fetch_count + 5 } + puts "OK" + rescue Timeout::Error + puts "FAIL" + end + end + Process.wait child + end + + assert_equal "OK", out.chomp + end + + test "SingleZone can be used for the default always-active behavior" do + rc = ActiveSupport::ReplicationCoordinator::SingleZone.new + assert rc.active_zone? + assert_nil rc.instance_variable_get(:@timer_task) # No timer task is created for SingleZone + end +end diff --git a/guides/source/configuring.md b/guides/source/configuring.md index 009181ac14972..50cfe05e5b908 100644 --- a/guides/source/configuring.md +++ b/guides/source/configuring.md @@ -523,6 +523,15 @@ is `ENV['RAILS_RELATIVE_URL_ROOT']`. Enables or disables reloading of classes only when tracked files change. By default tracks everything on autoload paths and is set to `true`. If `config.enable_reloading` is `false`, this option is ignored. +#### `config.replication_coordinator` + +Set this to configure a custom replication coordinator for your application. A replication +coordinator provides an interface for active/passive state in a deployment across multiple +availability zones. See the [`ActiveSupport::ReplicationCoordinator` API +documentation](https://api.rubyonrails.org/classes/ActiveSupport/ReplicationCoordinator.html). + +Defaults to `ActiveSupport::ReplicationCoordinator::SingleZone.new`. + #### `config.require_master_key` Causes the app to not boot if a master key hasn't been made available through `ENV["RAILS_MASTER_KEY"]` or the `config/master.key` file. 
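As a rough illustration of how application code might consume whichever coordinator is set through `config.replication_coordinator`, the sketch below gates a worker on the coordinator's callbacks. `FakeCoordinator`, `ZoneGatedWorker`, and `transition_to` are hypothetical names that only mimic the `on_active_zone`/`on_passive_zone` interface described in this patch; in an application the object would presumably be read from the Rails configuration instead.

```ruby
# Hypothetical stand-in exposing the same callback interface as the patch's coordinator.
class FakeCoordinator
  def initialize(active)
    @active = active
    @hooks = { active: [], passive: [] }
  end

  def active_zone?
    @active
  end

  # As described in the patch, a hook runs immediately if it matches the current state.
  def on_active_zone(&block)
    @hooks[:active] << block
    block.call(self) if @active
  end

  def on_passive_zone(&block)
    @hooks[:passive] << block
    block.call(self) unless @active
  end

  # Simulate a failover or failback that polling would normally detect.
  def transition_to(active)
    return if active == @active

    @active = active
    @hooks[active ? :active : :passive].each { |hook| hook.call(self) }
  end
end

# Marks itself as running only while the local zone is active.
class ZoneGatedWorker
  attr_reader :running

  def initialize(coordinator)
    @running = false
    coordinator.on_active_zone { @running = true }
    coordinator.on_passive_zone { @running = false }
  end
end

coordinator = FakeCoordinator.new(false)
worker = ZoneGatedWorker.new(coordinator)

states = [worker.running]        # passive at boot, so the worker is stopped
coordinator.transition_to(true)
states << worker.running         # zone promoted to active
coordinator.transition_to(false)
states << worker.running         # zone demoted again
```

Registering both callbacks keeps the worker's state in step with the zone without the worker ever polling on its own.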
diff --git a/railties/lib/rails/application/configuration.rb b/railties/lib/rails/application/configuration.rb index 63b2345ddadbc..7893dc7f36f4a 100644 --- a/railties/lib/rails/application/configuration.rb +++ b/railties/lib/rails/application/configuration.rb @@ -24,7 +24,7 @@ class Configuration < ::Rails::Engine::Configuration :content_security_policy_nonce_auto, :require_master_key, :credentials, :disable_sandbox, :sandbox_by_default, :add_autoload_paths_to_load_path, :rake_eager_load, :server_timing, :log_file_size, - :dom_testing_default_html_version, :yjit + :dom_testing_default_html_version, :yjit, :replication_coordinator attr_reader :encoding, :api_only, :loaded_config_version, :log_level @@ -85,6 +85,7 @@ def initialize(*) @server_timing = false @dom_testing_default_html_version = :html4 @yjit = false + @replication_coordinator = ActiveSupport::ReplicationCoordinator::SingleZone.new end # Loads default configuration values for a target version. This includes