controllers

trunk
HeNine 2 years ago
parent 234cee129b
commit 6703bb75e8

2
.gitignore vendored

@ -22,3 +22,5 @@ erl_crash.dump
*.iml
.elixir_ls
dev.exs

@ -0,0 +1,173 @@
#
# picc -- Home automation server
# Copyright (C) 2022 picc project
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
require Logger
defmodule Picc.Controller.RunningAverage do
@moduledoc """
Controller that models a variable as a gaussian.
"""
use GenServer
use AMQP
@exchange "picc"
alias AMQP.Basic
# alias Picc.Controller.RunningAverage
defstruct [
:target,
:variable,
:prior,
:amqp_channel,
:amqp_consumer_tag,
:amqp_queue
]
def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(%{entities: [target], arg: arg}) do
{:ok, amqp_channel} = AMQP.Application.get_channel(:amqp_default_channel)
queue_name = "RunningAverageMeasurement-#{target}-#{arg["variable"]}"
{:ok, _} = Queue.declare(amqp_channel, queue_name, durable: false, auto_delete: true)
# Find all entities in is-in relationship with target entity
entities_in_target = Picc.Directory.ResourceIndex.find_relationships("is-in", target) |> List.flatten()
# Subscribe to target
:ok =
Queue.bind(amqp_channel, queue_name, @exchange, routing_key: "#{Picc.Util.get_local_prefix()}.resources.#{target}")
# Subscribe to all entities in target
for entity <- entities_in_target do
:ok =
Queue.bind(
amqp_channel,
queue_name,
@exchange,
routing_key: "#{Picc.Util.get_local_prefix()}.resources.#{entity}"
)
end
# Find entities that 1. are sensors, 2. measure the desired variable
sensors =
Picc.Directory.ResourceIndex.fold_resources(
fn term, acc ->
case term do
{:sensor, id, _, _, measures} ->
if id in entities_in_target and arg["variable"] in measures, do: [id | acc], else: acc
_ ->
acc
end
end,
[]
)
# Subscribe to relevant sensors
for sensor <- sensors do
Queue.bind(amqp_channel, queue_name, @exchange,
routing_key: "#{Picc.Util.get_device_path(:sensor, sensor)}.#{arg["variable"]}"
)
end
{:ok, amqp_consumer_tag} = Basic.consume(amqp_channel, queue_name, nil, no_ack: true)
{:ok,
%__MODULE__{
target: target,
variable: arg["variable"],
prior: %{mu0: Map.get(arg, "mu0", 0), nu: Map.get(arg, "nu", 1)},
amqp_channel: amqp_channel,
amqp_consumer_tag: amqp_consumer_tag,
amqp_queue: queue_name
}}
end
@impl true
def handle_info({:basic_consume_ok, _}, state) do
Logger.info("Subscribed to exchange. Queue: #{state.amqp_queue}; Tag: #{state.amqp_consumer_tag}")
{:noreply, state}
end
def handle_info(
{:basic_deliver, payload, %{routing_key: routing_key, timestamp: _timestamp, delivery_tag: _tag}},
state
) do
with {:ok, payload_json} <- Jason.decode(payload) do
cond do
# Process new measurement
String.starts_with?(routing_key, Picc.Util.get_local_prefix() <> ".dev.") and
String.ends_with?(routing_key, state.variable) ->
[_sensor, _variable] =
routing_key
|> String.replace_leading("#{Picc.Util.get_local_prefix()}.dev.sensors.", "")
|> String.split(".")
# Compute new average
mu = (state.prior.nu * state.prior.mu0 + payload_json) / (state.prior.nu + 1)
# Publish
:ok =
Basic.publish(
state.amqp_channel,
@exchange,
Enum.join([Picc.Util.get_local_prefix(), Picc.Util.get_location_path(state.target), state.variable], "."),
"#{mu}"
)
{:noreply, %{state | prior: %{state.prior | mu0: mu}}}
# Process change in configuration
String.starts_with?(routing_key, Picc.Util.get_local_prefix() <> ".resources.") ->
cond do
payload_json["id"] == state.target ->
# Just restart the whole thing to reinitialize
{:stop, :terminate, state}
payload_json["$schema"] == "https://picc.app/schemata/v0.1-dev/devices/sensor" and
state.variable in payload_json["measures"] ->
# Subscribe to new sensor
:ok =
Queue.bind(state.amqp_channel, state.amqp_queue, @exchange,
routing_key: "#{Picc.Util.get_device_path(:sensor, payload_json["id"])}.#{state.variable}"
)
{:noreply, state}
end
end
end
# {:noreply, state}
end
@impl true
def terminate(_reason, state) do
Queue.delete(state.amqp_channel, state.amqp_queue)
end
end

