Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#5093](https://github.com/open-telemetry/opentelemetry-python/pull/5093))
- `opentelemetry-sdk`: fix YAML structure injection via environment variable substitution in declarative file configuration; values containing newlines are now emitted as quoted YAML scalars per spec requirement
([#5091](https://github.com/open-telemetry/opentelemetry-python/pull/5091))
- `opentelemetry-sdk`: fix `force_flush` on `MetricReader` and `PeriodicExportingMetricReader` to return a `bool` reflecting export success or failure instead of always returning `True`; `detach(token)` in `PeriodicExportingMetricReader._receive_metrics` now runs in a `finally` block
([#5085](https://github.com/open-telemetry/opentelemetry-python/pull/5085))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> Also fixes `detach(token)` not being called when export raises an exception.

This was never an issue to begin with; why is this here?

- `opentelemetry-sdk`: Add `create_logger_provider`/`configure_logger_provider` to declarative file configuration, enabling LoggerProvider instantiation from config files without reading env vars
([#4990](https://github.com/open-telemetry/opentelemetry-python/pull/4990))
- `opentelemetry-sdk`: Add `service` resource detector support to declarative file configuration via `detection_development.detectors[].service`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sys import stdout
from threading import Event, Lock, RLock, Thread
from time import perf_counter, time_ns
from typing import IO, Callable, Iterable, Optional
from typing import IO, Callable, Iterable

from typing_extensions import final

Expand Down Expand Up @@ -336,7 +336,7 @@ def __init__(
)

@final
def collect(self, timeout_millis: float = 10_000) -> None:
def collect(self, timeout_millis: float = 10_000) -> bool | None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.

Expand All @@ -361,10 +361,11 @@ def collect(self, timeout_millis: float = 10_000) -> None:
self._metrics.record_collection(perf_counter() - start_time)

if metrics is not None:
self._receive_metrics(
return self._receive_metrics(
metrics,
timeout_millis=timeout_millis,
)
return None

@final
def _set_collect_callback(
Expand All @@ -386,17 +387,25 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics"""
) -> bool | None:
"""Called by `MetricReader.collect` when it receives a batch of metrics.

Subclasses should return ``True`` on success and ``False`` on failure.

.. note::
Existing subclasses that return ``None`` (the old implicit default)
will be treated as vacuous success by ``force_flush``, preserving
backward-compatible behaviour.
"""

def _set_meter_provider(self, meter_provider: MeterProvider) -> None:
self._metrics = MetricReaderMetrics(
self._otel_component_type, meter_provider
)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
self.collect(timeout_millis=timeout_millis)
return True
result = self.collect(timeout_millis=timeout_millis)
return result is not False

@abstractmethod
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
Expand Down Expand Up @@ -436,7 +445,7 @@ def __init__(

def get_metrics_data(
self,
) -> Optional[MetricsData]:
) -> MetricsData | None:
"""Reads and returns current metrics from the SDK"""
with self._lock:
self.collect()
Expand All @@ -449,9 +458,10 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
) -> bool:
with self._lock:
self._metrics_data = metrics_data
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
Expand All @@ -470,8 +480,8 @@ class PeriodicExportingMetricReader(MetricReader):
def __init__(
self,
exporter: MetricExporter,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
export_interval_millis: float | None = None,
export_timeout_millis: float | None = None,
) -> None:
# PeriodicExportingMetricReader defers to exporter for configuration
super().__init__(
Expand Down Expand Up @@ -567,17 +577,19 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
) -> bool:
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
# pylint: disable=broad-exception-caught,invalid-name
try:
with self._export_lock:
self._exporter.export(
result = self._exporter.export(
metrics_data, timeout_millis=timeout_millis
)
return result is MetricExportResult.SUCCESS
except Exception:
_logger.exception("Exception while exporting metrics")
detach(token)
return False
finally:
detach(token)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
deadline_ns = time_ns() + timeout_millis * 10**6
Expand All @@ -596,6 +608,6 @@ def _shutdown():
self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
super().force_flush(timeout_millis=timeout_millis)
self._exporter.force_flush(timeout_millis=timeout_millis)
return True
if not super().force_flush(timeout_millis=timeout_millis):
return False
return self._exporter.force_flush(timeout_millis=timeout_millis)
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,54 @@ def test_metric_reader_metrics(self):
assert isinstance(name, str)
self.assertTrue(name.startswith("periodic_metric_reader/"))

mp.shutdown()
mp.shutdown()

def test_force_flush_returns_true_on_success(self):
    """``force_flush`` is truthy with an unmodified ``FakeMetricsExporter``."""
    reader = self._create_periodic_reader(metrics, FakeMetricsExporter())
    self.assertTrue(reader.force_flush(timeout_millis=5_000))
    reader.shutdown()

def test_force_flush_returns_false_on_export_failure(self):
    """``force_flush`` is falsy when ``export`` returns ``FAILURE``."""
    failing_exporter = FakeMetricsExporter()
    # Replace ``export`` so every export attempt reports FAILURE.
    failing_exporter.export = Mock(return_value=MetricExportResult.FAILURE)
    reader = self._create_periodic_reader(metrics, failing_exporter)
    self.assertFalse(reader.force_flush(timeout_millis=5_000))
    reader.shutdown()

def test_force_flush_skips_exporter_flush_when_collect_fails(self):
    """A failed export during collection short-circuits ``force_flush``.

    The reader must return ``False`` and must not go on to call the
    exporter's own ``force_flush``.
    """
    exporter = FakeMetricsExporter()
    exporter.export = Mock(return_value=MetricExportResult.FAILURE)
    exporter.force_flush = Mock(return_value=True)
    pmr = PeriodicExportingMetricReader(
        exporter, export_interval_millis=math.inf
    )

    # The callback has to hand back metrics: when ``collect`` yields
    # ``None`` the base ``force_flush`` treats it as success
    # (``result is not False``), so the failing ``export`` would never
    # influence the outcome.
    def _collect_metrics(reader, timeout_millis):
        return metrics

    pmr._set_collect_callback(_collect_metrics)
    result = pmr.force_flush(timeout_millis=5_000)
    self.assertFalse(result)
    exporter.force_flush.assert_not_called()
    pmr.shutdown()

def test_detach_called_on_export_failure(self):
    """``detach`` runs whether ``export`` returns FAILURE or raises."""
    # pylint: disable=import-outside-toplevel
    from unittest.mock import patch

    from opentelemetry.sdk.metrics._internal import export as export_mod

    failing_exports = {
        "returns FAILURE": Mock(return_value=MetricExportResult.FAILURE),
        "raises": Mock(side_effect=Exception("export failed")),
    }
    for outcome, failing_export in failing_exports.items():
        with self.subTest(export=outcome):
            exporter = FakeMetricsExporter()
            exporter.export = failing_export
            pmr = self._create_periodic_reader(metrics, exporter)

            # ``wraps`` records the call but still runs the real
            # ``detach``; a bare mock would leave the suppression
            # context attached for the tests that follow.
            with patch.object(
                export_mod, "detach", wraps=export_mod.detach
            ) as mock_detach:
                pmr.force_flush(timeout_millis=5_000)
                mock_detach.assert_called()
            pmr.shutdown()