isp-maintenance/mgrctl/api/base.py

145 lines
4.5 KiB
Python
Raw Normal View History

2024-05-26 16:06:58 +02:00
import sys
import json
import urllib
import requests
from mgrctl.settings.api import INPUT_URL, HEADERS
from mgrctl.settings.platform import (
PLATFORM_TYPE,
PLATFORM_VERIFY_SSL,
PLATFORM_DUMMY,
PLATFORM_DUMMY_VM6_API_URL,
PLATFORM_DUMMY_VM6_EMAIL,
PLATFORM_DUMMY_VM6_PASSWORD,
PLATFORM_DUMMY_VM6_TOKEN,
PLATFORM_DUMMY_DCI6_API_URL,
PLATFORM_DUMMY_DCI6_EMAIL,
PLATFORM_DUMMY_DCI6_PASSWORD,
PLATFORM_DUMMY_DCI6_TOKEN
)
2024-05-26 16:06:58 +02:00
class BaseAPI(object):
def __init__(self):
2024-05-26 16:06:58 +02:00
"""Announces required parameters"""
if PLATFORM_TYPE == 'vm':
if PLATFORM_DUMMY:
self.API_URL = PLATFORM_DUMMY_VM6_API_URL
self.AUTH_TYPE = 'Public'
self.HEADERS = {'x-xsrf-token': PLATFORM_DUMMY_VM6_TOKEN}
self.EMAIL = PLATFORM_DUMMY_VM6_EMAIL
self.PASSWORD = PLATFORM_DUMMY_VM6_PASSWORD
else:
self.API_URL = INPUT_URL
self.AUTH_TYPE = 'Internal'
self.HEADERS = HEADERS
if PLATFORM_TYPE == 'dci':
if PLATFORM_DUMMY:
self.API_URL = PLATFORM_DUMMY_DCI6_API_URL
self.AUTH_TYPE = 'Public'
self.HEADERS = {'x-xsrf-token': PLATFORM_DUMMY_DCI6_TOKEN}
self.EMAIL = PLATFORM_DUMMY_DCI6_EMAIL
self.PASSWORD = PLATFORM_DUMMY_DCI6_PASSWORD
else:
self.API_URL = INPUT_URL
self.AUTH_TYPE = 'Internal'
self.HEADERS = HEADERS
2024-05-26 16:06:58 +02:00
self.API_VERSION = 'v3'
self.API_DEFINITION = 'api'
self.VERIFY_SSL = PLATFORM_VERIFY_SSL
2024-05-26 16:06:58 +02:00
def _gen_request_url(self, url):
return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'
def call_api(self, url, method='GET', headers={}, data={}):
# Open session
with requests.Session() as session:
try:
url = self._gen_request_url(url)
headers = self.HEADERS if not headers else headers
params_str = urllib.parse.urlencode(data, safe="+'()")
if method == 'POST':
api_request = session.post(
url=url,
json=data,
headers=headers,
verify=self.VERIFY_SSL
)
if method == 'GET':
url = f'{url}?{params_str}' if params_str else url
api_request = session.get(
url=url,
headers=headers,
verify=self.VERIFY_SSL
)
except Exception as error:
api_request = {
'result': False,
'error': type(error).__name__
}
return api_request
finally:
session.close()
# Get response
try:
response = json.loads(api_request.text)
if 'error' in response and response['error']:
print(response['error'])
raise sys.exit()
return response
except json.decoder.JSONDecodeError:
response = {'error': 'Can not parse response'}
print(response)
raise sys.exit()
class BaseAuthAPI(BaseAPI):
def __init__(self):
super().__init__()
2024-05-26 16:06:58 +02:00
self.API_VERSION = 'v4'
self.API_DEFINITION = 'auth'
def get_auth_token(self, email=None, password=None) -> dict:
email = self.EMAIL if not email else email
password = self.PASSWORD if not password else password
2024-05-26 16:06:58 +02:00
return self.call_api(
url='/public/token',
method='POST',
data={'email': email, 'password': password}
)
def get_auth_key(self, token=None, user=None) -> dict:
2024-05-26 16:06:58 +02:00
headers = {}
user = self.EMAIL if not user else user
if token:
2024-05-26 16:06:58 +02:00
headers = self.make_auth_header(token)
return self.call_api(
url=f'/user/{user}/key',
method='POST',
headers=headers
)
def make_auth_header(self, token: str) -> dict:
return {'x-xsrf-token': token}
def whoami(self) -> dict:
2024-05-26 16:06:58 +02:00
return self.call_api(
url='/whoami',
method='GET'
2024-05-26 16:06:58 +02:00
)
class BaseIpAPI(BaseAPI):
def __init__(self):
super().__init__()
2024-05-26 16:06:58 +02:00
self.API_DEFINITION = 'ip'
class BaseDnsProxyAPI(BaseAPI):
def __init__(self):
super().__init__()
2024-05-26 16:06:58 +02:00
self.API_DEFINITION = 'dnsproxy'