VK Cloud logo

operational_storage

Описание

Модуль operational_storage содержит следующие функции и классы ошибок для работы с Operational Storage:

1class InsertionError(Exception):
2    pass
3
4class UpsertionError(Exception):
5    pass
6
7class SelectionError(Exception):
8    pass
9
10class BadParamsError(Exception):
11    pass
12
13class NotFoundError(Exception):
14    pass
15
16
17def insert() -> Insert:
18    """
19    insert() возвращает объект для вставки значений тегов типа event в Operational Storage
20    """
21
22def select_latest(tag: Tag) -> SelectLatest:
23    """
24    select_latest() возвращает объект для выборки последнего значения тэга типа event из Operational Storage
25    """
26
27def select(tag: Tag, from_ts: datetime, to_ts: datetime) -> Select:
28    """
29    select() возвращает объект для выборки значений из Operational Storage
30    Этот объект поддерживает аггрегацию, группировку, смещения и пр.
31    Применим для тэгов типа event и aggregate
32    """
33
34def upsert() -> Upsert:
35    """
36    upsert() возвращает объект для вставки с обновлением значений тегов типа aggregate в Operational Storage
37    то есть, если в хранилище значение тега в заданный timestamp уже существует - то значение тега обновляется
38    """
39
40def select_current(tag: Tag) -> SelectCurrent:
41    """
42    select_current() возвращает объект для выборки текущего значения тэга типа aggregate из Operational Storage
43    Под текущим значением тега понимается значение тега за текущий период агрегации
44    """

Event

Вызов insert() возвращает объект Insert со следующим интерфейсом:

1class Insert:
2    def values(self, tag: Tag, value: Any, timestamp: datetime) -> 'Insert':
3		pass
4
5    def msg(self, msg: Message) -> 'Insert':
6		pass
7
8    def execute(self):
9		pass

Метод values(tag, value, timestamp) предназначен для вставки конкретного значения. Для вставки значения из сообщения можно использовать метод msg(message):

1from coiiot_sdk import operational_storage as op_store, context
2
3ctx = context.current()
4
5# сохраняем полученное сообщение в хранилище
6op_store.insert().msg(ctx.msg).execute()

Получение последнего значения тега

Очень часто нам нужно именно последнее значение тэга. За это отвечает функция select_latest(tag).

Вызов select_latest(tag) возвращает объект класса SelectLatest со следующим интерфейсом:

1class Event(TypedDict):
2    payload: Dict[str, Any]
3    received_at: int
4    time: int
5    value: Any
6
7class LatestEvent(TypedDict):
8    data: Event
9    tag_id: int
10    value_type: str
11
12
13class SelectLatest:
14
15    def execute(self) -> LatestEvent:
16        pass

Пример получения последнего значения тэга:

from coiiot_sdk import operational_storage as op_store, context, user_logs

logger = user_logs.get_logger()
ctx = context.current()

latest = op_store.select_latest(ctx.tag).execute()
logger.info(f"{ latest.data.value }")

Aggregate

Очень часто для тега типа aggregate нам нужно его текущее значение, то есть значение за текущий, еще не завершенный период агрегации. За это отвечает функция select_current(tag).

Вызов select_current(tag) возвращает объект класса SelectCurrent со следующим интерфейсом:

1class Aggregate(TypedDict):
2    payload: Dict[str, Any]
3    received_at: int
4    time: int
5    value: Any
6
7class CurrentAggregate(TypedDict):
8    data: Aggregate
9    tag_id: int
10    value_type: str
11
12class SelectCurrent:
13
14    def execute(self) -> CurrentAggregate:
15        pass

Пример получения текущего значения тэга-агрегата:

1from coiiot_sdk import operational_storage as op_store, context, user_logs
2
3logger = user_logs.get_logger()
4ctx = context.current()
5
6current = op_store.select_current(ctx.tag).execute()
7logger.info(f"{ current.data.value }")

Вставка с обновлением значения тега

Вызов upsert() возвращает объект Upsert со следующим интерфейсом:

1class Upsert:
2    def values(self, tag: Tag, value: Any, timestamp: datetime) -> 'Upsert':
3		pass
4
5    def msg(self, msg: Message) -> 'Upsert':
6		pass
7
8    def execute(self):
9		pass

Метод values(tag, value, timestamp) предназначен для вставки или обновления конкретного значения. Для вставки значения из сообщения можно использовать метод msg(message):

1from coiiot_sdk import operational_storage as op_store, context
2
3ctx = context.current()
4
5# сохраняем полученное сообщение в хранилище
6op_store.upsert().msg(ctx.msg).execute()

Если в хранилище значение тега в заданный timestamp уже существует, то значение тега обновляется.

Получение значений

Для более сложной выборки значений можно использовать метод select(tag, from_ts, to_ts).

Метод применим как к тегам типа event, так и к тегам типа aggregate.

Метод возвращает объект класса Select со следующим интерфейсом:

