diff --git a/requirements.txt b/requirements.txt index 11ee99b..edad02e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,5 @@ th2-grpc-common~=3.11.1 kubernetes==24.2.0 prometheus_client==0.14.1 th2-common-utils==1.4.2 +grpc-interceptor==0.15.0 +types-setuptools==65.4.0.0 diff --git a/setup.py b/setup.py index 126c910..f1bd9d9 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,9 @@ 'th2-grpc-common~=3.11.1', 'kubernetes==24.2.0', 'prometheus_client==0.14.1', - 'th2-common-utils>=1.4.2' + 'th2-common-utils>=1.4.2', + 'grpc-interceptor==0.15.0', + 'types-setuptools==65.4.0.0' ], packages=[''] + find_packages(include=['th2_common', 'th2_common.*']), package_data={'': ['package_info.json'], 'th2_common.schema.log': ['log4py.conf', 'log_config.json']} diff --git a/th2_common/schema/configuration/abstract_configuration.py b/th2_common/schema/configuration/abstract_configuration.py index 515723e..407974a 100644 --- a/th2_common/schema/configuration/abstract_configuration.py +++ b/th2_common/schema/configuration/abstract_configuration.py @@ -11,15 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from abc import ABC import logging from typing import Any, Dict logger = logging.getLogger(__name__) -class AbstractConfiguration(ABC): +class AbstractConfiguration: def check_unexpected_args(self, kwargs: Dict[str, Any]) -> None: if len(kwargs) > 0: diff --git a/th2_common/schema/filter/strategy/abstract_filter_strategy.py b/th2_common/schema/filter/strategy/abstract_filter_strategy.py index 76cf3e4..4e44a89 100644 --- a/th2_common/schema/filter/strategy/abstract_filter_strategy.py +++ b/th2_common/schema/filter/strategy/abstract_filter_strategy.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
import logging
from typing import Any, Callable

import grpc
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from prometheus_client import Counter
from th2_common.schema.metrics.metric_utils import update_grpc_metrics

logger = logging.getLogger(__name__)


class MetricInterceptor(ServerInterceptor):
    """gRPC server interceptor that updates Prometheus counters for handled calls."""

    def __init__(self, received_call_total: Counter,
                 received_request: Counter, received_response: Counter):
        """Store the counters that are updated on every intercepted call.

        Args:
            received_call_total (Counter): Total number of consuming particular gRPC method.
            received_request (Counter): Number of bytes received from particular gRPC call.
            received_response (Counter): Number of bytes sent to particular gRPC call.
        """
        self.received_call_total = received_call_total
        self.received_request = received_request
        self.received_response = received_response

    def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
    ) -> Any:
        """Invoke the handler and increase the counters for the call.

        Args:
            method: The next interceptor, or method implementation.
            request: The RPC request, as a protobuf message.
            context: The ServicerContext passed by gRPC to the service.
            method_name: A string of the form
                "/protobuf.package.Service/Method"

        Returns:
            RPC method response

        Raises:
            GrpcException: Re-raised from the handler after its status code
                and details have been copied onto the context.
        """
        # Keep the try body limited to the handler call so that only handler
        # errors are translated into a gRPC status.
        try:
            response = method(request, context)
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise

        # Metric bookkeeping must never turn a successfully handled RPC into a
        # failure (e.g. a response object that cannot be serialized, or a
        # method name that cannot be split into service/method labels).
        # NOTE(review): only the response is handed to update_grpc_metrics
        # here, so the request-bytes counter is not derived from `request`.
        try:
            update_grpc_metrics(method_name, response, self.received_call_total,
                                self.received_request, self.received_response)
        except Exception:
            logger.exception('Failed to update gRPC metrics for %s', method_name)

        return response
