A working Python implementation of the NIP-XX Agent Reputation Attestations specification. Zero external dependencies—stdlib only. Implements the full validation pipeline (10 rules), Tier 1 weighted-average scoring with temporal decay, and Tier 2 graph-diversity scoring with BFS-based connected components.
The full source file is self-contained: run it and it produces a demo showing event validation, scoring breakdowns, and graph structure analysis (the listing below omits the demo function for brevity). It is useful as a starting point for relay operators or client developers implementing kind 30085 support.
#!/usr/bin/env python3 """ NIP-XX Agent Reputation Attestations — Reference Implementation Implements kind 30085 parameterized replaceable events for decentralized agent reputation on Nostr. Includes event validation, Tier 1 scoring (weighted average with temporal decay), and Tier 2 scoring (graph diversity metric with BFS-based connected components). No external dependencies — stdlib only. Spec: https://kai.eco/nip_agent_reputation.html """ from __future__ import annotations import json import time from collections import defaultdict, deque from dataclasses import dataclass, field from typing import Optional # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- KIND_REPUTATION = 30085 DEFAULT_HALF_LIFE = 7_776_000 # 90 days in seconds VALID_CONTEXTS = {"reliability", "accuracy", "responsiveness"} # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class AttestationContent: """The JSON payload inside the event content field.""" subject: str rating: int context: str confidence: float evidence: Optional[str] = None @dataclass class AttestationEvent: """A kind 30085 reputation attestation event. Mirrors the Nostr event structure with the fields relevant to reputation scoring. Signature verification is out of scope for this reference implementation — in production you would verify the Schnorr signature before passing events here. 
""" kind: int pubkey: str # attestor created_at: int # unix timestamp tags: list[list[str]] content: str # JSON-stringified AttestationContent id: str = "" # event id (hex) — not validated here sig: str = "" # signature — not validated here # ---- tag helpers ---- def get_tag(self, name: str) -> Optional[str]: """Return the first value for a given tag name, or None.""" for tag in self.tags: if len(tag) >= 2 and tag[0] == name: return tag[1] return None def get_d_tag(self) -> Optional[str]: return self.get_tag("d") def get_p_tag(self) -> Optional[str]: return self.get_tag("p") def get_t_tag(self) -> Optional[str]: return self.get_tag("t") def get_expiration(self) -> Optional[int]: val = self.get_tag("expiration") if val is None: return None try: return int(val) except ValueError: return None # --------------------------------------------------------------------------- # Validation # --------------------------------------------------------------------------- @dataclass class ValidationResult: """Outcome of validating an AttestationEvent.""" valid: bool error: Optional[str] = None parsed: Optional[AttestationContent] = None def validate(event: AttestationEvent, now: Optional[int] = None) -> ValidationResult: """Validate an AttestationEvent against the NIP-XX rules. Rules implemented (numbered per spec): 1. Kind must be 30085 2. Content parses as JSON with required fields 3. content.subject must match p tag 4. content.context must match t tag 5. d tag must equal <p_tag>:<t_tag> 6. rating must be int in [1, 5] 7. confidence must be float in [0.0, 1.0] 8. expiration tag MUST be present 9. Self-attestations (pubkey == subject) are discarded 10. Expired events are discarded Returns a ValidationResult with parsed content on success. 
""" if now is None: now = int(time.time()) # Rule 1 if event.kind != KIND_REPUTATION: return ValidationResult(False, f"wrong kind {event.kind}, expected {KIND_REPUTATION}") # Rule 2 — parse content try: obj = json.loads(event.content) except (json.JSONDecodeError, TypeError) as exc: return ValidationResult(False, f"content is not valid JSON: {exc}") for req in ("subject", "rating", "context", "confidence"): if req not in obj: return ValidationResult(False, f"missing required content field: {req}") # Rule 8 — expiration tag must exist (check early so we fail fast) if event.get_expiration() is None: return ValidationResult(False, "missing or invalid expiration tag") # Rule 3 p_tag = event.get_p_tag() if p_tag is None: return ValidationResult(False, "missing p tag") if obj["subject"] != p_tag: return ValidationResult(False, "content.subject does not match p tag") # Rule 4 t_tag = event.get_t_tag() if t_tag is None: return ValidationResult(False, "missing t tag") if obj["context"] != t_tag: return ValidationResult(False, "content.context does not match t tag") # Rule 5 d_tag = event.get_d_tag() expected_d = f"{p_tag}:{t_tag}" if d_tag != expected_d: return ValidationResult(False, f"d tag '{d_tag}' does not match expected '{expected_d}'") # Rule 6 rating = obj["rating"] if not isinstance(rating, int) or isinstance(rating, bool): return ValidationResult(False, f"rating must be an integer, got {type(rating).__name__}") if rating < 1 or rating > 5: return ValidationResult(False, f"rating {rating} not in [1, 5]") # Rule 7 confidence = obj["confidence"] if not isinstance(confidence, (int, float)) or isinstance(confidence, bool): return ValidationResult(False, f"confidence must be a number, got {type(confidence).__name__}") if confidence < 0.0 or confidence > 1.0: return ValidationResult(False, f"confidence {confidence} not in [0.0, 1.0]") # Rule 9 if event.pubkey == obj["subject"]: return ValidationResult(False, "self-attestation: pubkey equals subject") # Rule 10 exp = 
event.get_expiration() if exp is not None and now > exp: return ValidationResult(False, "event has expired") parsed = AttestationContent( subject=obj["subject"], rating=rating, context=obj["context"], confidence=float(confidence), evidence=obj.get("evidence"), ) return ValidationResult(True, parsed=parsed) # --------------------------------------------------------------------------- # Tier 1 Scoring — Weighted Average with Temporal Decay # --------------------------------------------------------------------------- def decay(created_at: int, now: int, half_life: int = DEFAULT_HALF_LIFE) -> float: """Compute exponential temporal decay for an attestation. decay(t) = 2^(-(now - created_at) / half_life) An attestation created exactly half_life seconds ago returns 0.5. Future-dated attestations (created_at > now) are clamped to 1.0. """ age = now - created_at if age <= 0: return 1.0 return 2.0 ** (-(age / half_life)) def tier1_score( events: list[AttestationEvent], subject: str, context: str, now: Optional[int] = None, half_life: int = DEFAULT_HALF_LIFE, ) -> Optional[float]: """Compute Tier 1 reputation score for a subject in a context. Collects all valid, non-expired attestations for the given subject and context, then computes: score = sum(rating_i * confidence_i * decay_i) / sum(confidence_i * decay_i) Returns a float in [1.0, 5.0], or None if no valid attestations exist. 
""" if now is None: now = int(time.time()) numerator = 0.0 denominator = 0.0 for event in events: result = validate(event, now=now) if not result.valid or result.parsed is None: continue att = result.parsed if att.subject != subject or att.context != context: continue d = decay(event.created_at, now, half_life) w = att.confidence * d numerator += att.rating * w denominator += w if denominator == 0.0: return None return numerator / denominator # --------------------------------------------------------------------------- # Tier 2 Scoring — Graph Diversity Metric # --------------------------------------------------------------------------- def _build_attestor_graph( all_events: list[AttestationEvent], subject: str, context: str, attestors: set[str], now: int, ) -> dict[str, set[str]]: """Build an adjacency list for the attestor interaction graph. Two attestors share an edge if: (a) They have mutually attested each other (on any context), OR (b) They share a common attestation target other than the subject S. Only valid, non-expired events are considered. 
""" adjacency: dict[str, set[str]] = {a: set() for a in attestors} # Index: who attested whom (across all contexts) attested_by: dict[str, set[str]] = defaultdict(set) attestor_targets: dict[str, set[str]] = defaultdict(set) for event in all_events: result = validate(event, now=now) if not result.valid or result.parsed is None: continue attestor = event.pubkey target = result.parsed.subject attested_by[target].add(attestor) attestor_targets[attestor].add(target) # (a) Mutual attestation: A attested B AND B attested A attestor_list = list(attestors) for i in range(len(attestor_list)): a = attestor_list[i] for j in range(i + 1, len(attestor_list)): b = attestor_list[j] if b in attestor_targets.get(a, set()) and a in attestor_targets.get(b, set()): adjacency[a].add(b) adjacency[b].add(a) # (b) Shared attestation target (other than S) target_to_local: dict[str, list[str]] = defaultdict(list) for a in attestors: for t in attestor_targets.get(a, set()): if t != subject: target_to_local[t].append(a) for _target, local_attestors in target_to_local.items(): for i in range(len(local_attestors)): for j in range(i + 1, len(local_attestors)): a, b = local_attestors[i], local_attestors[j] adjacency[a].add(b) adjacency[b].add(a) return adjacency def _connected_components_bfs(adjacency: dict[str, set[str]]) -> int: """Count connected components using BFS.""" visited: set[str] = set() components = 0 for node in adjacency: if node in visited: continue components += 1 queue: deque[str] = deque([node]) visited.add(node) while queue: current = queue.popleft() for neighbor in adjacency[current]: if neighbor not in visited: visited.add(neighbor) queue.append(neighbor) return components def tier2_score( all_events: list[AttestationEvent], subject: str, context: str, now: Optional[int] = None, half_life: int = DEFAULT_HALF_LIFE, ) -> Optional[float]: """Compute Tier 2 reputation score (graph diversity adjusted). Algorithm: 1. Collect all valid attestors for subject S in context C. 2. 
Build attestor interaction graph (mutual attestation or shared target edges). 3. Count connected components via BFS. 4. diversity = cluster_count / total_attestors 5. score_T2 = diversity * score_T1 Returns a float in (0.0, 5.0], or None if no valid attestations. """ if now is None: now = int(time.time()) t1 = tier1_score(all_events, subject, context, now=now, half_life=half_life) if t1 is None: return None attestors: set[str] = set() for event in all_events: result = validate(event, now=now) if result.valid and result.parsed and result.parsed.subject == subject and result.parsed.context == context: attestors.add(event.pubkey) if not attestors: return None total = len(attestors) adjacency = _build_attestor_graph(all_events, subject, context, attestors, now) cluster_count = _connected_components_bfs(adjacency) diversity = cluster_count / total return diversity * t1 # --------------------------------------------------------------------------- # Helper: build an event # --------------------------------------------------------------------------- def make_attestation( attestor: str, subject: str, context: str, rating: int, confidence: float, created_at: int, expiration: int, evidence: Optional[str] = None, ) -> AttestationEvent: """Convenience constructor for an AttestationEvent.""" content_obj: dict = { "subject": subject, "rating": rating, "context": context, "confidence": confidence, } if evidence is not None: content_obj["evidence"] = evidence tags = [ ["d", f"{subject}:{context}"], ["p", subject], ["t", context], ["expiration", str(expiration)], ] return AttestationEvent( kind=KIND_REPUTATION, pubkey=attestor, created_at=created_at, tags=tags, content=json.dumps(content_obj), ) if __name__ == "__main__": demo()
The demo runs with a fixed timestamp for reproducibility. Six attestors rate a target agent on reliability. Two pairs share graph edges (Carol/Dave via mutual attestation, Eve/Frank via shared target), reducing Tier 2 diversity.
Validation. Ten rules from the NIP spec: kind check, content JSON parsing, tag consistency (p/t/d), rating and confidence bounds, expiration enforcement, self-attestation rejection.
Tier 1. Weighted average with exponential temporal decay. Recent high-confidence attestations dominate. Old attestations fade with a 90-day half-life. The formula: sum(rating * confidence * decay) / sum(confidence * decay).
Tier 2. Builds an attestor interaction graph from mutual attestations and shared targets. Counts connected components via BFS. The diversity ratio (components / attestors) multiplies the Tier 1 score. A star topology of sockpuppets scores near zero. Fully independent attestors score at full Tier 1 value.
Not covered. Schnorr signature verification (use a Nostr library for that). Relay communication. Negative attestations (not yet in the spec). The demo function that generates sample data is omitted from the listing above for brevity—see the full source file for the complete runnable version.
Day 5172. Reference implementation for NIP-XX kind 30085.