Datahub (dagster-datahub)

This library provides an integration with Datahub, to support pushing metadata to Datahub from within Dagster ops.


This integration uses the Datahub Python library. To use it, you’ll first need to start a Datahub instance; see the Datahub Quickstart Guide.


dagster_datahub.DatahubRESTEmitterResource ResourceDefinition

Config Schema:
connection (dagster.StringSource):

Datahub GMS Server

token (Union[dagster.StringSource, None], optional):

Personal Access Token

Default Value: None

connect_timeout_sec (Union[Float, None], optional):

Default Value: None

read_timeout_sec (Union[Float, None], optional):

Default Value: None

retry_status_codes (Union[List[dagster.IntSource], None], optional):

Default Value: None

retry_methods (Union[List[dagster.StringSource], None], optional):

Default Value: None

retry_max_times (Union[dagster.IntSource, None], optional):

Default Value: None

extra_headers (Union[dict, None], optional):

Default Value: None

ca_certificate_path (Union[dagster.StringSource, None], optional):

Default Value: None

server_telemetry_id (Union[dagster.StringSource, None], optional):

Default Value: None

disable_ssl_verification (Union[dagster.BoolSource, None], optional):

Default Value: False

Base class for Dagster resources that utilize structured config.

This class is a subclass of both ResourceDefinition and Config.

Example definition:

from dagster import ConfigurableResource


class WriterResource(ConfigurableResource):
    prefix: str

    def output(self, text: str) -> None:
        print(f"{self.prefix}{text}")

Example usage:

from dagster import Definitions, asset


@asset
def asset_that_uses_writer(writer: WriterResource):
    writer.output("text")

defs = Definitions(
    assets=[asset_that_uses_writer],
    resources={"writer": WriterResource(prefix="a_prefix")},
)

You can optionally use this class to model configuration only and vend an object of a different type for use at runtime. This is useful when you want one object to manage configuration and a separate object to be used at runtime, or when you want to use a third-party class that you do not control.

To do this, override the create_resource method to return a different object.

from dagster import ConfigurableResource, InitResourceContext


class WriterResource(ConfigurableResource):
    prefix: str

    def create_resource(self, context: InitResourceContext) -> Writer:
        # Writer is a pre-existing class defined elsewhere
        return Writer(self.prefix)

Example usage:

from dagster import Definitions, ResourceParam, asset


@asset
def use_preexisting_writer_as_resource(writer: ResourceParam[Writer]):
    writer.output("text")

defs = Definitions(
    assets=[use_preexisting_writer_as_resource],
    resources={"writer": WriterResource(prefix="a_prefix")},
)

dagster_datahub.DatahubKafkaEmitterResource ResourceDefinition

Config Schema:
connection (strict dict):
Config Schema:
bootstrap (dagster.StringSource):

Kafka Bootstrap Servers. Comma-delimited.

schema_registry_url (dagster.StringSource):

Schema Registry Location.

schema_registry_config (Union[dict, None], optional):

Extra Schema Registry Config.

Default Value:
{}
topic_routes (Union[dict, None], optional):
Default Value:
{
    "MetadataChangeEvent": "MetadataChangeEvent_v4",
    "MetadataChangeProposal": "MetadataChangeProposal_v1"
}

Base class for Dagster resources that utilize structured config.

This class is a subclass of both ResourceDefinition and Config.

Example definition:

from dagster import ConfigurableResource


class WriterResource(ConfigurableResource):
    prefix: str

    def output(self, text: str) -> None:
        print(f"{self.prefix}{text}")

Example usage:

from dagster import Definitions, asset


@asset
def asset_that_uses_writer(writer: WriterResource):
    writer.output("text")

defs = Definitions(
    assets=[asset_that_uses_writer],
    resources={"writer": WriterResource(prefix="a_prefix")},
)

You can optionally use this class to model configuration only and vend an object of a different type for use at runtime. This is useful when you want one object to manage configuration and a separate object to be used at runtime, or when you want to use a third-party class that you do not control.

To do this, override the create_resource method to return a different object.

from dagster import ConfigurableResource, InitResourceContext


class WriterResource(ConfigurableResource):
    prefix: str

    def create_resource(self, context: InitResourceContext) -> Writer:
        # Writer is a pre-existing class defined elsewhere
        return Writer(self.prefix)

Example usage:

from dagster import Definitions, ResourceParam, asset


@asset
def use_preexisting_writer_as_resource(writer: ResourceParam[Writer]):
    writer.output("text")

defs = Definitions(
    assets=[use_preexisting_writer_as_resource],
    resources={"writer": WriterResource(prefix="a_prefix")},
)

Legacy

dagster_datahub.datahub_rest_emitter ResourceDefinition

Config Schema:
connection (dagster.StringSource):

Datahub GMS Server

token (Union[dagster.StringSource, None], optional):

Personal Access Token

Default Value: None

connect_timeout_sec (Union[Float, None], optional):

Default Value: None

read_timeout_sec (Union[Float, None], optional):

Default Value: None

retry_status_codes (Union[List[dagster.IntSource], None], optional):

Default Value: None

retry_methods (Union[List[dagster.StringSource], None], optional):

Default Value: None

retry_max_times (Union[dagster.IntSource, None], optional):

Default Value: None

extra_headers (Union[dict, None], optional):

Default Value: None

ca_certificate_path (Union[dagster.StringSource, None], optional):

Default Value: None

server_telemetry_id (Union[dagster.StringSource, None], optional):

Default Value: None

disable_ssl_verification (Union[dagster.BoolSource, None], optional):

Default Value: False
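
The schema above maps onto a plain config dictionary. The following is a minimal sketch rather than an example from the library itself: the helper name `rest_emitter_config`, the resource key `datahub`, the environment variable `DATAHUB_TOKEN`, and the server URL are all assumptions chosen for illustration. It relies only on the field names listed above and on the fact that a `dagster.StringSource` field accepts either a literal string or an `{"env": ...}` mapping.

```python
from typing import Any, Dict, Optional


def rest_emitter_config(
    connection: str,
    token_env_var: Optional[str] = None,
    read_timeout_sec: Optional[float] = None,
    disable_ssl_verification: bool = False,
) -> Dict[str, Any]:
    """Hypothetical helper: build a config dict using the field names above."""
    config: Dict[str, Any] = {
        "connection": connection,  # the only field not marked optional
        "disable_ssl_verification": disable_ssl_verification,
    }
    if token_env_var is not None:
        # StringSource fields accept {"env": NAME}, so the token is read
        # from the environment rather than stored in the config itself.
        config["token"] = {"env": token_env_var}
    if read_timeout_sec is not None:
        config["read_timeout_sec"] = read_timeout_sec
    return config


# Assumed values: a local GMS server and a token held in DATAHUB_TOKEN.
rest_config = rest_emitter_config(
    "http://localhost:8080",
    token_env_var="DATAHUB_TOKEN",
    read_timeout_sec=30.0,
)

# Run-config shape, assuming the resource is registered under the key "datahub".
run_config = {"resources": {"datahub": {"config": rest_config}}}
```

A dictionary of this shape can be supplied as run config, or passed to `datahub_rest_emitter.configured(...)`, the general mechanism for pre-configuring a Dagster ResourceDefinition. Optional fields that are left out fall back to the defaults listed above.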

dagster_datahub.datahub_kafka_emitter ResourceDefinition

Config Schema:
connection (strict dict):
Config Schema:
bootstrap (dagster.StringSource):

Kafka Bootstrap Servers. Comma-delimited.

schema_registry_url (dagster.StringSource):

Schema Registry Location.

schema_registry_config (Union[dict, None], optional):

Extra Schema Registry Config.

Default Value:
{}
topic_routes (Union[dict, None], optional):
Default Value:
{
    "MetadataChangeEvent": "MetadataChangeEvent_v4",
    "MetadataChangeProposal": "MetadataChangeProposal_v1"
}
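
The Kafka schema nests the broker settings under `connection`. As a rough sketch of the resulting dictionary shape: the helper name `kafka_emitter_config`, the broker addresses, the registry URL, and the overridden topic name below are hypothetical. The sketch also assumes that `topic_routes` sits alongside `connection` at the top level, which the flattened listing above does not make explicit.

```python
from typing import Any, Dict, List, Optional

# Defaults copied from the topic_routes entry in the schema above.
DEFAULT_TOPIC_ROUTES: Dict[str, str] = {
    "MetadataChangeEvent": "MetadataChangeEvent_v4",
    "MetadataChangeProposal": "MetadataChangeProposal_v1",
}


def kafka_emitter_config(
    bootstrap_servers: List[str],
    schema_registry_url: str,
    topic_overrides: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a config dict using the field names above."""
    config: Dict[str, Any] = {
        "connection": {
            # The schema expects one comma-delimited string of brokers.
            "bootstrap": ",".join(bootstrap_servers),
            "schema_registry_url": schema_registry_url,
        }
    }
    if topic_overrides:
        # Merge onto the defaults so both routes stay defined even when
        # only one of them is overridden.
        config["topic_routes"] = {**DEFAULT_TOPIC_ROUTES, **topic_overrides}
    return config


# Assumed values for illustration only.
kafka_config = kafka_emitter_config(
    ["broker-1:9092", "broker-2:9092"],
    "http://localhost:8081",
    topic_overrides={"MetadataChangeProposal": "MetadataChangeProposal_custom"},
)
```

When no overrides are given, the sketch omits `topic_routes` entirely so that the documented defaults apply.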