Skip to content

Commit

Permalink
Merge pull request #130 from britive/develop
Browse files Browse the repository at this point in the history
v1.6.1rc6
  • Loading branch information
twratl authored Dec 18, 2023
2 parents 2d213c5 + 57a1647 commit 16d5e12
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 9 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = pybritive
version = 1.6.1rc5
version = 1.6.1rc6
author = Britive Inc.
author_email = [email protected]
description = A pure Python CLI for Britive
Expand Down
35 changes: 30 additions & 5 deletions src/pybritive/britive_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .helpers.split import profile_split
from .helpers import cloud_credential_printer as printer
from .helpers.cache import Cache
import jwt


default_table_format = 'fancy_grid'
Expand Down Expand Up @@ -84,6 +85,23 @@ def set_credential_manager(self):
else:
raise click.ClickException(f'invalid credential backend {backend}.')

@staticmethod
def _extract_field_from_jwt(token: str, field: str, verify: bool = False):
try:
return jwt.decode(
token,
# validation of the token will occur on the Britive backend
# so not verifying everything here is okay since we are just
# trying to extract the token expiration time so we can store
# it in the ~/.britive/pybritive.credentials[.encrypted] file
options={
'verify_signature': verify,
'verify_aud': verify
}
)[field]
except Exception:
return None

def login(self, explicit: bool = False, browser: str = None):
# explicit means the user called pybritive login, otherwise it is being implicitly called by something else

Expand All @@ -94,7 +112,7 @@ def login(self, explicit: bool = False, browser: str = None):
raise click.ClickException('Interactive login unavailable when an API token is provided.')

# taking a very straightforward approach here...if user provided a token and it doesn't work just exit
if self.token:
if self.token: # static token provided or BRITIVE_API_TOKEN set
try:
self.b = Britive(
tenant=self.tenant_name,
Expand All @@ -109,7 +127,7 @@ def login(self, explicit: bool = False, browser: str = None):
pass
else:
raise e
else:
else: # user is asking for an interactive login or using token stored from an interactive login
counter = 1
while True: # will break after we successfully get logged in or 3 attempts have occurred
# protect against infinite loop
Expand All @@ -119,18 +137,25 @@ def login(self, explicit: bool = False, browser: str = None):
# attempt login and making an api call to ensure the credentials we have are valid
try:
self.set_credential_manager()
token = self.credential_manager.get_token()
jti = self._extract_field_from_jwt(token=token, field='jti')
self.debug(f'got token jti of {jti} from credential manager')
self.b = Britive(
tenant=self.tenant_name,
token=self.credential_manager.get_token(),
token=token,
query_features=False
)
self.b.my_access.whoami() # this is what may cause UnauthorizedRequest
break
except exceptions.UnauthorizedRequest as e:
if '401 - e0000' in str(e).lower():
self.print(f'attempt {counter} of 3 - login failed')
self.debug(f'attempt {counter} of 3 - login failed')
self.debug(f'login error message was {str(e)}')
self.logout()

# we know the token is invalid since we got that API response
# so we don't need to actually logout, just clear the token from
# the credentials manager
self._cleanup_credentials()
else:
raise e
finally:
Expand Down
42 changes: 39 additions & 3 deletions src/pybritive/helpers/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,23 @@ def perform_interactive_login(self):
self.cli.print(f'Authenticated to tenant {self.tenant} via interactive login.')
break

@staticmethod
def extract_field_from_jwt(token: str, field: str, verify: bool = False):
try:
return jwt.decode(
token,
# validation of the token will occur on the Britive backend
# so not verifying everything here is okay since we are just
# trying to extract the token expiration time so we can store
# it in the ~/.britive/pybritive.credentials[.encrypted] file
options={
'verify_signature': verify,
'verify_aud': verify
}
)[field]
except Exception:
return None

@staticmethod
def _extract_exp_from_jwt(token: str, verify: bool = False, convert_to_ms: bool = False):
try:
Expand Down Expand Up @@ -247,20 +264,22 @@ def _get_token(self):

def get_token(self):
if not self.has_valid_credentials(): # no credentials or expired creds for the tenant so do interactive login

self.cli.debug('has_valid_credentials = False')
# both methods below write the credentials out and update self.credentials as needed
if self.federation_provider:
self.perform_federation_provider_authentication()
else:
self.perform_interactive_login()

return self._get_token()
token = self._get_token()
return token

def has_valid_credentials(self):
if not self.credentials or self.credentials == {}:
self.cli.print(f'Credentials for tenant {self.tenant} not found.')
return False
if int(time.time() * 1000) <= int(self.credentials.get('safeExpirationTime', 0)):
self.cli.debug('credentials.py::has_valid_credentials - credentials exist and are not expired so are valid')
return True
self.cli.print(f'Credentials for tenant {self.tenant} have expired.')
return False
Expand Down Expand Up @@ -292,8 +311,11 @@ def save(self, credentials: dict):
full_credentials = self.load(full=True)
if credentials is None:
full_credentials.pop(self.alias, None)
self.credentials = None
else:
full_credentials[self.alias] = credentials
# effectively a deep copy
self.credentials = json.loads(json.dumps(credentials))

config = configparser.ConfigParser()
config.optionxform = str # maintain key case
Expand All @@ -302,7 +324,13 @@ def save(self, credentials: dict):
# write the new credentials file
with open(str(self.path), 'w', encoding='utf-8') as f:
config.write(f, space_around_delimiters=False)
self.credentials = credentials

jti = self.extract_field_from_jwt(
token=(self.credentials or {}).get('accessToken'),
verify=False,
field='jti'
)
self.cli.debug(f'credentials.py::FileCredentialManager::save - set credentials to jwt id {jti}')

def delete(self):
self.save(None)
Expand Down Expand Up @@ -353,6 +381,7 @@ def save(self, credentials: dict):
full_credentials = self.load(full=True)
if credentials is None:
full_credentials.pop(self.alias, None)
self.credentials = None
else:
credentials['accessToken'] = self.encrypt(credentials['accessToken'])
full_credentials[self.alias] = credentials
Expand All @@ -367,5 +396,12 @@ def save(self, credentials: dict):
with open(str(self.path), 'w', encoding='utf-8') as f:
config.write(f, space_around_delimiters=False)

jti = self.extract_field_from_jwt(
token=(self.credentials or {}).get('accessToken'),
verify=False,
field='jti'
)
self.cli.debug(f'credentials.py::FileCredentialManager::save - set credentials to jwt id {jti}')

def delete(self):
self.save(None)

0 comments on commit 16d5e12

Please sign in to comment.