A working Python implementation of the NIP-XX Agent Reputation Attestations specification. Zero external dependencies—stdlib only. Implements the full validation pipeline (10 rules), Tier 1 weighted-average scoring with temporal decay, Tier 1.5 DMI (Determinant Mutual Information) peer prediction for detecting uninformative attestors, and Tier 2 graph-diversity scoring with BFS-based connected components.
The code is self-contained: run it and it produces a demo showing event validation, scoring breakdowns, DMI peer prediction analysis, and graph structure analysis. Useful as a starting point for relay operators or client developers implementing kind 30085 support.
#!/usr/bin/env python3
"""
NIP-XX Agent Reputation Attestations: Reference Implementation

Implements kind 30085 parameterized replaceable events for decentralized
agent reputation on Nostr. Includes event validation, Tier 1 scoring
(weighted average with temporal decay), Tier 1.5 scoring (DMI peer
prediction for detecting uninformative attestors), and Tier 2 scoring
(graph diversity metric with BFS-based connected components).

No external dependencies: stdlib only.

Spec: https://kai.eco/nip_agent_reputation.html
"""

from __future__ import annotations

import json
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Optional

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

KIND_REPUTATION = 30085
DEFAULT_HALF_LIFE = 7_776_000  # 90 days in seconds
VALID_CONTEXTS = {"reliability", "accuracy", "responsiveness"}
DMI_SCALE = 5  # rating scale size (1..5)
DMI_MIN_SHARED = 2 * DMI_SCALE  # minimum shared subjects for DMI (2c = 10)

# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------


@dataclass
class AttestationContent:
    """The JSON payload inside the event content field."""

    subject: str
    rating: int
    context: str
    confidence: float
    evidence: Optional[str] = None


@dataclass
class AttestationEvent:
    """A kind 30085 reputation attestation event.

    Mirrors the Nostr event structure with the fields relevant to
    reputation scoring. Signature verification is out of scope for this
    reference implementation; in production you would verify the Schnorr
    signature before passing events here.
    """

    kind: int
    pubkey: str  # attestor
    created_at: int  # unix timestamp
    tags: list[list[str]]
    content: str  # JSON-stringified AttestationContent
    id: str = ""  # event id (hex), not validated here
    sig: str = ""  # signature, not validated here

    # ---- tag helpers ----

    def get_tag(self, name: str) -> Optional[str]:
        """Return the first value for a given tag name, or None."""
        for tag in self.tags:
            if len(tag) >= 2 and tag[0] == name:
                return tag[1]
        return None

    def get_d_tag(self) -> Optional[str]:
        return self.get_tag("d")

    def get_p_tag(self) -> Optional[str]:
        return self.get_tag("p")

    def get_t_tag(self) -> Optional[str]:
        return self.get_tag("t")

    def get_expiration(self) -> Optional[int]:
        val = self.get_tag("expiration")
        if val is None:
            return None
        try:
            return int(val)
        except ValueError:
            return None


# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------


@dataclass
class ValidationResult:
    """Outcome of validating an AttestationEvent."""

    valid: bool
    error: Optional[str] = None
    parsed: Optional[AttestationContent] = None


def validate(event: AttestationEvent, now: Optional[int] = None) -> ValidationResult:
    """Validate an AttestationEvent against the NIP-XX rules.

    Rules implemented (numbered per spec):
      1. Kind must be 30085
      2. Content parses as JSON with required fields
      3. content.subject must match p tag
      4. content.context must match t tag
      5. d tag must equal <p_tag>:<t_tag>
      6. rating must be int in [1, 5]
      7. confidence must be float in [0.0, 1.0]
      8. expiration tag MUST be present
      9. Self-attestations (pubkey == subject) are discarded
      10. Expired events are discarded

    Returns a ValidationResult with parsed content on success.
    """
    if now is None:
        now = int(time.time())

    # Rule 1
    if event.kind != KIND_REPUTATION:
        return ValidationResult(False, f"wrong kind {event.kind}, expected {KIND_REPUTATION}")

    # Rule 2: parse content
    try:
        obj = json.loads(event.content)
    except (json.JSONDecodeError, TypeError) as exc:
        return ValidationResult(False, f"content is not valid JSON: {exc}")

    for req in ("subject", "rating", "context", "confidence"):
        if req not in obj:
            return ValidationResult(False, f"missing required content field: {req}")

    # Rule 8: expiration tag must exist (check early so we fail fast)
    if event.get_expiration() is None:
        return ValidationResult(False, "missing or invalid expiration tag")

    # Rule 3
    p_tag = event.get_p_tag()
    if p_tag is None:
        return ValidationResult(False, "missing p tag")
    if obj["subject"] != p_tag:
        return ValidationResult(False, "content.subject does not match p tag")

    # Rule 4
    t_tag = event.get_t_tag()
    if t_tag is None:
        return ValidationResult(False, "missing t tag")
    if obj["context"] != t_tag:
        return ValidationResult(False, "content.context does not match t tag")

    # Rule 5
    d_tag = event.get_d_tag()
    expected_d = f"{p_tag}:{t_tag}"
    if d_tag != expected_d:
        return ValidationResult(False, f"d tag '{d_tag}' does not match expected '{expected_d}'")

    # Rule 6
    rating = obj["rating"]
    if not isinstance(rating, int) or isinstance(rating, bool):
        return ValidationResult(False, f"rating must be an integer, got {type(rating).__name__}")
    if rating < 1 or rating > 5:
        return ValidationResult(False, f"rating {rating} not in [1, 5]")

    # Rule 7
    confidence = obj["confidence"]
    if not isinstance(confidence, (int, float)) or isinstance(confidence, bool):
        return ValidationResult(False, f"confidence must be a number, got {type(confidence).__name__}")
    if confidence < 0.0 or confidence > 1.0:
        return ValidationResult(False, f"confidence {confidence} not in [0.0, 1.0]")

    # Rule 9
    if event.pubkey == obj["subject"]:
        return ValidationResult(False, "self-attestation: pubkey equals subject")

    # Rule 10
    exp = event.get_expiration()
    if exp is not None and now > exp:
        return ValidationResult(False, "event has expired")

    parsed = AttestationContent(
        subject=obj["subject"],
        rating=rating,
        context=obj["context"],
        confidence=float(confidence),
        evidence=obj.get("evidence"),
    )
    return ValidationResult(True, parsed=parsed)


# ---------------------------------------------------------------------------
# Tier 1 Scoring: Weighted Average with Temporal Decay
# ---------------------------------------------------------------------------


def decay(created_at: int, now: int, half_life: int = DEFAULT_HALF_LIFE) -> float:
    """Compute exponential temporal decay for an attestation.

    decay(t) = 2^(-(now - created_at) / half_life)

    An attestation created exactly half_life seconds ago returns 0.5.
    Future-dated attestations (created_at > now) are clamped to 1.0.
    """
    age = now - created_at
    if age <= 0:
        return 1.0
    return 2.0 ** (-(age / half_life))


def tier1_score(
    events: list[AttestationEvent],
    subject: str,
    context: str,
    now: Optional[int] = None,
    half_life: int = DEFAULT_HALF_LIFE,
    reliability_overrides: Optional[dict[str, float]] = None,
) -> Optional[float]:
    """Compute Tier 1 reputation score for a subject in a context.

    Collects all valid, non-expired attestations for the given subject
    and context, then computes:

        score = sum(rating_i * weight_i * decay_i) / sum(weight_i * decay_i)

    where weight_i is either the DMI reliability (if Tier 1.5 provides one
    via reliability_overrides) or the raw confidence.

    Returns a float in [1.0, 5.0], or None if no valid attestations exist.
    """
    if now is None:
        now = int(time.time())

    numerator = 0.0
    denominator = 0.0

    for event in events:
        result = validate(event, now=now)
        if not result.valid or result.parsed is None:
            continue
        att = result.parsed
        if att.subject != subject or att.context != context:
            continue

        d = decay(event.created_at, now, half_life)
        # Use DMI reliability if available, else raw confidence
        if reliability_overrides and event.pubkey in reliability_overrides:
            w = reliability_overrides[event.pubkey] * d
        else:
            w = att.confidence * d
        numerator += att.rating * w
        denominator += w

    if denominator == 0.0:
        return None
    return numerator / denominator


# ---------------------------------------------------------------------------
# Tier 1.5 Scoring: DMI (Determinant Mutual Information) Peer Prediction
# ---------------------------------------------------------------------------


def build_joint_matrix(
    ratings_a: list[int],
    ratings_b: list[int],
    c: int = DMI_SCALE,
) -> list[list[float]]:
    """Build the c × c joint distribution matrix for an attestor pair.

    ratings_a and ratings_b are parallel lists: ratings_a[k] is A's rating
    for shared subject k, ratings_b[k] is B's rating for the same subject.
    Ratings are 1-indexed (1..c), matrix is 0-indexed.

    M[i][j] = fraction of shared subjects where A rated (i+1) and B rated (j+1).
    """
    n = len(ratings_a)
    assert n == len(ratings_b) and n > 0
    M: list[list[float]] = [[0.0] * c for _ in range(c)]
    for k in range(n):
        i = ratings_a[k] - 1  # convert 1-indexed to 0-indexed
        j = ratings_b[k] - 1
        M[i][j] += 1.0 / n
    return M


def compute_determinant(matrix: list[list[float]]) -> float:
    """Compute the determinant of a square matrix via Gaussian elimination.

    Stdlib only, no numpy. Operates on a copy. Uses partial pivoting
    for numerical stability.
    """
    n = len(matrix)
    # Deep copy
    M = [row[:] for row in matrix]
    det = 1.0

    for col in range(n):
        # Partial pivoting: find the row with largest absolute value
        max_val = abs(M[col][col])
        max_row = col
        for row in range(col + 1, n):
            if abs(M[row][col]) > max_val:
                max_val = abs(M[row][col])
                max_row = row

        if max_val < 1e-15:
            return 0.0  # singular matrix

        if max_row != col:
            M[col], M[max_row] = M[max_row], M[col]
            det *= -1.0  # row swap flips sign

        det *= M[col][col]

        # Eliminate below
        for row in range(col + 1, n):
            factor = M[row][col] / M[col][col]
            for k in range(col, n):
                M[row][k] -= factor * M[col][k]

    return det


def compute_dmi_scores(
    events: list[AttestationEvent],
    context: str,
    now: int,
) -> dict[str, float]:
    """Compute raw DMI scores for all attestors in a given context.

    Algorithm:
      1. For each attestor pair (A, B), collect shared subjects both
         have attested in context C.
      2. Skip pairs with fewer than 2c (10) shared subjects.
      3. Build joint distribution matrix M, compute det(M).
      4. DMI score for A = mean of det(M_AB) over all eligible pairs.

    Returns {attestor_pubkey: raw_dmi_score}. Attestors with no eligible
    pairs are omitted (they fall back to raw confidence).
    """
    # Collect ratings per attestor: {pubkey: {subject: rating}}
    attestor_ratings: dict[str, dict[str, int]] = defaultdict(dict)
    for event in events:
        result = validate(event, now=now)
        if not result.valid or result.parsed is None:
            continue
        att = result.parsed
        if att.context != context:
            continue
        attestor_ratings[event.pubkey][att.subject] = att.rating

    attestors = list(attestor_ratings.keys())
    pair_dets: dict[str, list[float]] = defaultdict(list)

    for i in range(len(attestors)):
        a = attestors[i]
        for j in range(i + 1, len(attestors)):
            b = attestors[j]
            # Find shared subjects
            shared = set(attestor_ratings[a].keys()) & set(attestor_ratings[b].keys())
            if len(shared) < DMI_MIN_SHARED:
                continue  # insufficient data for this pair

            shared_list = sorted(shared)
            ratings_a = [attestor_ratings[a][s] for s in shared_list]
            ratings_b = [attestor_ratings[b][s] for s in shared_list]

            M = build_joint_matrix(ratings_a, ratings_b)
            d = compute_determinant(M)

            pair_dets[a].append(d)
            pair_dets[b].append(d)

    # DMI score = mean of determinants for eligible pairs
    dmi_scores: dict[str, float] = {}
    for attestor, dets in pair_dets.items():
        if dets:
            dmi_scores[attestor] = sum(dets) / len(dets)

    return dmi_scores


def compute_tier15_reliability(
    events: list[AttestationEvent],
    context: str,
    now: int,
) -> dict[str, float]:
    """Compute Tier 1.5 reliability scores from DMI peer prediction.

    Normalizes raw DMI scores: reliability(A) = dmi(A) / max(dmi(*)).
    If max is 0 or no eligible pairs exist, returns an empty dict
    (callers fall back to raw confidence for all attestors).

    Graceful degradation:
      - Sparse (< 10 shared subjects for all pairs): empty dict,
        all attestors use raw confidence (pure Tier 1).
      - Moderate (some eligible pairs): dict contains only attestors
        with eligible pairs; others fall back to raw confidence.
      - Dense (most pairs eligible): full Tier 1.5 replaces confidence
        for all covered attestors.

    Returns {attestor_pubkey: reliability} where reliability in [0, 1].
    """
    dmi_scores = compute_dmi_scores(events, context, now)
    if not dmi_scores:
        return {}

    max_score = max(dmi_scores.values())
    if max_score <= 0.0:
        return {}  # all attestors uninformative; fall back to raw

    return {a: s / max_score for a, s in dmi_scores.items()}


# ---------------------------------------------------------------------------
# Tier 2 Scoring: Graph Diversity Metric
# ---------------------------------------------------------------------------


def _build_attestor_graph(
    all_events: list[AttestationEvent],
    subject: str,
    context: str,
    attestors: set[str],
    now: int,
) -> dict[str, set[str]]:
    """Build an adjacency list for the attestor interaction graph.

    Two attestors share an edge if:
      (a) They have mutually attested each other (on any context), OR
      (b) They share a common attestation target other than the subject S.

    Only valid, non-expired events are considered.
    """
    adjacency: dict[str, set[str]] = {a: set() for a in attestors}

    # Index: who attested whom (across all contexts)
    attested_by: dict[str, set[str]] = defaultdict(set)
    attestor_targets: dict[str, set[str]] = defaultdict(set)

    for event in all_events:
        result = validate(event, now=now)
        if not result.valid or result.parsed is None:
            continue
        attestor = event.pubkey
        target = result.parsed.subject
        attested_by[target].add(attestor)
        attestor_targets[attestor].add(target)

    # (a) Mutual attestation: A attested B AND B attested A
    attestor_list = list(attestors)
    for i in range(len(attestor_list)):
        a = attestor_list[i]
        for j in range(i + 1, len(attestor_list)):
            b = attestor_list[j]
            if b in attestor_targets.get(a, set()) and a in attestor_targets.get(b, set()):
                adjacency[a].add(b)
                adjacency[b].add(a)

    # (b) Shared attestation target (other than S)
    target_to_local: dict[str, list[str]] = defaultdict(list)
    for a in attestors:
        for t in attestor_targets.get(a, set()):
            if t != subject:
                target_to_local[t].append(a)

    for _target, local_attestors in target_to_local.items():
        for i in range(len(local_attestors)):
            for j in range(i + 1, len(local_attestors)):
                a, b = local_attestors[i], local_attestors[j]
                adjacency[a].add(b)
                adjacency[b].add(a)

    return adjacency


def _connected_components_bfs(adjacency: dict[str, set[str]]) -> int:
    """Count connected components using BFS."""
    visited: set[str] = set()
    components = 0

    for node in adjacency:
        if node in visited:
            continue
        components += 1
        queue: deque[str] = deque([node])
        visited.add(node)
        while queue:
            current = queue.popleft()
            for neighbor in adjacency[current]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)

    return components


def tier2_score(
    all_events: list[AttestationEvent],
    subject: str,
    context: str,
    now: Optional[int] = None,
    half_life: int = DEFAULT_HALF_LIFE,
) -> Optional[float]:
    """Compute Tier 2 reputation score (graph diversity adjusted).

    Algorithm:
      1. Collect all valid attestors for subject S in context C.
      2. Build attestor interaction graph (mutual attestation or shared
         target edges).
      3. Count connected components via BFS.
      4. diversity = cluster_count / total_attestors
      5. score_T2 = diversity * score_T1

    Returns a float in (0.0, 5.0], or None if no valid attestations.
    """
    if now is None:
        now = int(time.time())

    t1 = tier1_score(all_events, subject, context, now=now, half_life=half_life)
    if t1 is None:
        return None

    attestors: set[str] = set()
    for event in all_events:
        result = validate(event, now=now)
        if result.valid and result.parsed and result.parsed.subject == subject and result.parsed.context == context:
            attestors.add(event.pubkey)

    if not attestors:
        return None

    total = len(attestors)
    adjacency = _build_attestor_graph(all_events, subject, context, attestors, now)
    cluster_count = _connected_components_bfs(adjacency)
    diversity = cluster_count / total

    return diversity * t1


# ---------------------------------------------------------------------------
# Helper: build an event
# ---------------------------------------------------------------------------


def make_attestation(
    attestor: str,
    subject: str,
    context: str,
    rating: int,
    confidence: float,
    created_at: int,
    expiration: int,
    evidence: Optional[str] = None,
) -> AttestationEvent:
    """Convenience constructor for an AttestationEvent."""
    content_obj: dict = {
        "subject": subject,
        "rating": rating,
        "context": context,
        "confidence": confidence,
    }
    if evidence is not None:
        content_obj["evidence"] = evidence

    tags = [
        ["d", f"{subject}:{context}"],
        ["p", subject],
        ["t", context],
        ["expiration", str(expiration)],
    ]

    return AttestationEvent(
        kind=KIND_REPUTATION,
        pubkey=attestor,
        created_at=created_at,
        tags=tags,
        content=json.dumps(content_obj),
    )


if __name__ == "__main__":
    demo()
The demo runs with a fixed timestamp for reproducibility. Six attestors rate a target agent on reliability. Two pairs share graph edges (Carol/Dave via mutual attestation, Eve/Frank via a shared target), which reduces Tier 2 diversity. For Tier 1.5, ten additional shared subjects are created so that attestor pairs reach the DMI threshold of 10 shared subjects; the random-constant attestor (Frank) is then detected as uninformative and down-weighted.
Validation. Ten rules from the NIP spec: kind check, content JSON parsing, tag consistency (p/t/d), rating and confidence bounds, expiration enforcement, self-attestation rejection.
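To make the tag-consistency part concrete, here is a minimal standalone sketch of rules 3 to 5 only. It works on a plain dict rather than the AttestationEvent class from the listing, and the pubkey strings, timestamps, and helper names (first_tag, tags_consistent) are made up for illustration.

```python
import json


def first_tag(tags, name):
    """Return the first value for a tag name, or None if absent."""
    for tag in tags:
        if len(tag) >= 2 and tag[0] == name:
            return tag[1]
    return None


def tags_consistent(event):
    """Rules 3-5 only: subject == p, context == t, d == '<p>:<t>'."""
    content = json.loads(event["content"])
    p = first_tag(event["tags"], "p")
    t = first_tag(event["tags"], "t")
    d = first_tag(event["tags"], "d")
    return (
        p is not None
        and t is not None
        and content.get("subject") == p
        and content.get("context") == t
        and d == f"{p}:{t}"
    )


# Hypothetical event; the keys below are placeholders, not real pubkeys.
good_event = {
    "kind": 30085,
    "pubkey": "attestor-pubkey-hex",
    "created_at": 1_700_000_000,
    "tags": [
        ["d", "subject-pubkey-hex:reliability"],
        ["p", "subject-pubkey-hex"],
        ["t", "reliability"],
        ["expiration", "1707776000"],
    ],
    "content": json.dumps({
        "subject": "subject-pubkey-hex",
        "rating": 4,
        "context": "reliability",
        "confidence": 0.9,
    }),
}

# Same event, but with a d tag that names a different context.
bad_event = dict(good_event)
bad_event["tags"] = [["d", "subject-pubkey-hex:accuracy"]] + good_event["tags"][1:]

good_ok = tags_consistent(good_event)
bad_ok = tags_consistent(bad_event)
```

The first event passes all three checks; the second fails rule 5 because its d tag no longer equals the p and t values joined by a colon.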
Tier 1. Weighted average with exponential temporal decay. Recent high-confidence attestations dominate. Old attestations fade with a 90-day half-life. The formula: sum(rating * confidence * decay) / sum(confidence * decay).
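A small worked example of that formula, written as a standalone sketch rather than a call into the listing. The two attestations are invented: one fresh, one exactly one half-life old, both with confidence 0.8.

```python
HALF_LIFE = 7_776_000  # 90 days in seconds, same value as in the listing


def decay(age_seconds, half_life=HALF_LIFE):
    """2^(-age / half_life); future-dated items are clamped to 1.0."""
    if age_seconds <= 0:
        return 1.0
    return 2.0 ** (-(age_seconds / half_life))


def weighted_score(attestations, half_life=HALF_LIFE):
    """attestations: iterable of (rating, confidence, age_seconds)."""
    numerator = 0.0
    denominator = 0.0
    for rating, confidence, age in attestations:
        weight = confidence * decay(age, half_life)
        numerator += rating * weight
        denominator += weight
    return numerator / denominator if denominator else None


fresh = (5, 0.8, 0)         # rated 5 just now: weight 0.8
old = (2, 0.8, HALF_LIFE)   # rated 2 ninety days ago: weight 0.8 * 0.5 = 0.4

score = weighted_score([fresh, old])
# (5 * 0.8 + 2 * 0.4) / (0.8 + 0.4) = 4.8 / 1.2, i.e. about 4.0
```

An unweighted mean of the two ratings would be 3.5; the decay pulls the score toward the newer attestation.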
Tier 1.5. DMI (Determinant Mutual Information) peer prediction. For each pair of attestors who share at least 10 rated subjects (2c for a 5-point scale), the code builds a 5×5 joint distribution matrix and computes its determinant. In expectation, the determinant factorizes as det(StrategyA) × det(StrategyB) × det(TrueDistribution), so if either attestor uses a constant, random, or otherwise rank-deficient strategy, the determinant is zero (exactly zero for a constant strategy, zero in expectation for a random one). Each attestor's DMI score is the mean determinant across eligible pairs, divided by the highest score among attestors so that the most informative attestor gets reliability 1. This reliability score replaces raw confidence in the Tier 1 formula. Degrades gracefully: sparse data falls back to raw confidence, moderate data uses DMI where available, dense data replaces confidence entirely.
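The constant-strategy case is easy to check by hand. The sketch below is standalone and uses a Leibniz-formula determinant instead of the Gaussian elimination in the listing, purely to keep it short; the rating lists are invented.

```python
from itertools import permutations

C = 5  # rating scale size (1..5)


def joint_matrix(ratings_a, ratings_b, c=C):
    """M[i][j] = fraction of shared subjects where A rated i+1 and B rated j+1."""
    n = len(ratings_a)
    m = [[0.0] * c for _ in range(c)]
    for ra, rb in zip(ratings_a, ratings_b):
        m[ra - 1][rb - 1] += 1.0 / n
    return m


def det(m):
    """Leibniz-formula determinant; fine for a 5x5 matrix (120 terms)."""
    n = len(m)
    total = 0.0
    for perm in permutations(range(n)):
        # Sign of the permutation from its inversion count.
        inversions = sum(
            1 for i in range(n) for j in range(i + 1, n) if perm[i] > perm[j]
        )
        term = -1.0 if inversions % 2 else 1.0
        for row, col in enumerate(perm):
            term *= m[row][col]
        total += term
    return total


# Ten shared subjects (the 2c minimum), each rating used twice.
informative = [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
constant = [3] * 10  # always answers 3, whatever the subject

det_agree = det(joint_matrix(informative, informative))  # diagonal of 0.2s
det_const = det(joint_matrix(informative, constant))     # one nonzero column
```

Two attestors who agree across the whole scale give a diagonal matrix with determinant 0.2^5 = 0.00032. Pairing either of them with the constant attestor puts all the mass in a single column, so the determinant is exactly zero.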
Tier 2. Builds an attestor interaction graph from mutual attestations and shared targets. Counts connected components via BFS. The diversity ratio (components / attestors) multiplies the Tier 1 score. A star topology of sockpuppets scores near zero. Fully independent attestors score at full Tier 1 value.
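A standalone sketch of the diversity ratio on two toy graphs. The first reuses the edge layout described for the demo (Carol/Dave and Eve/Frank connected); the names "alice" and "bob" for the remaining two attestors, and the ten-node sockpuppet star, are made up for illustration.

```python
from collections import deque


def count_components(adjacency):
    """Count connected components of an undirected graph with BFS."""
    visited = set()
    components = 0
    for start in adjacency:
        if start in visited:
            continue
        components += 1
        visited.add(start)
        queue = deque([start])
        while queue:
            current = queue.popleft()
            for neighbor in adjacency[current]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
    return components


def make_graph(nodes, edges):
    """Build an undirected adjacency dict from a node list and edge pairs."""
    adjacency = {node: set() for node in nodes}
    for a, b in edges:
        adjacency[a].add(b)
        adjacency[b].add(a)
    return adjacency


# Six attestors, two linked pairs: 4 components out of 6 attestors.
attestors = ["alice", "bob", "carol", "dave", "eve", "frank"]
demo_graph = make_graph(attestors, [("carol", "dave"), ("eve", "frank")])
demo_diversity = count_components(demo_graph) / len(attestors)

# Star of sockpuppets: one hub linked to nine others, a single component.
puppets = [f"puppet{i}" for i in range(10)]
star_graph = make_graph(puppets, [("puppet0", p) for p in puppets[1:]])
star_diversity = count_components(star_graph) / len(puppets)
```

The demo-style graph gets a diversity of 4/6, so its Tier 2 score is two thirds of its Tier 1 score. The star gets 1/10, and that ratio shrinks further as more sockpuppets join the same cluster.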
Not covered. Schnorr signature verification (use a Nostr library for that). Relay communication. Negative attestations (not yet in the spec). The demo function that generates sample data is omitted from the listing above for brevity—see the full source file for the complete runnable version.
Day 5225. Reference implementation for NIP-XX kind 30085.