long_term_storage
Рассмотрим работу с хранилищем Long Term Storage
. Работа с долговременным хранилищем очень похожа на работу с Operational Storage
.
Модуль coiiot_sdk.long_term_storage
определяет следующие функции и классы ошибок:
1class BadParamsError(Exception): 2 pass 3 4class NotFoundError(Exception): 5 pass 6 7class SelectionError(Exception): 8 pass 9 10class InsertionError(Exception): 11 pass 12 13class UpsertionError(Exception): 14 pass 15 16class SQLError(Exception): 17 pass 18 19 20def insert() -> Insert: 21 pass 22 23def upsert() -> Upsert: 24 pass 25 26def select_latest(tag: Tag) -> SelectLatest: 27 pass 28 29def select_current(tag: Tag) -> SelectCurrent: 30 pass 31 32def select(tag: Tag, from_ts: datetime, to_ts: datetime) -> Select: 33 pass
У Long Term Storage
есть одно явное отличие от Operational Storage:
в select(tag, from_ts, to_ts)
поддерживается периодизация только по следующим периодам:
1def with_period_minute(self) -> 'Select': 2 pass 3 4def with_period_hour(self) -> 'Select': 5 pass 6 7def with_period_day(self) -> 'Select': 8 pass 9 10def with_period_month(self) -> 'Select': 11 pass 12 13def with_period_year(self) -> 'Select': 14 pass
Вызов insert()
возвращает объект Insert
со следующим интерфейсом:
1class InsertT: 2 def value(self, tag_id: int, value: Any, timestamp: datetime): 3 pass 4 5 def msg(self, msg: MessageT): 6 pass
Метод value(tag_id, value, timestamp)
предназначен для вставки конкретного значения. Для вставки значения из сообщения можно использовать метод msg(message)
:
1from coiiot_sdk import long_term_storage as lt_store, context 2 3ctx = context.current() 4 5# сохраняем полученное сообщение в хранилище 6lt_store.insert().msg(ctx.msg)
Здесь все аналогично работе с Operational Storage
. Вызов select_latest(tag)
возвращает объект класса SelectLatest
с единственным методом execute()
.
Пример получения последнего значения текущего тега:
1from coiiot_sdk import long_term_storage as lt_store, context, user_logs 2 3logger = user_logs.get_logger() 4ctx = context.current() 5 6latest = lt_store.select_latest(ctx.tag).execute() 7logger.info(f'{ latest.data.value }')
Вызов upsert()
возвращает объект Upsert
со следующим интерфейсом:
1class UpsertT: 2 def value(self, tag_id: int, value: Any, timestamp: datetime): 3 pass 4 5 def msg(self, msg: MessageT): 6 pass
Метод value(tag_id, value, timestamp)
предназначен для вставки с обновлением конкретного значения.
Для вставки с обновлением значения из сообщения можно использовать метод msg(message)
:
1from coiiot_sdk import long_term_storage as lt_store, context 2 3ctx = context.current() 4 5# сохраняем полученное сообщение в хранилище 6lt_store.upsert().msg(ctx.msg)
Здесь все аналогично работе с Operational Storage
. Вызов select_current(tag)
возвращает объект класса SelectCurrent
с единственным методом execute()
.
Пример получения текущего значения текущего тега типа aggregate
:
1from coiiot_sdk import long_term_storage as lt_store, context, user_logs 2 3logger = user_logs.get_logger() 4ctx = context.current() 5 6current = lt_store.select_current(ctx.tag).execute() 7logger.info(f'{ current.data.value }')
Аналогично работе с Operational Storage
, даже класс SelectionResult
имеет такое же определение.
Приведем лишь пример агрегации по периоду в одну минуту в Long Term Storage
:
1import datetime 2from coiiot_sdk import long_term_storage as lt_store, context, user_logs 3 4logger = user_logs.get_logger() 5ctx = context.current() 6 7to_ts = datetime.datetime.now() 8from_ts = to_ts - datetime.timedelta(minutes=30) 9r = lt_store.select(ctx.tag, from_ts, to_ts).with_period_minute().calc_count_as("count").execute() 10logger.info(f'{ r.data[0].value["count"] }') # количество в первом периоде 11logger.info(f'{ r.data[1].value["count"] }') # количество во втором периоде и т.д.