From ce000e74b8aa1f4059da4e791a70289b4ad13dbf Mon Sep 17 00:00:00 2001 From: mahrous-deriv Date: Thu, 21 Apr 2022 07:55:31 +0000 Subject: [PATCH 1/6] WIP --- lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm index 378c9ed..f4196d0 100644 --- a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm +++ b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm @@ -21,7 +21,7 @@ no indirect; __PACKAGE__->register_type('consumer_groups'); use constant RESPONSE_TIMEOUT => $ENV{RPC_QUEUE_RESPONSE_TIMEOUT} // 30; -use constant DEFAULT_CATEGORY_NAME => 'general'; +use constant DEFAULT_CATEGORY_NAME => 'myriad.service.deriv.general.states_list.rpc/request'; use constant REQUIRED_RESPONSE_PARAMETERS => qw(message_id response); =head1 NAME @@ -364,7 +364,7 @@ Subscription will be done only once within first request to backend server. sub wait_for_messages { my ($self) = @_; - $self->{already_waiting} //= $self->redis->subscribe([$self->whoami], $self->redis->curry::weak::on(message => $self->curry::weak::_on_message)); + $self->{already_waiting} //= $self->redis->subscribe(['myriad.'.$self->whoami], $self->redis->curry::weak::on(message => $self->curry::weak::_on_message)); return; } From 0e5b8157f7e59856278e1e047fa810791e2a8853 Mon Sep 17 00:00:00 2001 From: mahrous-deriv Date: Thu, 21 Apr 2022 09:15:43 +0000 Subject: [PATCH 2/6] WIP --- lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm index f4196d0..e847f93 100644 --- a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm +++ b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm @@ -376,6 +376,10 @@ sub _on_message { try { $message = decode_json_utf8($raw_message); + $message->{args} = decode_json_utf8( $message->{args} ); + $message->{response} = decode_json_utf8( $message->{response} ); + $message->{response} = $message->{response}{response}; + $message->{args} = $message->{args}{args}; } catch { my $err = $@; From 5701cd687f561fc81401586c157a03ac79159f2f Mon Sep 17 00:00:00 2001 From: mahrous-deriv Date: Thu, 21 Apr 2022 09:17:13 +0000 Subject: [PATCH 3/6] WIP --- lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm index e847f93..48e2afb 100644 --- a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm +++ b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm @@ -376,10 +376,10 @@ sub _on_message { try { $message = decode_json_utf8($raw_message); - $message->{args} = decode_json_utf8( $message->{args} ); - $message->{response} = decode_json_utf8( $message->{response} ); - $message->{response} = $message->{response}{response}; - $message->{args} = $message->{args}{args}; + $message->{args} = decode_json_utf8( $message->{args} ); + $message->{response} = decode_json_utf8( $message->{response} ); + $message->{response} = $message->{response}{response}; + $message->{args} = $message->{args}{args}; } catch { my $err = $@; From 5e3ddf607907886b3644d01d8b42bcbbe21f557b Mon Sep 17 00:00:00 2001 From: mahrous-deriv Date: Fri, 22 Apr 2022 01:40:33 +0000 Subject: [PATCH 4/6] prepare async response --- lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm index 48e2afb..d52c936 100644 --- a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm +++ b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm @@ -249,6 +249,7 @@ sub call_rpc { } $api_response = $rpc_response_cb->($result->result); + return Future->done if $block_response || !$api_response; $c->send({json => $api_response}, $req_storage); @@ -364,7 +365,8 @@ Subscription will be done only once within first request to backend server. sub wait_for_messages { my ($self) = @_; - $self->{already_waiting} //= $self->redis->subscribe(['myriad.'.$self->whoami], $self->redis->curry::weak::on(message => $self->curry::weak::_on_message)); + $self->{already_waiting} //= + $self->redis->subscribe(['myriad.' . $self->whoami], $self->redis->curry::weak::on(message => $self->curry::weak::_on_message)); return; } @@ -375,11 +377,11 @@ sub _on_message { my $message = {}; try { - $message = decode_json_utf8($raw_message); - $message->{args} = decode_json_utf8( $message->{args} ); - $message->{response} = decode_json_utf8( $message->{response} ); - $message->{response} = $message->{response}{response}; - $message->{args} = $message->{args}{args}; + $message = decode_json_utf8($raw_message); + $message->{args} = decode_json_utf8($message->{args}); + $message->{response} = decode_json_utf8($message->{response}); + $message->{response} = decode_json_utf8($message->{response}); + $message->{response} = {result => $message->{response}{response}}; } catch { my $err = $@; From fc7503e163be3475ea54e1d3926878b416b84c6e Mon Sep 17 00:00:00 2001 From: mahrous-deriv Date: Fri, 22 Apr 2022 03:44:14 +0000 Subject: [PATCH 5/6] Backwards support --- .../WebSocketProxy/Backend/ConsumerGroups.pm | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm index d52c936..be767eb 100644 --- a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm +++ b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm @@ -21,7 +21,7 @@ no indirect; __PACKAGE__->register_type('consumer_groups'); use constant RESPONSE_TIMEOUT => $ENV{RPC_QUEUE_RESPONSE_TIMEOUT} // 30; -use constant DEFAULT_CATEGORY_NAME => 'myriad.service.deriv.general.states_list.rpc/request'; +use constant DEFAULT_CATEGORY_NAME => 'general'; use constant REQUIRED_RESPONSE_PARAMETERS => qw(message_id response); =head1 NAME @@ -366,7 +366,8 @@ Subscription will be done only once within first request to backend server. sub wait_for_messages { my ($self) = @_; $self->{already_waiting} //= - $self->redis->subscribe(['myriad.' . $self->whoami], $self->redis->curry::weak::on(message => $self->curry::weak::_on_message)); + $self->redis->subscribe(['myriad.' . $self->whoami, $self->whoami], + $self->redis->curry::weak::on(message => $self->curry::weak::_on_message)); return; } @@ -377,11 +378,12 @@ sub _on_message { my $message = {}; try { - $message = decode_json_utf8($raw_message); - $message->{args} = decode_json_utf8($message->{args}); - $message->{response} = decode_json_utf8($message->{response}); - $message->{response} = decode_json_utf8($message->{response}); - $message->{response} = {result => $message->{response}{response}}; + $message = decode_json_utf8($raw_message); + if (ref $message->{args} ne 'HASH') { + $message->{args} = decode_json_utf8($message->{args}); + $message->{response} = decode_json_utf8($message->{response}); + $message->{response} = {result => $message->{response}{response}}; + } } catch { my $err = $@; From 0da1decceddb8438a439a404d97986f31e7d9546 Mon Sep 17 00:00:00 2001 From: Mahrous Amer <81215999+mahrous-deriv@users.noreply.github.com> Date: Tue, 26 Apr 2022 14:12:22 +0800 Subject: [PATCH 6/6] Update ConsumerGroups.pm --- lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm index be767eb..4d95363 100644 --- a/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm +++ b/lib/Mojo/WebSocketProxy/Backend/ConsumerGroups.pm @@ -249,7 +249,6 @@ sub call_rpc { } $api_response = $rpc_response_cb->($result->result); - return Future->done if $block_response || !$api_response; $c->send({json => $api_response}, $req_storage);