isp-maintenance/mgrctl/api/base.py

118 lines
3.4 KiB
Python

import sys
import json
import urllib
import requests
from mgrctl.settings.api import (
API_URL,
API_HEADERS,
API_EMAIL,
API_PASSWORD,
API_VERIFY_SSL
)
class BaseAPI(object):
def __init__(self):
"""Announces required parameters"""
self.API_URL = API_URL
self.API_HEADERS = API_HEADERS
self.API_EMAIL = API_EMAIL
self.API_PASSWORD = API_PASSWORD
self.API_VERIFY_SSL = API_VERIFY_SSL
self.API_VERSION = 'v3'
self.API_DEFINITION = 'api'
def _gen_request_url(self, url):
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
def call_api(self, url, method='GET', headers={}, data={}):
# Open session
with requests.Session() as session:
try:
url = self._gen_request_url(url)
headers = self.API_HEADERS if not headers else headers
params_str = urllib.parse.urlencode(data, safe="+'()")
if method == 'POST':
api_request = session.post(
url=url,
json=data,
headers=headers,
verify=self.API_VERIFY_SSL
)
if method == 'GET':
url = f'{url}?{params_str}' if params_str else url
api_request = session.get(
url=url,
headers=headers,
verify=self.API_VERIFY_SSL
)
except Exception as error:
api_request = {
'result': False,
'error': type(error).__name__
}
return api_request
finally:
session.close()
# Get response
try:
response = json.loads(api_request.text)
if 'error' in response and response['error']:
print(response['error'])
raise sys.exit()
return response
except json.decoder.JSONDecodeError:
response = {'error': 'Can not parse response'}
print(response)
raise sys.exit()
class BaseAuthAPI(BaseAPI):
def __init__(self):
super().__init__()
self.API_VERSION = 'v4'
self.API_DEFINITION = 'auth'
def get_auth_token(self, email=None, password=None) -> dict:
email = self.API_EMAIL if not email else email
password = self.API_PASSWORD if not password else password
return self.call_api(
url='/public/token',
method='POST',
data={'email': email, 'password': password}
)
def get_auth_key(self, token=None, user=None) -> dict:
headers = {}
user = self.API_EMAIL if not user else user
if token:
headers = self.make_auth_header(token)
return self.call_api(
url=f'/user/{user}/key',
method='POST',
headers=headers
)
def make_auth_header(self, token: str) -> dict:
return {'x-xsrf-token': token}
def whoami(self) -> dict:
return self.call_api(
url='/whoami',
method='GET'
)
class BaseIpAPI(BaseAPI):
def __init__(self):
super().__init__()
self.API_DEFINITION = 'ip'
class BaseDnsProxyAPI(BaseAPI):
def __init__(self):
super().__init__()
self.API_DEFINITION = 'dnsproxy'