diff --git a/README.md b/README.md index 6ed500b..e196333 100644 --- a/README.md +++ b/README.md @@ -35,11 +35,21 @@ following servers: that all incoming SMP messages for the creditors stored on the PostgreSQL server instance, are routed to this queue. - Also, a [RabbitMQ exchange] named **`creditors_out`** must be configured - on the broker instance. This exchange is for messages that must be sent - to accounting authorities. The routing key will represent the debtor ID - as hexadecimal (lowercase). For example, for debtor ID equal to 10, the - routing key will be "00.00.00.00.00.00.00.0a". + Also, the following [RabbitMQ exchanges] must be configured on the + broker instance: + + - **`creditors_out`**: For messages that must be sent to accounting + authorities. The routing key will represent the debtor ID as + hexadecimal (lowercase). For example, for debtor ID equal to 10, the + routing key will be "00.00.00.00.00.00.00.0a". + + - **`to_trade`**: For policy change notifications that must be sent to + the subsystem that is responsible for performing automatic circular + trades. The routing key will represent the highest 24 bits of the MD5 + digest of the creditor ID. For example, if the creditor ID is 123, the + routing key will be "1.1.1.1.1.1.0.0.0.0.0.1.0.0.0.0.0.1.1.0.0.0.1.1". + This allows different creditor accounts to be handled by different + database servers (sharding). **Note:** If you execute the "configure" command (see below), with the environment variable `SETUP_RABBITMQ_BINDINGS` set to `yes`, an @@ -146,7 +156,7 @@ PROTOCOL_BROKER_THREADS=3 PROTOCOL_BROKER_PREFETCH_COUNT=10 # The binding key with which the "$PROTOCOL_BROKER_QUEUE" -# RabbitMQ queue is bound to the "creditors_in" RabbitMQ topic +# RabbitMQ queue is bound to the incoming messages' topic # exchange (default "#"). The binding key must consist of zero or # more 0s or 1s, separated by dots, ending with a hash symbol. # For example: "0.1.#", "1.#", or "#". @@ -230,7 +240,8 @@ container allows you to execute the following *documented commands*: RabbitMQ broker, and remove the messages from the PostgreSQL database. * `flush_configure_accounts`, `flush_prepare_transfers`, - `flush_finalize_transfers` + `flush_finalize_transfers`, `flush_updated_ledgers`, + `flush_updated_policies`, `flush_updated_flags` Starts additional worker processes that send particular type of outgoing messages to the RabbitMQ broker, and remove the messages from the @@ -328,7 +339,7 @@ can be used for end-to-end testing. [Swaptacular Messaging Protocol]: https://github.com/swaptacular/swpt_accounts/blob/master/protocol.rst [RabbitMQ]: https://www.rabbitmq.com/ [RabbitMQ queue]: https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html -[RabbitMQ exchange]: https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html +[RabbitMQ exchanges]: https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html [Redis]: https://redis.io/ [OAuth 2.0]: https://oauth.net/2/ [nginx]: https://en.wikipedia.org/wiki/Nginx diff --git a/development.env b/development.env index cc2fa45..4845f9e 100644 --- a/development.env +++ b/development.env @@ -2,7 +2,7 @@ # Configuration settings # ########################## -MIN_CREDITOR_ID=0x0000010000000000 +MIN_CREDITOR_ID=0x0000010100000000 MAX_CREDITOR_ID=0x000001ffffffffff PIN_PROTECTION_SECRET= @@ -51,6 +51,9 @@ APP_PROCESS_LEDGER_UPDATES_MAX_COUNT=100000 APP_FLUSH_CONFIGURE_ACCOUNTS_BURST_COUNT=10000 APP_FLUSH_PREPARE_TRANSFERS_BURST_COUNT=10000 APP_FLUSH_FINALIZE_TRANSFERS_BURST_COUNT=10000 +APP_FLUSH_UPDATED_LEDGER_BURST_COUNT=10000 +APP_FLUSH_UPDATED_POLICY_BURST_COUNT=10000 +APP_FLUSH_UPDATED_FLAGS_BURST_COUNT=10000 APP_CREDITORS_SCAN_DAYS=7 APP_CREDITORS_SCAN_BLOCKS_PER_QUERY=40 APP_CREDITORS_SCAN_BEAT_MILLISECS=25 diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 6301e7a..563d422 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -115,11 +115,15 @@ case $1 in exec flask swpt_creditors "$@" ;; flush_configure_accounts | flush_prepare_transfers | flush_finalize_transfers \ + | flush_updated_ledgers | flush_updated_policies | flush_updated_flags \ | flush_all) flush_configure_accounts=ConfigureAccountSignal flush_prepare_transfers=PrepareTransferSignal flush_finalize_transfers=FinalizeTransferSignal + flush_updated_ledgers=UpdatedLedgerSignal + flush_updated_policies=UpdatedPolicySignal + flush_updated_flags=UpdatedFlagsSignal flush_all= # For example: if `$1` is "flush_configure_accounts", diff --git a/migrations/versions/4260660af5b1_.py b/migrations/versions/4260660af5b1_.py new file mode 100644 index 0000000..9d2f94b --- /dev/null +++ b/migrations/versions/4260660af5b1_.py @@ -0,0 +1,53 @@ +"""empty message + +Revision ID: 4260660af5b1 +Revises: 824705144577 +Create Date: 2023-10-11 16:22:24.946584 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '4260660af5b1' +down_revision = '824705144577' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('updated_ledger_signal', + sa.Column('creditor_id', sa.BigInteger(), nullable=False), + sa.Column('debtor_id', sa.BigInteger(), nullable=False), + sa.Column('update_id', sa.BigInteger(), nullable=False), + sa.Column('creation_date', sa.DATE(), nullable=False), + sa.Column('account_id', sa.String(), nullable=False), + sa.Column('principal', sa.BigInteger(), nullable=False), + sa.Column('last_transfer_number', sa.BigInteger(), nullable=False), + sa.Column('ts', sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column('inserted_at', sa.TIMESTAMP(timezone=True), nullable=False), + sa.PrimaryKeyConstraint('creditor_id', 'debtor_id', 'update_id') + ) + op.create_table('updated_policy_signal', + sa.Column('creditor_id', sa.BigInteger(), nullable=False), + sa.Column('debtor_id', sa.BigInteger(), nullable=False), + sa.Column('update_id', sa.BigInteger(), nullable=False), + sa.Column('policy_name', sa.String(), nullable=True), + sa.Column('min_principal', sa.BigInteger(), nullable=False), + sa.Column('max_principal', sa.BigInteger(), nullable=False), + sa.Column('peg_exchange_rate', sa.FLOAT(), nullable=True), + sa.Column('peg_debtor_id', sa.BigInteger(), nullable=True), + sa.Column('ts', sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column('inserted_at', sa.TIMESTAMP(timezone=True), nullable=False), + sa.PrimaryKeyConstraint('creditor_id', 'debtor_id', 'update_id') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('updated_policy_signal') + op.drop_table('updated_ledger_signal') + # ### end Alembic commands ### diff --git a/migrations/versions/4e310d9db669_.py b/migrations/versions/4e310d9db669_.py new file mode 100644 index 0000000..45a67e9 --- /dev/null +++ b/migrations/versions/4e310d9db669_.py @@ -0,0 +1,36 @@ +"""empty message + +Revision ID: 4e310d9db669 +Revises: 4260660af5b1 +Create Date: 2023-10-13 16:39:40.541127 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '4e310d9db669' +down_revision = '4260660af5b1' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('updated_flags_signal', + sa.Column('creditor_id', sa.BigInteger(), nullable=False), + sa.Column('debtor_id', sa.BigInteger(), nullable=False), + sa.Column('update_id', sa.BigInteger(), nullable=False), + sa.Column('config_flags', sa.Integer(), nullable=False), + sa.Column('ts', sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column('inserted_at', sa.TIMESTAMP(timezone=True), nullable=False), + sa.PrimaryKeyConstraint('creditor_id', 'debtor_id', 'update_id') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('updated_flags_signal') + # ### end Alembic commands ### diff --git a/swpt_creditors/__init__.py b/swpt_creditors/__init__.py index bf731b0..0c4807a 100644 --- a/swpt_creditors/__init__.py +++ b/swpt_creditors/__init__.py @@ -227,6 +227,9 @@ class Configuration(metaclass=MetaEnvReader): APP_FLUSH_CONFIGURE_ACCOUNTS_BURST_COUNT = 10000 APP_FLUSH_PREPARE_TRANSFERS_BURST_COUNT = 10000 APP_FLUSH_FINALIZE_TRANSFERS_BURST_COUNT = 10000 + APP_FLUSH_UPDATED_LEDGER_BURST_COUNT = 10000 + APP_FLUSH_UPDATED_POLICY_BURST_COUNT = 10000 + APP_FLUSH_UPDATED_FLAGS_BURST_COUNT = 10000 APP_CREDITORS_SCAN_DAYS = 7.0 APP_CREDITORS_SCAN_BLOCKS_PER_QUERY = 40 APP_CREDITORS_SCAN_BEAT_MILLISECS = 25 diff --git a/swpt_creditors/actors.py b/swpt_creditors/actors.py index 2bd54b6..bb6f58c 100644 --- a/swpt_creditors/actors.py +++ b/swpt_creditors/actors.py @@ -160,8 +160,9 @@ def _on_rejected_direct_transfer_signal( **kwargs ) -> None: if coordinator_type != CT_DIRECT: # pragma: no cover - _LOGGER.error('Unexpected coordinator type: "%s"', coordinator_type) - return + raise RuntimeError( + f'Unexpected coordinator type: "{coordinator_type}"' + ) procedures.process_rejected_direct_transfer_signal( coordinator_id=coordinator_id, @@ -190,8 +191,9 @@ def _on_prepared_direct_transfer_signal( **kwargs ) -> None: if coordinator_type != CT_DIRECT: # pragma: no cover - _LOGGER.error('Unexpected coordinator type: "%s"', coordinator_type) - return + raise RuntimeError( + f'Unexpected coordinator type: "{coordinator_type}"' + ) procedures.process_prepared_direct_transfer_signal( debtor_id=debtor_id, @@ -220,8 +222,9 @@ def _on_finalized_direct_transfer_signal( **kwargs ) -> None: if coordinator_type != CT_DIRECT: # pragma: no cover - _LOGGER.error('Unexpected coordinator type: "%s"', coordinator_type) - return + raise RuntimeError( + f'Unexpected coordinator type: "{coordinator_type}"' + ) procedures.process_finalized_direct_transfer_signal( debtor_id=debtor_id, diff --git a/swpt_creditors/cli.py b/swpt_creditors/cli.py index 8b180c6..31eda92 100644 --- a/swpt_creditors/cli.py +++ b/swpt_creditors/cli.py @@ -47,8 +47,13 @@ def subscribe(): # pragma: no cover """ - from .extensions import CREDITORS_IN_EXCHANGE, CREDITORS_OUT_EXCHANGE + from .extensions import ( + CREDITORS_IN_EXCHANGE, + CREDITORS_OUT_EXCHANGE, + TO_TRADE_EXCHANGE, + ) + CA_CREDITORS_EXCHANGE = "ca.creditors" logger = logging.getLogger(__name__) queue_name = current_app.config["PROTOCOL_BROKER_QUEUE"] routing_key = current_app.config["PROTOCOL_BROKER_QUEUE_ROUTING_KEY"] @@ -59,11 +64,32 @@ def subscribe(): # pragma: no cover # declare exchanges channel.exchange_declare( - CREDITORS_IN_EXCHANGE, exchange_type="topic", durable=True + CREDITORS_IN_EXCHANGE, exchange_type="headers", durable=True + ) + channel.exchange_declare( + CA_CREDITORS_EXCHANGE, exchange_type="topic", durable=True ) channel.exchange_declare( CREDITORS_OUT_EXCHANGE, exchange_type="topic", durable=True ) + channel.exchange_declare( + TO_TRADE_EXCHANGE, exchange_type="topic", durable=True + ) + + # declare exchange bindings + channel.exchange_bind( + source=CREDITORS_IN_EXCHANGE, + destination=CA_CREDITORS_EXCHANGE, + arguments={ + "x-match": "all", + "ca-creditors": True, + }, + ) + logger.info( + 'Created a binding from "%s" to the "%s" exchange.', + CREDITORS_IN_EXCHANGE, + CA_CREDITORS_EXCHANGE, + ) # declare a corresponding dead-letter queue channel.queue_declare(dead_letter_queue_name, durable=True) @@ -82,13 +108,13 @@ def subscribe(): # pragma: no cover # bind the queue channel.queue_bind( - exchange=CREDITORS_IN_EXCHANGE, + exchange=CA_CREDITORS_EXCHANGE, queue=queue_name, routing_key=routing_key, ) logger.info( 'Created a binding from "%s" to "%s" with routing key "%s".', - CREDITORS_IN_EXCHANGE, + CA_CREDITORS_EXCHANGE, queue_name, routing_key, ) diff --git a/swpt_creditors/extensions.py b/swpt_creditors/extensions.py index dcbca07..55595db 100644 --- a/swpt_creditors/extensions.py +++ b/swpt_creditors/extensions.py @@ -19,6 +19,7 @@ CREDITORS_IN_EXCHANGE = "creditors_in" DEBTORS_OUT_EXCHANGE = "debtors_out" DEBTORS_IN_EXCHANGE = "debtors_in" +TO_TRADE_EXCHANGE = "to_trade" warnings.filterwarnings( diff --git a/swpt_creditors/models/signals.py b/swpt_creditors/models/signals.py index 13505c5..f29e505 100644 --- a/swpt_creditors/models/signals.py +++ b/swpt_creditors/models/signals.py @@ -1,8 +1,12 @@ from __future__ import annotations from flask import current_app from marshmallow import Schema, fields -from swpt_pythonlib.utils import i64_to_hex_routing_key -from swpt_creditors.extensions import db, CREDITORS_OUT_EXCHANGE +from swpt_pythonlib.utils import i64_to_hex_routing_key, calc_bin_routing_key +from swpt_creditors.extensions import ( + db, + CREDITORS_OUT_EXCHANGE, + TO_TRADE_EXCHANGE, +) from .common import Signal, CT_DIRECT @@ -123,3 +127,122 @@ def routing_key(self): # pragma: no cover @classproperty def signalbus_burst_count(self): return current_app.config["APP_FLUSH_FINALIZE_TRANSFERS_BURST_COUNT"] + + +class UpdatedLedgerSignal(Signal): + """Notifies about a change in account's principal balance. + + In addition to the new principal balance, the notification contains the + account's identity (a string), the account's creation date, and the ID + (a number) of latest transfer to/from the account. The subsystem that + performs automatic circular trades needs this information to do its + work. + """ + + exchange_name = TO_TRADE_EXCHANGE + + class __marshmallow__(Schema): + type = fields.Constant("UpdatedLedger") + debtor_id = fields.Integer() + creditor_id = fields.Integer() + update_id = fields.Integer() + account_id = fields.String() + creation_date = fields.Date() + principal = fields.Integer() + last_transfer_number = fields.Integer() + ts = fields.DateTime() + + __marshmallow_schema__ = __marshmallow__() + + creditor_id = db.Column(db.BigInteger, primary_key=True) + debtor_id = db.Column(db.BigInteger, primary_key=True) + update_id = db.Column(db.BigInteger, primary_key=True) + creation_date = db.Column(db.DATE, nullable=False) + account_id = db.Column(db.String, nullable=False) + principal = db.Column(db.BigInteger, nullable=False) + last_transfer_number = db.Column(db.BigInteger, nullable=False) + ts = db.Column(db.TIMESTAMP(timezone=True), nullable=False) + + @property + def routing_key(self): # pragma: no cover + return calc_bin_routing_key(self.creditor_id) + + @classproperty + def signalbus_burst_count(self): + return current_app.config["APP_FLUSH_UPDATED_LEDGER_BURST_COUNT"] + + +class UpdatedPolicySignal(Signal): + """Notifies about a change in account's automatic exchange policy. + + The subsystem that performs automatic circular trades needs this + information to do its work. + """ + + exchange_name = TO_TRADE_EXCHANGE + + class __marshmallow__(Schema): + type = fields.Constant("UpdatedPolicy") + debtor_id = fields.Integer() + creditor_id = fields.Integer() + update_id = fields.Integer() + policy_name = fields.String() + min_principal = fields.Integer() + max_principal = fields.Integer() + peg_exchange_rate = fields.Float() + peg_debtor_id = fields.Integer() + ts = fields.DateTime() + + __marshmallow_schema__ = __marshmallow__() + + creditor_id = db.Column(db.BigInteger, primary_key=True) + debtor_id = db.Column(db.BigInteger, primary_key=True) + update_id = db.Column(db.BigInteger, primary_key=True) + policy_name = db.Column(db.String) + min_principal = db.Column(db.BigInteger, nullable=False) + max_principal = db.Column(db.BigInteger, nullable=False) + peg_exchange_rate = db.Column(db.FLOAT) + peg_debtor_id = db.Column(db.BigInteger) + ts = db.Column(db.TIMESTAMP(timezone=True), nullable=False) + + @property + def routing_key(self): # pragma: no cover + return calc_bin_routing_key(self.creditor_id) + + @classproperty + def signalbus_burst_count(self): + return current_app.config["APP_FLUSH_UPDATED_POLICY_BURST_COUNT"] + + +class UpdatedFlagsSignal(Signal): + """Notifies about a change in account's configuration flags. + + The subsystem that performs automatic circular trades needs this + information to do its work. + """ + + exchange_name = TO_TRADE_EXCHANGE + + class __marshmallow__(Schema): + type = fields.Constant("UpdatedFlags") + debtor_id = fields.Integer() + creditor_id = fields.Integer() + update_id = fields.Integer() + config_flags = fields.Integer() + ts = fields.DateTime() + + __marshmallow_schema__ = __marshmallow__() + + creditor_id = db.Column(db.BigInteger, primary_key=True) + debtor_id = db.Column(db.BigInteger, primary_key=True) + update_id = db.Column(db.BigInteger, primary_key=True) + config_flags = db.Column(db.Integer, nullable=False) + ts = db.Column(db.TIMESTAMP(timezone=True), nullable=False) + + @property + def routing_key(self): # pragma: no cover + return calc_bin_routing_key(self.creditor_id) + + @classproperty + def signalbus_burst_count(self): + return current_app.config["APP_FLUSH_UPDATED_FLAGS_BURST_COUNT"] diff --git a/swpt_creditors/procedures/account_updates.py b/swpt_creditors/procedures/account_updates.py index 3c3401b..814e935 100644 --- a/swpt_creditors/procedures/account_updates.py +++ b/swpt_creditors/procedures/account_updates.py @@ -7,6 +7,7 @@ from swpt_creditors.models import ( AccountData, ConfigureAccountSignal, + UpdatedLedgerSignal, LogEntry, PendingLogEntry, PendingLedgerUpdate, @@ -130,6 +131,7 @@ def process_account_update_signal( assert creation_date >= data.creation_date is_new_server_account = creation_date > data.creation_date + is_account_id_changed = account_id != data.account_id is_config_effectual = ( last_config_ts == data.last_config_ts and last_config_seqnum == data.last_config_seqnum @@ -172,19 +174,44 @@ def process_account_update_signal( if is_info_updated: _insert_info_update_pending_log_entry(data, current_ts) - if is_new_server_account: + if is_new_server_account or is_account_id_changed: + if is_new_server_account: + data.ledger_pending_transfer_ts = None + ledger_principal = 0 + ledger_last_transfer_number = 0 + else: # pragma: no cover + # When the `account_id` field is changed, we should send a + # corresponding `UpdatedLedgerSignal` message. To do this + # consistently with the Web API, first we need to add a ledger + # update log entry, even when the ledger did not really change. + assert is_account_id_changed + ledger_principal = data.ledger_principal + ledger_last_transfer_number = data.ledger_last_transfer_number + log_entry = _update_ledger( data=data, - transfer_number=0, + transfer_number=ledger_last_transfer_number, acquired_amount=0, - principal=0, + principal=ledger_principal, current_ts=current_ts, + always_insert_ledger_update_log_entry=True, ) if log_entry: + db.session.add( + UpdatedLedgerSignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=data.ledger_latest_update_id, + account_id=data.account_id, + creation_date=data.creation_date, + principal=ledger_principal, + last_transfer_number=ledger_last_transfer_number, + ts=current_ts, + ) + ) db.session.add(log_entry) db.session.scalar(uid_seq) - data.ledger_pending_transfer_ts = None ensure_pending_ledger_update(data.creditor_id, data.debtor_id) @@ -303,6 +330,16 @@ def process_pending_ledger_update( db.session.delete(pending_ledger_update) if log_entry: + db.session.add(UpdatedLedgerSignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=data.ledger_latest_update_id, + account_id=data.account_id, + creation_date=data.creation_date, + principal=data.ledger_principal, + last_transfer_number=data.ledger_last_transfer_number, + ts=current_ts, + )) db.session.add(log_entry) db.session.scalar(uid_seq) @@ -393,6 +430,7 @@ def _update_ledger( acquired_amount: int, principal: int, current_ts: datetime, + always_insert_ledger_update_log_entry: bool = False, ) -> Optional[PendingLogEntry]: should_insert_ledger_update_log_entry = ( _make_correcting_ledger_entry_if_necessary( @@ -426,7 +464,10 @@ def _update_ledger( data.ledger_principal = principal data.ledger_last_transfer_number = transfer_number - if should_insert_ledger_update_log_entry: + if ( + should_insert_ledger_update_log_entry + or always_insert_ledger_update_log_entry + ): data.ledger_latest_update_id += 1 data.ledger_latest_update_ts = current_ts diff --git a/swpt_creditors/procedures/accounts.py b/swpt_creditors/procedures/accounts.py index 892d5c6..e77539a 100644 --- a/swpt_creditors/procedures/accounts.py +++ b/swpt_creditors/procedures/accounts.py @@ -7,6 +7,9 @@ Account, AccountData, ConfigureAccountSignal, + UpdatedPolicySignal, + UpdatedLedgerSignal, + UpdatedFlagsSignal, AccountDisplay, AccountKnowledge, AccountExchange, @@ -15,6 +18,9 @@ Creditor, DEFAULT_NEGLIGIBLE_AMOUNT, DEFAULT_CONFIG_FLAGS, + DATE0, + MIN_INT64, + MAX_INT64, uid_seq, ) from .common import ( @@ -194,6 +200,14 @@ def update_account_config( data.last_config_seqnum = increment_seqnum(data.last_config_seqnum) data.is_config_effectual = False + db.session.add(UpdatedFlagsSignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=latest_update_id, + config_flags=data.config_flags, + ts=current_ts, + )) + paths, types = get_paths_and_types() db.session.add( PendingLogEntry( @@ -407,6 +421,18 @@ def update_account_exchange( perform_update() exchange.latest_update_ts = current_ts + db.session.add(UpdatedPolicySignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=latest_update_id, + policy_name=exchange.policy, + min_principal=exchange.min_principal, + max_principal=exchange.max_principal, + peg_exchange_rate=exchange.peg_exchange_rate, + peg_debtor_id=exchange.peg_debtor_id, + ts=current_ts, + )) + paths, types = get_paths_and_types() db.session.add( PendingLogEntry( @@ -579,6 +605,40 @@ def _log_account_deletion( is_deleted=True, ) + # NOTE: When an account has been deleted, notification messages must be + # sent to the subsystem that performs automatic circular trades. These + # are otherwise regular notifications, but they contain the default safe + # values for all of the fields. (The default values forbid all automatic + # circular trades for the account.) + db.session.add(UpdatedLedgerSignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=object_update_id, + account_id='', + creation_date=DATE0, + principal=0, + last_transfer_number=0, + ts=current_ts, + )) + db.session.add(UpdatedPolicySignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=object_update_id, + policy_name=None, + min_principal=MIN_INT64, + max_principal=MAX_INT64, + peg_exchange_rate=None, + peg_debtor_id=None, + ts=current_ts, + )) + db.session.add(UpdatedFlagsSignal( + creditor_id=creditor_id, + debtor_id=debtor_id, + update_id=object_update_id, + config_flags=DEFAULT_CONFIG_FLAGS, + ts=current_ts, + )) + def _insert_info_update_pending_log_entry( data: AccountData, current_ts: datetime diff --git a/swpt_creditors/procedures/common.py b/swpt_creditors/procedures/common.py index 0667033..c99de5d 100644 --- a/swpt_creditors/procedures/common.py +++ b/swpt_creditors/procedures/common.py @@ -27,6 +27,7 @@ AccountData.creditor_id, AccountData.debtor_id, AccountData.creation_date, + AccountData.account_id, AccountData.ledger_principal, AccountData.ledger_last_entry_id, AccountData.ledger_last_transfer_number, diff --git a/swpt_creditors/schemas/accounts.py b/swpt_creditors/schemas/accounts.py index c10b953..73df4c4 100644 --- a/swpt_creditors/schemas/accounts.py +++ b/swpt_creditors/schemas/accounts.py @@ -412,10 +412,11 @@ class AccountLedgerSchema(MutableResourceSchema): format="int64", description=( "The `entryID` of the next ledger entry to come. This will" - " always be a positive number. The first ledger entry for each" - " account will have an ID of `1`, and the ID of each" - " subsequent ledger entry will be equal to the ID of the" - " previous ledger entry plus one." + " always be a positive number. Normally, the entry ID of the" + " first ledger entry for each account will be `1`, but this is" + " not guaranteed. What is guaranteed though, is that the ID" + " of each subsequent ledger entry will be equal to the ID of" + " the previous ledger entry plus one." ), example=123, ), diff --git a/swpt_creditors/table_scanners.py b/swpt_creditors/table_scanners.py index f627d8d..9f69b45 100644 --- a/swpt_creditors/table_scanners.py +++ b/swpt_creditors/table_scanners.py @@ -14,6 +14,7 @@ LedgerEntry, CommittedTransfer, PendingLedgerUpdate, + UpdatedLedgerSignal, uid_seq, is_valid_creditor_id, ) @@ -477,6 +478,17 @@ def _update_ledger( data.ledger_latest_update_id += 1 data.ledger_latest_update_ts = current_ts + db.session.add(UpdatedLedgerSignal( + creditor_id=data.creditor_id, + debtor_id=data.debtor_id, + update_id=data.ledger_latest_update_id, + account_id=data.account_id, + creation_date=data.creation_date, + principal=principal, + last_transfer_number=data.ledger_last_transfer_number, + ts=current_ts, + )) + return PendingLogEntry( creditor_id=creditor_id, added_at=current_ts, diff --git a/tests/conftest.py b/tests/conftest.py index 1f1dec2..095fbc9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -61,6 +61,9 @@ def db_session(app): "TRUNCATE TABLE configure_account_signal", "TRUNCATE TABLE prepare_transfer_signal", "TRUNCATE TABLE finalize_transfer_signal", + "TRUNCATE TABLE updated_ledger_signal", + "TRUNCATE TABLE updated_policy_signal", + "TRUNCATE TABLE updated_flags_signal", ]: db.session.execute(sqlalchemy.text(cmd)) db.session.commit() diff --git a/tests/test_cli.py b/tests/test_cli.py index 63fce06..aceb4c7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -251,6 +251,7 @@ def test_scan_accounts(app, db_session, current_ts): data2.principal = 1000 data2.ledger_latest_update_ts = current_ts - timedelta(days=60) data2.last_config_ts = current_ts - timedelta(days=1000) + data2_creation_date = data2.creation_date data3 = m.AccountData.query.filter_by(debtor_id=3).one() data3.last_transfer_number = 1 @@ -291,6 +292,14 @@ def test_scan_accounts(app, db_session, current_ts): assert le.principal == 1000 assert le.added_at >= current_ts + uls = m.UpdatedLedgerSignal.query.one() + assert uls.creditor_id == C_ID + assert uls.debtor_id == 2 + assert uls.creation_date == data2_creation_date + assert uls.principal == 1000 + assert uls.last_transfer_number == 0 + assert isinstance(uls.ts, date) + data2 = m.AccountData.query.filter_by(debtor_id=2).one() ple = m.PendingLogEntry.query.filter_by( diff --git a/tests/test_models.py b/tests/test_models.py index 6d78f3d..f845e85 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -8,6 +8,9 @@ def test_sibnalbus_burst_count(app): assert isinstance(m.ConfigureAccountSignal.signalbus_burst_count, int) assert isinstance(m.PrepareTransferSignal.signalbus_burst_count, int) assert isinstance(m.FinalizeTransferSignal.signalbus_burst_count, int) + assert isinstance(m.UpdatedLedgerSignal.signalbus_burst_count, int) + assert isinstance(m.UpdatedPolicySignal.signalbus_burst_count, int) + assert isinstance(m.UpdatedFlagsSignal.signalbus_burst_count, int) def test_account_data(db_session): diff --git a/tests/test_procedures.py b/tests/test_procedures.py index 4b98377..854f223 100644 --- a/tests/test_procedures.py +++ b/tests/test_procedures.py @@ -128,10 +128,13 @@ def test_delete_account(account, current_ts): "transfer_note_max_bytes": 500, } + assert len(models.UpdatedLedgerSignal.query.all()) == 0 p.process_account_update_signal(**params) + assert len(models.UpdatedLedgerSignal.query.all()) == 1 with pytest.raises(p.UnsafeAccountDeletion): p.delete_account(C_ID, D_ID) + assert len(models.UpdatedFlagsSignal.query.all()) == 0 latest_update_id = p.get_account_config(C_ID, D_ID).config_latest_update_id p.update_account_config( C_ID, @@ -142,6 +145,14 @@ def test_delete_account(account, current_ts): config_data="", latest_update_id=latest_update_id + 1, ) + ufs = models.UpdatedFlagsSignal.query.one() + assert ufs.creditor_id == C_ID + assert ufs.debtor_id == D_ID + assert ufs.config_flags == ( + models.DEFAULT_CONFIG_FLAGS + | AccountData.CONFIG_SCHEDULED_FOR_DELETION_FLAG + ) + assert isinstance(ufs.ts, date) config = p.get_account_config(C_ID, D_ID) params["last_change_seqnum"] += 1 @@ -150,12 +161,41 @@ def test_delete_account(account, current_ts): params["negligible_amount"] = config.negligible_amount params["config_flags"] = config.config_flags p.process_account_update_signal(**params) + + assert len(models.UpdatedLedgerSignal.query.all()) == 1 + assert len(models.UpdatedPolicySignal.query.all()) == 0 p.process_account_purge_signal( debtor_id=D_ID, creditor_id=C_ID, creation_date=date(2020, 1, 15) ) - p.delete_account(C_ID, D_ID) assert not p.get_account(C_ID, D_ID) + assert len(models.UpdatedLedgerSignal.query.all()) == 2 + + uls = models.UpdatedLedgerSignal.query.filter_by(account_id='').one() + assert uls.creditor_id == C_ID + assert uls.debtor_id == D_ID + assert uls.creation_date == models.DATE0 + assert uls.principal == 0 + assert uls.last_transfer_number == 0 + assert isinstance(uls.ts, date) + + ups = models.UpdatedPolicySignal.query.one() + assert ups.creditor_id == C_ID + assert ups.debtor_id == D_ID + assert ups.policy_name is None + assert ups.min_principal == models.MIN_INT64 + assert ups.max_principal == models.MAX_INT64 + assert ups.peg_exchange_rate is None + assert ups.peg_debtor_id is None + assert isinstance(ups.ts, date) + + assert len(models.UpdatedLedgerSignal.query.all()) == 2 + ufs = models.UpdatedFlagsSignal.query.filter_by( + config_flags=models.DEFAULT_CONFIG_FLAGS + ).one() + assert ufs.creditor_id == C_ID + assert ufs.debtor_id == D_ID + assert isinstance(ufs.ts, date) def test_process_account_update_signal(account): @@ -246,6 +286,15 @@ def get_data(): assert ad.last_transfer_number == 22 assert ad.last_transfer_committed_at == current_ts - timedelta(days=2) assert ad.config_error is None + uls = models.UpdatedLedgerSignal.query.one() + assert uls.creditor_id == C_ID + assert uls.debtor_id == D_ID + assert uls.update_id == ad.ledger_latest_update_id + assert uls.account_id == str(C_ID) + assert uls.creation_date == creation_date + assert uls.principal == 0 + assert uls.last_transfer_number == 0 + assert uls.ts == ad.ledger_latest_update_ts p.process_account_update_signal(**params) assert ad.last_change_seqnum == 1 @@ -295,6 +344,17 @@ def get_data(): assert ad.ledger_principal == 0 assert ad.ledger_last_entry_id == 89 assert ad.ledger_last_transfer_number == 0 + assert len(models.UpdatedLedgerSignal.query.all()) == 2 + uls = models.UpdatedLedgerSignal.query.filter_by( + creation_date=creation_date + timedelta(days=2) + ).one() + assert uls.creditor_id == C_ID + assert uls.debtor_id == D_ID + assert uls.update_id == ad.ledger_latest_update_id + assert uls.account_id == str(C_ID) + assert uls.principal == 0 + assert uls.last_transfer_number == 0 + assert uls.ts == ad.ledger_latest_update_ts # Discard orphaned account. params["debtor_id"] = 1235 @@ -328,8 +388,9 @@ def get_data(): object_type_hint=LogEntry.OTH_ACCOUNT_LEDGER ).all() ) - == 1 + == 2 ) + assert len(models.UpdatedLedgerSignal.query.all()) == 2 def test_process_rejected_config_signal(account): @@ -752,7 +813,8 @@ def get_ledger_update_entries_count(): ts=current_ts, ttl=10000, ) - assert get_ledger_update_entries_count() == 0 + assert get_ledger_update_entries_count() == 1 + assert len(models.UpdatedLedgerSignal.query.all()) == 1 assert p.get_pending_ledger_updates() == [(C_ID, D_ID)] assert ( len(p.get_account_ledger_entries(C_ID, D_ID, prev=1000, count=1000)) @@ -765,6 +827,8 @@ def get_ledger_update_entries_count(): assert p.process_pending_ledger_update( C_ID, 1111, max_count=max_count, max_delay=timedelta(days=10000) ) + assert get_ledger_update_entries_count() == 1 + assert len(models.UpdatedLedgerSignal.query.all()) == 1 n = 0 while not p.process_pending_ledger_update( @@ -781,6 +845,7 @@ def get_ledger_update_entries_count(): == 3 ) assert p.get_pending_ledger_updates() == [] + assert len(models.UpdatedLedgerSignal.query.all()) == lue_count params["transfer_number"] = 21 params["previous_transfer_number"] = 20 @@ -858,13 +923,14 @@ def get_ledger_update_entries_count(): pass assert len(PendingLedgerUpdate.query.all()) == 0 lue_count = get_ledger_update_entries_count() - assert lue_count == 0 + assert lue_count == 1 + assert len(models.UpdatedLedgerSignal.query.all()) == lue_count data = p.get_account_ledger(C_ID, D_ID) ledger_last_entry_id = data.ledger_last_entry_id assert data.ledger_principal == 0 assert data.ledger_last_entry_id >= 0 assert data.ledger_last_transfer_number == 0 - assert data.ledger_latest_update_id == ledger_latest_update_id + assert data.ledger_latest_update_id == ledger_latest_update_id + 1 assert len(PendingLedgerUpdate.query.all()) == 0 max_delay = timedelta(days=10) @@ -875,12 +941,13 @@ def get_ledger_update_entries_count(): ): pass lue_count = get_ledger_update_entries_count() - assert lue_count == 1 + assert lue_count == 2 + assert len(models.UpdatedLedgerSignal.query.all()) == lue_count data = p.get_account_ledger(C_ID, D_ID) assert data.ledger_principal == 1000 assert data.ledger_last_entry_id == ledger_last_entry_id + 1 assert data.ledger_last_transfer_number == 3 - assert data.ledger_latest_update_id == ledger_latest_update_id + 1 + assert data.ledger_latest_update_id == ledger_latest_update_id + 2 def test_process_rejected_direct_transfer_signal(account, current_ts): diff --git a/tests/test_routes.py b/tests/test_routes.py index df528a2..bfac85f 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -1066,11 +1066,21 @@ def test_account_config(client, account): r = client.patch( "/creditors/4294967296/accounts/1/config", json=request_data ) + + assert len(m.UpdatedPolicySignal.query.all()) == 0 assert r.status_code == 200 r = client.patch( "/creditors/4294967296/accounts/1/config", json=request_data ) assert r.status_code == 200 + ufs = m.UpdatedFlagsSignal.query.one() + assert ufs.debtor_id == 1 + assert ufs.creditor_id == 4294967296 + assert ufs.config_flags == ( + m.DEFAULT_CONFIG_FLAGS + | m.AccountData.CONFIG_SCHEDULED_FOR_DELETION_FLAG + ) + data = r.get_json() assert data["type"] == "AccountConfig" assert data["uri"] == "/creditors/4294967296/accounts/1/config" @@ -1323,6 +1333,7 @@ def test_account_exchange(client, account): "latestUpdateId": latestUpdateId + 1, } + assert len(m.UpdatedPolicySignal.query.all()) == 0 r = client.patch( "/creditors/4294967296/accounts/1111/exchange", json=request_data ) @@ -1332,10 +1343,20 @@ def test_account_exchange(client, account): "/creditors/4294967296/accounts/1/exchange", json=request_data ) assert r.status_code == 200 + ups = m.UpdatedPolicySignal.query.one() + assert ups.debtor_id == 1 + assert ups.creditor_id == 4294967296 + assert ups.policy_name is None + assert ups.min_principal == 1000 + assert ups.max_principal == 2000 + assert ups.peg_exchange_rate is None + assert ups.peg_debtor_id is None + r = client.patch( "/creditors/4294967296/accounts/1/exchange", json=request_data ) assert r.status_code == 200 + data = r.get_json() assert data["type"] == "AccountExchange" assert data["uri"] == "/creditors/4294967296/accounts/1/exchange"