diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bd0a53c3c..09ed997b75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#5093](https://github.com/open-telemetry/opentelemetry-python/pull/5093)) - `opentelemetry-sdk`: fix YAML structure injection via environment variable substitution in declarative file configuration; values containing newlines are now emitted as quoted YAML scalars per spec requirement ([#5091](https://github.com/open-telemetry/opentelemetry-python/pull/5091)) +- `opentelemetry-sdk`: Fix `force_flush` on `MetricReader` and `PeriodicExportingMetricReader` to return a meaningful `bool` reflecting actual export success/failure instead of always returning `True`. Also ensure `detach(token)` is always called by moving it into a `finally` block; previously it was skipped when export raised an exception not caught by `except Exception` (e.g. `KeyboardInterrupt`). ([#5085](https://github.com/open-telemetry/opentelemetry-python/pull/5085)) - `opentelemetry-sdk`: Add `create_logger_provider`/`configure_logger_provider` to declarative file configuration, enabling LoggerProvider instantiation from config files without reading env vars ([#4990](https://github.com/open-telemetry/opentelemetry-python/pull/4990)) - `opentelemetry-sdk`: Add `service` resource detector support to declarative file configuration via `detection_development.detectors[].service` diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 66f327306a..2e1e4c4fc9 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -23,7 +23,7 @@ from sys import stdout from threading import Event, Lock, RLock, Thread from time import perf_counter, time_ns -from typing import IO, Callable, Iterable, Optional +from typing import IO, Callable, Iterable from typing_extensions import final @@ -336,7 +336,7 
@@ def __init__( ) @final - def collect(self, timeout_millis: float = 10_000) -> None: + def collect(self, timeout_millis: float = 10_000) -> bool | None: """Collects the metrics from the internal SDK state and invokes the `_receive_metrics` with the collection. @@ -361,10 +361,11 @@ def collect(self, timeout_millis: float = 10_000) -> None: self._metrics.record_collection(perf_counter() - start_time) if metrics is not None: - self._receive_metrics( + return self._receive_metrics( metrics, timeout_millis=timeout_millis, ) + return None @final def _set_collect_callback( @@ -386,8 +387,16 @@ def _receive_metrics( metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, - ) -> None: - """Called by `MetricReader.collect` when it receives a batch of metrics""" + ) -> bool | None: + """Called by `MetricReader.collect` when it receives a batch of metrics. + + Subclasses should return ``True`` on success and ``False`` on failure. + + .. note:: + Existing subclasses that return ``None`` (the old implicit default) + will be treated as vacuous success by ``force_flush``, preserving + backward-compatible behaviour. 
+ """ def _set_meter_provider(self, meter_provider: MeterProvider) -> None: self._metrics = MetricReaderMetrics( @@ -395,8 +404,8 @@ def _set_meter_provider(self, meter_provider: MeterProvider) -> None: ) def force_flush(self, timeout_millis: float = 10_000) -> bool: - self.collect(timeout_millis=timeout_millis) - return True + result = self.collect(timeout_millis=timeout_millis) + return result is not False @abstractmethod def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: @@ -436,7 +445,7 @@ def __init__( def get_metrics_data( self, - ) -> Optional[MetricsData]: + ) -> MetricsData | None: """Reads and returns current metrics from the SDK""" with self._lock: self.collect() @@ -449,9 +458,10 @@ def _receive_metrics( metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, - ) -> None: + ) -> bool: with self._lock: self._metrics_data = metrics_data + return True def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -470,8 +480,8 @@ class PeriodicExportingMetricReader(MetricReader): def __init__( self, exporter: MetricExporter, - export_interval_millis: Optional[float] = None, - export_timeout_millis: Optional[float] = None, + export_interval_millis: float | None = None, + export_timeout_millis: float | None = None, ) -> None: # PeriodicExportingMetricReader defers to exporter for configuration super().__init__( @@ -567,17 +577,19 @@ def _receive_metrics( metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, - ) -> None: + ) -> bool: token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) - # pylint: disable=broad-exception-caught,invalid-name try: with self._export_lock: - self._exporter.export( + result = self._exporter.export( metrics_data, timeout_millis=timeout_millis ) + return result is MetricExportResult.SUCCESS except Exception: _logger.exception("Exception while exporting metrics") - detach(token) + return False + finally: + detach(token) def shutdown(self, timeout_millis: float = 
30_000, **kwargs) -> None: deadline_ns = time_ns() + timeout_millis * 10**6 @@ -596,6 +608,6 @@ def _shutdown(): self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6) def force_flush(self, timeout_millis: float = 10_000) -> bool: - super().force_flush(timeout_millis=timeout_millis) - self._exporter.force_flush(timeout_millis=timeout_millis) - return True + if not super().force_flush(timeout_millis=timeout_millis): + return False + return self._exporter.force_flush(timeout_millis=timeout_millis) diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 3e47e57768..81f9ed74e4 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -359,4 +359,54 @@ def test_metric_reader_metrics(self): assert isinstance(name, str) self.assertTrue(name.startswith("periodic_metric_reader/")) - mp.shutdown() + mp.shutdown() + + def test_force_flush_returns_true_on_success(self): + exporter = FakeMetricsExporter() + pmr = self._create_periodic_reader(metrics, exporter) + result = pmr.force_flush(timeout_millis=5_000) + self.assertTrue(result) + pmr.shutdown() + + def test_force_flush_returns_false_on_export_failure(self): + exporter = FakeMetricsExporter() + exporter.export = Mock(return_value=MetricExportResult.FAILURE) + pmr = self._create_periodic_reader(metrics, exporter) + result = pmr.force_flush(timeout_millis=5_000) + self.assertFalse(result) + pmr.shutdown() + + def test_force_flush_skips_exporter_flush_when_collect_fails(self): + exporter = FakeMetricsExporter() + exporter.force_flush = Mock(return_value=True) + pmr = PeriodicExportingMetricReader( + exporter, export_interval_millis=math.inf + ) + # No collect callback registered → collect returns None → force_flush + # on base treats None as not-False (success), so wire up a failing one + 
exporter.export = Mock(return_value=MetricExportResult.FAILURE) + + def _collect_failure(reader, timeout_millis): + return metrics + + pmr._set_collect_callback(_collect_failure) + exporter.export = Mock(return_value=MetricExportResult.FAILURE) + result = pmr.force_flush(timeout_millis=5_000) + self.assertFalse(result) + exporter.force_flush.assert_not_called() + pmr.shutdown() + + def test_detach_called_on_export_failure(self): + """detach(token) must run in finally even when export returns FAILURE.""" + from unittest.mock import patch + + exporter = FakeMetricsExporter() + exporter.export = Mock(return_value=MetricExportResult.FAILURE) + pmr = self._create_periodic_reader(metrics, exporter) + + with patch( + "opentelemetry.sdk.metrics._internal.export.detach" + ) as mock_detach: + pmr.force_flush(timeout_millis=5_000) + self.assertTrue(mock_detach.called) + pmr.shutdown()