doubao
This commit is contained in:
120
conftest.py
Normal file
120
conftest.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from typing import Callable, Literal
|
||||
from glrocky.framework.schemas import (
|
||||
TestSessionRuntime,
|
||||
Device as _Device,
|
||||
TestCaseExecutionState,
|
||||
TestCaseRuntime,
|
||||
)
|
||||
from glrocky.framework.const_stash_keys import CASE_RUNTIME_CONFIG_KEY
|
||||
import pytest
|
||||
from glrocky.framework.schemas import TestCaseMertic as MetricItem
|
||||
from glrocky.framework.schemas import TestCaseMetrics as MetricPacked
|
||||
from glrocky.core.logger import core_logger
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def device_info(
    request: pytest.FixtureRequest,
) -> _Device:
    """Return the first device registered on the session runtime.

    Raises:
        RuntimeError: if the session runtime is missing from the pytest
            config, or if it holds no usable device.
    """
    # The framework stashes the session runtime on the pytest config object
    # under the attribute "project"; absence means the plugin never ran.
    session_runtime: TestSessionRuntime | None = getattr(
        request.config, "project", None
    )
    if session_runtime is None:
        raise RuntimeError("unreachable!!!!!")
    first_device = (
        session_runtime.devices[0] if session_runtime.devices else None
    )
    if not first_device:
        raise RuntimeError("no device found")
    return first_device
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
def metric(record_metric: Callable[[MetricPacked], None]):
    """Per-test metric collector, flushed to the executor at teardown.

    usage:
        def test_g(metric):
            metric.span()  # optional
            metric.add()
            metric.add()
            metric.send_all()  # optional
    """

    class M:
        """Accumulates metric items grouped into (type, id, iteration) spans."""

        def __init__(self) -> None:
            # Each entry is (material_type, material_id, material_iteration, items).
            self.data: list[tuple[str, str, int, list[MetricItem]]] = []
            # Open an initial default span so add() always has a target.
            self.span()

        def add(
            self,
            name: str,
            value: float | int | str,
            label: str | None = None,
            type: Literal["number", "image", "video", "text"] = "text",
        ) -> None:
            """Append one metric item to the currently open (last) span."""
            item = MetricItem(
                name=name, value=value, label=label or name, unit=None, type=type
            )
            self.data[-1][-1].append(item)

        def span(
            self,
            material_type: str = "default",
            material_id: str = "default",
            material_iteration: int = 1,
        ) -> None:
            """Open a new span; a span identical to the last one is skipped."""
            # DO NOT ADD DUPLICATE ITEM!!
            if self.data:
                last_span = self.data[-1]
                if (
                    last_span[0] == material_type
                    and last_span[1] == material_id
                    and last_span[2] == material_iteration
                ):
                    core_logger.debug(
                        f"SKIP DUPLICATE SPAN:{material_type=},{material_id=},{material_iteration=}"
                    )
                    return

            self.data.append((material_type, material_id, material_iteration, []))
            core_logger.info(
                f"added new span for metric:{material_type=},{material_id=},{material_iteration=}"
            )

        def send_all(self) -> None:
            """Send every non-empty span to the executor, then reset state."""
            if not self.data:
                core_logger.debug("no metrics to send")
                return  # no span at all, skip

            # Atomically swap the buffer out so a concurrent/second call
            # cannot re-send the same spans (fixes the old clear() TODO).
            pending, self.data = self.data, []
            core_logger.info(f"{len(pending)} metrics for send.")
            # Iterate in insertion order (the old code reversed the list and
            # popped from the end, which is the same order, just obfuscated).
            for material_type, material_id, material_iteration, metrics in pending:
                if not metrics:
                    core_logger.info("skip empty metrics")
                    continue
                for_send = MetricPacked(
                    type=material_type,
                    material=material_id,
                    iteration=material_iteration,
                    metrics=metrics,
                )
                core_logger.info(f"rebuild metric:{for_send=}")
                core_logger.info("sending metrics to executor...")
                record_metric(for_send)
                core_logger.info("sent")

    _m = M()
    yield _m
    # Teardown: flush anything the test did not send explicitly.
    try:
        _m.send_all()
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise RuntimeError(f"Could not send metric to server, Detail: {e}") from e
|
||||
Reference in New Issue
Block a user