PII redaction in compute logs
In this example, we'll explore how to protect sensitive data by redacting personally identifiable information (PII) from Dagster compute logs. When your pipelines process customer data, logs may inadvertently capture sensitive information like emails, phone numbers, or social security numbers. A custom compute log manager can automatically redact this data before it's displayed in the UI.
Problem: Sensitive data in logs
Imagine your assets process customer records, and debugging statements or error messages include PII. Without protection, this sensitive data would be visible to anyone with access to the Dagster UI, potentially violating privacy regulations like GDPR or HIPAA.
| Solution | Best for |
|---|---|
| Redact on read | Preserving originals on disk for debugging while redacting in UI |
| Redact on write | Maximum security; PII never reaches disk; works seamlessly in cloud |
Consider an asset that logs customer information during processing:
import dagster as dg
@dg.asset
def process_customer_data() -> dg.MaterializeResult:
    # Fabricated customer details, written to stdout one line at a time.
    # This output contains PII and is what the redactor is expected to mask.
    sample_log_lines = (
        "Processing data for customer: John Doe",
        "Email: john.doe@example.com",
        "Phone: 555-123-4567",
        "SSN: 123-45-6789",
        "Credit Card: 4111-1111-1111-1111",
        "IP Address: 192.168.1.100",
    )
    for line in sample_log_lines:
        print(line)

    # Report a fixed status as materialization metadata.
    return dg.MaterializeResult(metadata={"status": "processed"})
Without redaction, the logs would display all sensitive information:
Processing data for customer: John Doe
Email: john.doe@example.com
Phone: 555-123-4567
SSN: 123-45-6789
Credit Card: 4111-1111-1111-1111
IP Address: 192.168.1.100
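Both solutions use a PII redactor that identifies and masks sensitive data using regex patterns for common PII types. Note that free-form values with no fixed format, such as the customer name in the example above, are not matched by these patterns and remain visible. The redactor is defined as follows: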
import re
# Define regex patterns for common PII types
# Regular expressions for common PII formats. Each key is the label that
# stands in for a match in redacted output (e.g. "EMAIL" -> "[EMAIL]").
# Iteration order is the order in which the patterns are applied.
PII_PATTERNS = {
    # local-part@domain.tld. The TLD class is letters only: inside a character
    # class "|" is a literal pipe, not alternation, so it must not appear here.
    "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    # 3-3-4 digit groups, optionally separated by "-" or ".".
    "PHONE": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    # 3-2-4 digit groups separated by "-".
    "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
    # Four groups of four digits, optionally separated by "-" or whitespace.
    "CREDIT_CARD": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
    # Dotted quad of 1-3 digit groups; octet ranges are not validated.
    "IP_ADDRESS": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
}
def redact_pii(text: str) -> str:
    """Return *text* with every PII_PATTERNS match replaced by its label.

    Patterns are applied one after another in the dictionary's iteration
    order, each pass working on the output of the previous one. A match is
    replaced with the pattern's key wrapped in square brackets.
    """
    for label, regex in PII_PATTERNS.items():
        text = re.sub(regex, f"[{label}]", text)
    return text
Solution 1: Redact on read
The first approach redacts PII when logs are read for display, preserving the original unredacted logs on disk. This is useful when administrators need access to original logs for debugging or audit purposes, but requires additional handling in cloud deployments since files must be redacted before upload.
from collections.abc import Sequence
from typing import Optional
from dagster import Bool, Field, StringSource
from dagster._core.storage.captured_log_manager import CapturedLogData
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from .pii_redactor import redact_pii
class PIIComputeLogManager(LocalComputeLogManager, ConfigurableClass):
    """Local compute log manager that masks PII in log data as it is read back.

    Reads are delegated to ``LocalComputeLogManager``. When ``redact_for_ui``
    is enabled, the stdout/stderr bytes returned by the parent are passed
    through ``redact_pii`` before being handed to the caller. Nothing in this
    class rewrites the stored log files.
    """

    def __init__(
        self,
        base_dir: str = "compute_logs",
        redact_for_ui: bool = True,
        inst_data: Optional[ConfigurableClassData] = None,
    ):
        """Initialise the manager.

        Args:
            base_dir: Directory passed to ``LocalComputeLogManager``.
            redact_for_ui: When false, log data is returned unmodified.
            inst_data: Configurable-class data exposed via ``inst_data``.
        """
        super().__init__(base_dir)
        self._inst_data = inst_data
        self.redact_for_ui = redact_for_ui

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """The ``ConfigurableClassData`` supplied at construction, if any."""
        return self._inst_data

    @classmethod
    def config_type(cls):
        """Return the config schema: ``base_dir`` and ``redact_for_ui``."""
        base_dir_field = Field(
            StringSource,
            default_value="compute_logs",
            description="Base directory for storing compute logs",
        )
        redact_field = Field(
            Bool,
            default_value=True,
            description="Whether to redact PII for UI display",
        )
        return {"base_dir": base_dir_field, "redact_for_ui": redact_field}

    @classmethod
    def from_config_value(cls, inst_data: ConfigurableClassData, config_value):
        """Construct an instance from config, forwarding each key as a keyword."""
        return cls(inst_data=inst_data, **config_value)

    def _redact_bytes(self, data: Optional[bytes]) -> Optional[bytes]:
        """Return *data* with PII masked, or *data* itself when nothing applies.

        Empty/``None`` input, or a disabled ``redact_for_ui`` flag, returns the
        argument unchanged. Otherwise the bytes are decoded as UTF-8 (invalid
        sequences are replaced rather than raising), redacted, and re-encoded.
        """
        if data and self.redact_for_ui:
            decoded = data.decode("utf-8", errors="replace")
            return redact_pii(decoded).encode("utf-8")
        return data

    def get_log_data(
        self,
        log_key: Sequence[str],
        cursor: Optional[str] = None,
        max_bytes: Optional[int] = None,
    ) -> CapturedLogData:
        """Fetch log data from the parent and redact stdout/stderr on the way out.

        The ``log_key`` and ``cursor`` of the parent's result are carried over
        untouched; only the stdout and stderr payloads are rewritten.
        """
        captured = super().get_log_data(log_key, cursor, max_bytes)
        if self.redact_for_ui:
            return CapturedLogData(
                log_key=captured.log_key,
                stdout=self._redact_bytes(captured.stdout),
                stderr=self._redact_bytes(captured.stderr),
                cursor=captured.cursor,
            )
        return captured

    def get_log_data_for_type(
        self,
        log_key: Sequence[str],
        io_type: ComputeIOType,
        offset: int,
        max_bytes: Optional[int],
    ) -> tuple[Optional[bytes], int]:
        """Fetch one chunk from the parent and redact it before returning.

        The offset returned is the one reported by the parent, unchanged, even
        though the redacted chunk may differ in length from the stored bytes.
        """
        chunk, next_offset = super().get_log_data_for_type(log_key, io_type, offset, max_bytes)
        should_redact = self.redact_for_ui and chunk
        payload = self._redact_bytes(chunk) if should_redact else chunk
        return payload, next_offset
Configure this approach in your dagster.yaml:
compute_logs:
module: your_project.pii_compute_log_manager
class: PIIComputeLogManager
config:
base_dir: compute_logs
redact_for_ui: true
Solution 2: Redact on write
For maximum security, redact PII as logs are written to disk. This ensures sensitive data never reaches storage and works seamlessly in cloud deployments since logs are pre-redacted. The trade-off is that original data cannot be recovered if needed for debugging.
from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from typing import Optional
from dagster import Bool, Field, StringSource
from dagster._core.storage.captured_log_manager import CapturedLogContext
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from .pii_redactor import redact_pii
class PIIRedactingStream:
    """Wrap a writable stream and redact PII from data written through it.

    Only ``write``, ``flush`` and ``fileno`` are provided. ``fileno`` exposes
    the wrapped stream's own descriptor, so anything written straight to that
    descriptor does not pass through ``write`` and is not redacted.
    """

    def __init__(self, original_stream, encoding: str = "utf-8"):
        """Store the wrapped stream and the encoding used for bytes payloads.

        Args:
            original_stream: Object providing ``write``, ``flush`` and ``fileno``.
            encoding: Codec used to decode and re-encode bytes-like input.
        """
        self.original = original_stream
        self.encoding = encoding

    def write(self, data):
        """Redact *data* and forward it to the wrapped stream.

        * Bytes-like input (``bytes``, ``bytearray``, ``memoryview``) is decoded
          with ``self.encoding`` (undecodable bytes are replaced rather than
          raising), redacted, re-encoded and written as ``bytes``.
        * A ``str`` that is not whitespace-only is redacted and written as ``str``.
        * Anything else, including empty or whitespace-only strings, is
          forwarded untouched.

        Returns:
            Whatever the wrapped stream's ``write`` returns. Redaction can change
            the payload length, so this may differ from ``len(data)``.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            # Binary writers may pass any bytes-like buffer, not just ``bytes``.
            # ``bytes()`` normalises them (memoryview has no ``.decode``) so
            # these payloads are redacted instead of being forwarded as-is.
            text = bytes(data).decode(self.encoding, errors="replace")
            return self.original.write(redact_pii(text).encode(self.encoding))
        if isinstance(data, str) and data.strip():
            return self.original.write(redact_pii(data))
        return self.original.write(data)

    def flush(self):
        """Flush the wrapped stream."""
        return self.original.flush()

    def fileno(self):
        """Return the wrapped stream's file descriptor."""
        return self.original.fileno()
class PIIComputeLogManagerWrite(LocalComputeLogManager, ConfigurableClass):
    """Local compute log manager that wraps capture streams with PII redaction.

    Capture is delegated to ``LocalComputeLogManager.capture_logs``; when
    ``redact_on_write`` is enabled the yielded context's output streams are
    wrapped in ``PIIRedactingStream`` before the context is handed to the
    caller.
    """

    def __init__(
        self,
        base_dir: str = "compute_logs",
        redact_on_write: bool = True,
        inst_data: Optional[ConfigurableClassData] = None,
    ):
        """Initialise the manager.

        Args:
            base_dir: Directory passed to ``LocalComputeLogManager``.
            redact_on_write: When false, the capture context is yielded as-is.
            inst_data: Configurable-class data exposed via ``inst_data``.
        """
        super().__init__(base_dir)
        self._inst_data = inst_data
        self.redact_on_write = redact_on_write

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """The ``ConfigurableClassData`` supplied at construction, if any."""
        return self._inst_data

    @classmethod
    def config_type(cls):
        """Return the config schema: ``base_dir`` and ``redact_on_write``."""
        base_dir_field = Field(
            StringSource,
            default_value="compute_logs",
            description="Base directory for storing compute logs",
        )
        redact_field = Field(
            Bool,
            default_value=True,
            description="Whether to redact PII when writing logs",
        )
        return {"base_dir": base_dir_field, "redact_on_write": redact_field}

    @classmethod
    def from_config_value(cls, inst_data: ConfigurableClassData, config_value):
        """Construct an instance from config, forwarding each key as a keyword."""
        return cls(inst_data=inst_data, **config_value)

    @contextmanager
    def capture_logs(self, log_key: Sequence[str]) -> Iterator[CapturedLogContext]:
        """Run the parent's capture and yield its context with redacting streams.

        With ``redact_on_write`` disabled the parent's context is yielded
        without modification.
        """
        with super().capture_logs(log_key) as context:
            if self.redact_on_write:
                # NOTE(review): this relies on the yielded context exposing
                # ``out_fd`` / ``err_fd`` and on ``_out_fd`` / ``_err_fd`` being
                # what the capture machinery writes through. Neither is visible
                # here -- confirm against the installed Dagster version.
                raw_out, raw_err = context.out_fd, context.err_fd
                context._out_fd = PIIRedactingStream(raw_out)
                context._err_fd = PIIRedactingStream(raw_err)
            yield context
Configure this approach in your dagster.yaml:
compute_logs:
module: your_project.pii_log_manager_write
class: PIIComputeLogManagerWrite
config:
base_dir: compute_logs
redact_on_write: true
For production deployments with stricter compliance requirements, consider using Microsoft Presidio for ML-based PII detection. Presidio provides higher accuracy and supports 30+ entity types including international formats. Replace the regex-based redact_pii function with:
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
# Engines are created once at module import and shared by every redact_pii call.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()


def redact_pii(text: str) -> str:
    """Return *text* after anonymizing the entities the analyzer finds in it.

    The analyzer is run with ``language="en"``; its results are passed to the
    anonymizer, and the resulting text is returned.
    """
    findings = analyzer.analyze(text=text, language="en")
    return anonymizer.anonymize(text=text, analyzer_results=findings).text