121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
from typing import Callable, Literal
|
|
from glrocky.framework.schemas import (
|
|
TestSessionRuntime,
|
|
Device as _Device,
|
|
TestCaseExecutionState,
|
|
TestCaseRuntime,
|
|
)
|
|
from glrocky.framework.const_stash_keys import CASE_RUNTIME_CONFIG_KEY
|
|
import pytest
|
|
from glrocky.framework.schemas import TestCaseMertic as MetricItem
|
|
from glrocky.framework.schemas import TestCaseMetrics as MetricPacked
|
|
from glrocky.core.logger import core_logger
|
|
|
|
|
|
@pytest.fixture(scope="function")
def device_info(
    request: pytest.FixtureRequest,
) -> _Device:
    """Return the first device of the session runtime attached to the pytest config.

    The session runtime is looked up as the ``project`` attribute of
    ``request.config``.

    Raises:
        RuntimeError: if no ``project`` attribute is set on the config, or if
            the runtime has no (truthy) first device.
    """
    session: TestSessionRuntime | None = getattr(request.config, "project", None)
    if session is None:
        raise RuntimeError("unreachable!!!!!")

    # Guard against an empty/missing device list before indexing into it.
    if not session.devices:
        raise RuntimeError("no device found")

    primary = session.devices[0]
    if not primary:
        raise RuntimeError("no device found")
    return primary
|
|
|
|
|
|
@pytest.fixture(scope="function", autouse=True)
def metric(record_metric: Callable[[MetricPacked], None]):
    """Provide a per-test metric collector that is flushed on teardown.

    Metrics are buffered in "spans", each identified by
    ``(material_type, material_id, material_iteration)``. A default span is
    opened when the collector is created, so calling ``span()`` is optional.
    Whatever is still buffered when the test finishes is sent during fixture
    teardown, so calling ``send_all()`` is optional as well.

    usage:
        def test_g(metric):
            metric.span()  # optional
            metric.add("name", 1)
            metric.add("other", "value")
            metric.send_all()  # optional

    Args:
        record_metric: callable that receives one packed span at a time.

    Raises:
        RuntimeError: during teardown, if sending the buffered metrics fails.
    """

    class M:
        """In-memory buffer of metric spans, sent through ``record_metric``."""

        def __init__(self):
            # Each entry: (material_type, material_id, material_iteration, metrics).
            self.data: list[tuple[str, str, int, list[MetricItem]]] = []
            self.span()

        def add(
            self,
            name: str,
            value: float | int | str,
            label: str | None = None,
            type: Literal["number", "image", "video", "text"] = "text",
        ) -> None:
            """Append one metric to the most recently opened span.

            ``label`` falls back to ``name`` when it is not given (or empty).
            """
            if not self.data:
                # send_all() empties the buffer; reopen the default span so
                # add() keeps working afterwards instead of raising IndexError.
                self.span()
            self.data[-1][-1].append(
                MetricItem(
                    name=name, value=value, label=label or name, unit=None, type=type
                )
            )

        def span(
            self,
            material_type: str = "default",
            material_id: str = "default",
            material_iteration: int = 1,
        ):
            """Open a new span unless it is identical to the current last span."""
            # DO NOT ADD DUPLICATE ITEM!!
            if self.data and self.data[-1][:3] == (
                material_type,
                material_id,
                material_iteration,
            ):
                core_logger.debug(
                    f"SKIP DUPLICATE SPAN:{material_type=},{material_id=},{material_iteration=}"
                )
                return

            self.data.append((material_type, material_id, material_iteration, []))
            core_logger.info(
                f"added new span for metric:{material_type=},{material_id=},{material_iteration=}"
            )

        def send_all(
            self,
        ):
            """Send every non-empty buffered span, in insertion order, then forget them."""
            if not self.data:
                core_logger.debug("no materics to send")
                return  # no span at all,skip

            # Snapshot and clear first, so a second call never resends a span.
            # TODO: not safe,to be improved -- if record_metric raises, the
            # spans that were not sent yet are dropped.
            pending = list(self.data)
            self.data.clear()
            core_logger.info(f"{len(pending)} metrics for send.")

            for material_type, material_id, material_iteration, metrics in pending:
                if not metrics:
                    core_logger.info("skip empty metrics")
                    continue
                for_send = MetricPacked(
                    type=material_type,
                    material=material_id,
                    iteration=material_iteration,
                    metrics=metrics,
                )
                core_logger.info(f"rebuild metric:{for_send=}")
                core_logger.info("sending metrics to executor...")
                record_metric(for_send)
                core_logger.info("sent")

    _m = M()
    yield _m
    # Teardown: flush whatever the test did not send explicitly.
    try:
        _m.send_all()
    except Exception as e:
        # Chain the original exception so its traceback is kept as the cause.
        raise RuntimeError(f"Could not send metric to server,Detail: {e}") from e
|