Skip to content

Commit

Permalink
Encrypt private key field with key or password in environment
Browse files Browse the repository at this point in the history
  • Loading branch information
mccwdev committed Feb 22, 2024
1 parent 45828ac commit 9403f24
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/workflows/unittests-postgresql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ jobs:
env:
BCL_CONFIG_FILE: config.ini.unittest
UNITTEST_DATABASE: postgresql
DB_FIELD_ENCRYPTION_PASSWORD: verybadpassword
run: coverage run --source=bitcoinlib -m unittest -v
4 changes: 3 additions & 1 deletion bitcoinlib/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
ALLOW_DATABASE_THREADS = None
DATABASE_ENCRYPTION_ENABLED = False
DB_FIELD_ENCRYPTION_KEY = None
DB_FIELD_ENCRYPTION_PASSWORD = None

# Services
TIMEOUT_REQUESTS = 5
Expand Down Expand Up @@ -238,7 +239,7 @@ def config_get(section, var, fallback, is_boolean=False):
global ALLOW_DATABASE_THREADS, DEFAULT_DATABASE_CACHE
global BCL_LOG_FILE, LOGLEVEL, ENABLE_BITCOINLIB_LOGGING
global TIMEOUT_REQUESTS, DEFAULT_LANGUAGE, DEFAULT_NETWORK, DEFAULT_WITNESS_TYPE
global SERVICE_CACHING_ENABLED, DATABASE_ENCRYPTION_ENABLED, DB_FIELD_ENCRYPTION_KEY
global SERVICE_CACHING_ENABLED, DATABASE_ENCRYPTION_ENABLED, DB_FIELD_ENCRYPTION_KEY, DB_FIELD_ENCRYPTION_PASSWORD
global SERVICE_MAX_ERRORS, BLOCK_COUNT_CACHE_TIME, MAX_TRANSACTIONS

# Read settings from Configuration file provided in OS environment~/.bitcoinlib/ directory
Expand Down Expand Up @@ -273,6 +274,7 @@ def config_get(section, var, fallback, is_boolean=False):
SERVICE_CACHING_ENABLED = config_get('common', 'service_caching_enabled', fallback=True, is_boolean=True)
DATABASE_ENCRYPTION_ENABLED = config_get('common', 'database_encryption_enabled', fallback=False, is_boolean=True)
DB_FIELD_ENCRYPTION_KEY = os.environ.get('DB_FIELD_ENCRYPTION_KEY')
DB_FIELD_ENCRYPTION_PASSWORD = os.environ.get('DB_FIELD_ENCRYPTION_PASSWORD')

# Log settings
ENABLE_BITCOINLIB_LOGGING = config_get("logs", "enable_bitcoinlib_logging", fallback=True, is_boolean=True)
Expand Down
60 changes: 38 additions & 22 deletions bitcoinlib/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sqlalchemy.orm import sessionmaker, relationship, session
from urllib.parse import urlparse
from bitcoinlib.main import *
from bitcoinlib.encoding import aes_encrypt, aes_decrypt
from bitcoinlib.encoding import aes_encrypt, aes_decrypt, double_sha256

_logger = logging.getLogger(__name__)
Base = declarative_base()
Expand Down Expand Up @@ -136,28 +136,43 @@ def add_column(engine, table_name, column):
return result


def _get_encryption_key(default_impl):
impl = default_impl
key = None
if DATABASE_ENCRYPTION_ENABLED:
if not (DB_FIELD_ENCRYPTION_KEY or DB_FIELD_ENCRYPTION_PASSWORD):
_logger.warning("Database encryption is enabled but value DB_FIELD_ENCRYPTION_KEY not found in "
"environment. Please supply 32 bytes key as hexadecimal string.")
if DB_FIELD_ENCRYPTION_KEY:
impl = default_impl
key = bytes().fromhex(DB_FIELD_ENCRYPTION_KEY)
elif DB_FIELD_ENCRYPTION_PASSWORD:
impl = default_impl
key = double_sha256(bytes(DB_FIELD_ENCRYPTION_PASSWORD, 'utf8'))
return key, impl


