Skip to content

Commit

Permalink
<refact>: del cron job when rule is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
hidaris committed Dec 14, 2020
1 parent 7b1a02b commit 1ba0662
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions thingtalk/rule_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import typing
import uuid

from enum import Enum

Expand Down Expand Up @@ -68,7 +67,10 @@ class Rule(RuleInput):


class Operation:
def __init__(self, questions: typing.Dict[str, Question], enabled=True, conclusion=None):
def __init__(self,
questions: typing.Dict[str, Question],
enabled=True,
conclusion=None):
self.questions = questions
self.enabled = enabled
self.conclusion = conclusion
Expand Down Expand Up @@ -97,7 +99,7 @@ def __init__(self):
self.memo_map = {}

# pylint: disable=no-else-return
async def visit(self, expr: Operation):
async def visit(self, expr: Operation) -> bool:
"""Apply the visitor to an expression."""
if expr in self.memo_map:
return self.memo_map[expr]
Expand All @@ -113,16 +115,16 @@ async def visit(self, expr: Operation):

return res

async def visit_and(self, _: And):
async def visit_and(self, _: And) -> bool:
raise NotImplementedError()

async def visit_or(self, _: Or):
async def visit_or(self, _: Or) -> bool:
raise NotImplementedError()


class RuleComputeVisitor(OperationFunctor):

def compute_question(self, question_key: str, question: Question):
def compute_question(self, question_key: str, question: Question) -> bool:
logger.debug(f"{question_env[question_key]} {question.value}")
if question.op == "eq":
res = question_env[question_key] == question.value
Expand All @@ -137,14 +139,14 @@ def compute_question(self, question_key: str, question: Question):
res = False
return res

async def run_conclusion(self, _operation: Operation):
async def run_conclusion(self, _operation: Operation) -> None:
for conclusion in _operation.conclusion:
logger.debug(conclusion.topic)
ee.emit(conclusion.topic, conclusion)
for question_key, should_value in tuple(_operation.questions.items()):
question_env[question_key] = None

async def visit_and(self, _and: And):
async def visit_and(self, _and: And) -> bool:
ans = True
for question_key, question in tuple(_and.questions.items()):
logger.debug(f"Question: {question}")
Expand All @@ -155,7 +157,7 @@ async def visit_and(self, _and: And):
logger.debug(_and.conclusion)
await self.run_conclusion(_and)

async def visit_or(self, _or: Or):
async def visit_or(self, _or: Or) -> bool:
ans = False
for question_key, question in tuple(_or.questions.items()):
logger.debug(f"Question: {question}")
Expand All @@ -165,26 +167,26 @@ async def visit_or(self, _or: Or):
await self.run_conclusion(_or)


def generate_question_id(topic: str, property_name: str):
def generate_question_id(topic: str, property_name: str) -> bool:
topic_words = topic.split("/")
return f"things_{topic_words[1]}_{property_name}"


def generate_scenes_id(topic: str):
def generate_scenes_id(topic: str) -> bool:
topic_words = topic.split("/")
return f"scenes_{topic_words[1]}"


def generate_cron_id(rule_id, messageType, time):
def generate_cron_id(rule_id, messageType, time) -> bool:
return f"cron_{rule_id}_{messageType}_{time}"


def generate_rule_id(topic: str, property_name: str, property_value: typing.Any):
def generate_rule_id(topic: str, property_name: str, property_value: typing.Any) -> bool:
topic_words = topic.split("/")
return f"things_{topic_words[1]}_{property_name}_{property_value}"


async def report_cron_status(question_key):
async def report_cron_status(question_key) -> None:
message = {
"topic": question_key,
"messageType": "cronStatus",
Expand Down Expand Up @@ -361,4 +363,7 @@ async def disable_rule(self, pre, rule):
if rule_map:
if rule_map.get(rule_pk):
del rule_map[rule_pk]
logger.debug(msh.jobs)
msh.del_job(rule_id)
logger.debug(msh.jobs)
logger.info(self.rule_env)

0 comments on commit 1ba0662

Please sign in to comment.