From dbdd226c74081f8f1e478a99f82af5c39cc4fa4a Mon Sep 17 00:00:00 2001 From: gaojun Date: Thu, 3 Apr 2025 16:05:18 +0800 Subject: [PATCH] Add ability to POST an SSE request --- lib/ld-eventsource/client.rb | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 6355a62..7969624 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -85,6 +85,10 @@ class Client # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` # @yieldparam [Client] client the new client instance, before opening the connection + # @payload payload [String | Hash | Array] (nil) optional request payload. If payload is + # provided, a POST request will be used, instead of a GET request. If payload is a Hash or + # an Array, it will be converted to JSON and sent as the request body. Also, reconnection + # is disabled if payload is set. # def initialize(uri, headers: {}, @@ -95,13 +99,15 @@ def initialize(uri, last_event_id: nil, proxy: nil, logger: nil, - socket_factory: nil) + socket_factory: nil, + payload: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @payload = payload @logger = logger || default_logger http_client_options = {} if socket_factory @@ -243,6 +249,8 @@ def run_stream end begin reset_http + # When we post request with payload, reconnection should be avoided. + close if @payload rescue StandardError => e log_and_dispatch_error(e, "Unexpected error while closing stream") end @@ -262,9 +270,8 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request("GET", @uri, { - headers: build_headers - }) + verb = @payload ? "POST" : "GET" + cxn = @http_client.request(verb, @uri, build_opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -358,5 +365,15 @@ def build_headers h['Last-Event-Id'] = @last_id if !@last_id.nil? && @last_id != "" h.merge(@headers) end + + def build_opts + return {headers: build_headers} if @payload.nil? + + if @payload.is_a?(Hash) || @payload.is_a?(Array) + {headers: build_headers, json: @payload} + else + {headers: build_headers, body: @payload.to_s} + end + end end end