Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Opentelemetry jaeger" #644

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions deploy/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ spec:
value: {{ .Values.arsserver.env.TR_NORMALIZER }}
- name: TR_ANNOTATOR
value: {{ .Values.arsserver.env.TR_ANNOTATOR }}
- name: TR_JAEGERUI
value: {{ .Values.arsserver.env.TR_JAEGERUI }}
{{- with .Values.arsserver.resources }}
resources:
{{- toYaml . | nindent 12 }}
Expand All @@ -61,8 +59,6 @@ spec:
value: {{ .Values.celeryworker.env.TR_NORMALIZER }}
- name: TR_ANNOTATOR
value: {{ .Values.celeryworker.env.TR_ANNOTATOR }}
- name: TR_JAEGERUI
value: {{ .Values.celeryworker.env.TR_JAEGERUI }}
{{- with .Values.celeryworker.resources }}
resources:
{{- toYaml . | nindent 12 }}
Expand Down
7 changes: 0 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,3 @@ statistics
sympy
objsize
reasoner-pydantic
opentelemetry-api
opentelemetry-sdk
opentelemetry-instrumentation
opentelemetry-instrumentation-django
opentelemetry-exporter-jaeger
opentelemetry-instrumentation-requests
opentelemetry-instrumentation-celery
276 changes: 133 additions & 143 deletions tr_sys/tr_ars/api.py

Large diffs are not rendered by default.

22 changes: 4 additions & 18 deletions tr_sys/tr_ars/pubsub.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
from django.core import serializers
import sys, logging, json, threading, queue, requests, os
import sys, logging, json, threading, queue, requests
from .models import Message
from .tasks import send_message
from tr_ars.tasks import send_message
from django.utils import timezone
from django.conf import settings
import django
from opentelemetry import trace
from opentelemetry.propagate import inject

# Set the Django settings module
#os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tr_sys.settings')
#django.setup()

# Initialize OpenTelemetry
#from tr_sys.tr_sys.otel_config import configure_opentelemetry
#configure_opentelemetry()

logger = logging.getLogger(__name__)
# headers={}
# inject(headers)

def send_messages(actors, messages):
logger.debug("++ sending messages ++")
for mesg in messages:
Expand All @@ -33,9 +22,8 @@ def send_messages(actors, messages):
logger.debug("Skipping actor %s/%s; it's inactive..." % (
actor.agent, actor.url()))
elif settings.USE_CELERY:
span = trace.get_current_span()
logger.debug(f"CURRENT span before Celery task submission: {span}")
result = send_message.delay(actor.to_dict(), mesg.to_dict())
#logger.debug('>>>> task future: %s' % result)
result.forget()
else:
queue1.put((actor, mesg))
Expand All @@ -47,8 +35,6 @@ def __init__(self, **kwargs):
def run(self):
logger.debug('%s: BackgroundWorker started!' % __name__)
while True:
# headers={}
# inject(headers)
actor, mesg = queue1.get()
if actor is None:
break
Expand Down
277 changes: 126 additions & 151 deletions tr_sys/tr_ars/tasks.py

Large diffs are not rendered by default.

120 changes: 55 additions & 65 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
Response as vResponse
)
from pydantic import ValidationError
from opentelemetry import trace
tracer = trace.get_tracer(__name__)


