isp-maintenance/isp_maintenance/api/base.py

158 lines
4.7 KiB
Python

import sys
import json
import urllib
import requests
from settings.api import INPUT_HOSTNAME, INPUT_PORT, HEADERS
class BaseAPI(object):
    """Small JSON-over-HTTP client shared by the service-specific API classes.

    Request URLs are assembled as
    ``{API_URL}/{API_DEFINITION}/{API_VERSION}{url}``; subclasses change the
    ``API_DEFINITION`` / ``API_VERSION`` attributes to target another
    endpoint family.
    """

    # HTTP verbs that call_api knows how to send.
    _SUPPORTED_METHODS = ('GET', 'POST')

    def __init__(self, api_url=None, verify_ssl=True):
        """Store the connection parameters used by every request.

        Args:
            api_url (str, optional): base URL of the API host. When empty or
                None, ``http://{INPUT_HOSTNAME}:{INPUT_PORT}`` from
                ``settings.api`` is used instead.
            verify_ssl (bool, optional): forwarded as ``verify`` to
                ``requests``. Defaults to True.
        """
        internal_api_url = f'http://{INPUT_HOSTNAME}:{INPUT_PORT}'
        self.API_URL = api_url if api_url else internal_api_url
        self.API_VERSION = 'v3'
        self.API_DEFINITION = 'api'
        self.HEADERS = HEADERS
        self.VERIFY_SSL = verify_ssl

    def _gen_request_url(self, url):
        """Return the absolute request URL for the endpoint path ``url``.

        ``url`` is appended verbatim, so it is expected to start with '/'.
        """
        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'

    def call_api(self, url, method='GET', headers=None, data=None):
        """Send one request and return the decoded JSON response.

        Args:
            url (str): endpoint path, appended to the generated base URL.
            method (str, optional): 'GET' or 'POST'. Defaults to 'GET'.
            headers (dict, optional): request headers. When empty or None,
                ``self.HEADERS`` is sent instead.
            data (optional): for POST, the payload sent as the JSON body;
                for GET, a mapping that is URL-encoded into the query
                string. None is treated as an empty dict.

        Returns:
            The decoded JSON response on success. If sending the request
            raises, ``{'result': False, 'error': <exception class name>}``
            is returned instead.

        Raises:
            ValueError: if ``method`` is not 'GET' or 'POST'.
            SystemExit: if the response body is not valid JSON, or if it is
                a JSON object with a truthy 'error' entry (the error is
                printed first).
        """
        # Local import: the module only does ``import urllib``, which does
        # not by itself guarantee that the ``urllib.parse`` submodule is
        # loaded.
        from urllib.parse import urlencode

        # Fail early with a clear message instead of hitting an unbound
        # ``api_request`` after the session has been opened.
        if method not in self._SUPPORTED_METHODS:
            raise ValueError(
                f'Unsupported HTTP method: {method!r}; '
                f'expected one of {self._SUPPORTED_METHODS}'
            )

        payload = {} if data is None else data

        # The context manager closes the session on every exit path.
        with requests.Session() as session:
            try:
                request_url = self._gen_request_url(url)
                request_headers = headers if headers else self.HEADERS
                if method == 'POST':
                    api_request = session.post(
                        url=request_url,
                        json=payload,
                        headers=request_headers,
                        verify=self.VERIFY_SSL
                    )
                else:
                    # Only GET needs a query string; encoding it for POST
                    # would reject valid non-mapping JSON payloads.
                    params_str = urlencode(payload, safe="+'()")
                    if params_str:
                        request_url = f'{request_url}?{params_str}'
                    api_request = session.get(
                        url=request_url,
                        headers=request_headers,
                        verify=self.VERIFY_SSL
                    )
            except Exception as error:
                # Transport/encoding failures are reported to the caller as
                # a result dict rather than raised.
                return {
                    'result': False,
                    'error': type(error).__name__
                }

        # Get response
        try:
            response = json.loads(api_request.text)
        except json.JSONDecodeError:
            response = {'error': 'Can not parse response'}
            print(response)
            sys.exit()

        # Only a JSON object can carry an 'error' entry; lists, numbers and
        # strings are returned as-is instead of raising TypeError.
        if isinstance(response, dict) and response.get('error'):
            print(response['error'])
            sys.exit()
        return response
