Skip to content

Commit

Permalink
Merge pull request #54 from epandurski/agent-type
Browse files Browse the repository at this point in the history
Emit update notifications
  • Loading branch information
epandurski authored Oct 14, 2023
2 parents f6db89d + 93efb88 commit 25de0a9
Show file tree
Hide file tree
Showing 20 changed files with 518 additions and 37 deletions.
27 changes: 19 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ following servers:
that all incoming SMP messages for the creditors stored on the
PostgreSQL server instance, are routed to this queue.

Also, a [RabbitMQ exchange] named **`creditors_out`** must be configured
on the broker instance. This exchange is for messages that must be sent
to accounting authorities. The routing key will represent the debtor ID
as hexadecimal (lowercase). For example, for debtor ID equal to 10, the
routing key will be "00.00.00.00.00.00.00.0a".
Also, the following [RabbitMQ exchanges] must be configured on the
broker instance:

- **`creditors_out`**: For messages that must be sent to accounting
authorities. The routing key will represent the debtor ID as
hexadecimal (lowercase). For example, for debtor ID equal to 10, the
routing key will be "00.00.00.00.00.00.00.0a".

- **`to_trade`**: For policy change notifications that must be sent to
the subsystem that is responsible for performing automatic circular
trades. The routing key will represent the highest 24 bits of the MD5
digest of the creditor ID. For example, if the creditor ID is 123, the
routing key will be "1.1.1.1.1.1.0.0.0.0.0.1.0.0.0.0.0.1.1.0.0.0.1.1".
This allows different creditor accounts to be handled by different
database servers (sharding).

**Note:** If you execute the "configure" command (see below), with
the environment variable `SETUP_RABBITMQ_BINDINGS` set to `yes`, an
Expand Down Expand Up @@ -146,7 +156,7 @@ PROTOCOL_BROKER_THREADS=3
PROTOCOL_BROKER_PREFETCH_COUNT=10

# The binding key with which the "$PROTOCOL_BROKER_QUEUE"
# RabbitMQ queue is bound to the "creditors_in" RabbitMQ topic
# RabbitMQ queue is bound to the incoming messages' topic
# exchange (default "#"). The binding key must consist of zero or
# more 0s or 1s, separated by dots, ending with a hash symbol.
# For example: "0.1.#", "1.#", or "#".
Expand Down Expand Up @@ -230,7 +240,8 @@ container allows you to execute the following *documented commands*:
RabbitMQ broker, and remove the messages from the PostgreSQL database.

* `flush_configure_accounts`, `flush_prepare_transfers`,
`flush_finalize_transfers`
`flush_finalize_transfers`, `flush_updated_ledgers`,
`flush_updated_policies`, `flush_updated_flags`

Starts additional worker processes that send particular type of outgoing
messages to the RabbitMQ broker, and remove the messages from the
Expand Down Expand Up @@ -328,7 +339,7 @@ can be used for end-to-end testing.
[Swaptacular Messaging Protocol]: https://github.com/swaptacular/swpt_accounts/blob/master/protocol.rst
[RabbitMQ]: https://www.rabbitmq.com/
[RabbitMQ queue]: https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html
[RabbitMQ exchange]: https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
[RabbitMQ exchanges]: https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
[Redis]: https://redis.io/
[OAuth 2.0]: https://oauth.net/2/
[nginx]: https://en.wikipedia.org/wiki/Nginx
Expand Down
5 changes: 4 additions & 1 deletion development.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Configuration settings #
##########################

MIN_CREDITOR_ID=0x0000010000000000
MIN_CREDITOR_ID=0x0000010100000000
MAX_CREDITOR_ID=0x000001ffffffffff

PIN_PROTECTION_SECRET=
Expand Down Expand Up @@ -51,6 +51,9 @@ APP_PROCESS_LEDGER_UPDATES_MAX_COUNT=100000
APP_FLUSH_CONFIGURE_ACCOUNTS_BURST_COUNT=10000
APP_FLUSH_PREPARE_TRANSFERS_BURST_COUNT=10000
APP_FLUSH_FINALIZE_TRANSFERS_BURST_COUNT=10000
APP_FLUSH_UPDATED_LEDGER_BURST_COUNT=10000
APP_FLUSH_UPDATED_POLICY_BURST_COUNT=10000
APP_FLUSH_UPDATED_FLAGS_BURST_COUNT=10000
APP_CREDITORS_SCAN_DAYS=7
APP_CREDITORS_SCAN_BLOCKS_PER_QUERY=40
APP_CREDITORS_SCAN_BEAT_MILLISECS=25
Expand Down
4 changes: 4 additions & 0 deletions docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,15 @@ case $1 in
exec flask swpt_creditors "$@"
;;
flush_configure_accounts | flush_prepare_transfers | flush_finalize_transfers \
| flush_updated_ledgers | flush_updated_policies | flush_updated_flags \
| flush_all)

