PartitionedDataset caching prevents from finding files generated by prior nodes with ParallelRunner #4164

Open
gtauzin opened this issue Sep 14, 2024 · 1 comment

gtauzin commented Sep 14, 2024

Description

I have a pipeline in which the first node generates files that are then picked up by a follow-up node through a PartitionedDataset. If I run this pipeline with ParallelRunner, the partition file list is built before the pipeline runs, which makes it impossible to find the files created by the prior node.

Context

I have a two-node pipeline that is applied per source bucket using namespaces. I would like it to run in parallel (one process per source).

  • Node 1 looks up a source bucket for new files and concatenates all of them together;
  • Node 2 looks up all the concatenations of new files created so far for the source and concatenates them together to get all data collected so far for this source.

The way I achieve this with Kedro is:

  • For node 1, I use IncrementalDataset and the concatenated dataframe is saved using a versioned ParquetDataset.
  • For node 2, I use a PartitionedDataset that is able to find all the preprocessed recorded events computed so far (with the load_args withdirs and max_depth set accordingly).

Node 1 also returns a boolean that node 2 takes as an input, so that the resulting DAG has a dependency link from node 1 to node 2.

Steps to Reproduce

Here is what the pipeline code looks like:

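from kedro.pipeline import Pipeline, node, pipeline

# SOURCES, concatenate_increment and concatenate_partition are defined
# elsewhere in the project (not shown here).


def create_pipeline(**kwargs) -> Pipeline: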
    def get_pipeline(namespace: str):
        template_pipeline = pipeline(
            [
                node(
                    concatenate_increment,
                    inputs="data_increment",
                    outputs=["concatenated_data_increment", "data_increment_concatenated"],
                    name="concatenate_increment",
                    confirms=f"{namespace}.data_increment", # This is needed as the incremental dataset is namespaced
                ),
                node(
                    concatenate_partition,
                    inputs=[
                        "partitioned_concatenated_data",
                        "data_increment_concatenated",
                    ],
                    outputs="extracted_data",
                    name="concatenate_partition",
                ),
            ],
        )

        return template_pipeline
 
    pipelines = pipeline(
        pipe=get_pipeline(namespace=SOURCES[0]),
        namespace=SOURCES[0],
    )
    for source in SOURCES[1:]:
        pipelines += pipeline(
            pipe=get_pipeline(namespace=source),
            namespace=source,
        )

    return pipelines

And the catalog:

"{source}.data_increment":
  type: partitions.IncrementalDataset
  path: data/01_raw/{source}/
  dataset:
    type: pandas.CSVDataset
  filename_suffix: ".csv"

"{source}.data_increment_concatenated":
  type: MemoryDataset

"{source}.concatenated_data_increment":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/concatenated_data.pq
  versioned: true

"{source}.partitioned_concatenated_data":
  type: partitions.PartitionedDataset
  path: data/02_intermediate/{source}/concatenated_data.pq/
  dataset:
    type: pandas.ParquetDataset
  load_args:
    withdirs: true
    max_depth: 1
  filename_suffix: ".pq"

"{source}.extracted_data":
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/{source}/extracted_data.pq

Putting even a single input file for a single source in data/01_raw/source_1 and creating a pipeline from the template pipeline method with the namespace set to source_1 is enough to reproduce the bug. For the sake of clarity, I did not provide the concatenate_increment and concatenate_partition node functions, but I can provide them if needed. They basically just call pd.concat.

Expected Result

The pipeline runs successfully, and the results of running it with SequentialRunner or ParallelRunner are identical.

Actual Result

The pipeline runs fine with SequentialRunner, but when run with ParallelRunner, it complains that there are no files in the partition:

kedro.io.core.DatasetError: No partitions found.

Your Environment

  • Kedro version used (pip show kedro or kedro -V): 0.19.6
  • Python version used (python -V): 3.12
  • Operating system and version: Ubuntu 22.04 LTS

gtauzin commented Sep 14, 2024

From @deepyaman on the kedro slack:

This seems quite possible, though, as _list_partitions is cached, so anything that attempts to access it may have hit this code: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py#L257-L264
There is an exists check that was introduced in #3332 that is triggered before pipeline run, that can populate this cache.
@Merel may have some more familiarity, since she worked on #3332
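
To make the mechanism in the quote concrete, here is a minimal sketch that reproduces a stale listing without Kedro. It is a stand-in, not the kedro-datasets code: `CachedLister`, `list_partitions`, `exists` and `demo_stale_listing` are hypothetical names, and a plain attribute plays the role of the partition cache. It assumes, as the quote suggests, that an existence check runs before the node that writes the files.

```python
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple


class CachedLister:
    """Hypothetical stand-in for a dataset that caches its file listing."""

    def __init__(self, path: Path) -> None:
        self._path = path
        # Plays the role of the partition cache.
        self._cache: Optional[List[str]] = None

    def list_partitions(self) -> List[str]:
        # The folder is scanned on the first call only; later calls reuse the result.
        if self._cache is None:
            self._cache = sorted(p.name for p in self._path.glob("*.pq"))
        return self._cache

    def exists(self) -> bool:
        # An existence check populates the cache as a side effect.
        return bool(self.list_partitions())


def demo_stale_listing() -> Tuple[bool, List[str], List[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        folder = Path(tmp)
        lister = CachedLister(folder)
        # Pre-run check on a folder that is still empty.
        existed_before_run = lister.exists()
        # The upstream node ("node 1") writes its output afterwards.
        (folder / "increment.pq").write_bytes(b"")
        # The downstream node ("node 2") is served the cached, empty listing.
        seen_by_node_2 = lister.list_partitions()
        on_disk = sorted(p.name for p in folder.glob("*.pq"))
        return existed_before_run, seen_by_node_2, on_disk
```

In this sketch the file is on disk, yet the second call still returns the empty listing computed during the pre-run check, which is the same symptom as the "No partitions found" error above.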

He also suggested creating a custom PartitionedDataset and removing the caching decorator on _list_partitions as a workaround. I can confirm that this workaround worked for me.
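
For anyone wanting the shape of that workaround, here is a rough sketch of the pattern using stand-in classes rather than the real kedro-datasets ones. `CachingFolderDataset`, `UncachedFolderDataset`, `_scan` and `compare_listings` are hypothetical names; in an actual project the subclass would extend PartitionedDataset instead, and the details of its caching decorator are not reproduced here.

```python
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple


class CachingFolderDataset:
    """Hypothetical stand-in for a dataset whose file listing is cached."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._listing: Optional[List[str]] = None

    def _scan(self) -> List[str]:
        # Read the folder contents directly from disk.
        return sorted(p.name for p in self._path.glob("*.pq"))

    def _list_partitions(self) -> List[str]:
        # Scan once, then reuse the stored result.
        if self._listing is None:
            self._listing = self._scan()
        return self._listing


class UncachedFolderDataset(CachingFolderDataset):
    """Same dataset with the caching removed: every call rescans the folder."""

    def _list_partitions(self) -> List[str]:
        return self._scan()


def compare_listings() -> Tuple[List[str], List[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        folder = Path(tmp)
        cached = CachingFolderDataset(folder)
        uncached = UncachedFolderDataset(folder)
        # Both datasets are queried before any file exists.
        cached._list_partitions()
        uncached._list_partitions()
        # An upstream step then writes a file.
        (folder / "increment.pq").write_bytes(b"")
        return cached._list_partitions(), uncached._list_partitions()
```

The trade-off is that the uncached variant hits the filesystem on every listing, which may matter for large or remote folders.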
