Skip to content

Commit

Permalink
Merge pull request #1503 from senid231/1482-pgq-cdr-http-batch
Browse files Browse the repository at this point in the history
add CdrHttpBatch PGQ Processor, add data filters to CdrHttp/CdrHttpBatch PGQ Processors
  • Loading branch information
dmitry-sinina authored Jul 12, 2024
2 parents fd664da + f935c90 commit 4318618
Show file tree
Hide file tree
Showing 8 changed files with 952 additions and 10 deletions.
88 changes: 88 additions & 0 deletions pgq-processors/lib/event_filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# frozen_string_literal: true

class EventFilter
OPERATORS = %i[
eq not_eq
start_with end_with contains
gt lt gte lte
in not_in
null not_null
true false
].freeze

attr_reader :field, :op, :value

def initialize(field:, op:, value:)
@field = field.to_s
@op = op.to_sym
raise ArgumentError, "Invalid operator: #{@op}" unless OPERATORS.include?(@op)

@value = value
end

def match?(event)
send("match_#{op}?", event.transform_keys(&:to_s))
end

private

def match_eq?(event)
event[field] == value
end

def match_not_eq?(event)
event[field] != value
end

def match_start_with?(event)
event[field].to_s.start_with?(value.to_s)
end

def match_end_with?(event)
event[field].to_s.end_with?(value.to_s)
end

def match_contains?(event)
event[field].to_s.include?(value.to_s)
end

def match_gt?(event)
!event[field].nil? && event[field] > value
end

def match_lt?(event)
!event[field].nil? && event[field] < value
end

def match_gte?(event)
!event[field].nil? && event[field] >= value
end

def match_lte?(event)
!event[field].nil? && event[field] <= value
end

def match_in?(event)
value.include?(event[field])
end

def match_not_in?(event)
value.exclude?(event[field])
end

def match_null?(event)
event[field].nil?
end

def match_not_null?(event)
!event[field].nil?
end

def match_true?(event)
event[field] == true
end

def match_false?(event)
event[field] == false
end
end
1 change: 1 addition & 0 deletions pgq-processors/pgq_env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require_relative 'lib/json_each_row_coder'
require_relative 'lib/shutdown'
require_relative 'lib/amqp_factory'
require_relative 'lib/event_filter'

ENV['ROOT_PATH'] ||= Dir.getwd

Expand Down
18 changes: 18 additions & 0 deletions pgq-processors/processors/cdr_http_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,36 @@
class CdrHttpBase < Pgq::ConsumerGroup
AVAILABLE_HTTP_METHODS = %i[post put get patch].freeze

def initialize(...)
super(...)
# data_filters: [{field:, op:, value:}]
if @params['data_filters'].present?
@data_filters = @params['data_filters'].map { |opts| ::EventFilter.new(**opts.transform_keys(&:to_sym)) }
else
@data_filters = nil
end
end

def perform_events(events)
perform_group events.map(&:data)
end

def perform_group(events)
events.each do |event|
next unless send_event?(event)

perform_http_request permit_field_for(event)
end
end

private

def send_event?(event)
return true if @data_filters.nil?

@data_filters.all? { |filter| filter.match?(event) }
end

def http_method
:post
end
Expand Down
22 changes: 22 additions & 0 deletions pgq-processors/processors/cdr_http_batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require 'rest-client'
require_relative 'cdr_http_base'

class CdrHttpBatch < CdrHttpBase
@consumer_name = 'cdr_http'

def perform_group(events)
events_to_send = events.select { |event| send_event?(event) }
return if events_to_send.empty?

permitted_events = events_to_send.map { |event| permit_field_for(event) }
perform_http_request(permitted_events)
end

private

def http_body(events)
{ data: events }.to_json
end
end
Loading

0 comments on commit 4318618

Please sign in to comment.