-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathauth.py
More file actions
170 lines (143 loc) · 5.77 KB
/
auth.py
File metadata and controls
170 lines (143 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import requests
import json
import warnings
from requests.utils import cookiejar_from_dict, default_headers
import pandas as pd
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 `importlib_resources`.
import importlib_resources as pkg_resources
from smsdk import config
from smsdk.utils import get_url
from smsdk.ma_session import MaSession
def _load_packaged_json(resource_name):
    """Read and parse a JSON resource bundled in the ``smsdk.config`` package."""
    raw_text = pkg_resources.read_text(config, resource_name)
    return json.loads(raw_text)


# Load config for common api endpoints
ENDPOINTS = _load_packaged_json("api_endpoints.json")
# Message/keyword settings; every constant below is looked up from this mapping.
RESOURCE_CONFIG = _load_packaged_json("message_config.json")

# Markers searched for in the login response text to recognise failed logins.
KEYWORDS = RESOURCE_CONFIG["keywords"]
# Request-header names used for API-key authentication. The ``_OLD`` secret
# header is sent alongside the current one for v0/v1 compatibility.
SM_AUTH_HEADER_SECRET_ID = RESOURCE_CONFIG["auth_header-api-secret"]
SM_AUTH_HEADER_SECRET_ID_OLD = RESOURCE_CONFIG["auth_header-api-secret_old"]
SM_AUTH_HEADER_KEY_ID = RESOURCE_CONFIG["auth_header-api-key"]
# Not referenced by the code visible in this module; presumably consumed by
# other modules that import it from here -- TODO confirm.
X_SM_DB_SCHEMA = RESOURCE_CONFIG["x_sm_db_schema"]
class Authenticator(MaSession):
    """
    Provide access to multiple authentication methods of the platform. This class
    is not meant to be used outside of the Client object.

    Two methods are available through :meth:`login`: ``basic`` (email and
    password posted to the ``Auth`` endpoint) and ``apikey`` (a secret/key pair
    sent as request headers and validated with :meth:`check_auth`).
    """

    # Keyword arguments that belong exclusively to one authentication method.
    # ``login`` uses them to route credentials when the method is auto-detected.
    _APIKEY_CREDENTIALS = ("secret_id", "key_id")
    _BASIC_CREDENTIALS = ("email", "password")

    # (KEYWORDS entry, error message) pairs checked, in this order, against the
    # text of a basic-login response. The first marker found wins.
    _BASIC_LOGIN_FAILURES = (
        (
            "KEYWORD_MISSING_ACCOUNT",
            "Failed login attempt to {}. Account not found.",
        ),
        (
            "KEYWORD_INCORRECT_PASSWORD",
            "Failed login attempt to {}. Password does not match.",
        ),
        (
            "KEYWORD_REQUIRES_SSO",
            "Failed login attempt to {}. Requires SSO authentication.",
        ),
    )

    def __init__(self, client):
        """
        Initialize the authentication module.

        :param client: The Client object that will interface with the Auth module.
            Its ``config`` mapping (``"protocol"`` and ``"site.domain"`` keys)
            and its ``tenant`` attribute are used to build the host URL.
        :type client: :class:`Client`
        """
        super().__init__()

        # Setup session and store host
        self.requests = requests
        self.host = get_url(
            client.config["protocol"], client.tenant, client.config["site.domain"]
        )
        # ``self.session`` is not created in this class; presumably it is set up
        # by ``MaSession.__init__`` -- TODO confirm.
        self.session.headers = default_headers()

    def _auth_basic(self, email=None, password=None):
        """
        Authenticate by sending email and password combo.

        :param email: The email associated with the user's account.
        :type email: :class:`string`
        :param password: The password associated with the user's account.
        :type password: :class:`string`
        :returns: ``True`` on a successful login, ``False`` when either the
            email or the password is missing (no request is sent in that case).
        :rtype: :class:`bool`
        :raises RuntimeError: If the response is not OK, or its text contains
            one of the configured failure keywords.
        """
        if not email or not password:
            return False

        payload = {
            "username": email,
            "email": email,
            "password": password,
            "remember": "yes",
        }
        url = "{}{}".format(self.host, ENDPOINTS["Auth"]["url"])

        # Start from pristine headers so nothing set by a previous attempt
        # (e.g. API-key headers) is sent with the credentials.
        self.session.headers = default_headers()
        resp = self.session.post(url, data=payload)

        if not resp.ok:
            raise RuntimeError("Failed login attempt to {}.".format(url))

        # An OK status is not enough: failures are reported inside the body,
        # so scan it for the known failure markers.
        for keyword, message in self._BASIC_LOGIN_FAILURES:
            if KEYWORDS[keyword] in resp.text:
                raise RuntimeError(message.format(url))

        return True

    def _auth_apikey(self, secret_id, key_id):
        """
        Authenticate by sending an API secret/key pair as request headers.

        :param secret_id: The API secret associated with the user's account.
        :type secret_id: :class:`string`
        :param key_id: The API key id associated with the user's account.
        :type key_id: :class:`string`
        :returns: ``True`` when :meth:`check_auth` succeeds with the new
            headers, ``False`` when either argument is missing (the session
            headers are left untouched in that case).
        :rtype: :class:`bool`
        :raises RuntimeError: If :meth:`check_auth` fails with the supplied
            credentials.
        """
        if not secret_id or not key_id:
            return False

        # ``get_json_headers`` is not defined in this class; presumably it is
        # inherited from MaSession -- TODO confirm.
        self.session.headers = self.get_json_headers()
        self.session.headers.update(
            {
                SM_AUTH_HEADER_SECRET_ID: secret_id,
                # Same secret under the legacy header name for v0/v1 compat.
                SM_AUTH_HEADER_SECRET_ID_OLD: secret_id,
                SM_AUTH_HEADER_KEY_ID: key_id,
            }
        )

        if not self.check_auth():
            raise RuntimeError(
                "Failed login attempt to {}. Invalid secret or key".format(self.host)
            )
        return True

    def login(self, method=None, **kwargs):
        """
        Authenticate to the client by passing a method and any protocol
        specific arguments.

        :param method: The protocol that will be used to authenticate with the
            server: ``"basic"``, ``"apikey"``, or ``None`` to try ``apikey``
            first and then ``basic``.
        :type method: :class:`string`
        :param kwargs: Credentials for the chosen method: ``email`` and
            ``password`` for ``basic``; ``secret_id`` and ``key_id`` for
            ``apikey``.
        :returns: ``True`` if authentication succeeded, ``False`` otherwise.
        :rtype: :class:`bool`
        :raises RuntimeError: If ``method`` is not a supported value, or if an
            explicitly requested method fails. With ``method=None`` failures
            are reported as warnings instead of being raised.
        """
        if method == "basic":
            return self._auth_basic(**kwargs)
        if method == "apikey":
            return self._auth_apikey(**kwargs)
        if method is not None:
            raise RuntimeError("Invalid login method: {}".format(method))

        # Auto-detect. Each attempt receives every argument EXCEPT the other
        # method's credentials. Passing the raw kwargs to both (as was done
        # previously) made a valid email/password login always trip a TypeError
        # in ``_auth_apikey`` and emit a misleading warning. Unknown arguments
        # are still forwarded to both attempts, so typos keep failing loudly.
        success = False

        # Only try the API key when at least one API-key credential was given;
        # a partial pair still reaches ``_auth_apikey`` and is reported below.
        if any(name in kwargs for name in self._APIKEY_CREDENTIALS):
            apikey_kwargs = {
                name: value
                for name, value in kwargs.items()
                if name not in self._BASIC_CREDENTIALS
            }
            try:
                success = self._auth_apikey(**apikey_kwargs)
            except Exception:  # pylint:disable=broad-except
                warnings.warn("Could not auto-connect using `apikey` method")

        if success is False:
            basic_kwargs = {
                name: value
                for name, value in kwargs.items()
                if name not in self._APIKEY_CREDENTIALS
            }
            try:
                success = self._auth_basic(**basic_kwargs)
            except Exception:  # pylint:disable=broad-except
                warnings.warn("Could not auto-connect using `basic` method")

        return success

    def logout(self):
        """
        Unauthenticate from the client and destroy credential cache.

        Replaces the session cookies with an empty jar and resets the session
        headers to the ``requests`` defaults, dropping any API-key headers.

        :returns: ``True`` if the session was reset, ``False`` if resetting it
            raised an exception.
        :rtype: :class:`bool`
        """
        try:
            self.session.cookies = cookiejar_from_dict({})
            self.session.headers = default_headers()
            return True
        except Exception:  # pylint:disable=broad-except
            return False

    def check_auth(self):
        """
        Determine if SDK has access to the client by checking the Machine API.

        :returns: The result of the ``"error"`` membership check on the rows
            fetched from the Machine endpoint; ``False`` whenever any step
            raises an exception.
        :rtype: :class:`bool`
        """
        try:
            url = "{}{}".format(self.host, ENDPOINTS["Machine"]["url_v1"])
            # ``_get_records_mongo_v1`` is defined outside this class; the
            # ``pd.concat`` below implies it yields pandas objects -- TODO confirm.
            frames = self._get_records_mongo_v1(url, limit=1, select=["id"])
            rows = list(pd.concat(list(frames)).reset_index().values)
            # ``rows`` is always a list here, so only the membership test
            # decides the outcome.
            # NOTE(review): the items of ``rows`` are NumPy row arrays, so
            # ``in`` compares them element-wise against the string; for rows
            # with more than one column that may raise ValueError, which the
            # handler below turns into ``False``. Confirm this check behaves as
            # intended against real responses.
            return "error" not in rows
        except Exception:  # pylint:disable=broad-except
            return False