kedro.extras.datasets.spark.SparkDataSet

class kedro.extras.datasets.spark.SparkDataSet(filepath, file_format='parquet', load_args=None, save_args=None, version=None, credentials=None)[source]

SparkDataSet loads and saves Spark dataframes.

Example usage for the YAML API:

weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather_with_schema:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    schema:
      filepath: path/to/schema.json
  save_args:
    sep: '|'
    header: True

weather_cleaned:
  type: spark.SparkDataSet
  filepath: data/02_intermediate/data.parquet
  file_format: parquet
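
The weather_with_schema entry above reads its schema from a JSON file. A minimal sketch of producing such a file, assuming the dataset rebuilds the schema via StructType.fromJson(), so the file should hold the output of StructType.jsonValue():

import json

from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

# Illustrative schema only; the field names are placeholders.
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])

# Write the schema as JSON so load_args.schema.filepath can point to it.
with open("path/to/schema.json", "w") as f:
    json.dump(schema.jsonValue(), f)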

Example usage for the Python API:

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

from kedro.extras.datasets.spark import SparkDataSet

schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])

data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]

spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)

data_set = SparkDataSet(filepath="test_data")
data_set.save(spark_df)
reloaded = data_set.load()

reloaded.take(4)

Attributes

DEFAULT_LOAD_ARGS

DEFAULT_SAVE_ARGS

Methods

exists()

Checks whether a data set's output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, ...])

Create a data set instance using the configuration provided.

load()

Loads data by delegation to the provided load method.

release()

Release any cached data.

resolve_load_version()

Compute the version the dataset should be loaded with.

resolve_save_version()

Compute the version the dataset should be saved with.

save(data)

Saves data by delegation to the provided save method.

DEFAULT_LOAD_ARGS: Dict[str, Any] = {}
DEFAULT_SAVE_ARGS: Dict[str, Any] = {}
__init__(filepath, file_format='parquet', load_args=None, save_args=None, version=None, credentials=None)[source]

Creates a new instance of SparkDataSet.

Parameters:
  • filepath (str) – Filepath in POSIX format to a Spark dataframe. When using Databricks and working with data written to mount path points, specify filepaths for (versioned) SparkDataSets starting with /dbfs/mnt.

  • file_format (str) – File format used during load and save operations. Formats supported by the running SparkContext include parquet, csv and delta. For a full list of supported formats, please refer to the Apache Spark documentation at https://spark.apache.org/docs/latest/sql-programming-guide.html

  • load_args (Optional[Dict[str, Any]]) – Load args passed to the Spark DataFrameReader load method. These are dependent on the selected file format. You can find a list of read options for each supported format in the Spark DataFrame read documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

  • save_args (Optional[Dict[str, Any]]) – Save args passed to the Spark DataFrame write options. Like load_args, these are dependent on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively (see the sketch after this parameter list). You can find a list of options for each format in the Spark DataFrame write documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

  • version (Optional[Version]) – If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, save version will be autogenerated.

  • credentials (Optional[Dict[str, Any]]) – Credentials to access the S3 bucket, such as key, secret, if filepath prefix is s3a:// or s3n://. Optional keyword arguments passed to hdfs.client.InsecureClient if filepath prefix is hdfs://. Ignored otherwise.
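
A minimal sketch tying these parameters together; the bucket, credential values and partition column are placeholders, not real values:

from kedro.extras.datasets.spark import SparkDataSet
from kedro.io.core import Version

data_set = SparkDataSet(
    filepath="s3a://your_bucket/data/02_intermediate/data.parquet",
    file_format="parquet",
    # mode and partitionBy control overwrite behaviour and partitioning.
    save_args={"mode": "overwrite", "partitionBy": ["date"]},
    # Placeholder credentials; only used for s3a://, s3n:// or hdfs:// filepaths.
    credentials={"key": "YOUR_KEY", "secret": "YOUR_SECRET"},
    # Version(None, None): load the latest version, autogenerate the save version.
    version=Version(load=None, save=None),
)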

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type:

bool

Returns:

Flag indicating whether the output already exists.

Raises:

DatasetError – when the underlying exists method raises an error.
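
A short usage sketch, reusing spark_df from the Python API example above:

data_set = SparkDataSet(filepath="data/02_intermediate/data.parquet")

# Save only if no output exists yet at the target filepath.
if not data_set.exists():
    data_set.save(spark_df)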

classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
  • name – Data set name.

  • config – Data set config dictionary.

  • load_version – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

  • save_version – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

Returns:

An instance of an AbstractDataset subclass.

Raises:

DatasetError – When the function fails to create the data set from its config.
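
A minimal sketch of creating the dataset from a config dictionary that mirrors the weather_cleaned YAML entry above, assuming the short type path spark.SparkDataSet resolves the same way it does in the YAML API:

from kedro.extras.datasets.spark import SparkDataSet

config = {
    "type": "spark.SparkDataSet",
    "filepath": "data/02_intermediate/data.parquet",
    "file_format": "parquet",
}

# from_config resolves the class named under "type" and instantiates it.
data_set = SparkDataSet.from_config("weather_cleaned", config)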

load()

Loads data by delegation to the provided load method.

Return type:

TypeVar(_DO)

Returns:

Data returned by the provided load method.

Raises:

DatasetError – when the underlying load method raises an error.

release()

Release any cached data.

Raises:

DatasetError – when the underlying release method raises an error.

Return type:

None

resolve_load_version()

Compute the version the dataset should be loaded with.

Return type:

str | None

resolve_save_version()

Compute the version the dataset should be saved with.

Return type:

str | None
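
A minimal sketch of inspecting resolved versions on a versioned dataset; the returned save version is autogenerated (typically a timestamp), not a fixed value:

from kedro.extras.datasets.spark import SparkDataSet
from kedro.io.core import Version

versioned = SparkDataSet(
    filepath="data/02_intermediate/data.parquet",
    version=Version(load=None, save=None),
)

# Latest existing version; may raise if no versions have been saved yet.
print(versioned.resolve_load_version())
# Autogenerated version string to be used for the next save.
print(versioned.resolve_save_version())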

save(data)

Saves data by delegation to the provided save method.

Parameters:

data (TypeVar(_DI)) – the value to be saved by the provided save method.

Raises:
  • DatasetError – when the underlying save method raises an error.

  • FileNotFoundError – when the save method receives a file instead of a directory, on Windows.

  • NotADirectoryError – when the save method receives a file instead of a directory, on Unix.

Return type:

None