Skip to content

Commit

Permalink
Encrypt banlist (#88)
Browse files Browse the repository at this point in the history
Closes #66

Banlist is now encrypted, and decrypted into memory when creating a `Freqlog`/ `SQLiteBackend` object. Password is supplied through `password` arg in `Freqlog.__init__()` and `SQLiteBackend.__init__()` and by user through prompts in GUI and CLI. To make this easier the case-sensitivity code from all ban-related functions and `ban_lower` table from db were removed.

Additional changes:
- Fix bug where upgrade prompt is shown on first run
- Populate upgrade function in `SQLiteBackend` to upgrade from lower versions
- Add `is_backend_initialized` and `is_db_populated` functions to `Freqlog` and `SQLiteBackend`, respectively
- Add `encrypt`, `decrypt`, `set_password`, `check_password` and `_fetch_config` functions to `SQLiteBackend`
- Simplify GUI dialog generation code
- Improve GUI error/confirm dialogs
- Add cryptography dependency
- Gitignore all `sqlite3` files to ease backups
- Bump version to 0.5.0

---------

Signed-off-by: Raymond Li <[email protected]>
Co-authored-by: Priyanshu Tripathi <[email protected]>
  • Loading branch information
Raymo111 and GetPsyched authored Dec 7, 2023
1 parent 862b668 commit c4972e7
Show file tree
Hide file tree
Showing 13 changed files with 643 additions and 580 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.idea/
log.txt
nexus_freqlog_db.sqlite3
*.sqlite3
*.sqlite3-journal
build/
dist/
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pyinstaller~=5.13
setuptools~=68.1
PySide6~=6.5
pySerial~=3.5
cryptography~=41.0
8 changes: 4 additions & 4 deletions src/nexus/CCSerial/CCSerial.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ def list_device_chords(self) -> Iterator[str]:
for j, c in enumerate(chord_int):
if c < 32: # 10-bit scan code
chord_int[j + 1] = (chord_int[j] << 8) | chord_int[j + 1]
elif c == 296: # enter
elif c == 296: # Enter
chord_utf8.append("\n")
elif c == 298 and len(chord_utf8) > 0: # backspace
elif c == 298 and len(chord_utf8) > 0: # Backspace
chord_utf8.pop()
elif c == 299: # tab
elif c == 299: # Tab
chord_utf8.append("\t")
elif c == 544: # spaceright
elif c == 544: # Spaceright
chord_utf8.append(" ")
elif c > 126: # TODO: support non-ASCII characters
continue
Expand Down
79 changes: 47 additions & 32 deletions src/nexus/Freqlog/Freqlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timedelta
from queue import Empty as EmptyException, Queue
from threading import Thread
from typing import Optional

from pynput import keyboard as kbd, mouse
from serial import SerialException
Expand Down Expand Up @@ -104,9 +105,9 @@ def _log_and_reset_word(min_length: int = 2) -> None:
# Only log words/chords that have >= min_length characters
if len(word) >= min_length:
if avg_char_time_after_last_bs and avg_char_time_after_last_bs > timedelta(
milliseconds=self.chord_char_threshold): # word, based on backspace timing
milliseconds=self.chord_char_threshold): # Word, based on backspace timing
self._log_word(word, word_start_time, word_end_time)
else: # chord
else: # Chord
self._log_chord(word, word_start_time, word_end_time)

word = ""
Expand Down Expand Up @@ -149,7 +150,7 @@ def _log_and_reset_word(min_length: int = 2) -> None:
word = word[:word.rfind("\t")]
elif "\n" in word:
word = word[:word.rfind("\n")]
else: # word is only one word
else: # Word is only one word
word = ""
else:
word = word[:-1]
Expand Down Expand Up @@ -202,7 +203,7 @@ def _log_and_reset_word(min_length: int = 2) -> None:
word_end_time = time_pressed
self.q.task_done()

except EmptyException: # queue is empty
except EmptyException: # Queue is empty
# If word is older than NEW_WORD_THRESHOLD seconds, log and reset word
if word:
_log_and_reset_word()
Expand Down Expand Up @@ -232,14 +233,30 @@ def _get_chords(self):
if self.dev:
self.dev.close()

def __init__(self, path: str = Defaults.DEFAULT_DB_PATH, loggable: bool = True,
upgrade_callback: callable = None) -> None:
@staticmethod
def is_backend_initialized(backend_path: str) -> bool:
"""
Check if backend is initialized
:param backend_path: Path to backend (currently == SQLiteBackend)
:raises the same exceptions as SQLiteBackend.does_db_exist():
:return: True if backend is initialized, False otherwise
"""
return SQLiteBackend.is_db_populated(backend_path)

