From ed57fc7a886571f10657b0322a8846c09ba29d73 Mon Sep 17 00:00:00 2001 From: Mike Dalessio Date: Sat, 7 Jun 2025 14:26:25 -0400 Subject: [PATCH] Introduce ReplicationCoordinator to support multiple AZs It's common for applications that are deployed across multiple availability zones (using a replicated database) to create an ad-hoc method for processes to discover which zone is "active", meaning the zone primarily responsible for writing to the database. For example, a team may choose to use a MySQL system variable to indicate the data center where the primary database sits. In which case, they need to write code to make sure all Rails processes in all zones query this efficiently (it may be slow to access in non-primary zones) and are notified if the primary zone changes, as in the case of a data center failover. `ReplicationCoordinator::Base` is introduced to allow developers to write code that determines whether a process is in an active zone, and then: - monitor and cache that value, with configurable polling interval - fire callbacks when the state changes from active -> passive or vice versa Additionally, a test helper class is provided to simplify testing failover behavior. Finally, Rails is configured by default to use a simple concrete replication coordinator, `SingleZone`, which always indicates the caller is in an active zone. --- activesupport/CHANGELOG.md | 18 + activesupport/lib/active_support.rb | 1 + .../active_support/replication_coordinator.rb | 331 ++++++++++++++++++ .../testing/replication_coordinator.rb | 34 ++ .../test/replication_coordinator_test.rb | 232 ++++++++++++ guides/source/configuring.md | 9 + .../lib/rails/application/configuration.rb | 3 +- 7 files changed, 627 insertions(+), 1 deletion(-) create mode 100644 activesupport/lib/active_support/replication_coordinator.rb create mode 100644 activesupport/lib/active_support/testing/replication_coordinator.rb create mode 100644 activesupport/test/replication_coordinator_test.rb diff --git a/activesupport/CHANGELOG.md b/activesupport/CHANGELOG.md index 19bc05993c8a4..824c4859dce58 100644 --- a/activesupport/CHANGELOG.md +++ b/activesupport/CHANGELOG.md @@ -1,3 +1,21 @@ +* Introduce `ActiveSupport::ReplicationCoordinator` to support application behavior across multiple availability zones. + + For applications that replicate the database across multiple availability zones, an abstract + base class `ActiveSupport::ReplicationCoordinator::Base` is introduced to simplify how + applications discover whether they are in the active zone, and react to changes like + failovers. It manages a background thread to periodically poll and cache the active zone state, + and invokes callbacks when there are active/passive state changes. + + By default, Rails applications will be configured to use + `ActiveSupport::ReplicationCoordinator::SingleZone` which always indicates that the app is in an + "active" zone. + + Applications can build a custom replication coordinator and configure their app to use it by + setting `config.replication_coordinator`. + + A test helper class `ActiveSupport::Testing::ReplicationCoordinator` is also introduced to + simplify testing application behavior during active/passive state changes. + * Add public API for `before_fork_hook` in parallel testing. Introduces a public API for calling the before fork hooks implemented by parallel testing. diff --git a/activesupport/lib/active_support.rb b/activesupport/lib/active_support.rb index 5f8b61f32c405..073e1d8e7fd40 100644 --- a/activesupport/lib/active_support.rb +++ b/activesupport/lib/active_support.rb @@ -54,6 +54,7 @@ module ActiveSupport autoload :IsolatedExecutionState autoload :Notifications autoload :Reloader + autoload :ReplicationCoordinator autoload :SecureCompareRotator eager_autoload do diff --git a/activesupport/lib/active_support/replication_coordinator.rb b/activesupport/lib/active_support/replication_coordinator.rb new file mode 100644 index 0000000000000..8ff916569b140 --- /dev/null +++ b/activesupport/lib/active_support/replication_coordinator.rb @@ -0,0 +1,331 @@ +# frozen_string_literal: true + +require "active_support/concurrency/share_lock" + +module ActiveSupport + # = Active Support Replication Coordinator + # + # Provides an interface for responding to changes in active/passive state across multiple + # availability zones. + # + # == Replication, Availability Zones, and Active-Passive State + # + # A common deployment topology for Rails applications is to have application servers running in + # multiple availability zones, with a single database that is replicated across these zones. + # + # In such deployment, application code may need to determine whether it is running an "active" + # zone and is responsible for writing to the database, or in a "passive" or "standby" zone that + # primarily reads from the zone-local database replica. And, in case of a zone failure, the + # application may need to dynamically promote a passive zone to become the active zone. + # + # The term "Passive" here is intended to include deployments in which the non-active zones are + # handling read requests, and potentially even performing occasional writes back to the active + # zone over an inter-AZ network link. The exact interpretation depends on the nature of the + # replication strategy and your deployment topology. + # + # Some example scenarios where knowing the replication state is important: + # + # - Custom database selector middleware + # - Controlling background jobs that should only run in an active zone + # - Deciding whether to preheat fragment caches for "next page" paginated results (which may not + # be cached in time if relying on an inter-AZ network link and replication lag). + # + # The two classes provided by this module are: + # + # - ReplicationCoordinator::Base: An abstract base class that provides a monitoring + # mechanism to fetch and cache the replication state on a configurable time interval and notify + # when that state changes. + # - ReplicationCoordinator::SingleZone: A concrete implementation that always + # indicates an active zone, and so it represents the default behavior for a single-zone + # deployment that does not use database replication. + # + # == Custom Replication Coordinators + # + # By default, every Rails application is configured to use the SingleZone replication + # coordinator. To configure Rails to use your own replication coordinator, first create a class + # that subclasses ActiveSupport::ReplicationCoordinator::Base: + # + # class CustomReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base + # def fetch_active_zone + # # Custom logic to determine if the local zone is active and return a boolean + # end + # end + # + # Then configure Rails with an initializer: + # + # Rails.application.configure do + # config.before_initialize do + # config.replication_coordinator = CustomReplicationCoordinator.new(polling_interval: 2.seconds) + # end + # end + # + # == Development and Auto-reloading + # + # The replication coordinator is loaded once and is not monitored for changes. You will have to + # restart the server for changes to be reflected in a running application. + # + # For testing the behavior of code during active/passive state changes, please see the test helper + # class ActiveSupport::Testing::ReplicationCoordinator. + module ReplicationCoordinator + # = Replication Coordinator Abstract Base Class + # + # An abstract base class that provides a monitoring mechanism to fetch and cache the replication + # state on a configurable time interval and notify when that state changes. + # + # Subclasses must only implement #fetch_active_zone, which returns a boolean indicating whether + # the caller is in an active zone. This method may be expensive, so the class uses a + # {Concurrent::TimerTask}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/TimerTask.html] + # to manage a background thread o periodically check (and cache) this value. The current cached + # status can cheaply be inspected with #active_zone?. The refresh interval can be set by passing + # a +polling_interval+ option to the constructor. + # + # The background thread will be implicitly started the first time any of these methods is + # called: + # + # - #active_zone? + # - #on_active_zone + # - #on_passive_zone + # + # or it can be explicitly started by calling #start_monitoring. + # + # Note: After a fork, the background thread will not be running; but it will be restarted + # implicitly once any of the above methods are called. + # + # When monitoring is running, registered callbacks are invoked whenever an active zone change is + # detected. + # + # == Basic usage + # + # class CustomReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base + # def fetch_active_zone + # # Custom logic to determine if the local zone is active + # end + # end + # + # coordinator = CustomReplicationCoordinator.new(polling_interval: 10.seconds) + # + # coordinator.active_zone? # Immediately returns the cached value + # + # coordinator.on_active_zone do |coordinator| + # puts "This zone is now active" + # # Start processes or threads that should only run in the active zone + # end + # + # coordinator.on_passive_zone do |coordinator| + # puts "This zone is now passive" + # # Stop processes or threads that should only run in the active zone + # end + # + # # Start a background thread to monitor the active zone status and invoke the callbacks on changes + # coordinator.start_monitoring + # + # coordinator.updated_at # Returns the last time the active zone status was checked + # + # Subclasses must implement #fetch_active_zone + class Base + attr_reader :state_change_hooks, :polling_interval, :executor, :logger + + # Initialize a new coordinator instance. + # + # [+polling_interval+] How often to refresh active zone status (default: 5 seconds) + def initialize(polling_interval: 5, executor: ActiveSupport::Executor, logger: nil) + @polling_interval = polling_interval + @executor = executor + @logger = logger || (defined?(Rails.logger) && Rails.logger) + @state_change_hooks = { active: [], passive: [] } + + @timer_task = nil + @active_zone = nil + @active_zone_updated_at = nil + @lock = ActiveSupport::Concurrency::ShareLock.new + end + + # Determine if the local zone is active. + # + # This method must be implemented by subclasses to define the logic for determining if the + # local zone is active. The return value is used to trigger state change hooks when the active + # zone changes. + # + # It's assumed that this method may be slow, so ReplicationCoordinator has a background thread + # that calls this method every +polling_interval+ seconds, and caches the result which is + # returned by #active_zone? + # + # Returns +true+ if the local zone is active, +false+ otherwise. + def fetch_active_zone + raise NotImplementedError + end + + # Returns +true+ if the local zone is active, +false+ otherwise. + # Also starts monitoring if it has not already been started. + # + # This always returns a cached value. + def active_zone? + start_monitoring + @active_zone # No need to use a read lock + end + + # Returns the time at which the current value of #active_zone? was fetched, or +nil+ if no + # value has yet been fetched. + # + # This always returns a cached value. + def updated_at + @active_zone_updated_at # No need to use a read lock + end + + # Start monitoring for active zone changes. + # + # This starts a Concurrent::TimerTask to periodically refresh the active zone status. If a + # change is detected, then the appropriate state change callbacks will be invoked. + def start_monitoring + check_active_zone(skip_when_set: true) + timer_task&.execute unless @timer_task&.running? + end + + # Stop monitoring for active zone changes. + # + # This stops the Concurrent::TimerTask, if it is running. + def stop_monitoring + @timer_task&.shutdown + end + + # Register a callback to be executed when the local zone becomes active. + # Also starts monitoring if it has not already been started. + # + # The callback will be immediately executed if this zone is currently active. + # + # [+block+] callback to execute when zone becomes active + # + # Yields the coordinator instance to the block. + def on_active_zone(&block) + start_monitoring + state_change_hooks[:active] << block + block.call(self) if active_zone? + end + + # Register a callback to be executed when the local zone becomes passive. + # Also starts monitoring if it has not already been started. + # + # The callback will be immediately executed if this zone is not currently active. + # + # [+block+] callback to execute when zone becomes passive + # + # Yields the coordinator instance to the block. + def on_passive_zone(&block) + start_monitoring + state_change_hooks[:passive] << block + block.call(self) if !active_zone? + end + + # Clear all registered state_change hooks. + def clear_hooks + state_change_hooks[:active] = [] + state_change_hooks[:passive] = [] + end + + private + def check_active_zone(skip_when_set: false) + return if skip_when_set && !@active_zone.nil? + + # Acquire an exclusive lock to mitigate a thundering herd problem when multiple threads + # might all call active_zone? for the first time at the same time. + if @lock.start_exclusive(no_wait: true) + begin + old_active_zone = @active_zone + @active_zone = executor_wrap { fetch_active_zone } + @active_zone_updated_at = Time.now + ensure + @lock.stop_exclusive + end + + if old_active_zone.nil? || old_active_zone != @active_zone + if @active_zone + logger&.info "#{self.class}: pid #{$$}: switching to active" + run_active_zone_hooks + else + logger&.info "#{self.class}: pid #{$$}: switching to passive" + run_passive_zone_hooks + end + end + else + @lock.sharing { } + end + end + + def executor_wrap(&block) + if @executor + @executor.wrap(&block) + else + yield + end + end + + def run_active_zone_hooks + run_hooks_for(:active) + end + + def run_passive_zone_hooks + run_hooks_for(:passive) + end + + def run_hooks_for(event) + state_change_hooks.fetch(event, []).each do |block| + block.call(self) + rescue Exception => exception + handle_thread_error(exception) + end + end + + def timer_task + @timer_task ||= begin + task = Concurrent::TimerTask.new(execution_interval: polling_interval) do + check_active_zone + end + + task.add_observer do |_, _, error| + if error + executor.error_reporter&.report(error, handled: false, source: "replication_coordinator.active_support") + logger&.error("#{error.detailed_message}: could not check #{self.class} active zone") + end + end + + # The thread-based timer task needs to be recreated after a fork. + # FIXME: this callback is keeping a reference on the instance, + # but only on active instances, and there should only be one of those. + ActiveSupport::ForkTracker.after_fork { @timer_task = nil } + + task + end + end + end + + # = "Single Zone" Replication Coordinator + # + # A concrete implementation that always indicates an active zone, and so it represents the + # default behavior for a single-zone deployment that does not use database replication. + # + # This is a simple implementation that always returns +true+ from #active_zone? + # + # Note that this class does not use a background thread, since there is no need to monitor the + # constant +true+ value. + # + # == Basic usage + # + # rc = ActiveSupport::ReplicationCoordinator::SingleZone.new + # rc.active_zone? #=> true + # rc.on_active_zone { puts "Will always be called" } + # rc.on_passive_zone { puts "Will never be called" } + class SingleZone < Base + # Always returns true, indicating this zone is active. + # + # Returns true. + def fetch_active_zone + true + end + + private + def timer_task + # No-op implementation since no monitoring is needed. + end + end + end +end diff --git a/activesupport/lib/active_support/testing/replication_coordinator.rb b/activesupport/lib/active_support/testing/replication_coordinator.rb new file mode 100644 index 0000000000000..66b82d7ba8d32 --- /dev/null +++ b/activesupport/lib/active_support/testing/replication_coordinator.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +module ActiveSupport + module Testing + # ReplicationCoordinator is a test helper implementing + # ActiveSupport::ReplicationCoordinator::Base that can be used to test the behavior of objects + # that depend on replication state. + class ReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base + # Returns the number of times #fetch_active_zone has been called. + attr_reader :fetch_count + + # Initializes the replication coordinator with an initial active zone state. + # + # The replication coordinator can be initialized with an initial active zone state using the + # optional +active_zone+ parameter, which defaults to +true+. + def initialize(active_zone = true, **options) + @next_active_zone = active_zone + @fetch_count = 0 + super(**options) + end + + # Sets the value that will next be returned by #fetch_active_zone, simulating an external + # replication state change. + def set_next_active_zone(active_zone) + @next_active_zone = active_zone + end + + def fetch_active_zone # :nodoc: + @fetch_count += 1 + @next_active_zone + end + end + end +end diff --git a/activesupport/test/replication_coordinator_test.rb b/activesupport/test/replication_coordinator_test.rb new file mode 100644 index 0000000000000..d228db350c3f3 --- /dev/null +++ b/activesupport/test/replication_coordinator_test.rb @@ -0,0 +1,232 @@ +# frozen_string_literal: true + +require "active_support/replication_coordinator" +require "active_support/testing/replication_coordinator" +require "active_support/error_reporter/test_helper" + +class ReplicationCoordinatorTest < ActiveSupport::TestCase + test "polling_interval can be set and has a good default" do + klass = Class.new(ActiveSupport::ReplicationCoordinator::Base) do + def fetch_active_zone + true + end + end + + assert_equal 5, klass.new.polling_interval + assert_equal 1, klass.new(polling_interval: 1).polling_interval + end + + test "fetch_active_zone is cached" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(polling_interval: 9999) + rc.start_monitoring + + 10.times { rc.active_zone? } + 10.times { rc.on_active_zone { } } + 10.times { rc.on_passive_zone { } } + + assert_equal 1, rc.fetch_count + end + + test "updated_at is set whenever monitoring calls fetch_active_zone" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true, polling_interval: 0.01) + rc.active_zone? + original_time = rc.updated_at + + rc.start_monitoring + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 2 } + rc.stop_monitoring + + assert rc.updated_at > original_time + end + + test "the initial fetch is guarded against a thundering herd" do + rc = Class.new(ActiveSupport::ReplicationCoordinator::Base) do + attr_reader :fetch_count + + def initialize(...) + @fetch_count = 0 + super + end + + def fetch_active_zone + @fetch_count += 1 + sleep 0.1 + true + end + end.new + + 10.times.map { Thread.new { rc.active_zone? } }.map(&:join) + + assert_equal 1, rc.fetch_count + end + + test "the initial active_zone? fetches once" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + assert_equal 0, rc.fetch_count + assert_nil rc.updated_at + + freeze_time do + rc.active_zone? + assert_equal 1, rc.fetch_count + assert_equal Time.now, rc.updated_at + end + end + + test "active_zone? starts monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true, polling_interval: 0.01) + + rc.active_zone? + + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 3 } + assert rc.fetch_count >= 3 + end + + test "the initial on_active_zone fetches once" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + assert_equal 0, rc.fetch_count + assert_nil rc.updated_at + + freeze_time do + rc.on_active_zone { } + assert_equal 1, rc.fetch_count + assert_equal Time.now, rc.updated_at + end + end + + test "on_active_zone hooks are called once, immediately upon registration" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + active_cb_count = passive_cb_count = 0 + + rc.on_active_zone { active_cb_count += 1 } + rc.on_passive_zone { passive_cb_count += 1 } + + assert_equal 1, active_cb_count + assert_equal 0, passive_cb_count + end + + test "on_active_zone starts monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true, polling_interval: 0.01) + + rc.on_active_zone { } + + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 3 } + assert rc.fetch_count >= 3 + end + + test "the initial on_passive_zone fetches once" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(true) + assert_equal 0, rc.fetch_count + assert_nil rc.updated_at + + freeze_time do + rc.on_passive_zone { } + assert_equal 1, rc.fetch_count + assert_equal Time.now, rc.updated_at + end + end + + test "on_passive_zone hooks are called once, immediately upon registration" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(false) + active_cb_count = passive_cb_count = 0 + + rc.on_passive_zone { passive_cb_count += 1 } + rc.on_active_zone { active_cb_count += 1 } + + assert_equal 0, active_cb_count + assert_equal 1, passive_cb_count + end + + test "on_passive_zone starts monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(false, polling_interval: 0.01) + + rc.on_passive_zone { } + + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 3 } + assert rc.fetch_count >= 3 + end + + test "hooks are called upon transition while monitoring" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(false, polling_interval: 0.01) + active_cb_count = passive_cb_count = 0 + rc.on_active_zone { active_cb_count += 1 } + rc.on_passive_zone { passive_cb_count += 1 } + + active_cb_count = passive_cb_count = 0 + rc.set_next_active_zone(true) + + Timeout.timeout(0.1) { sleep 0.01 while active_cb_count == 0 } + + assert_equal 1, active_cb_count + assert_equal 0, passive_cb_count + + active_cb_count = passive_cb_count = 0 + rc.set_next_active_zone(false) + + Timeout.timeout(0.1) { sleep 0.01 while passive_cb_count == 0 } + + assert_equal 0, active_cb_count + assert_equal 1, passive_cb_count + ensure + rc&.stop_monitoring + end + + test "errors are logged when raised calling fetch_active_zone from the timer task" do + klass = Class.new(ActiveSupport::ReplicationCoordinator::Base) do + attr_reader :fetch_count + + def initialize(...) + @fetch_count = 0 + super + end + + def fetch_active_zone + @fetch_count += 1 + raise "Simulated exception resolving active zone" if @fetch_count == 3 + true + end + end + + rc = klass.new(polling_interval: 0.01) + + subscriber = ActiveSupport::ErrorReporter::TestHelper::ErrorSubscriber.new + ActiveSupport.error_reporter.subscribe(subscriber) + + rc.start_monitoring + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < 6 } + + assert rc.fetch_count >= 6 # the timer task continues running after an error + + assert_equal 1, subscriber.events.count + assert_equal "Simulated exception resolving active zone", subscriber.events[0][0].message + ensure + rc&.stop_monitoring + end + + test "monitoring will be organically restarted after forking" do + rc = ActiveSupport::Testing::ReplicationCoordinator.new(polling_interval: 0.01) + rc.active_zone? + + out, _ = capture_subprocess_io do + child = Process.fork do + initial_fetch_count = rc.fetch_count + rc.active_zone? + + begin + Timeout.timeout(0.1) { sleep 0.01 while rc.fetch_count < initial_fetch_count + 5 } + puts "OK" + rescue Timeout::Error + puts "FAIL" + end + end + Process.wait child + end + + assert_equal "OK", out.chomp + end + + test "SingleZone can be used for the default always-active behavior" do + rc = ActiveSupport::ReplicationCoordinator::SingleZone.new + assert rc.active_zone? + assert_nil rc.instance_variable_get(:@timer_task) # No timer task is created for SingleZone + end +end diff --git a/guides/source/configuring.md b/guides/source/configuring.md index 009181ac14972..50cfe05e5b908 100644 --- a/guides/source/configuring.md +++ b/guides/source/configuring.md @@ -523,6 +523,15 @@ is `ENV['RAILS_RELATIVE_URL_ROOT']`. Enables or disables reloading of classes only when tracked files change. By default tracks everything on autoload paths and is set to `true`. If `config.enable_reloading` is `false`, this option is ignored. +#### `config.replication_coordinator` + +Set this to configure a custom replication coordinator for your application. A replication +coordinator provides an interface for active/passive state in a deployment across multiple +availability zones. See the [`ActiveSupport::ReplicationCoordinator` API +documentation](https://api.rubyonrails.org/classes/ActiveSupport/ReplicationCoordinator.html). + +Defaults to `ActiveSupport::ReplicationCoordinator::SingleZone.new`. + #### `config.require_master_key` Causes the app to not boot if a master key hasn't been made available through `ENV["RAILS_MASTER_KEY"]` or the `config/master.key` file. diff --git a/railties/lib/rails/application/configuration.rb b/railties/lib/rails/application/configuration.rb index 63b2345ddadbc..7893dc7f36f4a 100644 --- a/railties/lib/rails/application/configuration.rb +++ b/railties/lib/rails/application/configuration.rb @@ -24,7 +24,7 @@ class Configuration < ::Rails::Engine::Configuration :content_security_policy_nonce_auto, :require_master_key, :credentials, :disable_sandbox, :sandbox_by_default, :add_autoload_paths_to_load_path, :rake_eager_load, :server_timing, :log_file_size, - :dom_testing_default_html_version, :yjit + :dom_testing_default_html_version, :yjit, :replication_coordinator attr_reader :encoding, :api_only, :loaded_config_version, :log_level @@ -85,6 +85,7 @@ def initialize(*) @server_timing = false @dom_testing_default_html_version = :html4 @yjit = false + @replication_coordinator = ActiveSupport::ReplicationCoordinator::SingleZone.new end # Loads default configuration values for a target version. This includes