Reject invalid Tailscale discovery JSON (#1556)

* Reject invalid Tailscale discovery JSON

* Guard nested Tailscale IP shapes
This commit is contained in:
red person
2026-06-03 08:11:31 +03:00
committed by GitHub
parent ab7145de83
commit 38bfa85ad0
2 changed files with 68 additions and 12 deletions

View File

@@ -16,6 +16,23 @@ _hosts_cache_time: float = 0
_HOSTS_CACHE_TTL = 60 # seconds _HOSTS_CACHE_TTL = 60 # seconds
def _parse_tailscale_status(raw: str) -> Dict[str, Any]:
try:
data = json.loads(raw)
except (TypeError, json.JSONDecodeError):
return {}
return data if isinstance(data, dict) else {}
def _first_tailscale_ipv4(value: Any) -> Optional[str]:
if not isinstance(value, list):
return None
for ip in value:
if isinstance(ip, str) and "." in ip:
return ip
return None
def discover_tailscale_hosts() -> List[str]: def discover_tailscale_hosts() -> List[str]:
"""Discover online Tailscale peers, returning their IPv4 addresses.""" """Discover online Tailscale peers, returning their IPv4 addresses."""
global _hosts_cache, _hosts_cache_time global _hosts_cache, _hosts_cache_time
@@ -33,17 +50,21 @@ def discover_tailscale_hosts() -> List[str]:
if result.returncode != 0: if result.returncode != 0:
return hosts return hosts
data = json.loads(result.stdout) data = _parse_tailscale_status(result.stdout)
if not data:
return hosts
# Add self # Add self
self_ips = data.get("Self", {}).get("TailscaleIPs", []) self_data = data.get("Self") if isinstance(data.get("Self"), dict) else {}
for ip in self_ips: self_ip = _first_tailscale_ipv4(self_data.get("TailscaleIPs"))
if "." in ip: # IPv4 only if self_ip:
hosts.append(ip) hosts.append(self_ip)
break
# Add online peers (skip funnel-ingress-nodes and android devices) # Add online peers (skip funnel-ingress-nodes and android devices)
for peer in data.get("Peer", {}).values(): peers = data.get("Peer") if isinstance(data.get("Peer"), dict) else {}
for peer in peers.values():
if not isinstance(peer, dict):
continue
if not peer.get("Online"): if not peer.get("Online"):
continue continue
hostname = peer.get("HostName", "") hostname = peer.get("HostName", "")
@@ -52,11 +73,9 @@ def discover_tailscale_hosts() -> List[str]:
os_name = peer.get("OS", "") os_name = peer.get("OS", "")
if os_name == "android": if os_name == "android":
continue continue
peer_ips = peer.get("TailscaleIPs", []) peer_ip = _first_tailscale_ipv4(peer.get("TailscaleIPs"))
for ip in peer_ips: if peer_ip:
if "." in ip: # IPv4 only hosts.append(peer_ip)
hosts.append(ip)
break
_hosts_cache = hosts _hosts_cache = hosts
_hosts_cache_time = now _hosts_cache_time = now

View File

@@ -0,0 +1,37 @@
from src import model_discovery
def test_parse_tailscale_status_rejects_wrong_shapes():
assert model_discovery._parse_tailscale_status("{bad") == {}
assert model_discovery._parse_tailscale_status("[]") == {}
assert model_discovery._parse_tailscale_status('{"Self": {}}') == {"Self": {}}
def test_discovery_ignores_invalid_peer_rows(monkeypatch):
class Result:
returncode = 0
stdout = '{"Self":{"TailscaleIPs":["100.1.1.1"]},"Peer":{"bad":"row","ok":{"Online":true,"HostName":"box","OS":"linux","TailscaleIPs":["100.1.1.2"]}}}'
monkeypatch.setattr(model_discovery.subprocess, "run", lambda *a, **k: Result())
model_discovery._hosts_cache = []
model_discovery._hosts_cache_time = 0
assert model_discovery.discover_tailscale_hosts() == ["100.1.1.1", "100.1.1.2"]
def test_discovery_ignores_invalid_tailscale_ip_shapes(monkeypatch):
class Result:
returncode = 0
stdout = (
'{"Self":{"TailscaleIPs":"100.1.1.1"},'
'"Peer":{'
'"string_ips":{"Online":true,"HostName":"bad","OS":"linux","TailscaleIPs":"100.1.1.2"},'
'"mixed_ips":{"Online":true,"HostName":"ok","OS":"linux","TailscaleIPs":[null,123,"100.1.1.3"]}'
'}}'
)
monkeypatch.setattr(model_discovery.subprocess, "run", lambda *a, **k: Result())
model_discovery._hosts_cache = []
model_discovery._hosts_cache_time = 0
assert model_discovery.discover_tailscale_hosts() == ["100.1.1.3"]