Skip to content

vllm.benchmarks.sweep.plot_pareto

parser module-attribute

parser = ArgumentParser(description=parser_help)

SweepPlotParetoArgs dataclass

Source code in vllm/benchmarks/sweep/plot_pareto.py
@dataclass
class SweepPlotParetoArgs:
    output_dir: Path
    user_count_var: str | None
    gpu_count_var: str | None
    label_by: list[str]
    dry_run: bool

    parser_name: ClassVar[str] = "plot_pareto"
    parser_help: ClassVar[str] = (
        "Plot Pareto frontier between tokens/s/user and tokens/s/GPU "
        "from parameter sweep results."
    )

    @classmethod
    def from_cli_args(cls, args: argparse.Namespace):
        output_dir = Path(args.OUTPUT_DIR)
        if not output_dir.exists():
            raise ValueError(f"No parameter sweep results under {output_dir}")

        label_by = [] if not args.label_by else args.label_by.split(",")

        return cls(
            output_dir=output_dir,
            user_count_var=args.user_count_var,
            gpu_count_var=args.gpu_count_var,
            label_by=label_by,
            dry_run=args.dry_run,
        )

    @classmethod
    def add_cli_args(cls, parser: argparse.ArgumentParser):
        parser.add_argument(
            "OUTPUT_DIR",
            type=str,
            default="results",
            help="The directory containing the sweep results to plot.",
        )
        parser.add_argument(
            "--user-count-var",
            type=str,
            default="max_concurrency",
            help="Result key that stores concurrent user count. "
            "Falls back to max_concurrent_requests if missing.",
        )
        parser.add_argument(
            "--gpu-count-var",
            type=str,
            default=None,
            help="Result key that stores GPU count. "
            "If not provided, falls back to num_gpus/gpu_count "
            "or tensor_parallel_size * pipeline_parallel_size.",
        )
        parser.add_argument(
            "--label-by",
            type=str,
            default="max_concurrency,gpu_count",
            help="Comma-separated list of fields to annotate on Pareto frontier "
            "points.",
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="If set, prints the figures to plot without drawing them.",
        )

        return parser

dry_run instance-attribute

dry_run: bool

gpu_count_var instance-attribute

gpu_count_var: str | None

label_by instance-attribute

label_by: list[str]

output_dir instance-attribute

output_dir: Path

parser_help class-attribute

parser_help: str = "Plot Pareto frontier between tokens/s/user and tokens/s/GPU from parameter sweep results."

parser_name class-attribute

parser_name: str = 'plot_pareto'

user_count_var instance-attribute

user_count_var: str | None

__init__

__init__(
    output_dir: Path,
    user_count_var: str | None,
    gpu_count_var: str | None,
    label_by: list[str],
    dry_run: bool,
) -> None

add_cli_args classmethod

add_cli_args(parser: ArgumentParser)
Source code in vllm/benchmarks/sweep/plot_pareto.py
@classmethod
def add_cli_args(cls, parser: argparse.ArgumentParser):
    parser.add_argument(
        "OUTPUT_DIR",
        type=str,
        default="results",
        help="The directory containing the sweep results to plot.",
    )
    parser.add_argument(
        "--user-count-var",
        type=str,
        default="max_concurrency",
        help="Result key that stores concurrent user count. "
        "Falls back to max_concurrent_requests if missing.",
    )
    parser.add_argument(
        "--gpu-count-var",
        type=str,
        default=None,
        help="Result key that stores GPU count. "
        "If not provided, falls back to num_gpus/gpu_count "
        "or tensor_parallel_size * pipeline_parallel_size.",
    )
    parser.add_argument(
        "--label-by",
        type=str,
        default="max_concurrency,gpu_count",
        help="Comma-separated list of fields to annotate on Pareto frontier "
        "points.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="If set, prints the figures to plot without drawing them.",
    )

    return parser

from_cli_args classmethod

from_cli_args(args: Namespace)
Source code in vllm/benchmarks/sweep/plot_pareto.py
@classmethod
def from_cli_args(cls, args: argparse.Namespace):
    output_dir = Path(args.OUTPUT_DIR)
    if not output_dir.exists():
        raise ValueError(f"No parameter sweep results under {output_dir}")

    label_by = [] if not args.label_by else args.label_by.split(",")

    return cls(
        output_dir=output_dir,
        user_count_var=args.user_count_var,
        gpu_count_var=args.gpu_count_var,
        label_by=label_by,
        dry_run=args.dry_run,
    )

_first_present

_first_present(
    run_data: dict[str, object], keys: list[str]
)
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _first_present(run_data: dict[str, object], keys: list[str]):
    for key in keys:
        for candidate in {key, key.replace("_", "-"), key.replace("-", "_")}:
            if candidate in run_data:
                return run_data[candidate]
    return None

