diff --git a/setup.cfg b/setup.cfg index f44e4be..e7f8d71 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = pybritive -version = 1.6.1rc5 +version = 1.6.1rc6 author = Britive Inc. author_email = support@britive.com description = A pure Python CLI for Britive diff --git a/src/pybritive/britive_cli.py b/src/pybritive/britive_cli.py index 8200e7c..234c9e7 100644 --- a/src/pybritive/britive_cli.py +++ b/src/pybritive/britive_cli.py @@ -21,6 +21,7 @@ from .helpers.split import profile_split from .helpers import cloud_credential_printer as printer from .helpers.cache import Cache +import jwt default_table_format = 'fancy_grid' @@ -84,6 +85,23 @@ def set_credential_manager(self): else: raise click.ClickException(f'invalid credential backend {backend}.') + @staticmethod + def _extract_field_from_jwt(token: str, field: str, verify: bool = False): + try: + return jwt.decode( + token, + # validation of the token will occur on the Britive backend + # so not verifying everything here is okay since we are just + # trying to extract the token expiration time so we can store + # it in the ~/.britive/pybritive.credentials[.encrypted] file + options={ + 'verify_signature': verify, + 'verify_aud': verify + } + )[field] + except Exception: + return None + def login(self, explicit: bool = False, browser: str = None): # explicit means the user called pybritive login, otherwise it is being implicitly called by something else @@ -94,7 +112,7 @@ def login(self, explicit: bool = False, browser: str = None): raise click.ClickException('Interactive login unavailable when an API token is provided.') # taking a very straightforward approach here...if user provided a token and it doesn't work just exit - if self.token: + if self.token: # static token provided or BRITIVE_API_TOKEN set try: self.b = Britive( tenant=self.tenant_name, @@ -109,7 +127,7 @@ def login(self, explicit: bool = False, browser: str = None): pass else: raise e - else: + else: # user is asking for an interactive login or using token stored from an interactive login counter = 1 while True: # will break after we successfully get logged in or 3 attempts have occurred # protect against infinite loop @@ -119,18 +137,25 @@ def login(self, explicit: bool = False, browser: str = None): # attempt login and making an api call to ensure the credentials we have are valid try: self.set_credential_manager() + token = self.credential_manager.get_token() + jti = self._extract_field_from_jwt(token=token, field='jti') + self.debug(f'got token jti of {jti} from credential manager') self.b = Britive( tenant=self.tenant_name, - token=self.credential_manager.get_token(), + token=token, query_features=False ) self.b.my_access.whoami() # this is what may cause UnauthorizedRequest break except exceptions.UnauthorizedRequest as e: if '401 - e0000' in str(e).lower(): - self.print(f'attempt {counter} of 3 - login failed') + self.debug(f'attempt {counter} of 3 - login failed') self.debug(f'login error message was {str(e)}') - self.logout() + + # we know the token is invalid since we got that API response + # so we don't need to actually logout, just clear the token from + # the credentials manager + self._cleanup_credentials() else: raise e finally: diff --git a/src/pybritive/helpers/credentials.py b/src/pybritive/helpers/credentials.py index b15264e..4c52c0f 100644 --- a/src/pybritive/helpers/credentials.py +++ b/src/pybritive/helpers/credentials.py @@ -145,6 +145,23 @@ def perform_interactive_login(self): self.cli.print(f'Authenticated to tenant {self.tenant} via interactive login.') break + @staticmethod + def extract_field_from_jwt(token: str, field: str, verify: bool = False): + try: + return jwt.decode( + token, + # validation of the token will occur on the Britive backend + # so not verifying everything here is okay since we are just + # trying to extract the token expiration time so we can store + # it in the ~/.britive/pybritive.credentials[.encrypted] file + options={ + 'verify_signature': verify, + 'verify_aud': verify + } + )[field] + except Exception: + return None + @staticmethod def _extract_exp_from_jwt(token: str, verify: bool = False, convert_to_ms: bool = False): try: @@ -247,20 +264,22 @@ def _get_token(self): def get_token(self): if not self.has_valid_credentials(): # no credentials or expired creds for the tenant so do interactive login - + self.cli.debug('has_valid_credentials = False') # both methods below write the credentials out and update self.credentials as needed if self.federation_provider: self.perform_federation_provider_authentication() else: self.perform_interactive_login() - return self._get_token() + token = self._get_token() + return token def has_valid_credentials(self): if not self.credentials or self.credentials == {}: self.cli.print(f'Credentials for tenant {self.tenant} not found.') return False if int(time.time() * 1000) <= int(self.credentials.get('safeExpirationTime', 0)): + self.cli.debug('credentials.py::has_valid_credentials - credentials exist and are not expired so are valid') return True self.cli.print(f'Credentials for tenant {self.tenant} have expired.') return False @@ -292,8 +311,11 @@ def save(self, credentials: dict): full_credentials = self.load(full=True) if credentials is None: full_credentials.pop(self.alias, None) + self.credentials = None else: full_credentials[self.alias] = credentials + # effectively a deep copy + self.credentials = json.loads(json.dumps(credentials)) config = configparser.ConfigParser() config.optionxform = str # maintain key case @@ -302,7 +324,13 @@ def save(self, credentials: dict): # write the new credentials file with open(str(self.path), 'w', encoding='utf-8') as f: config.write(f, space_around_delimiters=False) - self.credentials = credentials + + jti = self.extract_field_from_jwt( + token=(self.credentials or {}).get('accessToken'), + verify=False, + field='jti' + ) + self.cli.debug(f'credentials.py::FileCredentialManager::save - set credentials to jwt id {jti}') def delete(self): self.save(None) @@ -353,6 +381,7 @@ def save(self, credentials: dict): full_credentials = self.load(full=True) if credentials is None: full_credentials.pop(self.alias, None) + self.credentials = None else: credentials['accessToken'] = self.encrypt(credentials['accessToken']) full_credentials[self.alias] = credentials @@ -367,5 +396,12 @@ def save(self, credentials: dict): with open(str(self.path), 'w', encoding='utf-8') as f: config.write(f, space_around_delimiters=False) + jti = self.extract_field_from_jwt( + token=(self.credentials or {}).get('accessToken'), + verify=False, + field='jti' + ) + self.cli.debug(f'credentials.py::FileCredentialManager::save - set credentials to jwt id {jti}') + def delete(self): self.save(None)