VK Cloud logo

long_term_storage

Описание

Рассмотрим работу с хранилищем Long Term Storage. Работа с долговременным хранилищем очень похожа на работу с Operational Storage.

Модуль coiiot_sdk.long_term_storage определяет следующие функции и классы ошибок:

1class BadParamsError(Exception):
2    pass
3
4class NotFoundError(Exception):
5    pass
6
7class SelectionError(Exception):
8    pass
9
10class InsertionError(Exception):
11    pass
12
13class UpsertionError(Exception):
14    pass
15
16class SQLError(Exception):
17    pass
18
19
20def insert() -> Insert:
21	pass
22
23def upsert() -> Upsert:
24	pass
25
26def select_latest(tag: Tag) -> SelectLatest:
27	pass
28
29def select_current(tag: Tag) -> SelectCurrent:
30	pass
31
32def select(tag: Tag, from_ts: datetime, to_ts: datetime) -> Select:
33	pass

У Long Term Storage есть одно явное отличие от Operational Storage: в select(tag, from_ts, to_ts) поддерживается периодизация только по следующим периодам:

1def with_period_minute(self) -> 'Select':
2	pass
3
4def with_period_hour(self) -> 'Select':
5	pass
6
7def with_period_day(self) -> 'Select':
8	pass
9
10def with_period_month(self) -> 'Select':
11	pass
12
13def with_period_year(self) -> 'Select':
14	pass

Event

Вызов insert() возвращает объект Insert со следующим интерфейсом:

1class InsertT:
2    def value(self, tag_id: int, value: Any, timestamp: datetime):
3		pass
4
5    def msg(self, msg: MessageT):
6		pass

Метод value(tag_id, value, timestamp) предназначен для вставки конкретного значения. Для вставки значения из сообщения можно использовать метод msg(message):

1from coiiot_sdk import long_term_storage as lt_store, context
2
3ctx = context.current()
4
5# сохраняем полученное сообщение в хранилище
6lt_store.insert().msg(ctx.msg)

Получение последнего значения тега

Здесь все аналогично работе с Operational Storage. Вызов select_latest(tag) возвращает объект класса SelectLatest с единственным методом execute(). Пример получения последнего значения текущего тега:

1from coiiot_sdk import long_term_storage as lt_store, context, user_logs
2
3logger = user_logs.get_logger()
4ctx = context.current()
5
6latest = lt_store.select_latest(ctx.tag).execute()
7logger.info(f'{ latest.data.value }')

Aggregate

Вызов upsert() возвращает объект Upsert со следующим интерфейсом:

1class UpsertT:
2    def value(self, tag_id: int, value: Any, timestamp: datetime):
3		pass
4
5    def msg(self, msg: MessageT):
6		pass

Метод value(tag_id, value, timestamp) предназначен для вставки с обновлением конкретного значения.

Для вставки с обновлением значения из сообщения можно использовать метод msg(message):

1from coiiot_sdk import long_term_storage as lt_store, context
2
3ctx = context.current()
4
5# сохраняем полученное сообщение в хранилище
6lt_store.upsert().msg(ctx.msg)

Получение текущего значения тега

Здесь все аналогично работе с Operational Storage. Вызов select_current(tag) возвращает объект класса SelectCurrent с единственным методом execute().

Пример получения текущего значения текущего тега типа aggregate:

1from coiiot_sdk import long_term_storage as lt_store, context, user_logs
2
3logger = user_logs.get_logger()
4ctx = context.current()
5
6current = lt_store.select_current(ctx.tag).execute()
7logger.info(f'{ current.data.value }')

Получение значений из Long Term Storage

Аналогично работе с Operational Storage, даже класс SelectionResult имеет такое же определение.

Приведем лишь пример агрегации по периоду в одну минуту в Long Term Storage:

1import datetime
2from coiiot_sdk import long_term_storage as lt_store, context, user_logs
3
4logger = user_logs.get_logger()
5ctx = context.current()
6
7to_ts = datetime.datetime.now()
8from_ts = to_ts - datetime.timedelta(minutes=30)
9r = lt_store.select(ctx.tag, from_ts, to_ts).with_period_minute().calc_count_as("count").execute()
10logger.info(f'{ r.data[0].value["count"] }') # количество в первом периоде
11logger.info(f'{ r.data[1].value["count"] }') # количество во втором периоде и т.д.