Skip to content

Commit

Permalink
Merge pull request #179 from zsichina/profile_id_credential_is_requir…
Browse files Browse the repository at this point in the history
…ed_to_list_profiles

Feat: make profile_id credential optional in list profiles
denisneuf authored Dec 15, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 6ad6880 + 53cc9c4 commit fd6a2d8
Showing 3 changed files with 35 additions and 22 deletions.
2 changes: 1 addition & 1 deletion ad_api/auth/credentials.py
Original file line number Diff line number Diff line change
@@ -3,4 +3,4 @@ def __init__(self, credentials):
self.client_id = credentials['client_id']
self.client_secret = credentials['client_secret']
self.refresh_token = credentials['refresh_token']
self.profile_id = credentials['profile_id']
self.profile_id = credentials.get('profile_id')
9 changes: 6 additions & 3 deletions ad_api/base/client.py
Original file line number Diff line number Diff line change
@@ -33,8 +33,9 @@ def __init__(
timeout=None,
debug=False,
access_token=None,
verify_additional_credentials=True,
):
self.credentials = CredentialProvider(account, credentials).credentials
self.credentials = CredentialProvider(account, credentials, verify_additional_credentials).credentials
self._auth = AccessTokenClient(
credentials=self.credentials,
proxies=proxies,
@@ -53,13 +54,15 @@ def __init__(

@property
def headers(self):
return {
data = {
'User-Agent': self.user_agent,
'Amazon-Advertising-API-ClientId': self.credentials['client_id'],
'Authorization': 'Bearer %s' % self.auth.access_token,
'Amazon-Advertising-API-Scope': self.credentials['profile_id'],
'Content-Type': 'application/json',
}
if profile_id := self.credentials.get('profile_id'):
data['Amazon-Advertising-API-Scope'] = profile_id
return data

@property
def auth(self) -> AccessTokenResponse:
46 changes: 28 additions & 18 deletions ad_api/base/credential_provider.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,14 @@
import logging

logger = logging.getLogger(__name__)
required_credentials = ['refresh_token', 'client_id', 'client_secret', 'profile_id']
REQUIRED_CREDENTIALS = [
'refresh_token',
'client_id',
'client_secret',
]
ADDITIONAL_CREDENTIALS = [
'profile_id'
]


class MissingCredentials(Exception):
@@ -23,17 +30,20 @@ def __init__(self, credentials: dict or str, *args, **kwargs):
self.credentials = credentials

@abc.abstractmethod
def load_credentials(self):
def load_credentials(self, verify_additional_credentials):
pass

def _get_env(self, key):
return os.environ.get(f'{key}', os.environ.get(key))

def check_credentials(self):
def check_credentials(self, verify_additional_credentials):
creds_to_check = REQUIRED_CREDENTIALS[:]
if verify_additional_credentials:
creds_to_check += ADDITIONAL_CREDENTIALS
try:
self.errors = [c for c in required_credentials if c not in self.credentials.keys() or not self.credentials[c]]
self.errors = [c for c in creds_to_check if c not in self.credentials.keys() or not self.credentials[c]]
except (AttributeError, TypeError):
raise MissingCredentials(f'Credentials are missing: {", ".join(required_credentials)}')
raise MissingCredentials(f'Credentials are missing: {", ".join(creds_to_check)}')
if not len(self.errors):
return self.credentials
raise MissingCredentials(f'Credentials are missing: {", ".join(self.errors)}')
@@ -43,15 +53,15 @@ class FromEnvCredentialProvider(BaseCredentialProvider):
def __init__(self, *args, **kwargs):
pass

def load_credentials(self):
def load_credentials(self, verify_additional_credentials):
account_data = dict(
refresh_token=self._get_env('AD_API_REFRESH_TOKEN'),
client_id=self._get_env('AD_API_CLIENT_ID'),
client_secret=self._get_env('AD_API_CLIENT_SECRET'),
profile_id=self._get_env('AD_API_PROFILE_ID'),
)
self.credentials = account_data
self.check_credentials()
self.check_credentials(verify_additional_credentials)
return self.credentials

def _get_env(self, key):
@@ -62,8 +72,8 @@ class FromCodeCredentialProvider(BaseCredentialProvider):
def __init__(self, credentials: dict, *args, **kwargs):
super().__init__(credentials)

def load_credentials(self):
self.check_credentials()
def load_credentials(self, verify_additional_credentials):
self.check_credentials(verify_additional_credentials)
return self.credentials


@@ -73,14 +83,14 @@ class FromConfigFileCredentialProvider(BaseCredentialProvider):
def __init__(self, account: str, *args, **kwargs):
self.account = account

def load_credentials(self):
def load_credentials(self, verify_additional_credentials):
try:
config = confuse.Configuration('python-ad-api')
config_filename = os.path.join(config.config_dir(), self.file)
config.set_file(config_filename)
account_data = config[self.account].get()
super().__init__(account_data)
self.check_credentials()
self.check_credentials(verify_additional_credentials)
return account_data

except confuse.exceptions.NotFoundError:
@@ -94,7 +104,7 @@ def load_credentials(self):


class CredentialProvider:
def load_credentials(self):
def load_credentials(self, verify_additional_credentials):
pass

def _get_env(self, key):
@@ -104,30 +114,30 @@ def __init__(
self,
account: str = 'default',
credentials: Optional[Dict[str, str]] = None,
verify_additional_credentials: bool = True,
):
client_id_env = (self._get_env('AD_API_CLIENT_ID'),)
client_id_env = self._get_env('AD_API_CLIENT_ID')
client_secret_env = self._get_env('AD_API_CLIENT_SECRET')
refresh_token_env = self._get_env('AD_API_REFRESH_TOKEN')
profile_id_env = self._get_env('AD_API_PROFILE_ID')

if client_id_env is not None and client_secret_env is not None and refresh_token_env is not None and profile_id_env is not None:
if client_id_env is not None and client_secret_env is not None and refresh_token_env is not None:
try:
self.credentials = FromEnvCredentialProvider().load_credentials()
self.credentials = FromEnvCredentialProvider().load_credentials(verify_additional_credentials)
except MissingCredentials:
logging.error("MissingCredentials")
logging.error(MissingCredentials)

elif isinstance(credentials, dict):
try:
self.credentials = FromCodeCredentialProvider(credentials).load_credentials()
self.credentials = FromCodeCredentialProvider(credentials).load_credentials(verify_additional_credentials)

except MissingCredentials:
logging.error("MissingCredentials")
logging.error(MissingCredentials)

else:
try:
self.credentials = FromConfigFileCredentialProvider(account).load_credentials()
self.credentials = FromConfigFileCredentialProvider(account).load_credentials(verify_additional_credentials)

except MissingCredentials:
logging.error("MissingCredentials")

0 comments on commit fd6a2d8

Please sign in to comment.