diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 6355a62..76f940a 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -98,6 +98,7 @@ def initialize(uri, socket_factory: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) + @shutdown = Concurrent::Event.new @headers = headers.clone @connect_timeout = connect_timeout @@ -141,7 +142,8 @@ def initialize(uri, yield self if block_given? - Thread.new { run_stream }.name = 'LD/SSEClient' + @thread = Thread.new { run_stream } + @thread.name = 'LD/SSEClient' end # @@ -185,9 +187,31 @@ def on_error(&action) # has no effect if called a second time. # def close - if @stopped.make_true - reset_http - end + reset_http if @stopped.make_true + end + + # + # Permanently shuts down the client and its connection, and kills the background worker thread. No further events will be dispatched. This + # has no effect if called a second time. + # + def kill + close + @thread&.kill + end + + # + # Permanently shuts down the client and its connection, and waits until shutdown is complete. No further events will be dispatched. This + # has no effect if called a second time. + # + # If a timeout is specified, the method will return after that amount of time has passed, even if the shutdown is + # not complete. + # + # @param timeout [Float] (nil) maximum time to wait for shutdown, in seconds + # @return [Boolean] true if the shutdown completed successfully, false if it timed out + # + def close_and_wait(timeout = nil) + close + @shutdown.wait(timeout) end # @@ -230,7 +254,8 @@ def run_stream end # There's a potential race if close was called in the middle of the previous line, i.e. after we # connected but before @cxn was set. Checking the variable again is a bit clunky but avoids that. - return if @stopped.value + break if @stopped.value + read_stream(resp) if !resp.nil? rescue => e # When we deliberately close the connection, it will usually trigger an exception. The exact type @@ -241,12 +266,15 @@ def run_stream log_and_dispatch_error(e, "Unexpected error from event source") end end + begin reset_http rescue StandardError => e log_and_dispatch_error(e, "Unexpected error while closing stream") end end + + @shutdown.set end # Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful. diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 51f516d..b5ab1bc 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -31,10 +31,11 @@ def with_client(client) begin yield client ensure - client.close + raise 'Failed to close within timeout' unless client.close_and_wait(1) end end + def send_stream_content(res, content, keep_open:) res.content_type = "text/event-stream" res.status = 200 @@ -57,7 +58,7 @@ def send_stream_content(res, content, keep_open:) requests << req send_stream_content(res, "", keep_open: true) end - + headers = { "Authorization" => "secret" } with_client(subject.new(server.base_uri, headers: headers)) do |client| @@ -82,7 +83,7 @@ def send_stream_content(res, content, keep_open:) requests << req send_stream_content(res, "", keep_open: true) end - + headers = { "Authorization" => "secret" } with_client(subject.new(server.base_uri, headers: headers, last_event_id: id)) do |client| @@ -438,7 +439,7 @@ def send_stream_content(res, content, keep_open:) server.setup_response("/") do |req,res| send_stream_content(res, "", keep_open: true) end - + with_client(subject.new(server.base_uri)) do |client| expect(client.closed?).to be(false)