Ask AI

Source code for dagster_github.resources

import time
from datetime import datetime
from typing import Any, Dict, Optional

import jwt
import requests
from dagster import ConfigurableResource, resource
from dagster._annotations import public
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from pydantic import Field

GET_REPO_ID_QUERY = """
query get_repo_id($repo_name: String!, $repo_owner: String!) {
  repository(name: $repo_name, owner: $repo_owner) {
    id
  }
}
"""

GET_REPO_AND_REF_QUERY = """
query get_repo_and_ref($repo_name: String!, $repo_owner: String!, $source: String!) {
  repository(name: $repo_name, owner: $repo_owner) {
    id
    ref(qualifiedName: $source) {
      target {
        oid
      }
    }
  }
}
"""

CREATE_ISSUE_MUTATION = """
mutation CreateIssue($id: ID!, $title: String!, $body: String!) {
  createIssue(input: {
    repositoryId: $id,
    title: $title,
    body: $body
  }) {
    clientMutationId,
    issue {
      body
      title
      url
    }
  }
}
"""

CREATE_REF_MUTATION = """
mutation CreateRef($id: ID!, $name: String!, $oid: GitObjectID!) {
  createRef(input: {
    repositoryId: $id,
    name: $name,
    oid: $oid
  }) {
    clientMutationId,
    ref {
      id
      name
      target {
        oid
      }
    }
  }
}
"""

CREATE_PULL_REQUEST_MUTATION = """
mutation CreatePullRequest(
  $base_repo_id: ID!,
  $base_ref_name: String!,
  $head_repo_id: ID!,
  $head_ref_name: String!,
  $title: String!,
  $body: String,
  $maintainer_can_modify: Boolean,
  $draft: Boolean
) {
  createPullRequest(input: {
    repositoryId: $base_repo_id,
    baseRefName: $base_ref_name,
    headRepositoryId: $head_repo_id,
    headRefName: $head_ref_name,
    title: $title,
    body: $body,
    maintainerCanModify: $maintainer_can_modify,
    draft: $draft
  }) {
    clientMutationId
    pullRequest {
      id
      url
    }
  }
}
"""


def to_seconds(dt: datetime) -> float:
    return (dt - datetime(1970, 1, 1)).total_seconds()