class BaseAuthAPI(BaseAPI):
    """Client for the authentication endpoints (URL prefix ``/auth/v4``)."""

    def __init__(self, api_url=None, verify_ssl=True):
        """
        Set up the client so that request URLs start with /auth/v4
        Args:
            api_url (str, optional): API host URL. Defaults to None, in
                which case the parent class falls back to
                settings.api.INPUT_HOSTNAME:INPUT_PORT
            verify_ssl (bool, optional): Defaults to True. Disabling it is
                not recommended; use it only for testing when no
                root-signed SSL certificate is available.
        """
        super().__init__(api_url=api_url, verify_ssl=verify_ssl)
        # Override the parent's defaults to target the auth endpoints.
        self.API_DEFINITION = 'auth'
        self.API_VERSION = 'v4'

    def get_auth_token(self, email: str, password: str) -> dict:
        """
        Request an auth token by POSTing the credentials to /public/token
        Args:
            email (str): user email
            password (str): user password
        Returns:
            response (dict): result of call_api; the expected shape is {
                "confirmed": true,
                "expires_at": "date time",
                "id": "int",
                "token": "str"
            }
        """
        credentials = {'email': email, 'password': password}
        return self.call_api(
            url='/public/token',
            method='POST',
            data=credentials
        )

    def get_auth_key(self, token: str, user, auth_type='Internal') -> dict:
        """
        Request an auth key by POSTing to /user/{user}/key
        Args:
            token (str): auth token; only sent when auth_type is 'Public'
            user (str or int): user id or email
            auth_type (str, optional): 'Public' attaches the token header.
                Any other value sends no explicit headers.
                Defaults to 'Internal'.
        Returns:
            response (dict): result of call_api; the expected shape is {
                "id": "int",
                "key": "str"
            }
        """
        is_public = auth_type == 'Public'
        auth_headers = self.make_auth_header(token) if is_public else {}
        key_endpoint = f'/user/{user}/key'
        return self.call_api(
            url=key_endpoint,
            method='POST',
            headers=auth_headers
        )

    def make_auth_header(self, token: str) -> dict:
        """
        Build the header dict that carries the auth token
        Args:
            token (str): auth token
        Returns:
            dict: {'x-xsrf-token': token}, suitable as request headers
        """
        auth_header = {}
        auth_header['x-xsrf-token'] = token
        return auth_header

    def whoami(self, token: str) -> dict:
        """
        GET /whoami with the token passed in the auth header
        Args:
            token (str): auth token
        Returns:
            response (dict): result of call_api
        """
        auth_headers = self.make_auth_header(token)
        return self.call_api(
            url='/whoami',
            method='GET',
            headers=auth_headers
        )
class BaseIpAPI(BaseAPI):
    """BaseAPI variant whose request URLs use the 'ip' definition segment."""

    def __init__(self, api_url=None, verify_ssl=True):
        """Initialise the parent client, then point API_DEFINITION at 'ip'.

        Args:
            api_url (str, optional): API host URL, passed to the parent.
            verify_ssl (bool, optional): passed to the parent.
                Defaults to True.
        """
        super().__init__(api_url=api_url, verify_ssl=verify_ssl)
        self.API_DEFINITION = 'ip'
class BaseDnsProxyAPI(BaseAPI):
    """BaseAPI variant whose request URLs use the 'dnsproxy' definition segment."""

    def __init__(self, api_url=None, verify_ssl=True):
        """Initialise the parent client, then point API_DEFINITION at 'dnsproxy'.

        Args:
            api_url (str, optional): API host URL, passed to the parent.
            verify_ssl (bool, optional): passed to the parent.
                Defaults to True.
        """
        super().__init__(api_url=api_url, verify_ssl=verify_ssl)
        self.API_DEFINITION = 'dnsproxy'