diff --git a/app/models/manager_refresh/inventory_collection.rb b/app/models/manager_refresh/inventory_collection.rb index 5e84ab8ccd3..8d635f4356c 100644 --- a/app/models/manager_refresh/inventory_collection.rb +++ b/app/models/manager_refresh/inventory_collection.rb @@ -685,6 +685,8 @@ def stringify_reference(reference) end def manager_ref_to_cols + # TODO(lsmola) this should contain the polymorphic _type, otherwise the IC with polymorphic unique key will get + # conflicts # Convert attributes from unique key to actual db cols manager_ref.map do |ref| association_to_foreign_key_mapping[ref] || ref @@ -911,6 +913,14 @@ def fixed_foreign_keys @fixed_foreign_keys_cache end + def serializable_keys?(all_attribute_keys) + return @serializable_keys_cache unless @serializable_keys_cache.nil? + + @serializable_keys_cache = all_attribute_keys.any? do |key| + model_class.type_for_attribute(key.to_s).respond_to?(:coder) + end + end + def base_class_name return "" unless model_class @@ -986,10 +996,12 @@ def db_collection_for_comparison end def db_collection_for_comparison_for(manager_uuids_set) + # TODO(lsmola) this should have the build_multi_selection_condition, like in the method above full_collection_for_comparison.where(manager_ref.first => manager_uuids_set) end def db_collection_for_comparison_for_complement_of(manager_uuids_set) + # TODO(lsmola) this should have the build_multi_selection_condition, like in the method above full_collection_for_comparison.where.not(manager_ref.first => manager_uuids_set) end diff --git a/app/models/manager_refresh/inventory_object.rb b/app/models/manager_refresh/inventory_object.rb index 330dd9cae04..d95d70a8ae4 100644 --- a/app/models/manager_refresh/inventory_object.rb +++ b/app/models/manager_refresh/inventory_object.rb @@ -15,6 +15,8 @@ def initialize(inventory_collection, data) end def manager_uuid + # TODO(lsmola) should we have a separate function for uuid containing foreign keys? Probably yes, since it could + # speed up the ID fetching. id_with_keys(manager_ref) end @@ -50,36 +52,36 @@ def attributes(inventory_collection_scope = nil) data[key] = value.compact.map(&:load).compact # We can use built in _ids methods to assign array of ids into has_many relations. So e.g. the :key_pairs= # relation setter will become :key_pair_ids= - attributes_for_saving[key.to_s.singularize + "_ids"] = data[key].map(&:id).compact.uniq + attributes_for_saving[(key.to_s.singularize + "_ids").to_sym] = data[key].map(&:id).compact.uniq elsif loadable?(value) || inventory_collection_scope.association_to_foreign_key_mapping[key] # Lets fill also the original data, so other InventoryObject referring to this attribute gets the right # result data[key] = value.load if value.respond_to?(:load) if (foreign_key = inventory_collection_scope.association_to_foreign_key_mapping[key]) # We have an association to fill, lets fill also the :key, cause some other InventoryObject can refer to it - record_id = data[key].try(:id) - attributes_for_saving[foreign_key] = record_id + record_id = data[key].try(:id) + attributes_for_saving[foreign_key.to_sym] = record_id if (foreign_type = inventory_collection_scope.association_to_foreign_type_mapping[key]) # If we have a polymorphic association, we need to also fill a base class name, but we want to nullify it # if record_id is missing base_class = data[key].try(:base_class_name) || data[key].class.try(:base_class).try(:name) - attributes_for_saving[foreign_type] = record_id ? base_class : nil + attributes_for_saving[foreign_type.to_sym] = record_id ? base_class : nil end elsif data[key].kind_of?(::ManagerRefresh::InventoryObject) # We have an association to fill but not an Activerecord association, so e.g. Ancestry, lets just load # it here. This way of storing ancestry is ineffective in DB call count, but RAM friendly - attributes_for_saving[key] = data[key].base_class_name.constantize.find_by(:id => data[key].id) + attributes_for_saving[key.to_sym] = data[key].base_class_name.constantize.find_by(:id => data[key].id) else # We have a normal attribute to fill - attributes_for_saving[key] = data[key] + attributes_for_saving[key.to_sym] = data[key] end else - attributes_for_saving[key] = value + attributes_for_saving[key.to_sym] = value end end - attributes_for_saving.symbolize_keys + attributes_for_saving end def assign_attributes(attributes) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index 7d2baa96099..d39713cedf3 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -4,15 +4,33 @@ class Base include Vmdb::Logging include ManagerRefresh::SaveCollection::Saver::SqlHelper - attr_reader :inventory_collection + attr_reader :inventory_collection, :association def initialize(inventory_collection) @inventory_collection = inventory_collection # Private attrs - @unique_index_keys = inventory_collection.manager_ref_to_cols + @primary_key = inventory_collection.model_class.primary_key + @arel_primary_key = inventory_collection.model_class.arel_attribute(primary_key) + @unique_index_keys = inventory_collection.manager_ref_to_cols.map(&:to_sym) + @unique_index_keys_to_s = inventory_collection.manager_ref_to_cols.map(&:to_s) + @select_keys = [@primary_key] + @unique_index_keys_to_s @unique_db_primary_keys = Set.new @unique_db_indexes = Set.new + + # TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations. + inventory_collection.parent.reload if inventory_collection.parent + @association = inventory_collection.db_collection_for_comparison + + # Right now ApplicationRecordIterator in association is used for targeted refresh. Given the small amount of + # records flowing through there, we probably don't need to optimize that association to fetch a pure SQL. + @pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(ManagerRefresh::ApplicationRecordIterator) + + @batch_size_for_persisting = 10_000 + + @batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size + @record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key + @select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index } end def save_inventory_collection! @@ -22,16 +40,14 @@ def save_inventory_collection! # job. We want to do 1 :delete_complement job at 1 time, to keep to memory down. return delete_complement if inventory_collection.all_manager_uuids.present? - # TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations. - inventory_collection.parent.reload if inventory_collection.parent - association = inventory_collection.db_collection_for_comparison - save!(association) end private - attr_reader :unique_index_keys, :unique_db_primary_keys, :unique_db_indexes + attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes, + :primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes, + :batch_size, :batch_size_for_persisting def save!(association) attributes_index = {} @@ -50,7 +66,7 @@ def save!(association) association.find_each do |record| index = inventory_collection.object_index_with_keys(unique_index_keys, record) - next unless assert_distinct_relation(record) + next unless assert_distinct_relation(record.id) next unless assert_unique_record(record, index) inventory_object = inventory_objects_index.delete(index) @@ -62,7 +78,7 @@ def save!(association) delete_record!(record) if inventory_collection.delete_allowed? else # Record was found in the DB and sent for saving, we will be updating the DB. - update_record!(record, hash, inventory_object) if assert_referential_integrity(hash, inventory_object) + update_record!(record, hash, inventory_object) if assert_referential_integrity(hash) end end end @@ -77,7 +93,7 @@ def save!(association) inventory_objects_index.each do |index, inventory_object| hash = attributes_index.delete(index) - create_record!(hash, inventory_object) if assert_referential_integrity(hash, inventory_object) + create_record!(hash, inventory_object) if assert_referential_integrity(hash) end end end @@ -94,8 +110,8 @@ def inventory_collection_details "strategy: #{inventory_collection.strategy}, saver_strategy: #{inventory_collection.saver_strategy}, targeted: #{inventory_collection.targeted?}" end - def batch_size - inventory_collection.batch_size + def record_key(record, key) + record.public_send(key) end def delete_complement @@ -132,8 +148,8 @@ def assert_unique_record(_record, _index) true end - def assert_distinct_relation(record) - if unique_db_primary_keys.include?(record.id) # Include on Set is O(1) + def assert_distinct_relation(primary_key_value) + if unique_db_primary_keys.include?(primary_key_value) # Include on Set is O(1) # Change the InventoryCollection's :association or :arel parameter to return distinct results. The :through # relations can return the same record multiple times. We don't want to do SELECT DISTINCT by default, since # it can be very slow. @@ -145,17 +161,17 @@ def assert_distinct_relation(record) raise("Please update :association or :arel for #{inventory_collection} to return a DISTINCT result. ") end else - unique_db_primary_keys << record.id + unique_db_primary_keys << primary_key_value end true end - def assert_referential_integrity(hash, inventory_object) - inventory_object.inventory_collection.fixed_foreign_keys.each do |x| - next unless hash[x].blank? - subject = "#{inventory_object} of #{inventory_object.inventory_collection} because of missing foreign key #{x} for "\ - "#{inventory_object.inventory_collection.parent.class.name}:"\ - "#{inventory_object.inventory_collection.parent.try(:id)}" + def assert_referential_integrity(hash) + inventory_collection.fixed_foreign_keys.each do |x| + next unless hash[x].nil? + subject = "#{hash} of #{inventory_collection} because of missing foreign key #{x} for "\ + "#{inventory_collection.parent.class.name}:"\ + "#{inventory_collection.parent.try(:id)}" if Rails.env.production? _log.warn("Referential integrity check violated, ignoring #{subject}") return false @@ -186,7 +202,7 @@ def assign_attributes_for_update!(hash, update_time) end def assign_attributes_for_create!(hash, create_time) - hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].blank? + hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].nil? hash[:created_on] = create_time if inventory_collection.supports_timestamps_on_variant? hash[:created_at] = create_time if inventory_collection.supports_timestamps_at_variant? assign_attributes_for_update!(hash, create_time) diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 7e56cf2385e..aec05a35962 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -3,6 +3,47 @@ module Saver class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base private + def record_key(record, key) + send(record_key_method, record, key) + end + + def ar_record_key(record, key) + record.public_send(key) + end + + def pure_sql_record_key(record, key) + record[select_keys_indexes[key]] + end + + def batch_iterator(association) + if pure_sql_records_fetching + # Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The + # iterator responds to find_in_batches, so it acts like the AR relation. For targeted refresh, the association + # can already be ApplicationRecordIterator, so we will skip that. + pure_sql_iterator = lambda do |&block| + primary_key_offset = nil + loop do + relation = association.select(*select_keys) + .reorder("#{primary_key} ASC") + .limit(batch_size) + # Using rails way of comparing primary key instead of offset + relation = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset + records = get_connection.query(relation.to_sql) + last_record = records.last + block.call(records) + + break if records.size < batch_size + primary_key_offset = record_key(last_record, primary_key) + end + end + + ManagerRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator) + else + # Normal Rails relation where we can call find_in_batches + association + end + end + def save!(association) attributes_index = {} inventory_objects_index = {} @@ -22,19 +63,43 @@ def save!(association) all_attribute_keys += [:created_at, :updated_at] if inventory_collection.supports_timestamps_at_variant? _log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection.size} *************") - hashes_for_update = [] + + # TODO(lsmola) needed only because UPDATE FROM VALUES needs a specific PG typecasting, remove when fixed in PG + collect_pg_types!(all_attribute_keys) + update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys) + + all_attribute_keys << :type if inventory_collection.supports_sti? + # Records that were not found in the DB but sent for saving, we will be creating these in the DB. + if inventory_collection.create_allowed? + inventory_objects_index.each_slice(batch_size_for_persisting) do |batch| + create_records!(all_attribute_keys, batch, attributes_index) + end + end + _log.info("*************** PROCESSED #{inventory_collection}, "\ + "created=#{inventory_collection.created_records.count}, "\ + "updated=#{inventory_collection.updated_records.count}, "\ + "deleted=#{inventory_collection.deleted_records.count} *************") + rescue => e + _log.error("Error when saving #{inventory_collection} with #{inventory_collection_details}. Message: #{e.message}") + raise e + end + + def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, attributes_index, all_attribute_keys) + hashes_for_update = [] records_for_destroy = [] - # Records that are in the DB, we will be updating or deleting them. - association.find_in_batches do |batch| + records_batch_iterator.find_in_batches(:batch_size => batch_size) do |batch| update_time = time_now + batch.each do |record| - next unless assert_distinct_relation(record) + primary_key_value = record_key(record, primary_key) - index = inventory_collection.object_index_with_keys(unique_index_keys, record) + next unless assert_distinct_relation(primary_key_value) + # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection + index = unique_index_keys_to_s.map { |attribute| record_key(record, attribute).to_s }.join(inventory_collection.stringify_joiner) inventory_object = inventory_objects_index.delete(index) - hash = attributes_index.delete(index) + hash = attributes_index[index] if inventory_object.nil? # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should @@ -44,35 +109,39 @@ def save!(association) end else # Record was found in the DB and sent for saving, we will be updating the DB. - next unless assert_referential_integrity(hash, inventory_object) - inventory_object.id = record.id + next unless assert_referential_integrity(hash) + inventory_object.id = primary_key_value hash_for_update = if inventory_collection.use_ar_object? record.assign_attributes(hash.except(:id, :type)) values_for_database(inventory_collection.model_class, all_attribute_keys, - record.attributes) + record.attributes.symbolize_keys) + elsif inventory_collection.serializable_keys?(all_attribute_keys) + values_for_database(inventory_collection.model_class, + all_attribute_keys, + hash) else - hash.symbolize_keys + hash end assign_attributes_for_update!(hash_for_update, update_time) - inventory_collection.store_updated_records(record) + inventory_collection.store_updated_records([{:id => primary_key_value}]) - hash_for_update[:id] = record.id - hashes_for_update << hash_for_update.except(:type) + hash_for_update[:id] = primary_key_value + hashes_for_update << hash_for_update end end # Update in batches - if hashes_for_update.size >= batch_size + if hashes_for_update.size >= batch_size_for_persisting update_records!(all_attribute_keys, hashes_for_update) hashes_for_update = [] end # Destroy in batches - if records_for_destroy.size >= batch_size - destroy_records(records_for_destroy) + if records_for_destroy.size >= batch_size_for_persisting + destroy_records!(records_for_destroy) records_for_destroy = [] end end @@ -82,26 +151,11 @@ def save!(association) hashes_for_update = [] # Cleanup so GC can release it sooner # Destroy the last batch - destroy_records(records_for_destroy) + destroy_records!(records_for_destroy) records_for_destroy = [] # Cleanup so GC can release it sooner - - all_attribute_keys << :type if inventory_collection.supports_sti? - # Records that were not found in the DB but sent for saving, we will be creating these in the DB. - if inventory_collection.create_allowed? - inventory_objects_index.each_slice(batch_size) do |batch| - create_records!(all_attribute_keys, batch, attributes_index) - end - end - _log.info("*************** PROCESSED #{inventory_collection}, "\ - "created=#{inventory_collection.created_records.count}, "\ - "updated=#{inventory_collection.updated_records.count}, "\ - "deleted=#{inventory_collection.deleted_records.count} *************") - rescue => e - _log.error("Error when saving #{inventory_collection} with #{inventory_collection_details}. Message: #{e.message}") - raise e end - def destroy_records(records) + def destroy_records!(records) return false unless inventory_collection.delete_allowed? return if records.blank? @@ -109,13 +163,21 @@ def destroy_records(records) rails_delete = %i(destroy delete).include?(inventory_collection.delete_method) if !rails_delete && inventory_collection.model_class.respond_to?(inventory_collection.delete_method) # We have custom delete method defined on a class, that means it supports batch destroy - inventory_collection.model_class.public_send(inventory_collection.delete_method, records.map(&:id)) + # TODO(lsmola) store deleted records to IC + inventory_collection.model_class.public_send(inventory_collection.delete_method, records.map { |x| record_key(x, primary_key) }) else # We have either standard :destroy and :delete rails method, or custom instance level delete method # Note: The standard :destroy and :delete rails method can't be batched because of the hooks and cascade destroy ActiveRecord::Base.transaction do - records.each do |record| - delete_record!(record) + if pure_sql_records_fetching + # For pure SQL fetching, we need to get the AR objects again, so we can call destroy + inventory_collection.model_class.where(:id => records.map { |x| record_key(x, primary_key) }).find_each do |record| + delete_record!(record) + end + else + records.each do |record| + delete_record!(record) + end end end end @@ -123,27 +185,31 @@ def destroy_records(records) def update_records!(all_attribute_keys, hashes) return if hashes.blank? - - ActiveRecord::Base.connection.execute(build_update_query(all_attribute_keys, hashes)) + query = build_update_query(all_attribute_keys, hashes) + get_connection.execute(query) end def create_records!(all_attribute_keys, batch, attributes_index) indexed_inventory_objects = {} - hashes = [] - create_time = time_now + hashes = [] + create_time = time_now batch.each do |index, inventory_object| hash = if inventory_collection.use_ar_object? record = inventory_collection.model_class.new(attributes_index.delete(index)) values_for_database(inventory_collection.model_class, all_attribute_keys, - record.attributes) + record.attributes.symbolize_keys) + elsif inventory_collection.serializable_keys?(all_attribute_keys) + values_for_database(inventory_collection.model_class, + all_attribute_keys, + attributes_index.delete(index)) else - attributes_index.delete(index).symbolize_keys + attributes_index.delete(index) end assign_attributes_for_create!(hash, create_time) - next unless assert_referential_integrity(hash, inventory_object) + next unless assert_referential_integrity(hash) hashes << hash # Index on Unique Columns values, so we can easily fill in the :id later @@ -152,7 +218,7 @@ def create_records!(all_attribute_keys, batch, attributes_index) return if hashes.blank? - result = ActiveRecord::Base.connection.execute( + result = get_connection.execute( build_insert_query(all_attribute_keys, hashes) ) inventory_collection.store_created_records(result) @@ -164,8 +230,8 @@ def create_records!(all_attribute_keys, batch, attributes_index) def values_for_database(model_class, all_attribute_keys, attributes) all_attribute_keys.each_with_object({}) do |attribute_name, db_values| - type = model_class.type_for_attribute(attribute_name.to_s) - raw_val = attributes[attribute_name.to_s] + type = model_class.type_for_attribute(attribute_name.to_s) + raw_val = attributes[attribute_name] db_values[attribute_name] = type.type == :boolean ? type.cast(raw_val) : type.serialize(raw_val) end end @@ -176,11 +242,11 @@ def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, # we test if the number of results matches the expected batch size. Then if the counts do not match, the only # safe option is to query all the data from the DB, using the unique_indexes. The batch size will also not match # for every remainders(a last batch in a stream of batches) - if !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size + if !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size_for_persisting result.each do |inserted_record| key = unique_index_columns.map { |x| inserted_record[x.to_s] } inventory_object = indexed_inventory_objects[key] - inventory_object.id = inserted_record["id"] if inventory_object + inventory_object.id = inserted_record[primary_key] if inventory_object end else inventory_collection.model_class.where( diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper.rb b/app/models/manager_refresh/save_collection/saver/sql_helper.rb index f533fd534f4..c816856621e 100644 --- a/app/models/manager_refresh/save_collection/saver/sql_helper.rb +++ b/app/models/manager_refresh/save_collection/saver/sql_helper.rb @@ -16,10 +16,13 @@ def build_insert_set_cols(key) end def build_insert_query(all_attribute_keys, hashes) + # Cache the connection for the batch + connection = get_connection + all_attribute_keys_array = all_attribute_keys.to_a table_name = inventory_collection.model_class.table_name values = hashes.map do |hash| - "(#{all_attribute_keys_array.map { |x| quote(hash[x], x) }.join(",")})" + "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x) }.join(",")})" end.join(",") col_names = all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",") @@ -64,17 +67,24 @@ def build_update_set_cols(key) end def quote_column_name(key) - ActiveRecord::Base.connection.quote_column_name(key) + get_connection.quote_column_name(key) + end + + def get_connection + ActiveRecord::Base.connection end def build_update_query(all_attribute_keys, hashes) + # Cache the connection for the batch + connection = get_connection + # We want to ignore type and create timestamps when updating all_attribute_keys_array = all_attribute_keys.to_a.delete_if { |x| %i(type created_at created_on).include?(x) } all_attribute_keys_array << :id table_name = inventory_collection.model_class.table_name - values = hashes.map do |hash| - "(#{all_attribute_keys_array.map { |x| quote(hash[x], x, inventory_collection) }.join(",")})" + values = hashes.map! do |hash| + "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x, true) }.join(",")})" end.join(",") update_query = %{ @@ -106,35 +116,46 @@ def build_multi_selection_query(hashes) inventory_collection.build_multi_selection_condition(hashes, unique_index_columns) end - def quote(value, name = nil, used_inventory_collection = nil) + def quote(connection, value, name = nil, type_cast_for_pg = nil) # TODO(lsmola) needed only because UPDATE FROM VALUES needs a specific PG typecasting, remove when fixed in PG - if used_inventory_collection.nil? - ActiveRecord::Base.connection.quote(value) + if type_cast_for_pg + quote_and_pg_type_cast(connection, value, name) else - quote_and_pg_type_cast(value, name, used_inventory_collection) + connection.quote(value) end rescue TypeError => e _log.error("Can't quote value: #{value}, of :#{name} and #{inventory_collection}") raise e end - def quote_and_pg_type_cast(value, name, used_inventory_collection) + def quote_and_pg_type_cast(connection, value, name) pg_type_cast( - ActiveRecord::Base.connection.quote(value), - used_inventory_collection.model_class.columns_hash[name.to_s] - .try(:sql_type_metadata) - .try(:instance_values) - .try(:[], "sql_type") + connection.quote(value), + pg_type(name) ) end def pg_type_cast(value, sql_type) - if sql_type.blank? + if sql_type.nil? value else "#{value}::#{sql_type}" end end + + def pg_type(name) + @pg_types_cache[name] + end + + def collect_pg_types!(all_attribute_keys) + @pg_types_cache = {} + all_attribute_keys.each do |key| + @pg_types_cache[key] = inventory_collection.model_class.columns_hash[key.to_s] + .try(:sql_type_metadata) + .try(:instance_values) + .try(:[], "sql_type") + end + end end end end