diff --git a/hatchet/external/console.py b/hatchet/external/console.py index 77b01b35..f1d61620 100644 --- a/hatchet/external/console.py +++ b/hatchet/external/console.py @@ -35,14 +35,20 @@ import numpy as np import warnings from ..util.colormaps import ColorMaps +from ..util.perf_measure import annotate + + +_console_class_annotate = annotate(fmt="HatchetConsoleRenderer.{}") class ConsoleRenderer: + @_console_class_annotate def __init__(self, unicode=False, color=False): self.unicode = unicode self.color = color self.visited = [] + @_console_class_annotate def render(self, roots, dataframe, **kwargs): self.render_header = kwargs["render_header"] @@ -161,6 +167,7 @@ def render(self, roots, dataframe, **kwargs): return result.encode("utf-8") # pylint: disable=W1401 + @_console_class_annotate def render_preamble(self): lines = [ r" __ __ __ __ ", @@ -174,6 +181,7 @@ def render_preamble(self): return "\n".join(lines) + @_console_class_annotate def render_legend(self): def render_label(index, low, high): metric_range = self.max_metric - self.min_metric @@ -247,6 +255,7 @@ def render_label(index, low, high): return legend + @_console_class_annotate def render_frame(self, node, dataframe, indent="", child_indent=""): node_depth = node._depth if node_depth < self.depth: @@ -288,8 +297,8 @@ def render_frame(self, node, dataframe, indent="", child_indent=""): "none": "", "constant": "\U00002192", "phased": "\U00002933", - "dynamic": "\U000021DD", - "sporadic": "\U0000219D", + "dynamic": "\U000021dd", + "sporadic": "\U0000219d", } pattern_metric = dataframe.loc[df_index, self.annotation_column] annotation_content = self.temporal_symbols[pattern_metric] diff --git a/hatchet/frame.py b/hatchet/frame.py index b090a963..bcd5919f 100644 --- a/hatchet/frame.py +++ b/hatchet/frame.py @@ -5,6 +5,11 @@ from functools import total_ordering +from .util.perf_measure import annotate + + +_frame_annotate = annotate(fmt="Frame.{}") + @total_ordering class Frame: @@ -14,6 +19,7 @@ class Frame: attrs (dict): dictionary of attributes and values """ + @_frame_annotate def __init__(self, attrs=None, **kwargs): """Construct a frame from a dictionary, or from immediate kwargs. 
@@ -48,41 +54,52 @@ def __init__(self, attrs=None, **kwargs): self._tuple_repr = None + @_frame_annotate def __eq__(self, other): return self.tuple_repr == other.tuple_repr + @_frame_annotate def __lt__(self, other): return self.tuple_repr < other.tuple_repr + @_frame_annotate def __gt__(self, other): return self.tuple_repr > other.tuple_repr + @_frame_annotate def __hash__(self): return hash(self.tuple_repr) + @_frame_annotate def __str__(self): """str() with sorted attributes, so output is deterministic.""" return "{%s}" % ", ".join("'%s': '%s'" % (k, v) for k, v in self.tuple_repr) + @_frame_annotate def __repr__(self): return "Frame(%s)" % self @property + @_frame_annotate def tuple_repr(self): """Make a tuple of attributes and values based on reader.""" if not self._tuple_repr: self._tuple_repr = tuple(sorted((k, v) for k, v in self.attrs.items())) return self._tuple_repr + @_frame_annotate def copy(self): return Frame(self.attrs.copy()) + @_frame_annotate def __getitem__(self, name): return self.attrs[name] + @_frame_annotate def get(self, name, default=None): return self.attrs.get(name, default) + @_frame_annotate def values(self, names): """Return a tuple of attribute values from this Frame.""" if isinstance(names, (list, tuple)): diff --git a/hatchet/graph.py b/hatchet/graph.py index 55194bd2..c787a8de 100644 --- a/hatchet/graph.py +++ b/hatchet/graph.py @@ -6,8 +6,13 @@ from collections import defaultdict from .node import Node, traversal_order, node_traversal_order +from .util.perf_measure import annotate +_graph_annotate = annotate(fmt="Graph.{}") + + +@annotate() def index_by(attr, objects): """Put objects into lists based on the value of an attribute. @@ -23,11 +28,13 @@ def index_by(attr, objects): class Graph: """A possibly multi-rooted tree or graph from one input dataset.""" + @_graph_annotate def __init__(self, roots): assert roots is not None self.roots = roots self.node_ordering = False + @_graph_annotate def traverse(self, order="pre", attrs=None, visited=None): """Preorder traversal of all roots of this Graph. @@ -47,6 +54,7 @@ def traverse(self, order="pre", attrs=None, visited=None): for value in root.traverse(order=order, attrs=attrs, visited=visited): yield value + @_graph_annotate def node_order_traverse(self, order="pre", attrs=None, visited=None): """Preorder traversal of all roots of this Graph, sorting by "node order" column. @@ -69,6 +77,7 @@ def node_order_traverse(self, order="pre", attrs=None, visited=None): ): yield value + @_graph_annotate def is_tree(self): """True if this graph is a tree, false otherwise.""" if len(self.roots) > 1: @@ -78,6 +87,7 @@ def is_tree(self): list(self.traverse(visited=visited)) return all(v == 1 for v in visited.values()) + @_graph_annotate def find_merges(self): """Find nodes that have the same parent and frame. @@ -135,6 +145,7 @@ def _find_child_merges(node_list): return merges + @_graph_annotate def merge_nodes(self, merges): """Merge some nodes in a graph into others. @@ -159,11 +170,13 @@ def transform(node_list): child.parents = transform(child.parents) self.roots = transform(self.roots) + @_graph_annotate def normalize(self): merges = self.find_merges() self.merge_nodes(merges) return merges + @_graph_annotate def copy(self, old_to_new=None): """Create and return a copy of this graph. @@ -192,6 +205,7 @@ def copy(self, old_to_new=None): return graph + @_graph_annotate def union(self, other, old_to_new=None): """Create the union of self and other and return it as a new Graph. 
@@ -342,6 +356,7 @@ def connect(parent, new_node): return graph + @_graph_annotate def enumerate_depth(self): def _iter_depth(node, visited): for child in node.children: @@ -356,6 +371,7 @@ def _iter_depth(node, visited): root._depth = 0 # depth of root node is 0 _iter_depth(root, visited) + @_graph_annotate def enumerate_traverse(self): if not self._check_enumerate_traverse(): # if "node order" column exists, we traverse sorting by _hatchet_nid @@ -379,10 +395,12 @@ def _check_enumerate_traverse(self): if i != node._hatchet_nid: return False + @_graph_annotate def __len__(self): """Size of the graph in terms of number of nodes.""" return sum(1 for _ in self.traverse()) + @_graph_annotate def __eq__(self, other): """Check if two graphs have the same structure by comparing frame at each node. @@ -415,10 +433,12 @@ def __eq__(self, other): return True + @_graph_annotate def __ne__(self, other): return not (self == other) @staticmethod + @_graph_annotate def from_lists(*roots): """Convenience method to invoke Node.from_lists() on each root value.""" if not all(isinstance(r, (list, tuple)) for r in roots): diff --git a/hatchet/graphframe.py b/hatchet/graphframe.py index d041597b..a7d2ff6a 100644 --- a/hatchet/graphframe.py +++ b/hatchet/graphframe.py @@ -26,6 +26,7 @@ ) from .util.deprecated import deprecated_params from .util.dot import trees_to_dot +from .util.perf_measure import annotate try: from .cython_modules.libs import graphframe_modules as _gfm_cy @@ -39,6 +40,10 @@ raise +_graphframe_annotate = annotate(fmt="GraphFrame.{}") + + +@annotate() def parallel_apply(filter_function, dataframe, queue): """A function called in parallel, which does a pandas apply on part of a dataframe and returns the results via multiprocessing queue function.""" @@ -52,6 +57,7 @@ class GraphFrame: and a dataframe. """ + @_graphframe_annotate def __init__( self, graph, @@ -94,6 +100,7 @@ def __init__( self.query_engine = QueryEngine() @staticmethod + @_graphframe_annotate def from_hpctoolkit(dirname): """Read an HPCToolkit database directory into a new GraphFrame. @@ -110,6 +117,7 @@ def from_hpctoolkit(dirname): return HPCToolkitReader(dirname).read() @staticmethod + @_graphframe_annotate def from_hpctoolkit_latest( dirname: str, max_depth: int = None, @@ -139,6 +147,7 @@ def from_hpctoolkit_latest( ).read() @staticmethod + @_graphframe_annotate def from_caliper(filename_or_stream, query=None): """Read in a Caliper .cali or .json file. 
@@ -154,6 +163,7 @@ def from_caliper(filename_or_stream, query=None): return CaliperReader(filename_or_stream, query).read() @staticmethod + @_graphframe_annotate def from_caliperreader( filename_or_caliperreader, native=False, string_attributes=[] ): @@ -174,6 +184,7 @@ def from_caliperreader( ).read() @staticmethod + @_graphframe_annotate def from_timeseries( filename_or_caliperreader, level="loop.start_iteration", @@ -197,6 +208,7 @@ def from_timeseries( ).read_timeseries(level=level) @staticmethod + @_graphframe_annotate def from_spotdb(db_key, list_of_ids=None): """Read multiple graph frames from a SpotDB instance @@ -221,6 +233,7 @@ def from_spotdb(db_key, list_of_ids=None): return SpotDBReader(db_key, list_of_ids).read() @staticmethod + @_graphframe_annotate def from_gprof_dot(filename): """Read in a DOT file generated by gprof2dot.""" # import this lazily to avoid circular dependencies @@ -229,6 +242,7 @@ def from_gprof_dot(filename): return GprofDotReader(filename).read() @staticmethod + @_graphframe_annotate def from_cprofile(filename): """Read in a pstats/prof file generated using python's cProfile.""" # import this lazily to avoid circular dependencies @@ -237,6 +251,7 @@ def from_cprofile(filename): return CProfileReader(filename).read() @staticmethod + @_graphframe_annotate def from_pyinstrument(filename): """Read in a JSON file generated using Pyinstrument.""" # import this lazily to avoid circular dependencies @@ -245,6 +260,7 @@ def from_pyinstrument(filename): return PyinstrumentReader(filename).read() @staticmethod + @_graphframe_annotate def from_tau(dirname): """Read in a profile generated using TAU.""" # import this lazily to avoid circular dependencies @@ -253,6 +269,7 @@ def from_tau(dirname): return TAUReader(dirname).read() @staticmethod + @_graphframe_annotate def from_timemory(input=None, select=None, **_kwargs): """Read in timemory data. @@ -337,6 +354,7 @@ def from_timemory(input=None, select=None, **_kwargs): raise @staticmethod + @_graphframe_annotate def from_literal(graph_dict): """Create a GraphFrame from a list of dictionaries.""" # import this lazily to avoid circular dependencies @@ -345,6 +363,7 @@ def from_literal(graph_dict): return LiteralReader(graph_dict).read() @staticmethod + @_graphframe_annotate def from_lists(*lists): """Make a simple GraphFrame from lists. @@ -367,24 +386,28 @@ def from_lists(*lists): return gf @staticmethod + @_graphframe_annotate def from_json(json_spec, **kwargs): from .readers.json_reader import JsonReader return JsonReader(json_spec).read(**kwargs) @staticmethod + @_graphframe_annotate def from_hdf(filename, **kwargs): # import this lazily to avoid circular dependencies from .readers.hdf5_reader import HDF5Reader return HDF5Reader(filename).read(**kwargs) + @_graphframe_annotate def to_hdf(self, filename, key="hatchet_graphframe", **kwargs): # import this lazily to avoid circular dependencies from .writers.hdf5_writer import HDF5Writer HDF5Writer(filename).write(self, key=key, **kwargs) + @_graphframe_annotate def copy(self): """Return a partially shallow copy of the graphframe. @@ -411,6 +434,7 @@ def copy(self): copy.copy(self.metadata), ) + @_graphframe_annotate def deepcopy(self): """Return a deep copy of the graphframe. 
@@ -446,6 +470,7 @@ def deepcopy(self): copy.deepcopy(self.metadata), ) + @_graphframe_annotate def drop_index_levels(self, function=np.mean): """Drop all index levels but `node`.""" index_names = list(self.dataframe.index.names) @@ -465,6 +490,7 @@ def drop_index_levels(self, function=np.mean): self.dataframe = agg_df + @_graphframe_annotate def filter( self, filter_obj, @@ -571,6 +597,7 @@ def filter( return filtered_gf.squash(update_inc_cols) return filtered_gf + @_graphframe_annotate def squash(self, update_inc_cols=True): """Rewrite the Graph to include only nodes present in the DataFrame's rows. @@ -690,6 +717,7 @@ def _init_sum_columns(self, columns, out_columns): return out_columns + @_graphframe_annotate def subtree_sum( self, columns, out_columns=None, function=lambda x: x.sum(min_count=1) ): @@ -750,6 +778,7 @@ def subtree_sum( self.dataframe.loc[[node] + node.children, col] ) + @_graphframe_annotate def subgraph_sum( self, columns, out_columns=None, function=lambda x: x.sum(min_count=1) ): @@ -813,6 +842,7 @@ def subgraph_sum( function(self.dataframe.loc[(subgraph_nodes), columns]) ) + @_graphframe_annotate def generate_exclusive_columns(self, inc_metrics=None): """Generates exclusive metrics from available inclusive metrics. Arguments: @@ -905,6 +935,7 @@ def generate_exclusive_columns(self, inc_metrics=None): self.exc_metrics.extend([metric_tuple[0] for metric_tuple in generation_pairs]) self.exc_metrics = list(set(self.exc_metrics)) + @_graphframe_annotate def update_inclusive_columns(self): """Update inclusive columns (typically after operations that rewire the graph). @@ -938,10 +969,12 @@ def update_inclusive_columns(self): self.subgraph_sum(self.exc_metrics, self.inc_metrics) self.inc_metrics = list(set(self.inc_metrics + old_inc_metrics)) + @_graphframe_annotate def show_metric_columns(self): """Returns a list of dataframe column labels.""" return list(self.exc_metrics + self.inc_metrics) + @_graphframe_annotate def unify(self, other): """Returns a unified graphframe. @@ -984,6 +1017,7 @@ def unify(self, other): context="context_column", invert_colors="invert_colormap", ) + @_graphframe_annotate def tree( self, metric_column=None, @@ -1068,6 +1102,7 @@ def tree( max_value=max_value, ) + @_graphframe_annotate def to_dot(self, metric=None, name="name", rank=0, thread=0, threshold=0.0): """Write the graph in the graphviz dot format: https://www.graphviz.org/doc/info/lang.html @@ -1078,6 +1113,7 @@ def to_dot(self, metric=None, name="name", rank=0, thread=0, threshold=0.0): self.graph.roots, self.dataframe, metric, name, rank, thread, threshold ) + @_graphframe_annotate def to_flamegraph(self, metric=None, name="name", rank=0, thread=0, threshold=0.0): """Write the graph in the folded stack output required by FlameGraph http://www.brendangregg.com/flamegraphs.html @@ -1141,6 +1177,7 @@ def to_flamegraph(self, metric=None, name="name", rank=0, thread=0, threshold=0. return folded_stack + @_graphframe_annotate def to_literal(self, name="name", rank=0, thread=0, cat_columns=[]): """Format this graph as a list of dictionaries for Roundtrip visualizations.
@@ -1223,6 +1260,7 @@ def add_nodes(hnode): return graph_literal + @_graphframe_annotate def to_dict(self): hatchet_dict = {} @@ -1251,6 +1289,7 @@ def to_dict(self): return hatchet_dict + @_graphframe_annotate def to_json(self): return json.dumps(self.to_dict()) @@ -1386,6 +1425,7 @@ def _insert_missing_rows(self, other): return self + @_graphframe_annotate def groupby_aggregate(self, groupby_function, agg_function): """Groupby-aggregate dataframe and reindex the Graph. @@ -1503,6 +1543,7 @@ def reindex(node, parent, visited): new_gf.drop_index_levels() return new_gf + @_graphframe_annotate def add(self, other): """Returns the column-wise sum of two graphframes as a new graphframe. @@ -1521,6 +1562,7 @@ def add(self, other): return self_copy._operator(other_copy, self_copy.dataframe.add) + @_graphframe_annotate def sub(self, other): """Returns the column-wise difference of two graphframes as a new graphframe. @@ -1540,6 +1582,7 @@ def sub(self, other): return self_copy._operator(other_copy, self_copy.dataframe.sub) + @_graphframe_annotate def div(self, other): """Returns the column-wise float division of two graphframes as a new graphframe. @@ -1558,6 +1601,7 @@ def div(self, other): return self_copy._operator(other_copy, self_copy.dataframe.divide) + @_graphframe_annotate def mul(self, other): """Returns the column-wise float multiplication of two graphframes as a new graphframe. @@ -1576,6 +1620,7 @@ def mul(self, other): return self_copy._operator(other_copy, self_copy.dataframe.multiply) + @_graphframe_annotate def __iadd__(self, other): """Computes column-wise sum of two graphframes and stores the result in self. @@ -1595,6 +1640,7 @@ def __iadd__(self, other): return self._operator(other_copy, self.dataframe.add) + @_graphframe_annotate def __add__(self, other): """Returns the column-wise sum of two graphframes as a new graphframe. @@ -1606,6 +1652,7 @@ def __add__(self, other): """ return self.add(other) + @_graphframe_annotate def __mul__(self, other): """Returns the column-wise multiplication of two graphframes as a new graphframe. @@ -1617,6 +1664,7 @@ def __mul__(self, other): """ return self.mul(other) + @_graphframe_annotate def __isub__(self, other): """Computes column-wise difference of two graphframes and stores the result in self. @@ -1636,6 +1684,7 @@ def __isub__(self, other): return self._operator(other_copy, self.dataframe.sub) + @_graphframe_annotate def __sub__(self, other): """Returns the column-wise difference of two graphframes as a new graphframe. @@ -1648,6 +1697,7 @@ def __sub__(self, other): """ return self.sub(other) + @_graphframe_annotate def __idiv__(self, other): """Computes column-wise float division of two graphframes and stores the result in self. @@ -1667,6 +1717,7 @@ def __idiv__(self, other): return self._operator(other_copy, self.dataframe.div) + @_graphframe_annotate def __truediv__(self, other): """Returns the column-wise float division of two graphframes as a new graphframe. @@ -1679,6 +1730,7 @@ def __truediv__(self, other): """ return self.div(other) + @_graphframe_annotate def __imul__(self, other): """Computes column-wise float multiplication of two graphframes and stores the result in self. 
diff --git a/hatchet/node.py b/hatchet/node.py index 672627a2..61979d9e 100644 --- a/hatchet/node.py +++ b/hatchet/node.py @@ -6,13 +6,19 @@ from functools import total_ordering from .frame import Frame +from .util.perf_measure import annotate +_node_annotate = annotate(fmt="Node.{}") + + +@annotate() def traversal_order(node): """Deterministic key function for sorting nodes in traversals.""" return (node.frame, id(node)) +@annotate() def node_traversal_order(node): """Deterministic key function for sorting nodes by specified "node order" (which gets assigned to _hatchet_nid) in traversals.""" @@ -23,6 +29,7 @@ class Node: """A node in the graph. The node only stores its frame.""" + @_node_annotate def __init__(self, frame_obj, parent=None, hnid=-1, depth=-1): self.frame = frame_obj self._depth = depth @@ -33,16 +40,19 @@ def __init__(self, frame_obj, parent=None, hnid=-1, depth=-1): self.add_parent(parent) self.children = [] + @_node_annotate def add_parent(self, node): """Adds a parent to this node's list of parents.""" assert isinstance(node, Node) self.parents.append(node) + @_node_annotate def add_child(self, node): """Adds a child to this node's list of children.""" assert isinstance(node, Node) self.children.append(node) + @_node_annotate def paths(self): """List of tuples, one for each path from this node to any root. @@ -58,6 +68,7 @@ def paths(self): paths.extend([path + node_value for path in parent_paths]) return paths + @_node_annotate def path(self, attrs=None): """Path to this node from root. Raises if there are multiple paths. @@ -71,6 +82,7 @@ def path(self, attrs=None): raise MultiplePathError("Node has more than one path: %s" % paths) return paths[0] + @_node_annotate def dag_equal(self, other, vs=None, vo=None): """Check if DAG rooted at self has the same structure as that rooted at other. @@ -113,6 +125,7 @@ def dag_equal(self, other, vs=None, vo=None): return True + @_node_annotate def traverse(self, order="pre", attrs=None, visited=None): """Traverse the tree depth-first and yield each node. @@ -149,6 +162,7 @@ def value(node): if order == "post": yield value(self) + @_node_annotate def node_order_traverse(self, order="pre", attrs=None, visited=None): """Traverse the tree depth-first and yield each node, sorting children by "node order". @@ -187,27 +201,34 @@ def value(node): if order == "post": yield value(self) + @_node_annotate def __hash__(self): return self._hatchet_nid + @_node_annotate def __eq__(self, other): return self._hatchet_nid == other._hatchet_nid + @_node_annotate def __lt__(self, other): return self._hatchet_nid < other._hatchet_nid + @_node_annotate def __gt__(self, other): return self._hatchet_nid > other._hatchet_nid + @_node_annotate def __str__(self): """Returns a string representation of the node.""" return str(self.frame) + @_node_annotate def copy(self): """Copy this node without preserving parents or children.""" return Node(frame_obj=self.frame.copy()) @classmethod + @_node_annotate def from_lists(cls, lists): r"""Construct a hierarchy of nodes from recursive lists.
@@ -278,6 +299,7 @@ def _from_lists(lists, parent): return _from_lists(lists, None) + @_node_annotate def __repr__(self): return "Node({%s})" % ", ".join( "%s: %s" % (repr(k), repr(v)) for k, v in sorted(self.frame.attrs.items()) diff --git a/hatchet/query/compat.py b/hatchet/query/compat.py index d62a0c5c..fce36f99 100644 --- a/hatchet/query/compat.py +++ b/hatchet/query/compat.py @@ -26,12 +26,23 @@ from .string_dialect import parse_string_dialect from .engine import QueryEngine from .errors import BadNumberNaryQueryArgs, InvalidQueryPath +from ..util.perf_measure import annotate # QueryEngine object for running the legacy "apply" methods COMPATABILITY_ENGINE = QueryEngine() +_abstract_query_annotate = annotate(fmt="AbstractQuery.{}") +_nary_query_annotate = annotate(fmt="NaryQuery.{}") +_and_query_annotate = annotate(fmt="AndQuery.{}") +_or_query_annotate = annotate(fmt="OrQuery.{}") +_xor_query_annotate = annotate(fmt="XorQuery.{}") +_not_query_annotate = annotate(fmt="NotQuery.{}") +_query_matcher_annotate = annotate(fmt="QueryMatcher.{}") +_cypher_query_annotate = annotate(fmt="CypherQuery.{}") + + class AbstractQuery(ABC): """Base class for all 'old-style' queries.""" @@ -39,6 +50,7 @@ class AbstractQuery(ABC): def apply(self, gf): pass + @_abstract_query_annotate def __and__(self, other): """Create a new AndQuery using this query and another. @@ -50,6 +62,7 @@ def __and__(self, other): """ return AndQuery(self, other) + @_abstract_query_annotate def __or__(self, other): """Create a new OrQuery using this query and another. @@ -61,6 +74,7 @@ def __or__(self, other): """ return OrQuery(self, other) + @_abstract_query_annotate def __xor__(self, other): """Create a new XorQuery using this query and another. @@ -72,6 +86,7 @@ def __xor__(self, other): """ return XorQuery(self, other) + @_abstract_query_annotate def __invert__(self): """Create a new NotQuery using this query. @@ -89,6 +104,7 @@ class NaryQuery(AbstractQuery): """Base class for all compound queries that act on and merge N separate subqueries.""" + @_nary_query_annotate def __init__(self, *args): """Create a new NaryQuery object. @@ -115,6 +131,7 @@ def __init__(self, *args): high-level query or a subclass of AbstractQuery" ) + @_nary_query_annotate def apply(self, gf): """Applies the query to the specified GraphFrame. @@ -150,6 +167,7 @@ class AndQuery(NaryQuery): """Compound query that returns the intersection of the results of the subqueries.""" + @_and_query_annotate def __init__(self, *args): """Create a new AndQuery object. @@ -180,6 +198,7 @@ class OrQuery(NaryQuery): """Compound query that returns the union of the results of the subqueries""" + @_or_query_annotate def __init__(self, *args): """Create a new OrQuery object. @@ -210,6 +229,7 @@ class XorQuery(NaryQuery): """Compound query that returns the symmetric difference (i.e., set-based XOR) of the results of the subqueries""" + @_xor_query_annotate def __init__(self, *args): """Create a new XorQuery object. @@ -240,6 +260,7 @@ class NotQuery(NaryQuery): """Compound query that returns all nodes in the GraphFrame that are not returned from the subquery.""" + @_not_query_annotate def __init__(self, *args): """Create a new NotQuery object. @@ -265,6 +286,7 @@ def _convert_to_new_query(self, subqueries): class QueryMatcher(AbstractQuery): """Processes and applies base syntax queries and Object-based queries to GraphFrames.""" + @_query_matcher_annotate def __init__(self, query=None): """Create a new QueryMatcher object.
@@ -285,6 +307,7 @@ def __init__(self, query=None): else: raise InvalidQueryPath("Provided query is not a valid object dialect query") + @_query_matcher_annotate def match(self, wildcard_spec=".", filter_func=lambda row: True): """Start a query with a root node described by the arguments. @@ -299,6 +322,7 @@ def match(self, wildcard_spec=".", filter_func=lambda row: True): self.true_query.match(wildcard_spec, filter_func) return self + @_query_matcher_annotate def rel(self, wildcard_spec=".", filter_func=lambda row: True): """Add another edge and node to the query. @@ -313,6 +337,7 @@ def rel(self, wildcard_spec=".", filter_func=lambda row: True): self.true_query.rel(wildcard_spec, filter_func) return self + @_query_matcher_annotate def apply(self, gf): """Apply the query to a GraphFrame. @@ -336,6 +361,7 @@ def _get_new_query(self): class CypherQuery(QueryMatcher): """Processes and applies String-based queries to GraphFrames.""" + @_cypher_query_annotate def __init__(self, cypher_query): """Create a new CypherQuery object. @@ -358,6 +384,7 @@ def _get_new_query(self): return self.true_query +@annotate() def parse_cypher_query(cypher_query): """Parse all types of String-based queries, including multi-queries that leverage the curly brace delimiters. diff --git a/hatchet/query/compound.py b/hatchet/query/compound.py index 73a1ed1a..b35995cc 100644 --- a/hatchet/query/compound.py +++ b/hatchet/query/compound.py @@ -11,11 +11,20 @@ from .string_dialect import parse_string_dialect from .object_dialect import ObjectQuery from .errors import BadNumberNaryQueryArgs +from ..util.perf_measure import annotate + + +_compound_query_annotate = annotate(fmt="CompoundQuery.{}") +_conjunction_query_annotate = annotate(fmt="ConjunctionQuery.{}") +_disjunction_query_annotate = annotate(fmt="DisjunctionQuery.{}") +_exclusive_disjunction_query_annotate = annotate(fmt="ExclusiveDisjunctionQuery.{}") +_negation_query_annotate = annotate(fmt="NegationQuery.{}") class CompoundQuery(object): """Base class for all types of compound queries.""" + @_compound_query_annotate def __init__(self, *queries): """Collect the provided queries into a list, constructing ObjectQuery and StringQuery objects as needed. @@ -51,6 +60,7 @@ class ConjunctionQuery(CompoundQuery): using set conjunction. """ + @_conjunction_query_annotate def __init__(self, *queries): """Create the ConjunctionQuery. @@ -66,6 +76,7 @@ def __init__(self, *queries): "ConjunctionQuery requires 2 or more subqueries" ) + @_conjunction_query_annotate def _apply_op_to_results(self, subquery_results, graph): """Combines the results of the subqueries using set conjunction. @@ -85,6 +96,7 @@ class DisjunctionQuery(CompoundQuery): using set disjunction. """ + @_disjunction_query_annotate def __init__(self, *queries): """Create the DisjunctionQuery. @@ -100,6 +112,7 @@ def __init__(self, *queries): "DisjunctionQuery requires 2 or more subqueries" ) + @_disjunction_query_annotate def _apply_op_to_results(self, subquery_results, graph): """Combines the results of the subqueries using set disjunction. @@ -119,6 +132,7 @@ class ExclusiveDisjunctionQuery(CompoundQuery): using exclusive set disjunction. """ + @_exclusive_disjunction_query_annotate def __init__(self, *queries): """Create the ExclusiveDisjunctionQuery.
@@ -132,6 +146,7 @@ def __init__(self, *queries): if len(self.subqueries) < 2: raise BadNumberNaryQueryArgs("XorQuery requires 2 or more subqueries") + @_exclusive_disjunction_query_annotate def _apply_op_to_results(self, subquery_results, graph): """Combines the results of the subqueries using exclusive set disjunction. @@ -153,6 +168,7 @@ class NegationQuery(CompoundQuery): its single subquery. """ + @_negation_query_annotate def __init__(self, *queries): """Create the NegationQuery. @@ -166,6 +182,7 @@ def __init__(self, *queries): if len(self.subqueries) != 1: raise BadNumberNaryQueryArgs("NotQuery requires exactly 1 subquery") + @_negation_query_annotate def _apply_op_to_results(self, subquery_results, graph): """Inverts the results of the subquery so that all nodes not in the results are returned. diff --git a/hatchet/query/engine.py b/hatchet/query/engine.py index 1f0bc7b5..221c5f7b 100644 --- a/hatchet/query/engine.py +++ b/hatchet/query/engine.py @@ -12,19 +12,26 @@ from .compound import CompoundQuery from .object_dialect import ObjectQuery from .string_dialect import parse_string_dialect +from ..util.perf_measure import annotate, begin_code_region, end_code_region + + +_query_engine_annotate = annotate(fmt="QueryEngine.{}") class QueryEngine: """Class for applying queries to GraphFrames.""" + @_query_engine_annotate def __init__(self): """Creates the QueryEngine.""" self.search_cache = {} + @_query_engine_annotate def reset_cache(self): """Resets the cache in the QueryEngine.""" self.search_cache = {} + @_query_engine_annotate def apply(self, query, graph, dframe): """Apply the query to a GraphFrame. @@ -59,6 +66,7 @@ def apply(self, query, graph, dframe): else: raise TypeError("Invalid query data type ({})".format(str(type(query)))) + @_query_engine_annotate def _cache_node(self, node, query, dframe): """Cache (Memoize) the parts of the query that the node matches. @@ -74,14 +82,19 @@ def _cache_node(self, node, query, dframe): for i, node_query in enumerate(query): _, filter_func = node_query row = None + begin_code_region("get_perf_data_for_node") if isinstance(dframe.index, pd.MultiIndex): row = pd.concat([dframe.loc[node]], keys=[node], names=["node"]) else: row = dframe.loc[node] + end_code_region("get_perf_data_for_node") + begin_code_region("apply_predicate") if filter_func(row): matches.append(i) + end_code_region("apply_predicate") self.search_cache[node._hatchet_nid] = matches + @_query_engine_annotate def _match_0_or_more(self, query, dframe, node, wcard_idx): """Process a "*" predicate in the query on a subgraph. @@ -128,6 +141,7 @@ def _match_0_or_more(self, query, dframe, node, wcard_idx): return [[]] return None + @_query_engine_annotate def _match_1(self, query, dframe, node, idx): """Process a "." predicate in the query on a subgraph. @@ -156,6 +170,7 @@ def _match_1(self, query, dframe, node, idx): return None return matches + @_query_engine_annotate def _match_pattern(self, query, dframe, pattern_root, match_idx): """Try to match the query pattern starting at the provided root node. @@ -221,6 +236,7 @@ def _match_pattern(self, query, dframe, pattern_root, match_idx): pattern_idx += 1 return matches + @_query_engine_annotate def _apply_impl(self, query, dframe, node, visited, matches): """Traverse the subgraph with the specified root, and collect all paths that match the query. 
diff --git a/hatchet/query/object_dialect.py b/hatchet/query/object_dialect.py index daf55c65..886ac9c6 100644 --- a/hatchet/query/object_dialect.py +++ b/hatchet/query/object_dialect.py @@ -15,6 +15,7 @@ from .errors import InvalidQueryPath, InvalidQueryFilter, MultiIndexModeMismatch from .query import Query +from ..util.perf_measure import annotate def _process_multi_index_mode(apply_result, multi_index_mode): @@ -27,6 +28,7 @@ def _process_multi_index_mode(apply_result, multi_index_mode): ) +@annotate() def _process_predicate(attr_filter, multi_index_mode): """Converts high-level API attribute filter to a lambda""" compops = ("<", ">", "==", ">=", "<=", "<>", "!=") # , @@ -218,6 +220,7 @@ def filter_choice(df_row): class ObjectQuery(Query): """Class for representing and parsing queries using the Object-based dialect.""" + @annotate(fmt="ObjectQuery.{}") def __init__(self, query, multi_index_mode="off"): """Builds a new ObjectQuery from an instance of the Object-based dialect syntax. diff --git a/hatchet/query/query.py b/hatchet/query/query.py index 39f17743..7f58bded 100644 --- a/hatchet/query/query.py +++ b/hatchet/query/query.py @@ -4,15 +4,21 @@ # SPDX-License-Identifier: MIT from .errors import InvalidQueryPath +from ..util.perf_measure import annotate + + +_query_annotate = annotate(fmt="Query.{}") class Query(object): """Class for representing and building Hatchet Call Path Queries""" + @_query_annotate def __init__(self): """Create new Query""" self.query_pattern = [] + @_query_annotate def match(self, quantifier=".", predicate=lambda row: True): """Start a query with a root node described by the arguments. @@ -28,6 +34,7 @@ def match(self, quantifier=".", predicate=lambda row: True): self._add_node(quantifier, predicate) return self + @_query_annotate def rel(self, quantifier=".", predicate=lambda row: True): """Add a new node to the end of the query. @@ -57,6 +64,7 @@ def relation(self, quantifer=".", predicate=lambda row: True): """ return self.rel(quantifer, predicate) + @_query_annotate def __len__(self): """Returns the length of the query.""" return len(self.query_pattern) diff --git a/hatchet/query/string_dialect.py b/hatchet/query/string_dialect.py index 791128fe..9ec9baf5 100644 --- a/hatchet/query/string_dialect.py +++ b/hatchet/query/string_dialect.py @@ -15,6 +15,10 @@ from .errors import InvalidQueryPath, InvalidQueryFilter, RedundantQueryFilterWarning from .query import Query +from ..util.perf_measure import annotate + + +_string_query_annotate = annotate(fmt="StringQuery.{}") # PEG grammar for the String-based dialect @@ -65,6 +69,7 @@ def cname(obj): return obj.__class__.__name__ +@annotate() def filter_check_types(type_check, df_row, filt_lambda): """Utility function used in String-based predicates to make sure the node data used in the actual boolean predicate @@ -97,6 +102,7 @@ def filter_check_types(type_check, df_row, filt_lambda): class StringQuery(Query): """Class for representing and parsing queries using the String-based dialect.""" + @_string_query_annotate def __init__(self, cypher_query, multi_index_mode="off"): """Builds a new StringQuery object representing a query in the String-based dialect. @@ -128,6 +134,7 @@ def __init__(self, cypher_query, multi_index_mode="off"): self._build_lambdas() self._build_query() + @_string_query_annotate def _build_query(self): """Builds the entire query using 'match' and 'rel' using the pre-parsed quantifiers and predicates. 
@@ -149,6 +156,7 @@ def _build_query(self): else: self.rel(quantifier=wcard, predicate=eval(filt_str)) + @_string_query_annotate def _build_lambdas(self): """Constructs the final predicate lambdas from the pre-parsed predicate information. @@ -175,6 +183,7 @@ def _build_lambdas(self): ) self.lambda_filters[i] = bool_expr + @_string_query_annotate def _parse_path(self, path_obj): """Parses the MATCH statement of a String-based query.""" nodes = path_obj.path.nodes @@ -188,6 +197,7 @@ def _parse_path(self, path_obj): self.wcard_pos[n.name] = idx idx += 1 + @_string_query_annotate def _parse_conditions(self, cond_expr): """Top level function for parsing the WHERE statement of a String-based query. @@ -722,9 +732,7 @@ def _parse_num_eq(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -747,9 +755,7 @@ def _parse_num_eq(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -798,9 +804,7 @@ def _parse_num_eq_multi_idx(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -823,9 +827,7 @@ def _parse_num_eq_multi_idx(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -872,9 +874,7 @@ def _parse_num_lt(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -897,9 +897,7 @@ def _parse_num_lt(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -941,9 +939,7 @@ def _parse_num_lt_multi_idx(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -966,9 +962,7 @@ def _parse_num_lt_multi_idx(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1015,9 +1009,7 @@ def _parse_num_gt(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1040,9 +1032,7 @@ def _parse_num_gt(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1084,9 +1074,7 @@ def _parse_num_gt_multi_idx(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1109,9 +1097,7 @@ def _parse_num_gt_multi_idx(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1158,9 +1144,7 @@ def _parse_num_lte(self, obj): This condition will always be false. 
The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1183,9 +1167,7 @@ def _parse_num_lte(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1227,9 +1209,7 @@ def _parse_num_lte_multi_idx(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1252,9 +1232,7 @@ def _parse_num_lte_multi_idx(self, obj): This condition will always be false. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1301,9 +1279,7 @@ def _parse_num_gte(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1326,9 +1302,7 @@ def _parse_num_gte(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1370,9 +1344,7 @@ def _parse_num_gte_multi_idx(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1395,9 +1367,7 @@ def _parse_num_gte_multi_idx(self, obj): This condition will always be true. The statement that triggered this warning is: {} - """.format( - obj - ), + """.format(obj), RedundantQueryFilterWarning, ) return [ @@ -1687,6 +1657,7 @@ def _parse_num_not_inf_multi_idx(self, obj): ] +@annotate() def parse_string_dialect(query_str, multi_index_mode="off"): """Parse all types of String-based queries, including multi-queries that leverage the curly brace delimiters. diff --git a/hatchet/readers/caliper_native_reader.py b/hatchet/readers/caliper_native_reader.py index e4c8edf5..b9f935cd 100644 --- a/hatchet/readers/caliper_native_reader.py +++ b/hatchet/readers/caliper_native_reader.py @@ -15,6 +15,10 @@ from hatchet.graph import Graph from hatchet.frame import Frame from hatchet.util.timer import Timer +from hatchet.util.perf_measure import annotate + + +_cali_native_reader_annotate = annotate(fmt="CaliperNativeReader.{}") def __raise_cali_type_error(msg): @@ -45,6 +49,7 @@ class CaliperNativeReader: ), } + @_cali_native_reader_annotate def __init__(self, filename_or_caliperreader, native, string_attributes): """Read in a native cali using Caliper's python reader. @@ -81,6 +86,7 @@ def __init__(self, filename_or_caliperreader, native, string_attributes): if isinstance(self.string_attributes, str): self.string_attributes = [self.string_attributes] + @_cali_native_reader_annotate def _create_metric_df(self, metrics): """Make a list of metric columns and create a dataframe, group by node""" for col in self.record_data_cols: @@ -90,6 +96,7 @@ def _create_metric_df(self, metrics): df_new = df_metrics.groupby(df_metrics["nid"]).aggregate("first").reset_index() return df_new + @_cali_native_reader_annotate def _reset_metrics(self, metrics): """Since the initial functions (i.e. 
main) are only called once, this keeps a small subset of the timeseries data and resets the rest so future iterations will be filled with nans @@ -106,6 +113,7 @@ def _reset_metrics(self, metrics): new_mets.append({k: node_dict.get(k, np.nan) for k in cols_to_keep}) return new_mets + @_cali_native_reader_annotate def read_metrics(self, ctx="path"): """append each metrics table to a list and return the list, split on timeseries_level if it exists""" metric_dfs = [] @@ -192,6 +200,7 @@ def read_metrics(self, ctx="path"): # will return a list with only one element unless it is a timeseries return metric_dfs + @_cali_native_reader_annotate def create_graph(self, ctx="path"): list_roots = [] @@ -347,6 +356,7 @@ def _create_parent(child_node, parent_callpath): return list_roots + @_cali_native_reader_annotate def _parse_metadata(self, mdata): """Convert Caliper Metadata values into correct Python objects. @@ -385,6 +395,7 @@ def _parse_metadata(self, mdata): parsed_mdata[k] = v return parsed_mdata + @_cali_native_reader_annotate def read(self): """Read the caliper records to extract the calling context tree.""" if isinstance(self.filename_or_caliperreader, str): @@ -414,7 +425,6 @@ def read(self): # If not a timeseries there will just be one element in the list for df_fixed_data in metrics_list: - metrics = pd.DataFrame.from_dict(data=df_fixed_data) # add missing intermediate nodes to the df_fixed_data dataframe @@ -571,6 +581,7 @@ def read(self): # otherwise we'll have populated the timeseries list of gfs attribute and can ignore the return value return self.gf_list[0] + @_cali_native_reader_annotate def read_timeseries(self, level="loop.start_iteration"): """Read in a timeseries Cali file. We need to intercept the read function so we can get a list of profiles for thicket diff --git a/hatchet/readers/caliper_reader.py b/hatchet/readers/caliper_reader.py index 4d2dacc8..3fc077ac 100644 --- a/hatchet/readers/caliper_reader.py +++ b/hatchet/readers/caliper_reader.py @@ -19,13 +19,18 @@ from hatchet.frame import Frame from hatchet.util.timer import Timer from hatchet.util.executable import which +from hatchet.util.perf_measure import annotate unknown_label_counter = 0 +_cali_reader_annotate = annotate(fmt="CaliperReader.{}") + + class CaliperReader: """Read in a Caliper file (`cali` or split JSON) or file-like object.""" + @_cali_reader_annotate def __init__(self, filename_or_stream, query=""): """Read from Caliper files (`cali` or split JSON).
@@ -55,6 +60,7 @@ def __init__(self, filename_or_stream, query=""): if isinstance(self.filename_or_stream, str): _, self.filename_ext = os.path.splitext(filename_or_stream) + @_cali_reader_annotate def read_json_sections(self): # if cali-query exists, extract data from .cali to a file-like object if self.filename_ext == ".cali": @@ -140,6 +146,7 @@ def read_json_sections(self): if self.json_cols[idx] != "rank" and item["is_value"] is True: self.metric_columns.append(self.json_cols[idx]) + @_cali_reader_annotate def create_graph(self): list_roots = [] @@ -189,6 +196,7 @@ def create_graph(self): return list_roots + @_cali_reader_annotate def read(self): """Read the caliper JSON file to extract the calling context tree.""" with self.timer.phase("read json"): diff --git a/hatchet/readers/cprofile_reader.py b/hatchet/readers/cprofile_reader.py index 3b3f5caa..481534e4 100644 --- a/hatchet/readers/cprofile_reader.py +++ b/hatchet/readers/cprofile_reader.py @@ -12,6 +12,10 @@ from hatchet.node import Node from hatchet.graph import Graph from hatchet.frame import Frame +from hatchet.util.perf_measure import annotate + + +_cprof_annotate = annotate(fmt="CProfileReader.{}") def print_incomptable_msg(stats_file): @@ -44,12 +48,14 @@ class NameData: class CProfileReader: + @_cprof_annotate def __init__(self, filename): self.pstats_file = filename self.name_to_hnode = {} self.name_to_dict = {} + @_cprof_annotate def _create_node_and_row(self, fn_data, fn_name, stats_dict): """ Description: Takes a profiled function as specified in a pstats file @@ -74,10 +80,12 @@ def _create_node_and_row(self, fn_data, fn_name, stats_dict): return fn_hnode + @_cprof_annotate def _get_src(self, stat): """Gets the source/parent of our current destination node""" return stat[StatData.SRCNODE] + @_cprof_annotate def _add_node_metadata(self, stat_name, stat_module_data, stats, hnode): """Puts all the metadata associated with a node in a dictionary to insert into pandas.""" node_dict = { @@ -92,6 +100,7 @@ def _add_node_metadata(self, stat_name, stat_module_data, stats, hnode): } self.name_to_dict[stat_name] = node_dict + @_cprof_annotate def create_graph(self): """Performs the creation of our node graph""" try: @@ -124,6 +133,7 @@ def create_graph(self): return list_roots + @_cprof_annotate def read(self): roots = self.create_graph() graph = Graph(roots) diff --git a/hatchet/readers/dataframe_reader.py b/hatchet/readers/dataframe_reader.py index 7b298c22..0fb32165 100644 --- a/hatchet/readers/dataframe_reader.py +++ b/hatchet/readers/dataframe_reader.py @@ -6,6 +6,7 @@ import hatchet.graphframe from hatchet.node import Node from hatchet.graph import Graph +from hatchet.util.perf_measure import annotate from abc import abstractmethod @@ -21,6 +22,10 @@ ABC = ABCMeta("ABC", (object,), {"__slots__": ()}) +_dataframe_reader_annotate = annotate(fmt="DataframeReader.{}") + + +@annotate() def _get_node_from_df_iloc(df, ind): node = None if isinstance(df.iloc[ind].name, tuple): @@ -34,6 +39,7 @@ def _get_node_from_df_iloc(df, ind): return node +@annotate() def _get_parents_and_children(df): rel_dict = {} for i in range(len(df)): @@ -45,6 +51,7 @@ def _get_parents_and_children(df): return rel_dict +@annotate() def _reconstruct_graph(df, rel_dict): node_list = sorted(list(df.index.to_frame()["node"])) for i in range(len(df)): @@ -60,6 +67,7 @@ def _reconstruct_graph(df, rel_dict): class DataframeReader(ABC): """Abstract Base Class for reading in checkpointing files.""" + @_dataframe_reader_annotate def __init__(self, filename):
self.filename = filename @@ -67,6 +75,7 @@ def __init__(self, filename): def _read_dataframe_from_file(self, **kwargs): pass + @_dataframe_reader_annotate def read(self, **kwargs): df = self._read_dataframe_from_file(**kwargs) rel_dict = _get_parents_and_children(df) diff --git a/hatchet/readers/gprof_dot_reader.py b/hatchet/readers/gprof_dot_reader.py index 1ac4c502..5b754ba3 100644 --- a/hatchet/readers/gprof_dot_reader.py +++ b/hatchet/readers/gprof_dot_reader.py @@ -14,11 +14,16 @@ from ..frame import Frame from ..util.timer import Timer from ..util.config import dot_keywords +from ..util.perf_measure import annotate + + +_gprofdot_reader_annotate = annotate(fmt="GprofDotReader.{}") class GprofDotReader: """Read in gprof/callgrind output in dot format generated by gprof2dot.""" + @_gprofdot_reader_annotate def __init__(self, filename): self.dotfile = filename @@ -27,6 +32,7 @@ def __init__(self, filename): self.timer = Timer() + @_gprofdot_reader_annotate def create_graph(self): """Read the DOT files to create a graph.""" graphs = pydot.graph_from_dot_file(self.dotfile, encoding="utf-8") @@ -97,6 +103,7 @@ def create_graph(self): return list_roots + @_gprofdot_reader_annotate def read(self): """Read the DOT file generated by gprof2dot to create a graphframe. The DOT file contains a call graph. diff --git a/hatchet/readers/hdf5_reader.py b/hatchet/readers/hdf5_reader.py index b9a48544..1e63d8b9 100644 --- a/hatchet/readers/hdf5_reader.py +++ b/hatchet/readers/hdf5_reader.py @@ -7,9 +7,14 @@ import pandas as pd import sys from .dataframe_reader import DataframeReader +from ..util.perf_measure import annotate + + +_hdf5_reader_annotate = annotate(fmt="HDF5Reader.{}") class HDF5Reader(DataframeReader): + @_hdf5_reader_annotate def __init__(self, filename): # TODO Remove Arguments when Python 2.7 support is dropped if sys.version_info[0] == 2: @@ -17,6 +22,7 @@ def __init__(self, filename): else: super().__init__(filename) + @_hdf5_reader_annotate def _read_dataframe_from_file(self, **kwargs): df = None with warnings.catch_warnings(): diff --git a/hatchet/readers/hpctoolkit_reader.py b/hatchet/readers/hpctoolkit_reader.py index a0e0dae9..9fa3924d 100644 --- a/hatchet/readers/hpctoolkit_reader.py +++ b/hatchet/readers/hpctoolkit_reader.py @@ -37,17 +37,23 @@ from hatchet.graph import Graph from hatchet.util.timer import Timer from hatchet.frame import Frame +from hatchet.util.perf_measure import annotate src_file = 0 +_hpctk_reader_mod_annotate = annotate(fmt="hpctoolkit_reader.{}") +_hpctk_reader_annotate = annotate(fmt="HPCToolkitReader.{}") + + def init_shared_array(buf_): """Initialize shared array.""" global shared_metrics shared_metrics = buf_ +@_hpctk_reader_mod_annotate def read_metricdb_file(args): """Read a single metricdb file into a 1D array.""" ( @@ -93,6 +99,7 @@ class HPCToolkitReader: metric-db files. """ + @_hpctk_reader_annotate def __init__(self, dir_name): # this is the name of the HPCToolkit database directory. The directory # contains an experiment.xml and some metric-db files @@ -147,6 +154,7 @@ def __init__(self, dir_name): self.timer = Timer() + @_hpctk_reader_annotate def fill_tables(self): """Read certain sections of the experiment.xml file to create dicts of load modules, src_files, procedure_names, and metric_names. @@ -171,6 +179,7 @@ def fill_tables(self): self.metric_names, ) + @_hpctk_reader_annotate def read_all_metricdb_files(self): """Read all the metric-db files and create a dataframe with num_nodes X num_metricdb_files rows and num_metrics columns. 
Three additional columns @@ -234,6 +243,7 @@ def read_all_metricdb_files(self): # subtract_exclusive_metric_vals/ num nodes is already calculated self.total_execution_threads = self.num_threads_per_rank * self.num_ranks + @_hpctk_reader_annotate def read(self): """Read the experiment.xml file to extract the calling context tree and create a dataframe out of it. Then merge the two dataframes to create the final @@ -318,6 +328,7 @@ def read(self): return hatchet.graphframe.GraphFrame(graph, dataframe, exc_metrics, inc_metrics) + @_hpctk_reader_annotate def parse_xml_children(self, xml_node, hnode): """Parses all children of an XML node.""" for xml_child in xml_node: @@ -326,6 +337,7 @@ def parse_xml_children(self, xml_node, hnode): line = int(xml_node.get("l")) self.parse_xml_node(xml_child, nid, line, hnode) + @_hpctk_reader_annotate def parse_xml_node(self, xml_node, parent_nid, parent_line, hparent): """Parses an XML node and its children recursively.""" nid = int(xml_node.get("i")) @@ -414,6 +426,7 @@ def parse_xml_node(self, xml_node, parent_nid, parent_line, hparent): hparent.add_child(hnode) self.parse_xml_children(xml_node, hnode) + @_hpctk_reader_annotate def create_node_dict(self, nid, hnode, name, node_type, src_file, line, module): """Create a dict with all the node attributes.""" node_dict = { @@ -428,6 +441,7 @@ def create_node_dict(self, nid, hnode, name, node_type, src_file, line, module): return node_dict + @_hpctk_reader_annotate def count_cpu_threads_per_rank(self): metricdb_files = glob.glob(self.dir_name + "/*.metric-db") cpu_thread_ids = set() diff --git a/hatchet/readers/hpctoolkit_reader_latest.py b/hatchet/readers/hpctoolkit_reader_latest.py index 237d47dd..7467b6fd 100644 --- a/hatchet/readers/hpctoolkit_reader_latest.py +++ b/hatchet/readers/hpctoolkit_reader_latest.py @@ -14,8 +14,14 @@ from hatchet.graph import Graph from hatchet.graphframe import GraphFrame from hatchet.node import Node +from hatchet.util.perf_measure import annotate +_hpctk_reader_latest_mod_annotate = annotate(fmt="hpctoolkit_reader_latest.{}") +_hpctk_reader_latest_annotate = annotate(fmt="HPCToolkitReaderLatest.{}") + + +@_hpctk_reader_latest_mod_annotate def safe_unpack( format: str, data: bytes, offset: int, index: int = None, index_length: int = None ) -> tuple: @@ -25,6 +31,7 @@ def safe_unpack( return struct.unpack(format, data[offset : offset + length]) +@_hpctk_reader_latest_mod_annotate def read_string(data: bytes, offset: int) -> str: result = "" while True: @@ -49,7 +56,7 @@ def read_string(data: bytes, offset: int) -> str: class HPCToolkitReaderLatest: - + @_hpctk_reader_latest_annotate def __init__( self, dir_path: str, @@ -99,6 +106,7 @@ def __init__( if self._profile_file is None: raise ValueError("ERROR: profile.db not found.") + @_hpctk_reader_latest_annotate def _read_metric_descriptions(self) -> None: with open(self._meta_file, "rb") as file: file.seek(FILE_HEADER_OFFSET + 4 * 8) @@ -146,6 +154,7 @@ def _read_metric_descriptions(self) -> None: self._metric_descriptions[propMetricId] = metric_full_name + @_hpctk_reader_latest_annotate def _parse_source_file(self, meta_db: bytes, pFile: int) -> Dict[str, str]: if pFile not in self._source_files: (pPath,) = safe_unpack( @@ -160,6 +169,7 @@ def _parse_source_file(self, meta_db: bytes, pFile: int) -> Dict[str, str]: return self._source_files[pFile] + @_hpctk_reader_latest_annotate def _parse_load_module(self, meta_db: bytes, pModule: int) -> Dict[str, str]: if pModule not in self._load_modules: (pPath,) = safe_unpack( @@ -174,6 
+184,7 @@ def _parse_load_module(self, meta_db: bytes, pModule: int) -> Dict[str, str]: return self._load_modules[pModule] + @_hpctk_reader_latest_annotate def _parse_function( self, meta_db: bytes, pFunction: int ) -> Dict[str, Union[str, int]]: @@ -216,6 +227,7 @@ def _parse_function( return self._functions[pFunction] + @_hpctk_reader_latest_annotate def _store_cct_node( self, ctxId: int, frame: dict, parent: Node = None, depth: int = 0 ) -> Node: @@ -228,9 +240,7 @@ def _store_cct_node( "node": node, "name": ( # f"{frame['type']}: {frame['name']}" - frame["name"] - if frame["name"] != 1 - else "entry" + frame["name"] if frame["name"] != 1 else "entry" ), } @@ -241,6 +251,7 @@ def _store_cct_node( return node + @_hpctk_reader_latest_annotate def _parse_context( self, current_offset: int, @@ -249,7 +260,6 @@ def _parse_context( meta_db: bytes, parent_time: int, ) -> None: - final_offset = current_offset + total_size while current_offset < final_offset: @@ -308,10 +318,10 @@ def _parse_context( my_time, ) + @_hpctk_reader_latest_annotate def _read_summary_profile( self, ) -> None: - with open(self._profile_file, "rb") as file: file.seek(FILE_HEADER_OFFSET) formatProfileInfos = " None: @@ -426,6 +437,7 @@ def _read_cct( print("DATA IMPORTED") return graphframe + @_hpctk_reader_latest_annotate def read(self) -> GraphFrame: self._read_metric_descriptions() self._read_summary_profile() diff --git a/hatchet/readers/json_reader.py b/hatchet/readers/json_reader.py index 407536ba..fc4f8c62 100644 --- a/hatchet/readers/json_reader.py +++ b/hatchet/readers/json_reader.py @@ -11,6 +11,10 @@ from hatchet.node import Node from hatchet.graph import Graph from hatchet.frame import Frame +from hatchet.util.perf_measure import annotate + + +_json_reader_annotate = annotate(fmt="JsonReader.{}") class JsonReader: @@ -20,6 +24,7 @@ class JsonReader: (GraphFrame): graphframe containing data from dictionaries """ + @_json_reader_annotate def __init__(self, json_spec): """Read from a json string specification of a graphframe @@ -27,6 +32,7 @@ def __init__(self, json_spec): """ self.spec_dict = json.loads(json_spec) + @_json_reader_annotate def read(self): roots = [] for graph_spec in self.spec_dict["graph"]: diff --git a/hatchet/readers/literal_reader.py b/hatchet/readers/literal_reader.py index c3c95a01..cf79065f 100644 --- a/hatchet/readers/literal_reader.py +++ b/hatchet/readers/literal_reader.py @@ -9,6 +9,10 @@ from hatchet.node import Node from hatchet.graph import Graph from hatchet.frame import Frame +from hatchet.util.perf_measure import annotate + + +_literal_reader_annotate = annotate(fmt="LiteralReader.{}") class LiteralReader: @@ -59,6 +63,7 @@ class LiteralReader: (GraphFrame): graphframe containing data from dictionaries """ + @_literal_reader_annotate def __init__(self, graph_dict): """Read from list of dictionaries. 
diff --git a/hatchet/readers/literal_reader.py b/hatchet/readers/literal_reader.py
index c3c95a01..cf79065f 100644
--- a/hatchet/readers/literal_reader.py
+++ b/hatchet/readers/literal_reader.py
@@ -9,6 +9,10 @@ from hatchet.node import Node
 from hatchet.graph import Graph
 from hatchet.frame import Frame
+from hatchet.util.perf_measure import annotate
+
+
+_literal_reader_annotate = annotate(fmt="LiteralReader.{}")
 
 
 class LiteralReader:
@@ -59,6 +63,7 @@ class LiteralReader:
         (GraphFrame): graphframe containing data from dictionaries
     """
 
+    @_literal_reader_annotate
     def __init__(self, graph_dict):
         """Read from list of dictionaries.
@@ -66,6 +71,7 @@ def __init__(self, graph_dict):
         """
         self.graph_dict = graph_dict
 
+    @_literal_reader_annotate
     def parse_node_literal(
         self, frame_to_node_dict, node_dicts, child_dict, hparent, seen_nids
     ):
@@ -110,6 +116,7 @@ def parse_node_literal(
                 frame_to_node_dict, node_dicts, child, hnode, seen_nids
             )
 
+    @_literal_reader_annotate
     def read(self):
         list_roots = []
         node_dicts = []
diff --git a/hatchet/readers/pyinstrument_reader.py b/hatchet/readers/pyinstrument_reader.py
index 3ce00400..34d968e1 100644
--- a/hatchet/readers/pyinstrument_reader.py
+++ b/hatchet/readers/pyinstrument_reader.py
@@ -11,15 +11,21 @@ from hatchet.node import Node
 from hatchet.graph import Graph
 from hatchet.frame import Frame
+from hatchet.util.perf_measure import annotate
+
+
+_pyinstrument_reader_annotate = annotate(fmt="PyinstrumentReader.{}")
 
 
 class PyinstrumentReader:
+    @_pyinstrument_reader_annotate
     def __init__(self, filename):
         self.pyinstrument_json_filename = filename
         self.graph_dict = {}
         self.list_roots = []
         self.node_dicts = []
 
+    @_pyinstrument_reader_annotate
     def create_graph(self):
         def parse_node_literal(child_dict, hparent):
             """Create node_dict for one node and then call the function
@@ -85,6 +91,7 @@ def parse_node_literal(child_dict, hparent):
 
         return graph
 
+    @_pyinstrument_reader_annotate
     def read(self):
         with open(self.pyinstrument_json_filename) as pyinstrument_json:
             self.graph_dict = json.load(pyinstrument_json)
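LiteralReader is the reader behind GraphFrame.from_literal, which makes its read() a convenient first region to look for when verifying that annotations fire. A small input in the documented literal shape (names and metric values here are arbitrary):

    import hatchet as ht

    spec = [
        {
            "frame": {"name": "main", "type": "function"},
            "metrics": {"time (inc)": 10.0, "time": 2.0},
            "children": [
                {
                    "frame": {"name": "solve", "type": "function"},
                    "metrics": {"time (inc)": 8.0, "time": 8.0},
                    "children": [],
                }
            ],
        }
    ]

    gf = ht.GraphFrame.from_literal(spec)  # goes through the annotated read()
    print(gf.tree())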
diff --git a/hatchet/readers/spotdb_reader.py b/hatchet/readers/spotdb_reader.py
index 3913e1d3..e554da15 100644
--- a/hatchet/readers/spotdb_reader.py
+++ b/hatchet/readers/spotdb_reader.py
@@ -10,6 +10,10 @@ from hatchet.graph import Graph
 from hatchet.frame import Frame
 from hatchet.util.timer import Timer
+from hatchet.util.perf_measure import annotate
+
+_spot_dset_reader_annotate = annotate(fmt="SpotDatasetReader.{}")
+_spot_db_reader_annotate = annotate(fmt="SpotDBReader.{}")
 
 
 def _find_child_node(node, name):
@@ -23,6 +27,7 @@ class SpotDatasetReader:
     """Reads a (single-run) dataset from SpotDB"""
 
+    @_spot_dset_reader_annotate
     def __init__(self, regionprofile, metadata, attr_info):
         """Initialize SpotDataset reader
 
@@ -49,6 +54,7 @@ def __init__(self, regionprofile, metadata, attr_info):
 
         self.timer = Timer()
 
+    @_spot_dset_reader_annotate
     def create_graph(self):
         """Create the graph. Fills in df_data and metric_columns."""
@@ -81,6 +87,7 @@ def create_graph(self):
 
             self.df_data.append(dict({"name": name, "node": node}, **metrics))
 
+    @_spot_dset_reader_annotate
     def read(self, default_metric="Total time (inc)"):
         """Create GraphFrame for the given Spot dataset."""
 
@@ -116,6 +123,7 @@ def read(self, default_metric="Total time (inc)"):
             default_metric=default_metric,
         )
 
+    @_spot_dset_reader_annotate
     def _create_node(self, path):
         parent = self.roots.get(path[0], None)
         if parent is None:
@@ -136,6 +144,7 @@ def _create_node(self, path):
 class SpotDBReader:
     """Import multiple runs as graph frames from a SpotDB instance"""
 
+    @_spot_db_reader_annotate
     def __init__(self, db_key, list_of_ids=None, default_metric="Total time (inc)"):
         """Initialize SpotDBReader
 
@@ -156,6 +165,7 @@ def __init__(self, db_key, list_of_ids=None, default_metric="Total time (inc)"):
         self.list_of_ids = list_of_ids
         self.default_metric = default_metric
 
+    @_spot_db_reader_annotate
     def read(self):
         """Read given runs from SpotDB
diff --git a/hatchet/readers/tau_reader.py b/hatchet/readers/tau_reader.py
index aed2af11..4fc889ae 100644
--- a/hatchet/readers/tau_reader.py
+++ b/hatchet/readers/tau_reader.py
@@ -11,11 +11,16 @@ from hatchet.node import Node
 from hatchet.graph import Graph
 from hatchet.frame import Frame
+from hatchet.util.perf_measure import annotate
+
+
+_tau_reader_annotate = annotate(fmt="TAUReader.{}")
 
 
 class TAUReader:
     """Read in a profile generated using TAU."""
 
+    @_tau_reader_annotate
     def __init__(self, dirname):
         self.dirname = dirname
         self.node_dicts = []
@@ -28,6 +33,7 @@ def __init__(self, dirname):
         self.multiple_ranks = False
         self.multiple_threads = False
 
+    @_tau_reader_annotate
     def create_node_dict(
         self,
         node,
@@ -55,6 +61,7 @@ def create_node_dict(
             node_dict[columns[i + 1]] = metric_values[i]
         return node_dict
 
+    @_tau_reader_annotate
     def create_graph(self):
         def _get_name_file_module(is_parent, node_info, symbol):
             """This function gets the name, file and module information
@@ -443,6 +450,7 @@ def _construct_column_list(first_rank_filenames):
 
         return list_roots
 
+    @_tau_reader_annotate
     def read(self):
         """Read the TAU profile file to extract the calling context tree."""
         # Add all nodes and roots.
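Note that in TAUReader, as in PyinstrumentReader above, only the methods are decorated; helpers nested inside create_graph are redefined on every call, so wrapping them would presumably add per-call overhead for little gain. For timing a span inside a function body, the perf_measure module introduced below also exposes begin_code_region/end_code_region. A hedged sketch of how a reader-internal loop could be marked (the helper and region name are hypothetical, and both calls are no-ops unless the Caliper plugin loaded):

    from hatchet.util.perf_measure import begin_code_region, end_code_region

    def parse_all_ranks(filenames):
        # Hypothetical helper: mark a hot loop as one named region instead of
        # decorating a nested function.
        begin_code_region("TAUReader.parse_all_ranks.loop")
        try:
            for fname in filenames:
                pass  # parse one rank's profile here
        finally:
            end_code_region("TAUReader.parse_all_ranks.loop")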
diff --git a/hatchet/readers/timemory_reader.py b/hatchet/readers/timemory_reader.py
index 4e0a7f70..2182c6ac 100644
--- a/hatchet/readers/timemory_reader.py
+++ b/hatchet/readers/timemory_reader.py
@@ -12,11 +12,16 @@ from ..graph import Graph
 from ..frame import Frame
 from ..util.timer import Timer
+from ..util.perf_measure import annotate
+
+
+_timemory_reader_annotate = annotate(fmt="TimemoryReader.{}")
 
 
 class TimemoryReader:
     """Read in timemory JSON output"""
 
+    @_timemory_reader_annotate
     def __init__(self, input, select=None, **_kwargs):
         """Arguments:
         input (str or file-stream or dict or None):
@@ -77,6 +82,7 @@ def _get_select(val):
             else:
                 raise TypeError("select must be None or list of string")
 
+    @_timemory_reader_annotate
     def create_graph(self):
         """Create graph and dataframe"""
         list_roots = []
@@ -538,7 +544,6 @@ def read_properties(properties, _metric_name, _metric_data):
         if self.multiple_ranks or self.multiple_threads:
             dataframe = dataframe.unstack()
             for idx, row in dataframe.iterrows():
-
                 # There is always a valid name for an index.
                 # Take that valid name and assign to other ranks/rows.
                 name = row["name"][row["name"].first_valid_index()]
@@ -564,6 +569,7 @@ def read_properties(properties, _metric_name, _metric_data):
             graph, dataframe, exc_metrics, inc_metrics, self.default_metric
         )
 
+    @_timemory_reader_annotate
     def read(self):
         """Read timemory json."""
diff --git a/hatchet/tests/query_compat.py b/hatchet/tests/query_compat.py
index d8d0b720..11db4b5c 100644
--- a/hatchet/tests/query_compat.py
+++ b/hatchet/tests/query_compat.py
@@ -1,4 +1,4 @@
-# Copyright 2017-2022 Lawrence Livermore National Security, LLC and other
+# Copyright 2017-2023 Lawrence Livermore National Security, LLC and other
 # Hatchet Project Developers. See the top-level LICENSE file for details.
 #
 # SPDX-License-Identifier: MIT
diff --git a/hatchet/util/dot.py b/hatchet/util/dot.py
index 84cd62a8..6fa35b3f 100644
--- a/hatchet/util/dot.py
+++ b/hatchet/util/dot.py
@@ -6,7 +6,10 @@
 import matplotlib.cm
 import matplotlib.colors
 
+from .perf_measure import annotate
 
+
+@annotate()
 def trees_to_dot(roots, dataframe, metric, name, rank, thread, threshold):
     """Calls to_dot in turn for each tree in the graph/forest."""
     text = (
@@ -33,6 +36,7 @@ def trees_to_dot(roots, dataframe, metric, name, rank, thread, threshold):
     return text
 
 
+@annotate()
 def to_dot(hnode, dataframe, metric, name, rank, thread, threshold, visited):
     """Write to graphviz dot format."""
     colormap = matplotlib.cm.Reds
@@ -40,7 +44,6 @@ def to_dot(hnode, dataframe, metric, name, rank, thread, threshold, visited):
     max_time = dataframe[metric].max()
 
     def add_nodes_and_edges(hnode):
-
         # set dataframe index based on if rank is a part of the index
         if "rank" in dataframe.index.names and "thread" in dataframe.index.names:
             df_index = (hnode, rank, thread)
diff --git a/hatchet/util/perf_measure.py b/hatchet/util/perf_measure.py
new file mode 100644
index 00000000..b63baf1b
--- /dev/null
+++ b/hatchet/util/perf_measure.py
@@ -0,0 +1,65 @@
+# Copyright 2017-2023 Lawrence Livermore National Security, LLC and other
+# Hatchet Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: MIT
+
+import os
+
+HATCHET_PERF_ENV_VAR = "HATCHET_PERF_PLUGIN"
+
+_HATCHET_PERF_PLUGIN = "none"
+_HATCHET_PERF_ENABLED = False
+if HATCHET_PERF_ENV_VAR in os.environ:
+    if os.environ[HATCHET_PERF_ENV_VAR].lower() == "caliper":
+        try:
+            from pycaliper import annotate_function
+            from pycaliper.instrumentation import begin_region, end_region
+
+            _HATCHET_PERF_PLUGIN = "caliper"
+            _HATCHET_PERF_ENABLED = True
+        except Exception:
+            print("User requested Caliper annotations, but could not import Caliper")
+    elif os.environ[HATCHET_PERF_ENV_VAR].lower() == "perfflowaspect":
+        try:
+            import perfflowaspect.aspect
+
+            _HATCHET_PERF_PLUGIN = "perfflowaspect"
+            _HATCHET_PERF_ENABLED = True
+        except Exception:
+            print("User requested PerfFlow Aspect annotations, but could not import PerfFlow Aspect")
+    else:
+        print("'{}' is an invalid value for {}. Not enabling performance annotations".format(os.environ[HATCHET_PERF_ENV_VAR], HATCHET_PERF_ENV_VAR))
+
+
+def annotate(name=None, fmt=None):
+    def inner_decorator(func):
+        if not _HATCHET_PERF_ENABLED:
+            return func
+        else:
+            real_name = name
+            if name is None or name == "":
+                real_name = func.__name__
+            if fmt is not None and fmt != "":
+                real_name = fmt.format(real_name)
+            if _HATCHET_PERF_PLUGIN == "caliper":
+                return annotate_function(name=real_name)(func)
+            elif _HATCHET_PERF_PLUGIN == "perfflowaspect":
+                return perfflowaspect.aspect.critical_path(pointcut="around")(func)
+            else:
+                return func
+
+    return inner_decorator
+
+
+def begin_code_region(name):
+    if _HATCHET_PERF_ENABLED:
+        if _HATCHET_PERF_PLUGIN == "caliper":
+            begin_region(name)
+    return
+
+
+def end_code_region(name):
+    if _HATCHET_PERF_ENABLED:
+        if _HATCHET_PERF_PLUGIN == "caliper":
+            end_region(name)
+    return
diff --git a/hatchet/writers/dataframe_writer.py b/hatchet/writers/dataframe_writer.py
index b23c6172..ee631e79 100644
--- a/hatchet/writers/dataframe_writer.py
+++ b/hatchet/writers/dataframe_writer.py
@@ -4,6 +4,7 @@
 # SPDX-License-Identifier: MIT
 
 from hatchet.node import Node
+from hatchet.util.perf_measure import annotate
 
 from abc import abstractmethod
 
@@ -19,6 +20,11 @@
 ABC = ABCMeta("ABC", (object,), {"__slots__": ()})
 
 
+_dataframe_writer_mod_annotate = annotate(fmt="dataframe_writer.{}")
+_dataframe_writer_annotate = annotate(fmt="DataframeWriter.{}")
+
+
+@_dataframe_writer_mod_annotate
 def _get_node_from_df_iloc(df, ind):
     node = None
     if isinstance(df.iloc[ind].name, tuple):
@@ -32,6 +38,7 @@ def _get_node_from_df_iloc(df, ind):
     return node
 
 
+@_dataframe_writer_mod_annotate
 def _fill_children_and_parents(dump_df):
     dump_df["children"] = [[] for _ in range(len(dump_df))]
     dump_df["parents"] = [[] for _ in range(len(dump_df))]
@@ -49,6 +56,7 @@ def _fill_children_and_parents(dump_df):
 
 class DataframeWriter(ABC):
+    @_dataframe_writer_annotate
     def __init__(self, filename):
         self.filename = filename
 
@@ -56,6 +64,7 @@ def __init__(self, filename):
     def _write_dataframe_to_file(self, df, **kwargs):
         pass
 
+    @_dataframe_writer_annotate
     def write(self, gf, **kwargs):
         gf_cpy = gf.deepcopy()
         dump_df = _fill_children_and_parents(gf_cpy.dataframe)
diff --git a/hatchet/writers/hdf5_writer.py b/hatchet/writers/hdf5_writer.py
index 81489015..40e692a3 100644
--- a/hatchet/writers/hdf5_writer.py
+++ b/hatchet/writers/hdf5_writer.py
@@ -7,15 +7,18 @@
 import sys
 
 from .dataframe_writer import DataframeWriter
+from ..util.perf_measure import annotate
 
 
 class HDF5Writer(DataframeWriter):
+    @annotate(fmt="HDF5Writer.{}")
     def __init__(self, filename):
         if sys.version_info[0] == 2:
             super(HDF5Writer, self).__init__(filename)
         else:
             super().__init__(filename)
 
+    @annotate(fmt="HDF5Writer.{}")
     def _write_dataframe_to_file(self, df, **kwargs):
         if "key" not in kwargs:
             raise KeyError("Writing to HDF5 requires a user-supplied key")
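Taken together, the instrumentation is strictly opt-in: with HATCHET_PERF_PLUGIN unset, annotate() hands every function back unchanged. Because perf_measure reads the variable at module import time, it must be set before hatchet is first imported. A sketch of an annotated session (assumes pycaliper is installed; the literal input is only a placeholder):

    import os

    # Must happen before "import hatchet": perf_measure checks the variable
    # once, at import time, and otherwise stays disabled.
    os.environ["HATCHET_PERF_PLUGIN"] = "caliper"

    import hatchet as ht

    spec = [{"frame": {"name": "main"}, "metrics": {"time": 1.0}, "children": []}]
    gf = ht.GraphFrame.from_literal(spec)
    # Decorated calls such as "LiteralReader.read" and "DataframeWriter.write"
    # now appear as Caliper regions, viewable with Caliper's usual tooling.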