operational_storage
Модуль operational_storage
содержит следующие функции и классы ошибок для работы с Operational Storage
:
1class InsertionError(Exception): 2 pass 3 4class UpsertionError(Exception): 5 pass 6 7class SelectionError(Exception): 8 pass 9 10class BadParamsError(Exception): 11 pass 12 13class NotFoundError(Exception): 14 pass 15 16 17def insert() -> Insert: 18 """ 19 insert() возвращает объект для вставки значений тегов типа event в Operational Storage 20 """ 21 22def select_latest(tag: Tag) -> SelectLatest: 23 """ 24 select_latest() возвращает объект для выборки последнего значения тэга типа event из Operational Storage 25 """ 26 27def select(tag: Tag, from_ts: datetime, to_ts: datetime) -> Select: 28 """ 29 select() возвращает объект для выборки значений из Operational Storage 30 Этот объект поддерживает аггрегацию, группировку, смещения и пр. 31 Применим для тэгов типа event и aggregate 32 """ 33 34def upsert() -> Upsert: 35 """ 36 upsert() возвращает объект для вставки с обновлением значений тегов типа aggregate в Operational Storage 37 то есть, если в хранилище значение тега в заданный timestamp уже существует - то значение тега обновляется 38 """ 39 40def select_current(tag: Tag) -> SelectCurrent: 41 """ 42 select_current() возвращает объект для выборки текущего значения тэга типа aggregate из Operational Storage 43 Под текущим значением тега понимается значение тега за текущий период агрегации 44 """
Вызов insert()
возвращает объект Insert
со следующим интерфейсом:
1class Insert: 2 def values(self, tag: Tag, value: Any, timestamp: datetime) -> 'Insert': 3 pass 4 5 def msg(self, msg: Message) -> 'Insert': 6 pass 7 8 def execute(self): 9 pass
Метод values(tag, value, timestamp)
предназначен для вставки конкретного значения. Для вставки значения из сообщения можно использовать метод msg(message)
:
1from coiiot_sdk import operational_storage as op_store, context 2 3ctx = context.current() 4 5# сохраняем полученное сообщение в хранилище 6op_store.insert().msg(ctx.msg).execute()
Очень часто нам нужно именно последнее значение тэга. За это отвечает функция select_latest(tag)
.
Вызов select_latest(tag)
возвращает объект класса SelectLatest
со следующим интерфейсом:
1class Event(TypedDict): 2 payload: Dict[str, Any] 3 received_at: int 4 time: int 5 value: Any 6 7class LatestEvent(TypedDict): 8 data: Event 9 tag_id: int 10 value_type: str 11 12 13class SelectLatest: 14 15 def execute(self) -> LatestEvent: 16 pass
Пример получения последнего значения тэга:
from coiiot_sdk import operational_storage as op_store, context, user_logs
logger = user_logs.get_logger()
ctx = context.current()
latest = op_store.select_latest(ctx.tag).execute()
logger.info(f"{ latest.data.value }")
Очень часто для тега типа aggregate
нам нужно его текущее значение, то есть значение за текущий, еще не завершенный период агрегации. За это отвечает функция select_current(tag)
.
Вызов select_current(tag)
возвращает объект класса SelectCurrent
со следующим интерфейсом:
1class Aggregate(TypedDict): 2 payload: Dict[str, Any] 3 received_at: int 4 time: int 5 value: Any 6 7class CurrentAggregate(TypedDict): 8 data: Aggregate 9 tag_id: int 10 value_type: str 11 12class SelectCurrent: 13 14 def execute(self) -> CurrentAggregate: 15 pass
Пример получения текущего значения тэга-агрегата:
1from coiiot_sdk import operational_storage as op_store, context, user_logs 2 3logger = user_logs.get_logger() 4ctx = context.current() 5 6current = op_store.select_current(ctx.tag).execute() 7logger.info(f"{ current.data.value }")
Вызов upsert()
возвращает объект Upsert
со следующим интерфейсом:
1class Upsert: 2 def values(self, tag: Tag, value: Any, timestamp: datetime) -> 'Upsert': 3 pass 4 5 def msg(self, msg: Message) -> 'Upsert': 6 pass 7 8 def execute(self): 9 pass
Метод values(tag, value, timestamp)
предназначен для вставки или обновления конкретного значения. Для вставки значения из сообщения можно использовать метод msg(message)
:
1from coiiot_sdk import operational_storage as op_store, context 2 3ctx = context.current() 4 5# сохраняем полученное сообщение в хранилище 6op_store.upsert().msg(ctx.msg).execute()
Если в хранилище значение тега в заданный timestamp уже существует, то значение тега обновляется.
Для более сложной выборки значений можно использовать метод select(tag, from_ts, to_ts)
.
Метод применим как к тегам типа event
, так и к тегам типа aggregate
.
Метод возвращает объект класса Select
со следующим интерфейсом:
1class Select: 2 3 def with_offset(self, offset: int) -> 'Select': 4 """ 5 Установить смещение для начала выборки значений 6 """ 7 8 def with_limit(self, limit: int) -> 'Select': 9 """ 10 Установить ограничение на количество выбранных значений 11 """ 12 13 def order_by(self, **kwargs) -> 'Select': 14 """ 15 Задает порядок сортировки выборки с помощью именованных параметров. 16 Имя параметра обозначает поле сортировки. 17 Значение параметра задает порядок сортировки, может быть либо "asc", либо "desc" 18 """ 19 20 def with_period(self, period: str) -> 'Select': 21 """ 22 Делать агрегацию по заданному временному периоду 23 """ 24 25 def calc_max_as(self, alias: str) -> 'Select': 26 """ 27 Вычислить максимум под псевдонимом, задаваемым alias (агрегация) 28 """ 29 30 def calc_max(self) -> 'Select': 31 """ 32 Сокращение для calc_max_as("max") 33 """ 34 35 def calc_min_as(self, alias: str) -> 'Select': 36 """ 37 Вычислить минимум под псевдонимом, задаваемым alias (агрегация) 38 """ 39 40 def calc_min(self) -> 'Select': 41 """ 42 Сокращение для calc_min_as("min") 43 """ 44 45 def calc_avg_as(self, alias: str) -> 'Select': 46 """ 47 Вычислить среднее под псевдонимом, задаваемым alias (агрегация) 48 """ 49 50 def calc_avg(self) -> 'Select': 51 """ 52 Сокращение для calc_avg_as("avg") 53 """ 54 55 def calc_count_as(self, alias: str) -> 'Select': 56 """ 57 Вычислить количество под псевдонимом, задаваемым alias (агрегация) 58 """ 59 60 def calc_count(self) -> 'Select': 61 """ 62 Сокращение для calc_count_as("count") 63 """ 64 65 def calc_sum_as(self, alias: str) -> 'Select': 66 """ 67 Вычислить сумму под псевдонимом, задаваемым alias (агрегация) 68 """ 69 70 def calc_sum(self) -> 'Select': 71 """ 72 Сокращение для calc_sum_as("sum") 73 """ 74 75 def execute(self) -> SelectionResult: 76 pass
Вариантов использования Select
много, поэтому метод execute()
возвращает достаточно сложный тип SelectionResult
.
Ниже приводится его полное описание.
1class Event(TypedDict): 2 payload: Dict[str, Any] 3 received_at: int 4 time: int 5 value: Any 6 7class Events(TypedDict): 8 data: List[Event] 9 tag_id: int 10 value_type: str 11 12class EventsAggregate(TypedDict): 13 data: Dict[str, Union[int, float]] 14 tag_id: int 15 value_type: str 16 17class GroupAggregateOfEvents(TypedDict): 18 end_time: int 19 start_time: int 20 value: Dict[str, Union[int, float]] 21 22class EventsGroupAggregate(TypedDict): 23 data: List[GroupAggregateOfEvents] 24 tag_id: int 25 value_type: str 26 27class Aggregate(TypedDict): 28 payload: Dict[str, Any] 29 received_at: int 30 time: int 31 value: Any 32 33class Aggregates(TypedDict): 34 data: List[Aggregate] 35 tag_id: int 36 value_type: str 37 38class AggregatesAggregate(TypedDict): 39 data: Dict[str, Union[int, float]] 40 tag_id: int 41 value_type: str 42 43class GroupAggregateOfAggregates(TypedDict): 44 end_time: int 45 start_time: int 46 value: Dict[str, Union[int, float]] 47 48class AggregatesGroupAggregate(TypedDict): 49 data: List[GroupAggregateOfAggregates] 50 tag_id: int 51 value_type: str 52 53AggregateEvents = Union[EventsAggregate, EventsGroupAggregate] 54EventsResult = Union[AggregateEvents, Events] 55 56AggregateAggregates = Union[AggregatesAggregate, AggregatesGroupAggregate] 57AggregatesResult = Union[AggregateAggregates, Aggregates] 58 59SelectionResult = Union[AggregatesResult, EventsResult]
Пример расчета минимума и максимума за последние 30 секунд:
1import datetime 2from coiiot_sdk import operational_storage as op_store, context, user_logs 3 4logger = user_logs.get_logger() 5ctx = context.current() 6 7to_ts = datetime.datetime.now() 8from_ts = to_ts - datetime.timedelta(seconds=30) 9r = op_store.select(ctx.tag, from_ts, to_ts).calc_max_as("max").calc_min_as("min").execute() 10logger.info(f'{ r.data["max"] }') 11logger.info(f'{ r.data["min"] }')
В этом примере значения группируются в периоды по 5 секунд и в этих периодах происходит расчет значений агрегатов:
1import datetime 2from coiiot_sdk import operational_storage as op_store, context, user_logs 3 4logger = user_logs.get_logger() 5ctx = context.current() 6 7to_ts = datetime.datetime.now() 8from_ts = to_ts - datetime.timedelta(seconds=30) 9r = op_store.select(ctx.tag, from_ts, to_ts).with_period("5s").calc_count_as("count").execute() 10 11logger.info(f'{ r.data[0].value["count"] }') # количество в первом периоде 12logger.info(f'{ r.data[1].value["count"] }') # количество во втором периоде и т.д.
Можно использовать сортировку результатов выборки через метод order_by(**kwargs)
. Пока что поддерживается сортировка только по полю time
.
Пример ниже показывает, как получить последние два значения в порядке «с конца» за указанный период:
1import datetime 2from coiiot_sdk import operational_storage as op_store, context, user_logs 3 4logger = user_logs.get_logger() 5ctx = context.current() 6 7from_ts = datetime.datetime(2021, 1, 1) 8to_ts = datetime.datetime.now() 9r = op_store.select(ctx.tag, from_ts, to_ts).with_limit(2).order_by(time="desc").execute() 10logger.info(str([rec.value for rec in r.data]))