Skip to content

Commit

Permalink
add prometheus to website
Browse files (browse the repository at this point in the history)
  • Loading branch information
dsschult committed Jan 28, 2025
1 parent 830188e commit 260981d
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 74 deletions.
2 changes: 1 addition & 1 deletion iceprod/credentials/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ async def start(self):
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'api',
'type': 'credentials',
})

for collection in self.indexes:
Expand Down
2 changes: 1 addition & 1 deletion iceprod/materialization/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ async def start(self):
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'api',
'type': 'materialization',
})

for collection in self.indexes:
Expand Down
4 changes: 0 additions & 4 deletions iceprod/rest/handlers/pilots.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ async def patch(self, pilot_id):
if not ret:
self.send_error(404, reason="Pilot not found")
else:
if 'site' in ret and ret['site']:
self.module.statsd.incr('site.{}.pilot'.format(ret['site']))
self.write(ret)
self.finish()

Expand All @@ -194,6 +192,4 @@ async def delete(self, pilot_id):
if not ret:
self.send_error(404, reason="Pilot not found")
else:
if 'site' in ret and ret['site']:
self.module.statsd.incr('site.{}.pilot_delete'.format(ret['site']))
self.write({})
25 changes: 1 addition & 24 deletions iceprod/rest/handlers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ async def post(self):
logger.info('filter_query: %r', filter_query)
self.send_error(404, reason="Task not found")
else:
self.statsd.incr('site.{}.task_queued'.format(site))
self.write(ret)
self.finish()

Expand Down Expand Up @@ -903,25 +902,6 @@ async def post(self, task_id):
if 'site' in data:
site = data['site']
update_query['$set']['site'] = site
if self.statsd and 'reason' in data and data['reason']:
reason = 'other'
reasons = [
('Exception: failed to download', 'download_failure'),
('Exception: failed to upload', 'upload_failure'),
('Exception: module failed', 'module_failure'),
('Resource overusage for cpu', 'cpu_overuse'),
('Resource overusage for gpu', 'gpu_overuse'),
('Resource overusage for memory', 'memory_overuse'),
('Resource overusage for disk', 'disk_overuse'),
('Resource overusage for time', 'time_overuse'),
('pilot SIGTERM', 'sigterm'),
('killed', 'killed'),
]
for text,r in reasons:
if text in data['reason']:
reason = r
break
self.statsd.incr('site.{}.task_{}.{}'.format(site, self.final_status, reason))

