Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ th2-grpc-common~=3.11.1
kubernetes==24.2.0
prometheus_client==0.14.1
th2-common-utils==1.4.2
grpc-interceptor==0.15.0
types-setuptools==65.4.0.0
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
'th2-grpc-common~=3.11.1',
'kubernetes==24.2.0',
'prometheus_client==0.14.1',
'th2-common-utils>=1.4.2'
'th2-common-utils>=1.4.2',
'grpc-interceptor==0.15.0',
'types-setuptools==65.4.0.0'
],
packages=[''] + find_packages(include=['th2_common', 'th2_common.*']),
package_data={'': ['package_info.json'], 'th2_common.schema.log': ['log4py.conf', 'log_config.json']}
Expand Down
4 changes: 1 addition & 3 deletions th2_common/schema/configuration/abstract_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


class AbstractConfiguration(ABC):
class AbstractConfiguration:

def check_unexpected_args(self, kwargs: Dict[str, Any]) -> None:
if len(kwargs) > 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC
from fnmatch import fnmatch
from typing import Callable, Dict

from th2_common.schema.filter.strategy.filter_strategy import FilterStrategy
from th2_common.schema.message.configuration.message_configuration import FieldFilterConfiguration, FieldFilterOperation


class AbstractFilterStrategy(FilterStrategy, ABC):
class AbstractFilterStrategy(FilterStrategy):

@staticmethod
def check_value(value: str, filter_configuration: FieldFilterConfiguration) -> bool:
Expand Down
53 changes: 53 additions & 0 deletions th2_common/schema/grpc/grpc_interceptors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Any, Callable

import grpc
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from prometheus_client import Counter
from th2_common.schema.metrics.metric_utils import update_grpc_metrics


class MetricInterceptor(ServerInterceptor):
""" gRPC server interceptor for prometheus metrics"""

def __init__(self, received_call_total: Counter,
received_request: Counter, received_response: Counter):
""" MetricInterceptor constructor function
Args:
received_call_total (Counter): Total number of consuming particular gRPC method.
received_request (Counter): Number of bytes received from particular gRPC call.
received_response (Counter): Number of bytes sent to particular gRPC call.
"""
self.received_call_total = received_call_total
self.received_request = received_request
self.received_response = received_response

def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
"""
Increases Counter metrics by appropriate values
Args:
method: The next interceptor, or method implementation.
request: The RPC request, as a protobuf message.
context: The ServicerContext pass by gRPC to the service.
method_name: A string of the form
"/protobuf.package.Service/Method"
Returns:
RPC method response
"""
try:

response = method(request, context)
update_grpc_metrics(method_name, response, self.received_call_total,
self.received_request, self.received_response)

return response
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
38 changes: 35 additions & 3 deletions th2_common/schema/grpc/router/abstract_grpc_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Dict, List, Optional

import grpc
from prometheus_client import Counter
from th2_common.schema.grpc.configuration.grpc_configuration import GrpcConfiguration, GrpcConnectionConfiguration
from th2_common.schema.grpc.grpc_interceptors import MetricInterceptor
from th2_common.schema.grpc.router.grpc_router import GrpcRouter
import th2_common.schema.metrics.common_metrics as common_metrics


class AbstractGrpcRouter(GrpcRouter, ABC):
class AbstractGrpcRouter(GrpcRouter):

GRPC_INVOKE_CALL_TOTAL = Counter('th2_grpc_invoke_call_total',
'Total number of calling particular gRPC method',
common_metrics.GRPC_LABELS)

GRPC_RECEIVE_CALL_TOTAL = Counter('th2_grpc_receive_call_total',
'Total number of consuming particular gRPC method',
common_metrics.GRPC_LABELS)

GRPC_INVOKE_REQUEST_BYTES = Counter('th2_grpc_invoke_call_request_bytes',
'Number of bytes sent to particular gRPC call',
common_metrics.GRPC_LABELS)

GRPC_RECEIVE_CALL_REQUEST_BYTES = Counter('th2_grpc_receive_call_request_bytes',
'Number of bytes received from particular gRPC call',
common_metrics.GRPC_LABELS)

GRPC_INVOKE_RESPONSE_BYTES = Counter('th2_grpc_invoke_call_response_bytes',
'Number of bytes received from particular gRPC call',
common_metrics.GRPC_LABELS)

GRPC_RECEIVE_CALL_RESPONSE_BYTES = Counter('th2_grpc_receive_call_response_bytes',
'Number of bytes sent to particular gRPC call',
common_metrics.GRPC_LABELS)