_get_fig_path

_get_fig_path(
    fig_dir: Path, fig_group: tuple[tuple[str, str], ...]
) -> Path
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _get_fig_path(
    fig_dir: Path,
    fig_group: tuple[tuple[str, str], ...],
) -> Path:
    parts = ["PARETO"]
    if fig_group:
        parts.extend(f"{k}={v}" for k, v in fig_group)
    filename = sanitize_filename("-".join(parts) + ".png")
    return fig_dir / filename

_get_numeric

_get_numeric(
    run_data: dict[str, object],
    keys: list[str],
    *,
    allow_zero: bool = True,
) -> float | None
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _get_numeric(
    run_data: dict[str, object],
    keys: list[str],
    *,
    allow_zero: bool = True,
) -> float | None:
    value = _first_present(run_data, keys)
    if value is None:
        return None

    try:
        numeric = float(value)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"Expected numeric value for one of {keys}, "
            f"but found {value!r} in {run_data=}"
        ) from exc

    if not allow_zero and numeric == 0:
        return None

    return numeric

_get_throughput

_get_throughput(
    run_data: dict[str, object], throughput_var: str
) -> float
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _get_throughput(
    run_data: dict[str, object],
    throughput_var: str,
) -> float:
    throughput = _get_numeric(run_data, [throughput_var])
    if throughput is None:
        raise ValueError(
            f"Cannot find throughput metric {throughput_var!r} in run data. "
            f"Available keys: {sorted(run_data)}"
        )

    return throughput

_infer_gpu_count

_infer_gpu_count(
    run_data: dict[str, object], gpu_count_var: str | None
) -> float
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _infer_gpu_count(
    run_data: dict[str, object],
    gpu_count_var: str | None,
) -> float:
    direct_candidates = [gpu_count_var] if gpu_count_var else []
    direct_gpu_count = _get_numeric(run_data, direct_candidates, allow_zero=False)
    if direct_gpu_count:
        return direct_gpu_count

    tp_size = _get_numeric(run_data, ["tensor_parallel_size", "tp"])
    pp_size = _get_numeric(run_data, ["pipeline_parallel_size", "pp"])
    dp_size = _get_numeric(run_data, ["data_parallel_size", "dp"])
    world_size = 1.0
    if tp_size:
        world_size *= tp_size
    if pp_size:
        world_size *= pp_size
    if dp_size:
        world_size *= dp_size

    return world_size

_infer_user_count

_infer_user_count(
    run_data: dict[str, object], user_count_var: str | None
) -> float | None
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _infer_user_count(
    run_data: dict[str, object],
    user_count_var: str | None,
) -> float | None:
    candidates = [user_count_var] if user_count_var else []
    candidates.extend(["request_rate"])
    user_count = _get_numeric(run_data, candidates, allow_zero=False)
    if user_count is not None:
        return user_count

    # Fallback to the observed peak if configured value is missing.
    return _get_numeric(run_data, ["max_concurrent_requests"], allow_zero=False)

_pareto_frontier

_pareto_frontier(
    df: DataFrame,
    x_col: str,
    y_col: str,
    *,
    epsilon: float = 1e-09,
) -> DataFrame
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _pareto_frontier(
    df: "pd.DataFrame",
    x_col: str,
    y_col: str,
    *,
    epsilon: float = 1e-9,
) -> "pd.DataFrame":
    sorted_df = df.sort_values([x_col, y_col], ascending=[False, False])
    frontier_indices = []
    best_y = -math.inf

    for idx, row in sorted_df.iterrows():
        y_val = row[y_col]
        if y_val >= best_y - epsilon:
            frontier_indices.append(idx)
            best_y = max(best_y, y_val)

    return df.loc[frontier_indices]

_plot_fig