@ -47,7 +47,8 @@ defmodule Picc do
children = [
Picc.DBManager,
Picc.DriverManager,
Picc.Directory
Picc.Directory,
Picc.ControllerManager,
]
Supervisor.init(children, strategy: :one_for_one)

@ -0,0 +1,159 @@
#
# picc -- Home automation server
# Copyright (C) 2022 picc project
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
require Logger
require Picc.Util
require Picc.Directory.ResourceIndex, as: ResourceIndex
defmodule Picc.ControllerManager do
use AMQP
use GenServer
defstruct [
:amqp_channel,
:amqp_consumer_tag,
controllers: %{}
]
@queue "picc_controller_manager_queue"
@exchange "picc"
def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_) do
__MODULE__.ControllerSupervisor.start_link([])
{:ok, amqp_channel} = AMQP.Application.get_channel(:amqp_controller_manager_channel)
{:ok, _} = Queue.declare(amqp_channel, @queue, durable: false, auto_delete: true)
:ok = Queue.bind(amqp_channel, @queue, @exchange, routing_key: "#{Picc.Util.get_local_prefix()}.dev.controllers.*")
{:ok, amqp_consumer_tag} = Basic.consume(amqp_channel, @queue, nil, no_ack: true)
controller_specs = ResourceIndex.match_resource({:controller, :"$1", :_, :"$2", :_, :_})
controllers =
for [controller_id, controller_json] <- controller_specs, into: %{} do
child_id = start_controller(controller_json)
{controller_id, child_id}
end
{:ok, %__MODULE__{amqp_channel: amqp_channel, amqp_consumer_tag: amqp_consumer_tag, controllers: controllers}}
end
@impl true
def handle_info({:basic_consume_ok, _}, state) do
Logger.info("Subscribed to exchange. Queue: #{@queue}; Tag: #{state.amqp_consumer_tag}")
{:noreply, state}
end
@impl true
def handle_info(
{:basic_deliver, payload, %{routing_key: routing_key, timestamp: _timestamp, delivery_tag: tag}},
state
) do
:ok = Basic.ack(state.amqp_channel, tag)
{id, child_id} =
case Jason.decode(payload) do
{:ok, payload_json} ->
if Map.has_key?(state.controllers, payload_json["id"]) do
DynamicSupervisor.terminate_child(
__MODULE__.ControllerSupervisor,
state.controllers[payload_json["id"]]
)
end
{payload_json["id"], start_controller(payload_json)}
{:error, error} ->
Logger.warn("Invalid JSON on: #{routing_key}: #{inspect(error)}")
{nil, nil}
end
controllers =
if id != nil do
Map.put(state.controllers, id, child_id)
else
state.controllers
end
{:noreply, %{state | controllers: controllers}}
end
defp start_controller(controller_json) do
with {:ok, id} <- Map.fetch(controller_json, "id"),
{:ok, module} <- Map.fetch(controller_json, "module"),
{:ok, entities} <- Map.fetch(controller_json, "entities"),
{:ok, arg} <- Map.fetch(controller_json, "arg") do
Logger.notice("Starting controller: #{id}")
module = String.to_atom(module)
child_spec = module.child_spec(%{entities: entities, arg: arg})
# Start child with modified spec
{:ok, child_pid} =
case DynamicSupervisor.start_child(__MODULE__.ControllerSupervisor, %{
child_spec
| start: {__MODULE__, :start_child_wrapper, [child_spec[:start]]}
}) do
{:ok, child_pid} ->
{:ok, child_pid}
{:error, error} ->
Logger.error(inspect(error, pretty: true))
{:ok, nil}
end
child_pid
end
end
@doc false
def start_child_wrapper({child_module, child_start_function, child_start_args}) do
{:ok, child_pid} = apply(child_module, child_start_function, child_start_args)
Logger.notice("Starting controller: #{child_module}(#{inspect(child_start_args)})")
{:ok, child_pid}
end
@impl true
def terminate(_reason, state) do
Queue.delete(state.amqp_channel, @queue)
end
defmodule ControllerSupervisor do
use DynamicSupervisor
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end
end

