PII redaction in compute logs

In this example, we'll explore how to protect sensitive data by redacting personally identifiable information (PII) from Dagster compute logs. When your pipelines process customer data, logs may inadvertently capture sensitive information like emails, phone numbers, or social security numbers. A custom compute log manager can automatically redact this data before it's displayed in the UI.

Problem: Sensitive data in logs

Imagine your assets process customer records, and debugging statements or error messages include PII. Without protection, this sensitive data would be visible to anyone with access to the Dagster UI, potentially violating privacy regulations like GDPR or HIPAA.

| Solution | Best for |
| --- | --- |
| Redact on read | Preserving originals on disk for debugging while redacting in the UI |
| Redact on write | Maximum security; PII never reaches disk; works seamlessly in cloud deployments |

Consider an asset that logs customer information during processing:

import dagster as dg


@dg.asset
def process_customer_data() -> dg.MaterializeResult:
    # This log output contains PII that will be redacted
    print("Processing data for customer: John Doe")
    print("Email: john.doe@example.com")
    print("Phone: 555-123-4567")
    print("SSN: 123-45-6789")
    print("Credit Card: 4111-1111-1111-1111")
    print("IP Address: 192.168.1.100")

    return dg.MaterializeResult(metadata={"status": "processed"})

Without redaction, the logs would display all sensitive information:

Processing data for customer: John Doe
Email: john.doe@example.com
Phone: 555-123-4567
SSN: 123-45-6789
Credit Card: 4111-1111-1111-1111
IP Address: 192.168.1.100

Both solutions use a PII redactor that identifies and masks sensitive data using regex patterns for common PII types:

import re

# Define regex patterns for common PII types
PII_PATTERNS = {
    # [A-Za-z] rather than [A-Z|a-z]: inside a character class, "|" is a literal pipe
    "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "PHONE": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
    "CREDIT_CARD": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
    "IP_ADDRESS": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
}


def redact_pii(text: str) -> str:
    """Redact PII from text using regex patterns."""
    redacted = text
    for pii_type, pattern in PII_PATTERNS.items():
        redacted = re.sub(pattern, f"[{pii_type}]", redacted)
    return redacted
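
To see the redactor's effect on the sample output above, the sketch below applies the same patterns to those lines. It repeats the pattern table so that it runs on its own; the `sample` and `missed` variables are illustrative and not part of the original example. Regex matching only covers values with a recognizable shape, so the customer's name passes through unchanged, as does a phone number written in a format the `PHONE` pattern does not anticipate.

```python
import re

# Same patterns as above, repeated so this snippet is self-contained.
PII_PATTERNS = {
    "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "PHONE": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
    "CREDIT_CARD": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
    "IP_ADDRESS": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
}


def redact_pii(text: str) -> str:
    """Replace each pattern match with a bracketed label such as [EMAIL]."""
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[{pii_type}]", text)
    return text


# The lines printed by process_customer_data in the earlier example.
sample = "\n".join(
    [
        "Processing data for customer: John Doe",
        "Email: john.doe@example.com",
        "Phone: 555-123-4567",
        "SSN: 123-45-6789",
        "Credit Card: 4111-1111-1111-1111",
        "IP Address: 192.168.1.100",
    ]
)
redacted = redact_pii(sample)
print(redacted)

# Illustrative limitation: a parenthesized area code is not matched by PHONE.
missed = redact_pii("Call (555) 123-4567")
print(missed)
```

With these patterns, every value except the name is replaced by its label, while the second call returns its input unchanged.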

Solution 1: Redact on read

The first approach redacts PII when logs are read for display, preserving the original unredacted logs on disk. This is useful when administrators need access to original logs for debugging or audit purposes, but requires additional handling in cloud deployments since files must be redacted before upload.

from collections.abc import Sequence
from typing import Optional

from dagster import Bool, Field, StringSource
from dagster._core.storage.captured_log_manager import CapturedLogData
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._serdes import ConfigurableClass, ConfigurableClassData

from .pii_redactor import redact_pii


class PIIComputeLogManager(LocalComputeLogManager, ConfigurableClass):
    """A compute log manager that redacts PII from logs before displaying them."""

    def __init__(
        self,
        base_dir: str = "compute_logs",
        redact_for_ui: bool = True,
        inst_data: Optional[ConfigurableClassData] = None,
    ):
        super().__init__(base_dir)
        self.redact_for_ui = redact_for_ui
        self._inst_data = inst_data

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        return self._inst_data

    @classmethod
    def config_type(cls):
        return {
            "base_dir": Field(
                StringSource,
                default_value="compute_logs",
                description="Base directory for storing compute logs",
            ),
            "redact_for_ui": Field(
                Bool,
                default_value=True,
                description="Whether to redact PII for UI display",
            ),
        }

    @classmethod
    def from_config_value(cls, inst_data: ConfigurableClassData, config_value):
        return cls(inst_data=inst_data, **config_value)

    def _redact_bytes(self, data: Optional[bytes]) -> Optional[bytes]:
        """Apply PII redaction to bytes data."""
        if not data or not self.redact_for_ui:
            return data
        text = data.decode("utf-8", errors="replace")
        redacted_text = redact_pii(text)
        return redacted_text.encode("utf-8")

    def get_log_data(
        self,
        log_key: Sequence[str],
        cursor: Optional[str] = None,
        max_bytes: Optional[int] = None,
    ) -> CapturedLogData:
        """Override to apply PII redaction when logs are read."""
        original_data = super().get_log_data(log_key, cursor, max_bytes)

        if not self.redact_for_ui:
            return original_data

        return CapturedLogData(
            log_key=original_data.log_key,
            stdout=self._redact_bytes(original_data.stdout),
            stderr=self._redact_bytes(original_data.stderr),
            cursor=original_data.cursor,
        )

    def get_log_data_for_type(
        self,
        log_key: Sequence[str],
        io_type: ComputeIOType,
        offset: int,
        max_bytes: Optional[int],
    ) -> tuple[Optional[bytes], int]:
        """Override to apply PII redaction to log data chunks."""
        data, new_offset = super().get_log_data_for_type(log_key, io_type, offset, max_bytes)

        if self.redact_for_ui and data:
            return self._redact_bytes(data), new_offset

        return data, new_offset

Configure this approach in your dagster.yaml:

compute_logs:
  module: your_project.pii_compute_log_manager
  class: PIIComputeLogManager
  config:
    base_dir: compute_logs
    redact_for_ui: true
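
One caveat with redacting on read: `get_log_data_for_type` receives an `offset` and `max_bytes`, so it may be asked for a slice of the log rather than whole lines. The sketch below uses a cut-down, email-only redactor (an assumption made to keep the snippet self-contained) and a hypothetical split point to show that a value straddling two chunks matches in neither. Whether this occurs in practice depends on how the caller pages through the file, which the example above does not specify.

```python
import re

# Email-only stand-in for redact_pii, to keep the sketch short.
EMAIL = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")


def redact_bytes(data: bytes) -> bytes:
    """Decode, mask email addresses, and re-encode, mirroring _redact_bytes."""
    text = data.decode("utf-8", errors="replace")
    return EMAIL.sub("[EMAIL]", text).encode("utf-8")


log = b"Email: john.doe@example.com\n"

# Whole-line read: the address is masked.
whole = redact_bytes(log)

# Hypothetical chunked read whose boundary falls inside the address.
split_at = 19
first, second = log[:split_at], log[split_at:]
chunked = redact_bytes(first) + redact_bytes(second)

print(whole)    # the address is replaced by [EMAIL]
print(chunked)  # the address survives, because neither half matches on its own
```

If this matters for your deployment, redacting only up to the last newline in each chunk and carrying the remainder into the next read would be one way to close the gap; that handling is not part of the example above.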

Solution 2: Redact on write

For maximum security, redact PII as logs are written to disk. This ensures sensitive data never reaches storage and works seamlessly in cloud deployments since logs are pre-redacted. The trade-off is that original data cannot be recovered if needed for debugging.

from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from typing import Optional

from dagster import Bool, Field, StringSource
from dagster._core.storage.captured_log_manager import CapturedLogContext
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._serdes import ConfigurableClass, ConfigurableClassData

from .pii_redactor import redact_pii


class PIIRedactingStream:
    """A stream wrapper that redacts PII as data is written."""

    def __init__(self, original_stream, encoding: str = "utf-8"):
        self.original = original_stream
        self.encoding = encoding

    def write(self, data):
        if isinstance(data, bytes):
            text = data.decode(self.encoding, errors="replace")
            redacted = redact_pii(text)
            return self.original.write(redacted.encode(self.encoding))
        elif isinstance(data, str) and data.strip():
            redacted = redact_pii(data)
            return self.original.write(redacted)
        return self.original.write(data)

    def flush(self):
        return self.original.flush()

    def fileno(self):
        return self.original.fileno()


class PIIComputeLogManagerWrite(LocalComputeLogManager, ConfigurableClass):
    """A compute log manager that redacts PII when logs are written to disk."""

    def __init__(
        self,
        base_dir: str = "compute_logs",
        redact_on_write: bool = True,
        inst_data: Optional[ConfigurableClassData] = None,
    ):
        super().__init__(base_dir)
        self.redact_on_write = redact_on_write
        self._inst_data = inst_data

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        return self._inst_data

    @classmethod
    def config_type(cls):
        return {
            "base_dir": Field(
                StringSource,
                default_value="compute_logs",
                description="Base directory for storing compute logs",
            ),
            "redact_on_write": Field(
                Bool,
                default_value=True,
                description="Whether to redact PII when writing logs",
            ),
        }

    @classmethod
    def from_config_value(cls, inst_data: ConfigurableClassData, config_value):
        return cls(inst_data=inst_data, **config_value)

    @contextmanager
    def capture_logs(self, log_key: Sequence[str]) -> Iterator[CapturedLogContext]:
        """Override capture_logs to wrap streams with PII redaction."""
        with super().capture_logs(log_key) as context:
            if self.redact_on_write:
                # Wrap the output streams with PII-redacting wrappers
                original_out = context.out_fd
                original_err = context.err_fd

                context._out_fd = PIIRedactingStream(original_out)
                context._err_fd = PIIRedactingStream(original_err)

            yield context
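
The wrapper can be exercised without Dagster by pointing it at in-memory buffers. The sketch below copies `PIIRedactingStream` and substitutes an SSN-only `redact_pii` (an assumption, to keep the snippet short); `io.BytesIO` and `io.StringIO` stand in for the real log file handles.

```python
import io
import re


def redact_pii(text: str) -> str:
    """Stand-in redactor that masks SSNs only."""
    return re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", text)


class PIIRedactingStream:
    """Copy of the wrapper above: redacts data before forwarding each write."""

    def __init__(self, original_stream, encoding: str = "utf-8"):
        self.original = original_stream
        self.encoding = encoding

    def write(self, data):
        if isinstance(data, bytes):
            text = data.decode(self.encoding, errors="replace")
            return self.original.write(redact_pii(text).encode(self.encoding))
        elif isinstance(data, str) and data.strip():
            return self.original.write(redact_pii(data))
        # Empty or whitespace-only strings are forwarded unchanged.
        return self.original.write(data)


# Bytes are decoded, redacted, and re-encoded before reaching the buffer.
byte_buffer = io.BytesIO()
PIIRedactingStream(byte_buffer).write(b"SSN: 123-45-6789\n")

# Strings are redacted directly; the bare newline passes through untouched.
text_buffer = io.StringIO()
text_stream = PIIRedactingStream(text_buffer)
text_stream.write("SSN: 123-45-6789\n")
text_stream.write("\n")

print(byte_buffer.getvalue())
print(repr(text_buffer.getvalue()))
```

The wrapper only sees data that passes through its `write` method. Anything written directly to the descriptor returned by `fileno()` bypasses it, so it is worth confirming that the output you care about actually takes this path.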

Configure this approach in your dagster.yaml:

compute_logs:
  module: your_project.pii_log_manager_write
  class: PIIComputeLogManagerWrite
  config:
    base_dir: compute_logs
    redact_on_write: true

Note

For production deployments with stricter compliance requirements, consider using Microsoft Presidio for PII detection. Presidio combines named-entity recognition models with pattern-based recognizers, so it can catch values that fixed regexes miss, such as personal names, and it supports 30+ entity types including international formats. Replace the regex-based redact_pii function with:

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def redact_pii(text: str) -> str:
    results = analyzer.analyze(text=text, language="en")
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results)
    return anonymized.text