diff --git a/.github/workflows/templates/test.yml.j2 b/.github/workflows/templates/test.yml.j2 index 8ca54829928..d0c66c2a16f 100644 --- a/.github/workflows/templates/test.yml.j2 +++ b/.github/workflows/templates/test.yml.j2 @@ -20,6 +20,7 @@ env: contains(github.event.pull_request.labels.*.name, 'backport') && github.event.pull_request.base.ref || 'main' ) || 'main' }}{% endraw %} + WEAVER_VERSION: "v0.22.1" PIP_EXISTS_ACTION: w CONTRIB_REPO_UTIL_HTTP: ${% raw %}{{ github.workspace }}{% endraw %}/opentelemetry-python-contrib/util/opentelemetry-util-http CONTRIB_REPO_INSTRUMENTATION: ${% raw %}{{ github.workspace }}{% endraw %}/opentelemetry-python-contrib/opentelemetry-instrumentation @@ -51,6 +52,14 @@ jobs: ref: ${% raw %}{{ env.CONTRIB_REPO_SHA }}{% endraw %} path: opentelemetry-python-contrib {%- endif %} + {%- if "test-opentelemetry-test-utils" in job_data.tox_env and job_data.os != "windows-latest" %} + + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${% raw %}{{ env.WEAVER_VERSION }}{% endraw %}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + {%- endif %} - name: Set up Python {{ job_data.python_version }} uses: actions/setup-python@v5 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1c09aa0e5dd..01ce3922d10 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,6 +20,7 @@ env: contains(github.event.pull_request.labels.*.name, 'backport') && github.event.pull_request.base.ref || 'main' ) || 'main' }} + WEAVER_VERSION: "v0.22.1" PIP_EXISTS_ACTION: w CONTRIB_REPO_UTIL_HTTP: ${{ github.workspace }}/opentelemetry-python-contrib/util/opentelemetry-util-http CONTRIB_REPO_INSTRUMENTATION: ${{ github.workspace }}/opentelemetry-python-contrib/opentelemetry-instrumentation @@ -2865,6 +2866,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python 3.10 uses: actions/setup-python@v5 with: @@ -2884,6 +2891,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python 3.11 uses: actions/setup-python@v5 with: @@ -2903,6 +2916,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python 3.12 uses: actions/setup-python@v5 with: @@ -2922,6 +2941,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python 3.13 uses: actions/setup-python@v5 with: @@ -2941,6 +2966,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python 3.14 uses: actions/setup-python@v5 with: @@ -2960,6 +2991,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python 3.14t uses: actions/setup-python@v5 with: @@ -2979,6 +3016,12 @@ jobs: - name: Checkout repo @ SHA - ${{ github.sha }} uses: actions/checkout@v4 + - name: Install weaver + run: | + WEAVER_URL="https://github.com/open-telemetry/weaver/releases/download/${{ env.WEAVER_VERSION }}/weaver-x86_64-unknown-linux-gnu.tar.xz" + curl -sSL "$WEAVER_URL" | tar -xJ -C /tmp weaver-x86_64-unknown-linux-gnu/weaver + sudo mv /tmp/weaver-x86_64-unknown-linux-gnu/weaver /usr/local/bin/weaver + - name: Set up Python pypy-3.10 uses: actions/setup-python@v5 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index 0785e4b7244..8bd0a53c3c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#5076](https://github.com/open-telemetry/opentelemetry-python/pull/5076)) - `opentelemetry-semantic-conventions`: use `X | Y` union annotation ([#5096](https://github.com/open-telemetry/opentelemetry-python/pull/5096)) - +- Add WeaverLiveCheck test util + ([#5088](https://github.com/open-telemetry/opentelemetry-python/pull/5088)) ## Version 1.41.0/0.62b0 (2026-04-09) diff --git a/tests/opentelemetry-test-utils/pyproject.toml b/tests/opentelemetry-test-utils/pyproject.toml index 974e1972089..a273592c771 100644 --- a/tests/opentelemetry-test-utils/pyproject.toml +++ b/tests/opentelemetry-test-utils/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "asgiref ~= 3.0", "opentelemetry-api == 1.42.0.dev", "opentelemetry-sdk == 1.42.0.dev", + "requests ~= 2.28", ] [project.urls] diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/weaver_live_check.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/weaver_live_check.py new file mode 100644 index 00000000000..451587578e4 --- /dev/null +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/weaver_live_check.py @@ -0,0 +1,464 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import json +import logging +import os +import shutil +import socket +import subprocess +from collections import defaultdict +from itertools import chain +from typing import Any + +from requests import Session, post +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from opentelemetry.semconv.schemas import Schemas + +logger = logging.getLogger(__name__) + + +def _find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("", 0)) + return sock.getsockname()[1] + + +def _extract_violations(report: dict) -> list: + """Extract and deduplicate violations from the full report. + + Get all violations using python version of this jq filter: + [ .. | objects | select(has("live_check_result")) + | .live_check_result.all_advice[]? + | select(.level == "violation") ] + | group_by(.id, .message, .context, .signal_name, .signal_type) + | map({ ..., count: length }) + | sort_by(-.count, .message) + """ + raw: list[dict] = [] + + def _collect(obj: Any) -> list[dict]: + if isinstance(obj, dict): + result: list[dict] = [] + lcr = obj.get("live_check_result") + if isinstance(lcr, dict): + advices = lcr.get("all_advice") + if isinstance(advices, list): + result.extend( + a for a in advices if a.get("level") == "violation" + ) + for value in obj.values(): + result.extend(_collect(value)) + return result + if isinstance(obj, list): + return list(chain.from_iterable(_collect(item) for item in obj)) + return [] + + raw = _collect(report) + + groups: dict[tuple, list] = defaultdict(list) + for violation in raw: + ctx = violation.get("context") + key = ( + violation.get("id"), + violation.get("message"), + json.dumps(ctx, sort_keys=True) + if isinstance(ctx, (dict, list)) + else ctx, + violation.get("signal_name"), + violation.get("signal_type"), + ) + groups[key].append(violation) + + violations = [ + { + "id": k[0], + "message": k[1], + "context": vs[0].get( + "context" + ), # preserve original dict, not JSON string + "signal_name": k[3], + "signal_type": k[4], + "count": len(vs), + } + for k, vs in groups.items() + ] + violations.sort(key=lambda v: (-v["count"], v.get("message") or "")) + return violations + + +def _format_violations(violations: list) -> str: + """Format violations list as human-readable text.""" + lines = [] + for violation in violations: + signal = "" + signal_type = violation.get("signal_type") + signal_name = violation.get("signal_name") + if signal_type and signal_name: + signal = f" on {signal_type} '{signal_name}'" + elif signal_type: + signal = f" on {signal_type}" + elif signal_name: + signal = f" on '{signal_name}'" + lines.append( + f"- [{violation.get('id')}] {violation.get('message')} ({violation['count']} occurrence(s){signal})" + ) + return "\n".join(lines) + + +class LiveCheckError(AssertionError): + """Raised by :meth:`WeaverLiveCheck.end_and_check` when semconv violations are found. + + The full :class:`LiveCheckReport` is attached as :attr:`report` for + structured inspection beyond the human-readable message:: + + with pytest.raises(LiveCheckError) as exc_info: + weaver.end_and_check() + + err = exc_info.value + assert any( + v["id"] == "my_policy_check" + and v["context"]["attribute_name"] == "my.attribute" + for v in err.report.violations + ) + """ + + def __init__(self, message: str, report: "LiveCheckReport") -> None: + super().__init__(message) + self.report = report + + +class LiveCheckReport: + """The result of a weaver live-check run. + + Provides structured access to violations and the full raw JSON report. + + See https://github.com/open-telemetry/weaver/tree/main/crates/weaver_live_check#output + for the full report structure. + + Example — asserting on metrics statistics:: + + report = weaver.end() + seen = report["statistics"]["seen_registry_metrics"] + assert seen.get("http.server.request.duration") == 1 + + Example — asserting on violations:: + + report = weaver.end() + assert any( + v["id"] == "my_policy_check" + and v["context"]["attribute_name"] == "my.attribute" + for v in report.violations + ) + """ + + def __init__(self, report: dict[str, Any]) -> None: + self._report = report + + @functools.cached_property + def violations(self) -> list[dict[str, Any]]: + """Deduplicated list of semconv violations found in the report. + + Each item is a dict with keys: ``id``, ``message``, ``context`` + (the raw context dict from weaver, e.g. ``{"attribute_name": "foo"}``), + ``signal_name``, ``signal_type``, ``count``. + """ + return _extract_violations(self._report) + + def __getitem__(self, key: str) -> Any: + return self._report[key] + + def get(self, key: str, default: Any = None) -> Any: + return self._report.get(key, default) + + def __contains__(self, key: object) -> bool: + return key in self._report + + def __repr__(self) -> str: + num_violations = len(self.violations) + return f"LiveCheckReport({num_violations} violation{'s' if num_violations != 1 else ''})" + + +# NOTE: WeaverLiveCheck is experimental and its API is subject to change. +class WeaverLiveCheck: + """Runs ``weaver registry live-check`` as a subprocess and validates + OTLP telemetry against OpenTelemetry semantic conventions. + + .. note:: + This class is experimental and its API is subject to change without notice. + + + Requires the ``weaver`` binary on PATH: + https://github.com/open-telemetry/weaver/releases + + Typical use as a context manager:: + + def test_my_telemetry(self): + with WeaverLiveCheck() as weaver: + exporter = OTLPSpanExporter( + endpoint=weaver.otlp_endpoint, insecure=True + ) + # ... configure provider, emit telemetry ... + provider.force_flush() + + # Signals weaver to stop, raises LiveCheckError listing violations + # if any, or returns a LiveCheckReport on success. + report = weaver.end_and_check() + # __exit__ calls close(), which is idempotent if end_and_check() was already called + + Use :meth:`end` when you need the full :class:`LiveCheckReport` + regardless of whether violations were found — for example, to assert that + specific metrics were observed or to inspect violation fields directly:: + + with WeaverLiveCheck() as weaver: + # ... configure provider, emit telemetry ... + provider.force_flush() + report = weaver.end() + + seen_metrics = report["statistics"]["seen_registry_metrics"] + assert seen_metrics.get("http.server.request.duration") == 1 + + Lifecycle: + - :meth:`start` — launches weaver and waits for it to become ready. + - :attr:`otlp_endpoint` — gRPC OTLP endpoint to point exporters at. + - :meth:`end` — signals weaver to stop and always returns a + :class:`LiveCheckReport`. Never raises for semconv violations; use + this when you want to write your own assertions. + - :meth:`end_and_check` — signals weaver to stop and raises + :class:`LiveCheckError` with a human-readable violation list and the + full report attached if weaver exits non-zero. Returns a + :class:`LiveCheckReport` on success. + - :meth:`close` — stops weaver if not already stopped and terminates the + process. Never raises for semconv violations. Idempotent; safe to + call even if :meth:`end_and_check` or :meth:`end` was already called. + """ + + def __init__( + self, + registry: str | None = None, + schema_version: str | None = None, + policies_dir: str | None = None, + inactivity_timeout: int = 30, + otlp_port: int = 0, + admin_port: int = 0, + ): + weaver_bin = shutil.which("weaver") + if not weaver_bin: + raise RuntimeError( + "weaver binary not found on PATH. " + "Install it from https://github.com/open-telemetry/weaver/releases" + ) + + self._otlp_port = otlp_port or _find_free_port() + self._admin_port = admin_port or _find_free_port() + self._ready = False + self._stopped = False + self._process: subprocess.Popen[bytes] | None = None + + command = [ + weaver_bin, + "registry", + "live-check", + f"--inactivity-timeout={inactivity_timeout}", + f"--otlp-grpc-port={self._otlp_port}", + f"--admin-port={self._admin_port}", + "--output=http", + "--format=json", + ] + + if policies_dir: + command += ["--advice-policies", os.path.abspath(policies_dir)] + + if registry is None: + if schema_version is None: + schema_version = list(Schemas)[-1].value.rsplit("/", 1)[-1] + registry = f"https://github.com/open-telemetry/semantic-conventions/archive/refs/tags/v{schema_version}.tar.gz[model]" + elif os.path.isdir(registry): + registry = os.path.abspath(registry) + + command += ["--registry", registry] + + self._command = command + logger.debug("Weaver command: %s", command) + + def __enter__(self) -> "WeaverLiveCheck": + return self.start() + + def __exit__(self, exc_type: Any, *_: Any) -> None: + if exc_type is not None: + self._stopped = True + self.close() + + def start(self) -> "WeaverLiveCheck": + logger.debug("Starting WeaverLiveCheck process...") + self._process = subprocess.Popen( # pylint: disable=consider-using-with + self._command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + try: + self._wait_for_ready() + self._ready = True + except Exception as exc: # pylint: disable=broad-except + logs = self._read_weaver_logs() + logger.error( + "WeaverLiveCheck did not start: %s, logs: %s", exc, logs + ) + raise + return self + + def _wait_for_ready(self) -> None: + retry = Retry( + total=10, + backoff_factor=1, + backoff_max=1, + # Any non-2xx response from /health means weaver isn't ready yet. + status_forcelist=list(range(300, 600)), + raise_on_status=True, + allowed_methods=["GET"], + ) + session = Session() + session.mount("http://", HTTPAdapter(max_retries=retry)) + try: + session.get( + f"http://localhost:{self._admin_port}/health", timeout=5 + ) + except Exception as exc: # pylint: disable=broad-except + if self._process is not None and self._process.poll() is not None: + raise RuntimeError( + f"WeaverLiveCheck process exited unexpectedly (code {self._process.returncode})" + ) from exc + raise TimeoutError( + "WeaverLiveCheck did not become ready in time" + ) from exc + + @property + def otlp_endpoint(self) -> str: + return f"http://localhost:{self._otlp_port}" + + def _do_stop(self, timeout: int) -> tuple["LiveCheckReport", int]: + """POST /stop, wait for the process to exit, return (report, exit_code). + + Raises for infrastructure errors (HTTP failure, process communication). + Never raises for semconv violations. + """ + if not self._ready: + raise RuntimeError( + "WeaverLiveCheck process did not start successfully" + ) + try: + response = post( + f"http://localhost:{self._admin_port}/stop", timeout=5 + ) + response.raise_for_status() + report = LiveCheckReport(response.json()) + assert self._process is not None + exit_code = self._process.wait(timeout=timeout) + except Exception as exc: # pylint: disable=broad-except + logs = self._read_weaver_logs() + logger.error( + "Error communicating with weaver: %s, logs: %s", exc, logs + ) + raise + return report, exit_code + + def end(self, timeout: int = 30) -> "LiveCheckReport": + """Signal weaver to stop and return the full :class:`LiveCheckReport`. + + Never raises for semconv violations — use this when you want to write + your own assertions against :attr:`LiveCheckReport.violations` or the + raw report data. + + Raises :exc:`RuntimeError` for infrastructure problems (weaver failed + to start, HTTP communication error, etc.). + + See https://github.com/open-telemetry/weaver/tree/main/crates/weaver_live_check#output + for the report structure. + """ + if self._stopped: + logger.warning( + "end() called after weaver already stopped; returning empty report" + ) + return LiveCheckReport({}) + self._stopped = True + report, _ = self._do_stop(timeout) + return report + + def end_and_check(self, timeout: int = 30) -> "LiveCheckReport": + """Signal weaver to stop and assert no semconv violations were found. + + Returns the :class:`LiveCheckReport` when weaver exits successfully + (exit code 0). + + Does **not** return if weaver exits with a non-zero status — raises + :exc:`LiveCheckError` (a subclass of :exc:`AssertionError`) with a + human-readable list of violations and the full :class:`LiveCheckReport` + attached as :attr:`LiveCheckError.report`. + Use :meth:`end` if you need the report regardless of violations. + + Raises :exc:`RuntimeError` for infrastructure problems (weaver failed + to start, HTTP communication error, etc.). + """ + if self._stopped: + logger.warning( + "end_and_check() called after weaver already stopped; returning empty report" + ) + return LiveCheckReport({}) + self._stopped = True + report, exit_code = self._do_stop(timeout) + if exit_code == 0: + # Success — no violations found, no errors communicating with weaver + return report + raise LiveCheckError( + f"Semconv violations found:\n{_format_violations(report.violations)}", + report, + ) + + def _read_weaver_logs(self) -> str | None: + if self._process is None: + return None + try: + if self._process.poll() is None: + self._process.kill() + out, err = self._process.communicate() + return f"{out.decode()}\n{err.decode()}" + except Exception as exc: # pylint: disable=broad-except + logger.error("Could not get weaver logs: %s", exc) + return None + + def close(self) -> None: + """Stop weaver and clean up the process. + + If weaver has not been stopped yet, sends the ``/stop`` signal and + waits for the process to exit. Never raises for semconv violations. + Idempotent — safe to call multiple times or after :meth:`end` / + :meth:`end_and_check` has already been called. + """ + if not self._stopped: + self._stopped = True + if self._ready: + try: + self._do_stop(timeout=30) + return # process already exited cleanly + except Exception as exc: # pylint: disable=broad-except + logger.debug("Error stopping weaver during close: %s", exc) + if self._process and self._process.poll() is None: + self._process.terminate() + try: + self._process.wait(timeout=5) + except subprocess.TimeoutExpired: + self._process.kill() diff --git a/tests/opentelemetry-test-utils/test-requirements.txt b/tests/opentelemetry-test-utils/test-requirements.txt index 854e7327124..87de9a5f951 100644 --- a/tests/opentelemetry-test-utils/test-requirements.txt +++ b/tests/opentelemetry-test-utils/test-requirements.txt @@ -6,10 +6,16 @@ pluggy==1.6.0 py-cpuinfo==9.0.0 pytest==7.4.4 tomli==2.0.1 -typing_extensions==4.10.0 +typing_extensions==4.12.0 wrapt==1.16.0 zipp==3.19.2 -e opentelemetry-api -e opentelemetry-sdk -e opentelemetry-semantic-conventions -e tests/opentelemetry-test-utils +# these are required for weaver integration tests, we're running that only on linux +# because of lack of support for gRPC on Windows in some cases. +# note: tox does not support PEP 508 markers on `-e` editable installs, so these are installed non-editable +./opentelemetry-proto ; sys_platform != 'win32' +./exporter/opentelemetry-exporter-otlp-proto-common ; sys_platform != 'win32' +./exporter/opentelemetry-exporter-otlp-proto-grpc ; sys_platform != 'win32' \ No newline at end of file diff --git a/tests/opentelemetry-test-utils/tests/test_weaver_live_check.py b/tests/opentelemetry-test-utils/tests/test_weaver_live_check.py new file mode 100644 index 00000000000..209c5de6359 --- /dev/null +++ b/tests/opentelemetry-test-utils/tests/test_weaver_live_check.py @@ -0,0 +1,175 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Live-check tests using Weaver to validate SDK telemetry against semconv. + +Requires the `weaver` binary on PATH: + https://github.com/open-telemetry/weaver/releases +""" + +import os +import shutil +import unittest + +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.test.weaver_live_check import ( + LiveCheckError, + LiveCheckReport, + WeaverLiveCheck, +) + +try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( # pylint: disable=no-name-in-module + OTLPSpanExporter, + ) + + _HAS_GRPC = True +except ImportError: + _HAS_GRPC = False + +_TESTDATA_DIR = os.path.join(os.path.dirname(__file__), "testdata") +_REGISTRY_DIR = os.path.join(_TESTDATA_DIR, "registry") + + +def _make_provider(otlp_endpoint: str) -> TracerProvider: + resource = Resource.create({SERVICE_NAME: "test-service"}) + exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True) + provider = TracerProvider(resource=resource) + provider.add_span_processor(BatchSpanProcessor(exporter)) + return provider + + +@unittest.skipUnless( + _HAS_GRPC, + "grpc exporter not installed", +) +@unittest.skipUnless( + shutil.which("weaver") is not None, + "weaver binary not found on PATH — install from https://github.com/open-telemetry/weaver/releases", +) +class TestSDKInitLiveCheck(unittest.TestCase): + def test_end_and_check_no_violations(self): + """end_and_check() returns a LiveCheckReport with no violations on conformant telemetry.""" + with WeaverLiveCheck(registry=_REGISTRY_DIR) as weaver: + provider = _make_provider(weaver.otlp_endpoint) + with provider.get_tracer("test-tracer").start_as_current_span( + "test-span" + ): + pass + provider.force_flush() + report = weaver.end_and_check() + + self.assertIsInstance(report, LiveCheckReport) + self.assertEqual(report.violations, []) + + def test_end_and_check_raises_on_violations(self): + """end_and_check() raises LiveCheckError with the report attached.""" + with WeaverLiveCheck( + registry=_REGISTRY_DIR, policies_dir=_TESTDATA_DIR + ) as weaver: + provider = _make_provider(weaver.otlp_endpoint) + with provider.get_tracer("test-tracer").start_as_current_span( + "test-span" + ) as span: + span.set_attribute("never.use.this.attribute", "bad value") + + provider.force_flush() + + with self.assertRaises(LiveCheckError) as cm: + weaver.end_and_check() + + # Human-readable message lists the violation + self.assertIn( + "never.use.this.attribute is forbidden by this bogus policy", + str(cm.exception), + ) + # Structured report is attached for programmatic inspection + self.assertTrue( + any( + v["id"] == "test_check" + and v["context"].get("attribute_name") + == "never.use.this.attribute" + for v in cm.exception.report.violations + ) + ) + + def test_end_no_violations(self): + """end() returns a LiveCheckReport with no violations on conformant telemetry.""" + with WeaverLiveCheck(registry=_REGISTRY_DIR) as weaver: + provider = _make_provider(weaver.otlp_endpoint) + with provider.get_tracer("test-tracer").start_as_current_span( + "test-span" + ): + pass + provider.force_flush() + report = weaver.end() + + self.assertIsInstance(report, LiveCheckReport) + self.assertEqual(report.violations, []) + # LiveCheckReport supports dict-style access to the raw report data + self.assertIn("statistics", report) + self.assertIsNotNone(report.get("statistics")) + self.assertIsNone(report.get("nonexistent")) + + def test_end_with_violations(self): + """end() returns a LiveCheckReport with violations without raising.""" + with WeaverLiveCheck( + registry=_REGISTRY_DIR, policies_dir=_TESTDATA_DIR + ) as weaver: + provider = _make_provider(weaver.otlp_endpoint) + with provider.get_tracer("test-tracer").start_as_current_span( + "test-span" + ) as span: + span.set_attribute("never.use.this.attribute", "bad value") + + provider.force_flush() + report = weaver.end() + + self.assertIsInstance(report, LiveCheckReport) + # Check the violation id (maps to advice_type in the rego policy) + self.assertTrue( + any(v["id"] == "test_check" for v in report.violations) + ) + # Check the structured context identifies the offending attribute by name + self.assertTrue( + any( + isinstance(v["context"], dict) + and v["context"].get("attribute_name") + == "never.use.this.attribute" + for v in report.violations + ) + ) + + def test_report_span_statistics(self): + """The full report exposes span counts and individual span samples.""" + with WeaverLiveCheck(registry=_REGISTRY_DIR) as weaver: + provider = _make_provider(weaver.otlp_endpoint) + with provider.get_tracer("test-tracer").start_as_current_span( + "test-span" + ): + pass + provider.force_flush() + report = weaver.end() + + # Individual spans are accessible in report["samples"], each entry + # with a "span" key containing the span data. + span_samples = [ + s["span"] for s in report.get("samples", []) if "span" in s + ] + self.assertTrue( + any(s["name"] == "test-span" for s in span_samples), + f"Expected 'test-span' in samples, got: {[s['name'] for s in span_samples]}", + ) diff --git a/tests/opentelemetry-test-utils/tests/testdata/registry/attributes.yaml b/tests/opentelemetry-test-utils/tests/testdata/registry/attributes.yaml new file mode 100644 index 00000000000..d4a5e301470 --- /dev/null +++ b/tests/opentelemetry-test-utils/tests/testdata/registry/attributes.yaml @@ -0,0 +1,27 @@ +# this is a test registry, used to save up time on weaver loading semconv from GH repo +# and preventing flaky tests resulting from unpredictable network/performance in CI +groups: + - id: registry.test + type: attribute_group + brief: Minimal registry for WeaverLiveCheck self-tests. + attributes: + - id: service.name + type: string + brief: The name of the service. + stability: stable + examples: ["test-service"] + - id: telemetry.sdk.language + type: string + brief: The language of the telemetry SDK. + stability: stable + examples: ["python"] + - id: telemetry.sdk.name + type: string + brief: The name of the telemetry SDK. + stability: stable + examples: ["opentelemetry"] + - id: telemetry.sdk.version + type: string + brief: The version string of the telemetry SDK. + stability: stable + examples: ["1.0.0"] diff --git a/tests/opentelemetry-test-utils/tests/testdata/test-policy.rego b/tests/opentelemetry-test-utils/tests/testdata/test-policy.rego new file mode 100644 index 00000000000..de8e9a27f44 --- /dev/null +++ b/tests/opentelemetry-test-utils/tests/testdata/test-policy.rego @@ -0,0 +1,16 @@ +package live_check_advice + +import rego.v1 + +deny contains result if { + input.sample.attribute.name == "never.use.this.attribute" + result := { + "type": "advice", + "advice_type": "test_check", + "advice_level": "violation", + "context": { + "attribute_name": input.sample.attribute.name, + }, + "message": "never.use.this.attribute is forbidden by this bogus policy", + } +} diff --git a/uv.lock b/uv.lock index 8724095cd31..ab6be2bcb29 100644 --- a/uv.lock +++ b/uv.lock @@ -1114,6 +1114,7 @@ dependencies = [ { name = "asgiref" }, { name = "opentelemetry-api" }, { name = "opentelemetry-sdk" }, + { name = "requests" }, ] [package.metadata] @@ -1121,6 +1122,7 @@ requires-dist = [ { name = "asgiref", specifier = "~=3.0" }, { name = "opentelemetry-api", editable = "opentelemetry-api" }, { name = "opentelemetry-sdk", editable = "opentelemetry-sdk" }, + { name = "requests", specifier = "~=2.28" }, ] [[package]]