ARS_ACTOR = {
Expand Down Expand Up @@ -924,39 +922,37 @@ def scrub_null_attributes(data):
aux_graph['attributes']=[]



def appraise(mesg,data, agent_name,retry_counter=0):
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
json_data = json.dumps(data)
logging.info('sending data for agent: %s to APPRAISER URL: %s' % (agent_name, APPRAISER_URL))
with tracer.start_as_current_span("get_appraisal") as span:
try:
with requests.post(APPRAISER_URL,data=json_data,headers=headers, stream=True) as r:
logging.info("Appraiser being called at: "+APPRAISER_URL)
logging.info('the response for agent %s to appraiser code is: %s' % (agent_name, r.status_code))
if r.status_code==200:
rj = r.json()
#for now, just update the whole message, but we could be more precise/efficient
logging.info("Updating message with appraiser data for agent %s and pk %s " % (agent_name, str(mesg.id)))
data['message']['results']=rj['message']['results']
logging.info("Updating message with appraiser data complete for "+str(mesg.id))
try:
with requests.post(APPRAISER_URL,data=json_data,headers=headers, stream=True) as r:
logging.info("Appraiser being called at: "+APPRAISER_URL)
logging.info('the response for agent %s to appraiser code is: %s' % (agent_name, r.status_code))
if r.status_code==200:
rj = r.json()
#for now, just update the whole message, but we could be more precise/efficient
logging.info("Updating message with appraiser data for agent %s and pk %s " % (agent_name, str(mesg.id)))
data['message']['results']=rj['message']['results']
logging.info("Updating message with appraiser data complete for "+str(mesg.id))
else:
retry_counter +=1
logging.info("Received Error state from appraiser for agent %s and pk %s Code %s Attempt %s" % (agent_name,str(mesg.id),str(r.status_code),str(retry_counter)))
logging.info("JSON fields "+str(json_data)[:100])
if retry_counter<3:
appraise(mesg,data, agent_name,retry_counter)
else:
retry_counter +=1
logging.info("Received Error state from appraiser for agent %s and pk %s Code %s Attempt %s" % (agent_name,str(mesg.id),str(r.status_code),str(retry_counter)))
logging.info("JSON fields "+str(json_data)[:100])
if retry_counter<3:
appraise(mesg,data, agent_name,retry_counter)
else:
logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
raise Exception
except Exception as e:
logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
raise Exception
except Exception as e:

logging.error("Problem with appraiser for agent %s and pk %s " % (agent_name,str(mesg.id)))
logging.error(type(e).__name__)
logging.error(e.args)
logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise e
logging.error("Problem with appraiser for agent %s and pk %s " % (agent_name,str(mesg.id)))
logging.error(type(e).__name__)
logging.error(e.args)
logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
raise e


def annotate_nodes(mesg,data,agent_name):
Expand Down Expand Up @@ -985,33 +981,31 @@ def annotate_nodes(mesg,data,agent_name):


json_data = json.dumps(nodes_message)
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
# with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
# outfile.write(json_data)
with tracer.start_as_current_span("annotator") as span:
try:
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
r.raise_for_status()
rj=r.json()
logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
if r.status_code==200:
for key, value in rj.items():
if 'attributes' in value.keys() and value['attributes'] is not None:
for attribute in value['attributes']:
if attribute is not None:
add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)

#Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
if len(invalid_nodes)>0:
data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
else:
post_processing_error(mesg,data,"Error in annotation of nodes")
except Exception as e:
logging.info('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
logging.exception("error in node annotation internal function")
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise e
try:
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
# with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
# outfile.write(json_data)
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
r.raise_for_status()
rj=r.json()
logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
if r.status_code==200:
for key, value in rj.items():
if 'attributes' in value.keys() and value['attributes'] is not None:
for attribute in value['attributes']:
if attribute is not None:
add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)

#Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
if len(invalid_nodes)>0:
data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
else:
post_processing_error(mesg,data,"Error in annotation of nodes")
except Exception as e:
logging.info('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
logging.exception("error in node annotation internal function")

raise e
#else:
# with open(str(mesg.actor)+".json", "w") as outfile:
# outfile.write(json_data)
Expand Down Expand Up @@ -1176,15 +1170,11 @@ def canonize(curies):
"drug_chemical_conflate":True
}
logging.info('the normalizer_URL is %s' % NORMALIZER_URL)
with tracer.start_as_current_span("get_normalized_node") as span:
try:
r = requests.post(NORMALIZER_URL,json.dumps(j))
rj=r.json()
return rj
except Exception as e:
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise
r = requests.post(NORMALIZER_URL,json.dumps(j))
rj=r.json()
return rj


def canonizeResults(results):
canonical_results=[]
for result in results:
Expand Down
3 changes: 1 addition & 2 deletions tr_sys/tr_sys/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from celery import Celery
from celery.schedules import crontab
from opentelemetry.instrumentation.celery import CeleryInstrumentor


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tr_sys.settings')
Expand All @@ -20,7 +20,6 @@
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

CeleryInstrumentor().instrument()

@app.task(bind=True)
def debug_task(self):
Expand Down
46 changes: 0 additions & 46 deletions tr_sys/tr_sys/otel_config.py

This file was deleted.

3 changes: 0 additions & 3 deletions tr_sys/tr_sys/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
"""

import os
from .otel_config import configure_opentelemetry

configure_opentelemetry()

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
Expand Down