@ -27,7 +27,8 @@ defmodule Picc.Directory do
use AMQP
alias Picc.Directory, as: Directory
alias Picc.Directory
alias Picc.Directory.ResourceIndex
@queue "picc_directory_queue"
@exchange "picc"
@ -76,12 +77,16 @@ defmodule Picc.Directory do
{:ok, amqp_consumer_tag} = Basic.consume(amqp_channel, @queue, nil, no_ack: true)
init_data = GenServer.call(Picc.DBManager, :directory_init)
init_data = for [_, log_timestamp, event_timestamp, _, path, value] <- init_data do
init_data =
for [_, log_timestamp, event_timestamp, _, path, value] <- init_data do
{path, value, event_timestamp, log_timestamp}
end
:ets.insert(:picc_directory, init_data)
:ok = ResourceIndex.init()
{:ok,
%Directory{
amqp_channel: amqp_channel,
@ -90,6 +95,11 @@ defmodule Picc.Directory do
}}
end
@impl true
def handle_call({:check_resource_schema, resource}, _from, state) do
{:reply, ResourceIndex.check_resource_schema?(resource), state}
end
@impl true
def handle_info({:basic_consume_ok, _}, state) do
Logger.info("Subscribed to exchange. Queue: #{@queue}; Tag: #{state.amqp_consumer_tag}")
@ -99,15 +109,20 @@ defmodule Picc.Directory do
@impl true
def handle_info(
{:basic_deliver, payload, %{routing_key: routing_key, timestamp: timestamp}},
{:basic_deliver, payload, %{routing_key: routing_key, timestamp: timestamp, delivery_tag: tag}},
state
) do
# Logger.trace("#{routing_key |> String.pad_leading(100)}: #{payload}")
:ok = Basic.ack(state.amqp_channel, tag) # TODO: check if needed
case Jason.decode(payload) do
{:ok, payload_json} ->
:ets.insert(state.directory_tid, {routing_key, payload_json, timestamp, DateTime.now("Etc/UTC")})
if String.starts_with?(routing_key, "#{Picc.Util.get_local_prefix()}.resources.") and
ResourceIndex.check_resource_schema?(payload_json) do
ResourceIndex.add_resource(payload_json)
end
{:error, error} ->
Logger.warn("Invalid JSON on: #{routing_key}: #{inspect(error)}")
end
@ -119,4 +134,6 @@ defmodule Picc.Directory do
def terminate(_reason, state) do
Queue.delete(state.amqp_channel, @queue)
end
defdelegate get_resource(id), to: ResourceIndex
end

