from typing import Callable, Literal from glrocky.framework.schemas import ( TestSessionRuntime, Device as _Device, TestCaseExecutionState, TestCaseRuntime, ) from glrocky.framework.const_stash_keys import CASE_RUNTIME_CONFIG_KEY import pytest from glrocky.framework.schemas import TestCaseMertic as MetricItem from glrocky.framework.schemas import TestCaseMetrics as MetricPacked from glrocky.core.logger import core_logger @pytest.fixture(scope="function") def device_info( request: pytest.FixtureRequest, ) -> _Device: runtime: TestSessionRuntime | None = getattr(request.config, "project", None) if runtime is None: raise RuntimeError("unreachable!!!!!") ret = runtime.devices[0] if runtime.devices else None if not ret: raise RuntimeError("no device found") return ret @pytest.fixture(scope="function", autouse=True) def metric(record_metric: Callable[[MetricPacked], None]): """usage: def test_g(metric): metric.span() # optional metric.add() metric.add() metric.send_all() # optianl """ class M: def __init__(self): self.data: list[tuple[str, str, int, list[MetricItem]]] = [] self.span() def add( self, name: str, value: float | int | str, label: str | None = None, type: Literal["number", "image", "video", "text"] = "text", ) -> None: metric = MetricItem( name=name, value=value, label=label or name, unit=None, type=type ) self.data[-1][-1].append(metric) def span( self, material_type: str = "default", material_id: str = "default", material_iteration: int = 1, ): # DO NOT ADD DUPLICATE ITEM!! if len(self.data): last_span = self.data[-1] if all( ( last_span[0] == material_type, last_span[1] == material_id, last_span[2] == material_iteration, ) ): core_logger.debug( f"SKIP DUPLICATE SPAN:{material_type=},{material_id=},{material_iteration=}" ) return self.data.append((material_type, material_id, material_iteration, [])) core_logger.info( f"added new span for metric:{material_type=},{material_id=},{material_iteration=}" ) def send_all( self, ): if len(self.data) < 1: core_logger.debug("no materics to send") return # no span at all,skip data = self.data[::-1] self.data.clear() # TODO: not safe,to be improved core_logger.info(f"{len(data)} metrics for send.") while True: if len(data) == 0: break item = data.pop() material_type, material_id, material_iteration, metrics = ( item[0], item[1], item[2], item[3], ) if len(metrics) == 0: core_logger.info("skip empty metrics") continue for_send = MetricPacked( type=material_type, material=material_id, iteration=material_iteration, metrics=metrics, ) core_logger.info(f"rebuild metric:{for_send=}") core_logger.info("sending metrics to executor...") record_metric(for_send) core_logger.info("sent") _m = M() yield _m try: _m.send_all() except Exception as e: raise RuntimeError(f"Cloud not send metric to server,Detail: {e}")