forked from signadot/microservices-demo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecommendation_server.py
154 lines (118 loc) · 5.06 KB
/
recommendation_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/python
import os
import random
import time
from concurrent import futures
import grpc
from random import (
random,
sample,
)
from time import sleep
import demo_pb2
import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.grpc import (
GrpcInstrumentorClient,
GrpcInstrumentorServer,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from logger import getJSONLogger
# Structured logger shared by every function in this module.
logger = getJSONLogger('recommendationservice-server')
# Thread pool (at most 10 workers) handed to grpc.server() in the __main__
# block; ListRecommendations also reads its internals to report pool usage.
worker_pool = futures.ThreadPoolExecutor(max_workers=10)
# Module-level tracer used to open the custom spans created below.
tracer = trace.get_tracer(__name__)
def get_random_wait_time(max_time, buckets):
    """Return a random wait time between 0 and ``max_time``.

    The value is the sum of ``buckets`` independent uniform draws, each
    scaled to ``max_time / buckets``.
    """
    bucket_width = max_time / buckets
    total = 0
    for _ in range(buckets):
        # Each draw contributes at most one bucket's worth of time.
        total = total + random() * bucket_width
    return total
def sleep_random(max_time):
    """Block the calling thread for a random delay of up to ``max_time`` ms."""
    # get_random_wait_time() yields milliseconds; time.sleep() wants seconds.
    time.sleep(get_random_wait_time(max_time, 4) / 1000)
def mock_database_call(max_time, name, query):
    """Simulate a database query inside its own trace span.

    Opens a span called ``name``, tags it with the statement and database
    name, then sleeps for a random time of up to ``max_time`` milliseconds.
    """
    db_attributes = (
        ("db.statement", query),
        ("db.name", "recommendation"),
    )
    with tracer.start_as_current_span(name) as db_span:
        for key, value in db_attributes:
            db_span.set_attribute(key, value)
        # The sleep happens while the span is open so it shows up as latency.
        sleep_random(max_time)
class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
    """gRPC servicer for product recommendations and health checks.

    The same instance is registered both as the recommendation servicer and
    as the health servicer, which is why it defines Check and Watch too.
    """

    def ListRecommendations(self, request, context):
        """Return up to five random catalog product ids not in the request.

        Args:
            request: message whose ``product_ids`` are excluded from the result.
            context: gRPC servicer context (unused).

        Returns:
            A ``ListRecommendationsResponse`` with the sampled product ids.
        """
        with tracer.start_as_current_span("ListRecommendationsFunction"):
            mock_database_call(250,
                               "SELECT recommendation.products",
                               "SELECT * FROM products WHERE category IN (?)")

            # Record thread-pool usage on the span for this request.
            span = trace.get_current_span()
            span.set_attribute("app.python.active_threads", len(worker_pool._threads))
            span.set_attribute("app.python.pending_pool", worker_pool._work_queue.qsize())

            max_responses = 5
            # fetch list of products from product catalog stub
            cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
            product_ids = [x.id for x in cat_response.products]
            # drop everything the caller already has
            filtered_products = list(set(product_ids) - set(request.product_ids))
            num_return = min(max_responses, len(filtered_products))
            # sample the ids directly; num_return never exceeds the population
            prod_list = sample(filtered_products, num_return)
            logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list))
            # build and return response
            response = demo_pb2.ListRecommendationsResponse()
            response.product_ids.extend(prod_list)
            return response

    def Check(self, request, context):
        """Health check: always report the service as SERVING."""
        return health_pb2.HealthCheckResponse(
            status=health_pb2.HealthCheckResponse.SERVING)

    def Watch(self, request, context):
        """Streaming health watch is not supported; end the RPC as UNIMPLEMENTED.

        UNIMPLEMENTED is a gRPC status code, not a member of the health
        ServingStatus enum, so it has to be reported through the context
        rather than placed in a HealthCheckResponse.
        """
        context.abort(grpc.StatusCode.UNIMPLEMENTED, "Watch is not implemented")
if __name__ == "__main__":
    logger.info("initializing recommendationservice")

    # Resource attributes used by the OpenTelemetry SDK.
    resource_attributes = {
        "service.name": os.environ.get("SERVICE_NAME"),
        "service.version": "0.1",
        "ip": os.environ.get('POD_IP'),
    }
    resource = Resource(attributes=resource_attributes)

    # OTLP exporter that sends spans to the collector over an insecure channel.
    otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')
    otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)

    # Build the trace provider and install it for the OpenTelemetry SDK.
    trace_provider = TracerProvider(resource=resource)
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(trace_provider)

    # OpenTelemetry auto-instrumentation hooks for gRPC client and server calls.
    client_instrumentor = GrpcInstrumentorClient().instrument()
    server_instrumentor = GrpcInstrumentorServer().instrument()

    # The product catalog address is mandatory; refuse to start without it.
    catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '')
    if not catalog_addr:
        raise Exception('PRODUCT_CATALOG_SERVICE_ADDR environment variable not set')
    logger.info(f"product catalog address: {catalog_addr}")

    # Module-level stub that ListRecommendations uses to query the catalog.
    channel = grpc.insecure_channel(catalog_addr)
    product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel)

    # One servicer instance handles both recommendation and health RPCs.
    server = grpc.server(worker_pool)
    service = RecommendationService()
    demo_pb2_grpc.add_RecommendationServiceServicer_to_server(service, server)
    health_pb2_grpc.add_HealthServicer_to_server(service, server)

    # Bind and start the server.
    port = os.environ.get('PORT', "8080")
    logger.info(f"listening on port: {port}")
    server.add_insecure_port(f"[::]:{port}")
    server.start()

    # Keep the main thread alive until interrupted, then stop immediately.
    try:
        while True:
            time.sleep(10000)
    except KeyboardInterrupt:
        server.stop(0)