1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 | from collections import defaultdict
import operator
from typing import Any
from pytest_park.core.naming import parse_method_name
def default_pytest_benchmark_group_stats(
    config: Any,
    benchmarks: list[Any],
    group_by: str,
    *,
    original_postfix: str | None = None,
    reference_postfix: str | None = None,
    group_values_by_postfix: dict[str, str] | None = None,
    ignore_params: list[str] | None = None,
) -> list[tuple[str, list[Any]]]:
    """Bucket pytest-benchmark entries using their postfix-stripped base name.

    Meant to be called from a test suite's own ``pytest_benchmark_group_stats``
    override. As a side effect every benchmark gets its parsed name parts
    stored under ``extra_info["pytest_park_name_parts"]``.

    Args:
        config: Object the postfixes are read from when they are not passed
            explicitly (options ``benchmark_original_postfix`` and
            ``benchmark_reference_postfix``).
        benchmarks: Entries to group; each may be a dict or an attribute object.
        group_by: Comma-separated grouping criteria. ``group``, ``name``,
            ``method`` and ``func`` contribute the parsed base name;
            ``fullname`` and ``fullfunc`` contribute the full name with its
            last ``::`` component replaced by the base name (``fullname`` also
            keeps everything from the first ``[``); ``param`` contributes the
            parameter id minus ignored parameters; ``param:<name>`` contributes
            ``<name>=<value>``; ``postfix`` and ``benchmark_postfix``
            contribute the parsed postfix (``"none"`` when absent), mapped
            through ``group_values_by_postfix``. Any other criterion
            contributes the benchmark name itself.
        original_postfix: Explicit original postfix; overrides the config.
        reference_postfix: Explicit reference postfix; overrides the config.
        group_values_by_postfix: Postfix -> label used in the group key.
            Keys are stripped; blank keys are discarded.
        ignore_params: Parameter names left out of ``param`` and
            ``param:<name>`` groupings.

    Returns:
        ``(group key, benchmarks)`` pairs sorted by key. Benchmarks inside a
        group are sorted by ``fullname`` when ``"full"`` occurs in
        ``group_by`` and by ``name`` otherwise. Benchmarks whose key ends up
        empty are not part of the result.
    """
    # Explicit arguments win; the config is only consulted when one is missing.
    known_postfixes: list[str] = []
    for explicit, option_name in (
        (original_postfix, "benchmark_original_postfix"),
        (reference_postfix, "benchmark_reference_postfix"),
    ):
        resolved = explicit or _read_postfix(config, option_name)
        if resolved:
            known_postfixes.append(resolved)

    labels_by_postfix: dict[str, str] = {}
    for raw_postfix, label in (group_values_by_postfix or {}).items():
        if raw_postfix and raw_postfix.strip():
            labels_by_postfix[raw_postfix.strip()] = label

    def _sort_key(entry: Any) -> Any:
        attr = "fullname" if "full" in group_by else "name"
        return _read_benchmark_attr(entry, attr) or ""

    buckets: dict[str | None, list[Any]] = defaultdict(list)
    for bench in benchmarks:
        bench_name = _read_benchmark_name(bench)
        parsed = parse_method_name(bench_name, known_postfixes)
        _store_name_parts(bench, parsed.base_name, parsed.parameters, parsed.postfix)

        fragments: list[Any] = []
        for criterion in group_by.split(","):
            if criterion in ("fullname", "fullfunc"):
                qualified = _read_benchmark_attr(bench, "fullname") or bench_name
                head, sep, _ = qualified.rpartition("::")
                rebuilt = head + sep + parsed.base_name
                # Only "fullname" keeps the bracketed parameter suffix.
                if criterion == "fullname" and "[" in qualified:
                    rebuilt += qualified[qualified.index("[") :]
                fragments.append(rebuilt)
            elif criterion == "param":
                fragments.append(_filter_ignored_params(bench, ignore_params))
            elif criterion.startswith("param:"):
                wanted = criterion[len("param:") :]
                if ignore_params and wanted in ignore_params:
                    continue
                available = _read_benchmark_attr(bench, "params") or {}
                if wanted in available:
                    fragments.append(f"{wanted}={available[wanted]}")
            elif criterion in ("postfix", "benchmark_postfix"):
                postfix_label = parsed.postfix or "none"
                fragments.append(labels_by_postfix.get(postfix_label, postfix_label))
            elif criterion in ("group", "name", "method", "func"):
                fragments.append(parsed.base_name)
            else:
                # Fallback for unknown groupings
                fragments.append(bench_name)

        label = " ".join(str(fragment) for fragment in fragments if fragment is not None)
        # An empty label is filed under None and filtered out below.
        buckets[label or None].append(bench)

    for members in buckets.values():
        members.sort(key=_sort_key)

    labelled = [(label, members) for label, members in buckets.items() if label is not None]
    labelled.sort(key=operator.itemgetter(0))
    return labelled
def _read_benchmark_attr(benchmark: Any, attr: str) -> Any:
if isinstance(benchmark, dict):
return benchmark.get(attr)
return getattr(benchmark, attr, None)
def _filter_ignored_params(bench: Any, ignore_params: list[str] | None) -> str:
    """Return the benchmark's ``param`` id without the ignored parameters' values.

    The id is split on ``-`` and every piece equal to the stringified value of
    an ignored parameter (looked up in the benchmark's ``params``) is dropped.
    The untouched id is returned when there is nothing to ignore, when the
    benchmark has no ``params``, or when filtering would leave nothing.
    """
    raw_id = _read_benchmark_attr(bench, "param") or ""
    values_by_name = _read_benchmark_attr(bench, "params") or {}
    full_id = str(raw_id)

    if not (ignore_params and values_by_name):
        return full_id

    excluded = {str(values_by_name[name]) for name in ignore_params if name in values_by_name}
    kept = [piece for piece in full_id.split("-") if piece not in excluded] if raw_id else []
    # Fall back to the full id rather than returning an empty string.
    return "-".join(kept) if kept else full_id
def _read_postfix(config: Any, option_name: str) -> str | None:
getoption = getattr(config, "getoption", None)
if callable(getoption):
value = getoption(option_name, default="")
if isinstance(value, str) and value.strip():
return value.strip()
option = getattr(config, "option", None)
if option is None:
return None
value = getattr(option, option_name, "")
if isinstance(value, str) and value.strip():
return value.strip()
return None
def _read_benchmark_name(benchmark: Any) -> str:
if isinstance(benchmark, dict):
return str(benchmark.get("name") or benchmark.get("fullname") or "unknown")
name = getattr(benchmark, "name", None)
if isinstance(name, str) and name:
return name
fullname = getattr(benchmark, "fullname", None)
if isinstance(fullname, str) and fullname:
return fullname.rsplit("::", 1)[-1]
return "unknown"
def _store_name_parts(benchmark: Any, base_name: str, parameters: str | None, postfix: str | None) -> None:
name_parts = {
"base_name": base_name,
"parameters": parameters,
"postfix": postfix,
}
if isinstance(benchmark, dict):
extra_info = benchmark.get("extra_info")
if not isinstance(extra_info, dict):
extra_info = {}
benchmark["extra_info"] = extra_info
extra_info["pytest_park_name_parts"] = name_parts
return
extra_info = getattr(benchmark, "extra_info", None)
if not isinstance(extra_info, dict):
extra_info = {}
benchmark.extra_info = extra_info
extra_info["pytest_park_name_parts"] = name_parts
|