mirror of
https://git.isptech.ru/ISPsystem/isp-maintenance.git
synced 2025-02-01 19:00:52 +01:00
118 lines
3.4 KiB
Python
118 lines
3.4 KiB
Python
import sys
|
|
import json
|
|
import urllib
|
|
import requests
|
|
|
|
from mgrctl.settings.api import (
|
|
API_URL,
|
|
API_HEADERS,
|
|
API_EMAIL,
|
|
API_PASSWORD,
|
|
API_VERIFY_SSL
|
|
)
|
|
|
|
|
|
class BaseAPI(object):
    """Small JSON-over-HTTP client used as the base for the API classes.

    Request URLs are built as
    ``{API_URL}/{API_DEFINITION}/{API_VERSION}{url}``.
    """

    # HTTP methods that call_api knows how to send.
    _SUPPORTED_METHODS = ('GET', 'POST')

    def __init__(self):
        """Copy the imported API settings onto the instance.

        ``API_VERSION`` and ``API_DEFINITION`` are the two path segments
        inserted between the base URL and the endpoint path.
        """
        self.API_URL = API_URL
        self.API_HEADERS = API_HEADERS
        self.API_EMAIL = API_EMAIL
        self.API_PASSWORD = API_PASSWORD
        self.API_VERIFY_SSL = API_VERIFY_SSL
        self.API_VERSION = 'v3'
        self.API_DEFINITION = 'api'

    def _gen_request_url(self, url):
        """Return the absolute request URL for the endpoint path *url*.

        *url* is appended verbatim (no separator is inserted before it).
        """
        return f'{self.API_URL}/{self.API_DEFINITION}/{self.API_VERSION}{url}'

    def call_api(self, url, method='GET', headers=None, data=None):
        """Send a request to the API and return the decoded JSON response.

        Args:
            url: Endpoint path, appended to the generated base URL.
            method: ``'GET'`` or ``'POST'`` (case-insensitive).
            headers: Request headers; when empty or ``None`` the instance's
                ``API_HEADERS`` are used instead.
            data: For POST, sent as the JSON body. For GET, URL-encoded
                into the query string. ``None`` is treated as ``{}``.

        Returns:
            The decoded JSON response. If sending the request raises, a
            dict ``{'result': False, 'error': <exception class name>}`` is
            returned instead.

        Raises:
            ValueError: If *method* is not a supported HTTP method.
            SystemExit: With status 1, after printing the problem, when the
                response body is not valid JSON or the decoded response
                carries a truthy ``'error'`` entry.
        """
        # Local import: a bare ``import urllib`` does not guarantee that the
        # ``urllib.parse`` submodule has been loaded.
        from urllib.parse import urlencode

        method = method.upper()
        if method not in self._SUPPORTED_METHODS:
            # Previously an unknown method fell through both branches and
            # crashed later with UnboundLocalError.
            raise ValueError(f'Unsupported HTTP method: {method!r}')

        data = {} if data is None else data

        try:
            request_url = self._gen_request_url(url)
            request_headers = headers if headers else self.API_HEADERS
            # The context manager closes the session; no explicit close().
            with requests.Session() as session:
                if method == 'POST':
                    api_request = session.post(
                        url=request_url,
                        json=data,
                        headers=request_headers,
                        verify=self.API_VERIFY_SSL
                    )
                else:
                    # Keep "+'()" unescaped in the query string.
                    params_str = urlencode(data, safe="+'()")
                    if params_str:
                        request_url = f'{request_url}?{params_str}'
                    api_request = session.get(
                        url=request_url,
                        headers=request_headers,
                        verify=self.API_VERIFY_SSL
                    )
        except Exception as error:
            # Deliberately broad: callers receive an error dict rather than
            # an exception when the request cannot be sent.
            return {
                'result': False,
                'error': type(error).__name__
            }

        # Get response
        try:
            response = json.loads(api_request.text)
        except json.decoder.JSONDecodeError:
            print({'error': 'Can not parse response'})
            sys.exit(1)

        # Only a dict can carry an 'error' entry; other JSON payloads
        # (lists, scalars) are returned untouched.
        if isinstance(response, dict) and response.get('error'):
            print(response['error'])
            sys.exit(1)
        return response
|
|
|
|
|
|
class BaseAuthAPI(BaseAPI):
    """API client whose request URLs use the ``auth`` / ``v4`` path segments."""

    def __init__(self):
        """Initialise the base client, then switch to the auth segments."""
        super(BaseAuthAPI, self).__init__()
        self.API_DEFINITION = 'auth'
        self.API_VERSION = 'v4'

    def make_auth_header(self, token: str) -> dict:
        """Wrap *token* in a single-entry ``x-xsrf-token`` header dict."""
        auth_header = {'x-xsrf-token': token}
        return auth_header

    def get_auth_token(self, email=None, password=None) -> dict:
        """POST credentials to ``/public/token`` and return the response.

        A falsy *email* or *password* is replaced by the instance's
        ``API_EMAIL`` / ``API_PASSWORD`` respectively.
        """
        credentials = {
            'email': email or self.API_EMAIL,
            'password': password or self.API_PASSWORD,
        }
        return self.call_api(
            url='/public/token',
            method='POST',
            data=credentials
        )

    def get_auth_key(self, token=None, user=None) -> dict:
        """POST to ``/user/{user}/key`` and return the response.

        A falsy *user* is replaced by ``API_EMAIL``. When *token* is truthy
        it is sent as the ``x-xsrf-token`` header; otherwise an empty header
        dict is passed on to ``call_api``.
        """
        target_user = user or self.API_EMAIL
        request_headers = self.make_auth_header(token) if token else {}
        endpoint = f'/user/{target_user}/key'
        return self.call_api(
            url=endpoint,
            method='POST',
            headers=request_headers
        )

    def whoami(self) -> dict:
        """GET ``/whoami`` and return the response."""
        return self.call_api(url='/whoami', method='GET')
|
|
|
|
|
|
class BaseIpAPI(BaseAPI):
    """API client whose request URLs use the ``ip`` definition segment."""

    def __init__(self):
        """Initialise the base client, then point it at ``ip``."""
        super(BaseIpAPI, self).__init__()
        # Only the definition segment changes; the version is inherited.
        self.API_DEFINITION = 'ip'
|
|
|
|
|
|
class BaseDnsProxyAPI(BaseAPI):
    """API client whose request URLs use the ``dnsproxy`` definition segment."""

    def __init__(self):
        """Initialise the base client, then point it at ``dnsproxy``."""
        super(BaseDnsProxyAPI, self).__init__()
        # Only the definition segment changes; the version is inherited.
        self.API_DEFINITION = 'dnsproxy'
|