kedro_datasets.pandas.DeltaTableDataset

class kedro_datasets.pandas.DeltaTableDataset(filepath=None, catalog_type=None, catalog_name=None, database=None, table=None, load_args=None, save_args=None, credentials=None, fs_args=None)[source]

DeltaTableDataset loads and saves Delta tables from or to a filesystem (e.g. local, S3, GCS), Databricks Unity Catalog, or AWS Glue Catalog. Data is loaded into, and saved from, a pandas DataFrame. When saving, you can specify one of two modes: overwrite (the default) or append. To alter the schema as part of an overwrite, pass overwrite_schema=True. To overwrite a specific partition, use mode=overwrite together with partition_filters; this removes all files within the matching partition and inserts your data as new files.

Example usage for the YAML API:

boats_filesystem:
  type: pandas.DeltaTableDataset
  filepath: data/01_raw/boats
  credentials: dev_creds
  load_args:
    version: 7
  save_args:
    mode: overwrite

boats_databricks_unity_catalog:
  type: pandas.DeltaTableDataset
  credentials: dev_creds
  catalog_type: UNITY
  database: simple_database
  table: simple_table
  save_args:
    mode: overwrite

trucks_aws_glue_catalog:
  type: pandas.DeltaTableDataset
  credentials: dev_creds
  catalog_type: AWS
  catalog_name: main
  database: db_schema
  table: db_table
  save_args:
    mode: overwrite

Example usage for the Python API:

from kedro_datasets.pandas import DeltaTableDataset
import pandas as pd

# Save a DataFrame as a Delta table, then read it back.
data = pd.DataFrame({'col1': [1, 2], 'col2': [4, 5], 'col3': [5, 6]})
dataset = DeltaTableDataset(filepath="test")

dataset.save(data)
reloaded = dataset.load()
assert data.equals(reloaded)

# Save again with the default overwrite mode, then query the loaded version.
new_data = pd.DataFrame({'col1': [7, 8], 'col2': [9, 10], 'col3': [11, 12]})
dataset.save(new_data)
dataset.get_loaded_version()
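
The save mode and the load version are both set through constructor arguments, so a short sketch of append mode combined with a pinned version may help. It relies only on the save_args mode and load_args version options described on this page. The function name append_and_reload and its path argument are illustrative, not part of the library. It assumes kedro-datasets, deltalake and pandas are installed and that no table exists at path yet, so the first write becomes version 0.

```python
def append_and_reload(path: str):
    """Append two batches to a Delta table, then load the latest and the first version."""
    # Imported inside the function so that defining this sketch does not
    # require the optional dependencies to be installed.
    import pandas as pd
    from kedro_datasets.pandas import DeltaTableDataset

    first = pd.DataFrame({"col1": [1, 2], "col2": [4, 5]})
    second = pd.DataFrame({"col1": [7, 8], "col2": [9, 10]})

    # mode="append" adds rows instead of replacing the table contents.
    writer = DeltaTableDataset(filepath=path, save_args={"mode": "append"})
    writer.save(first)   # assumed to create the table (version 0)
    writer.save(second)  # adds a second commit on top of it

    # Without load_args the most recent version is read.
    latest = DeltaTableDataset(filepath=path).load()
    # load_args={"version": 0} pins the read to the first commit.
    pinned = DeltaTableDataset(filepath=path, load_args={"version": 0}).load()
    return latest, pinned
```

Under these assumptions, latest would hold both batches while pinned would hold only the first one.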

Attributes

ACCEPTED_WRITE_MODES

DEFAULT_LOAD_ARGS

DEFAULT_SAVE_ARGS

DEFAULT_WRITE_MODE

fs_args

Appends filesystem credentials to fs_args and returns the result.

history

Returns the history of actions on DeltaTableDataset as a list of dictionaries.

metadata

Returns the metadata of the DeltaTableDataset as a Metadata object.

schema

Returns the schema of the DeltaTableDataset as a dictionary.

Methods

exists()

Checks whether a data set's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a data set instance using the configuration provided.

get_loaded_version()

Returns the version of the DeltaTableDataset that is currently loaded.

load()

Loads data by delegation to the provided load method.

release()

Release any cached data.

save(data)

Saves data by delegation to the provided save method.

ACCEPTED_WRITE_MODES = ('overwrite', 'append')
DEFAULT_LOAD_ARGS: Dict[str, Any] = {}
DEFAULT_SAVE_ARGS: Dict[str, Any] = {'mode': 'overwrite'}
DEFAULT_WRITE_MODE = 'overwrite'
__init__(filepath=None, catalog_type=None, catalog_name=None, database=None, table=None, load_args=None, save_args=None, credentials=None, fs_args=None)[source]

Creates a new instance of DeltaTableDataset.

Parameters:
  • filepath (str) – Filepath to a delta lake file, using one of the following accepted protocols. S3: s3://<bucket>/<path> or s3a://<bucket>/<path>. Azure: az://<container>/<path>, adl://<container>/<path> or abfs://<container>/<path>. GCS: gs://<bucket>/<path>. If none of the prefixes above is provided, the file protocol (local filesystem) is used.

  • catalog_type (DataCatalog, Optional) – the type of catalog to use, AWS or UNITY, when filepath is not provided. Defaults to None.

  • catalog_name (str, Optional) – the name of the catalog in AWS Glue or Databricks Unity. Defaults to None.

  • database (str, Optional) – the name of the database (also referred to as schema). Defaults to None.

  • table (str, Optional) – the name of the table.

  • load_args (Dict[str, Any], Optional) – Additional options for loading file(s) into DeltaTableDataset. load_args accepts version to load the appropriate version when loading from a filesystem.

  • save_args (Dict[str, Any], Optional) – Additional saving options for saving into Delta lake. Here you can find all available arguments: https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables

  • credentials (Dict[str, Any], Optional) – Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args (Dict[str, Any], Optional) – Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).
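
The protocol rules for filepath can be illustrated with a small standard-library sketch. The helper storage_backend and the table REMOTE_PROTOCOLS are hypothetical names introduced here for illustration; this is not how the library itself resolves paths, only a restatement of the prefixes listed above.

```python
from urllib.parse import urlsplit

# Prefixes listed in the filepath parameter description, grouped by backend.
REMOTE_PROTOCOLS = {
    "s3": "S3",
    "s3a": "S3",
    "az": "Azure",
    "adl": "Azure",
    "abfs": "Azure",
    "gs": "GCS",
}


def storage_backend(filepath: str) -> str:
    """Return the backend implied by a filepath prefix (illustrative helper)."""
    scheme = urlsplit(filepath).scheme.lower()
    # Anything without a recognised prefix is treated as the local filesystem.
    return REMOTE_PROTOCOLS.get(scheme, "local")
```

For instance, storage_backend("s3a://bucket/boats") maps to the S3 group, while a relative path such as data/01_raw/boats falls through to the local filesystem.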

Raises:

DatasetError – Invalid configuration supplied (through DeltaTableDataset validation)
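
The kind of configuration check that could lead to this error can be sketched as follows. This is an assumption-laden illustration, not the library's validation code: check_config and ACCEPTED_CATALOG_TYPES are hypothetical names, ValueError stands in for DatasetError, and the rules are inferred from the parameter descriptions and class constants on this page.

```python
from typing import Any, Dict, Optional

# Values copied from the class constants documented on this page.
ACCEPTED_WRITE_MODES = ("overwrite", "append")
DEFAULT_SAVE_ARGS: Dict[str, Any] = {"mode": "overwrite"}
# Hypothetical constant: the two catalog types named in the parameter docs.
ACCEPTED_CATALOG_TYPES = ("AWS", "UNITY")


def check_config(
    filepath: Optional[str] = None,
    catalog_type: Optional[str] = None,
    save_args: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the effective save_args, or raise ValueError for a bad config."""
    # Either a filepath or a supported catalog type has to identify the table.
    if not filepath and catalog_type not in ACCEPTED_CATALOG_TYPES:
        raise ValueError("Provide a filepath, or a catalog_type of AWS or UNITY.")

    # User-supplied save_args take precedence over the defaults.
    merged = {**DEFAULT_SAVE_ARGS, **(save_args or {})}
    if merged["mode"] not in ACCEPTED_WRITE_MODES:
        raise ValueError(
            f"Write mode {merged['mode']!r} is not one of {ACCEPTED_WRITE_MODES}."
        )
    return merged
```

Merging the defaults first means a configuration with no save_args still ends up with mode set to overwrite, matching DEFAULT_WRITE_MODE.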

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – when the underlying exists method raises an error.

classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
  • name – Data set name.

  • config – Data set config dictionary.

  • load_version – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

  • save_version – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the data set from its config.

property fs_args: Dict[str, Any]

Appends filesystem credentials to fs_args and returns the result.

Return type:

Dict[str, Any]

get_loaded_version()[source]

Returns the version of the DeltaTableDataset that is currently loaded.

Return type:

int

property history: List[Dict[str, Any]]

Returns the history of actions on DeltaTableDataset as a list of dictionaries.

Return type:

List[Dict[str, Any]]

load()

Loads data by delegation to the provided load method.

Return type:

TypeVar(_DO)

Returns:

Data returned by the provided load method.

Raises:

DatasetError – when the underlying load method raises an error.

property metadata: Metadata

Returns the metadata of the DeltaTableDataset as a Metadata object. Metadata contains the following:

  1. A unique id.
  2. A name, if provided.
  3. A description, if provided.
  4. The list of partition_columns.
  5. The created_time of the table.
  6. A map of table configuration. This includes fields such as delta.appendOnly, which, if true, indicates the table is not meant to have data deleted from it.

Returns:

Metadata object containing the above metadata attributes.

Return type:

Metadata
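
If a plain dictionary is more convenient than the Metadata object, the six fields listed above can be copied out with a small helper. This is a sketch: metadata_to_dict and METADATA_FIELDS are hypothetical names, and the attribute names id, name, description and configuration are assumed to match the field names in the description above.

```python
from typing import Any, Dict

# Field names taken from the list in the metadata description (assumed to be
# the attribute names exposed by the Metadata object).
METADATA_FIELDS = (
    "id",
    "name",
    "description",
    "partition_columns",
    "created_time",
    "configuration",
)


def metadata_to_dict(metadata: Any) -> Dict[str, Any]:
    """Copy the documented metadata fields into a plain dictionary."""
    return {field: getattr(metadata, field) for field in METADATA_FIELDS}
```

A call such as metadata_to_dict(dataset.metadata) would then give a dictionary that can be logged or serialised alongside the pipeline run.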

release()

Release any cached data.

Raises:

DatasetError – when the underlying release method raises an error.

Return type:

None

save(data)

Saves data by delegation to the provided save method.

Parameters:

data (TypeVar(_DI)) – the value to be saved by the provided save method.

Raises:
  • DatasetError – when the underlying save method raises an error.

  • FileNotFoundError – when the save method receives a file instead of a directory, on Windows.

  • NotADirectoryError – when the save method receives a file instead of a directory, on Unix.

Return type:

None

property schema: Dict[str, Any]

Returns the schema of the DeltaTableDataset as a dictionary.

Return type:

Dict[str, Any]