[docs] class GithubClient: """A client for interacting with the GitHub API. This client handles authentication and provides methods for making requests to the GitHub API using an authenticated session. Attributes: client (requests.Session): The HTTP session used for making requests. app_id (int): The GitHub App ID. app_private_rsa_key (str): The private RSA key for the GitHub App. default_installation_id (Optional[int]): The default installation ID for the GitHub App. hostname (Optional[str]): The GitHub hostname, defaults to None. installation_tokens (Dict[Any, Any]): A dictionary to store installation tokens. app_token (Dict[str, Any]): A dictionary to store the app token. """ def __init__( self, client: requests.Session, app_id: int, app_private_rsa_key: str, default_installation_id: Optional[int], hostname: Optional[str] = None, ) -> None: self.client = client self.app_private_rsa_key = app_private_rsa_key self.app_id = app_id self.default_installation_id = default_installation_id self.installation_tokens: Dict[Any, Any] = {} self.app_token: Dict[str, Any] = {} self.hostname = hostname def __set_app_token(self) -> None: # from https://developer.github.com/apps/building-github-apps/authenticating-with-github-apps/ # needing to self-sign a JWT now = int(time.time()) # JWT expiration time (10 minute maximum) expires = now + (10 * 60) encoded_token = jwt.encode( { # issued at time "iat": now, # JWT expiration time "exp": expires, # GitHub App's identifier "iss": self.app_id, }, self.app_private_rsa_key, algorithm="RS256", ) self.app_token = { "value": encoded_token, "expires": expires, } def __check_app_token(self) -> None: if ("expires" not in self.app_token) or ( self.app_token["expires"] < (int(time.time()) + 60) ): self.__set_app_token()
[docs] @public def get_installations(self, headers: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Retrieve the list of installations for the authenticated GitHub App. This method makes a GET request to the GitHub API to fetch the installations associated with the authenticated GitHub App. It ensures that the app token is valid and includes it in the request headers. Args: headers (Optional[Dict[str, Any]]): Optional headers to include in the request. Returns: Dict[str, Any]: A dictionary containing the installations data. Raises: requests.exceptions.HTTPError: If the request to the GitHub API fails. """ if headers is None: headers = {} self.__check_app_token() headers["Authorization"] = f"Bearer {self.app_token['value']}" headers["Accept"] = "application/vnd.github.machine-man-preview+json" request = self.client.get( ( "https://api.github.com/app/installations" if self.hostname is None else f"https://{self.hostname}/api/v3/app/installations" ), headers=headers, ) request.raise_for_status() return request.json()
def __set_installation_token( self, installation_id: int, headers: Optional[Dict[str, Any]] = None ) -> None: if headers is None: headers = {} self.__check_app_token() headers["Authorization"] = f"Bearer {self.app_token['value']}" headers["Accept"] = "application/vnd.github.machine-man-preview+json" request = requests.post( ( f"https://api.github.com/app/installations/{installation_id}/access_tokens" if self.hostname is None else f"https://{self.hostname}/api/v3/app/installations/{installation_id}/access_tokens" ), headers=headers, ) request.raise_for_status() auth = request.json() self.installation_tokens[installation_id] = { "value": auth["token"], "expires": to_seconds(datetime.strptime(auth["expires_at"], "%Y-%m-%dT%H:%M:%SZ")), } def __check_installation_tokens(self, installation_id: int) -> None: if (installation_id not in self.installation_tokens) or ( self.installation_tokens[installation_id]["expires"] < (int(time.time()) + 60) ): self.__set_installation_token(installation_id)
[docs] @public def execute( self, query: str, variables: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, Any]] = None, installation_id: Optional[int] = None, ) -> Dict[str, Any]: """Execute a GraphQL query against the GitHub API. This method sends a POST request to the GitHub API with the provided GraphQL query and optional variables. It ensures that the appropriate installation token is included in the request headers. Args: query (str): The GraphQL query string to be executed. variables (Optional[Dict[str, Any]]): Optional variables to include in the query. headers (Optional[Dict[str, Any]]): Optional headers to include in the request. installation_id (Optional[int]): The installation ID to use for authentication. Returns: Dict[str, Any]: The response data from the GitHub API. Raises: RuntimeError: If no installation ID is provided and no default installation ID is set. requests.exceptions.HTTPError: If the request to the GitHub API fails. """ if headers is None: headers = {} if installation_id is None: if self.default_installation_id: installation_id = self.default_installation_id else: raise RuntimeError("No installation_id provided") self.__check_installation_tokens(installation_id) headers["Authorization"] = f"token {self.installation_tokens[installation_id]['value']}" json: Dict[str, Any] = {"query": query} if variables: json["variables"] = variables request = requests.post( ( "https://api.github.com/graphql" if self.hostname is None else f"https://{self.hostname}/api/graphql" ), json=json, headers=headers, ) request.raise_for_status() if "errors" in request.json(): raise RuntimeError(request.json()["errors"]) return request.json()
[docs] @public def create_issue( self, repo_name: str, repo_owner: str, title: str, body: str, installation_id: Optional[int] = None, ) -> Dict[str, Any]: """Create a new issue in the specified GitHub repository. This method first retrieves the repository ID using the provided repository name and owner, then creates a new issue in that repository with the given title and body. Args: repo_name (str): The name of the repository where the issue will be created. repo_owner (str): The owner of the repository where the issue will be created. title (str): The title of the issue. body (str): The body content of the issue. installation_id (Optional[int]): The installation ID to use for authentication. Returns: Dict[str, Any]: The response data from the GitHub API containing the created issue details. Raises: RuntimeError: If there are errors in the response from the GitHub API. """ res = self.execute( query=GET_REPO_ID_QUERY, variables={"repo_name": repo_name, "repo_owner": repo_owner}, installation_id=installation_id, ) return self.execute( query=CREATE_ISSUE_MUTATION, variables={ "id": res["data"]["repository"]["id"], "title": title, "body": body, }, installation_id=installation_id, )
[docs] @public def create_ref( self, repo_name: str, repo_owner: str, source: str, target: str, installation_id=None, ) -> Dict[str, Any]: """Create a new reference (branch) in the specified GitHub repository. This method first retrieves the repository ID and the source reference (branch or tag) using the provided repository name, owner, and source reference. It then creates a new reference (branch) in that repository with the given target name. Args: repo_name (str): The name of the repository where the reference will be created. repo_owner (str): The owner of the repository where the reference will be created. source (str): The source reference (branch or tag) from which the new reference will be created. target (str): The name of the new reference (branch) to be created. installation_id (Optional[int]): The installation ID to use for authentication. Returns: Dict[str, Any]: The response data from the GitHub API containing the created reference details. Raises: RuntimeError: If there are errors in the response from the GitHub API. """ res = self.execute( query=GET_REPO_AND_REF_QUERY, variables={ "repo_name": repo_name, "repo_owner": repo_owner, "source": source, }, installation_id=installation_id, ) branch = self.execute( query=CREATE_REF_MUTATION, variables={ "id": res["data"]["repository"]["id"], "name": target, "oid": res["data"]["repository"]["ref"]["target"]["oid"], }, installation_id=installation_id, ) return branch
[docs] @public def create_pull_request( self, base_repo_name: str, base_repo_owner: str, base_ref_name: str, head_repo_name: str, head_repo_owner: str, head_ref_name: str, title: str, body: Optional[str] = None, maintainer_can_modify: Optional[bool] = None, draft: Optional[bool] = None, installation_id: Optional[int] = None, ) -> Dict[str, Any]: """Create a new pull request in the specified GitHub repository. This method creates a pull request from the head reference (branch) to the base reference (branch) in the specified repositories. It uses the provided title and body for the pull request description. Args: base_repo_name (str): The name of the base repository where the pull request will be created. base_repo_owner (str): The owner of the base repository. base_ref_name (str): The name of the base reference (branch) to which the changes will be merged. head_repo_name (str): The name of the head repository from which the changes will be taken. head_repo_owner (str): The owner of the head repository. head_ref_name (str): The name of the head reference (branch) from which the changes will be taken. title (str): The title of the pull request. body (Optional[str]): The body content of the pull request. Defaults to None. maintainer_can_modify (Optional[bool]): Whether maintainers can modify the pull request. Defaults to None. draft (Optional[bool]): Whether the pull request is a draft. Defaults to None. installation_id (Optional[int]): The installation ID to use for authentication. Returns: Dict[str, Any]: The response data from the GitHub API containing the created pull request details. Raises: RuntimeError: If there are errors in the response from the GitHub API. """ base = self.execute( query=GET_REPO_ID_QUERY, variables={"repo_name": base_repo_name, "repo_owner": base_repo_owner}, installation_id=installation_id, ) head = self.execute( query=GET_REPO_ID_QUERY, variables={"repo_name": head_repo_name, "repo_owner": head_repo_owner}, installation_id=installation_id, ) pull_request = self.execute( query=CREATE_PULL_REQUEST_MUTATION, variables={ "base_repo_id": base["data"]["repository"]["id"], "base_ref_name": base_ref_name, "head_repo_id": head["data"]["repository"]["id"], "head_ref_name": head_ref_name, "title": title, "body": body, "maintainer_can_modify": maintainer_can_modify, "draft": draft, }, installation_id=installation_id, ) return pull_request
[docs] class GithubResource(ConfigurableResource): """A resource configuration class for GitHub integration. This class provides configuration fields for setting up a GitHub Application, including the application ID, private RSA key, installation ID, and hostname. Attributes: github_app_id (int): The GitHub Application ID. For more information, see https://developer.github.com/apps/. github_app_private_rsa_key (str): The private RSA key text for the GitHub Application. For more information, see https://developer.github.com/apps/. github_installation_id (Optional[int]): The GitHub Application Installation ID. Defaults to None. For more information, see https://developer.github.com/apps/. github_hostname (Optional[str]): The GitHub hostname. Defaults to `api.github.com`. For more information, see https://developer.github.com/apps/. """ github_app_id: int = Field( description="Github Application ID, for more info see https://developer.github.com/apps/", ) github_app_private_rsa_key: str = Field( description=( "Github Application Private RSA key text, for more info see" " https://developer.github.com/apps/" ), ) github_installation_id: Optional[int] = Field( default=None, description=( "Github Application Installation ID, for more info see" " https://developer.github.com/apps/" ), ) github_hostname: Optional[str] = Field( default=None, description=( "Github hostname. Defaults to `api.github.com`, for more info see" " https://developer.github.com/apps/" ), ) @classmethod def _is_dagster_maintained(cls) -> bool: return True @public def get_client(self) -> GithubClient: """Instantiate and return a GitHub client. This method creates a new instance of `GithubClient` using the configuration attributes of the `GithubResource` instance. The client is initialized with an authenticated session and the necessary credentials for interacting with the GitHub API. Returns: GithubClient: An instance of `GithubClient` configured with the current resource settings. """ return GithubClient( client=requests.Session(), app_id=self.github_app_id, app_private_rsa_key=self.github_app_private_rsa_key, default_installation_id=self.github_installation_id, hostname=self.github_hostname, )
[docs] @dagster_maintained_resource @resource( config_schema=GithubResource.to_config_schema(), description="This resource is for connecting to Github", ) def github_resource(context) -> GithubClient: return GithubResource(**context.resource_config).get_client()