def __init__(self, backend_path: str, password_callback: callable, loggable: bool = True,
upgrade_callback: Optional[callable] = None) -> None:
"""
Initialize Freqlog
:param path: Path to backend (currently == SQLiteBackend)
:param backend_path: Path to backend (currently == SQLiteBackend)
:param password_callback: Callback to call to get password to encrypt/decrypt banlist entries
Should take one argument: whether the password is being set for the first time
:param loggable: Whether to create listeners
:param upgrade_callback: Callback to run if database is upgraded
:raises ValueError: If the database version is newer than the current version
:raises PermissionError: If the database path is not readable or writable
:raises IsADirectoryError: If the database path is not a file
:raises FileNotFoundError: If the database path does not exist
:raises cryptography.fernet.InvalidToken: If the password is incorrect
"""
logging.info("Initializing freqlog")
self.dev: CCSerial | None = None
Expand Down Expand Up @@ -267,7 +284,7 @@ def __init__(self, path: str = Defaults.DEFAULT_DB_PATH, loggable: bool = True,

self.is_logging: bool = False # Used in self._get_chords, needs to be initialized here
if loggable:
logging.info(f"Logging set to freqlog db at {path}")
logging.info(f"Logging set to freqlog db at {backend_path}")

# Asynchronously get chords from device
if self.dev:
Expand All @@ -276,7 +293,7 @@ def __init__(self, path: str = Defaults.DEFAULT_DB_PATH, loggable: bool = True,
if self.dev:
self.dev.close()

self.backend: Backend = SQLiteBackend(path, upgrade_callback)
self.backend: Backend = SQLiteBackend(backend_path, password_callback, upgrade_callback)
self.q: Queue = Queue()
self.listener: kbd.Listener | None = None
self.mouse_listener: mouse.Listener | None = None
Expand Down Expand Up @@ -343,35 +360,34 @@ def get_chord_metadata(self, chord: str) -> ChordMetadata | None:
logging.info(f"Getting metadata for '{chord}'")
return self.backend.get_chord_metadata(chord)

def get_banlist_entry(self, word: str, case: CaseSensitivity) -> BanlistEntry | None:
def get_banlist_entry(self, word: str) -> BanlistEntry | None:
"""
Get a banlist entry
:param word: Word to get entry for
:param case: Case sensitivity
:return: BanlistEntry if word is banned for the specified case, None otherwise
"""
logging.info(f"Getting banlist entry for '{word}', case {case.name}")
return self.backend.get_banlist_entry(word, case)
logging.info(f"Getting banlist entry for '{word}'")
return self.backend.get_banlist_entry(word)

def check_banned(self, word: str, case: CaseSensitivity) -> bool:
def check_banned(self, word: str) -> bool:
"""
Check if a word is banned
:returns: True if word is banned, False otherwise
"""
logging.info(f"Checking if '{word}' is banned, case {case.name}")
return self.backend.check_banned(word, case)
logging.info(f"Checking if '{word}' is banned")
return self.backend.check_banned(word)

def ban_word(self, word: str, case: CaseSensitivity, time_added: datetime = datetime.now()) -> bool:
def ban_word(self, word: str, time_added: datetime = datetime.now()) -> bool:
"""
Delete a word/chord entry and add it to the ban list
:returns: True if word was banned, False if it was already banned
"""
logging.info(f"Banning '{word}', case {case.name} - {time}")
res = self.backend.ban_word(word, case, time_added)
logging.info(f"Banning '{word}' - {time}")
res = self.backend.ban_word(word, time_added)
if res:
logging.warning(f"Banned '{word}', case {case.name}")
logging.warning(f"Banned '{word}'")
else:
logging.warning(f"'{word}', case {case.name} already banned")
logging.warning(f"'{word}' is already banned")
return res

def ban_words(self, entries: dict[str: CaseSensitivity], time_added: datetime = datetime.now()) -> list[bool]:
Expand All @@ -382,7 +398,7 @@ def ban_words(self, entries: dict[str: CaseSensitivity], time_added: datetime =
:return: list of bools, True if word was banned, False if it was already banned
"""
logging.info(f"Banning {len(entries)} words - {time_added}")
return [self.ban_word(word, case, time_added) for word, case in entries.items()]
return [self.ban_word(word, time_added) for word, case in entries.items()]

def delete_word(self, word: str, case: CaseSensitivity) -> bool:
"""
Expand All @@ -406,29 +422,28 @@ def delete_words(self, entries: dict[str: CaseSensitivity]) -> list[bool]:
logging.info(f"Deleting {len(entries)} words")
return [self.delete_word(word, case) for word, case in entries.items()]

def unban_word(self, word: str, case: CaseSensitivity) -> bool:
def unban_word(self, word: str) -> bool:
"""
Remove a banlist entry
:param word: Word to unban
:param case: Case sensitivity
:returns: True if word was unbanned, False if it was already not banned
"""
logging.info(f"Unbanning '{word}', case {case.name}")
res = self.backend.unban_word(word, case)
logging.info(f"Unbanning '{word}'")
res = self.backend.unban_word(word)
if res:
logging.warning(f"Unbanned '{word}', case {case.name}")
logging.warning(f"Unbanned '{word}'")
else:
logging.warning(f"'{word}', case {case.name} isn't banned")
logging.warning(f"'{word}' isn't banned")
return res

def unban_words(self, entries: dict[str: CaseSensitivity]) -> list[bool]:
def unban_words(self, entries: list[str]) -> list[bool]:
"""
Remove multiple banlist entries
:param entries: dict of {word to ban: case sensitivity}
:return: list of bools, True if word was unbanned, False if it was already unbanned
"""
logging.info(f"Unbanning {len(entries)} words")
return [self.unban_word(word, case) for word, case in entries.items()]
return [self.unban_word(word) for word in entries]

def num_words(self, case: CaseSensitivity = CaseSensitivity.INSENSITIVE) -> int:
"""
Expand Down Expand Up @@ -534,13 +549,13 @@ def delete_logged_chords(self, chords: list[str]) -> list[bool]:
return [self.delete_logged_chord(chord) for chord in chords]

def list_banned_words(self, limit: int = -1, sort_by: BanlistAttr = BanlistAttr.word,
reverse: bool = False) -> tuple[set[BanlistEntry], set[BanlistEntry]]:
reverse: bool = False) -> list[BanlistEntry]:
"""
List banned words
:param limit: Maximum number of banned words to return
:param sort_by: Attribute to sort by: word
:param reverse: Reverse sort order
:returns: Tuple of (banned words with case, banned words without case)
:returns: List of banned words
"""
logging.info(f"Listing banned words, limit {limit}, sort_by {sort_by}, reverse {reverse}")
return self.backend.list_banned_words(limit, sort_by, reverse)
Expand Down
13 changes: 6 additions & 7 deletions src/nexus/Freqlog/backends/Backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ def get_chord_metadata(self, chord: str) -> ChordMetadata | None:
"""

@abstractmethod
def get_banlist_entry(self, word: str, case: CaseSensitivity) -> BanlistEntry | None:
def get_banlist_entry(self, word: str) -> BanlistEntry | None:
"""
Get a banlist entry
:param word: Word to get entry for
:param case: Case sensitivity
:return: BanlistEntry if word is banned for the specified case, None otherwise
"""

Expand All @@ -55,14 +54,14 @@ def log_chord(self, chord: str, end_time: datetime) -> None:
"""

@abstractmethod
def check_banned(self, word: str, case: CaseSensitivity) -> bool:
def check_banned(self, word: str) -> bool:
"""
Check if a word is banned
:returns: True if word is banned, False otherwise
"""

@abstractmethod
def ban_word(self, word: str, case: CaseSensitivity, time: datetime) -> bool:
def ban_word(self, word: str, time: datetime) -> bool:
"""
Delete a word/chord entry and add it to the ban list
:returns: True if word was banned, False if it was already banned
Expand All @@ -76,7 +75,7 @@ def delete_word(self, word: str, case: CaseSensitivity) -> bool:
"""

@abstractmethod
def unban_word(self, word: str, case: CaseSensitivity) -> bool:
def unban_word(self, word: str) -> bool:
"""
Remove a word from the ban list
:returns: True if word was unbanned, False if it was already not banned
Expand Down Expand Up @@ -126,13 +125,13 @@ def delete_chord(self, chord: str) -> bool:

@abstractmethod
def list_banned_words(self, limit: int, sort_by: BanlistAttr,
reverse: bool) -> tuple[set[BanlistEntry], set[BanlistEntry]]:
reverse: bool) -> list[BanlistEntry]:
"""
List banned words
:param limit: Maximum number of banned words to return
:param sort_by: Attribute to sort by: word
:param reverse: Reverse sort order
:returns: Tuple of (banned words with case, banned words without case)
:returns: List of banned words
"""

@abstractmethod
Expand Down
Loading

0 comments on commit c4972e7

Please sign in to comment.