-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactors test_ingest_events.py removing Mocks
- Loading branch information
Mark Kasaboski
committed
Nov 29, 2024
1 parent
8aa3e9d
commit ec0a66b
Showing
2 changed files
with
163 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import json | ||
import pytest | ||
|
||
from datetime import datetime | ||
from pytest import FixtureRequest | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
|
||
|
||
class FakeStoragePassword: | ||
def __init__(self, username: str, clear_password: str) -> None: | ||
self._state = { | ||
"username": username, | ||
"clear_password": clear_password, | ||
} | ||
|
||
@property | ||
def content(self: "FakeStoragePassword") -> "FakeStoragePassword": | ||
return self | ||
|
||
@property | ||
def username(self) -> str: | ||
return self._state["username"] | ||
|
||
@property | ||
def clear_password(self) -> str: | ||
return self._state["clear_password"] | ||
|
||
|
||
class FakeStoragePasswords: | ||
def __init__(self, passwords: List[FakeStoragePassword]) -> None: | ||
self._passwords = passwords | ||
|
||
def list(self) -> List[FakeStoragePassword]: | ||
return self._passwords | ||
|
||
|
||
class FakeKVStoreCollectionData: | ||
def __init__(self) -> None: | ||
self._data: dict[str, str] = {} | ||
|
||
def insert(self, data: str) -> dict[str, str]: | ||
entry = json.loads(data) | ||
self._data[entry["_key"]] = entry["value"] | ||
return entry | ||
|
||
def update(self, id: str, data: str) -> dict[str, str]: | ||
entry = json.loads(data) | ||
self._data[id] = entry["value"] | ||
return entry | ||
|
||
def query(self, **query: dict) -> List[Dict[str, str]]: | ||
return [{"_key": key, "value": value} for key, value in self._data.items()] | ||
|
||
|
||
class FakeKVStoreCollection: | ||
def __init__(self) -> None: | ||
self._data = FakeKVStoreCollectionData() | ||
|
||
@property | ||
def data(self) -> FakeKVStoreCollectionData: | ||
return self._data | ||
|
||
|
||
class FakeKVStoreCollections: | ||
def __init__(self) -> None: | ||
self._collections: dict[str, Any] = {} | ||
|
||
def __getitem__(self, key: str) -> FakeKVStoreCollection: | ||
return self._collections[key] | ||
|
||
def __contains__(self, key: str) -> bool: | ||
return key in self._collections | ||
|
||
def create(self, name: str, fields: dict) -> dict[str, Any]: | ||
self._collections[name] = FakeKVStoreCollection() | ||
return {"headers": {}, "reason": "Created", "status": 200, "body": ""} | ||
|
||
|
||
class FakeLogger: | ||
def __init__(self) -> None: | ||
self.messages: List[str] = [] | ||
|
||
def info(self, message: str) -> None: | ||
self.messages.append(f"INFO: {message}") | ||
|
||
def error(self, message: str) -> None: | ||
self.messages.append(f"ERROR: {message}") | ||
|
||
|
||
class FakeFlareAPI: | ||
def __init__(self, api_key: str, tenant_id: int) -> None: | ||
self.api_key = api_key | ||
self.tenant_id = tenant_id | ||
|
||
def fetch_feed_events( | ||
self, | ||
next: Optional[str], | ||
start_date: Optional[datetime], | ||
ingest_metadata_only: bool, | ||
) -> List[tuple[dict, str]]: | ||
return [({"event": "test_event"}, "next_token")] | ||
|
||
|
||
@pytest.fixture | ||
def storage_passwords(request: FixtureRequest) -> FakeStoragePasswords: | ||
passwords: list[FakeStoragePassword] = request.param | ||
return FakeStoragePasswords(passwords=passwords) | ||
|
||
|
||
@pytest.fixture | ||
def kvstore() -> FakeKVStoreCollections: | ||
return FakeKVStoreCollections() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters