VK Cloud logo

connectors

Этот модуль содержит функцию get_http_connector(name), которая нужна для получения объекта HTTP-коннектора.

HTTP-коннектор позволяет делать запросы к внешним системам.

1class BadParamsError(Exception):
2    pass
3
4class ConnectorNotFoundError(Exception):
5    pass
6
7class UnknownError(Exception):
8    pass
9
10class NonHTTPConnectorError(Exception):
11    pass
12
13
14def get_http_connector(name: str) -> HTTPConnector:
15	pass

HTTP-коннектор позволяет делать get, post, put, patch, delete запросы к внешним системам:

1Headers = Dict[str, str]
2Params = Dict[str, str]
3
4
5class HTTPResponse:
6	status_code:  int
7	headers:      Headers
8	text:         str
9	json:         Any
10
11class HTTPConnector:
12
13    def get(self,
14            path: Union[str, None] = None,
15            params: Union[Params, None] = None,
16            headers: Union[Headers, None] = None) -> HTTPResponse:
17
18        pass
19
20
21    def post(self,
22             path: Union[str, None] = None,
23             data: Union[Any, None] = None,
24             headers: Union[Headers, None] = None) -> HTTPResponse:
25
26        pass
27
28
29    def put(self,
30            path: Union[str, None] = None,
31            data: Union[Any, None] = None,
32            headers: Union[Headers, None] = None) -> HTTPResponse:
33
34        pass
35
36
37    def patch(self,
38              path: Union[str, None] = None,
39              data: Union[Any, None] = None,
40              headers: Union[Headers, None] = None) -> HTTPResponse:
41
42        pass
43
44
45    def delete(self,
46               path: Union[str, None] = None,
47               headers: Union[Headers, None] = None) -> HTTPResponse:
48
49        pass

Пример использования HTTP-коннектора:

1from coiiot_sdk import connectors, context
2
3connector = connectors.get_http_connector("custom-resource")
4
5ctx = context.current()
6connector.get(path=f"/v1/clients/{ctx.msg.value}/root_tag")