Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processing server extension (#1046) #1069

Merged
merged 20 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions ocrd_network/ocrd_network/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
""" The database is used to store information regarding jobs and workspaces.

Jobs: for every process-request a job is inserted into the database with a uuid, status and
Jobs: for every process-request a job is inserted into the database with a UUID, status and
information about the process like parameters and file groups. It is mainly used to track the status
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
jobs are not deleted from the database.
Expand Down Expand Up @@ -35,18 +35,77 @@ async def sync_initiate_database(db_url: str):
await initiate_database(db_url)


async def db_get_workspace(workspace_id: str) -> DBWorkspace:
    # NOTE(review): stale pre-PR variant retained by the diff extraction;
    # superseded by the keyword-argument variant defined below, which also
    # supports lookup by `workspace_mets_path`.
    workspace = await DBWorkspace.find_one(
        DBWorkspace.workspace_id == workspace_id
    )
    if not workspace:
        raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
workspace = None
if not workspace_id and not workspace_mets_path:
raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
if workspace_id:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_id == workspace_id
)
if not workspace:
raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
if workspace_mets_path:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_mets_path == workspace_mets_path
)
if not workspace:
raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
return workspace


@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Synchronous wrapper around `db_get_workspace` (via the @call_sync decorator).

    Accepts the same search keys and raises the same ValueErrors.
    """
    # Fix: the diff extraction left two overlapping `def` headers here
    # (the old positional-only signature and the new keyword one);
    # consolidated to the current keyword-argument signature.
    return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)


async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
    """ Update fields of an existing workspace record in the DB.

    Exactly one of the two search keys is required. If both are passed,
    `workspace_mets_path` takes precedence (queried last, overwrites the
    `workspace_id` lookup result).

    :param workspace_id: id of the workspace to update
    :param workspace_mets_path: mets path of the workspace to update
    :param kwargs: field=value pairs to set on the matched document
    :raises ValueError: if neither key is provided, no match is found,
        a kwarg names an unknown field, or a kwarg names a non-updatable field
    """
    workspace = None
    if not workspace_id and not workspace_mets_path:
        # Fix: no f-string needed - the message contains no placeholders
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
    if workspace_id:
        workspace = await DBWorkspace.find_one(
            DBWorkspace.workspace_id == workspace_id
        )
        if not workspace:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        workspace = await DBWorkspace.find_one(
            DBWorkspace.workspace_mets_path == workspace_mets_path
        )
        if not workspace:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')

    # Whitelist + setattr replaces the previous per-field if/elif chain
    # (the shorter form the old TODO in db_update_processing_job asked for).
    updatable_fields = {
        'workspace_id',
        'workspace_mets_path',
        'ocrd_identifier',
        'bagit_profile_identifier',
        'ocrd_base_version_checksum',
        'ocrd_mets',
        'bag_info_adds',
        'deleted',
        'pages_locked',
    }
    workspace_keys = list(workspace.__dict__.keys())
    for key, value in kwargs.items():
        if key not in workspace_keys:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(workspace, key, value)
    await workspace.save()


@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
    """ Synchronous wrapper around `db_update_workspace` (via the @call_sync decorator).

    Forwards both search keys and all update kwargs unchanged.
    """
    await db_update_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
        **kwargs
    )


async def db_get_processing_job(job_id: str) -> DBProcessorJob:
Expand All @@ -68,8 +127,6 @@ async def db_update_processing_job(job_id: str, **kwargs):
if not job:
raise ValueError(f'Processing job with id "{job_id}" not in the DB.')

# TODO: This may not be the best Pythonic way to do it. However, it works!
# There must be a shorter way with Pydantic. Suggest an improvement.
job_keys = list(job.__dict__.keys())
for key, value in kwargs.items():
if key not in job_keys:
Expand Down
3 changes: 2 additions & 1 deletion ocrd_network/ocrd_network/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, config_path: str) -> None:
self.data_mongo: DataMongoDB = DataMongoDB(config['database'])
self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue'])
self.data_hosts: List[DataHost] = []
self.internal_callback_url = config.get('internal_callback_url', None)
for config_host in config['hosts']:
self.data_hosts.append(DataHost(config_host))

