-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add lru caching * Add cache settings * Use lru cache and clear the cache on environment update * Update default settings * revert dependency additions * Moderate refactoring * Add tests for caching implementation * Update docstring * Minor tweaks * Re-compile requirements * Linting & formatting fixes * Added TODO
- Loading branch information
1 parent
86f8617
commit 990b6f4
Showing
15 changed files
with
559 additions
and
247 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,3 +10,4 @@ pytest-asyncio | |
pytest-mock | ||
reorder-python-imports | ||
certifi | ||
pytest-freezegun |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,57 @@ | ||
import logging | ||
from datetime import datetime | ||
|
||
import httpx | ||
import orjson | ||
|
||
from .exceptions import FlagsmithUnknownKeyError | ||
from .settings import Settings | ||
import typing | ||
from abc import ABC | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class CacheService: | ||
def __init__(self, settings: Settings): | ||
self.settings = settings | ||
class BaseEnvironmentsCache(ABC): | ||
def __init__(self, *args, **kwargs): | ||
self.last_updated_at = None | ||
|
||
def put_environment( | ||
self, | ||
environment_api_key: str, | ||
environment_document: typing.Dict[str, typing.Any], | ||
) -> bool: | ||
""" | ||
Update the environment cache for the given key with the given environment document. | ||
Returns a boolean confirming if the cache was updated or not (i.e. if the environment document | ||
was different from the one already in the cache). | ||
""" | ||
# TODO: can we use the environment header here instead of comparing the document? | ||
if environment_document != self.get_environment(environment_api_key): | ||
self._put_environment(environment_api_key, environment_document) | ||
return True | ||
return False | ||
|
||
def _put_environment( | ||
self, | ||
environment_api_key: str, | ||
environment_document: typing.Dict[str, typing.Any], | ||
) -> None: | ||
raise NotImplementedError() | ||
|
||
def get_environment( | ||
self, environment_api_key: str | ||
) -> typing.Dict[str, typing.Any] | None: | ||
raise NotImplementedError() | ||
|
||
|
||
class LocalMemEnvironmentsCache(BaseEnvironmentsCache): | ||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
self._cache = {} | ||
self._client = httpx.AsyncClient(timeout=settings.api_poll_timeout) | ||
|
||
async def fetch_document(self, server_side_key): | ||
response = await self._client.get( | ||
url=f"{self.settings.api_url}/environment-document/", | ||
headers={"X-Environment-Key": server_side_key}, | ||
) | ||
response.raise_for_status() | ||
return orjson.loads(response.text) | ||
|
||
async def refresh(self): | ||
received_error = False | ||
for key_pair in self.settings.environment_key_pairs: | ||
try: | ||
self._cache[key_pair.client_side_key] = await self.fetch_document( | ||
key_pair.server_side_key | ||
) | ||
except (httpx.HTTPError, orjson.JSONDecodeError): | ||
received_error = True | ||
logger.exception( | ||
f"Failed to fetch document for {key_pair.client_side_key}" | ||
) | ||
if not received_error: | ||
self.last_updated_at = datetime.now() | ||
|
||
def get_environment(self, client_side_key): | ||
try: | ||
return self._cache[client_side_key] | ||
except KeyError: | ||
raise FlagsmithUnknownKeyError(client_side_key) | ||
|
||
def _put_environment( | ||
self, | ||
environment_api_key: str, | ||
environment_document: typing.Dict[str, typing.Any], | ||
) -> None: | ||
self._cache[environment_api_key] = environment_document | ||
|
||
def get_environment( | ||
self, environment_api_key | ||
) -> typing.Dict[str, typing.Any] | None: | ||
return self._cache.get(environment_api_key) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
import logging | ||
import typing | ||
from datetime import datetime | ||
from functools import lru_cache | ||
|
||
import httpx | ||
from fastapi.responses import ORJSONResponse | ||
from flag_engine.engine import ( | ||
get_environment_feature_state, | ||
get_environment_feature_states, | ||
get_identity_feature_states, | ||
) | ||
from flag_engine.environments.builders import build_environment_model | ||
from flag_engine.identities.models import IdentityModel | ||
from orjson import orjson | ||
|
||
from src.cache import BaseEnvironmentsCache, LocalMemEnvironmentsCache | ||
from src.exceptions import FlagsmithUnknownKeyError | ||
from src.feature_utils import filter_out_server_key_only_feature_states | ||
from src.mappers import ( | ||
map_feature_state_to_response_data, | ||
map_feature_states_to_response_data, | ||
map_traits_to_response_data, | ||
) | ||
from src.models import IdentityWithTraits | ||
from src.settings import Settings | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class EnvironmentService: | ||
def __init__( | ||
self, | ||
cache: BaseEnvironmentsCache = None, | ||
client: httpx.AsyncClient = None, | ||
settings: Settings = None, | ||
): | ||
self.cache = cache or LocalMemEnvironmentsCache() | ||
self.settings = settings or Settings() | ||
self._client = client or httpx.AsyncClient(timeout=settings.api_poll_timeout) | ||
self.last_updated_at = None | ||
|
||
if settings.endpoint_caches: | ||
if settings.endpoint_caches.flags.use_cache: | ||
self.get_flags_response_data = lru_cache( | ||
maxsize=settings.endpoint_caches.flags.cache_max_size, | ||
)(self.get_flags_response_data) | ||
|
||
if settings.endpoint_caches.identities.use_cache: | ||
self.get_identity_response_data = lru_cache( | ||
maxsize=settings.endpoint_caches.identities.cache_max_size, | ||
)(self.get_identity_response_data) | ||
|
||
async def refresh_environment_caches(self): | ||
received_error = False | ||
for key_pair in self.settings.environment_key_pairs: | ||
try: | ||
environment_document = await self._fetch_document( | ||
key_pair.server_side_key | ||
) | ||
if self.cache.put_environment( | ||
environment_api_key=key_pair.client_side_key, | ||
environment_document=environment_document, | ||
): | ||
await self._clear_endpoint_caches() | ||
except (httpx.HTTPError, orjson.JSONDecodeError): | ||
logger.exception( | ||
f"Failed to fetch document for {key_pair.client_side_key}" | ||
) | ||
received_error = True | ||
if not received_error: | ||
self.last_updated_at = datetime.now() | ||
|
||
def get_flags_response_data( | ||
self, environment_key: str, feature: str = None | ||
) -> ORJSONResponse: | ||
environment_document = self.get_environment(environment_key) | ||
environment = build_environment_model(environment_document) | ||
|
||
if feature: | ||
feature_state = get_environment_feature_state(environment, feature) | ||
|
||
if not filter_out_server_key_only_feature_states( | ||
feature_states=[feature_state], | ||
environment=environment, | ||
): | ||
return ORJSONResponse( | ||
status_code=404, | ||
content={ | ||
"status": "not_found", | ||
"message": f"feature '{feature}' not found", | ||
}, | ||
) | ||
|
||
data = map_feature_state_to_response_data(feature_state) | ||
|
||
else: | ||
feature_states = filter_out_server_key_only_feature_states( | ||
feature_states=get_environment_feature_states(environment), | ||
environment=environment, | ||
) | ||
data = map_feature_states_to_response_data(feature_states) | ||
|
||
return ORJSONResponse(data) | ||
|
||
def get_identity_response_data( | ||
self, input_data: IdentityWithTraits, environment_key: str | ||
) -> ORJSONResponse: | ||
environment_document = self.get_environment(environment_key) | ||
environment = build_environment_model(environment_document) | ||
identity = IdentityModel( | ||
identifier=input_data.identifier, environment_api_key=environment_key | ||
) | ||
trait_models = input_data.traits | ||
flags = filter_out_server_key_only_feature_states( | ||
feature_states=get_identity_feature_states( | ||
environment, | ||
identity, | ||
override_traits=trait_models, | ||
), | ||
environment=environment, | ||
) | ||
data = { | ||
"traits": map_traits_to_response_data(trait_models), | ||
"flags": map_feature_states_to_response_data( | ||
flags, | ||
identity_hash_key=identity.composite_key, | ||
), | ||
} | ||
return ORJSONResponse(data) | ||
|
||
def get_environment(self, client_side_key: str) -> dict[str, typing.Any]: | ||
if environment_document := self.cache.get_environment(client_side_key): | ||
return environment_document | ||
raise FlagsmithUnknownKeyError(client_side_key) | ||
|
||
async def _fetch_document(self, server_side_key: str) -> dict[str, typing.Any]: | ||
response = await self._client.get( | ||
url=f"{self.settings.api_url}/environment-document/", | ||
headers={"X-Environment-Key": server_side_key}, | ||
) | ||
response.raise_for_status() | ||
return orjson.loads(response.text) | ||
|
||
async def _clear_endpoint_caches(self): | ||
for func in (self.get_identity_response_data, self.get_flags_response_data): | ||
try: | ||
func.cache_clear() | ||
except AttributeError: | ||
pass |
File renamed without changes.
Oops, something went wrong.