Skip to main content

Sling (dagster-sling)

This library provides a Dagster integration with Sling.

For more information on getting started, see the Sling & Dagster documentation.


@dagster_sling.sling_assets [source]

Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This will create on Asset for every Sling target stream.

A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.


  • replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary of a replication config.
  • dagster_sling_translator – (DagsterSlingTranslator): Allows customization of how to map a Sling stream to a Dagster AssetKey.
  • (Optional[str] (name) – The name of the op.
  • partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.
  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.
  • op_tags (Optional[Mapping[str, Any]]) – The tags for the underlying op.
  • pool (Optional[str]) – A string that identifies the concurrency pool that governs the sling assets’ execution.


Running a sync by providing a path to a Sling Replication config:

from dagster_sling import sling_assets, SlingResource, SlingConnectionResource

sling_resource = SlingResource(
name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")

config_path = "/path/to/replication.yaml"
def my_assets(context, sling: SlingResource):
yield from sling.replicate(context=context)
class dagster_sling.DagsterSlingTranslator [source]
get_asset_key [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).key instead..

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.

The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.

For example:

stream_definition = {"public.users":
{'sql': 'select all_user_id, name from public."all_Users"',
'object': 'public.all_users'}

By default, this returns the class’s target_prefix parameter concatenated with the stream name. A stream named “public.accounts” will create an AssetKey named “target_public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster AssetKey.

Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:

asset_key: "mydb_users"

Parameters: stream_definition (Mapping[str, Any]) – A dictionary representing the stream definitionReturns: The Dagster AssetKey for the replication stream.Return type: AssetKey Examples:

Using a custom mapping for streams:

class CustomSlingTranslator(DagsterSlingTranslator):
def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetKey:
default_spec = super().get_asset_spec(stream_definition)
map = {"stream1": "asset1", "stream2": "asset2"}
return default_spec.replace_attributes(key=AssetKey(map[stream_definition["name"]]))
get_asset_spec [source]

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetSpec.

The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.

get_auto_materialize_policy [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).auto_materialize_policy instead..

Defines the auto-materialize policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: An eager auto-materialize policy if the configuration is found, otherwise None.Return type: Optional[AutoMaterializePolicy]

get_deps_asset_key [source]

This API has been superseded. Iterate over DagsterSlingTranslator.get_asset_spec(...).deps to access AssetDep.asset_key instead..

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey for each dependency of the replication stream.

By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:

deps: "sourcedb_users"

Parameters: stream_definition (Mapping[str, Any]) – A dictionary representing the stream definitionReturns: A list of Dagster AssetKey for each dependency of the replication stream.Return type: Iterable[AssetKey]

get_description [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).description instead..

Retrieves the description for a given stream definition.

This method checks the provided stream definition for a description. It first looks for an “sql” key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the “dagster” key.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: The description of the stream if found, otherwise None.Return type: Optional[str]

get_freshness_policy [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).freshness_policy instead..

Retrieves the freshness policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating a freshness policy. If the configuration is found, it constructs and returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: A FreshnessPolicy object if the configuration is found, otherwise None.Return type: Optional[FreshnessPolicy]

get_group_name [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).group_name instead..

Retrieves the group name for a given stream definition.

This method checks the provided stream definition for a group name in the metadata under the “dagster” key.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: The group name if found, otherwise None.Return type: Optional[str]

get_kinds [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).kinds instead..

Retrieves the kinds for a given stream definition.

This method returns “sling” by default. This method can be overridden to provide custom kinds.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: A set containing kinds for the stream’s assets.Return type: Set[str]

get_metadata [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).metadata instead..

Retrieves the metadata for a given stream definition.

This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: A dictionary containing the stream configuration as JSON metadata.Return type: Mapping[str, Any]

get_tags [source]

This API has been superseded. Use DagsterSlingTranslator.get_asset_spec(...).tags instead..

Retrieves the tags for a given stream definition.

This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags.


  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition,
  • details. (which includes configuration)

Returns: An empty dictionary.Return type: Mapping[str, Any]

sanitize_stream_name [source]

A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.

By default, this removes any non-alphanumeric characters from the stream name and replaces them with underscores, while removing any double quotes.

Parameters: stream_name (str) – The name of the stream. Examples:

Using a custom stream name sanitizer:

class CustomSlingTranslator(DagsterSlingTranslator):
def sanitize_stream_name(self, stream_name: str) -> str:
return stream_name.replace(".", "")


class dagster_sling.SlingResource [source]

Resource for interacting with the Sling package. This resource can be used to run Sling replications.

Parameters: connections (List[SlingConnectionResource]) – A list of connections to use for the replication. Examples:

from import SlingResource, SlingConnectionResource

sling_resource = SlingResource(
class dagster_sling.SlingConnectionResource [source]

A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling syncs.

Reference the Sling docs for more information on possible connection types and parameters:

The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: You may provide either a connection string or keyword arguments for the connection.


Creating a Sling Connection for a file, such as CSV or JSON:

source = SlingConnectionResource(name="MY_FILE", type="file")

Create a Sling Connection for a Postgres database, using a connection string:

postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")

Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments: