Can Kiba handle multiple sources and destinations?

This is a somewhat frequent question. The answer is yes, but read on to understand how Kiba behaves in that area.

Multiple sources behaviour

If you declare multiple sources, like this:

source MySource, some_param: xxx
source MySource, some_param: yyy
source MyOtherSource, some_other_param: zzz

then at runtime, each source will be read sequentially by Kiba.

This means that your first transform will see:

  1. The first row yielded by the first source.
  2. The second row yielded by the first source.
  3. ...
  4. The first row yielded by the second source.
  5. ...
  6. The last row yielded by the last source.
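
Here is a minimal sketch illustrating this ordering (ArraySource is a hypothetical in-memory source, written here just for the example; it is not part of Kiba):

class ArraySource
  def initialize(rows:)
    @rows = rows
  end

  # a Kiba source must implement each, yielding rows one by one
  def each
    @rows.each { |row| yield(row) }
  end
end

job = Kiba.parse do
  source ArraySource, rows: [{ id: 1 }, { id: 2 }]
  source ArraySource, rows: [{ id: 3 }]
  # prints {:id=>1}, {:id=>2}, then {:id=>3}, in that order
  transform { |row| puts row; row }
end

Kiba.run(job)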

Dealing with different source output formats

While having multiple sources can be convenient, it also means that your pipeline may have to deal with rows that are initially formatted differently, and you may have to reformat the rows dynamically, based on which source they come from, e.g.:

# assuming that your sources are passing some meta-data
# to determine which source a row comes from
transform do |row|
  if row.fetch(:meta_source) == MySource
    { ... }
  else
    { ... }
  end
end

If you do so, make sure each source passes enough data (or meta-data) to determine what type of data massaging is needed for a given row.
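
One possible way to pass such meta-data is to wrap your sources in a decorator that tags each row it yields (a sketch: TaggingSource is a hypothetical helper, not a Kiba built-in):

class TaggingSource
  def initialize(source_class, **options)
    @source_class = source_class
    @options = options
  end

  def each
    @source_class.new(**@options).each do |row|
      # tag each row with the class of the underlying source
      yield(row.merge(meta_source: @source_class))
    end
  end
end

source TaggingSource, MySource, some_param: xxx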

Multiple destinations

In a similar fashion, each row reaching the end of the pipeline will be passed to each declared destination, in the declaration order.

If you have 2 destinations:

destination MyFirstDestination, config
destination MySecondDestination, config

then each row will be passed to the first destination, then to the second right after.

⚠️ Be careful: if the first destination modifies the row (mutating rows is usually not recommended for a destination), the second destination will process the modified version of the row.
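
For illustration (a hypothetical destination, written just for this example), the following in-place mutation would leak into whichever destination is declared next:

class UppercasingDestination
  def write(row)
    # anti-pattern: this mutation will be visible to any
    # destination declared after this one in the pipeline
    row[:name] = row.fetch(:name).upcase
  end
end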

My destinations have different formats, how do I handle this?

A common trick (used for instance in the Kiba Pro SQL BulkInsertDestination) is to have your destination accept a row_pre_processor lambda, which lets you adapt the data to what each specific destination needs.

This even lets you target different tables in a given database, like this:

destination Kiba::Pro::Destinations::SQLBulkInsert,
  table: 'orders',
  row_pre_processor: -> (row) { row.except(:line_items) }
destination Kiba::Pro::Destinations::SQLBulkInsert,
  table: 'order_line_items',
  row_pre_processor: -> (row) { row.slice(:line_items) }

To implement such a pre-processor, you can use the following template:

class MyDestination
  attr_reader :row_pre_processor

  def initialize(row_pre_processor: nil)
    @row_pre_processor = row_pre_processor
  end

  def write(row)
    row = row_pre_processor ? row_pre_processor.call(row) : row
    # do processing here
  end
end
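
You could then declare, for instance (the transform_keys call is just an arbitrary example of reshaping):

destination MyDestination,
  row_pre_processor: -> (row) { row.transform_keys(&:to_s) }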

Is it possible to route rows programmatically to a specific destination?

Yes - you can use a pre-processor to determine if the row should be sent to one destination or another:

destination MyDestination,
  table: 'active_customers',
  row_pre_processor: -> (r) { r.fetch(:status) == 'active' ? r : nil }

destination MyDestination,
  table: 'inactive_customers',
  row_pre_processor: -> (r) { r.fetch(:status) == 'inactive' ? r : nil }

Then your destination must pre-process the row and do nothing if the result is nil:

class MyDestination
  attr_reader :table, :row_pre_processor

  def initialize(table:, row_pre_processor: nil)
    @table = table
    @row_pre_processor = row_pre_processor
  end

  def write(row)
    row = row_pre_processor ? row_pre_processor.call(row) : row
    # a nil result means the row is not intended for this destination
    return unless row
    # do the writing job here
  end
end
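
Note that with this setup, a row whose status is neither 'active' nor 'inactive' will simply be skipped by both destinations.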