diff --git a/.gitignore b/.gitignore index 7d4a666..dfe5298 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ CHACHE_SQLITE3.db CACHE_LEVELDB Log FLAXKV_DB +test_db/ .idea .ipynb_checkpoints diff --git a/README.md b/README.md index 454ab4c..4ce3d4a 100644 --- a/README.md +++ b/README.md @@ -38,18 +38,8 @@ A high-performance dictionary database.

-The `flaxkv` module provides a dictionary-like interface for interacting with high-performance key-value databases (LMDB, LevelDB). -It abstracts the complexities of direct database interaction, allowing users to perform CRUD operations in a simple and -intuitive manner. You can use it just like a Python dictionary without worrying about it blocking your main process at any stage. - -**Use Cases** - -- **Key-Value Structure**: `flaxkv` is suitable for storing simple key-value structured datasets. - -- **High-Frequency Writing**: `flaxkv` is very suitable for scenarios that require high-frequency insertion/updating of data. - -- **Machine Learning**: `flaxkv` is perfect for storing various embeddings, images, texts, and other large datasets with key-value structures in machine learning. - +The `flaxkv` provides an interface very similar to a dictionary for interacting with high-performance key-value databases. More importantly, as a persistent database, it offers performance close to that of native dictionaries (in-memory access). +You can use it just like a Python dictionary without having to worry about blocking your user process when operating the database at any time. --- ## Key Features @@ -86,7 +76,7 @@ import numpy as np db = dictdb('./test_db') # or run server `flaxkv run --port 8000`, then: -# db = dictdb('http://localhost:8000', remote=True) +# db = dictdb('http://localhost:8000', remote=True, db_name='test_db', rebuild=False) db[1] = 1 db[1.1] = 1 / 3 @@ -112,12 +102,22 @@ for key, value in db.items(): print(len(db)) ``` -You might have noticed that even when the program ends, we didn't use `db.close()` to release resources! -Everything will be handled automatically. -More importantly, as a persistent database, it offers performance close to dictionary (in-memory) access! -(There should be a benchmark here.) - -P.S.: Of course, you can also manually call `db.close()` to release resources immediately. +### Tips +- `flaxkv` provides performance close to native dictionary (in-memory) access as a persistent database! (There should be a benchmark here) +- You may have noticed that in the previous example code, `db.close()` was not used to release resources! Because all this will be automatically handled by `flaxkv`. Of course, you can also manually call db.close() to immediately release resources. +- Since `flaxkv` saves data by buffered writing, this feature of delayed writing may not write data to the disk in time in some scenarios (such as in Jupyter), +in this case, you can use `db.write_immediately()` to immediately trigger a write operation. + +### Benchmark +todo + +### Use Cases +- **Key-Value Structure:** +Used to save simple key-value structure data. +- **High-Frequency Writing:** +Very suitable for scenarios that require high-frequency insertion/update of data. +- **Machine Learning:** +`flaxkv` is very suitable for saving various large datasets of embeddings, images, texts, and other key-value structures in machine learning. ## Citation If `FlaxKV` has been helpful to your research, please cite: diff --git a/README_ZH.md b/README_ZH.md index 606a2ee..dc299a8 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -40,18 +40,8 @@ -`flaxkv` 提供了一个类似字典的接口,用于与高性能键值数据库进行交互。 -抽象了直接数据库交互的复杂性,允许用户以简单直观的方式执行CRUD操作。 -你可以直接将它当成python字典来使用而不必担心在任何阶段它会阻塞你的主进程。 - -**适用场景** - -- **键-值型结构**: - `flaxkv` 适合于保存简单的键值结构数据集。 -- **高频写入**: - `flaxkv` 非常适合那些需要高频插入/更新数据的场景。 -- **机器学习**: - `flaxkv`十分适合用于保存机器学习中的各种嵌入向量、图像、文本和其它键-值结构的大型数据集。 +`flaxkv` 提供了一个非常类似字典的接口,用于与高性能键值数据库进行交互。更重要的是,它作为持久化数据库提供了接近原生字典(内存)存取的性能。 +你可以直接将它当成python字典来使用而不必担心在任何时候操作数据库时会阻塞你的用户进程。 --- @@ -91,7 +81,7 @@ import numpy as np db = dictdb('./test_db') # or run server `flaxkv run --port 8000`, then: -# db = dictdb('http://localhost:8000', remote=True) +# db = dictdb('http://localhost:8000', remote=True, db_name='test_db', rebuild=False) db[1] = 1 db[1.1] = 1 / 3 @@ -116,10 +106,27 @@ for key, value in db.items(): print(len(db)) ``` -也许你注意到即使到程序结束并没有使用到`db.close()`来进行资源释放!这一切都将被自动处理。 -更重要的是,它(作为持久化数据库)还提供了接近字典(内存)存取的性能!(这里应存在一个benchmark..) -PS: 当然也可以手动调用 `db.close()` 来立即释放资源 + +### Tips + +- `flaxkv`作为持久化数据库提供了接近原生字典(内存)存取的性能!(这里应存在一个benchmark) +- 也许你注意到在前面的示例代码中并没有使用到`db.close()`来进行资源释放!因为这一切都将被`flaxkv`自动处理。 当然也可以手动调用 `db.close()` 来立即释放资源 +- 由于`flaxkv`通过缓冲写入的方式来保存数据,这种延迟写入的特性在一些场景(如jupyter中)下将不能及时将数据写入磁盘, +此时可使用`db.write_immediately()`来立即触发写入操作。 + +### Benchmark +todo + + +### 适用场景 + +- **键-值型结构**: + 用于保存简单的键值结构数据 +- **高频写入**: + 非常适合需要高频插入/更新数据的场景 +- **机器学习**: + `flaxkv`十分适合用于保存机器学习中的各种嵌入向量、图像、文本和其它键-值结构的大型数据集。 ## 引用 diff --git a/flaxkv/__init__.py b/flaxkv/__init__.py index 638ca54..8be449d 100644 --- a/flaxkv/__init__.py +++ b/flaxkv/__init__.py @@ -15,7 +15,7 @@ from .core import LevelDBDict, LMDBDict from .serve.client import RemoteDictDB -__version__ = "0.1.6" +__version__ = "0.1.7" __all__ = [ "dictdb", diff --git a/flaxkv/core.py b/flaxkv/core.py index 1bf6fab..550b997 100644 --- a/flaxkv/core.py +++ b/flaxkv/core.py @@ -183,6 +183,18 @@ def _close_background_worker(self, write=True): "Warning: Background thread did not finish in time. Some data might not be saved." ) + def _encode_key(self, key): + if self.raw: + return key + else: + return encode(key) + + def _encode_value(self, value): + if self.raw: + return value + else: + return encode(value) + def get(self, key: Any, default=None): """ Retrieves the value associated with the given key. @@ -201,16 +213,13 @@ def get(self, key: Any, default=None): if key in self.buffer_dict: return self.buffer_dict[key] - if self.raw: - value = self._static_view.get(key) - else: - value = self._static_view.get(encode(key)) + key = self._encode_key(key) + value = self._static_view.get(key) + if value is None: return default - if self.raw: - return value - else: - return decode(value) + + return value if self.raw else decode(value) def get_db_value(self, key: str): """ @@ -222,10 +231,8 @@ def get_db_value(self, key: str): Returns: value: The encoded value associated with the key. """ - if self.raw: - return self._static_view.get(key) - else: - return self._static_view.get(encode(key)) + key = self._encode_key(key) + return self._static_view.get(key) def get_batch(self, keys): """ @@ -245,7 +252,8 @@ def get_batch(self, keys): if key in self.buffer_dict: values.append(self.buffer_dict[key]) continue - value = self._static_view.get(encode(key)) + key = self._encode_key(key) + value = self._static_view.get(key) if value is not None: value = decode(value) values.append(value) @@ -340,15 +348,11 @@ def _write_buffer_to_db( with self._db_manager.write() as wb: try: for key, value in buffer_dict_snapshot.items(): - if self.raw: - wb.put(key, value) - else: - wb.put(encode(key), encode(value)) + key, value = self._encode_key(key), self._encode_value(value) + wb.put(key, value) for key in delete_buffer_set_snapshot: - if self.raw: - wb.delete(key) - else: - wb.delete(encode(key)) + key = self._encode_key(key) + wb.delete(key) except Exception as e: traceback.print_exc() @@ -368,6 +372,12 @@ def _write_buffer_to_db( f"write {self._db_manager.db_type.upper()} buffer to db successfully-{current_write_num=}-{self._latest_write_num=}" ) + def __iter__(self): + """ + Returns an iterator over the keys. + """ + return iter(self.keys()) + def __getitem__(self, key): """ Retrieves the value for a given key using the dictionary access syntax. @@ -378,8 +388,9 @@ def __getitem__(self, key): Returns: value: The value associated with the key. """ - value = self.get(key) - if value is None: + + value = self.get(key, b'iamnone') + if isinstance(value, bytes) and value == b'iamnone': raise KeyError(f"Key `{key}` not found in the database.") return value @@ -432,10 +443,8 @@ def pop(self, key, default=None): else: return value else: - if self.raw: - value = self._static_view.get(key) - else: - value = self._static_view.get(encode(key)) + key = self._encode_key(key) + value = self._static_view.get(key) return decode(value) else: return default @@ -455,11 +464,8 @@ def __contains__(self, key): return True if key in self.delete_buffer_set: return False - - if self.raw: - return self._static_view.get(key) is not None - else: - return self._static_view.get(encode(key)) is not None + key = self._encode_key(key) + return self._static_view.get(key) is not None def clear(self): """ @@ -616,7 +622,7 @@ def keys(self, decode_raw=True): return list(lmdb_keys.union(buffer_keys) - delete_buffer_set) - def items(self, decode_raw=True): + def db_dict(self, decode_raw=True): ( buffer_dict, buffer_keys, @@ -638,6 +644,10 @@ def items(self, decode_raw=True): self._db_manager.close_static_view(session) + return _db_dict + + def items(self, decode_raw=True): + _db_dict = self.db_dict(decode_raw=decode_raw) return _db_dict.items() def set_mapsize(self, map_size): @@ -688,7 +698,7 @@ def keys(self, decode_raw=True): return list(db_keys.union(buffer_keys) - delete_buffer_set) - def items(self, decode_raw=True): + def db_dict(self, decode_raw=True): ( buffer_dict, buffer_keys, @@ -708,6 +718,10 @@ def items(self, decode_raw=True): _db_dict.update(buffer_dict) self._db_manager.close_static_view(snapshot) + return _db_dict + + def items(self, decode_raw=True): + _db_dict = self.db_dict(decode_raw=decode_raw) return _db_dict.items() def stat(self): diff --git a/flaxkv/serve/app.py b/flaxkv/serve/app.py index e443dd1..f4dbe18 100644 --- a/flaxkv/serve/app.py +++ b/flaxkv/serve/app.py @@ -20,6 +20,7 @@ @post(path="/attach") async def attach(data: AttachRequest) -> dict: + # todo switch `post` to `get` db = db_manager.get(data.db_name) if db is None or data.rebuild: db_manager.set_db( @@ -30,6 +31,7 @@ async def attach(data: AttachRequest) -> dict: @post(path="/detach") async def detach(data: DetachRequest) -> dict: + # todo switch `post` to `get` db = db_manager.detach(db_name=data.db_name) if db is None: return {"success": False, "info": "db not found"} @@ -75,7 +77,7 @@ async def update_raw(db_name: str, request: Request) -> dict: @post("/get_raw", media_type=MediaType.TEXT) -async def get_raw(db_name: str, request: Request) -> bytes: +async def _get(db_name: str, request: Request) -> bytes: db = db_manager.get(db_name) if db is None: raise ValueError("db not found") @@ -124,7 +126,7 @@ async def get_keys(db_name: str) -> dict: @get("/values", media_type=MediaType.TEXT) @msg_encoder -async def get_values(db_name: str) -> bytes: +async def _values(db_name: str) -> bytes: db = db_manager.get(db_name) if db is None: return {"success": False, "info": "db not found"} @@ -142,7 +144,7 @@ async def get_items(db_name: str) -> bytes: if db is None: return {"success": False, "info": "db not found"} try: - return {"success": True, "data": dict(db.items())} + return {"success": True, "data": db.db_dict()} except Exception as e: traceback.print_exc() return {"success": False, "info": str(e)} @@ -158,9 +160,9 @@ def on_shutdown(): detach, set_raw, update_raw, - get_raw, + _get, get_items, - get_values, + _values, set_value, contains, pop, diff --git a/flaxkv/serve/client.py b/flaxkv/serve/client.py index 53e2793..b9b199b 100644 --- a/flaxkv/serve/client.py +++ b/flaxkv/serve/client.py @@ -5,11 +5,17 @@ class RemoteDictDB: def __init__( - self, url: str, db_name: str, rebuild=False, backend="leveldb", timeout=6 + self, + url: str, + db_name: str, + rebuild=False, + backend="leveldb", + timeout=6, + **kwargs, ): self._url = url self._db_name = db_name - self._client = httpx.Client(timeout=timeout) + self._client = kwargs.pop("client", httpx.Client(timeout=timeout)) self._attach_db(rebuild=rebuild, backend=backend) def _attach_db(self, rebuild=False, backend="lmdb"): @@ -59,7 +65,7 @@ def pop(self, key, default=None): def _items_dict(self): url = f"{self._url}/items?db_name={self._db_name}" response = self._client.get(url) - return decode(response.read()) + return decode(response.read())["data"] def items(self): return self._items_dict().items() diff --git a/pyproject.toml b/pyproject.toml index 283582e..f3e0c39 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "msgspec>=0.18.4", "msgpack", "lmdb>=1.4.1", + "typing-extensions>=4.7.1", ] dynamic = ["version"] @@ -42,6 +43,7 @@ Source = "https://github.com/KenyonY/flaxkv" [project.optional-dependencies] test = [ + "litestar", "pytest", "pytest-aiohttp", ] diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..40ce0a0 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,8 @@ +[pytest] +faulthandler_timeout=180 +addopts = --doctest-modules --doctest-glob=README.md --doctest-glob=*.py --ignore=setup.py +testpaths = + tests +python_files = + test_*.py + *_test.py diff --git a/tests/test_serve.py b/tests/test_serve.py new file mode 100644 index 0000000..55ff008 --- /dev/null +++ b/tests/test_serve.py @@ -0,0 +1,38 @@ +from litestar.status_codes import HTTP_200_OK, HTTP_201_CREATED +from litestar.testing import TestClient + +from flaxkv.serve.app import app +from flaxkv.serve.client import RemoteDictDB + + +def test_api(): + db_name = "test_db" + backend = "lmdb" + rebuild = True + with TestClient(app=app) as client: + db = RemoteDictDB( + url="", db_name=db_name, backend=backend, rebuild=rebuild, client=client + ) + assert db.keys() == [] + target_dict = dict( + [ + ("string", "string"), + ("int", 123), + (2, 2), + (2.5, 1 / 3), + (True, False), + (b'string', b'string'), + ((1, 2, 3), [1, 2, 3]), + ((1, (2, 3)), [1, 2, 3]), + ((1, (2, 3), (2, 3, (3, 4))), [1, 2, 3]), + ("test_key", "test_value"), + ('dict', {'a': 1, 'b': 2}), + # ('set', {1, 2, 3, '1', '2', '3'}), # do not support currently + ('list', [1, 2, 3, '1', '2', '3']), + ('nest_dict', {'a': {'b': 1, 'c': 2, 'd': {'e': 1, 'f': '2', 'g': 3}}}), + ] + ) + for key, value in target_dict.items(): + db[key] = value + for key, value in target_dict.items(): + assert db[key] == value