_plot_fig(
    fig_dir: Path,
    fig_group_data: tuple[
        tuple[tuple[str, str], ...], list[dict[str, object]]
    ],
    label_by: list[str],
    *,
    dry_run: bool,
)
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _plot_fig(
    fig_dir: Path,
    fig_group_data: tuple[tuple[tuple[str, str], ...], list[dict[str, object]]],
    label_by: list[str],
    *,
    dry_run: bool,
):
    fig_group, fig_data = fig_group_data
    fig_path = _get_fig_path(fig_dir, fig_group)

    print("[BEGIN FIGURE]")
    print(f"Group: {dict(fig_group)}")
    print(f"Output file: {fig_path}")

    if dry_run:
        print("[END FIGURE]")
        return

    df = pd.DataFrame.from_records(fig_data)
    df = df.dropna(subset=["tokens_per_user", "tokens_per_gpu"])

    if df.empty:
        print("No data points available after filtering; skipping.")
        print("[END FIGURE]")
        return

    frontier = _pareto_frontier(df, "tokens_per_user", "tokens_per_gpu")
    frontier = frontier.sort_values("tokens_per_user")

    fig, ax = plt.subplots()
    sns.scatterplot(
        data=df,
        x="tokens_per_user",
        y="tokens_per_gpu",
        color="0.5",
        alpha=0.6,
        ax=ax,
        label="All runs",
    )
    sns.lineplot(
        data=frontier,
        x="tokens_per_user",
        y="tokens_per_gpu",
        marker="o",
        ax=ax,
        label="Pareto frontier",
    )

    if label_by:
        for _, row in frontier.iterrows():
            label_parts = []
            for key in label_by:
                if key in row:
                    label_parts.append(f"{key}={row[key]}")
            if label_parts:
                ax.text(
                    row["tokens_per_user"],
                    row["tokens_per_gpu"],
                    "\n".join(label_parts),
                    fontsize=8,
                )

    ax.set_xlabel("Tokens/s/user")
    ax.set_ylabel("Tokens/s/GPU")
    ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.6)
    fig.tight_layout()
    fig.savefig(fig_path)
    plt.close(fig)

    print(
        f"Plotted {len(df)} points; Pareto frontier size: {len(frontier)}.",
    )
    print("[END FIGURE]")

_prepare_records

_prepare_records(
    all_data: list[dict[str, object]],
    *,
    user_count_var: str | None,
    gpu_count_var: str | None,
) -> tuple[list[dict[str, object]], int]
Source code in vllm/benchmarks/sweep/plot_pareto.py
def _prepare_records(
    all_data: list[dict[str, object]],
    *,
    user_count_var: str | None,
    gpu_count_var: str | None,
) -> tuple[list[dict[str, object]], int]:
    prepared = []
    skipped_missing_users = 0

    for record in all_data:
        throughput = _get_throughput(record, "output_throughput")
        user_count = _infer_user_count(record, user_count_var)
        if user_count is None:
            skipped_missing_users += 1
            continue

        gpu_count = _infer_gpu_count(record, gpu_count_var)
        tokens_per_user = throughput / user_count
        tokens_per_gpu = throughput / gpu_count

        prepared.append(
            {
                **record,
                "tokens_per_user": tokens_per_user,
                "tokens_per_gpu": tokens_per_gpu,
                "user_count_estimate": user_count,
                "gpu_count": gpu_count,
            }
        )

    return prepared, skipped_missing_users

main

main(args: Namespace)
Source code in vllm/benchmarks/sweep/plot_pareto.py
def main(args: argparse.Namespace):
    run_main(SweepPlotParetoArgs.from_cli_args(args))

plot_pareto

plot_pareto(
    output_dir: Path,
    user_count_var: str | None,
    gpu_count_var: str | None,
    label_by: list[str],
    *,
    dry_run: bool,
)
Source code in vllm/benchmarks/sweep/plot_pareto.py
def plot_pareto(
    output_dir: Path,
    user_count_var: str | None,
    gpu_count_var: str | None,
    label_by: list[str],
    *,
    dry_run: bool,
):
    fig_dir = output_dir / "pareto"
    raw_data = [
        run_data
        for path in output_dir.rglob("**/summary.json")
        for run_data in _json_load_bytes(path)
    ]

    if not raw_data:
        raise ValueError(f"Did not find any parameter sweep results under {output_dir}")

    fig_dir.mkdir(parents=True, exist_ok=True)

    prepared_data, skipped_missing_users = _prepare_records(
        raw_data,
        user_count_var=user_count_var,
        gpu_count_var=gpu_count_var,
    )

    if skipped_missing_users:
        print(
            f"Skipped {skipped_missing_users} runs without a user count "
            "(`max_concurrency` or `max_concurrent_requests`).",
        )

    if not prepared_data:
        raise ValueError(
            "No data points with both throughput and user count available "
            "to plot Pareto frontier.",
        )

    fig_groups = full_groupby(
        prepared_data,
        key=lambda item: tuple(),
    )

    with DummyExecutor() if len(fig_groups) <= 1 else ProcessPoolExecutor() as executor:
        all(
            executor.map(
                partial(
                    _plot_fig,
                    fig_dir,
                    label_by=label_by,
                    dry_run=dry_run,
                ),
                fig_groups,
            )
        )

run_main

run_main(args: SweepPlotParetoArgs)
Source code in vllm/benchmarks/sweep/plot_pareto.py
def run_main(args: SweepPlotParetoArgs):
    return plot_pareto(
        output_dir=args.output_dir,
        user_count_var=args.user_count_var,
        gpu_count_var=args.gpu_count_var,
        label_by=args.label_by,
        dry_run=args.dry_run,
    )