Files
odysseus/services/hwfit/fit.py
2026-06-02 13:07:20 +09:00

528 lines
20 KiB
Python

import re
from services.hwfit.models import (
params_b, estimate_memory_gb, infer_use_case,
get_models, is_prequantized, _active_params_b, QUANT_BYTES_PER_PARAM,
QUANT_SPEED_MULT, QUANT_QUALITY_PENALTY,
)
GPU_BANDWIDTH = {
"5090": 1792, "5080": 960, "5070 ti": 896, "5070": 672, "5060 ti": 448, "5060": 256,
"4090": 1008, "4080 super": 736, "4080": 717, "4070 ti super": 672, "4070 ti": 504, "4070 super": 504, "4070": 504, "4060 ti": 288, "4060": 272,
"3090 ti": 1008, "3090": 936, "3080 ti": 912, "3080": 760, "3070 ti": 608, "3070": 448, "3060 ti": 448, "3060": 360,
"2080 ti": 616, "2080 super": 496, "2080": 448, "2070 super": 448, "2070": 448, "2060 super": 448, "2060": 336,
"1660 ti": 288, "1660 super": 336, "1660": 192, "1650 super": 192, "1650": 128,
"h100 sxm": 3350, "h100": 2039, "h200": 4800, "a100 sxm": 2039, "a100": 1555,
"l40s": 864, "l40": 864, "l4": 300, "a10g": 600, "a10": 600, "t4": 320,
"v100 sxm": 900, "v100": 897, "a6000": 768, "a5000": 768, "a4000": 448,
"7900 xtx": 960, "7900 xt": 800, "7900 gre": 576, "7800 xt": 624, "7700 xt": 432, "7600": 288,
"6950 xt": 576, "6900 xt": 512, "6800 xt": 512, "6800": 512, "6700 xt": 384, "6600 xt": 256, "6600": 224,
"mi300x": 5300, "mi300": 5300, "mi250x": 3277, "mi250": 3277, "mi210": 1638, "mi100": 1229,
"9070 xt": 624, "9070": 488,
# Apple Silicon unified-memory bandwidth (GB/s). Keyed off the chip name
# reported by sysctl machdep.cpu.brand_string (e.g. "Apple M4 Max"). Listed
# before the bare "m_" keys matters less than length-sorting (done below),
# which guarantees "m4 max" is tried before "m4".
"m1 ultra": 800, "m1 max": 400, "m1 pro": 200, "m1": 68,
"m2 ultra": 800, "m2 max": 400, "m2 pro": 200, "m2": 100,
"m3 ultra": 800, "m3 max": 300, "m3 pro": 150, "m3": 100,
"m4 max": 546, "m4 pro": 273, "m4": 120,
"m5 max": 546, "m5 pro": 273, "m5": 150,
}
# Pre-sort keys by length descending for correct substring matching
_BW_KEYS_SORTED = sorted(GPU_BANDWIDTH.keys(), key=len, reverse=True)
# metal: backstop for Apple Silicon chips not in GPU_BANDWIDTH (e.g. a future
# M5) — the named chips above take the accurate bandwidth path instead.
FALLBACK_K = {"cuda": 220, "rocm": 180, "metal": 150, "cpu_x86": 70, "cpu_arm": 90}
USE_CASE_WEIGHTS = {
"general": (0.45, 0.30, 0.15, 0.10),
"coding": (0.50, 0.20, 0.15, 0.15),
"reasoning": (0.55, 0.15, 0.15, 0.15),
"chat": (0.40, 0.35, 0.15, 0.10),
"multimodal": (0.50, 0.20, 0.15, 0.15),
"embedding": (0.30, 0.40, 0.20, 0.10),
"tts": (0.40, 0.35, 0.15, 0.10),
"stt": (0.40, 0.35, 0.15, 0.10),
}
SPEED_TARGET = {
"general": 40, "coding": 40, "multimodal": 40, "chat": 40,
"reasoning": 25, "embedding": 200, "tts": 40, "stt": 40,
}
CONTEXT_TARGET = {
"general": 4096, "chat": 4096, "coding": 8192,
"reasoning": 8192, "multimodal": 4096, "embedding": 512,
"tts": 2048, "stt": 2048,
}
def _lookup_bandwidth(gpu_name):
if not gpu_name:
return None
gn = gpu_name.lower()
for key in _BW_KEYS_SORTED:
if key in gn:
return GPU_BANDWIDTH[key]
return None
def _estimate_speed(model, quant, run_mode, system):
"""Estimate tok/s. Uses active params for MoE (only active experts run per token)."""
pb = _active_params_b(model)
is_moe = model.get("is_moe", False)
bw = _lookup_bandwidth(system.get("gpu_name"))
backend = system.get("backend", "cpu_x86")
if bw and run_mode in ("gpu", "cpu_offload"):
bpp = QUANT_BYTES_PER_PARAM.get(quant, 0.5)
model_gb = pb * bpp
if model_gb <= 0:
return 0.0
efficiency = 0.55
raw_tps = (bw / model_gb) * efficiency
if run_mode == "cpu_offload":
mode_factor = 0.5
elif is_moe:
mode_factor = 0.8
else:
mode_factor = 1.0
return raw_tps * mode_factor
k = FALLBACK_K.get(backend, 70)
if pb <= 0:
return 0.0
sm = QUANT_SPEED_MULT.get(quant, 1.0)
return k / pb * sm
def _architecture_bonus(model):
name = (model.get("name") or "").lower()
arch = (model.get("architecture") or "").lower()
text = f"{name} {arch}"
# Keep this intentionally small: hardware fit and speed still matter, but
# current model families should not be scored the same as older Qwen2/LLama
# era entries just because the parameter count is similar.
if "qwen3.6" in text or "qwen3_6" in text:
return 9
if "qwen3.5" in text or "qwen3_5" in text:
return 8
if "qwen3-next" in text or "qwen3_next" in text:
return 6
if "qwen3" in text or arch.startswith("qwen3"):
return 4
if "qwen2.5" in text or "qwen2_5" in text:
return 2
return 0
def _quality_score(model, quant, use_case):
pb = params_b(model)
if pb < 1:
base = 30
elif pb < 3:
base = 45
elif pb < 7:
base = 60
elif pb < 10:
base = 75
elif pb < 20:
base = 82
elif pb < 40:
base = 89
else:
base = 95
name_lower = model.get("name", "").lower()
if "qwen" in name_lower:
base += 2
if "deepseek" in name_lower:
base += 3
if "llama" in name_lower:
base += 2
if "mistral" in name_lower or "mixtral" in name_lower:
base += 1
if "gemma" in name_lower:
base += 1
base += _architecture_bonus(model)
base += QUANT_QUALITY_PENALTY.get(quant, 0)
model_uc = infer_use_case(model)
if model_uc == "coding" and use_case == "coding":
base += 6
if model_uc == "reasoning" and use_case == "reasoning" and pb >= 13:
base += 5
if model_uc == "multimodal" and use_case == "multimodal":
base += 6
return max(0, min(100, base))
def _speed_score(tps, use_case):
target = SPEED_TARGET.get(use_case, 40)
return max(0, min(100, (tps / target) * 100))
def _fit_score(required, available):
if required > available:
return 0
if available <= 0:
return 0
ratio = required / available
if ratio <= 0.5:
return 60 + (ratio / 0.5) * 40
if ratio <= 0.8:
return 100
if ratio <= 0.9:
return 70
return 50
def _context_score(ctx, use_case):
target = CONTEXT_TARGET.get(use_case, 4096)
if ctx >= target:
return 100
if ctx >= target / 2:
return 70
return 30
def _try_quant_at(model, quant, ctx, gpu_vram, available_ram):
"""Try a specific quant at a given context. Returns (run_mode, quant, ctx, mem) or None."""
mem = estimate_memory_gb(model, quant, ctx)
if gpu_vram > 0 and mem <= gpu_vram:
return "gpu", quant, ctx, mem
if gpu_vram > 0 and mem <= available_ram:
return "cpu_offload", quant, ctx, mem
if gpu_vram <= 0 and mem <= available_ram:
return "cpu_only", quant, ctx, mem
# Try halving context
cur_ctx = ctx // 2
while cur_ctx >= 1024:
mem = estimate_memory_gb(model, quant, cur_ctx)
if gpu_vram > 0 and mem <= gpu_vram:
return "gpu", quant, cur_ctx, mem
if mem <= available_ram:
return ("cpu_offload" if gpu_vram > 0 else "cpu_only"), quant, cur_ctx, mem
cur_ctx //= 2
return None
def _quant_bits(q):
"""Approximate bit-width of a quant label so GGUF quant tiers (Q4/Q8/…) can
be matched against prequantized formats (AWQ 4, AWQ-8bit, FP8, GPTQ-4bit…).
Returns 0 when unknown (caller treats unknown as "don't filter")."""
qu = (q or "").upper().replace("-", "").replace("_", "").replace(" ", "")
# GGUF k-quants + float formats
if qu.startswith("Q8") or "FP8" in qu or "INT8" in qu or qu.startswith("W8"):
return 8
if qu.startswith("Q4") or qu.startswith("IQ4") or "FP4" in qu or "NF4" in qu or "INT4" in qu or qu.startswith("W4"):
return 4
if qu.startswith("Q2") or qu.startswith("IQ2"):
return 2
if qu.startswith("Q3") or qu.startswith("IQ3"):
return 3
if qu.startswith("Q5"):
return 5
if qu.startswith("Q6"):
return 6
if qu.startswith("F16") or qu.startswith("BF16") or qu.startswith("F32"):
return 16
# Prequantized formats: pull the bit-width digit (AWQ4 / AWQ4BIT / GPTQ8 / 4BIT / INT8 ...)
m = re.search(r"(?:AWQ|GPTQ|MLX|EXL2|BNB|INT|W)(\d{1,2})", qu) or re.search(r"(\d{1,2})BIT", qu)
if m:
b = int(m.group(1))
if 2 <= b <= 16:
return b
return 0
def analyze_model(model, system, target_quant=None, scoring_use_case=None):
pb = params_b(model)
if pb <= 0:
return None
model_use_case = infer_use_case(model)
score_use_case = scoring_use_case or "general"
has_gpu = system.get("has_gpu", False)
gpu_vram = (system.get("gpu_vram_gb") or 0) if has_gpu else 0
gpu_count = system.get("gpu_count", 1) or 1
single_gpu_vram = gpu_vram / gpu_count if gpu_count > 1 else gpu_vram
available_ram = system.get("available_ram_gb", 0)
# When the user has explicitly picked a GPU config (not RAM mode), they want
# to see what runs ON the GPU(s) — not big models that only "fit" by spilling
# most layers to system RAM. Zeroing the offload budget makes _try_quant_at
# take only its GPU branches (fit on VRAM, shrinking context if needed),
# otherwise return None. Fixes "96 GB GPU still lists a 175 GB model".
gpu_only = bool(system.get("gpu_only")) and has_gpu and gpu_vram > 0
eff_ram = 0 if gpu_only else available_ram
is_moe = model.get("is_moe", False)
ctx = model.get("context_length", 4096) or 4096
native_quant = model.get("quantization", "Q4_K_M")
if "nvfp4" in (model.get("name") or "").lower():
native_quant = "NVFP4"
preq = is_prequantized(model)
# GGUF models can't be sharded across GPUs — use single GPU VRAM
is_gguf = bool(model.get("gguf_sources"))
quant_upper = (native_quant or "").upper()
is_gguf_quant = any(quant_upper.startswith(p) for p in ("Q2", "Q3", "Q4", "Q5", "Q6", "Q8", "IQ", "F16", "F32"))
# Single-GPU VRAM only applies to GGUF/dense builds (llama.cpp can't shard
# across GPUs). Prequantized formats (AWQ/GPTQ/FP8) are served sharded by
# vLLM across all GPUs, so they get the FULL multi-GPU VRAM — even when the
# model also lists a GGUF alternate download (gguf_sources).
if (is_gguf or is_gguf_quant) and not preq:
effective_vram = single_gpu_vram
else:
effective_vram = gpu_vram
native_gpu_only = preq and not native_quant.startswith("mlx-")
# Determine which quant to evaluate at
native_quant_prefixes = (
"AWQ-", "GPTQ-", "FP8", "FP4", "NVFP4", "MXFP4", "NF4",
"INT4", "INT8", "W4A16", "W8A8", "W8A16",
)
if preq:
# Native HF/vLLM quantized repos come at a fixed format. If the user
# picked a GGUF quant tier (Q4/Q8/etc.), do not treat same-bit
# AWQ/GPTQ/FP8/FP4 builds as equivalent; those formats are separate
# serving paths and only appear when explicitly selected or unfiltered.
if target_quant:
if not any(target_quant.startswith(p) for p in native_quant_prefixes):
return None
_tb, _nb = _quant_bits(target_quant), _quant_bits(native_quant)
if _tb and _nb and _tb != _nb:
return None
quant_to_try = native_quant
elif target_quant:
# User picked a specific quant
quant_to_try = target_quant
else:
# Default: Q4_K_M (user's stated preference)
quant_to_try = "Q4_K_M"
result = _try_quant_at(model, quant_to_try, ctx, effective_vram, 0 if native_gpu_only else eff_ram)
if result is None:
# Model doesn't fit on the user's current hardware. Surface it
# anyway with a "too_tight" badge instead of silently dropping
# it — without this, editing the hardware config to try LARGER
# tiers never revealed the bigger models, because they were
# filtered out before the user could see what would fit. The
# client already knows how to render too_tight (red row).
oversized_required = estimate_memory_gb(model, quant_to_try, ctx)
return {
"name": model.get("name"),
"provider": model.get("provider"),
"parameter_count": model.get("parameter_count"),
"params_b": round(pb, 1),
"is_moe": is_moe,
"use_case": model_use_case,
"fit_level": "too_tight",
"run_mode": "no_fit",
"quant": quant_to_try,
"context": ctx,
"required_gb": round(oversized_required, 1),
"speed_tps": 0,
"score": 0,
"scores": {"quality": 0, "speed": 0, "fit": 0, "context": 0},
"gguf_sources": model.get("gguf_sources", []),
"context_length": model.get("context_length", 4096),
}
run_mode, quant, fit_ctx, required_gb = result
# Determine fit level
budget = effective_vram if run_mode == "gpu" else available_ram
if required_gb > budget:
return None
if run_mode == "gpu":
rec = model.get("recommended_ram_gb") or required_gb
if rec <= gpu_vram:
fit_level = "perfect"
elif gpu_vram >= required_gb * 1.2:
fit_level = "good"
else:
fit_level = "marginal"
elif run_mode == "cpu_offload":
fit_level = "good" if available_ram >= required_gb * 1.2 else "marginal"
else:
fit_level = "marginal"
tps = _estimate_speed(model, quant, run_mode, system)
q_score = _quality_score(model, quant, score_use_case)
s_score = _speed_score(tps, score_use_case)
f_score = _fit_score(required_gb, budget)
c_score = _context_score(fit_ctx, score_use_case)
wq, ws, wf, wc = USE_CASE_WEIGHTS.get(score_use_case, (0.45, 0.30, 0.15, 0.10))
composite = q_score * wq + s_score * ws + f_score * wf + c_score * wc
return {
"name": model.get("name"),
"provider": model.get("provider"),
"parameter_count": model.get("parameter_count"),
"params_b": round(pb, 1),
"is_moe": is_moe,
"use_case": model_use_case,
"fit_level": fit_level,
"run_mode": run_mode,
"quant": quant,
"context": fit_ctx,
"required_gb": round(required_gb, 1),
"speed_tps": round(tps, 1),
"score": round(composite, 1),
"scores": {
"quality": round(q_score, 1),
"speed": round(s_score, 1),
"fit": round(f_score, 1),
"context": round(c_score, 1),
},
"gguf_sources": model.get("gguf_sources", []),
"context_length": model.get("context_length", 4096),
}
SORT_KEYS = {
"score": lambda r: r["score"],
"speed": lambda r: r["speed_tps"],
"vram": lambda r: r["required_gb"],
"params": lambda r: r["params_b"],
"context": lambda r: r["context"],
}
def rank_models(system, use_case=None, limit=50, search=None, sort="score", quant=None):
"""Rank all models against detected hardware. Returns sorted list of fit results."""
models = get_models()
results = []
# Include image gen models only when explicitly filtered
if use_case == "image_gen":
try:
from services.hwfit.image_models import rank_image_models
except ImportError:
rank_image_models = None
if rank_image_models:
img_results = rank_image_models(system, search=search)
else:
img_results = []
for im in img_results:
fit_map = {"perfect": "perfect", "good": "good", "tight": "marginal", "no_fit": "too_tight", "no_gpu": "too_tight"}
results.append({
"name": im["id"],
"provider": im["provider"],
"parameter_count": f"{im['params_b']}B",
"params_b": im["params_b"],
"is_moe": False,
"use_case": "image_gen",
"fit_level": fit_map.get(im["fit"], "too_tight"),
"run_mode": "gpu" if im["fits"] else "no_fit",
"quant": im.get("quant", "BF16"),
"context": 0,
"context_length": 0,
"required_gb": round(im.get("vram_needed") or 0, 1),
"speed_tps": 0,
"score": float(im["score"]),
"scores": {"quality": float(im["quality"]), "speed": float(im["speed"]), "fit": 0, "context": 0},
"gguf_sources": [],
"is_image_gen": True,
"capabilities": im.get("capabilities", []),
"description": im.get("description", ""),
})
if use_case == "image_gen":
sort_fn = SORT_KEYS.get(sort, SORT_KEYS["score"])
results.sort(key=sort_fn, reverse=(sort != "vram"))
return results[:limit]
# If user picked a native prequantized format, filter to only those models.
filter_native = quant and any(quant.startswith(p) for p in (
"AWQ-", "GPTQ-", "FP8", "FP4", "NVFP4", "MXFP4", "NF4",
"INT4", "INT8", "W4A16", "W8A8", "W8A16",
))
system_backend = (system.get("backend") or "").lower()
apple_silicon = system_backend in ("mps", "metal", "apple")
rocm = system_backend == "rocm"
for m in models:
native_q = m.get("quantization", "")
if "nvfp4" in (m.get("name") or "").lower():
native_q = "NVFP4"
# MLX needs the mlx_lm runtime, which Odysseus does not generate serve
# commands for. Hide it on every backend, including Metal.
if native_q.startswith("mlx-") or "mlx" in (m.get("name") or "").lower():
continue
# ROCm support for vLLM/SGLang quantized safetensors is too brittle to
# recommend blindly in the default scan. Keep AWQ/GPTQ/FP8 discoverable
# only when the user explicitly picks that format from the quant filter;
# otherwise prefer GGUF/Q* entries that Odysseus can route through
# llama.cpp/Ollama without pretending "fits VRAM" means "servable".
if rocm and is_prequantized(m) and not filter_native:
continue
# On Apple Silicon the only serving engines are llama.cpp and Ollama,
# both GGUF-only (vLLM/SGLang are CUDA/ROCm and don't run on macOS). So
# a model is Metal-servable ONLY if it ships a real GGUF. Drop everything
# else — raw safetensors repos (which the catalog still tags with a
# default GGUF quant) and vLLM-only AWQ/GPTQ/FP8 builds alike. Without
# this the Cookbook recommends models the Mac can't run; on CUDA these
# stay visible because vLLM serves safetensors directly.
if apple_silicon and not (m.get("is_gguf") or m.get("gguf_sources")):
continue
# Format filter: AWQ tab -> only AWQ models, FP4 tab -> FP4-family models, etc.
if filter_native:
if quant == "FP8" and native_q != "FP8":
continue
if quant == "FP4" and native_q not in ("FP4", "NVFP4", "MXFP4", "NF4"):
continue
if quant.startswith("AWQ") and not native_q.startswith("AWQ"):
continue
if quant.startswith("GPTQ") and not native_q.startswith("GPTQ"):
continue
if quant.startswith("NVFP4") and not native_q.startswith("NVFP4"):
continue
if quant in ("INT4", "INT8", "W4A16", "W8A8", "W8A16") and native_q != quant:
continue
if search:
name = m.get("name", "").lower()
provider = m.get("provider", "").lower()
if search.lower() not in name and search.lower() not in provider:
continue
result = analyze_model(m, system, target_quant=quant, scoring_use_case=(use_case or "general"))
if result is None:
continue
if use_case:
model_uc = infer_use_case(m)
if use_case != model_uc and use_case != "general":
continue
results.append(result)
# Pick the visible SET by best fit (score) first, so it stays the same no
# matter which column the user sorts by — otherwise sorting by params would
# truncate to the N biggest models (huge ones that don't even fit) while
# sorting by vram showed the N smallest. Only AFTER choosing the set do we
# order it by the requested column.
results.sort(key=SORT_KEYS["score"], reverse=True)
results = results[:limit]
sort_fn = SORT_KEYS.get(sort, SORT_KEYS["score"])
# vram ascending (smallest first), everything else descending (biggest first)
results.sort(key=sort_fn, reverse=(sort != "vram"))
return results