Expand Down Expand Up @@ -302,7 +303,7 @@ def deploy_mongodb(
) -> str:
if self.data_mongo.skip_deployment:
self.log.debug('MongoDB is externaly managed. Skipping deployment')
verify_mongodb_available(self.data_mongo.url);
verify_mongodb_available(self.data_mongo.url)
return self.data_mongo.url

self.log.debug(f"Trying to deploy '{image}', with modes: "
Expand Down
2 changes: 2 additions & 0 deletions ocrd_network/ocrd_network/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
'PYJobInput',
'PYJobOutput',
'PYOcrdTool',
'PYResultMessage',
'StateEnum',
]

Expand All @@ -18,5 +19,6 @@
PYJobOutput,
StateEnum
)
from .messages import PYResultMessage
from .ocrd_tool import PYOcrdTool
from .workspace import DBWorkspace
13 changes: 13 additions & 0 deletions ocrd_network/ocrd_network/models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@


class StateEnum(str, Enum):
    """ Lifecycle states of a processing job.

    Member order reflects the normal progression of a job:
    cached -> queued -> running -> success | failed.
    """
    # Held inside the Processing Server's internal requests cache
    cached = 'CACHED'
    # Waiting inside the RabbitMQ process queue
    queued = 'QUEUED'
    # Being executed by a Processing Worker or a Processor Server
    running = 'RUNNING'
    # Execution completed without errors
    success = 'SUCCESS'
    # Execution terminated with an error
    failed = 'FAILED'


class PYJobInput(BaseModel):
""" Wraps the parameters required to make a run-processor-request
"""
processor_name: Optional[str] = None
path_to_mets: Optional[str] = None
workspace_id: Optional[str] = None
description: Optional[str] = None
Expand All @@ -28,6 +35,10 @@ class PYJobInput(BaseModel):
# Used to toggle between sending requests to 'worker and 'server',
# i.e., Processing Worker and Processor Server, respectively
agent_type: Optional[str] = 'worker'
# Auto generated by the Processing Server when forwarding to the Processor Server
job_id: Optional[str] = None
# If set, specifies a list of job ids this job depends on
depends_on: Optional[List[str]] = None

class Config:
schema_extra = {
Expand Down Expand Up @@ -65,8 +76,10 @@ class DBProcessorJob(Document):
output_file_grps: Optional[List[str]]
page_id: Optional[str]
parameters: Optional[dict]
depends_on: Optional[List[str]]
result_queue_name: Optional[str]
callback_url: Optional[str]
internal_callback_url: Optional[str]
start_time: Optional[datetime]
end_time: Optional[datetime]
exec_time: Optional[str]
Expand Down
22 changes: 22 additions & 0 deletions ocrd_network/ocrd_network/models/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel
from typing import Optional
from .job import StateEnum


class PYResultMessage(BaseModel):
    """ Payload of a job-result callback message.

    Reports the final state of a processing job together with a reference
    to the workspace it ran on (by mets path and/or workspace id).
    """
    # id of the finished processing job
    job_id: str
    # final state of the job (see StateEnum)
    state: StateEnum
    path_to_mets: Optional[str] = None
    workspace_id: Optional[str] = None

    class Config:
        schema_extra = {
            'example': {
                'job_id': '123123123',
                'state': 'SUCCESS',
                'path_to_mets': '/path/to/mets.xml',
                'workspace_id': 'c7f25615-fc17-4365-a74d-ad20e1ddbd0e'
            }
        }
10 changes: 9 additions & 1 deletion ocrd_network/ocrd_network/models/workspace.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from beanie import Document
from typing import Optional
from typing import Dict, Optional


class DBWorkspace(Document):
Expand All @@ -15,6 +15,10 @@ class DBWorkspace(Document):
ocrd_mets Ocrd-Mets (optional)
bag_info_adds bag-info.txt can also (optionally) contain additional
key-value-pairs which are saved here
deleted the document is deleted if set, however, the record is still preserved
pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id`
that are currently being processed by an OCR-D processor (server or worker).
If no `page_id` field is set, an identifier "all_pages" will be used.
"""
workspace_id: str
workspace_mets_path: str
Expand All @@ -24,6 +28,10 @@ class DBWorkspace(Document):
ocrd_mets: Optional[str]
bag_info_adds: Optional[dict]
deleted: bool = False
# Dictionary structure:
# Key: fileGrp
# Value: Set of `page_id`s
pages_locked: Optional[Dict] = {}

class Settings:
name = "workspace"
Loading