import sys import json import click import urllib import requests from time import sleep from mgrctl.settings.api import ( API_INPUT_PORT, API_URL, API_HEADERS, API_EMAIL, API_PASSWORD, API_VERIFY_SSL, API_COUNT_TRY_CONNECTIONS ) from mgrctl.settings.platform import PLATFORM_TYPE class BaseAPI(object): def __init__(self): """Announces required parameters""" self.API_URL = API_URL self.API_HEADERS = API_HEADERS self.API_EMAIL = API_EMAIL self.API_PASSWORD = API_PASSWORD self.API_VERIFY_SSL = API_VERIFY_SSL self.API_VERSION = 'v3' self.API_DEFINITION = 'api' def _gen_request_url(self, url): return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}' def call_api(self, url, method='GET', headers={}, data={}): attempt = API_COUNT_TRY_CONNECTIONS while attempt: attempt -= 1 try: uri = self._gen_request_url(url) headers = self.API_HEADERS if not headers else headers params_str = urllib.parse.urlencode(data, safe="+'()") if method == 'POST': api_request = requests.post( url=uri, json=data, headers=headers, verify=self.API_VERIFY_SSL ) if method == 'GET': uri = f'{uri}?{params_str}' if params_str else uri api_request = requests.get( url=uri, headers=headers, verify=self.API_VERIFY_SSL ) except Exception as error: ConnectionError = requests.exceptions.ConnectionError if type(error) is ConnectionError and PLATFORM_TYPE == 'dci': # ? workaround if new docker version use dashes # TODO: ISPsystem developers must set container_name !!! self.API_URL = f'http://dci-input-1:{API_INPUT_PORT}' if attempt == 0: click.echo(f'Error: {type(error).__name__}') sys.exit() else: continue else: click.echo(f'Error: {type(error).__name__}') sys.exit() # Get response: response = self._parse_response(api_request) # Validate response: if self._error_handler(response): continue # new attempt connection return response def _parse_response(self, api_request): try: response = json.loads(api_request.text) return response except json.decoder.JSONDecodeError: click.echo('Error: Invalid API response') raise sys.exit() def _is_error(self, response): if response.get('error'): return True def _is_error_3004(self, error): if error.get('code') == 3004: sleep(2) # wait 2 second timeout return True def _error_handler(self, response): if self._is_error(response): error = response.get('error') if self._is_error_3004(error): return True else: click.echo(f'Error: API return {response}') raise sys.exit() class BaseAuthAPI(BaseAPI): def __init__(self): super().__init__() self.API_VERSION = 'v4' self.API_DEFINITION = 'auth' def get_auth_token(self, email=None, password=None) -> dict: email = self.API_EMAIL if not email else email password = self.API_PASSWORD if not password else password return self.call_api( url='/public/token', method='POST', data={'email': email, 'password': password} ) def get_auth_key(self, token=None, user=None) -> dict: headers = {} user = self.API_EMAIL if not user else user if token: headers = self.make_auth_header(token) return self.call_api( url=f'/user/{user}/key', method='POST', headers=headers ) def make_auth_header(self, token: str) -> dict: return {'x-xsrf-token': token} def whoami(self) -> dict: return self.call_api( url='/whoami', method='GET' ) class BaseIpAPI(BaseAPI): def __init__(self): super().__init__() self.API_DEFINITION = 'ip' class BaseDnsProxyAPI(BaseAPI): def __init__(self): super().__init__() self.API_DEFINITION = 'dnsproxy'