From e58c181a039c659fdef02946456a84f0351b0564 Mon Sep 17 00:00:00 2001 From: Kirill Usanov Date: Fri, 18 Apr 2025 09:54:55 +0300 Subject: [PATCH 1/3] Add thread reader to client --- lib/ld-eventsource/client.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 6355a62..cdb58ea 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -141,9 +141,12 @@ def initialize(uri, yield self if block_given? - Thread.new { run_stream }.name = 'LD/SSEClient' + @thread = Thread.new { run_stream } + @thread.name = 'LD/SSEClient' end + attr_reader :thread + # # Specifies a block or Proc to receive events from the stream. This will be called once for every # valid event received, with a single parameter of type {StreamEvent}. It is called from the same From dc67fc7953566b16604af589e3de3e4bb7f0918f Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 18 Apr 2025 10:23:58 -0400 Subject: [PATCH 2/3] Add `close_and_wait` method to client --- lib/ld-eventsource/client.rb | 31 +++++++++++++++++++++++-------- spec/client_spec.rb | 9 +++++---- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index cdb58ea..d27b8be 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -98,6 +98,7 @@ def initialize(uri, socket_factory: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) + @shutdown = Concurrent::Event.new @headers = headers.clone @connect_timeout = connect_timeout @@ -141,12 +142,9 @@ def initialize(uri, yield self if block_given? - @thread = Thread.new { run_stream } - @thread.name = 'LD/SSEClient' + Thread.new { run_stream }.name = 'LD/SSEClient' end - attr_reader :thread - # # Specifies a block or Proc to receive events from the stream. This will be called once for every # valid event received, with a single parameter of type {StreamEvent}. It is called from the same @@ -188,9 +186,22 @@ def on_error(&action) # has no effect if called a second time. # def close - if @stopped.make_true - reset_http - end + reset_http if @stopped.make_true + end + + # + # Permanently shuts down the client and its connection, and waits until shutdown is complete. No further events will be dispatched. This + # has no effect if called a second time. + # + # If a timeout is specified, the method will return after that amount of time has passed, even if the shutdown is + # not complete. + # + # @param timeout [Float] (nil) maximum time to wait for shutdown, in seconds + # @return [Boolean] true if the shutdown completed successfully, false if it timed out + # + def close_and_wait(timeout = nil) + close + @shutdown.wait(timeout) end # @@ -233,7 +244,8 @@ def run_stream end # There's a potential race if close was called in the middle of the previous line, i.e. after we # connected but before @cxn was set. Checking the variable again is a bit clunky but avoids that. - return if @stopped.value + break if @stopped.value + read_stream(resp) if !resp.nil? rescue => e # When we deliberately close the connection, it will usually trigger an exception. The exact type @@ -244,12 +256,15 @@ def run_stream log_and_dispatch_error(e, "Unexpected error from event source") end end + begin reset_http rescue StandardError => e log_and_dispatch_error(e, "Unexpected error while closing stream") end end + + @shutdown.set end # Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful. diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 51f516d..b5ab1bc 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -31,10 +31,11 @@ def with_client(client) begin yield client ensure - client.close + raise 'Failed to close within timeout' unless client.close_and_wait(1) end end + def send_stream_content(res, content, keep_open:) res.content_type = "text/event-stream" res.status = 200 @@ -57,7 +58,7 @@ def send_stream_content(res, content, keep_open:) requests << req send_stream_content(res, "", keep_open: true) end - + headers = { "Authorization" => "secret" } with_client(subject.new(server.base_uri, headers: headers)) do |client| @@ -82,7 +83,7 @@ def send_stream_content(res, content, keep_open:) requests << req send_stream_content(res, "", keep_open: true) end - + headers = { "Authorization" => "secret" } with_client(subject.new(server.base_uri, headers: headers, last_event_id: id)) do |client| @@ -438,7 +439,7 @@ def send_stream_content(res, content, keep_open:) server.setup_response("/") do |req,res| send_stream_content(res, "", keep_open: true) end - + with_client(subject.new(server.base_uri)) do |client| expect(client.closed?).to be(false) From 803c13b906c7f7b4fb57446e6a50126e763f4f96 Mon Sep 17 00:00:00 2001 From: Kirill Usanov Date: Fri, 16 May 2025 11:32:19 +0300 Subject: [PATCH 3/3] Added SSE::Client#kill method to immediately kill the worker thread --- lib/ld-eventsource/client.rb | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index d27b8be..76f940a 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -142,7 +142,8 @@ def initialize(uri, yield self if block_given? - Thread.new { run_stream }.name = 'LD/SSEClient' + @thread = Thread.new { run_stream } + @thread.name = 'LD/SSEClient' end # @@ -189,6 +190,15 @@ def close reset_http if @stopped.make_true end + # + # Permanently shuts down the client and its connection, and kills the background worker thread. No further events will be dispatched. This + # has no effect if called a second time. + # + def kill + close + @thread&.kill + end + # # Permanently shuts down the client and its connection, and waits until shutdown is complete. No further events will be dispatched. This # has no effect if called a second time.