1class Select:
2
3    def with_offset(self, offset: int) -> 'Select':
4		"""
5		Установить смещение для начала выборки значений
6		"""
7    
8    def with_limit(self, limit: int) -> 'Select':
9		"""
10		Установить ограничение на количество выбранных значений
11		"""
12
13    def order_by(self, **kwargs) -> 'Select':
14		"""
15		Задает порядок сортировки выборки с помощью именованных параметров.
16        Имя параметра обозначает поле сортировки.
17        Значение параметра задает порядок сортировки, может быть либо "asc", либо "desc"
18		"""
19
20    def with_period(self, period: str) -> 'Select':
21		"""
22		Делать агрегацию по заданному временному периоду
23		"""
24
25    def calc_max_as(self, alias: str) -> 'Select':
26		"""
27		Вычислить максимум под псевдонимом, задаваемым alias (агрегация)
28		"""
29
30    def calc_max(self) -> 'Select':
31		"""
32		Сокращение для calc_max_as("max")
33		"""
34
35    def calc_min_as(self, alias: str) -> 'Select':
36		"""
37		Вычислить минимум под псевдонимом, задаваемым alias (агрегация)
38		"""
39    
40    def calc_min(self) -> 'Select':
41		"""
42		Сокращение для calc_min_as("min")
43		"""
44
45    def calc_avg_as(self, alias: str) -> 'Select':
46		"""
47		Вычислить среднее под псевдонимом, задаваемым alias (агрегация)
48		"""
49
50    def calc_avg(self) -> 'Select':
51		"""
52		Сокращение для calc_avg_as("avg")
53		"""
54    
55    def calc_count_as(self, alias: str) -> 'Select':
56		"""
57		Вычислить количество под псевдонимом, задаваемым alias (агрегация)
58		"""
59
60    def calc_count(self) -> 'Select':
61		"""
62		Сокращение для calc_count_as("count")
63		"""
64
65    def calc_sum_as(self, alias: str) -> 'Select':
66		"""
67		Вычислить сумму под псевдонимом, задаваемым alias (агрегация)
68		"""
69    
70    def calc_sum(self) -> 'Select':
71		"""
72		Сокращение для calc_sum_as("sum")
73		"""
74    
75    def execute(self) -> SelectionResult:
76		pass

Вариантов использования Select много, поэтому метод execute() возвращает достаточно сложный тип SelectionResult.

Ниже приводится его полное описание.

1class Event(TypedDict):
2    payload: Dict[str, Any]
3    received_at: int
4    time: int
5    value: Any
6
7class Events(TypedDict):
8    data: List[Event]
9    tag_id: int
10    value_type: str
11
12class EventsAggregate(TypedDict):
13    data: Dict[str, Union[int, float]]
14    tag_id: int
15    value_type: str
16
17class GroupAggregateOfEvents(TypedDict):
18    end_time: int
19    start_time: int
20    value: Dict[str, Union[int, float]]
21
22class EventsGroupAggregate(TypedDict):
23    data: List[GroupAggregateOfEvents]
24    tag_id: int
25    value_type: str
26
27class Aggregate(TypedDict):
28    payload: Dict[str, Any]
29    received_at: int
30    time: int
31    value: Any
32
33class Aggregates(TypedDict):
34    data: List[Aggregate]
35    tag_id: int
36    value_type: str
37
38class AggregatesAggregate(TypedDict):
39    data: Dict[str, Union[int, float]]
40    tag_id: int
41    value_type: str
42
43class GroupAggregateOfAggregates(TypedDict):
44    end_time: int
45    start_time: int
46    value: Dict[str, Union[int, float]]
47
48class AggregatesGroupAggregate(TypedDict):
49    data: List[GroupAggregateOfAggregates]
50    tag_id: int
51    value_type: str
52
53AggregateEvents = Union[EventsAggregate, EventsGroupAggregate]
54EventsResult = Union[AggregateEvents, Events]
55
56AggregateAggregates = Union[AggregatesAggregate, AggregatesGroupAggregate]
57AggregatesResult = Union[AggregateAggregates, Aggregates]
58
59SelectionResult = Union[AggregatesResult, EventsResult]

Пример расчета минимума и максимума за последние 30 секунд:

1import datetime
2from coiiot_sdk import operational_storage as op_store, context, user_logs
3
4logger = user_logs.get_logger()
5ctx = context.current()
6
7to_ts = datetime.datetime.now()
8from_ts = to_ts - datetime.timedelta(seconds=30)
9r = op_store.select(ctx.tag, from_ts, to_ts).calc_max_as("max").calc_min_as("min").execute()
10logger.info(f'{ r.data["max"] }')
11logger.info(f'{ r.data["min"] }')

В этом примере значения группируются в периоды по 5 секунд и в этих периодах происходит расчет значений агрегатов:

1import datetime
2from coiiot_sdk import operational_storage as op_store, context, user_logs
3
4logger = user_logs.get_logger()
5ctx = context.current()
6
7to_ts = datetime.datetime.now()
8from_ts = to_ts - datetime.timedelta(seconds=30)
9r = op_store.select(ctx.tag, from_ts, to_ts).with_period("5s").calc_count_as("count").execute()
10
11logger.info(f'{ r.data[0].value["count"] }') # количество в первом периоде
12logger.info(f'{ r.data[1].value["count"] }') # количество во втором периоде и т.д.

Можно использовать сортировку результатов выборки через метод order_by(**kwargs). Пока что поддерживается сортировка только по полю time.

Пример ниже показывает, как получить последние два значения в порядке «с конца» за указанный период:

1import datetime
2from coiiot_sdk import operational_storage as op_store, context, user_logs
3
4logger = user_logs.get_logger()
5ctx = context.current()
6
7from_ts = datetime.datetime(2021, 1, 1)
8to_ts = datetime.datetime.now()
9r = op_store.select(ctx.tag, from_ts, to_ts).with_limit(2).order_by(time="desc").execute()
10logger.info(str([rec.value for rec in r.data]))