class EncryptedBinary(TypeDecorator):
"""
FieldType for encrypted Binary storage using EAS encryption
"""

impl = LargeBinary
cache_ok = True
key = None
if DATABASE_ENCRYPTION_ENABLED:
if not DB_FIELD_ENCRYPTION_KEY:
_logger.warning("Database encryption is enabled but value DB_FIELD_ENCRYPTION_KEY not found in "
"environment. Please supply 32 bytes key as hexadecimal string.")
else:
key = bytes().fromhex(DB_FIELD_ENCRYPTION_KEY)
key, impl = _get_encryption_key(LargeBinary)
# if DATABASE_ENCRYPTION_ENABLED:
# if not DB_FIELD_ENCRYPTION_KEY:
# _logger.warning("Database encryption is enabled but value DB_FIELD_ENCRYPTION_KEY not found in "
# "environment. Please supply 32 bytes key as hexadecimal string.")
# if DB_FIELD_ENCRYPTION_KEY:
# key = bytes().fromhex(DB_FIELD_ENCRYPTION_KEY)

def process_bind_param(self, value, dialect):
if value is None or self.key is None or not DATABASE_ENCRYPTION_ENABLED:
if value is None or self.key is None or not (DB_FIELD_ENCRYPTION_KEY or DB_FIELD_ENCRYPTION_PASSWORD):
return value
return aes_encrypt(value, self.key)

def process_result_value(self, value, dialect):
if value is None or self.key is None or not DATABASE_ENCRYPTION_ENABLED:
if value is None or self.key is None or not (DB_FIELD_ENCRYPTION_KEY or DB_FIELD_ENCRYPTION_PASSWORD):
return value
return aes_decrypt(value, self.key)