@ -0,0 +1,389 @@
#
# picc -- Home automation server
# Copyright (C) 2022 picc project
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
require Logger
require Picc.Util
defmodule Picc.Directory.ResourceIndex do
use AMQP
@moduledoc """
Functions for managing resources.
"""
alias Picc.Directory.ResourceIndex
@schema_cache_table :picc_resource_index_schema_cache
@resource_index_table :picc_resource_index
@relationship_table :picc_resource_index_relationships
@exchange "picc"
@doc """
Register a resource.
Argument is a map which much contain the `$schema` element, which determines the type of resource.
"""
@spec register_resource(map()) :: :ok | :error
def register_resource(resource) do
with {:ok, channel} <- AMQP.Application.get_channel(:amqp_default_channel),
{:ok, id} <- Map.fetch(resource, "id"),
{:ok, resource_json} <- Jason.encode(resource) do
if GenServer.call(Picc.Directory, {:check_resource_schema, resource}) do
Basic.publish(
channel,
@exchange,
"#{Picc.Util.get_local_prefix()}.resources.#{id}",
resource_json,
headers: [],
content_type: "application/json"
)
else
Logger.error("Invalid resource JSON: #{inspect(resource)}")
:error
end
end
end
@doc """
Get a resource from the index by `id`.
"""
@spec get_resource(String.t()) :: {:ok, map()} | {:error, String.t()}
def get_resource(id) do
case :ets.lookup(@resource_index_table, id) do
[{_, ^id, _, resource}] -> {:ok, resource}
[] -> {:error, "Resource not found: #{id}"}
_ -> {:error, "Unknown error: #{id}"}
end
end
@doc """
Run a `match_spec` against the resource index.
The `match_spec` is described in `:ets.match()`.
"""
@spec match_resource(tuple) :: [list]
def match_resource(match_spec) do
:ets.match(@resource_index_table, match_spec)
end
@doc """
Fold a function over the resource index table.
"""
def fold_resources(fun, acc) do
:ets.foldl(fun, acc, @resource_index_table)
end
@doc """
Find all entities that are in `relationship` with `entity`.
"""
def find_relationships(relationship, entity) do
:ets.match(@relationship_table, {relationship, entity, :"$1"})
end
# -----------------------------------
@doc """
Initialize resources. Called from Picc.Directory.
"""
def init() do
:ets.new(@schema_cache_table, [:set, :protected, :named_table, keypos: 1])
:ets.new(@resource_index_table, [:set, :protected, :named_table, keypos: 2])
# Relationship instances
:ets.new(@relationship_table, [:bag, :protected, :named_table, keypos: 2])
config_dirs = Application.fetch_env!(:picc, :resource_config)
config_dirs_valid =
Enum.filter(config_dirs, fn dir ->
case File.lstat(dir) do
{:ok, %File.Stat{type: :directory}} ->
true
{:ok, _} ->
Logger.error("Resource directory is not directory: #{dir}")
false
{:error, error_code} ->
Logger.error("Resource directory access error: #{dir}; #{error_code}")
false
end
end)
resource_files = Enum.flat_map(config_dirs_valid, &parse_directory/1)
Logger.debug("Loading resources: \n\t#{Enum.join(resource_files, "\n\t")}")
for resource_file <- resource_files do
load_file(resource_file)
end
:ok
end
defp parse_directory(path) do
files = File.ls!(path)
Enum.flat_map(files, fn file ->
full_file = Path.join(path, file)
case File.lstat(full_file) do
{:ok, %File.Stat{type: :directory}} ->
unless String.starts_with?(file, "."), do: parse_directory(full_file), else: []
{:ok, %File.Stat{type: :regular}} ->
if String.ends_with?(file, ".json"), do: [full_file], else: []
{:ok, %File.Stat{type: :symlink}} ->
if String.ends_with?(file, ".json"), do: [full_file], else: []
{:ok, _} ->
Logger.error("Resource is not valid: #{file}")
[]
{:error, error_code} ->
Logger.error("Resource access error: #{file}; #{error_code}")
[]
end
end)
end
defp load_file(file_path) do
file_string = File.read!(file_path)
file_json = Jason.decode!(file_string)
if check_resource_schema?(file_json) do
add_resource(file_json["$schema"], file_json)
:ok
else
Logger.error("Invalid resource: #{file_path}")
:error
end
end
@doc false
#
# URI is a URI structure
#
def get_schema(uri = %URI{}) do
get_schema(URI.to_string(uri))
end
#
# URI is plain string
#
def get_schema(schema_uri) do
case :ets.lookup(@schema_cache_table, schema_uri) do
[{_, schema, schema_json}] ->
{:ok, schema, schema_json}
[] ->
with {:ok, response} <- HTTPoison.get(schema_uri),
{:ok, schema_json} <- Jason.decode(response.body),
schema = JsonXema.new(schema_json, loader: ResourceIndex.SchemaLoader) do
:ets.insert(@schema_cache_table, {schema_uri, schema, schema_json})
{:ok, schema, schema_json}
end
end
end
#
# Check if resource complies with schema
#
@doc false
def check_resource_schema?(resource, schema \\ nil) do
{:ok, schema, _} = get_schema(Map.get(resource, "$schema", schema))
Picc.Util.log_fail(
JsonXema.valid?(schema, resource),
"Invalid resource: #{inspect(resource)}"
)
end
#
# Add a resource to the index based on schema
#
def add_resource(resource) do
add_resource(resource["$schema"], resource)
end
@spec add_resource(String.t(), map()) :: :ok | :error
defp add_resource("https://picc.app/schemata/v0.1-dev/relationship", resource) do
:ets.insert(
@resource_index_table,
{:relationship, resource["id"], resource["name"], resource, resource["arity"]}
)
end
defp add_resource("https://picc.app/schemata/v0.1-dev/variables/real", resource) do
:ets.insert(
@resource_index_table,
{:variable, resource["id"], resource["name"], resource, :real}
)
end
defp add_resource("https://picc.app/schemata/v0.1-dev/devices/sensor", resource) do
if Enum.all?(resource["measures"], &check_variable?/1) do
:ets.insert(
@resource_index_table,
{:sensor, resource["id"], resource["name"], resource, Map.get(resource, "measures", [])}
)
end
end
defp add_resource("https://picc.app/schemata/v0.1-dev/locations/site", resource) do
# Check that all inline buildings comply with the schema and add them
for building <- resource["buildings"] do
if check_resource_schema?(building, "https://picc.app/schemata/v0.1-dev/locations/building") do
add_resource(
"https://picc.app/schemata/v0.1-dev/locations/building",
Map.put(building, "site", resource["id"])
)
end
end
:ets.insert(
@resource_index_table,
{:site, resource["id"], resource["name"], resource}
)
end
defp add_resource("https://picc.app/schemata/v0.1-dev/locations/building", resource) do
# Check that all inline rooms comply with the schema and edd them
for room <- Map.get(resource, "rooms", []) do
if check_resource_schema?(room, "https://picc.app/schemata/v0.1-dev/locations/room") do
add_resource(
"https://picc.app/schemata/v0.1-dev/locations/room",
Map.put(room, "building", resource["id"])
)
end
end
:ets.insert(
@resource_index_table,
{:building, resource["id"], resource["name"], resource}
)
end
defp add_resource("https://picc.app/schemata/v0.1-dev/locations/room", resource) do
# Check relationships
if Enum.all?(Map.get(resource, "relationships", []), &check_relationship?/1) do
:ets.insert(@resource_index_table, {:room, resource["id"], resource["name"], resource})
# Add relationships
for relationship <- Map.get(resource, "relationships", []) do
:ets.insert(
@relationship_table,
List.to_tuple([relationship["id"]] ++ relationship["args"])
)
end
:ok
else
Logger.error("Failed to add room: #{resource["id"]}")
:error
end
end
defp add_resource("https://picc.app/schemata/v0.1-dev/controller", resource) do
# Check that all measured and controlled variables exist
if Enum.all?(Map.get(resource, "measures", []), &check_variable?/1) and
Enum.all?(Map.get(resource, "controls", []), &check_variable?/1) and
Enum.all?(resource["entities"], &check_entity?/1) do
:ets.insert(
@resource_index_table,
{:controller, resource["id"], resource["name"], resource, resource["measures"], resource["controls"]}
)
{:ok, amqp_channel} = AMQP.Application.get_channel(:amqp_directory_channel)
# TODO: move to controller manager
AMQP.Basic.publish(
amqp_channel,
@exchange,
"#{Picc.Util.get_local_prefix()}.dev.controllers.#{resource["id"]}",
Jason.encode!(resource)
)
:ok
else
Logger.error("Cannot load: #{resource["id"]}")
:error
end
end
defp add_resource(schema, _) do
Logger.error("Unknown resource type: #{schema}")
:error
end
#
# Check if variable exists
#
defp check_variable?(variable) do
Picc.Util.log_fail(
:ets.member(@resource_index_table, variable),
"Variable not defined: #{variable}"
)
end
#
# Check if entity exists
#
defp check_entity?(entity) do
Picc.Util.log_fail(
:ets.member(@resource_index_table, entity),
"Entity not defined: #{entity}"
)
end
#
# Check if relationship exists and has correct arity
#
defp check_relationship?(%{"id" => id, "args" => args}) do
case :ets.lookup(@resource_index_table, id) do
[{:relationship, id, _, _, arity}] ->
Picc.Util.log_fail(length(args) == arity, "Relationship arity invalid: #{id}")
[] ->
Logger.error("Relationship not found: #{id}")
false
_ ->
Logger.error("Invalid relationship: #{id}")
false
end
end
#
# Loader module for remote schemata for Xema.
#
defmodule SchemaLoader do
@moduledoc false
@behaviour Xema.Loader
@impl true
@spec fetch(URI.t()) :: {:ok, any} | {:error, any}
def fetch(uri) do
with {:ok, _, schema_json} <- Picc.Directory.ResourceIndex.get_schema(uri),
do: {:ok, schema_json}
end
end
end

