diff --git a/cli.py b/cli.py index 608e78c40..ca3d837f7 100644 --- a/cli.py +++ b/cli.py @@ -203,6 +203,18 @@ def slack_message_cli(message): manage.slack_message(message) +@app.cli.command('check_long_queries') +@click.argument('minutes', default=5, required=False) +def check_long_queries_cli(minutes): + manage.check_long_queries(minutes) + + +@app.cli.command('clear_long_queries') +@click.argument('minutes', default=5, required=False) +def clear_long_queries_cli(minutes): + manage.clear_long_queries(minutes) + + @app.shell_context_processor def make_shell_context(): return {'app': app, 'db': db, 'models': models} diff --git a/manage.py b/manage.py index 6249edd54..e6db8ef97 100755 --- a/manage.py +++ b/manage.py @@ -152,6 +152,86 @@ def cf_startup(): subprocess.Popen(["python", "cli.py", "refresh_materialized"]) +def check_long_queries(minutes: int): + """ + Check for queries running longer than interval, default is 5 + """ + SLACK_BOTS = "#bots" + SQL = """ + SELECT * + FROM pg_stat_activity + WHERE datname <> '{}' + and usename = '{}' + and state = 'active' + and lower(query) like 'select %' + and lower(query) not like '%refresh%' + and lower(query) not like '%rollback%' + and (now() - pg_stat_activity.query_start) >= interval '{} minutes' + order by pg_stat_activity.query_start desc; + """ + datname = env.get_credential('DB_DATNAME') + usename = env.get_credential('DB_USENAME') + SQL_formatted = SQL.format(datname, usename, minutes) + try: + if minutes < 2: + raise ValueError("Interval must be greater than 2 minutes") + space = env.app.get("space_name") + if space == 'prod' and (db.session.get_bind().url.__to_string__()) != env.get_credential('SQLA_FOLLOWERS'): + raise ValueError("Must be run in RO replica") + results = db.session.execute(SQL_formatted) + rows = (results.fetchall()) + for row in rows: + logger.info(row) + total_rows = results.rowcount + slack_message = "Currently {} queries running longer than {} minutes in {}".format(total_rows, minutes, space) + logger.info(slack_message) + post_to_slack(slack_message, SLACK_BOTS) + except Exception as error: + logger.exception(error) + slack_message = "*ERROR* long running query check failed." + slack_message = slack_message + "\n Error message: " + str(error) + post_to_slack(slack_message, SLACK_BOTS) + + +def clear_long_queries(minutes: int): + """ + Terminate queries running longer than interval minutes, default is 5 + """ + SLACK_BOTS = "#bots" + SQL = """ + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname <> '{}' + and usename = '{}' + and state = 'active' + and lower(query) like 'select %' + and lower(query) not like '%refresh%' + and lower(query) not like '%rollback%' + and (now() - pg_stat_activity.query_start) >= interval '{} minutes' + order by pg_stat_activity.query_start desc; + """ + datname = env.get_credential('DB_DATNAME') + usename = env.get_credential('DB_USENAME') + SQL_formatted = SQL.format(datname, usename, minutes) + try: + if minutes < 2: + raise ValueError("Interval must be greater than 2 minutes") + space = env.app.get("space_name") + if space == 'prod' and (db.session.get_bind().url.__to_string__()) != env.get_credential('SQLA_FOLLOWERS'): + raise ValueError("Must be run in RO replica") + results = db.session.execute(SQL_formatted) + total_rows = results.rowcount + space = env.app.get("space_name") + slack_message = "Terminated {} queries running longer than {} minutes in {}".format(total_rows, minutes, space) + logger.info(slack_message) + post_to_slack(slack_message, SLACK_BOTS) + except Exception as error: + logger.exception(error) + slack_message = "*ERROR* long running query termination failed." + slack_message = slack_message + "\n Error message: " + str(error) + post_to_slack(slack_message, SLACK_BOTS) + + def slack_message(message): """ Sends a message to the bots channel. you can add this command to ping you when a task is done, etc. run ./manage.py slack_message 'The message you want to post'