Skip to content

Commit

Permalink
remove Engine from public Schema api
Browse files Browse the repository at this point in the history
- hide engine under the Schema
- add proper type annotations to Engine - make it generic over Executor
- add proper type annotations to Schema - make it generic over executor. Now it understands when Engine.execute returns awaitable
- use mutation graph if query ordered
- add deny_sync param to AsyncIOExecutor and make it work with sync resolvers by default. If user wants to deny sync resolvers it can pass deny_sync=True
  • Loading branch information
m.kindritskiy committed Aug 5, 2024
1 parent 168fe40 commit ee4f23e
Show file tree
Hide file tree
Showing 31 changed files with 319 additions and 235 deletions.
12 changes: 2 additions & 10 deletions docs/federation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ Now let's implement the Order service using Hiku:
from flask import Flask, request, jsonify
from hiku.graph import Graph, Root, Field, Link, Node, Option
from hiku.types import ID, Integer, TypeRef, String, Optional, Sequence
from hiku.engine import Engine
from hiku.executors.sync import SyncExecutor
from hiku.federation.schema import Schema
from hiku.federation.directives import Key
Expand All @@ -104,10 +103,7 @@ Now let's implement the Order service using Hiku:
app = Flask(__name__)
schema = Schema(
Engine(SyncExecutor()),
QUERY_GRAPH,
)
schema = Schema(SyncExecutor(), QUERY_GRAPH)
@app.route('/graphql', methods={'POST'})
def handle_graphql():
Expand All @@ -130,7 +126,6 @@ Next, let's implement the ShoppingCart service using Hiku:
from flask import Flask, request, jsonify
from hiku.graph import Graph, Root, Field, Link, Node, Option
from hiku.types import ID, Integer, TypeRef, String, Optional, Sequence
from hiku.engine import Engine
from hiku.executors.sync import SyncExecutor
from hiku.federation.schema import Schema
from hiku.federation.directives import Key
Expand Down Expand Up @@ -165,10 +160,7 @@ Next, let's implement the ShoppingCart service using Hiku:
app = Flask(__name__)
schema = Schema(
Engine(SyncExecutor()),
QUERY_GRAPH,
)
schema = Schema(SyncExecutor(), QUERY_GRAPH)
@app.route('/graphql', methods={'POST'})
def handle_graphql():
Expand Down
3 changes: 1 addition & 2 deletions examples/federation-compatibility/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
schema_directive,
)
from hiku.federation.graph import Graph, FederatedNode
from hiku.engine import Engine
from hiku.graph import (
Nothing,
Root,
Expand Down Expand Up @@ -570,7 +569,7 @@ class Custom(FederationSchemaDirective):
app = Flask(__name__)

schema = Schema(
Engine(SyncExecutor()),
SyncExecutor(),
QUERY_GRAPH,
)

Expand Down
3 changes: 1 addition & 2 deletions examples/graphql_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
Extends,
)
from hiku.federation.schema import Schema
from hiku.engine import Engine
from hiku.graph import (
Root,
Field,
Expand Down Expand Up @@ -129,7 +128,7 @@ def direct_link(ids):
app = Flask(__name__)

schema = Schema(
Engine(SyncExecutor()),
SyncExecutor(),
QUERY_GRAPH,
federation_version=1,
)
Expand Down
6 changes: 1 addition & 5 deletions examples/graphql_federation_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from hiku.federation.directive import Key
from hiku.federation.graph import Graph, FederatedNode
from hiku.federation.schema import Schema
from hiku.engine import Engine
from hiku.graph import (
Root,
Field,
Expand Down Expand Up @@ -136,10 +135,7 @@ def resolve_cart_item_reference(representations):

app = Flask(__name__)

schema = Schema(
Engine(SyncExecutor()),
QUERY_GRAPH,
)
schema = Schema(SyncExecutor(), QUERY_GRAPH)


@app.route('/graphql', methods={'POST'})
Expand Down
6 changes: 5 additions & 1 deletion hiku/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ def operation_type_name(self) -> str:

@property
def graph(self) -> Graph:
"""If operation type isn't specified, returns query graph."""
if self.operation is None:
# if query ordered we need to execute it on mutation graph
if self.query and self.query.ordered:
if self.mutation_graph:
return self.mutation_graph

assert self.query_graph is not None
return self.query_graph

Expand Down
89 changes: 40 additions & 49 deletions hiku/engine.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import abc
import contextlib
import inspect
import warnings
import dataclasses

from typing import (
Any,
Generic,
TypeVar,
Callable,
cast,
Expand All @@ -17,7 +17,7 @@
NoReturn,
Optional,
DefaultDict,
Awaitable,
overload,
)
from functools import partial
from itertools import chain, repeat
Expand All @@ -30,9 +30,12 @@
CacheSettings,
)
from .compat import Concatenate, ParamSpec
from .context import ExecutionContext, create_execution_context
from .executors.base import SyncAsyncExecutor
from .operation import Operation, OperationType
from .context import ExecutionContext
from .executors.base import (
BaseAsyncExecutor,
BaseSyncExecutor,
SyncAsyncExecutor,
)
from .query import (
Fragment,
Node as QueryNode,
Expand Down Expand Up @@ -993,56 +996,23 @@ def __getitem__(self, item: Any) -> Any:
)


class BaseEngine(abc.ABC):
_ExecutorType = TypeVar("_ExecutorType", bound=SyncAsyncExecutor)


class Engine(Generic[_ExecutorType]):
executor: _ExecutorType

def __init__(
self,
executor: SyncAsyncExecutor,
executor: _ExecutorType,
cache: Optional[CacheSettings] = None,
) -> None:
self.executor = executor
self.cache_settings = cache

@abc.abstractmethod
def execute_context(
self,
execution_context: ExecutionContext,
) -> Union[Proxy, Awaitable[Proxy]]:
...

def execute(
self,
graph: Graph,
query: QueryNode,
ctx: Optional[Dict] = None,
) -> Union[Proxy, Awaitable[Proxy]]:
execution_context = create_execution_context(
query,
query_graph=graph,
context=ctx,
operation=Operation(OperationType.QUERY, query, None),
)
return self.execute_context(execution_context)

def execute_mutation(
self,
graph: Graph,
query: QueryNode,
ctx: Optional[Dict] = None,
) -> Union[Proxy, Awaitable[Proxy]]:
execution_context = create_execution_context(
query,
mutation_graph=graph,
context=ctx,
operation=Operation(OperationType.MUTATION, query, None),
)
return self.execute_context(execution_context)


class Engine(BaseEngine):
def execute_context(
self,
execution_context: ExecutionContext,
) -> Union[Proxy, Awaitable[Proxy]]:
def _prepare_workflow(
self, execution_context: ExecutionContext
) -> Tuple[Queue, Query]:
graph = execution_context.graph
query = execution_context.query
ctx = execution_context.context
Expand All @@ -1064,4 +1034,25 @@ def execute_context(
queue, task_set, graph, query, Context(ctx), cache
)
query_workflow.start()
return self.executor.process(queue, query_workflow)
return queue, query_workflow

@overload
async def execute(
self: "Engine[BaseAsyncExecutor]",
execution_context: ExecutionContext,
) -> Proxy:
...

@overload
def execute(
self: "Engine[BaseSyncExecutor]",
execution_context: ExecutionContext,
) -> Proxy:
...

def execute(
self,
execution_context: ExecutionContext,
) -> Any:
queue, workflow = self._prepare_workflow(execution_context)
return self.executor.process(queue, workflow)
36 changes: 30 additions & 6 deletions hiku/executors/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,41 @@


class AsyncIOExecutor(BaseAsyncExecutor):
def __init__(self, loop: Optional[AbstractEventLoop] = None) -> None:
self._loop = loop or get_event_loop()
"""AsyncIOExecutor is an executor that uses asyncio event loop to run tasks.
By default it allows to run both synchronous and asynchronous tasks.
To deny synchronous tasks set deny_sync to True.
:param loop: asyncio event loop
:param deny_sync: deny synchronous tasks -
raise TypeError if a task is not awaitable
"""

def __init__(
self, loop: Optional[AbstractEventLoop] = None, deny_sync: bool = False
) -> None:
self.loop = loop or get_event_loop()
self.deny_sync = deny_sync

async def _wrapper(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
result = fn(*args, **kwargs)
if inspect.isawaitable(result):
return await result
else:
return result

def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Task:
coro = fn(*args, **kwargs)
if not inspect.isawaitable(coro):
raise TypeError(
"{!r} returned non-awaitable object {!r}".format(fn, coro)
)
if self.deny_sync:
raise TypeError(
"{!r} returned non-awaitable object {!r}".format(fn, coro)
)

return self.loop.create_task(self._wrapper(fn, *args, **kwargs))

coro = cast(Coroutine, coro)
return self._loop.create_task(coro)
return self.loop.create_task(coro)

async def process(self, queue: "Queue", workflow: "Workflow") -> Proxy:
try:
Expand Down
13 changes: 8 additions & 5 deletions hiku/federation/schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List, Optional, Sequence, Type, Union, cast
from typing import Generic, List, Optional, Sequence, Type, Union, cast
from hiku.cache import CacheSettings
from hiku.context import ExecutionContext, ExecutionContextFinal
from hiku.engine import Engine
from hiku.engine import _ExecutorType
from hiku.federation.introspection import FederatedGraphQLIntrospection
from hiku.federation.sdl import print_sdl
from hiku.graph import GraphTransformer
Expand Down Expand Up @@ -37,7 +38,7 @@ def visit_graph(self, obj: Graph) -> Graph: # type: ignore
)


class Schema(BaseSchema):
class Schema(BaseSchema, Generic[_ExecutorType]):
"""Can execute either regular or federated queries.
Handles following fields of federated query:
- _service
Expand All @@ -50,28 +51,30 @@ class Schema(BaseSchema):

def __init__(
self,
engine: Engine,
executor: _ExecutorType,
graph: Graph,
mutation: Optional[Graph] = None,
batching: bool = False,
introspection: bool = True,
extensions: Optional[
Sequence[Union[Extension, Type[Extension]]]
] = None,
cache: Optional[CacheSettings] = None,
federation_version: int = DEFAULT_FEDERATION_VERSION,
):
transformers: List[GraphTransformer] = []
if federation_version == 1:
transformers.append(FederationV1EntityTransformer())

super().__init__(
engine=engine,
graph=graph,
mutation=mutation,
batching=batching,
introspection=introspection,
extensions=extensions,
transformers=transformers,
executor=executor,
cache=cache,
)
self.federation_version = federation_version

Expand Down
Loading

0 comments on commit ee4f23e

Please sign in to comment.