From 38bfa85ad02b720047bcbe0b8376ae6101ab74bf Mon Sep 17 00:00:00 2001 From: red person Date: Wed, 3 Jun 2026 08:11:31 +0300 Subject: [PATCH] Reject invalid Tailscale discovery JSON (#1556) * Reject invalid Tailscale discovery JSON * Guard nested Tailscale IP shapes --- src/model_discovery.py | 43 ++++++++++++++++++++-------- tests/test_model_discovery_status.py | 37 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 12 deletions(-) create mode 100644 tests/test_model_discovery_status.py diff --git a/src/model_discovery.py b/src/model_discovery.py index f109651..ca62a9f 100644 --- a/src/model_discovery.py +++ b/src/model_discovery.py @@ -16,6 +16,23 @@ _hosts_cache_time: float = 0 _HOSTS_CACHE_TTL = 60 # seconds +def _parse_tailscale_status(raw: str) -> Dict[str, Any]: + try: + data = json.loads(raw) + except (TypeError, json.JSONDecodeError): + return {} + return data if isinstance(data, dict) else {} + + +def _first_tailscale_ipv4(value: Any) -> Optional[str]: + if not isinstance(value, list): + return None + for ip in value: + if isinstance(ip, str) and "." in ip: + return ip + return None + + def discover_tailscale_hosts() -> List[str]: """Discover online Tailscale peers, returning their IPv4 addresses.""" global _hosts_cache, _hosts_cache_time @@ -33,17 +50,21 @@ def discover_tailscale_hosts() -> List[str]: if result.returncode != 0: return hosts - data = json.loads(result.stdout) + data = _parse_tailscale_status(result.stdout) + if not data: + return hosts # Add self - self_ips = data.get("Self", {}).get("TailscaleIPs", []) - for ip in self_ips: - if "." in ip: # IPv4 only - hosts.append(ip) - break + self_data = data.get("Self") if isinstance(data.get("Self"), dict) else {} + self_ip = _first_tailscale_ipv4(self_data.get("TailscaleIPs")) + if self_ip: + hosts.append(self_ip) # Add online peers (skip funnel-ingress-nodes and android devices) - for peer in data.get("Peer", {}).values(): + peers = data.get("Peer") if isinstance(data.get("Peer"), dict) else {} + for peer in peers.values(): + if not isinstance(peer, dict): + continue if not peer.get("Online"): continue hostname = peer.get("HostName", "") @@ -52,11 +73,9 @@ def discover_tailscale_hosts() -> List[str]: os_name = peer.get("OS", "") if os_name == "android": continue - peer_ips = peer.get("TailscaleIPs", []) - for ip in peer_ips: - if "." in ip: # IPv4 only - hosts.append(ip) - break + peer_ip = _first_tailscale_ipv4(peer.get("TailscaleIPs")) + if peer_ip: + hosts.append(peer_ip) _hosts_cache = hosts _hosts_cache_time = now diff --git a/tests/test_model_discovery_status.py b/tests/test_model_discovery_status.py new file mode 100644 index 0000000..17be910 --- /dev/null +++ b/tests/test_model_discovery_status.py @@ -0,0 +1,37 @@ +from src import model_discovery + + +def test_parse_tailscale_status_rejects_wrong_shapes(): + assert model_discovery._parse_tailscale_status("{bad") == {} + assert model_discovery._parse_tailscale_status("[]") == {} + assert model_discovery._parse_tailscale_status('{"Self": {}}') == {"Self": {}} + + +def test_discovery_ignores_invalid_peer_rows(monkeypatch): + class Result: + returncode = 0 + stdout = '{"Self":{"TailscaleIPs":["100.1.1.1"]},"Peer":{"bad":"row","ok":{"Online":true,"HostName":"box","OS":"linux","TailscaleIPs":["100.1.1.2"]}}}' + + monkeypatch.setattr(model_discovery.subprocess, "run", lambda *a, **k: Result()) + model_discovery._hosts_cache = [] + model_discovery._hosts_cache_time = 0 + + assert model_discovery.discover_tailscale_hosts() == ["100.1.1.1", "100.1.1.2"] + + +def test_discovery_ignores_invalid_tailscale_ip_shapes(monkeypatch): + class Result: + returncode = 0 + stdout = ( + '{"Self":{"TailscaleIPs":"100.1.1.1"},' + '"Peer":{' + '"string_ips":{"Online":true,"HostName":"bad","OS":"linux","TailscaleIPs":"100.1.1.2"},' + '"mixed_ips":{"Online":true,"HostName":"ok","OS":"linux","TailscaleIPs":[null,123,"100.1.1.3"]}' + '}}' + ) + + monkeypatch.setattr(model_discovery.subprocess, "run", lambda *a, **k: Result()) + model_discovery._hosts_cache = [] + model_discovery._hosts_cache_time = 0 + + assert model_discovery.discover_tailscale_hosts() == ["100.1.1.3"]