ret = await self.db.tasks.find_one_and_update(
filter_query,
Expand Down Expand Up @@ -985,11 +965,8 @@ async def post(self, task_id):

if 'time_used' in data:
update_query['$set']['walltime'] = data['time_used']/3600.
site = 'unknown'
if 'site' in data:
site = data['site']
update_query['$set']['site'] = site
self.statsd.incr('site.{}.task_complete'.format(site))
update_query['$set']['site'] = data['site']

ret = await self.db.tasks.find_one_and_update(
filter_query,
Expand Down
60 changes: 16 additions & 44 deletions iceprod/website/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@
from iceprod.core.jsonUtil import json_encode

from cachetools.func import ttl_cache
from prometheus_client import Info, start_http_server
import tornado.web
import tornado.httpserver
import tornado.gen
import jwt
import tornado.concurrent
from rest_tools.client import RestClient, ClientCredentialsAuth
from rest_tools.server import catch_error, RestServer, RestHandlerSetup, RestHandler, OpenIDLoginHandler
from rest_tools import telemetry as wtt
from wipac_dev_tools import from_environment

import iceprod
from iceprod import __version__ as version_string
from iceprod.roles_groups import GROUPS
from iceprod.core.config import CONFIG_SCHEMA as DATASET_SCHEMA
from iceprod.server.config import CONFIG_SCHEMA as SERVER_SCHEMA
import iceprod.core.functions
from iceprod.server import documentation
from iceprod.server.module import FakeStatsClient, StatsClientIgnoreErrors
import iceprod.server.states
from iceprod.server.util import datetime2str, nowstr

Expand Down Expand Up @@ -197,18 +196,16 @@ def initialize(self, cred_rest_client=None, full_url=None, **kwargs):

class PublicHandler(TokenStorageMixin, RestHandler):
"""Default Handler"""
def initialize(self, statsd=None, rest_api=None, cred_rest_client=None, system_rest_client=None, full_url=None, **kwargs):
def initialize(self, rest_api=None, cred_rest_client=None, system_rest_client=None, full_url=None, **kwargs):
"""
Get some params from the website module
:param statsd: statsd client
:param rest_api: the rest api url
:param cred_rest_client: the rest api url for the cred service
:param system_rest_client: the rest client for the system role
:param full_url: the full base url for the website
"""
super().initialize(**kwargs)
self.statsd = statsd
self.rest_api = rest_api
self.cred_rest_client = cred_rest_client
self.system_rest_client = system_rest_client
Expand All @@ -217,7 +214,7 @@ def initialize(self, statsd=None, rest_api=None, cred_rest_client=None, system_r

def get_template_namespace(self):
namespace = super().get_template_namespace()
namespace['version'] = iceprod.__version__
namespace['version'] = version_string
namespace['section'] = self.request.uri.lstrip('/').split('?')[0].split('/')[0]
namespace['json_encode'] = json_encode
namespace['states'] = iceprod.server.states
Expand All @@ -235,7 +232,6 @@ async def get_current_user_async(self):
self.current_user = ret
return ret

@wtt.evented(all_args=True)
def write_error(self, status_code=500, **kwargs):
"""Write out custom error page."""
self.set_status(status_code)
Expand All @@ -258,7 +254,6 @@ class Default(PublicHandler):
async def get(self):
# try to get the user, if available
await self.get_current_user_async()
self.statsd.incr('default')
self.render('main.html')


Expand All @@ -279,7 +274,6 @@ class Submit(PublicHandler):
@authenticated
async def get(self):
logger.info('here')
self.statsd.incr('submit')
token = self.auth_access_token
groups = self.auth_groups
default_config = {
Expand Down Expand Up @@ -308,7 +302,6 @@ class Config(PublicHandler):
"""Handle /config urls"""
@authenticated
async def get(self):
self.statsd.incr('config')
dataset_id = self.get_argument('dataset_id',default=None)
if not dataset_id:
self.write_error(400,message='must provide dataset_id')
Expand Down Expand Up @@ -339,7 +332,6 @@ async def get_usernames(self):

@authenticated
async def get(self):
self.statsd.incr('dataset_browse')
usernames = await self.get_usernames()
filter_options = {
'status': ['processing', 'suspended', 'errors', 'complete', 'truncated'],
Expand Down Expand Up @@ -375,8 +367,6 @@ class Dataset(PublicHandler):
"""Handle /dataset urls"""
@authenticated
async def get(self, dataset_id):
self.statsd.incr('dataset')

if dataset_id.isdigit():
try:
d_num = int(dataset_id)
Expand Down Expand Up @@ -430,9 +420,7 @@ class TaskBrowse(PublicHandler):
"""Handle /task urls"""
@authenticated
async def get(self, dataset_id):
self.statsd.incr('task_browse')
status = self.get_argument('status',default=None)

if status:
tasks = await self.rest_client.request('GET','/datasets/{}/tasks?status={}'.format(dataset_id,status))
for t in tasks:
Expand All @@ -450,9 +438,7 @@ class Task(PublicHandler):
"""Handle /task urls"""
@authenticated
async def get(self, dataset_id, task_id):
self.statsd.incr('task')
status = self.get_argument('status', default=None)

passkey = self.auth_access_token

dataset = await self.rest_client.request('GET', '/datasets/{}'.format(dataset_id))
Expand Down Expand Up @@ -483,9 +469,7 @@ class JobBrowse(PublicHandler):
"""Handle /job urls"""
@authenticated
async def get(self, dataset_id):
self.statsd.incr('job')
status = self.get_argument('status',default=None)

passkey = self.auth_access_token

jobs = await self.rest_client.request('GET', '/datasets/{}/jobs'.format(dataset_id))
Expand All @@ -501,9 +485,7 @@ class Job(PublicHandler):
"""Handle /job urls"""
@authenticated
async def get(self, dataset_id, job_id):
self.statsd.incr('job')
status = self.get_argument('status',default=None)

passkey = self.auth_access_token

dataset = await self.rest_client.request('GET', '/datasets/{}'.format(dataset_id))
Expand All @@ -522,7 +504,6 @@ class Documentation(PublicHandler):
async def get(self, url):
# try to get the user, if available
await self.get_current_user_async()
self.statsd.incr('documentation')
doc_path = str(importlib.resources.files('iceprod.server')/'data'/'docs')
full_path = os.path.join(doc_path, url)
if not full_path.startswith(doc_path):
Expand All @@ -536,7 +517,6 @@ async def get(self, url):
class Log(PublicHandler):
@authenticated
async def get(self, dataset_id, log_id):
self.statsd.incr('log')
ret = await self.rest_client.request('GET','/datasets/{}/logs/{}'.format(dataset_id, log_id))
log_text = ret['data']
html = '<html><head><title>' + ret['name'] + '</title></head><body>'
Expand All @@ -552,7 +532,6 @@ class Help(PublicHandler):
async def get(self):
# try to get the user, if available
await self.get_current_user_async()
self.statsd.incr('help')
self.render('help.html')


Expand All @@ -562,7 +541,6 @@ class Other(PublicHandler):
async def get(self):
# try to get the user, if available
await self.get_current_user_async()
self.statsd.incr('other')
path = self.request.path
self.set_status(404)
self.render('404.html', path=path)
Expand All @@ -572,7 +550,6 @@ class Profile(PublicHandler):
"""Handle user profile page"""
@authenticated
async def get(self):
self.statsd.incr('profile')
username = self.current_user
groups = self.auth_groups
group_creds = {}
Expand Down Expand Up @@ -634,7 +611,6 @@ async def post(self):
class Logout(PublicHandler):
@catch_error
async def get(self):
self.statsd.incr('logout')
self.clear_tokens()
self.current_user = None
self.request.uri = '/' # for login redirect, fake the main page
Expand Down Expand Up @@ -690,8 +666,7 @@ def __init__(self):
'ICEPROD_CRED_CLIENT_ID': '',
'ICEPROD_CRED_CLIENT_SECRET': '',
'COOKIE_SECRET': '',
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'rest_api',
'PROMETHEUS_PORT': 0,
'CI_TESTING': '',
}
config = from_environment(default_config)
Expand All @@ -714,7 +689,7 @@ def __init__(self):

rest_config = {
'debug': config['DEBUG'],
'server_header': 'IceProd/' + iceprod.__version__,
'server_header': 'IceProd/' + version_string,
}

if config['OPENID_URL']:
Expand All @@ -734,17 +709,8 @@ def __init__(self):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

if config['ICEPROD_CRED_CLIENT_ID'] and config['ICEPROD_CRED_CLIENT_SECRET']:
logging.info(f'enabling auth via {config["OPENID_URL"]} for aud "{config["OPENID_AUDIENCE"]}"')
Expand Down Expand Up @@ -785,7 +751,6 @@ def __init__(self):
raise RuntimeError('ICEPROD_API_CLIENT_ID or ICEPROD_API_CLIENT_SECRET not specified, and CI_TESTING not enabled!')

handler_args.update({
'statsd': statsd,
'rest_api': rest_address,
'system_rest_client': rest_client,
})
Expand Down Expand Up @@ -828,7 +793,14 @@ def __init__(self):
self.server = server

async def start(self):
    """
    Start optional Prometheus monitoring for the website server.

    When ``self.prometheus_port`` is truthy, start the Prometheus HTTP
    exporter on that port and publish an ``iceprod`` Info metric that
    carries the version string and the service type ``website``.
    When it is falsy, nothing is started.
    """
    # NOTE(review): the diff view this was recovered from lost its
    # indentation; the Info metric is assumed to belong inside this
    # guard -- confirm against the checked-in file.
    if self.prometheus_port:
        # logging applies %-style formatting lazily; a '{}' placeholder
        # combined with an extra argument fails at emit time and the
        # message is never written, so use '%s' here.
        logging.info("starting prometheus on %s", self.prometheus_port)
        start_http_server(self.prometheus_port)
        i = Info('iceprod', 'IceProd information')
        i.info({
            'version': version_string,
            'type': 'website',
        })

async def stop(self):
await self.server.stop()

0 comments on commit 260981d

Please sign in to comment.