@ -1,27 +0,0 @@
#
# picc -- Home automation server
# Copyright (C) 2022 picc project
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
require Logger
defmodule Picc.ResourceManager do
use AMQP
@moduledoc false
end

@ -19,6 +19,9 @@
defmodule Picc.Util do
use AMQP
@doc """
Get a subscription to an MQTT topic.
"""
def get_mqtt_sub(routing_key \\ "#", queue_name \\ "") do
{:ok, amqp_connection} = AMQP.Application.get_connection(:amqp_main_connection)
{:ok, amqp_channel} = Channel.open(amqp_connection)
@ -28,6 +31,9 @@ defmodule Picc.Util do
{amqp_channel, queue}
end
@doc """
Get the main AMQP channel.
"""
@spec get_amqp_channel :: AMQP.Channel.t()
def get_amqp_channel() do
{:ok, amqp_conn} = AMQP.Application.get_connection(:amqp_main_connection)
@ -37,6 +43,20 @@ defmodule Picc.Util do
amqp_channel
end
@doc """
Strip the local prefix from a path.
"""
def get_local_path(path) do
String.replace_leading(path, "#{get_local_prefix()}.", "")
end
@doc """
Get the prefix for paths on the local server: `app.picc.<server_domain>`.
"""
def get_local_prefix() do
"app.picc.#{Application.fetch_env!(:picc, :server_domain)}"
end
@doc """
Composes a picc device path for a given device type and device id.
@ -52,6 +72,44 @@ defmodule Picc.Util do
_ -> raise "Invalid device type!"
end
"app.picc.#{Application.fetch_env!(:picc, :server_domain)}.dev.#{device_type_string}#{id}"
"#{get_local_prefix()}.dev.#{device_type_string}#{id}"
end
@doc """
Get a full path for a location, i.e., `room`, `building`, and `site`.
"""
@spec get_location_path(String.t()) :: String.t()
def get_location_path(id) do
{:ok, resource} = Picc.Directory.get_resource(id)
case resource["$schema"] do
"https://picc.app/schemata/v0.1-dev/locations/room" ->
if Map.has_key?(resource, "building") do
get_location_path(resource["building"]) <> "." <> "room_#{id}"
else
"room_#{id}"
end
"https://picc.app/schemata/v0.1-dev/locations/building" ->
if Map.has_key?(resource, "site") do
get_location_path(resource["site"]) <> "." <> "building_#{id}"
else
"building_#{id}"
end
"https://picc.app/schemata/v0.1-dev/locations/site" ->
"site_#{id}"
end
end
@doc """
Macro for logging failures.
If `test` is `true`, just return `true`. If `false`, issue a `Logger.error` with `message` and return false.
"""
defmacro log_fail(test, message) do
quote do
!unless unquote(test), do: Logger.error(unquote(message))
end
end
end

@ -65,6 +65,16 @@ defmodule ShellyHT do
{:ok, _response = {_, _, body}} = :httpc.request("http://#{ip}/status")
response_json = Jason.decode!(body)
Picc.Directory.ResourceIndex.register_resource(%{
"$schema" => "https://picc.app/schemata/v0.1-dev/devices/sensor",
"id" => id,
"name" => "Shelly HT - #{id}",
"measures" => [
"temperature",
"humidity"
]
})
if response_json["tmp"]["is_valid"] do
Basic.publish(
amqp_channel,

@ -62,7 +62,9 @@ defmodule Picc.MixProject do
{:amqp, "~> 3.1"},
{:postgrex, "~> 0.16.2"},
{:jason, "~> 1.0"},
{:ex_doc, "~> 0.27", only: :dev, runtime: false}
{:ex_doc, "~> 0.27", only: :dev, runtime: false},
{:httpoison, "~> 2.0"},
{:json_xema, "~> 0.3"}
]
end
end

@ -9,47 +9,22 @@
# move said applications out of the umbrella.
import Config
# Sample configuration:
#
# config :logger, :console,
# level: :info,
# format: "$date $time [$level] $metadata$message\n",
# metadata: [:user_id]
#
config :logger, :console,
format: "$date $time [$level] $metadata$message\n",
metadata: [:module]
if config_env() == :dev or config_env() == :test do
config :picc,
modules: [
Picc.MQTTMonitor,
ShellySpawner
],
db: [
hostname: "192.168.2.201",
database: "picc",
username: "postgres",
password: "piccpass"
],
server_domain: "com.raptorpond"
config :amqp,
connections: [
amqp_main_connection: [
host: "192.168.2.200",
port: 5672
]
],
channels: [
amqp_default_channel: [connection: :amqp_main_connection],
amqp_db_channel: [connection: :amqp_main_connection],
amqp_directory_channel: [connection: :amqp_main_connection]
amqp_directory_channel: [connection: :amqp_main_connection],
amqp_controller_manager_channel: [connection: :amqp_main_connection]
]
end
if config_env() == :test do
config :logger,
:console,
level: :all
end
import_config "#{config_env()}.exs"

@ -0,0 +1,32 @@
import Config
if config_env() == :dev or config_env() == :test do
end
if config_env() == :prod do
config :picc,
modules: [
Picc.MQTTMonitor,
ShellySpawner
],
db: [
hostname: System.fetch_env!("PICC_DB_HOST"),
port: System.fetch_env!("PICC_DB_PORT"),
database: System.fetch_env!("PICC_DB_NAME"),
username: System.fetch_env!("PICC_DB_USER"),
password: System.fetch_env!("PICC_DB_PASSWORD")
] ++ (case System.fetch_env!("PICC_DB_PORT") do
{:ok, port} -> [port: Integer.parse(port)]
:error -> []
end),
server_domain: System.fetch_env!("PICC_SERVER_DOMAIN")
config :amqp,
connections: [
amqp_main_connection: [
host: System.fetch_env!("PICC_RABBITMQ_HOST"),
port: Integer.parse(System.fetch_env!("PICC_RABBITMQ_PORT"))
]
]
end

@ -33,6 +33,12 @@ defmodule PiccUnbrella.MixProject do
# The main page in the docs
# main: "picc",
extras: ["README.md"]
],
releases: [
picc_prod: [
applications: [picc: :permanent]
]
]
]
end
@ -44,7 +50,8 @@ defmodule PiccUnbrella.MixProject do
# Run "mix help deps" for examples and options.
defp deps do
[
{:ex_doc, "~> 0.27", only: :dev, runtime: false}
{:ex_doc, "~> 0.27", only: :dev, runtime: false},
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
]
end
end

@ -1,20 +1,34 @@
%{
"amqp": {:hex, :amqp, "3.1.1", "a96ee272d196dfd1bf4ffc15dc7dcf900004d928dbdc6f5fcb80e6b0da03927c", [:mix], [{:amqp_client, "~> 3.9.1", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "ee7ca576351b4629b6be0701db8c085e203e242c577c59f344be56ef5a262056"},
"amqp_client": {:hex, :amqp_client, "3.9.11", "4ebe8040be3ee195e42bb483d37cd64faf3c306201dc22a3f5cce2a91a9e562e", [:make, :rebar3], [{:rabbit_common, "3.9.11", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "cdd74bc8e9d5e8610975009dcae1293bdf7198ee6d8315a1ffb5055467010520"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"conv_case": {:hex, :conv_case, "0.2.3", "c1455c27d3c1ffcdd5f17f1e91f40b8a0bc0a337805a6e8302f441af17118ed8", [:mix], [], "hexpm", "88f29a3d97d1742f9865f7e394ed3da011abb7c5e8cc104e676fdef6270d4b4a"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "2.4.0", "9fb57683b84899ca3546b384e59ab5d3054a9f334eba50d74c82cd0ae82dd6ca", [:rebar3], [], "hexpm", "d28a89830e30698b075de9a4dbe683a20685c6bed1e3b7df744a0c06e6ff200a"},
"db_connection": {:hex, :db_connection, "2.4.2", "f92e79aff2375299a16bcb069a14ee8615c3414863a6fef93156aee8e86c2ff3", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4fe53ca91b99f55ea249693a0229356a08f4d1a7931d8ffa79289b145fe83668"},
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
"earmark_parser": {:hex, :earmark_parser, "1.4.25", "2024618731c55ebfcc5439d756852ec4e85978a39d0d58593763924d9a15916f", [:mix], [], "hexpm", "56749c5e1c59447f7b7a23ddb235e4b3defe276afc220a6227237f3efe83f51e"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.28.3", "6eea2f69995f5fba94cd6dd398df369fe4e777a47cd887714a0976930615c9e6", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "05387a6a2655b5f9820f3f627450ed20b4325c25977b2ee69bed90af6688e718"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"httpoison": {:hex, :httpoison, "2.0.0", "d38b091f5e481e45cc700aba8121ce49b66d348122a097c9fbc2dc6876d88090", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "f1253bf455be73a4c3f6ae3407e7e3cf6fc91934093e056d737a0566126e2930"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"json_xema": {:hex, :json_xema, "0.6.1", "3681272f0c0332b1ac43165d6617143b418cb4e0ccde42ac5ec3681c0d426802", [:mix], [{:conv_case, "~> 0.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:xema, "~> 0.16", [hex: :xema, repo: "hexpm", optional: false]}], "hexpm", "62e0c28429dd7f9261d78405eb1b101ca422a6a169746d94a934aa66c1548b2f"},
"jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"postgrex": {:hex, :postgrex, "0.16.2", "0f83198d0e73a36e8d716b90f45f3bde75b5eebf4ade4f43fa1f88c90a812f74", [:mix], [{:connection, "~> 1.1", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "a9ea589754d9d4d076121090662b7afe155b374897a6550eb288f11d755acfa0"},
"rabbit_common": {:hex, :rabbit_common, "3.9.11", "25df900b1aec7357c90253cc4528b43c5ff064f27c8c627707b747ae986ebf77", [:make, :rebar3], [{:credentials_obfuscation, "2.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:jsx, "3.1.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:recon, "2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "1bcac63760a0bf0e55d7d3c2ff36ed2310e0b560bd110a5a2d602d76d9c08e1a"},
"recon": {:hex, :recon, "2.5.1", "430ffa60685ac1efdfb1fe4c97b8767c92d0d92e6e7c3e8621559ba77598678a", [:mix, :rebar3], [], "hexpm", "5721c6b6d50122d8f68cccac712caa1231f97894bab779eff5ff0f886cb44648"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"xema": {:hex, :xema, "0.17.0", "982e397ce0af55cdf1c6bf9c5ee6e20c5ea4a24e58e5266339cfff0dadbfa01e", [:mix], [{:conv_case, "~> 0.2.2", [hex: :conv_case, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9020afc75c5b9fba1c5875fd735a19c3c544db058cd97ef4c4675e479fc8bcbe"},
}

Loading…
Cancel
Save