def __init__(self,
grpc_configuration: GrpcConfiguration,
Expand All @@ -47,8 +73,14 @@ def server(self) -> grpc.Server:
Returns:
grpc.Server: A server object.
"""
interceptor = [
MetricInterceptor(self.GRPC_RECEIVE_CALL_TOTAL,
self.GRPC_RECEIVE_CALL_REQUEST_BYTES,
self.GRPC_RECEIVE_CALL_RESPONSE_BYTES)
]
server = grpc.server(ThreadPoolExecutor(max_workers=self.grpc_router_configuration.workers),
options=self.grpc_router_configuration.request_size_limit)
options=self.grpc_router_configuration.request_size_limit,
interceptors=interceptor)
self.__add_insecure_port(server)
self.servers.append(server)

Expand Down
16 changes: 14 additions & 2 deletions th2_common/schema/grpc/router/impl/default_grpc_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
from th2_common.schema.grpc.configuration.grpc_configuration import GrpcConfiguration, GrpcConnectionConfiguration, \
GrpcEndpointConfiguration, GrpcServiceConfiguration
from th2_common.schema.grpc.router.abstract_grpc_router import AbstractGrpcRouter
from th2_common.schema.metrics.metric_utils import update_grpc_metrics


class DefaultGrpcRouter(AbstractGrpcRouter):

GRPC_INVOKE_CALL_TOTAL = AbstractGrpcRouter.GRPC_INVOKE_CALL_TOTAL
GRPC_INVOKE_REQUEST_BYTES = AbstractGrpcRouter.GRPC_INVOKE_REQUEST_BYTES
GRPC_INVOKE_RESPONSE_BYTES = AbstractGrpcRouter.GRPC_INVOKE_RESPONSE_BYTES

def __init__(self,
grpc_configuration: GrpcConfiguration,
grpc_router_configuration: GrpcConnectionConfiguration) -> None:
Expand Down Expand Up @@ -72,8 +77,15 @@ def create_request(self,
stub = self.stubs[endpoint]

if stub is not None:
return getattr(stub, request_name)(request, timeout=timeout) # type: ignore

request_method = getattr(stub, request_name)
response = request_method(request, timeout=timeout)
update_grpc_metrics(request_method._method.decode('utf-8'),
response,
DefaultGrpcRouter.GRPC_INVOKE_CALL_TOTAL,
DefaultGrpcRouter.GRPC_INVOKE_REQUEST_BYTES,
DefaultGrpcRouter.GRPC_INVOKE_RESPONSE_BYTES)

return response # type: ignore
return None

def _filter_services(self, properties: Optional[Dict[str, str]]) -> GrpcServiceConfiguration:
Expand Down
6 changes: 6 additions & 0 deletions th2_common/schema/metrics/common_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
DEFAULT_MESSAGE_TYPE_LABEL_NAME: str = 'message_type'
DEFAULT_TH2_PIN_LABEL_NAME: str = 'th2_pin'
DEFAULT_TH2_TYPE_LABEL_NAME: str = 'th2_type'
DEFAULT_GRPC_METHOD_LABEL_NAME: str = 'method_name'
DEFAULT_GRPC_SERVICE_LABEL_NAME: str = 'service_name'
DEFAULT_LABELS: Tuple[str, str, str] = (
DEFAULT_TH2_PIN_LABEL_NAME,
DEFAULT_SESSION_ALIAS_LABEL_NAME,
Expand All @@ -47,6 +49,10 @@
DEFAULT_TH2_TYPE_LABEL_NAME,
DEFAULT_QUEUE_LABEL_NAME
)
GRPC_LABELS = (
DEFAULT_GRPC_SERVICE_LABEL_NAME,
DEFAULT_GRPC_METHOD_LABEL_NAME
)
TH2_MESSAGE_TYPES: dict = {'raw': 'RAW_MESSAGE', 'parsed': 'MESSAGE'}

LIVENESS_ARBITER = AggregatingMetric([PrometheusMetric('th2_liveness', 'Service liveness'), FileMetric('healthy')])
Expand Down
16 changes: 15 additions & 1 deletion th2_common/schema/metrics/metric_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf.descriptor_pb2 import DescriptorProto
from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
from prometheus_client import Counter, Gauge
from th2_common.schema.metrics import common_metrics
Expand All @@ -30,7 +31,9 @@ def update_total_metrics(batch: MessageGroupBatch,
if group_counter:
group_counter.labels(*gr_labels).inc()
if sequence_gauge:
sequence_gauge.labels(*gr_labels).set(get_sequence(group))
sequence = get_sequence(group)
value = 0 if sequence is None else sequence
sequence_gauge.labels(*gr_labels).set(value)


def update_message_metrics(messages: RepeatedCompositeFieldContainer, counter: Counter, *labels: str) -> None:
Expand All @@ -50,3 +53,14 @@ def update_dropped_metrics(batch: MessageGroupBatch,
update_message_metrics(group.messages, message_counter, *labels)
if group_counter:
group_counter.labels(*labels).inc()


def update_grpc_metrics(full_name: str, data: DescriptorProto, method_call_total: Counter,
request_bytes: Counter, response_bytes: Counter) -> None:
service_name, method_name = full_name.split('.')[-1].split('/')
labels = service_name, method_name
byte_message = data.SerializeToString()
byte_response = data.SerializeToString()
method_call_total.labels(*labels).inc()
request_bytes.labels(*labels).inc(len(byte_message))
response_bytes.labels(*labels).inc(len(byte_response))