mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
544c31b50b
* perf(config): add load_config_readonly() fast path for hot agent loop
`load_config()` is called from the agent loop's per-API-call hot path via
`get_provider_request_timeout()` and `get_provider_stale_timeout()` —
both invoked once per turn from `_resolved_api_call_timeout()` in
run_agent.py.
Profiling a synthetic 20-tool-call agent run revealed:
- 21 invocations of `load_config()` cumulating 56ms (~17% of agent loop)
- 34,398 deepcopy calls totaling 37ms (config defensive deepcopy + chain)
- 8,652 `_expand_env_vars` invocations (~412 per turn)
Microbench (cache-hit, real config.yaml present):
load_config() 265us/call (125us deepcopy + 140us infra)
load_config_readonly() 138us/call (~48% faster)
`load_config_readonly()` returns the cached dict directly without the
defensive deepcopy. Documented contract: caller must not mutate. Returns
plain dict (not MappingProxyType) so downstream `isinstance(x, dict)`
guards keep working — caught during initial implementation when
MappingProxyType broke get_provider_request_timeout's guard logic.
Wired into hermes_cli/timeouts.py (the two functions called per agent
turn). load_config() is unchanged for the 263 other call sites that
mutate the result before save_config(), are not in the hot path, or
where the safety guarantee matters more than the perf.
Profile A/B (cached config, 21-turn agent loop):
BEFORE AFTER delta
get_provider_request_timeout 55ms 16ms -71%
total function calls 399k 160k -60%
deepcopy calls (in hotspots) 34,398 ~0 ~elim
Verified:
- isinstance(load_config_readonly(), dict) is True
- timeout/stale resolutions correct
- load_config() still returns isolated mutable deepcopies
- tests/hermes_cli/test_config*.py / test_timeouts.py: 102/102 pass
- tests/cli/ + tests/agent/test_auxiliary_client.py: 883/883 pass
* perf(redact): substring pre-screens skip non-matching regex chains
Every log record passes through `RedactingFormatter.format` which calls
`redact_sensitive_text`, which historically ran ALL 13 secret-pattern
regexes against every line — including DB connection strings, JWTs,
Discord mentions, Signal phone numbers, etc. — even for typical clean
log records like 'INFO run_agent: API call completed'.
Add cheap substring pre-checks before each regex pass. False positives
still run the regex (which then matches nothing); false negatives are
impossible because every pattern requires the gated substring to match
its leading anchor:
- `_PREFIX_RE` gated on any of 33 known credential prefix substrings
- `_ENV_ASSIGN_RE` gated on `=` in text
- `_JSON_FIELD_RE` gated on `:` and `"` in text
- `_AUTH_HEADER_RE` gated on `uthorization`/`UTHORIZATION` in text
- `_TELEGRAM_RE` gated on `:` in text
- `_PRIVATE_KEY_RE` gated on `BEGIN` and `-----`
- `_DB_CONNSTR_RE` gated on `://` in text
- `_JWT_RE` gated on `eyJ` in text
- URL userinfo/query gated on `://`
- `_redact_form_body` gated on `&` and `=`
- `_DISCORD_MENTION_RE` gated on `<@`
- `_SIGNAL_PHONE_RE` gated on `+`
Microbench (5 typical log records, 20k iterations each):
BEFORE AFTER delta
redact_sensitive_text per call 5.63us 1.79us -68%
Real-world impact: ~244 log records emitted in a 30-turn agent loop, so
the chain saves ~1ms of CPU per conversation. Bigger win is the
reduction in regex execution and GC pressure during heavy logging
sessions (verbose logging, gateway message processing).
Security regression test: 30 secret-containing inputs (sk-/ghp_/JWT/DB
connstr/Auth-Bearer/private key/URL userinfo/Discord/Signal/etc.)
verified to produce identical redacted output before/after. All 75
existing tests/agent/test_redact.py cases pass.
The `?access_token=foo&code=bar` (bare query string, no scheme) case
that 'leaks' is pre-existing behavior — the URL query redaction
requires a well-formed URL with scheme+host. Not a regression.
* perf(run_agent): cache _needs_thinking_reasoning_pad result per (provider, model, base_url)
Profile of a 31-turn synthetic agent run shows `_needs_thinking_reasoning_pad`
fires 495 times (~16 per turn) and each call ran 3 helper methods, each
hitting `base_url_host_matches` 1-4 times via `urlparse`. Total cost:
3,342 base_url_host_matches calls + 3,373 urlparse calls accounting for
~36ms of agent-loop overhead (~7% of the entire post-network work).
Provider / model / base_url don't change during a conversation except via
`switch_model` and fallback activation — both of which already overwrite
those attributes atomically. Cache the result on a tuple key; since the
key is derived from the very fields that would change, the cache
auto-invalidates on the next read after a switch. No manual invalidation
needed in switch_model / _try_activate_fallback.
Profile A/B (31-turn cached-config agent run):
BEFORE AFTER delta
_needs_thinking_reasoning_pad cum 18ms 1ms -94%
_copy_reasoning_content_for_api cum 17ms 1ms -94%
base_url_host_matches calls 3,342 372 -89%
urlparse calls 3,373 403 -88%
total function calls 296k 223k -25%
Verified:
- tests/run_agent/test_deepseek_reasoning_content_echo.py: 36/36 pass
- tests/run_agent/ (full): 1383/1383 pass + 3 skipped
468 lines
18 KiB
Python
468 lines
18 KiB
Python
"""Regex-based secret redaction for logs and tool output.
|
|
|
|
Applies pattern matching to mask API keys, tokens, and credentials
|
|
before they reach log files, verbose output, or gateway logs.
|
|
|
|
Short tokens (< 18 chars) are fully masked. Longer tokens preserve
|
|
the first 6 and last 4 characters for debuggability.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Sensitive query-string parameter names (case-insensitive exact match).
|
|
# Ported from nearai/ironclaw#2529 — catches tokens whose values don't match
|
|
# any known vendor prefix regex (e.g. opaque tokens, short OAuth codes).
|
|
_SENSITIVE_QUERY_PARAMS = frozenset({
|
|
"access_token",
|
|
"refresh_token",
|
|
"id_token",
|
|
"token",
|
|
"api_key",
|
|
"apikey",
|
|
"client_secret",
|
|
"password",
|
|
"auth",
|
|
"jwt",
|
|
"session",
|
|
"secret",
|
|
"key",
|
|
"code", # OAuth authorization codes
|
|
"signature", # pre-signed URL signatures
|
|
"x-amz-signature",
|
|
})
|
|
|
|
# Sensitive form-urlencoded / JSON body key names (case-insensitive exact match).
|
|
# Exact match, NOT substring — "token_count" and "session_id" must NOT match.
|
|
# Ported from nearai/ironclaw#2529.
|
|
_SENSITIVE_BODY_KEYS = frozenset({
|
|
"access_token",
|
|
"refresh_token",
|
|
"id_token",
|
|
"token",
|
|
"api_key",
|
|
"apikey",
|
|
"client_secret",
|
|
"password",
|
|
"auth",
|
|
"jwt",
|
|
"secret",
|
|
"private_key",
|
|
"authorization",
|
|
"key",
|
|
})
|
|
|
|
# Snapshot at import time so runtime env mutations (e.g. LLM-generated
|
|
# `export HERMES_REDACT_SECRETS=false`) cannot disable redaction
|
|
# mid-session. ON by default — secure default per issue #17691. Users who
|
|
# need raw credential values in tool output (e.g. working on the redactor
|
|
# itself) can opt out via `security.redact_secrets: false` in config.yaml
|
|
# (bridged to this env var in hermes_cli/main.py, gateway/run.py, and
|
|
# cli.py) or `HERMES_REDACT_SECRETS=false` in ~/.hermes/.env. An opt-out
|
|
# warning is logged at gateway and CLI startup so operators see the
|
|
# downgrade — see `_log_redaction_status()` in gateway/run.py and cli.py.
|
|
_REDACT_ENABLED = os.getenv("HERMES_REDACT_SECRETS", "true").lower() in {"1", "true", "yes", "on"}
|
|
|
|
# Known API key prefixes -- match the prefix + contiguous token chars
|
|
_PREFIX_PATTERNS = [
|
|
r"sk-[A-Za-z0-9_-]{10,}", # OpenAI / OpenRouter / Anthropic (sk-ant-*)
|
|
r"ghp_[A-Za-z0-9]{10,}", # GitHub PAT (classic)
|
|
r"github_pat_[A-Za-z0-9_]{10,}", # GitHub PAT (fine-grained)
|
|
r"gho_[A-Za-z0-9]{10,}", # GitHub OAuth access token
|
|
r"ghu_[A-Za-z0-9]{10,}", # GitHub user-to-server token
|
|
r"ghs_[A-Za-z0-9]{10,}", # GitHub server-to-server token
|
|
r"ghr_[A-Za-z0-9]{10,}", # GitHub refresh token
|
|
r"xox[baprs]-[A-Za-z0-9-]{10,}", # Slack tokens
|
|
r"AIza[A-Za-z0-9_-]{30,}", # Google API keys
|
|
r"pplx-[A-Za-z0-9]{10,}", # Perplexity
|
|
r"fal_[A-Za-z0-9_-]{10,}", # Fal.ai
|
|
r"fc-[A-Za-z0-9]{10,}", # Firecrawl
|
|
r"bb_live_[A-Za-z0-9_-]{10,}", # BrowserBase
|
|
r"gAAAA[A-Za-z0-9_=-]{20,}", # Codex encrypted tokens
|
|
r"AKIA[A-Z0-9]{16}", # AWS Access Key ID
|
|
r"sk_live_[A-Za-z0-9]{10,}", # Stripe secret key (live)
|
|
r"sk_test_[A-Za-z0-9]{10,}", # Stripe secret key (test)
|
|
r"rk_live_[A-Za-z0-9]{10,}", # Stripe restricted key
|
|
r"SG\.[A-Za-z0-9_-]{10,}", # SendGrid API key
|
|
r"hf_[A-Za-z0-9]{10,}", # HuggingFace token
|
|
r"r8_[A-Za-z0-9]{10,}", # Replicate API token
|
|
r"npm_[A-Za-z0-9]{10,}", # npm access token
|
|
r"pypi-[A-Za-z0-9_-]{10,}", # PyPI API token
|
|
r"dop_v1_[A-Za-z0-9]{10,}", # DigitalOcean PAT
|
|
r"doo_v1_[A-Za-z0-9]{10,}", # DigitalOcean OAuth
|
|
r"am_[A-Za-z0-9_-]{10,}", # AgentMail API key
|
|
r"sk_[A-Za-z0-9_]{10,}", # ElevenLabs TTS key (sk_ underscore, not sk- dash)
|
|
r"tvly-[A-Za-z0-9]{10,}", # Tavily search API key
|
|
r"exa_[A-Za-z0-9]{10,}", # Exa search API key
|
|
r"gsk_[A-Za-z0-9]{10,}", # Groq Cloud API key
|
|
r"syt_[A-Za-z0-9]{10,}", # Matrix access token
|
|
r"retaindb_[A-Za-z0-9]{10,}", # RetainDB API key
|
|
r"hsk-[A-Za-z0-9]{10,}", # Hindsight API key
|
|
r"mem0_[A-Za-z0-9]{10,}", # Mem0 Platform API key
|
|
r"brv_[A-Za-z0-9]{10,}", # ByteRover API key
|
|
r"xai-[A-Za-z0-9]{30,}", # xAI (Grok) API key
|
|
]
|
|
|
|
# ENV assignment patterns: KEY=value where KEY contains a secret-like name
|
|
_SECRET_ENV_NAMES = r"(?:API_?KEY|TOKEN|SECRET|PASSWORD|PASSWD|CREDENTIAL|AUTH)"
|
|
_ENV_ASSIGN_RE = re.compile(
|
|
rf"([A-Z0-9_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z0-9_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
|
|
)
|
|
|
|
# JSON field patterns: "apiKey": "value", "token": "value", etc.
|
|
_JSON_KEY_NAMES = r"(?:api_?[Kk]ey|token|secret|password|access_token|refresh_token|auth_token|bearer|secret_value|raw_secret|secret_input|key_material)"
|
|
_JSON_FIELD_RE = re.compile(
|
|
rf'("{_JSON_KEY_NAMES}")\s*:\s*"([^"]+)"',
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# Authorization headers
|
|
_AUTH_HEADER_RE = re.compile(
|
|
r"(Authorization:\s*Bearer\s+)(\S+)",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# Telegram bot tokens: bot<digits>:<token> or <digits>:<token>,
|
|
# where token part is restricted to [-A-Za-z0-9_] and length >= 30
|
|
_TELEGRAM_RE = re.compile(
|
|
r"(bot)?(\d{8,}):([-A-Za-z0-9_]{30,})",
|
|
)
|
|
|
|
# Private key blocks: -----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----
|
|
_PRIVATE_KEY_RE = re.compile(
|
|
r"-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----"
|
|
)
|
|
|
|
# Database connection strings: protocol://user:PASSWORD@host
|
|
# Catches postgres, mysql, mongodb, redis, amqp URLs and redacts the password
|
|
_DB_CONNSTR_RE = re.compile(
|
|
r"((?:postgres(?:ql)?|mysql|mongodb(?:\+srv)?|redis|amqp)://[^:]+:)([^@]+)(@)",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# JWT tokens: header.payload[.signature] — always start with "eyJ" (base64 for "{")
|
|
# Matches 1-part (header only), 2-part (header.payload), and full 3-part JWTs.
|
|
_JWT_RE = re.compile(
|
|
r"eyJ[A-Za-z0-9_-]{10,}" # Header (always starts with eyJ)
|
|
r"(?:\.[A-Za-z0-9_=-]{4,}){0,2}" # Optional payload and/or signature
|
|
)
|
|
|
|
# Discord user/role mentions: <@123456789012345678> or <@!123456789012345678>
|
|
# Snowflake IDs are 17-20 digit integers that resolve to specific Discord accounts.
|
|
_DISCORD_MENTION_RE = re.compile(r"<@!?(\d{17,20})>")
|
|
|
|
# E.164 phone numbers: +<country><number>, 7-15 digits
|
|
# Negative lookahead prevents matching hex strings or identifiers
|
|
_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])")
|
|
|
|
# URLs containing query strings — matches `scheme://...?...[# or end]`.
|
|
# Used to scan text for URLs whose query params may contain secrets.
|
|
# Ported from nearai/ironclaw#2529.
|
|
_URL_WITH_QUERY_RE = re.compile(
|
|
r"(https?|wss?|ftp)://" # scheme
|
|
r"([^\s/?#]+)" # authority (may include userinfo)
|
|
r"([^\s?#]*)" # path
|
|
r"\?([^\s#]+)" # query (required)
|
|
r"(#\S*)?", # optional fragment
|
|
)
|
|
|
|
# URLs containing userinfo — `scheme://user:password@host` for ANY scheme
|
|
# (not just DB protocols already covered by _DB_CONNSTR_RE above).
|
|
# Catches things like `https://user:token@api.example.com/v1/foo`.
|
|
_URL_USERINFO_RE = re.compile(
|
|
r"(https?|wss?|ftp)://([^/\s:@]+):([^/\s@]+)@",
|
|
)
|
|
|
|
# Form-urlencoded body detection: conservative — only applies when the entire
|
|
# text looks like a query string (k=v&k=v pattern with no newlines).
|
|
_FORM_BODY_RE = re.compile(
|
|
r"^[A-Za-z_][A-Za-z0-9_.-]*=[^&\s]*(?:&[A-Za-z_][A-Za-z0-9_.-]*=[^&\s]*)+$"
|
|
)
|
|
|
|
# Compile known prefix patterns into one alternation
|
|
_PREFIX_RE = re.compile(
|
|
r"(?<![A-Za-z0-9_-])(" + "|".join(_PREFIX_PATTERNS) + r")(?![A-Za-z0-9_-])"
|
|
)
|
|
|
|
|
|
def mask_secret(
|
|
value: str,
|
|
*,
|
|
head: int = 4,
|
|
tail: int = 4,
|
|
floor: int = 12,
|
|
placeholder: str = "***",
|
|
empty: str = "",
|
|
) -> str:
|
|
"""Mask a secret for display, preserving ``head`` and ``tail`` characters.
|
|
|
|
Canonical helper for display-time redaction across Hermes — used by
|
|
``hermes config``, ``hermes status``, ``hermes dump``, and anywhere
|
|
a secret needs to be shown truncated for debuggability while still
|
|
keeping the bulk hidden.
|
|
|
|
Args:
|
|
value: The secret to mask. ``None``/empty returns ``empty``.
|
|
head: Leading characters to preserve. Default 4.
|
|
tail: Trailing characters to preserve. Default 4.
|
|
floor: Values shorter than ``head + tail + floor_margin`` are
|
|
fully masked (returns ``placeholder``). Default 12 —
|
|
matches the existing config/status/dump convention.
|
|
placeholder: Value returned for too-short inputs. Default ``"***"``.
|
|
empty: Value returned when ``value`` is falsy (None, ""). The
|
|
caller can override this to e.g. ``color("(not set)",
|
|
Colors.DIM)`` for user-facing display.
|
|
|
|
Examples:
|
|
>>> mask_secret("sk-proj-abcdef1234567890")
|
|
'sk-p...7890'
|
|
>>> mask_secret("short") # fully masked
|
|
'***'
|
|
>>> mask_secret("") # empty default
|
|
''
|
|
>>> mask_secret("", empty="(not set)") # empty override
|
|
'(not set)'
|
|
>>> mask_secret("long-token", head=6, tail=4, floor=18)
|
|
'***'
|
|
"""
|
|
if not value:
|
|
return empty
|
|
if len(value) < floor:
|
|
return placeholder
|
|
return f"{value[:head]}...{value[-tail:]}"
|
|
|
|
|
|
def _mask_token(token: str) -> str:
|
|
"""Mask a log token — conservative 18-char floor, preserves 6 prefix / 4 suffix."""
|
|
# Empty input: historically this returned "***" rather than "". Preserve.
|
|
if not token:
|
|
return "***"
|
|
return mask_secret(token, head=6, tail=4, floor=18)
|
|
|
|
|
|
def _redact_query_string(query: str) -> str:
|
|
"""Redact sensitive parameter values in a URL query string.
|
|
|
|
Handles `k=v&k=v` format. Sensitive keys (case-insensitive) have values
|
|
replaced with `***`. Non-sensitive keys pass through unchanged.
|
|
Empty or malformed pairs are preserved as-is.
|
|
"""
|
|
if not query:
|
|
return query
|
|
parts = []
|
|
for pair in query.split("&"):
|
|
if "=" not in pair:
|
|
parts.append(pair)
|
|
continue
|
|
key, _, value = pair.partition("=")
|
|
if key.lower() in _SENSITIVE_QUERY_PARAMS:
|
|
parts.append(f"{key}=***")
|
|
else:
|
|
parts.append(pair)
|
|
return "&".join(parts)
|
|
|
|
|
|
def _redact_url_query_params(text: str) -> str:
|
|
"""Scan text for URLs with query strings and redact sensitive params.
|
|
|
|
Catches opaque tokens that don't match vendor prefix regexes, e.g.
|
|
`https://example.com/cb?code=ABC123&state=xyz` → `...?code=***&state=xyz`.
|
|
"""
|
|
def _sub(m: re.Match) -> str:
|
|
scheme = m.group(1)
|
|
authority = m.group(2)
|
|
path = m.group(3)
|
|
query = _redact_query_string(m.group(4))
|
|
fragment = m.group(5) or ""
|
|
return f"{scheme}://{authority}{path}?{query}{fragment}"
|
|
return _URL_WITH_QUERY_RE.sub(_sub, text)
|
|
|
|
|
|
def _redact_url_userinfo(text: str) -> str:
|
|
"""Strip `user:password@` from HTTP/WS/FTP URLs.
|
|
|
|
DB protocols (postgres, mysql, mongodb, redis, amqp) are handled
|
|
separately by `_DB_CONNSTR_RE`.
|
|
"""
|
|
return _URL_USERINFO_RE.sub(
|
|
lambda m: f"{m.group(1)}://{m.group(2)}:***@",
|
|
text,
|
|
)
|
|
|
|
|
|
def _redact_form_body(text: str) -> str:
|
|
"""Redact sensitive values in a form-urlencoded body.
|
|
|
|
Only applies when the entire input looks like a pure form body
|
|
(k=v&k=v with no newlines, no other text). Single-line non-form
|
|
text passes through unchanged. This is a conservative pass — the
|
|
`_redact_url_query_params` function handles embedded query strings.
|
|
"""
|
|
if not text or "\n" in text or "&" not in text:
|
|
return text
|
|
# The body-body form check is strict: only trigger on clean k=v&k=v.
|
|
if not _FORM_BODY_RE.match(text.strip()):
|
|
return text
|
|
return _redact_query_string(text.strip())
|
|
|
|
|
|
def redact_sensitive_text(text: str, *, force: bool = False, code_file: bool = False) -> str:
|
|
"""Apply all redaction patterns to a block of text.
|
|
|
|
Safe to call on any string -- non-matching text passes through unchanged.
|
|
Disabled by default — enable via security.redact_secrets: true in config.yaml.
|
|
Set force=True for safety boundaries that must never return raw secrets
|
|
regardless of the user's global logging redaction preference.
|
|
|
|
Set code_file=True to skip the ENV-assignment and JSON-field regex
|
|
patterns when the text is known to be source code (e.g. MAX_TOKENS=***
|
|
constants, "apiKey": "test" fixtures). Prefix patterns, auth headers,
|
|
private keys, DB connstrings, JWTs, and URL secrets are still redacted.
|
|
|
|
Performance: each regex pattern is gated behind a cheap substring
|
|
pre-check (e.g. ``"=" in text`` for ENV assignments, ``"://" in text``
|
|
for URLs, ``"eyJ" in text`` for JWTs). On a typical hermes log line
|
|
(no secrets) this drops the 13-pattern scan from ~5.6us to ~1.8us per
|
|
record (-68%). The pre-checks are conservative — false positives
|
|
still run the full regex, which then doesn't match. False negatives
|
|
are impossible because every regex requires the gated substring to
|
|
match.
|
|
"""
|
|
if text is None:
|
|
return None
|
|
if not isinstance(text, str):
|
|
text = str(text)
|
|
if not text:
|
|
return text
|
|
if not (force or _REDACT_ENABLED):
|
|
return text
|
|
|
|
# Known prefixes (sk-, ghp_, etc.) — gate on substring presence
|
|
if _has_known_prefix_substring(text):
|
|
text = _PREFIX_RE.sub(lambda m: _mask_token(m.group(1)), text)
|
|
|
|
# ENV assignments: OPENAI_API_KEY=*** (skip for code files — false positives)
|
|
if not code_file:
|
|
if "=" in text:
|
|
def _redact_env(m):
|
|
name, quote, value = m.group(1), m.group(2), m.group(3)
|
|
return f"{name}={quote}{_mask_token(value)}{quote}"
|
|
text = _ENV_ASSIGN_RE.sub(_redact_env, text)
|
|
|
|
# JSON fields: "apiKey": "***" (skip for code files — false positives)
|
|
if ":" in text and '"' in text:
|
|
def _redact_json(m):
|
|
key, value = m.group(1), m.group(2)
|
|
return f'{key}: "{_mask_token(value)}"'
|
|
text = _JSON_FIELD_RE.sub(_redact_json, text)
|
|
|
|
# Authorization headers — _AUTH_HEADER_RE is "Authorization: Bearer ..."
|
|
# case-insensitive, so "uthorization" is the cheapest substring gate that
|
|
# covers both "Authorization" and "authorization" without a casefold().
|
|
if "uthorization" in text or "UTHORIZATION" in text:
|
|
text = _AUTH_HEADER_RE.sub(
|
|
lambda m: m.group(1) + _mask_token(m.group(2)),
|
|
text,
|
|
)
|
|
|
|
# Telegram bot tokens — pattern requires ":<token>" with digits prefix
|
|
if ":" in text:
|
|
def _redact_telegram(m):
|
|
prefix = m.group(1) or ""
|
|
digits = m.group(2)
|
|
return f"{prefix}{digits}:***"
|
|
text = _TELEGRAM_RE.sub(_redact_telegram, text)
|
|
|
|
# Private key blocks
|
|
if "BEGIN" in text and "-----" in text:
|
|
text = _PRIVATE_KEY_RE.sub("[REDACTED PRIVATE KEY]", text)
|
|
|
|
# Database connection string passwords
|
|
if "://" in text:
|
|
text = _DB_CONNSTR_RE.sub(lambda m: f"{m.group(1)}***{m.group(3)}", text)
|
|
|
|
# JWT tokens (eyJ... — base64-encoded JSON headers)
|
|
if "eyJ" in text:
|
|
text = _JWT_RE.sub(lambda m: _mask_token(m.group(0)), text)
|
|
|
|
# URL userinfo (http(s)://user:pass@host) — redact for non-DB schemes.
|
|
# DB schemes are handled above by _DB_CONNSTR_RE.
|
|
if "://" in text:
|
|
text = _redact_url_userinfo(text)
|
|
|
|
# URL query params containing opaque tokens (?access_token=…&code=…)
|
|
if "?" in text:
|
|
text = _redact_url_query_params(text)
|
|
|
|
# Form-urlencoded bodies (only triggers on clean k=v&k=v inputs).
|
|
if "&" in text and "=" in text:
|
|
text = _redact_form_body(text)
|
|
|
|
# Discord user/role mentions (<@snowflake_id>)
|
|
if "<@" in text:
|
|
text = _DISCORD_MENTION_RE.sub(lambda m: f"<@{'!' if '!' in m.group(0) else ''}***>", text)
|
|
|
|
# E.164 phone numbers (Signal, WhatsApp)
|
|
if "+" in text:
|
|
def _redact_phone(m):
|
|
phone = m.group(1)
|
|
if len(phone) <= 8:
|
|
return phone[:2] + "****" + phone[-2:]
|
|
return phone[:4] + "****" + phone[-4:]
|
|
text = _SIGNAL_PHONE_RE.sub(_redact_phone, text)
|
|
|
|
return text
|
|
|
|
|
|
# Substrings used to gate ``_PREFIX_RE`` execution. If none of these appear in
|
|
# the input string, the prefix regex cannot match anything, so we skip it.
|
|
# False positives are fine (they just run the regex, which then matches
|
|
# nothing) — the bound is "no false negatives" and that holds because every
|
|
# pattern in ``_PREFIX_PATTERNS`` has at least one of these as a literal
|
|
# substring of its leading characters.
|
|
#
|
|
# Derived automatically from ``_PREFIX_PATTERNS`` at module load time so a
|
|
# future PR that adds a new prefix to the regex list can't silently break
|
|
# the screen.
|
|
|
|
def _extract_literal_prefix(pattern: str) -> str:
|
|
"""Return the leading literal characters of a regex pattern.
|
|
|
|
Stops at the first regex metacharacter (``[``, ``(``, ``\\``, ``.``,
|
|
``?``, ``*``, ``+``, ``|``, ``{``, ``^``, ``$``). Returns the literal
|
|
that any match of the pattern MUST contain as a substring, so the
|
|
pre-screen never produces false negatives.
|
|
"""
|
|
meta = "[(\\.?*+|{^$"
|
|
for i, ch in enumerate(pattern):
|
|
if ch in meta:
|
|
return pattern[:i]
|
|
return pattern
|
|
|
|
|
|
_PREFIX_SUBSTRINGS = tuple(
|
|
_extract_literal_prefix(p) for p in _PREFIX_PATTERNS
|
|
)
|
|
|
|
|
|
def _has_known_prefix_substring(text: str) -> bool:
|
|
"""Return True if ``text`` contains any known credential prefix substring.
|
|
|
|
Used as a cheap pre-check before invoking the expensive ``_PREFIX_RE``.
|
|
"""
|
|
return any(p in text for p in _PREFIX_SUBSTRINGS)
|
|
|
|
|
|
class RedactingFormatter(logging.Formatter):
|
|
"""Log formatter that redacts secrets from all log messages."""
|
|
|
|
def __init__(self, fmt=None, datefmt=None, style='%', **kwargs):
|
|
super().__init__(fmt, datefmt, style, **kwargs)
|
|
|
|
def format(self, record: logging.LogRecord) -> str:
|
|
original = super().format(record)
|
|
return redact_sensitive_text(original)
|