Add prometheus event collection
Mooli Tayer committed Jul 5, 2017
1 parent da53ce7 commit c12f52f
Showing 6 changed files with 206 additions and 0 deletions.
@@ -1,5 +1,7 @@
module ManageIQ::Providers
class Kubernetes::MonitoringManager < ManageIQ::Providers::MonitoringManager
require_nested :EventCatcher

include ManageIQ::Providers::Kubernetes::MonitoringManagerMixin

belongs_to :parent_manager,
@@ -0,0 +1,7 @@
class ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher < ManageIQ::Providers::BaseManager::EventCatcher
require_nested :Runner

def self.ems_class
ManageIQ::Providers::Kubernetes::MonitoringManager
end
end
@@ -0,0 +1,58 @@
class ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::PrometheusEventMonitor
def initialize(ems)
@ems = ems
end

def start
@collecting_events = true
end

def stop
@collecting_events = false
end

def each_batch
while @collecting_events
yield fetch
end
rescue EOFError => err
$cn_monitoring_log.info("Monitoring connection closed #{err}")
end

def fetch
unless @current_generation
@current_generation, @current_index = last_position
end

# TODO: exception if no endpoint
response = @ems.connect.get do |req|
req.params['generationID'] = @current_generation
req.params['fromIndex'] = @current_index
end
# {
# "generationID":"323e0863-f501-4896-b7dc-353cf863597d",
# "messages":[
# ...
# ]
# }
# TODO: raise exception unless success
alert_list = response.body
$cn_monitoring_log.info("Got [#{alert_list['messages'].size}] new Alerts")
return if alert_list['messages'].blank?
@current_generation = alert_list["generationID"]
@current_index = alert_list['messages'].last['index'] + 1
events = alert_list["messages"]
events = events.select { |alert| alert.fetch_path("data", "commonLabels", "job") == 'kubernetes-nodes' }
events.each { |e| e['generationID'] = @current_generation }
events
end

def last_position
last_event = @ems.ems_events.last || OpenStruct.new(:full_data => {})
last_index = last_event.full_data['index']
[
last_event.full_data['generationID'].to_s,
last_index ? last_index + 1 : 0
]
end
end
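
The monitor keeps a cursor into the alerts message buffer: generationID identifies the buffer incarnation and fromIndex is the first message not yet turned into an EmsEvent, so each poll only returns new messages. The snippet below is an illustrative, standalone sketch of that polling protocol using Faraday (the same style of client that `@ems.connect` appears to return); the endpoint URL and path are placeholders, not part of this commit.

require 'faraday'
require 'json'

# Hypothetical message-buffer endpoint; a real deployment would take this from
# the MonitoringManager's endpoint configuration.
conn = Faraday.new(:url => 'https://alerts.example.com:9099')

generation = ''   # unknown on the first poll; the server replies with the current one
index      = 0    # fetch everything from the start of that generation

response = conn.get('/topics/alerts') do |req|   # path is an assumption
  req.params['generationID'] = generation
  req.params['fromIndex']    = index
end

body = JSON.parse(response.body)
puts "generation #{body['generationID']}: #{body['messages'].size} new message(s)"
body['messages'].each do |msg|
  puts "  index=#{msg['index']} status=#{msg.dig('data', 'status')}"
end
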
@@ -0,0 +1,3 @@
class ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::Runner < ManageIQ::Providers::BaseManager::EventCatcher::Runner
include ManageIQ::Providers::Kubernetes::MonitoringManager::RunnerMixin
end
@@ -0,0 +1,134 @@
module ManageIQ::Providers::Kubernetes::MonitoringManager::RunnerMixin
# TODO: this should really be inside event catcher, but I can't get it to work
extend ActiveSupport::Concern

# This module is shared between:
# - Kubernetes::MonitoringManager::EventCatcher
# - Openshift::MonitoringManager::EventCatcher

def event_monitor_handle
@event_monitor_handle ||= ManageIQ::Providers::Kubernetes::MonitoringManager::EventCatcher::PrometheusEventMonitor.new(@ems)
end

def reset_event_monitor_handle
@event_monitor_handle = nil
end

def stop_event_monitor
@event_monitor_handle.stop unless @event_monitor_handle.nil?
rescue => err
$cn_monitoring_log.error("Event Monitor error [#{err.message}]")
$cn_monitoring_log.error("Error details: [#{err.details}]") if err.respond_to?(:details)
$cn_monitoring_log.log_backtrace(err)
ensure
reset_event_monitor_handle
end

def monitor_events
event_monitor_handle.start
event_monitor_running
event_monitor_handle.each_batch do |events|
@queue.enq events
# TODO: mark all events not retrieved as resolved
sleep_poll_normal
end

ensure
reset_event_monitor_handle
end

def queue_event(event)
event_hash = extract_event_data(event)
if event_hash
$cn_monitoring_log.info("Queuing event [#{event_hash}]")
EmsEvent.add_queue('add', @cfg[:ems_id], event_hash)
end
end

# Returns hash, or nil if event should be discarded.
def extract_event_data(event)
# EXAMPLE:
#
# {
# "generationID": '3f8e1781-b755-4f6a-8855-94eb20b00dc6',
# "index":1,
# "timestamp":"2017-06-26T18:24:37.821275471+03:00",
# "data":{
# "alerts":[
# {
# "annotations":{
# "severity":"error",
# "url":"https://www.youtube.com/watch?v=dQw4w9WgXcQ"
# },
# "endsAt":"0001-01-01T00:00:00Z",
# "generatorURL":"http://dhcp-3-157.tlv.redhat.com:9090/graph?g0.expr=container_fs_usage_bytes%7Bcontainer_name%3D%22%22%2Cdevice%3D%22%2Fdev%2Fmapper%2Fvg0-lv_root%22%7D+%3E+4000000000\u0026g0.tab=0",
# "labels":{
# "alertname":"Node_high_usage_on_vg0_lvroot__info",
# "device":"/dev/mapper/vg0-lv_root","id":"/",
# "instance":"vm-48-45.eng.lab.tlv.redhat.com",
# "job":"kubernetes-nodes",
# "monitor":"codelab-monitor"
# },
# "startsAt":"2017-06-26T18:24:07.803+03:00",
# "status":"firing"
# }
# ],
# "commonAnnotations":{
# "severity":"error",
# "url":"https://www.youtube.com/watch?v=dQw4w9WgXcQ"
# },
# "commonLabels":{
# "alertname":"Node_high_usage_on_vg0_lvroot__info",
# "device":"/dev/mapper/vg0-lv_root","id":"/",
# "instance":"vm-48-45.eng.lab.tlv.redhat.com",
# "job":"kubernetes-nodes",
# "monitor":"codelab-monitor"
# },
# "externalURL":"http://dhcp-3-157.tlv.redhat.com:9093",
# "groupKey":"{}:{}",
# "groupLabels":{},
# "receiver":"message-buffer-wh",
# "status":"firing|resolved",
# "version":"4"
# }
# }
event = event.dup

annotations, labels = event["data"]["commonAnnotations"], event["data"]["commonLabels"]
severity, event[:url] = annotations['severity'], annotations['url']

event[:severity] = parse_severity(severity)
event[:resolved] = event["data"]["status"] == 'resolved'
event[:ems_ref] = generate_incident_identifier(event, labels)

target = find_target(labels)
return unless target
{
:ems_id => @cfg[:ems_id],
:source => 'DATAWAREHOUSE',
:timestamp => Time.parse(event["timestamp"]),
:event_type => 'datawarehouse_alert',
:target_type => target.class.name,
:target_id => target.id,
:container_node_id => target.id,
:container_node_name => target.name,
:message => annotations['message'],
:full_data => event.to_h
}
end

def find_target(labels)
instance = ContainerNode.find_by(:name => labels['instance'], :ems_id => @cfg[:ems_id])
$cn_monitoring_log.error("Could not find alert target from labels: [#{labels}]") unless instance
instance
end

def parse_severity(severity)
MiqAlertStatus::SEVERITY_LEVELS.find { |x| x == severity.downcase } || 'error'
end

def generate_incident_identifier(event, labels)
# When event B resolves event A, both have the same startsAt timestamp.
# Labels are added to avoid having two incidents starting at the same time.
"#{event["data"]["alerts"][0]["startsAt"]}_#{labels.hash}"
end
end
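
For the sample message shown in the comment above, extract_event_data would produce an event hash roughly like the sketch below (illustrative values only: :ems_id comes from the worker config, :target_id and :container_node_id depend on which ContainerNode record matches the 'instance' label, and :message is nil because the sample annotations carry no 'message' key).

require 'time'

event_hash = {
  :ems_id              => 17,          # @cfg[:ems_id], illustrative
  :source              => 'DATAWAREHOUSE',
  :event_type          => 'datawarehouse_alert',
  :timestamp           => Time.parse("2017-06-26T18:24:37.821275471+03:00"),
  :target_type         => 'ContainerNode',
  :target_id           => 42,          # id of the matching ContainerNode, illustrative
  :container_node_id   => 42,
  :container_node_name => 'vm-48-45.eng.lab.tlv.redhat.com',
  :message             => nil,         # sample annotations carry no 'message' key
  :full_data           => {}           # original message plus the derived :severity,
                                       # :url, :resolved and :ems_ref (trimmed here)
}
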
2 changes: 2 additions & 0 deletions config/settings.yml
@@ -59,6 +59,8 @@
:event_catcher:
:event_catcher_kubernetes:
:poll: 1.seconds
:event_catcher_monitoring:
:poll: 30.seconds
:queue_worker_base:
:ems_metrics_collector_worker:
:ems_metrics_collector_worker_kubernetes:
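
The new :event_catcher_monitoring entry gives the monitoring event catcher its own poll interval, separate from the 1-second Kubernetes event catcher. As an illustration only, assuming the usual :workers: / :worker_base: / :event_catcher: nesting that this hunk sits in, a deployment could slow the poll further with a local settings override such as:

:workers:
  :worker_base:
    :event_catcher:
      :event_catcher_monitoring:
        :poll: 60.seconds
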
