Skip to content

Commit

Permalink
Merge pull request #1093 from OCR-D/create-default-queue
Browse files Browse the repository at this point in the history
Make ProcessingWorker create its queue by default
  • Loading branch information
kba authored Sep 12, 2023
2 parents 8006c98 + 8ec0338 commit b8d5d7d
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 23 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Some parts of the software are configured via environement variables:
* `OCRD_NETWORK_SERVER_ADDR_PROCESSING`: Default address of Processing Server to connect to (for `ocrd network client processing`).
* `OCRD_NETWORK_SERVER_ADDR_WORKFLOW`: Default address of Workflow Server to connect to (for `ocrd network client workflow`).
* `OCRD_NETWORK_SERVER_ADDR_WORKSPACE`: Default address of Workspace Server to connect to (for `ocrd network client workspace`).
* `OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS`: Number of attempts for a worker to create its queue. Helpfull if the rabbitmq-server needs time to be fully started.


## Packages
Expand Down Expand Up @@ -283,9 +284,9 @@ To be used in a **loop over all selected pages**:
local in_pageId=($(ocrd__input_file $n pageId))
local out_id=$(ocrd__input_file $n outputFileId)
local out_fpath="${ocrd__argv[output_file_grp]}/${out_id}.xml

# process $in_fpath to $out_fpath ...

declare -a options
if [ -n "$in_pageId" ]; then
options=( -g $in_pageId )
Expand All @@ -303,11 +304,11 @@ To be used in a **loop over all selected pages**:
> **Note**: If the `--input-file-grp` is **multi-valued** (N fileGrps separated by commas),
> then usage is similar:
> * The function `ocrd__input_file` can be used, but
> its results will be **lists** (delimited by whitespace and surrounded by single quotes),
> its results will be **lists** (delimited by whitespace and surrounded by single quotes),
> e.g. `[url]='file1.xml file2.xml' [ID]='id_file1 id_file2' [mimetype]='application/vnd.prima.page+xml image/tiff' ...`.
> * Therefore its results should be encapsulated in a (non-associative) **array variable**
> and without extra quotes, e.g. `in_file=($(ocrd__input_file 3 url))`, or as shown above.
> * This will yield the first fileGrp's results on index 0,
> * This will yield the first fileGrp's results on index 0,
> which in bash will always be the same as if you referenced the array without index
> (so code does not need to be changed much), e.g. `test -f $in_file` which equals `test -f ${in_file[0]}`.
> * Additional fileGrps will have to be fetched from higher indexes, e.g. `test -f ${in_file[1]}`.
Expand Down
2 changes: 2 additions & 0 deletions ocrd/ocrd/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
\b
{config.describe('OCRD_NETWORK_SERVER_ADDR_WORKSPACE')}
\b
{config.describe('OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS')}
\b
{config.describe('OCRD_PROFILE_FILE')}
\b
{config.describe('OCRD_PROFILE', wrap_text=False)}
Expand Down
19 changes: 1 addition & 18 deletions ocrd_network/ocrd_network/cli/processing_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,7 @@
help='The URL of the MongoDB, format: mongodb://host:port',
type=DatabaseParamType(),
required=True)
@click.option('--create-queue',
is_flag=True,
help='Create the rabbitmq-queue for the worker. Usually the processing server starts'
'the workers and creates the queues. This is to make external addition of workers for'
'new processors possible')
@click.option('--queue-connect-attempts',
type=int,
default=1,
help='Number of attempts to establish the connection to rabbitmq for creating the '
'queue. There is two seconds wait between attempts. Helpfull if the server needs '
'time to be fully started')
def processing_worker_cli(processor_name: str, queue: str, database: str, create_queue: bool,
queue_connect_attempts: int):
def processing_worker_cli(processor_name: str, queue: str, database: str):
"""
Start Processing Worker
(a specific ocr-d processor consuming tasks from RabbitMQ queue)
Expand All @@ -55,11 +43,6 @@ def processing_worker_cli(processor_name: str, queue: str, database: str, create
ocrd_tool=ocrd_tool,
processor_class=None, # For readability purposes assigned here
)
if create_queue:
processing_worker.create_queue(
connection_attempts=queue_connect_attempts,
retry_delay=2
)
# The RMQConsumer is initialized and a connection to the RabbitMQ is performed
processing_worker.connect_consumer()
# Start consuming from the queue with name `processor_name`
Expand Down
8 changes: 7 additions & 1 deletion ocrd_network/ocrd_network/processing_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
verify_database_uri,
verify_and_parse_mq_uri
)
from ocrd_utils import config


class ProcessingWorker:
Expand Down Expand Up @@ -79,6 +80,8 @@ def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict,
# The publisher is connected when the `result_queue` field of the OcrdProcessingMessage is set for first time
# Used to publish OcrdResultMessage type message to the queue with name {processor_name}-result
self.rmq_publisher = None
# Always create a queue (idempotent)
self.create_queue()

def connect_consumer(self) -> None:
self.log.info(f'Connecting RMQConsumer to RabbitMQ server: '
Expand Down Expand Up @@ -261,7 +264,10 @@ def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultM
message=encoded_result_message
)

def create_queue(self, connection_attempts=1, retry_delay=1):
def create_queue(
self,
connection_attempts: int = config.OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS,
retry_delay: int = 3) -> None:
"""Create the queue for this worker
Originally only the processing-server created the queues for the workers according to the
Expand Down
5 changes: 5 additions & 0 deletions ocrd_utils/ocrd_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ def _ocrd_download_timeout_parser(val):
description="Default address of Workspace Server to connect to (for `ocrd network client workspace`).",
default=(True, ''))

config.add("OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS",
description="Number of attempts for a worker to create its queue. Helpfull if the rabbitmq-server needs time to be fully started",
parser=int,
default=(True, 3))

config.add("HOME",
description="Directory to look for `ocrd_logging.conf`, fallback for unset XDG variables.",
# description="HOME directory, cf. https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html",
Expand Down

0 comments on commit b8d5d7d

Please sign in to comment.