received from particular gRPC call', + common_metrics.GRPC_LABELS) + + GRPC_INVOKE_RESPONSE_BYTES = Counter('th2_grpc_invoke_call_response_bytes', + 'Number of bytes received from particular gRPC call', + common_metrics.GRPC_LABELS) + + GRPC_RECEIVE_CALL_RESPONSE_BYTES = Counter('th2_grpc_receive_call_response_bytes', + 'Number of bytes sent to particular gRPC call', + common_metrics.GRPC_LABELS) def __init__(self, grpc_configuration: GrpcConfiguration, @@ -47,8 +73,14 @@ def server(self) -> grpc.Server: Returns: grpc.Server: A server object. """ + interceptor = [ + MetricInterceptor(self.GRPC_RECEIVE_CALL_TOTAL, + self.GRPC_RECEIVE_CALL_REQUEST_BYTES, + self.GRPC_RECEIVE_CALL_RESPONSE_BYTES) + ] server = grpc.server(ThreadPoolExecutor(max_workers=self.grpc_router_configuration.workers), - options=self.grpc_router_configuration.request_size_limit) + options=self.grpc_router_configuration.request_size_limit, + interceptors=interceptor) self.__add_insecure_port(server) self.servers.append(server) diff --git a/th2_common/schema/grpc/router/impl/default_grpc_router.py b/th2_common/schema/grpc/router/impl/default_grpc_router.py index ca61bc3..286e878 100644 --- a/th2_common/schema/grpc/router/impl/default_grpc_router.py +++ b/th2_common/schema/grpc/router/impl/default_grpc_router.py @@ -23,10 +23,15 @@ from th2_common.schema.grpc.configuration.grpc_configuration import GrpcConfiguration, GrpcConnectionConfiguration, \ GrpcEndpointConfiguration, GrpcServiceConfiguration from th2_common.schema.grpc.router.abstract_grpc_router import AbstractGrpcRouter +from th2_common.schema.metrics.metric_utils import update_grpc_metrics class DefaultGrpcRouter(AbstractGrpcRouter): + GRPC_INVOKE_CALL_TOTAL = AbstractGrpcRouter.GRPC_INVOKE_CALL_TOTAL + GRPC_INVOKE_REQUEST_BYTES = AbstractGrpcRouter.GRPC_INVOKE_REQUEST_BYTES + GRPC_INVOKE_RESPONSE_BYTES = AbstractGrpcRouter.GRPC_INVOKE_RESPONSE_BYTES + def __init__(self, grpc_configuration: GrpcConfiguration, grpc_router_configuration: 
GrpcConnectionConfiguration) -> None: @@ -72,8 +77,15 @@ def create_request(self, stub = self.stubs[endpoint] if stub is not None: - return getattr(stub, request_name)(request, timeout=timeout) # type: ignore - + request_method = getattr(stub, request_name) + response = request_method(request, timeout=timeout) + update_grpc_metrics(request_method._method.decode('utf-8'), + response, + DefaultGrpcRouter.GRPC_INVOKE_CALL_TOTAL, + DefaultGrpcRouter.GRPC_INVOKE_REQUEST_BYTES, + DefaultGrpcRouter.GRPC_INVOKE_RESPONSE_BYTES) + + return response # type: ignore return None def _filter_services(self, properties: Optional[Dict[str, str]]) -> GrpcServiceConfiguration: diff --git a/th2_common/schema/metrics/common_metrics.py b/th2_common/schema/metrics/common_metrics.py index 246a318..da67241 100644 --- a/th2_common/schema/metrics/common_metrics.py +++ b/th2_common/schema/metrics/common_metrics.py @@ -29,6 +29,8 @@ DEFAULT_MESSAGE_TYPE_LABEL_NAME: str = 'message_type' DEFAULT_TH2_PIN_LABEL_NAME: str = 'th2_pin' DEFAULT_TH2_TYPE_LABEL_NAME: str = 'th2_type' +DEFAULT_GRPC_METHOD_LABEL_NAME: str = 'method_name' +DEFAULT_GRPC_SERVICE_LABEL_NAME: str = 'service_name' DEFAULT_LABELS: Tuple[str, str, str] = ( DEFAULT_TH2_PIN_LABEL_NAME, DEFAULT_SESSION_ALIAS_LABEL_NAME, @@ -47,6 +49,10 @@ DEFAULT_TH2_TYPE_LABEL_NAME, DEFAULT_QUEUE_LABEL_NAME ) +GRPC_LABELS = ( + DEFAULT_GRPC_SERVICE_LABEL_NAME, + DEFAULT_GRPC_METHOD_LABEL_NAME +) TH2_MESSAGE_TYPES: dict = {'raw': 'RAW_MESSAGE', 'parsed': 'MESSAGE'} LIVENESS_ARBITER = AggregatingMetric([PrometheusMetric('th2_liveness', 'Service liveness'), FileMetric('healthy')]) diff --git a/th2_common/schema/metrics/metric_utils.py b/th2_common/schema/metrics/metric_utils.py index 26f3e7b..b6e69b8 100644 --- a/th2_common/schema/metrics/metric_utils.py +++ b/th2_common/schema/metrics/metric_utils.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
from typing import Optional


def _serialized_size(message: object) -> int:
    """Return the serialized size of *message* in bytes.

    Objects without a ``SerializeToString`` method (for example the iterator
    returned by a streaming call) are counted as 0 bytes instead of raising.
    """
    serialize = getattr(message, 'SerializeToString', None)
    if serialize is None:
        return 0
    return len(serialize())


def update_grpc_metrics(full_name: str, data: DescriptorProto, method_call_total: Counter,
                        request_bytes: Counter, response_bytes: Counter,
                        request: Optional[DescriptorProto] = None) -> None:
    """Increase the gRPC call counters for one invocation.

    Args:
        full_name: Fully qualified method name, "/package.Service/Method".
            The package part is optional ("/Service/Method" is accepted).
        data: The RPC response message.
        method_call_total: Counter increased by one per call.
        request_bytes: Counter increased by the request size in bytes.
        response_bytes: Counter increased by the response size in bytes.
        request: The RPC request message. When omitted, the size of ``data``
            is used for ``request_bytes`` as well, which is what callers that
            pass only the response have always got; pass the request to get
            an accurate request size.
    """
    # Split on the last '/' first, then take the last dotted component of the
    # service path: this also works when the proto declares no package.
    service_path, _, method_name = full_name.rpartition('/')
    service_name = service_path.rpartition('.')[2].lstrip('/')
    labels = service_name, method_name

    # Serialize each payload once.
    response_size = _serialized_size(data)
    request_size = response_size if request is None else _serialized_size(request)

    method_call_total.labels(*labels).inc()
    request_bytes.labels(*labels).inc(request_size)
    response_bytes.labels(*labels).inc(response_size)