Ask AI

Source code for dagster._core.definitions.metadata.source_code

import inspect
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import experimental, public
from dagster._core.definitions.metadata.metadata_set import (
    NamespacedMetadataSet as NamespacedMetadataSet,
    TableMetadataSet as TableMetadataSet,
)
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._model import DagsterModel
from dagster._serdes import whitelist_for_serdes

if TYPE_CHECKING:
    from dagster._core.definitions.assets import AssetsDefinition, SourceAsset
    from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition

DEFAULT_SOURCE_FILE_KEY = "asset_definition"


@experimental
@whitelist_for_serdes
class LocalFileCodeReference(DagsterModel):
    """Represents a local file source location."""

    file_path: str
    line_number: Optional[int] = None
    label: Optional[str] = None


@experimental
@whitelist_for_serdes
class UrlCodeReference(DagsterModel):
    """Represents a source location which points at a URL, for example
    in source control.
    """

    url: str
    label: Optional[str] = None


[docs] @experimental @whitelist_for_serdes class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMetadataValue"]): """Metadata value type which represents source locations (locally or otherwise) of the asset in question. For example, the file path and line number where the asset is defined. Attributes: sources (List[Union[LocalFileCodeReference, SourceControlCodeReference]]): A list of code references for the asset, such as file locations or references to source control. """ code_references: List[Union[LocalFileCodeReference, UrlCodeReference]] @property def value(self) -> "CodeReferencesMetadataValue": return self
def local_source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]: cwd = os.getcwd() origin_file = os.path.abspath(os.path.join(cwd, inspect.getsourcefile(fn))) # type: ignore origin_file = check.not_none(origin_file) origin_line = inspect.getsourcelines(fn)[1] return LocalFileCodeReference(file_path=origin_file, line_number=origin_line) class CodeReferencesMetadataSet(NamespacedMetadataSet): """Metadata entries that apply to asset definitions and which specify the location where source code for the asset can be found. """ code_references: Optional[CodeReferencesMetadataValue] = None @classmethod def namespace(cls) -> str: return "dagster" def _with_code_source_single_definition( assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], ) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: from dagster._core.definitions.assets import AssetsDefinition # SourceAsset doesn't have an op definition to point to - cacheable assets # will be supported eventually but are a bit trickier if not isinstance(assets_def, AssetsDefinition): return assets_def metadata_by_key = dict(assets_def.metadata_by_key) or {} from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.op_definition import OpDefinition base_fn = None if isinstance(assets_def.node_def, OpDefinition): base_fn = ( assets_def.node_def.compute_fn.decorated_fn if isinstance(assets_def.node_def.compute_fn, DecoratedOpFunction) else assets_def.node_def.compute_fn ) elif isinstance(assets_def.node_def, GraphDefinition): # For graph-backed assets, point to the composition fn, e.g. the # function decorated by @graph_asset base_fn = assets_def.node_def.composition_fn if not base_fn: return assets_def source_path = local_source_path_from_fn(base_fn) if source_path: sources = [source_path] for key in assets_def.keys: # merge with any existing metadata existing_source_code_metadata = CodeReferencesMetadataSet.extract( metadata_by_key.get(key, {}) ) existing_code_references = ( existing_source_code_metadata.code_references.code_references if existing_source_code_metadata.code_references else [] ) sources_for_asset: List[Union[LocalFileCodeReference, UrlCodeReference]] = [ *existing_code_references, *sources, ] metadata_by_key[key] = { **metadata_by_key.get(key, {}), **CodeReferencesMetadataSet( code_references=CodeReferencesMetadataValue(code_references=sources_for_asset) ), } return assets_def.map_asset_specs( lambda spec: spec.replace_attributes(metadata=metadata_by_key[spec.key]) )
[docs] @experimental class FilePathMapping(ABC): """Base class which defines a file path mapping function. These functions are used to map local file paths to their corresponding paths in a source control repository. In many cases where a source control repository is reproduced exactly on a local machine, the included AnchorBasedFilePathMapping class can be used to specify a direct mapping between the local file paths and the repository paths. However, in cases where the repository structure differs from the local structure, a custom mapping function can be provided to handle these cases. """
[docs] @public @abstractmethod def convert_to_source_control_path(self, local_path: Path) -> str: """Maps a local file path to the corresponding path in a source control repository. Args: local_path (Path): The local file path to map. Returns: str: The corresponding path in the hosted source control repository, relative to the repository root. """
[docs] @experimental @dataclass class AnchorBasedFilePathMapping(FilePathMapping): """Specifies the mapping between local file paths and their corresponding paths in a source control repository, using a specific file "anchor" as a reference point. All other paths are calculated relative to this anchor file. For example, if the chosen anchor file is `/Users/dagster/Documents/python_modules/my_module/my-module/__init__.py` locally, and `python_modules/my_module/my-module/__init__.py` in a source control repository, in order to map a different file `/Users/dagster/Documents/python_modules/my_module/my-module/my_asset.py` to the repository path, the mapping function will position the file in the repository relative to the anchor file's position in the repository, resulting in `python_modules/my_module/my-module/my_asset.py`. Args: local_file_anchor (Path): The path to a local file that is present in the repository. file_anchor_path_in_repository (str): The path to the anchor file in the repository. Example: .. code-block:: python mapping_fn = AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), file_anchor_path_in_repository="python_modules/my_module/my-module/__init__.py", ) """ local_file_anchor: Path file_anchor_path_in_repository: str
[docs] @public def convert_to_source_control_path(self, local_path: Path) -> str: """Maps a local file path to the corresponding path in a source control repository based on the anchor file and its corresponding path in the repository. Args: local_path (Path): The local file path to map. Returns: str: The corresponding path in the hosted source control repository, relative to the repository root. """ path_from_anchor_to_target = os.path.relpath( local_path, self.local_file_anchor, ) return os.path.normpath( os.path.join( self.file_anchor_path_in_repository, path_from_anchor_to_target, ) )
def convert_local_path_to_git_path( base_git_url: str, file_path_mapping: FilePathMapping, local_path: LocalFileCodeReference, ) -> UrlCodeReference: source_file_from_repo_root = file_path_mapping.convert_to_source_control_path( Path(local_path.file_path) ) line_number_suffix = f"#L{local_path.line_number}" if local_path.line_number else "" return UrlCodeReference( url=f"{base_git_url}/{source_file_from_repo_root}{line_number_suffix}", label=local_path.label, ) def _convert_local_path_to_git_path_single_definition( base_git_url: str, file_path_mapping: FilePathMapping, assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], ) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: from dagster._core.definitions.assets import AssetsDefinition # SourceAsset doesn't have an op definition to point to - cacheable assets # will be supported eventually but are a bit trickier if not isinstance(assets_def, AssetsDefinition): return assets_def metadata_by_key = dict(assets_def.metadata_by_key) or {} for key in assets_def.keys: existing_source_code_metadata = CodeReferencesMetadataSet.extract( metadata_by_key.get(key, {}) ) if not existing_source_code_metadata.code_references: continue sources_for_asset: List[Union[LocalFileCodeReference, UrlCodeReference]] = [ convert_local_path_to_git_path( base_git_url, file_path_mapping, source, ) if isinstance(source, LocalFileCodeReference) else source for source in existing_source_code_metadata.code_references.code_references ] metadata_by_key[key] = { **metadata_by_key.get(key, {}), **CodeReferencesMetadataSet( code_references=CodeReferencesMetadataValue(code_references=sources_for_asset) ), } return assets_def.map_asset_specs( lambda spec: spec.replace_attributes(metadata=metadata_by_key[spec.key]) ) def _build_github_url(url: str, branch: str) -> str: return f"{url}/tree/{branch}" def _build_gitlab_url(url: str, branch: str) -> str: return f"{url}/-/tree/{branch}"
[docs] @experimental def with_source_code_references( assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], ) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: """Wrapper function which attaches local code reference metadata to the provided asset definitions. This points to the filepath and line number where the asset body is defined. Args: assets_defs (Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]): The asset definitions to which source code metadata should be attached. Returns: Sequence[AssetsDefinition]: The asset definitions with source code metadata attached. """ return [_with_code_source_single_definition(assets_def) for assets_def in assets_defs]