Skip to content

Commit

Permalink
refactor(appengine): exandra queries for device
Browse files Browse the repository at this point in the history
The first installment in the effort of refactoring the device module.

The change can be made little by little as exandra and cqex queries
can coexist.

This change ports a few queries to exandra, without making any big logic
refactor, but we can see the future direction of this refactor:
Astarte.AppEngine.API.Device.Queries becomes a module to expose _queries_
and not data from the database directly.

For this iteration, this is enough to elegantly use the queries in the
Device module, but more complex queries may require a more in-depth refactor.

The following queries have been ported:
- `retrieve_interfaces_list`
- `retrieve_all_endpoints_for_interface!`
- `retrieve_mapping`
- `prepare_value_type_query` / `execute_value_type_query`

Signed-off-by: Francesco Noacco <[email protected]>
  • Loading branch information
noaccOS committed Jan 22, 2025
1 parent cbfd1b2 commit b219689
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 178 deletions.
2 changes: 1 addition & 1 deletion apps/astarte_appengine_api/.formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[
import_deps: [:phoenix, :ecto, :skogsra, :stream_data],
import_deps: [:phoenix, :ecto, :skogsra],
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
121 changes: 74 additions & 47 deletions apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2023 Ispirata Srl
# Copyright 2017 - 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,19 +31,20 @@ defmodule Astarte.AppEngine.API.Device do
alias Astarte.Core.CQLUtils
alias Astarte.Core.Device
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Interface.Aggregation
alias Astarte.Core.Interface.Type
alias Astarte.Core.Mapping
alias Astarte.Core.Mapping.EndpointsAutomaton
alias Astarte.Core.Mapping.ValueType
alias Astarte.DataAccess.Database
alias Astarte.DataAccess.Mappings
alias Astarte.DataAccess.Device, as: DeviceQueries
alias Astarte.DataAccess.Interface, as: InterfaceQueries
alias Astarte.DataAccess.Repo
alias Ecto.Changeset
alias Astarte.Core.CQLUtils
require Logger

import Ecto.Query

def list_devices!(realm_name, params) do
changeset = DevicesListOptions.changeset(%DevicesListOptions{}, params)

Expand Down Expand Up @@ -161,9 +162,12 @@ defmodule Astarte.AppEngine.API.Device do
Returns the list of interfaces.
"""
def list_interfaces(realm_name, encoded_device_id) do
with {:ok, client} <- Database.connect(realm: realm_name),
{:ok, device_id} <- Device.decode_device_id(encoded_device_id) do
Queries.retrieve_interfaces_list(client, device_id)
device_introspection = Queries.retrieve_interfaces_list(realm_name)

with {:ok, device_id} <- Device.decode_device_id(encoded_device_id),
{:ok, device} <- Repo.fetch(device_introspection, device_id, error: :device_not_found) do
interface_names = device.introspection |> Map.keys()
{:ok, interface_names}
end
end

Expand All @@ -182,9 +186,10 @@ defmodule Astarte.AppEngine.API.Device do
{:ok, interface_row} <-
InterfaceQueries.retrieve_interface_row(realm_name, interface, major_version) do
do_get_interface_values!(
realm_name,
client,
device_id,
Aggregation.from_int(interface_row[:aggregation]),
interface_row.aggregation,
interface_row,
options
)
Expand All @@ -210,16 +215,14 @@ defmodule Astarte.AppEngine.API.Device do
{:ok, interface_descriptor} <- InterfaceDescriptor.from_db_result(interface_row),
{:ok, endpoint_ids} <-
get_endpoint_ids(interface_descriptor.automaton, path, allow_guess: true) do
endpoint_query = Queries.prepare_value_type_query(interface_row[:interface_id])

do_get_interface_values!(
realm_name,
client,
device_id,
Aggregation.from_int(interface_row[:aggregation]),
Type.from_int(interface_row[:type]),
interface_row.aggregation,
interface_row.type,
interface_row,
endpoint_ids,
endpoint_query,
path,
options
)
Expand All @@ -235,8 +238,12 @@ defmodule Astarte.AppEngine.API.Device do
raw_value
) do
with {:ok, [endpoint_id]} <- get_endpoint_ids(interface_descriptor.automaton, path),
mapping <-
Queries.retrieve_mapping(client, interface_descriptor.interface_id, endpoint_id),
mapping =
Queries.retrieve_mapping(realm_name)
|> Repo.get_by!(%{
interface_id: interface_descriptor.interface_id,
endpoint_id: endpoint_id
}),
{:ok, value} <- InterfaceValue.cast_value(mapping.value_type, raw_value),
:ok <- validate_value_type(mapping.value_type, value),
wrapped_value = wrap_to_bson_struct(mapping.value_type, value),
Expand Down Expand Up @@ -726,7 +733,12 @@ defmodule Astarte.AppEngine.API.Device do
{:ownership, :server} <- {:ownership, interface_descriptor.ownership},
path <- "/" <> no_prefix_path,
{:ok, [endpoint_id]} <- get_endpoint_ids(interface_descriptor.automaton, path) do
mapping = Queries.retrieve_mapping(client, interface_descriptor.interface_id, endpoint_id)
mapping =
Queries.retrieve_mapping(realm_name)
|> Repo.get_by!(%{
interface_id: interface_descriptor.interface_id,
endpoint_id: endpoint_id
})

Queries.insert_value_into_db(
client,
Expand Down Expand Up @@ -768,9 +780,10 @@ defmodule Astarte.AppEngine.API.Device do
end
end

defp do_get_interface_values!(client, device_id, :individual, interface_row, opts) do
defp do_get_interface_values!(realm_name, client, device_id, :individual, interface_row, opts) do
endpoint_rows =
Queries.retrieve_all_endpoint_ids_for_interface!(client, interface_row[:interface_id])
Queries.retrieve_all_endpoint_ids_for_interface!(realm_name, interface_row.interface_id)
|> Repo.all()

values_map =
Enum.reduce(endpoint_rows, %{}, fn endpoint_row, values ->
Expand All @@ -779,10 +792,10 @@ defmodule Astarte.AppEngine.API.Device do
retrieve_endpoint_values(
client,
device_id,
Aggregation.from_int(interface_row[:aggregation]),
Type.from_int(interface_row[:type]),
interface_row.aggregation,
interface_row.type,
interface_row,
endpoint_row[:endpoint_id],
endpoint_row.endpoint_id,
endpoint_row,
"/",
opts
Expand All @@ -794,43 +807,50 @@ defmodule Astarte.AppEngine.API.Device do
{:ok, %InterfaceValues{data: MapTree.inflate_tree(values_map)}}
end

defp do_get_interface_values!(client, device_id, :object, interface_row, opts) do
defp do_get_interface_values!(realm_name, client, device_id, :object, interface_row, opts) do
# We need to know if mappings have explicit_timestamp set, so we retrieve it from the
# first one.
endpoint =
Queries.retrieve_all_endpoint_ids_for_interface!(client, interface_row[:interface_id])
|> CQEx.Result.head()
Queries.retrieve_all_endpoint_ids_for_interface!(realm_name, interface_row.interface_id)
|> limit(1)
|> Repo.one!()

mapping =
Queries.retrieve_mapping(client, interface_row[:interface_id], endpoint[:endpoint_id])
Queries.retrieve_mapping(realm_name)
|> Repo.get_by!(%{
interface_id: interface_row.interface_id,
endpoint_id: endpoint.endpoint_id
})

do_get_interface_values!(
realm_name,
client,
device_id,
Aggregation.from_int(interface_row[:aggregation]),
Type.from_int(interface_row[:type]),
interface_row.aggregation,
interface_row.type,
interface_row,
nil,
nil,
"/",
%{opts | explicit_timestamp: mapping.explicit_timestamp}
)
end

defp do_get_interface_values!(
realm_name,
client,
device_id,
:individual,
:properties,
interface_row,
endpoint_ids,
endpoint_query,
path,
opts
) do
result =
List.foldl(endpoint_ids, %{}, fn endpoint_id, values ->
endpoint_row = Queries.execute_value_type_query(client, endpoint_query, endpoint_id)
endpoint_row =
Queries.value_type_query(realm_name)
|> Repo.get_by!(%{interface_id: interface_row.interface_id, endpoint_id: endpoint_id})

value =
retrieve_endpoint_values(
Expand Down Expand Up @@ -861,19 +881,21 @@ defmodule Astarte.AppEngine.API.Device do
end

defp do_get_interface_values!(
realm_name,
client,
device_id,
:individual,
:datastream,
interface_row,
endpoint_ids,
endpoint_query,
path,
opts
) do
[endpoint_id] = endpoint_ids

endpoint_row = Queries.execute_value_type_query(client, endpoint_query, endpoint_id)
endpoint_row =
Queries.value_type_query(realm_name)
|> Repo.get_by!(%{interface_id: interface_row.interface_id, endpoint_id: endpoint_id})

retrieve_endpoint_values(
client,
Expand All @@ -889,27 +911,33 @@ defmodule Astarte.AppEngine.API.Device do
end

defp do_get_interface_values!(
realm_name,
client,
device_id,
:object,
:datastream,
interface_row,
_endpoint_ids,
_endpoint_query,
path,
opts
) do
# We need to know if mappings have explicit_timestamp set, so we retrieve it from the
# first one.
endpoint =
Queries.retrieve_all_endpoint_ids_for_interface!(client, interface_row[:interface_id])
|> CQEx.Result.head()
Queries.retrieve_all_endpoint_ids_for_interface!(realm_name, interface_row.interface_id)
|> limit(1)
|> Repo.one!()

mapping =
Queries.retrieve_mapping(client, interface_row[:interface_id], endpoint[:endpoint_id])
Queries.retrieve_mapping(realm_name)
|> Repo.get_by!(%{
interface_id: interface_row.interface_id,
endpoint_id: endpoint.endpoint_id
})

endpoint_rows =
Queries.retrieve_all_endpoints_for_interface!(client, interface_row[:interface_id])
Queries.retrieve_all_endpoints_for_interface!(realm_name, interface_row.interface_id)
|> Repo.all()

interface_values =
retrieve_endpoint_values(
Expand Down Expand Up @@ -983,7 +1011,7 @@ defmodule Astarte.AppEngine.API.Device do
) do
path = "/"

interface_id = interface_row[:interface_id]
interface_id = interface_row.interface_id

values =
Queries.retrieve_all_endpoint_paths!(client, device_id, interface_id, endpoint_id)
Expand Down Expand Up @@ -1017,7 +1045,7 @@ defmodule Astarte.AppEngine.API.Device do
nice_value =
AstarteValue.to_json_friendly(
v,
ValueType.from_int(endpoint_row[:value_type]),
endpoint_row.value_type,
fetch_biginteger_opts_or_default(opts)
)

Expand Down Expand Up @@ -1058,9 +1086,9 @@ defmodule Astarte.AppEngine.API.Device do
) do
path = "/"

interface_id = interface_row[:interface_id]
interface_id = interface_row.interface_id

endpoint_id = CQLUtils.endpoint_id(interface_row[:name], interface_row[:major_version], "")
endpoint_id = CQLUtils.endpoint_id(interface_row.name, interface_row.major_version, "")

{count, paths} =
Queries.retrieve_all_endpoint_paths!(client, device_id, interface_id, endpoint_id)
Expand Down Expand Up @@ -1155,10 +1183,10 @@ defmodule Astarte.AppEngine.API.Device do
Enum.reduce(endpoint_rows, {"", %{}, nil}, fn endpoint,
{query_acc, atoms_map,
prev_downsample_column_atom} ->
endpoint_name = endpoint[:endpoint]
endpoint_name = endpoint.endpoint
column_name = CQLUtils.endpoint_to_db_column_name(endpoint_name)

value_type = endpoint[:value_type] |> ValueType.from_int()
value_type = endpoint.value_type

next_query_acc = "#{query_acc} #{column_name}, "
column_atom = String.to_atom(column_name)
Expand Down Expand Up @@ -1247,7 +1275,7 @@ defmodule Astarte.AppEngine.API.Device do
nice_value =
AstarteValue.to_json_friendly(
row_value,
ValueType.from_int(endpoint_row[:value_type]),
endpoint_row.value_type,
fetch_biginteger_opts_or_default(opts)
)

Expand Down Expand Up @@ -1380,8 +1408,7 @@ defmodule Astarte.AppEngine.API.Device do
:datetime,
keep_milliseconds: opts.keep_milliseconds
),
"value" =>
AstarteValue.to_json_friendly(v, ValueType.from_int(endpoint_row[:value_type]), [])
"value" => AstarteValue.to_json_friendly(v, endpoint_row.value_type, [])
}
end

Expand Down Expand Up @@ -1416,7 +1443,7 @@ defmodule Astarte.AppEngine.API.Device do
AstarteValue.to_json_friendly(tstamp, :datetime, []),
AstarteValue.to_json_friendly(
v,
ValueType.from_int(endpoint_row[:value_type]),
endpoint_row.value_type,
keep_milliseconds: opts.keep_milliseconds
)
]
Expand Down Expand Up @@ -1449,7 +1476,7 @@ defmodule Astarte.AppEngine.API.Device do
[{:value_timestamp, tstamp}, _, _, {_, v}] = value

[
AstarteValue.to_json_friendly(v, ValueType.from_int(endpoint_row[:value_type]), []),
AstarteValue.to_json_friendly(v, endpoint_row.value_type, []),
AstarteValue.to_json_friendly(
tstamp,
:datetime,
Expand Down
Loading

0 comments on commit b219689

Please sign in to comment.