diff --git a/examples/eviction/distributed_eviction.py b/examples/eviction/distributed_eviction.py
new file mode 100644
index 00000000..3bcc829b
--- /dev/null
+++ b/examples/eviction/distributed_eviction.py
@@ -0,0 +1,73 @@
+from gptcache import Cache
+from gptcache.embedding import Onnx
+
+from gptcache.manager.eviction import EvictionBase
+
+from gptcache.manager import get_data_manager, CacheBase, VectorBase, manager_factory
+
+
+def get_data_manager_example():
+    """
+    This example shows how to create a data manager with MongoDB as the scalar storage,
+    a faiss vector base, and a redis eviction base.
+    This configuration can be used to scale GPTCache horizontally: keys are maintained
+    in the redis key-value store instead of in memory, and they are evicted according
+    to the configured redis eviction policy.
+    """
+    onnx = Onnx()
+    data_manager = get_data_manager(cache_base=CacheBase("mongo", url="mongodb://localhost:27017/"),
+                                    vector_base=VectorBase("faiss", dimension=onnx.dimension),
+                                    eviction_base=EvictionBase("redis",
+                                                               maxmemory="100mb",
+                                                               policy="allkeys-lru",
+                                                               ttl=100))
+
+    cache = Cache()
+    cache.init(data_manager=data_manager)
+    question = "What is github?"
+    answer = "Online platform for version control and code collaboration."
+    embedding = onnx.to_embeddings(question)
+    cache.import_data([question], [answer], [embedding])
+
+
+def get_manager_example_redis_only():
+    """
+    This example shows how to create a data manager with redis as both the scalar
+    storage and the eviction base.
+    Note: since `RedisCacheStorage` can be configured to handle the ttl and eviction
+    of keys internally, `no_op_eviction` is used as the eviction base here; it does
+    not add any keys or update their ttls.
+    This configuration can be used to scale GPTCache horizontally: keys are maintained
+    in the redis key-value store instead of in memory, and they are evicted according
+    to the configured redis eviction policy.
+    """
+    onnx = Onnx()
+    data_manager = get_data_manager(cache_base=CacheBase("redis", maxmemory="100mb", policy="allkeys-lru", ttl=100),
+                                    vector_base=VectorBase("faiss", dimension=onnx.dimension),
+                                    eviction_base=EvictionBase("no_op_eviction"))
+
+    cache = Cache()
+    cache.init(data_manager=data_manager)
+    question = "What is github?"
+    answer = "Online platform for version control and code collaboration."
+    embedding = onnx.to_embeddings(question)
+    cache.import_data([question], [answer], [embedding])
+
+
+def manager_factory_example():
+    onnx = Onnx()
+    data_manager = manager_factory("redis,faiss",
+                                   eviction_manager="redis",
+                                   scalar_params={"url": "redis://localhost:6379"},
+                                   vector_params={"dimension": onnx.dimension},
+                                   eviction_params={"maxmemory": "100mb",
+                                                    "policy": "allkeys-lru",
+                                                    "ttl": 1}
+                                   )
+
+    cache = Cache()
+    cache.init(data_manager=data_manager)
+    question = "What is github?"
+    answer = "Online platform for version control and code collaboration."
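+    # note (illustrative): with "ttl": 1 in eviction_params above, redis expires a
+    # cached entry about one second after it is imported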
+    embedding = onnx.to_embeddings(question)
+    cache.import_data([question], [answer], [embedding])
diff --git a/gptcache/manager/data_manager.py b/gptcache/manager/data_manager.py
index 46bfb3d2..28d5a21d 100644
--- a/gptcache/manager/data_manager.py
+++ b/gptcache/manager/data_manager.py
@@ -7,6 +7,7 @@ import requests
 
 from gptcache.manager.eviction import EvictionBase
+from gptcache.manager.eviction.distributed_cache import NoOpEviction
 from gptcache.manager.eviction_manager import EvictionManager
 from gptcache.manager.object_data.base import ObjectBase
 from gptcache.manager.scalar_data.base import (
@@ -30,11 +31,11 @@ def save(self, question, answer, embedding_data, **kwargs):
 
     @abstractmethod
     def import_data(
-        self,
-        questions: List[Any],
-        answers: List[Any],
-        embedding_datas: List[Any],
-        session_ids: List[Optional[str]],
+            self,
+            questions: List[Any],
+            answers: List[Any],
+            embedding_datas: List[Any],
+            session_ids: List[Optional[str]],
     ):
         pass
@@ -213,12 +214,10 @@ class SSDataManager(DataManager):
     :type s: CacheStorage
     :param v: VectorBase to manager the vector data, it can be generated with :meth:`gptcache.manager.VectorBase`.
     :type v: VectorBase
-    :param max_size: the max size for the cache, defaults to 1000.
-    :type max_size: int
-    :param clean_size: the size to clean up, defaults to `max_size * 0.2`.
-    :type clean_size: int
-    :param eviction: The eviction policy, it is support "LRU" and "FIFO" now, and defaults to "LRU".
-    :type eviction: str
+    :param o: ObjectBase to manage the object data, it can be generated with :meth:`gptcache.manager.ObjectBase`.
+    :type o: ObjectBase
+    :param e: EvictionBase to manage the eviction of cached data, it can be generated with :meth:`gptcache.manager.EvictionBase`.
+    :type e: EvictionBase
     """
 
     def __init__(
@@ -226,24 +225,26 @@ def __init__(
         s: CacheStorage,
         v: VectorBase,
         o: Optional[ObjectBase],
+        e: Optional[EvictionBase],
         max_size,
         clean_size,
-        policy="LRU",
+        policy="LRU"
    ):
-        self.max_size = max_size
-        self.clean_size = clean_size
         self.s = s
         self.v = v
         self.o = o
         self.eviction_manager = EvictionManager(self.s, self.v)
-        self.eviction_base = EvictionBase(
-            name="memory",
-            policy=policy,
-            maxsize=max_size,
-            clean_size=clean_size,
-            on_evict=self._clear,
-        )
-        self.eviction_base.put(self.s.get_ids(deleted=False))
+        if e is None:
+            e = EvictionBase(name="memory",
+                             maxsize=max_size,
+                             clean_size=clean_size,
+                             policy=policy,
+                             on_evict=self._clear)
+        self.eviction_base = e
+
+        if not isinstance(self.eviction_base, NoOpEviction):
+            # a no-op eviction base (e.g. redis evicting its own keys) keeps no key
+            # bookkeeping, so there is nothing to seed with the existing ids
+            self.eviction_base.put(self.s.get_ids(deleted=False))
 
     def _clear(self, marked_keys):
         self.eviction_manager.soft_evict(marked_keys)
diff --git a/gptcache/manager/eviction/distributed_cache.py b/gptcache/manager/eviction/distributed_cache.py
new file mode 100644
index 00000000..271ab9a8
--- /dev/null
+++ b/gptcache/manager/eviction/distributed_cache.py
@@ -0,0 +1,121 @@
+# pylint: disable=wrong-import-position
+from abc import ABC, abstractmethod
+from typing import List
+
+from gptcache.utils import import_redis
+from gptcache.utils.log import gptcache_log
+from gptcache.manager.eviction.base import EvictionBase
+
+import_redis()
+import redis
+from redis_om import get_redis_connection
+
+
+class DistributedEviction(EvictionBase, ABC):
+    """
+    Base class for distributed eviction strategies.
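+
+    Concrete implementations delegate the eviction decision to the distributed
+    store itself: ``put`` registers keys and ``get`` marks them as accessed.
+    A minimal sketch of obtaining one (assuming a redis instance on localhost):
+
+    Example:
+        .. code-block:: python
+
+            from gptcache.manager.eviction import EvictionBase
+
+            eviction_base = EvictionBase("redis", maxmemory="100mb", policy="allkeys-lru", ttl=100)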
+    """
+
+    @abstractmethod
+    def put(self, objs: List[str]):
+        pass
+
+    @abstractmethod
+    def get(self, obj: str):
+        pass
+
+    @property
+    @abstractmethod
+    def policy(self) -> str:
+        pass
+
+
+class RedisCacheEviction(DistributedEviction, ABC):
+    """eviction: Distributed Cache Eviction Strategy using Redis.
+
+    :param host: the host of redis
+    :type host: str
+    :param port: the port of redis
+    :type port: int
+    :param policy: the redis eviction policy, such as allkeys-lru, volatile-lru,
+        allkeys-random, volatile-random, etc.; see https://redis.io/docs/reference/eviction/
+        for more information.
+    :type policy: str
+    :param maxmemory: the maxmemory of redis
+    :type maxmemory: str
+    :param global_key_prefix: the global key prefix
+    :type global_key_prefix: str
+    :param ttl: the ttl of the cache data in seconds
+    :type ttl: int
+    :param maxmemory_samples: the number of keys to sample when evicting
+    :type maxmemory_samples: int
+    :param kwargs: additional arguments passed to ``get_redis_connection``
+    :type kwargs: Any
+    """
+
+    def __init__(self,
+                 host="localhost",
+                 port=6379,
+                 maxmemory: str = None,
+                 policy: str = None,
+                 global_key_prefix="gptcache",
+                 ttl: int = None,
+                 maxmemory_samples: int = None,
+                 **kwargs):
+        self._redis = get_redis_connection(host=host, port=port, **kwargs)
+        if maxmemory:
+            self._redis.config_set("maxmemory", maxmemory)
+        if maxmemory_samples:
+            self._redis.config_set("maxmemory-samples", maxmemory_samples)
+        if policy:
+            self._redis.config_set("maxmemory-policy", policy)
+        # keep the policy even when none is pushed to redis, so the property below
+        # never raises for a policy-less configuration
+        self._policy = policy.lower() if policy else None
+
+        self._global_key_prefix = global_key_prefix
+        self._ttl = ttl
+
+    def _create_key(self, key: str) -> str:
+        return f"{self._global_key_prefix}:evict:{key}"
+
+    def put(self, objs: List[str], expire=False):
+        ttl = self._ttl if expire else None
+        for key in objs:
+            self._redis.set(self._create_key(key), "True", ex=ttl)
+
+    def get(self, obj: str):
+        try:
+            value = self._redis.get(self._create_key(obj))
+            # refresh the key expiry on access so frequently used entries stay alive
+            if self._ttl:
+                self._redis.expire(self._create_key(obj), self._ttl)
+            return value
+        except redis.RedisError:
+            gptcache_log.error("Error getting key %s from cache", obj)
+            return None
+
+    @property
+    def policy(self) -> str:
+        return self._policy
+
+
+class NoOpEviction(EvictionBase):
+    """eviction: No-op Eviction Strategy. Used when eviction is handled internally
+    by a store such as Redis or Memcached, so no client-side eviction needs to be performed.
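+
+    A minimal sketch (assuming redis is also the scalar storage, so redis itself
+    evicts the keys):
+
+    Example:
+        .. code-block:: python
+
+            from gptcache.manager import get_data_manager, CacheBase, VectorBase
+            from gptcache.manager.eviction import EvictionBase
+
+            data_manager = get_data_manager(cache_base=CacheBase("redis", maxmemory="100mb", policy="allkeys-lru"),
+                                            vector_base=VectorBase("faiss", dimension=128),
+                                            eviction_base=EvictionBase("no_op_eviction"))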
+
+    """
+
+    @property
+    def policy(self) -> str:
+        return ""
+
+    def __init__(self, **kwargs):
+        pass
+
+    def put(self, objs: List[str]):
+        pass
+
+    def get(self, obj: str):
+        pass
diff --git a/gptcache/manager/eviction/manager.py b/gptcache/manager/eviction/manager.py
index 6efe9651..27adc238 100644
--- a/gptcache/manager/eviction/manager.py
+++ b/gptcache/manager/eviction/manager.py
@@ -18,8 +18,8 @@ def __init__(self):
     @staticmethod
     def get(
         name: str,
-        policy: str,
-        maxsize: int,
+        policy: str = "LRU",
+        maxsize: int = 1000,
         clean_size: int = 0,
         on_evict: Callable[[List[Any]], None] = None,
         **kwargs
@@ -32,6 +32,17 @@ def get(
             eviction_base = MemoryCacheEviction(
                 policy, maxsize, clean_size, on_evict, **kwargs
             )
+            return eviction_base
+        if name == "redis":
+            from gptcache.manager.eviction.distributed_cache import RedisCacheEviction
+            # "LRU" is the in-memory default; redis uses its own policy names (e.g. allkeys-lru)
+            if policy == "LRU":
+                policy = None
+            eviction_base = RedisCacheEviction(policy=policy, **kwargs)
+            return eviction_base
+        if name == "no_op_eviction":
+            from gptcache.manager.eviction.distributed_cache import NoOpEviction
+            eviction_base = NoOpEviction()
+            return eviction_base
+
         else:
             raise NotFoundError("eviction base", name)
-        return eviction_base
diff --git a/gptcache/manager/eviction/memory_cache.py b/gptcache/manager/eviction/memory_cache.py
index b9b3fc90..81d5c171 100644
--- a/gptcache/manager/eviction/memory_cache.py
+++ b/gptcache/manager/eviction/memory_cache.py
@@ -33,12 +33,12 @@ class MemoryCacheEviction(EvictionBase):
     """
 
     def __init__(
-        self,
-        policy: str,
-        maxsize: int,
-        clean_size: int = 0,
-        on_evict: Callable[[List[Any]], None] = None,
-        **kwargs,
+            self,
+            policy: str = "LRU",
+            maxsize: int = 1000,
+            clean_size: int = 0,
+            on_evict: Callable[[List[Any]], None] = None,
+            **kwargs,
     ):
         self._policy = policy.upper()
         if self._policy == "LRU":
diff --git a/gptcache/manager/factory.py b/gptcache/manager/factory.py
index d464f8ad..db690b6f 100644
--- a/gptcache/manager/factory.py
+++ b/gptcache/manager/factory.py
@@ -4,18 +4,21 @@
 
 from gptcache.manager import CacheBase, VectorBase, ObjectBase
 from gptcache.manager.data_manager import SSDataManager, MapDataManager
+from gptcache.manager.eviction import EvictionBase
+from gptcache.manager.scalar_data.redis_storage import RedisCacheStorage
+from gptcache.utils.log import gptcache_log
 
 
 def manager_factory(manager="map",
                     data_dir="./",
                     max_size=1000,
-                    clean_size=None,
-                    eviction: str = "LRU",
+                    eviction_manager: str = "memory",
                     get_data_container: Callable = None,
                     scalar_params=None,
                     vector_params=None,
-                    object_params=None):
-
+                    object_params=None,
+                    eviction_params=None
+                    ):
     """Factory of DataManager.
     By using this factory method, you only need to specify the root directory of the data,
     and it can automatically manage all the local files.
@@ -25,12 +28,11 @@ def manager_factory(manager="map",
     :type manager: str
     :param data_dir: Root path for data storage.
     :type data_dir: str
-    :param max_size: the max size for the cache, defaults to 1000.
+    :param max_size: the max size for the LRU cache in MapDataManager, defaults to 1000.
     :type max_size: int
-    :param clean_size: the size to clean up, defaults to `max_size * 0.2`.
-    :type clean_size: int
-    :param eviction: the eviction policy, it is support "LRU" and "FIFO" now, and defaults to "LRU".
-    :type eviction: str
+    :param eviction_manager: the eviction manager, defaults to "memory".
+        It supports "memory", "redis", and "no_op_eviction".
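+        With "redis", cache keys are tracked under the "<global_key_prefix>:evict:<id>"
+        namespace and evicted by redis itself according to the configured maxmemory policy.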
+    :type eviction_manager: str
     :param get_data_container: a Callable to get the data container, defaults to None.
     :type get_data_container: Callable
 
@@ -43,6 +45,8 @@ def manager_factory(manager="map",
 
     :param object_params: Params of object storage.
     :type object_params: dict
+    :param eviction_params: Params of the eviction base.
+    :type eviction_params: dict
 
     :return: SSDataManager or MapDataManager.
 
     Example:
     .. code-block:: python
 
         from gptcache.manager import manager_factory
 
         data_manager = manager_factory("sqlite,faiss", data_dir="./workspace", vector_params={"dimension": 128})
+
+        # or use the factory with a redis cache instead of the in-memory cache
+        from gptcache.manager import manager_factory
+        data_manager = manager_factory("redis,faiss",
+                                       eviction_manager="redis",
+                                       scalar_params={"maxmemory": "2mb",
+                                                      "policy": "allkeys-lru"},
+                                       vector_params={"dimension": 128},
+                                       eviction_params=dict(url="redis://localhost:6379")
+                                       )
     """
     Path(data_dir).mkdir(parents=True, exist_ok=True)
 
@@ -61,7 +76,8 @@ def manager_factory(manager="map",
 
     db_infos = manager.split(",")
     if len(db_infos) not in [2, 3]:
-        raise RuntimeError("Error manager format: %s, the correct is \"{scalar},{vector},{object}\", object is optional" % manager)
+        raise RuntimeError(
+            "Error manager format: %s, the correct is \"{scalar},{vector},{object}\", object is optional" % manager)
     if len(db_infos) == 2:
         db_infos.append("")
 
@@ -87,18 +103,39 @@ def manager_factory(manager="map",
     if obj == "local":
         object_params["path"] = os.path.join(data_dir, "local_obj")
     o = ObjectBase(name=obj, **object_params) if obj else None
-    return get_data_manager(s, v, o, max_size, clean_size, eviction)
+
+    if eviction_params is None:
+        eviction_params = {}
+
+    if isinstance(s, RedisCacheStorage) and eviction_manager == "redis":
+        # if the cache storage and the eviction manager are both redis, use the no-op
+        # eviction base to avoid redundant bookkeeping
+        eviction_manager = "no_op_eviction"
+        gptcache_log.info("Since Scalar Storage and Eviction manager are both redis, "
+                          "no_op_eviction will be used to avoid redundant operations.")
+        s.init_eviction_params(
+            maxmemory=eviction_params.get("maxmemory", scalar_params.get("maxmemory")),
+            policy=eviction_params.get("policy", scalar_params.get("policy")),
+            ttl=eviction_params.get("ttl", scalar_params.get("ttl")),
+            maxmemory_samples=eviction_params.get("maxmemory_samples", scalar_params.get("maxmemory_samples")),
+        )
+
+    # keep e as None for the in-memory case so that SSDataManager constructs the
+    # memory eviction base itself, wired to its own on_evict callback
+    e = None if eviction_manager == "memory" else EvictionBase(
+        name=eviction_manager,
+        **eviction_params
+    )
+    return get_data_manager(s, v, o, e)
 
 
 def get_data_manager(
-    cache_base: Union[CacheBase, str] = None,
-    vector_base: Union[VectorBase, str] = None,
-    object_base: Union[ObjectBase, str] = None,
-    max_size: int = 1000,
-    clean_size: int = None,
-    eviction: str = "LRU",
-    data_path: str = "data_map.txt",
-    get_data_container: Callable = None,
+    cache_base: Union[CacheBase, str] = None,
+    vector_base: Union[VectorBase, str] = None,
+    object_base: Union[ObjectBase, str] = None,
+    eviction_base: Union[EvictionBase, str] = None,
+    max_size: int = 1000,
+    clean_size: int = None,
+    eviction: str = "LRU",
+    data_path: str = "data_map.txt",
+    get_data_container: Callable = None,
 ):
     """Generate `SSDataManager` (with `cache_base`, `vector_base`, `max_size`, `clean_size` and `eviction` params),
     or `MAPDataManager` (with `data_path`, `max_size` and `get_data_container` params) to manager the data.
@@ -111,12 +148,17 @@ def get_data_manager(
     :type vector_base: :class:`VectorBase` or str
     :param object_base: a object storage, supports local path and s3.
     :type object_base: :class:`ObjectBase` or str
-    :param max_size: the max size for the cache, defaults to 1000.
+    :param max_size: the max size for the LRU cache in MapDataManager, defaults to 1000.
     :type max_size: int
-    :param clean_size: the size to clean up, defaults to `max_size * 0.2`.
+    :param eviction_base: an EvictionBase object, or the name of the eviction base, it supports:
+        - 'memory'
+        - 'redis'
+        - 'no_op_eviction'
+    :type eviction_base: :class:`EvictionBase` or str
+    :param clean_size: the clean size for the LRU cache in MapDataManager, defaults to None.
     :type clean_size: int
-    :param eviction: the eviction policy, it is support "LRU" and "FIFO" now, and defaults to "LRU".
-    :type eviction: str
+    :param eviction: the eviction policy for the LRU cache in MapDataManager, defaults to 'LRU'.
+    :type eviction: str
     :param data_path: the path to save the map data, defaults to 'data_map.txt'.
     :type data_path: str
     :param get_data_container: a Callable to get the data container, defaults to None.
@@ -131,6 +173,18 @@ def get_data_manager(
 
         from gptcache.manager import get_data_manager, CacheBase, VectorBase
 
         data_manager = get_data_manager(CacheBase('sqlite'), VectorBase('faiss', dimension=128))
+
+        # or construct the data manager with a redis eviction base instead of the in-memory one
+        from gptcache.manager.eviction import EvictionBase
+
+        # example 1: a redis eviction base with a sqlite cache base
+        data_manager = get_data_manager(cache_base=CacheBase("sqlite"),
+                                        vector_base=VectorBase("faiss", dimension=128),
+                                        eviction_base=EvictionBase("redis", maxmemory="0", policy="noeviction", ttl=1))
+
+        # example 2: a redis cache base together with a redis eviction base
+        # here no_op_eviction is used since the `redis` cache base already handles eviction internally
+        data_manager = get_data_manager(cache_base=CacheBase("redis", maxmemory="0", policy="noeviction", ttl=1),
+                                        vector_base=VectorBase("faiss", dimension=128),
+                                        eviction_base=EvictionBase("no_op_eviction"))
     """
     if not cache_base and not vector_base:
         return MapDataManager(data_path, max_size, get_data_container)
@@ -141,5 +195,7 @@ def get_data_manager(
         vector_base = VectorBase(name=vector_base)
     if isinstance(object_base, str):
         object_base = ObjectBase(name=object_base)
+    if isinstance(eviction_base, str):
+        eviction_base = EvictionBase(name=eviction_base)
     assert cache_base and vector_base
-    return SSDataManager(cache_base, vector_base, object_base, max_size, clean_size, eviction)
+    return SSDataManager(cache_base, vector_base, object_base, eviction_base, max_size, clean_size, eviction)
diff --git a/gptcache/manager/scalar_data/redis_storage.py b/gptcache/manager/scalar_data/redis_storage.py
index dd4d3c83..18c32886 100644
--- a/gptcache/manager/scalar_data/redis_storage.py
+++ b/gptcache/manager/scalar_data/redis_storage.py
@@ -47,40 +47,34 @@ def incr(cls):
         def get(cls):
             return cls.database.get(cls.key_name)
 
-    class Embedding:
+    class EmbeddingType:
         """
-        Custom class for storing embedding result.
-        An embedding of type ``bytes`` is stored against Hash record type for the provided key.
-        :param pk: Primary key against which hash data for embedding would be stored
-        :type pk: str
-        :param embedding: Embedding information to store
-        :type embedding: bytes
-
-        Note:
-            As of this implementation, redis-om doesn't have a good compatibility to store bytes data
-            and successfully retrieve it without corruption.
-            In addition to that, decoding while getting the response is disabled as well.
+        Custom type for embedding data. redis-om does not currently support storing raw
+        bytes, so the embedding is kept as bytes here and encoded to a Latin-1 string
+        when the record is serialized to JSON (and decoded back on read).
         """
-        prefix = global_key + ":embedding"
+        def __init__(self, data: bytes):
+            self.data = data
 
-        def __init__(self, pk: str, embedding: bytes):
-            self.pk = pk
-            self.embedding = embedding
-
-        def save(self, pipeline: Pipeline):
-            pipeline.hset(self.prefix + ":" + str(self.pk), "embedding", self.embedding)
+        @classmethod
+        def __get_validators__(cls):
+            yield cls.validate
 
         @classmethod
-        def get(cls, key: int, db: Redis):
-            """
-            Returns embedding stored against the ``key``.
-            Decode only key value while creating a response
-            :param key: redis key to fetch embedding
-            :type key: str
-            """
-            result = db.hgetall(cls.prefix + ":" + str(key))
-            return {k.decode("utf-8"): v for k, v in result.items()}
+        def validate(cls, v):
+            # v may be an np.ndarray (fresh insert), raw bytes, or a str read back from redis
+            if isinstance(v, np.ndarray):
+                return cls(v.astype(np.float32).tobytes())
+            elif isinstance(v, bytes):
+                return cls(v)
+
+            return cls(v)
+
+        def to_numpy(self) -> np.ndarray:
+            raw = self.data if isinstance(self.data, bytes) else self.data.encode("latin-1")
+            return np.frombuffer(raw, dtype=np.float32)
+
+        def __repr__(self):
+            return f"{self.data}"
 
     class Answers(EmbeddedJsonModel):
         """
@@ -93,6 +87,15 @@ class Meta:
             database = redis_connection
 
+    class QuestionDeps(EmbeddedJsonModel):
+        """
+        Question Dep collection
+        """
+
+        dep_name: str
+        dep_data: str
+        dep_type: int
+
     class Questions(JsonModel):
         """
         questions collection
@@ -103,12 +106,20 @@ class Questions(JsonModel):
         last_access: datetime.datetime
         deleted: int = Field(index=True)
         answers: List[Answers]
+        deps: List[QuestionDeps]
+        embedding: EmbeddingType
 
         class Meta:
             global_key_prefix = global_key
             model_key_prefix = "questions"
             database = redis_connection
 
+        class Config:
+            json_encoders = {
+                EmbeddingType: lambda n: n.data.decode("latin-1")
+                if isinstance(n.data, bytes) else n.data
+            }
+
     class Sessions(JsonModel):
         """
         session collection
@@ -123,21 +134,6 @@ class Meta:
         session_question: str
         question_id: str = Field(index=True)
 
-    class QuestionDeps(JsonModel):
-        """
-        Question Dep collection
-        """
-
-        class Meta:
-            global_key_prefix = global_key
-            model_key_prefix = "ques_deps"
-            database = redis_connection
-
-        question_id: str = Field(index=True)
-        dep_name: str
-        dep_data: str
-        dep_type: int
-
     class Report(JsonModel):
         """
         Report collection
@@ -157,7 +153,7 @@ class Meta:
         cache_time: datetime.datetime = Field(index=True)
         extra: Optional[str]
 
-    return Questions, Embedding, Answers, QuestionDeps, Sessions, Counter, Report
+    return Questions, Answers, QuestionDeps, Sessions, Counter, Report
 
 
 class RedisCacheStorage(CacheStorage):
@@ -165,15 +161,22 @@ class RedisCacheStorage(CacheStorage):
     Using redis-om as OM to store data in redis cache storage
 
     :param host: redis host, default value 'localhost'
-    :type host: str
-    :param port: redis port, default value 27017
-    :type port: int
-    :param global_key_prefix: A global prefix for keys against which data is stored.
-        For example, for a global_key_prefix ='gptcache', keys would be constructed would look like this:
-        gptcache:questions:abc123
-    :type global_key_prefix: str
-    :param kwargs: Additional parameters to provide in order to create redis om connection
-
+    :type host: str
+    :param port: redis port, default value 6379
+    :type port: int
+    :param global_key_prefix: A global prefix for keys against which data is stored.
+        For example, for a global_key_prefix of 'gptcache', keys would look like this:
+        gptcache:questions:abc123
+    :type global_key_prefix: str
+    :param maxmemory: maximum memory for redis to use for the cache storage
+    :type maxmemory: str
+    :param policy: eviction policy to use, default value None
+    :type policy: str
+    :param ttl: time to live for keys in seconds, default value None
+    :type ttl: int
+    :param maxmemory_samples: number of keys to sample when evicting keys
+    :type maxmemory_samples: int
+    :param kwargs: additional parameters to provide in order to create the redis om connection
 
     Example:
         .. code-block:: python
@@ -197,20 +200,20 @@ class RedisCacheStorage(CacheStorage):
 
     def __init__(
             self,
-            global_key_prefix="gptcache",
+            global_key_prefix: str = "gptcache",
             host: str = "localhost",
             port: int = 6379,
+            maxmemory: str = None,
+            policy: str = None,
+            ttl: int = None,
+            maxmemory_samples: int = None,
             **kwargs
     ):
         self.con = get_redis_connection(host=host, port=port, **kwargs)
-
-        self.con_encoded = get_redis_connection(
-            host=host, port=port, decode_responses=False, **kwargs
-        )
-
+        self.init_eviction_params(policy=policy, maxmemory=maxmemory, maxmemory_samples=maxmemory_samples, ttl=ttl)
         (
             self._ques,
-            self._embedding,
             self._answer,
             self._ques_dep,
             self._session,
@@ -220,6 +223,15 @@ def __init__(
 
         Migrator().run()
 
+    def init_eviction_params(self, policy, maxmemory, maxmemory_samples, ttl):
+        self.default_ttl = ttl
+        if maxmemory:
+            self.con.config_set("maxmemory", maxmemory)
+        if policy:
+            self.con.config_set("maxmemory-policy", policy)
+        if maxmemory_samples:
+            self.con.config_set("maxmemory-samples", maxmemory_samples)
+
     def create(self):
         pass
 
@@ -235,6 +247,21 @@ def _insert(self, data: CacheData, pipeline: Pipeline = None):
             )
             all_data.append(answer_data)
 
+        all_deps = []
+        if isinstance(data.question, Question) and data.question.deps is not None:
+            for dep in data.question.deps:
+                all_deps.append(
+                    self._ques_dep(
+                        dep_name=dep.name,
+                        dep_data=dep.data,
+                        dep_type=dep.dep_type,
+                    )
+                )
+        embedding_data = data.embedding_data
         ques_data = self._ques(
             pk=pk,
             question=data.question
@@ -244,30 +271,12 @@ def _insert(self, data: CacheData, pipeline: Pipeline = None):
             last_access=datetime.datetime.utcnow(),
             deleted=0,
             answers=answers,
+            deps=all_deps,
+            embedding=embedding_data
         )
         ques_data.save(pipeline)
 
-        embedding_data = (
-            data.embedding_data.astype(np.float32).tobytes()
-            if data.embedding_data is not None
-            else None
-        )
-        self._embedding(pk=ques_data.pk, embedding=embedding_data).save(pipeline)
-
-        if isinstance(data.question, Question) and data.question.deps is not None:
-            all_deps = []
-            for dep in data.question.deps:
-                all_deps.append(
-                    self._ques_dep(
-                        question_id=ques_data.pk,
-                        dep_name=dep.name,
-                        dep_data=dep.data,
-                        dep_type=dep.dep_type,
-                    )
-                )
-            self._ques_dep.add(all_deps, pipeline=pipeline)
-
         if data.session_id:
             session_data = self._session(
                 question_id=ques_data.pk,
@@ -277,7 +286,8 @@ def _insert(self, data: CacheData, pipeline: Pipeline = 
None): else data.question.content, ) session_data.save(pipeline) - + if self.default_ttl: + ques_data.expire(self.default_ttl, pipeline=pipeline) return int(ques_data.pk) def batch_insert(self, all_data: List[CacheData]): @@ -297,22 +307,20 @@ def get_data_by_id(self, key: str): qs.update(last_access=datetime.datetime.utcnow()) res_ans = [(item.answer, item.answer_type) for item in qs.answers] - - deps = self._ques_dep.find(self._ques_dep.question_id == key).all() res_deps = [ - QuestionDep(item.dep_name, item.dep_data, item.dep_type) for item in deps + QuestionDep(item.dep_name, item.dep_data, item.dep_type) for item in qs.deps ] session_ids = [ obj.session_id for obj in self._session.find(self._session.question_id == key).all() ] - - res_embedding = self._embedding.get(qs.pk, self.con_encoded)["embedding"] + if self.default_ttl: + qs.expire(self.default_ttl) return CacheData( - question=qs.question if not deps else Question(qs.question, res_deps), + question=qs.question if not res_deps else Question(qs.question, res_deps), answers=res_ans, - embedding_data=np.frombuffer(res_embedding, dtype=np.float32), + embedding_data=qs.embedding.to_numpy(), session_id=session_ids, create_on=qs.create_on, last_access=qs.last_access, @@ -334,11 +342,6 @@ def clear_deleted_data(self): ).all() self._session.delete_many(sessions_to_delete, pipeline) - deps_to_delete = self._ques_dep.find( - self._ques_dep.question_id << q_ids - ).all() - self._ques_dep.delete_many(deps_to_delete, pipeline) - pipeline.execute() def get_ids(self, deleted=True): diff --git a/tests/unit_tests/eviction/test_distributed_cache.py b/tests/unit_tests/eviction/test_distributed_cache.py new file mode 100644 index 00000000..afdef9fb --- /dev/null +++ b/tests/unit_tests/eviction/test_distributed_cache.py @@ -0,0 +1,211 @@ +import time +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + +from redis_om import get_redis_connection + +from gptcache.embedding import Onnx +from gptcache.manager import manager_factory, get_data_manager, CacheBase, VectorBase +from gptcache.manager.eviction import EvictionBase + + +class TestDistributedCache(unittest.TestCase): + url: str = "redis://default:default@localhost:6379" + + def setUp(cls) -> None: + cls._clear_test_db() + + @staticmethod + def _clear_test_db(): + r = get_redis_connection(url=TestDistributedCache.url) + r.flushall() + r.flushdb() + time.sleep(1) + + def test_redis_only_cache_eviction(self): + manager = manager_factory("redis,faiss", + eviction_manager="redis", + vector_params={"dimension": 5}, + eviction_params=dict(url=self.url, + maxmemory="100mb", + policy="allkeys-lru")) + self.assertEqual(type(manager.eviction_base).__name__, "NoOpEviction") + self.assertEqual(manager.eviction_base.policy, "") + self.assertEqual(type(manager.s).__name__, "RedisCacheStorage") + + def test_eviction_base_str(self): + data_manager = get_data_manager(cache_base="sqlite", + vector_base=VectorBase("faiss", dimension=5), + eviction_base="redis" + ) + self.assertIsNotNone(data_manager.eviction_base) + self.assertEqual(type(data_manager.eviction_base).__name__, "RedisCacheEviction") + + data_manager = get_data_manager(cache_base="sqlite", + vector_base=VectorBase("faiss", dimension=5), + eviction_base="no_op_eviction" + ) + self.assertIsNotNone(data_manager.eviction_base) + self.assertEqual(type(data_manager.eviction_base).__name__, "NoOpEviction") + + + def test_redis_sqlite_cache_eviction(self): + with TemporaryDirectory(dir="./") as root: + db_name = "sqlite" + 
db_path = Path(root) / f"{db_name}.db" + manager = manager_factory("sqlite,faiss", + eviction_manager="redis", + scalar_params={ + "url": f"{db_name}:///" + str(db_path), + }, + vector_params={"dimension": 5}, + eviction_params=dict(url=self.url, + maxmemory="100mb", + policy="allkeys-lru")) + + self.assertEqual(type(manager.s).__name__, "SQLStorage") + self.assertEqual(type(manager.eviction_base).__name__, "RedisCacheEviction") + self.assertEqual(manager.eviction_base.policy, "allkeys-lru") + self.assertEqual(manager.eviction_base._ttl, None) + + def test_noeviction_policy(self): + onnx = Onnx() + data_manager = manager_factory("redis,faiss", + eviction_manager="redis", + scalar_params={"url": self.url + }, + vector_params={"dimension": onnx.dimension}, + eviction_params={"maxmemory": "0", + "policy": "noeviction"} + ) + questions = [] + answers = [] + idx_list = [] + for i in range(100): + idx_list.append(i) + questions.append(f'This is a question_{i}') + answers.append(f'This is an answer_{i}') + + for idx, question, answer in zip(idx_list, questions, answers): + embedding = onnx.to_embeddings(question) + data_manager.save( + question=question, + answer=answer, + embedding_data=embedding + ) + + self.assertEqual(data_manager.s.count(), len(idx_list)) + + def test_ttl(self): + onnx = Onnx() + data_manager = manager_factory("redis,faiss", + eviction_manager="redis", + scalar_params={"url": self.url + }, + vector_params={"dimension": onnx.dimension}, + eviction_params={"maxmemory": "0", + "policy": "noeviction", + "ttl": 1} + ) + questions = [] + answers = [] + idx_list = [] + embeddings = [] + for i in range(10): + idx_list.append(i) + questions.append(f'This is a question_{i}') + answers.append(f'This is an answer_{i}') + embeddings.append(onnx.to_embeddings(questions[-1])) + + data_manager.import_data(questions, answers, embedding_datas=embeddings, + session_ids=[None for _ in range(len(questions))]) + time.sleep(5) + self.assertEqual(data_manager.s.count(), 0) + + def test_redis_only_config(self): + onnx = Onnx() + data_manager = get_data_manager(cache_base=CacheBase("redis", maxmemory="100mb", policy="allkeys-lru"), + vector_base=VectorBase("faiss", dimension=onnx.dimension), + eviction_base=EvictionBase("redis")) + questions = [] + answers = [] + idx_list = [] + embeddings = [] + for i in range(10): + idx_list.append(i) + questions.append(f'This is a question_{i}') + answers.append(f'This is an answer_{i}') + embeddings.append(onnx.to_embeddings(questions[-1])) + + data_manager.import_data(questions, answers, embedding_datas=embeddings, + session_ids=[None for _ in range(len(questions))]) + search_data = data_manager.search(embeddings[0], top_k=1) + for res in search_data: + self.assertEqual(data_manager.eviction_base.get(res[1]), "True") + + def test_redis_only_with_no_op_eviction_config(self): + onnx = Onnx() + data_manager = get_data_manager(cache_base=CacheBase("redis", maxmemory="100mb", policy="allkeys-lru"), + vector_base=VectorBase("faiss", dimension=onnx.dimension), + eviction_base=EvictionBase("no_op_eviction")) + questions = [] + answers = [] + idx_list = [] + embeddings = [] + for i in range(10): + idx_list.append(i) + questions.append(f'This is a question_{i}') + answers.append(f'This is an answer_{i}') + embeddings.append(onnx.to_embeddings(questions[-1])) + + data_manager.import_data(questions, answers, embedding_datas=embeddings, + session_ids=[None for _ in range(len(questions))]) + search_data = data_manager.search(embeddings[0], top_k=1) + for res in search_data: + 
self.assertEqual(data_manager.eviction_base.get(res[1]), None)
+
+    def test_cache_configuration(self):
+        e = EvictionBase("redis", maxmemory="4mb", policy="allkeys-lru", maxmemory_samples=5)
+
+        memory_conf = e._redis.config_get("maxmemory")
+        self.assertIsNotNone(memory_conf)
+        self.assertEqual(memory_conf["maxmemory"], "4194304")
+
+        policy_conf = e._redis.config_get("maxmemory-policy")
+        self.assertIsNotNone(policy_conf)
+        self.assertEqual(policy_conf['maxmemory-policy'], "allkeys-lru")
+
+        samples_conf = e._redis.config_get("maxmemory-samples")
+        self.assertIsNotNone(samples_conf)
+        self.assertEqual(samples_conf['maxmemory-samples'], "5")
+
+    def test_ttl_access(self):
+        e = EvictionBase("redis", maxmemory="4mb", policy="allkeys-lru", maxmemory_samples=5, ttl=5)
+
+        e.put(["key"], expire=True)
+        value = e.get("key")
+        self.assertEqual(value, "True")
+
+        ttl = e._redis.ttl(e._create_key("key"))
+        self.assertIsNotNone(ttl)
+
+        time.sleep(2)
+        ttl = e._redis.ttl(e._create_key("key"))
+        self.assertLess(ttl, 5)
+
+        time.sleep(4)
+        # ttl == -2 means the key no longer exists in redis
+        ttl = e._redis.ttl(e._create_key("key"))
+        self.assertEqual(ttl, -2)
+
+        value = e.get("key")
+        self.assertIsNone(value)
+
+        value = e.get("key1")
+        self.assertIsNone(value)
+
+        e = EvictionBase("redis", maxmemory="4mb", policy="allkeys-lru", maxmemory_samples=5)
+        e.put(["key"])
+        # no ttl configured: the key exists but never expires (ttl == -1)
+        ttl = e._redis.ttl(e._create_key("key"))
+        self.assertEqual(e.get("key"), "True")
+        self.assertEqual(ttl, -1)
diff --git a/tests/unit_tests/manager/test_redis_cache_storage.py b/tests/unit_tests/manager/test_redis_cache_storage.py
index cef3d985..260a767a 100644
--- a/tests/unit_tests/manager/test_redis_cache_storage.py
+++ b/tests/unit_tests/manager/test_redis_cache_storage.py
@@ -4,17 +4,17 @@
 import numpy as np
 
 from gptcache.manager.scalar_data.base import CacheData, Question
-from gptcache.manager.scalar_data.redis_storage import RedisCacheStorage, get_models
+from gptcache.manager.scalar_data.redis_storage import RedisCacheStorage
 from gptcache.utils import import_redis
 
 import_redis()
-from redis_om import get_redis_connection, RedisModel
+from redis_om import get_redis_connection
 
 
 class TestRedisStorage(unittest.TestCase):
     test_dbname = "gptcache_test"
     url = "redis://default:default@localhost:6379"
 
     def setUp(cls) -> None:
         cls._clear_test_db()
 
@@ -151,3 +155,30 @@ def test_session(self):
 
         redis_storage.delete_session(ids[:3])
         assert len(redis_storage.list_sessions()) == 7
+
+    def test_cache_configuration(self):
+        redis_storage = RedisCacheStorage(global_key_prefix=self.test_dbname,
+                                          url=self.url,
+                                          maxmemory_samples=5,
+                                          maxmemory="4mb",
+                                          policy="allkeys-lru",
+                                          ttl=10)
+
+        memory_conf = redis_storage.con.config_get("maxmemory")
+        assert memory_conf is not None
+        assert memory_conf["maxmemory"] == "4194304"
+        policy_conf = redis_storage.con.config_get("maxmemory-policy")
+        assert policy_conf is not None
+        assert policy_conf['maxmemory-policy'] == "allkeys-lru"
+
+        samples_conf = redis_storage.con.config_get("maxmemory-samples")
+        assert samples_conf is not None
+        assert samples_conf['maxmemory-samples'] == "5"
+
+        ids = redis_storage.batch_insert(
+            [CacheData("question_1", ["answer_1"] * 2, np.random.rand(5), session_id=1)])
+        ttl = redis_storage.con.ttl(redis_storage._ques.make_key(ids[0]))
+        assert ttl is not None
+        time.sleep(2)
+        ttl = redis_storage.con.ttl(redis_storage._ques.make_key(ids[0]))
+        assert ttl < 10
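
For reference, a minimal standalone sketch (not part of the patch itself) of the Latin-1 round-trip that `EmbeddingType` in redis_storage.py relies on; only numpy is assumed:

    import numpy as np

    # encode: float32 vector -> raw bytes -> Latin-1 string (safe to embed in the JSON document)
    vec = np.random.rand(5).astype(np.float32)
    encoded = vec.tobytes().decode("latin-1")

    # decode: Latin-1 string -> raw bytes -> float32 vector
    decoded = np.frombuffer(encoded.encode("latin-1"), dtype=np.float32)

    assert np.array_equal(vec, decoded)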