flush_configure_accounts=ConfigureAccountSignal
flush_prepare_transfers=PrepareTransferSignal
flush_finalize_transfers=FinalizeTransferSignal
flush_updated_ledgers=UpdatedLedgerSignal
flush_updated_policies=UpdatedPolicySignal
flush_updated_flags=UpdatedFlagsSignal
flush_all=

# For example: if `$1` is "flush_configure_accounts",
Expand Down
53 changes: 53 additions & 0 deletions migrations/versions/4260660af5b1_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""empty message
Revision ID: 4260660af5b1
Revises: 824705144577
Create Date: 2023-10-11 16:22:24.946584
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '4260660af5b1'
down_revision = '824705144577'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('updated_ledger_signal',
sa.Column('creditor_id', sa.BigInteger(), nullable=False),
sa.Column('debtor_id', sa.BigInteger(), nullable=False),
sa.Column('update_id', sa.BigInteger(), nullable=False),
sa.Column('creation_date', sa.DATE(), nullable=False),
sa.Column('account_id', sa.String(), nullable=False),
sa.Column('principal', sa.BigInteger(), nullable=False),
sa.Column('last_transfer_number', sa.BigInteger(), nullable=False),
sa.Column('ts', sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column('inserted_at', sa.TIMESTAMP(timezone=True), nullable=False),
sa.PrimaryKeyConstraint('creditor_id', 'debtor_id', 'update_id')
)
op.create_table('updated_policy_signal',
sa.Column('creditor_id', sa.BigInteger(), nullable=False),
sa.Column('debtor_id', sa.BigInteger(), nullable=False),
sa.Column('update_id', sa.BigInteger(), nullable=False),
sa.Column('policy_name', sa.String(), nullable=True),
sa.Column('min_principal', sa.BigInteger(), nullable=False),
sa.Column('max_principal', sa.BigInteger(), nullable=False),
sa.Column('peg_exchange_rate', sa.FLOAT(), nullable=True),
sa.Column('peg_debtor_id', sa.BigInteger(), nullable=True),
sa.Column('ts', sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column('inserted_at', sa.TIMESTAMP(timezone=True), nullable=False),
sa.PrimaryKeyConstraint('creditor_id', 'debtor_id', 'update_id')
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('updated_policy_signal')
op.drop_table('updated_ledger_signal')
# ### end Alembic commands ###
36 changes: 36 additions & 0 deletions migrations/versions/4e310d9db669_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""empty message
Revision ID: 4e310d9db669
Revises: 4260660af5b1
Create Date: 2023-10-13 16:39:40.541127
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '4e310d9db669'
down_revision = '4260660af5b1'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('updated_flags_signal',
sa.Column('creditor_id', sa.BigInteger(), nullable=False),
sa.Column('debtor_id', sa.BigInteger(), nullable=False),
sa.Column('update_id', sa.BigInteger(), nullable=False),
sa.Column('config_flags', sa.Integer(), nullable=False),
sa.Column('ts', sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column('inserted_at', sa.TIMESTAMP(timezone=True), nullable=False),
sa.PrimaryKeyConstraint('creditor_id', 'debtor_id', 'update_id')
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('updated_flags_signal')
# ### end Alembic commands ###
3 changes: 3 additions & 0 deletions swpt_creditors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ class Configuration(metaclass=MetaEnvReader):
APP_FLUSH_CONFIGURE_ACCOUNTS_BURST_COUNT = 10000
APP_FLUSH_PREPARE_TRANSFERS_BURST_COUNT = 10000
APP_FLUSH_FINALIZE_TRANSFERS_BURST_COUNT = 10000
APP_FLUSH_UPDATED_LEDGER_BURST_COUNT = 10000
APP_FLUSH_UPDATED_POLICY_BURST_COUNT = 10000
APP_FLUSH_UPDATED_FLAGS_BURST_COUNT = 10000
APP_CREDITORS_SCAN_DAYS = 7.0
APP_CREDITORS_SCAN_BLOCKS_PER_QUERY = 40
APP_CREDITORS_SCAN_BEAT_MILLISECS = 25
Expand Down
15 changes: 9 additions & 6 deletions swpt_creditors/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ def _on_rejected_direct_transfer_signal(
**kwargs
) -> None:
if coordinator_type != CT_DIRECT: # pragma: no cover
_LOGGER.error('Unexpected coordinator type: "%s"', coordinator_type)
return
raise RuntimeError(
f'Unexpected coordinator type: "{coordinator_type}"'
)

procedures.process_rejected_direct_transfer_signal(
coordinator_id=coordinator_id,
Expand Down Expand Up @@ -190,8 +191,9 @@ def _on_prepared_direct_transfer_signal(
**kwargs
) -> None:
if coordinator_type != CT_DIRECT: # pragma: no cover
_LOGGER.error('Unexpected coordinator type: "%s"', coordinator_type)
return
raise RuntimeError(
f'Unexpected coordinator type: "{coordinator_type}"'
)

procedures.process_prepared_direct_transfer_signal(
debtor_id=debtor_id,
Expand Down Expand Up @@ -220,8 +222,9 @@ def _on_finalized_direct_transfer_signal(
**kwargs
) -> None:
if coordinator_type != CT_DIRECT: # pragma: no cover
_LOGGER.error('Unexpected coordinator type: "%s"', coordinator_type)
return
raise RuntimeError(
f'Unexpected coordinator type: "{coordinator_type}"'
)

procedures.process_finalized_direct_transfer_signal(
debtor_id=debtor_id,
Expand Down
34 changes: 30 additions & 4 deletions swpt_creditors/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ def subscribe(): # pragma: no cover
"""

from .extensions import CREDITORS_IN_EXCHANGE, CREDITORS_OUT_EXCHANGE
from .extensions import (
CREDITORS_IN_EXCHANGE,
CREDITORS_OUT_EXCHANGE,
TO_TRADE_EXCHANGE,
)

CA_CREDITORS_EXCHANGE = "ca.creditors"
logger = logging.getLogger(__name__)
queue_name = current_app.config["PROTOCOL_BROKER_QUEUE"]
routing_key = current_app.config["PROTOCOL_BROKER_QUEUE_ROUTING_KEY"]
Expand All @@ -59,11 +64,32 @@ def subscribe(): # pragma: no cover

# declare exchanges
channel.exchange_declare(
CREDITORS_IN_EXCHANGE, exchange_type="topic", durable=True
CREDITORS_IN_EXCHANGE, exchange_type="headers", durable=True
)
channel.exchange_declare(
CA_CREDITORS_EXCHANGE, exchange_type="topic", durable=True
)
channel.exchange_declare(
CREDITORS_OUT_EXCHANGE, exchange_type="topic", durable=True
)
channel.exchange_declare(
TO_TRADE_EXCHANGE, exchange_type="topic", durable=True
)

# declare exchange bindings
channel.exchange_bind(
source=CREDITORS_IN_EXCHANGE,
destination=CA_CREDITORS_EXCHANGE,
arguments={
"x-match": "all",
"ca-creditors": True,
},
)
logger.info(
'Created a binding from "%s" to the "%s" exchange.',
CREDITORS_IN_EXCHANGE,
CA_CREDITORS_EXCHANGE,
)

# declare a corresponding dead-letter queue
channel.queue_declare(dead_letter_queue_name, durable=True)
Expand All @@ -82,13 +108,13 @@ def subscribe(): # pragma: no cover

# bind the queue
channel.queue_bind(
exchange=CREDITORS_IN_EXCHANGE,
exchange=CA_CREDITORS_EXCHANGE,
queue=queue_name,
routing_key=routing_key,
)
logger.info(
'Created a binding from "%s" to "%s" with routing key "%s".',
CREDITORS_IN_EXCHANGE,
CA_CREDITORS_EXCHANGE,
queue_name,
routing_key,
)
Expand Down
1 change: 1 addition & 0 deletions swpt_creditors/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
CREDITORS_IN_EXCHANGE = "creditors_in"
DEBTORS_OUT_EXCHANGE = "debtors_out"
DEBTORS_IN_EXCHANGE = "debtors_in"
TO_TRADE_EXCHANGE = "to_trade"


warnings.filterwarnings(
Expand Down
Loading

0 comments on commit 25de0a9

Please sign in to comment.