Expand All @@ -167,26 +182,27 @@ class EncryptedString(TypeDecorator):
FieldType for encrypted String storage using EAS encryption
"""

impl = String
# impl = String
cache_ok = True
key = None
if DATABASE_ENCRYPTION_ENABLED:
if not DB_FIELD_ENCRYPTION_KEY:
_logger.warning("Database encryption is enabled but value DB_FIELD_ENCRYPTION_KEY not found in "
"environment. Please supply 32 bytes key as hexadecimal string.")
else:
impl = LargeBinary
key = bytes().fromhex(DB_FIELD_ENCRYPTION_KEY)
# key = None
# if DATABASE_ENCRYPTION_ENABLED:
# if not DB_FIELD_ENCRYPTION_KEY:
# _logger.warning("Database encryption is enabled but value DB_FIELD_ENCRYPTION_KEY not found in "
# "environment. Please supply 32 bytes key as hexadecimal string.")
# if DB_FIELD_ENCRYPTION_KEY:
# impl = LargeBinary
# key = bytes().fromhex(DB_FIELD_ENCRYPTION_KEY)
key, impl = _get_encryption_key(String)

def process_bind_param(self, value, dialect):
if value is None or self.key is None or not DATABASE_ENCRYPTION_ENABLED:
if value is None or self.key is None or not (DB_FIELD_ENCRYPTION_KEY or DB_FIELD_ENCRYPTION_PASSWORD):
return value
if not isinstance(value, bytes):
value = bytes(value, 'utf8')
return aes_encrypt(value, self.key)

def process_result_value(self, value, dialect):
if value is None or self.key is None or not DATABASE_ENCRYPTION_ENABLED:
if value is None or self.key is None or not (DB_FIELD_ENCRYPTION_KEY or DB_FIELD_ENCRYPTION_PASSWORD):
return value
return aes_decrypt(value, self.key).decode('utf8')

Expand Down
38 changes: 34 additions & 4 deletions tests/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sqlalchemy.sql import text
from bitcoinlib.db import BCL_DATABASE_DIR
from bitcoinlib.wallets import Wallet
from bitcoinlib.config.config import DATABASE_ENCRYPTION_ENABLED
from bitcoinlib.keys import HDKey


try:
Expand Down Expand Up @@ -59,7 +59,7 @@

class TestSecurity(TestCase):

def test_security_wallet_field_encryption(self):
def test_security_wallet_field_encryption_key(self):
pk = 'xprv9s21ZrQH143K2HrtPWvqgD8mUhMrrfE1ZME43baM8ti3hWgJwWX1wjHc25y2x11seT5G3KeHFY28MyTRxceeW22kMDAWsMDn7' \
'rcWnEMFP3t'
pk_wif_enc_hex = \
Expand All @@ -68,8 +68,8 @@ def test_security_wallet_field_encryption(self):
'17f1ffea8e20844309f6fb6b281349a2b3915af3d12dc4c90c3b68f6666eb665682d'
pk_enc_hex = 'f8777f10a435d5e3fdbb64cfdcb929626ce38c7103e772921ad1fc21c5e69e474423a998523bf53565ab45711a14086c'

if not DATABASE_ENCRYPTION_ENABLED:
self.skipTest("Database encryption not enabled, skip this test")
if not os.environ.get('DB_FIELD_ENCRYPTION_KEY'):
self.skipTest("Database field encryption key not found in environment, skip this test")
self.assertEqual(os.environ.get('DB_FIELD_ENCRYPTION_KEY'),
'11223344556677889900aabbccddeeff11223344556677889900aabbccddeeff')

Expand All @@ -87,6 +87,36 @@ def test_security_wallet_field_encryption(self):
self.assertEqual(encrypted_main_key_wif.hex(), pk_wif_enc_hex)
self.assertEqual(encrypted_main_key_private.hex(), pk_enc_hex)

def test_security_wallet_field_encryption_password(self):
pk = ('zprvAWgYBBk7JR8GivM5h6vdbXRrYRC6CU9aFDsVp2gLZ82Tx74UGf7nnN4cToSvNsDnK19tkuyXjzBMDcYvuseYYE5Q4qQo9JaLuNGz'
'hfcovSp')
pk_wif_enc_hex = \
('92410397d5a80ce75cdb5b0fe1204fd2e1411752c75f7b32a8c6a5574570d8be97155bcc4a86b6d34b0e6c22dfe32d340cc90dae1'
'5e54316a9db538ad8a274881c7a45a7be0a00d6e5deda2ea28d8fa0ffcf8783b1fb580df6f5c056e43b79a93859bd083fc1922c86'
'c17a3f945bbad5fa699a9d1cb2fc9f240708a1eee90b')
pk_enc_hex = '1ff6958f0edc774f16d09d9fb36baa912fb9034f0e354c354a0a91c21e58fe05ad7c8089642565bf4fffd357db108117'

if not os.environ.get('DB_FIELD_ENCRYPTION_PASSWORD'):
self.skipTest("Database field encryption password not found in environment, skip this test")
self.assertEqual(os.environ.get('DB_FIELD_ENCRYPTION_PASSWORD'),
'verybadpassword')

wallet = Wallet.create('wlt-private-key-encryption-test-pwd', keys=pk,
db_uri=DATABASEFILE_UNITTESTS_ENCRYPTED)
wallet.new_key()
self.assertEqual(wallet.main_key.wif, pk)

if os.getenv('UNITTEST_DATABASE') == 'mysql':
db_query = text("SELECT wif, private FROM `keys` WHERE id=%d" % wallet._dbwallet.main_key_id)
else:
db_query = text("SELECT wif, private FROM keys WHERE id=%d" % wallet._dbwallet.main_key_id)
encrypted_main_key_wif = wallet.session.execute(db_query).fetchone()[0]
encrypted_main_key_private = wallet.session.execute(db_query).fetchone()[1]
self.assertIn(type(encrypted_main_key_wif), (bytes, memoryview), "Encryption of database private key failed!")
self.assertEqual(encrypted_main_key_wif.hex(), pk_wif_enc_hex)
self.assertEqual(encrypted_main_key_private.hex(), pk_enc_hex)
self.assertNotEqual(encrypted_main_key_private, HDKey(pk).private_byte)


if __name__ == '__main__':
main()

0 comments on commit 9403f24

Please sign in to comment.