import os
import os.path
import sys
import traceback
import re
import datetime
import dateutil.parser
import shutil
import zipfile
import tarfile
import logging
import json
import csv
import uuid
import pkgutil
import tempfile
import sqlite3
import requests
from glob import glob
from bdbag import bdbag_api
from bdbag.bdbagit import BagError, BagValidationError
import frictionless
from deriva.core import DerivaServer, get_credential, init_logging, urlquote
from . import exception, tableschema
from .registry import Registry, WebauthnUser, WebauthnAttribute, nochange, terms
from .datapackage import CfdeDataPackage, submission_schema_json, portal_prep_schema_json, portal_schema_json, registry_schema_json, sql_literal, sql_identifier, make_session_config, tables_topo_sorted
from .cfde_login import get_archive_headers_map
logger = logging.getLogger(__name__)
class Submission (object):
"""Processing support for C2M2 datapackage submissions.
This class collects some utility functions, but instances of the
class represent a stateful processing lifecycle which is coupled
with updates to the CFDE submission registry, causing side-effects
in the registry as other processing methods are invoked.
Typical call-sequence from automated ingest pipeline:
from cfde_deriva.submission import Submission, Registry, \
WebauthnUser, WebauthnAttribute
registry = Registry('https', 'server.nih-cfde.org')
# gather request context
dcc_id = ...
user = WebauthnUser.from_globus(
globus_user_uuid,
display_name,
full_name,
email,
[
WebauthnAttribute.from_globus(
group_uuid,
display_name
)
for group_uuid, display_name in ...
]
)
# early pre-flight check of submission context
# (raises exceptions for early abort cases)
registry.validate_dcc_id(dcc_id, user)
# other ingest system prep, like archiving submitted data
archive_url = ...
server = ...  # DerivaServer binding where the review catalog will be created
submission = Submission(server, registry, id, dcc_id, archive_url, user)
# register and perform ingest processing of archived data
# should register successfully (if pre-flight check above passed)
# raises exceptions for async failures (registry is updated with error status before raising)
submission.ingest()
"""
# Allow monkey-patching or other caller-driven reconfig in future?
content_path_root = '/var/tmp/cfde_deriva_submissions'
def __init__(self, server, registry, id, dcc_id, archive_url, submitting_user, archive_headers_map=None, skip_dcc_check=False):
"""Represent a stateful processing flow for a C2M2 submission.
:param server: A DerivaServer binding object where review catalogs are created.
:param registry: A Registry binding object.
:param id: The unique identifier for the submission, i.e. UUID.
:param dcc_id: The submitting DCC, using a registry dcc.id key value.
:param archive_url: The stable URL where the submission BDBag can be found.
:param submitting_user: A WebauthnUser instance representing submitting user.
:param archive_headers_map: A map of URL patterns to additional request headers.
:param skip_dcc_check: True overrides normal safety check during constructor (default False).
The new instance is a binding for a submission which may or
may not yet exist in the registry. The constructor WILL NOT
cause state changes to the registry.
The archive_headers_map is a dict hierarchy with structure like:
{
"url_regexp": { "header-name": "header-content", ... }
}
For example:
{
"https://[^/]*[.]data[.]globus[.]org/.*": { "Authorization": "Bearer globus-token-here" }
}
The purpose is to allow the caller to specify trust policies
for which URLs ought to be requested with potentially
sensitive request headers. For all patterns matched by
re.fullmatch(pattern, url), the corresponding header
dictionaries will be merged via dict.update() while iterating
over all matched rules. Supply an ordered dict if you care
about the order of this merge.
Raises UnknownDccId if dcc_id is not known by registry.
Raises Forbidden if submitting_user is not known as a submitter for DCC by registry.
Raises non-CfdeError exceptions for operational errors.
"""
if not skip_dcc_check:
registry.validate_dcc_id(dcc_id, submitting_user)
self.server = server
self.registry = registry
self.datapackage_id = id
self.submitting_dcc_id = dcc_id
self.archive_url = archive_url
self.review_catalog = None
self.submitting_user = submitting_user
self.archive_headers_map = archive_headers_map
# check filesystem config early to abort ASAP on errors
# TBD: check permissions for safe service config?
os.makedirs(os.path.dirname(self.download_filename), exist_ok=True)
os.makedirs(os.path.dirname(self.ingest_sqlite_filename), exist_ok=True)
os.makedirs(os.path.dirname(self.portal_prep_sqlite_filename), exist_ok=True)
os.makedirs(os.path.dirname(self.content_path), exist_ok=True)
def dump_progress(self, progress):
with open(self.restart_marker_filename, 'w') as f:
json.dump(progress, f, indent=2)
logger.info("Dumped restart marker file %s" % self.restart_marker_filename)
@classmethod
def purge_multiple(cls, server, registry, purge_mode='auto', horizon=datetime.timedelta(weeks=-2)):
"""Purge multiple datapackge catalogs, updating records appropriately
:param purge_mode: Target selection mode string (default 'auto')
:param horizon: A timedelta horizon to influence 'auto' purge mode
Supported purge_mode values:
- 'auto': heuristically select likely dead-end datapackages
- 'ALL': purge ALL catalogs, regardless of status
The horizon is a relative time offset (from NOW) to divide the timeline
into (earlier) irrelevant times and (more recent) relevant times.
"""
if purge_mode not in {'auto', 'ALL'}:
raise ValueError('Invalid purge_mode %r' % (purge_mode,))
# convert relative horizon to absolute
horizon = datetime.datetime.now(datetime.timezone.utc) + horizon
# find exclusions we want to protect in 'auto' mode
excluding = {
dp_row['id']
for dp_row in registry.get_latest_approved_datapackages(True, False).values()
}
for dp_row in registry.list_datapackages(sortby="submission_time"):
submission_time = dateutil.parser.parse(dp_row['submission_time'])
in_past = submission_time < horizon
if purge_mode == 'ALL':
logger.info("Purging datapackage %(id)s in purge ALL mode" % dp_row)
purge_this = True
elif dp_row['id'] in excluding:
logger.info("Skipping datapackage %(id)s excluded by heuristic guard." % dp_row)
purge_this = False
elif dp_row["status"] in {
terms.cfde_registry_dp_status.ops_error,
terms.cfde_registry_dp_status.obsoleted,
terms.cfde_registry_dp_status.bag_error,
terms.cfde_registry_dp_status.check_error,
terms.cfde_registry_dp_status.content_error,
}:
logger.info("Purging datapackage %(id)s with unconditional purge status %(status)s" % dp_row)
purge_this = True
elif in_past and dp_row["status"] in {
terms.cfde_registry_dp_status.submitted,
terms.cfde_registry_dp_status.bag_valid,
terms.cfde_registry_dp_status.check_valid,
terms.cfde_registry_dp_status.content_ready,
}:
logger.info("Purging past datapackage %(id)s with conditional purge status %(status)s" % dp_row)
purge_this = True
else:
purge_this = False
logger.info("Skipping datapackage %(id)s with unmatched status %(status)s for purge." % dp_row)
if purge_this:
Submission.purge(server, registry, dp_row['id'])
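# A minimal usage sketch (hostname is a placeholder; credentials follow deriva.core conventions),
# e.g. from a periodic cleanup job:
#
#   credential = get_credential('server.nih-cfde.org')
#   server = DerivaServer('https', 'server.nih-cfde.org', credential)
#   registry = Registry('https', 'server.nih-cfde.org')
#   Submission.purge_multiple(server, registry, purge_mode='auto')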
@classmethod
def purge(cls, server, registry, datapackage_id):
"""Purge datapackage catalog state from service, updating datapackage record appropriately"""
dp_row = registry.get_datapackage(datapackage_id)
status = dp_row["status"]
catalog_url = dp_row["review_ermrest_url"]
new_state = {
terms.cfde_registry_dp_status.content_ready: terms.cfde_registry_dp_status.obsoleted,
# are we purging an active submission?
terms.cfde_registry_dp_status.submitted: terms.cfde_registry_dp_status.ops_error,
terms.cfde_registry_dp_status.bag_valid: terms.cfde_registry_dp_status.ops_error,
terms.cfde_registry_dp_status.check_valid: terms.cfde_registry_dp_status.ops_error,
}.get(status, status)
if status != new_state:
logger.info('Changing datapackage %r status %s -> %s' % (datapackage_id, status, new_state))
registry.update_datapackage(datapackage_id, status=new_state)
if catalog_url:
logger.info('Deleting ermrest catalog %r' % (catalog_url,))
catalog_id = cls.extract_catalog_id(server, catalog_url)
try:
server.delete('/ermrest/catalog/%s' % urlquote(catalog_id))
mesg = 'Purged catalog %r for datapackage %r.' % (catalog_id, datapackage_id)
except requests.exceptions.HTTPError as e:
if e.response.status_code == requests.codes.not_found:
mesg = 'Catalog %r for datapackage %r already purged?' % (catalog_id, datapackage_id)
else:
raise
else:
mesg = 'Purged review URLs for datapackage %r.' % datapackage_id
registry.update_datapackage(datapackage_id, review_ermrest_url=None, review_browse_url=None, review_summary_url=None)
logger.info(mesg)
def ingest(self):
"""Idempotently run submission-ingest processing lifecycle.
Performs this reentrant/idempotent sequence:
1. Register this datapackage submission
2. Retrieve and unpack datapackage into working dir
3. Perform client-side datapackage pre-validation steps
4. Prepare a temporary sqlite DB to validate content and compute derived portal tables
5. Create and register (empty) C2M2 review catalog
6. Load datapackage content into review catalog
7. Update registry with success/failure status
Raises RegistrationError and aborts if step (1) failed.
...
Raises non-CfdeError exceptions for operational errors.
In error cases other than RegistrationError, the registry will
be updated with error status prior to the exception being
raised, unless an operational error prevents that action as
well. Errors derived from CfdeError will be reflected in the
datapackage.diagnostics field of the registry, while other
operational errors will only be reflected in the logger
output.
"""
# idempotently get into registry
try:
dp = self.registry.register_datapackage(
self.datapackage_id,
self.submitting_dcc_id,
self.submitting_user,
self.archive_url,
)
except Exception as e:
logger.error('Got exception %s when registering datapackage %s, aborting!' % (e, self.datapackage_id,))
raise exception.RegistrationError(e)
try:
os.makedirs(os.path.dirname(self.restart_marker_filename), exist_ok=True)
with open(self.restart_marker_filename, 'r') as f:
progress = json.load(f)
logger.info("Loaded restart marker file %s" % self.restart_marker_filename)
except:
progress = dict()
# general sequence (with many idempotent steps)
failed = True
diagnostics = 'An unknown operational error has occurred.'
failed_exc = None
# streamline handling of error reporting...
# next_error_state anticipates how to categorize exceptions
# based on what we are trying to do during sequence
next_error_state = terms.cfde_registry_dp_status.ops_error
try:
# shortcut if already in terminal state
if dp['status'] in {
terms.cfde_registry_dp_status.content_ready,
terms.cfde_registry_dp_status.rejected,
terms.cfde_registry_dp_status.release_pending,
terms.cfde_registry_dp_status.obsoleted,
}:
logger.info('Skipping ingest for datapackage %s with existing terminal status %s.' % (
self.datapackage_id,
dp['status'],
))
return
self.retrieve_datapackage(self.archive_url, self.download_filename, self.archive_headers_map)
self.unpack_datapackage(self.download_filename, self.content_path)
next_error_state = terms.cfde_registry_dp_status.bag_error
if dp['status'] not in {
terms.cfde_registry_dp_status.bag_valid,
terms.cfde_registry_dp_status.check_valid,
}:
self.bdbag_validate(self.content_path)
self.registry.update_datapackage(self.datapackage_id, status=terms.cfde_registry_dp_status.bag_valid)
def dpt_prepare(packagefile):
"""Prepare lookup tools for packagefile"""
with open(packagefile, 'r') as f:
packagedoc = json.load(f)
resources = packagedoc.get('resources', [])
rname_to_pos = dict()
rpath_to_pos = dict()
for pos in range(len(resources)):
rname_to_pos[resources[pos].get("name")] = pos
rpath_to_pos[resources[pos].get("path")] = pos
return (
{ k: v for k, v in rname_to_pos.items() if k is not None },
{ k: v for k, v in rpath_to_pos.items() if k is not None },
resources,
)
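# Illustrative example of dpt_prepare's return value: for a datapackage whose "resources"
# list is [{"name": "file", "path": "file.tsv"}, {"name": "dcc", "path": "dcc.tsv"}], it
# returns ({"file": 0, "dcc": 1}, {"file.tsv": 0, "dcc.tsv": 1}, resources).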
def dpt_register(content_path, packagefile):
"""Register resources in frictionless schema"""
self.rname_to_pos, self.rpath_to_pos, self.resources = dpt_prepare(packagefile)
for pos in range(len(self.resources)):
# this could be a null or repeated name, which we'll reject later...
self.registry.register_datapackage_table(self.datapackage_id, pos, self.resources[pos].get("name"))
def dpt_update1(content_path, packagefile, report):
"""Update status of resources from frictionless report"""
for task in report.tasks:
if not hasattr(task, 'resource'):
continue
if not hasattr(task.resource, 'path'):
continue
if not hasattr(task.resource, 'name'):
logger.debug('Could not understand report resource lacking "name": %s' % task.resource)
continue
if task.errors:
status = terms.cfde_registry_dpt_status.check_error
diagnostics = 'Tabular resource found %d errors. First error: %s' % (
len(task.errors),
task.errors[0].message
)
else:
status = nochange
diagnostics = nochange
num_rows = task.resource.stats.get('rows', nochange)
self.registry.update_datapackage_table(
self.datapackage_id,
self.rname_to_pos[task.resource.name],
status= status,
num_rows= num_rows,
diagnostics= diagnostics
)
def dpt_update2(name, path):
"""Update status of resource following content upload"""
try:
pos = self.rname_to_pos[name]
self.registry.update_datapackage_table(
self.datapackage_id,
pos,
status= terms.cfde_registry_dpt_status.content_ready
)
except KeyError as e:
logger.debug("Swallowing dpt_update2 callback for table %s lacking position in datapackage" % (name,))
def dpt_error2(name, path, diagnostics):
try:
pos = self.rname_to_pos[name]
self.registry.update_datapackage_table(
self.datapackage_id,
pos,
status= terms.cfde_registry_dpt_status.content_error,
diagnostics= diagnostics,
)
except KeyError as e:
logger.debug("Swallowing dpt_error2 callback for table %s lacking position in datapackage" % (name,))
if dp['status'] not in {
terms.cfde_registry_dp_status.check_valid,
}:
next_error_state = terms.cfde_registry_dp_status.check_error
self.datapackage_model_check(self.content_path, pre_process=dpt_register)
self.datapackage_validate(self.content_path, post_process=dpt_update1, check_fkeys=False, check_keys=False)
next_error_state = terms.cfde_registry_dp_status.ops_error
self.provision_sqlite(submission_schema_json, self.ingest_sqlite_filename)
self.provision_sqlite(portal_prep_schema_json, self.portal_prep_sqlite_filename)
if self.review_catalog is None:
self.review_catalog = self.create_review_catalog(self.server, self.registry, self.datapackage_id)
next_error_state = terms.cfde_registry_dp_status.content_error
self.load_sqlite(self.content_path, self.ingest_sqlite_filename, onconflict='abort', table_error_callback=dpt_error2)
self.sqlite_datapackage_check(submission_schema_json, self.content_path, self.ingest_sqlite_filename, table_error_callback=dpt_error2)
self.registry.update_datapackage(self.datapackage_id, status=terms.cfde_registry_dp_status.check_valid)
self.prepare_sqlite_derived_data(self.portal_prep_sqlite_filename, attach={"submission": self.ingest_sqlite_filename})
self.record_vocab_usage(self.registry, self.portal_prep_sqlite_filename, self.datapackage_id)
self.download_resource_markdown_to_sqlite(self.registry, self.portal_prep_sqlite_filename)
# this needs project_root from prepare_sqlite_derived_data...
next_error_state = terms.cfde_registry_dp_status.content_error
self.validate_submission_dcc_table(self.portal_prep_sqlite_filename, self.submitting_dcc_id)
self.validate_collection_names(self.portal_prep_sqlite_filename)
next_error_state = terms.cfde_registry_dp_status.ops_error
self.upload_sqlite_raw_content(self.review_catalog, self.ingest_sqlite_filename, table_done_callback=dpt_update2, table_error_callback=dpt_error2)
self.upload_sqlite_content(self.review_catalog, self.portal_prep_sqlite_filename, table_done_callback=dpt_update2, table_error_callback=dpt_error2)
review_browse_url = '%s/chaise/recordset/#%s/CFDE:file' % (
self.review_catalog._base_server_uri,
self.review_catalog.catalog_id,
)
review_summary_url = '%s/dcc_review.html?catalogId=%s' % (
self.review_catalog._base_server_uri,
self.review_catalog.catalog_id,
)
self.registry.update_datapackage(
self.datapackage_id,
review_browse_url=review_browse_url,
review_summary_url=review_summary_url,
)
# guard against unexpected abends not caught explicitly...
failed = False
except exception.CfdeError as e:
# assume we can expose CfdeError text content
failed, failed_exc, diagnostics = True, e, str(e)
raise
except Exception as e:
# don't assume we can expose unexpected error content
failed, failed_exc = True, e
raise
finally:
# record whatever we've discovered above
if failed:
status = next_error_state
if failed_exc is not None:
et, ev, tb = sys.exc_info()
logger.debug(traceback.format_exception(et, ev, tb))
else:
diagnostics = 'Processing interrupted?'
logger.error(
'Failed with exception %s in ingest sequence with next_error_state=%s for datapackage %s' \
% (failed_exc, next_error_state, self.datapackage_id,)
)
else:
status, diagnostics = terms.cfde_registry_dp_status.content_ready, None
logger.debug(
'Finished ingest processing for datapackage %s' % (self.datapackage_id,)
)
logger.debug(
'Updating datapackage %s status=%s diagnostics=%s...' % (
self.datapackage_id,
status,
'(nochange)' if diagnostics is nochange else diagnostics
)
)
self.registry.update_datapackage(self.datapackage_id, status=status, diagnostics=diagnostics)
logger.debug('Datapackage %s status successfully updated.' % (self.datapackage_id,))
## mapping of submission ID to local processing resource names
@property
def download_filename(self):
"""Return download_filename target name for given submission id.
We use a deterministic mapping of submission id to download_filename so that
we can do reentrant processing.
"""
# TBD: check or remap id character range?
# naive mapping should be OK for UUIDs...
return '%s/downloads/%s' % (self.content_path_root, self.datapackage_id)
@property
def content_path(self):
"""Return content_path working state path for a given submssion id.
We use a deterministic mapping of submission id to
content_path so that we can do reentrant processing.
"""
# TBD: check or remap id character range?
# naive mapping should be OK for UUIDs...
return '%s/unpacked/%s' % (self.content_path_root, self.datapackage_id)
@property
def ingest_sqlite_filename(self):
"""Return ingest_sqlite_filename scratch C2M2 DB target name for given submssion id.
We use a deterministic mapping of submission id to
ingest_sqlite_filename so that we can do reentrant processing.
"""
# TBD: check or remap id character range?
# naive mapping should be OK for UUIDs...
return '%s/databases/%s_submission.sqlite3' % (self.content_path_root, self.datapackage_id)
@property
def portal_prep_sqlite_filename(self):
"""Return portal_prep_sqlite_filename scratch C2M2 DB target name for given submssion id.
We use a deterministic mapping of submission id to
portal_prep_sqlite_filename so that we can do reentrant processing.
"""
# TBD: check or remap id character range?
# naive mapping should be OK for UUIDs...
return '%s/databases/%s_portal_prep.sqlite3' % (self.content_path_root, self.datapackage_id)
## utility functions to help with various processing and validation tasks
@property
def restart_marker_filename(self):
"""Return restart_marker JSON file name for given submission id.
"""
return '%s/progress/%s.json' % (self.content_path_root, self.datapackage_id)
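# Illustrative layout of per-submission scratch state under content_path_root, per the
# properties above (with <id> the submission/datapackage id):
#
#   downloads/<id>                        -- retrieved BDBag archive
#   unpacked/<id>/                        -- extracted bag contents
#   databases/<id>_submission.sqlite3     -- raw C2M2 ingest database
#   databases/<id>_portal_prep.sqlite3    -- derived portal-prep database
#   progress/<id>.json                    -- restart marker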
@classmethod
def report_external_ops_error(cls, registry, id, diagnostics=None, status=terms.cfde_registry_dp_status.ops_error):
"""Idempotently update datapackage entry in registry with ops error and diagnostics
:param registry: An instance of Registry authorized to make updates to submissions.
:param id: The datapackage.id key for the failed submission
:param diagnostics: Optional text describing the error to a user.
:param status: The error status state to report.
This method is to report errors detected outside the normal
ingest() routine, e.g. in a parent detecting a child process
failure. It will set the submission status and diagnostics
information.
The diagnostics should be a sanitized message appropriate for
sharing with system users, while the caller should log any
additional information necessary for devops staff.
The status should normally be left at its default to report an
ops-error.
"""
try:
dp = registry.get_datapackage(id)
except exception.DatapackageUnknown as e:
logger.debug('report_external_ops_error: Cannot adjust status of unknown submission id=%s' % (id,))
logger.debug('report_external_ops_error: Discarding status="%s" diagnostics="%s"' % (status, diagnostics))
return
# strip newlines for presumed expansion inside a markdown table in UI
def get_stripped_default(orig, default):
if not orig:
orig = default
return orig.replace('\n', ' ').strip(' .')
# idempotently append to diagnostics string
# in case prior diagnostics are also useful to retain for user
new_diagnostics = get_stripped_default(diagnostics, "Unknown ingest() process failure")
diagnostics = get_stripped_default(dp["diagnostics"], new_diagnostics)
if diagnostics.endswith(new_diagnostics):
diagnostics += '.'
else:
diagnostics = '%s. %s' % (diagnostics, new_diagnostics)
# figure out changes for nicer, idempotent behavior and logging
update = {
k: v
for k, v in { "status": status, "diagnostics": diagnostics }.items()
if dp[k] != v
}
if update:
logger.debug("report_external_ops_error: Updating submission id=%(id)s status=%(status)s diagnostics=%(diagnostics)s" % dp)
logger.debug("report_external_ops_error: id=%s changes: %s" % (
id,
", ".join([ '%s="%s"' % (k, v) for k, v in update.items() ]),
))
registry.update_datapackage(id, **update)
else:
logger.debug('report_external_ops_error: No change needed on submission id=%(id)s status="%(status)s" diagnostics="%(diagnostics)s"' % dp)
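# A minimal usage sketch (identifiers are placeholders), e.g. from a parent process that
# observed its ingest worker die:
#
#   Submission.report_external_ops_error(
#       registry, submission_id,
#       diagnostics='Ingest worker terminated unexpectedly',
#   )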
@classmethod
def retrieve_datapackage(cls, archive_url, download_filename, archive_headers_map):
"""Idempotently stage datapackage content from archive_url into download_filename.
Uses a temporary download name and renames after successful
download, so we can assume a file present at this name is
already downloaded.
"""
if os.path.isfile(download_filename):
return
headers = dict()
if archive_headers_map is not None:
for pat, hdrs in archive_headers_map.items():
if re.fullmatch(pat, archive_url):
headers.update(hdrs)
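# e.g. with an archive_headers_map as sketched in the Submission constructor docstring, a
# Globus-hosted archive_url would pick up the matching Authorization header here, while
# non-matching URLs are fetched with no extra headers.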
fd, tmp_name = None, None
try:
# use a temporary download file in same dir
fd, tmp_name = tempfile.mkstemp(
suffix='.downloading',
prefix=os.path.basename(download_filename) + '_',
dir=os.path.dirname(download_filename),
text=False,
)
logger.debug('Downloading %s to temporary download file "%s"' % (
archive_url,
tmp_name,
))
r = requests.get(archive_url, headers=headers, stream=True)
r.raise_for_status()
for chunk in r.iter_content(chunk_size=128*1024):
os.write(fd, chunk)
os.close(fd)
logger.debug('Finished downloading to "%s"' % (tmp_name,))
fd = None
# rename completed download to canonical name
os.rename(tmp_name, download_filename)
logger.info('Renamed download "%s" to final "%s"' % (tmp_name, download_filename))
tmp_name = None
finally:
if fd is not None:
os.close(fd)
if tmp_name is not None:
os.remove(tmp_name)
@classmethod
def unpack_datapackage(cls, download_filename, content_path):
"""Idempotently unpack download_filename (a BDBag) into content_path (bag contents dir).
Uses a temporary unpack name and renames after successful
unpack, so we can assume a contents dir already present at
this name is already completely unpacked.
"""
if os.path.isdir(content_path):
return
tmp_name = None
try:
# use temporary unpack dir in same parent dir
tmp_name = tempfile.mkdtemp(
suffix='.unpacking',
prefix=os.path.basename(content_path) + '_',
dir=os.path.dirname(content_path),
)
logger.debug('Extracting "%s" in temporary unpack dir "%s"' % (
download_filename,
tmp_name,
))
abs_tmp_name = os.path.realpath(tmp_name)
# unpack ourselves so we can control output names vs. extract_bag()
def is_allowed_path(target):
"""Check that the archive member target path is safe
This is important with tarfiles, and the zipfile module
documentation still warns about safety even though it
tries to do some path canonicalization.
We only process submitted archives from authenticated
and trustworthy peers, but should check anyway, since
they could unknowingly be preparing archives on a
compromised system.
NOTE: our path safety assumption is based on unpacking
into our new tmp_name directory, which starts empty,
and assuming we will not create any hard or symbolic
links inside this temporary sandbox.
"""
abs_target = os.path.realpath(os.path.join(abs_tmp_name, target))
# os.path.commonpath avoids the character-level prefix pitfall of os.path.commonprefix
return os.path.commonpath([abs_tmp_name, abs_target]) == abs_tmp_name
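# e.g. is_allowed_path('data/file.tsv') -> True, while a hostile member name such as
# '../../etc/passwd' resolves outside abs_tmp_name and is rejected.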
if zipfile.is_zipfile(download_filename):
with open(download_filename, 'rb') as bag_file:
with zipfile.ZipFile(bag_file) as decoder:
for member in decoder.infolist():
# this check may be redundant if zipfile is stripping '..' and leading '/'
#
# TODO: revisit if zipfile module starts supporting more than file+dir types
# (currently ZipInfo only has is_dir() but no is_file())
if not is_allowed_path(member.filename):
raise exception.InvalidDatapackage(
"Zipfile member %r not allowed" % (member.filename)
)
#
decoder.extractall(abs_tmp_name)
elif tarfile.is_tarfile(download_filename):
with tarfile.open(download_filename) as decoder:
def has_allowed_type(member):
return member.isfile() or member.isdir()
for member in decoder.getmembers():
# to maintain our safety assumptions, we should never extract links
# and other special file types are not appropriate for datapackages either...
if not (has_allowed_type(member)
and is_allowed_path(member.name)):
member_type = {
tarfile.LNKTYPE: 'link',
tarfile.SYMTYPE: 'symlink',
tarfile.FIFOTYPE: 'fifo special',
tarfile.CHRTYPE: 'char special',
tarfile.BLKTYPE: 'block special',
}.get(member.type, member.type)
raise exception.InvalidDatapackage(
"Tarfile member %r (type %r) not allowed" % (member.name, member_type)
)
#
decoder.extractall(abs_tmp_name)
else:
raise exception.InvalidDatapackage('Unknown or unsupported bag archive format')
logger.debug('Finished extracting to "%s"' % (tmp_name,))
children = glob('%s/*' % tmp_name)
if len(children) < 1:
raise exception.InvalidDatapackage('Did not find expected top-level folder in bag archive')
elif len(children) > 1:
raise exception.InvalidDatapackage('Found too many top-level folders in bag archive')
os.rename(children[0], content_path)
logger.info('Renamed output "%s" to final "%s"' % (children[0], content_path))
finally:
if tmp_name is not None:
shutil.rmtree(tmp_name)
@classmethod
def bdbag_validate(cls, content_path):
"""Perform BDBag validation of unpacked bag contents."""
if os.getenv('CFDE_SKIP_BDBAG', 'false').lower() == 'true':
logger.info('SKIPPING validation of bag "%s" due to CFDE_SKIP_BDBAG environment variable!' % content_path)
return
try:
logger.debug('Validating unpacked bag at "%s"' % (content_path,))
bdbag_api.validate_bag(content_path)
logger.info('Bag valid at %s' % content_path)
except (BagError, BagValidationError) as e:
logger.error('Validation failed for bag "%s" with error "%s"' % (content_path, e,))
raise exception.InvalidDatapackage(e)
@classmethod
def datapackage_name_from_path(cls, content_path):
"""Find datapackage name by globbing ./data/*.json under content_path."""
candidates = glob('%s/data/*.json' % content_path)
if len(candidates) < 1:
raise exception.FilenameError('Could not locate datapackage *.json file.')
elif len(candidates) > 1:
raise exception.FilenameError('Found too many (%d) potential datapackage *.json choices.' % (len(candidates),))
return candidates[0]
@classmethod
def datapackage_model_check(cls, content_path, pre_process=None):
"""Perform datapackage model validation for submission content.
This validation compares the JSON datapackage specification
against the expectations of the CFDE project for C2M2
datapackages, i.e. ensuring that the package does not
introduce any undesired deviations in the model definition.
"""
canon_dp = CfdeDataPackage(submission_schema_json)
packagefile = cls.datapackage_name_from_path(content_path)
if pre_process:
pre_process(content_path, packagefile)
submitted_dp = CfdeDataPackage(packagefile)
canon_dp.validate_model_subset(submitted_dp)
@classmethod
def datapackage_validate(cls, content_path, post_process=None, check_keys=True, check_fkeys=True):
"""Perform datapackage validation.
:param content_path: The path to the submission
:param post_process: Optional callback function with signature lambda content_path, packagefilename, report: ... (default None)
:param check_keys: Whether to check primary key and uniqueness constraints (default True)
:param check_fkeys: Whether to check foreign key reference constraints (default True)
This validation considers the TSV content of the datapackage
to be sure it conforms to its own JSON datapackage
specification.
"""
packagefile = cls.datapackage_name_from_path(content_path)
if os.getenv('CFDE_SKIP_FRICTIONLESS', 'false').lower() == 'true':
logger.info('SKIPPING validation of frictionless datapackage at "%s" due to CFDE_SKIP_FRICTIONLESS environment variable!' % packagefile)
return
logger.info('Validating frictionless datapackage at "%s"' % packagefile)
package = frictionless.Package(packagefile, trusted=False)
for resource in package.resources:
if not check_fkeys:
resource.schema.pop('foreignKeys', None)
if not check_keys:
resource.schema.pop('primaryKey', None)
for field in resource.schema.fields:
field.constraints.pop('unique', None)
# frictionless-py 4.14.0 doesn't like it if we skip the CSV dialect...
resource.setdefault(
'dialect',
{
"delimiter": "\t",
"doubleQuote": False,
"lineTerminator": "\n",
"skipInitialSpace": True,
"header": True
},
)
report = frictionless.package.validate.validate(package, original=True, parallel=False)
if post_process:
post_process(content_path, packagefile, report)
if report.stats['errors'] > 0:
if report.errors:
message = report.errors[0].message
else:
message = report.flatten(['message'])[0][0]
raise exception.InvalidDatapackage(
'Found %d errors in datapackage "%s". First error: %s' % (
report.stats['errors'],
os.path.basename(packagefile),
message,
))
logger.info('Frictionless package valid.')
@classmethod
def provision_sqlite(cls, schema_json, sqlite_filename):
"""Idempotently prepare sqlite database, with givem model and base vocab."""
dp = CfdeDataPackage(schema_json)
# this with block produces a transaction in sqlite3
with sqlite3.connect(sqlite_filename) as conn:
logger.debug('Idempotently provisioning schema in %s' % (sqlite_filename,))
dp.provision_sqlite(conn)
dp.sqlite_import_data_files(conn, onconflict='skip')
@classmethod
def load_sqlite(cls, content_path, sqlite_filename, table_error_callback=None, progress=None, onconflict='skip'):
"""Idempotently insert submission content."""
if progress is None:
progress = dict()
packagefile = cls.datapackage_name_from_path(content_path)
submitted_dp = CfdeDataPackage(packagefile)
# this with block produces a transaction in sqlite3
with sqlite3.connect(sqlite_filename) as conn:
logger.debug('Idempotently loading data for %s into %s' % (content_path, sqlite_filename))
submitted_dp.sqlite_import_data_files(conn, onconflict=onconflict, table_error_callback=table_error_callback, progress=progress)
@classmethod
def sqlite_datapackage_check(cls, schema_json, content_path, sqlite_filename, table_error_callback=None, tablenames=None, progress=None):
canonical_dp = CfdeDataPackage(schema_json)
packagefile = cls.datapackage_name_from_path(content_path)
submitted_dp = CfdeDataPackage(packagefile)
with sqlite3.connect(sqlite_filename) as conn:
logger.debug('Checking database %s for submission %r against schema %r constraints' % (sqlite_filename, content_path, schema_json))
canonical_dp.check_sqlite_tables(conn, submitted_dp, table_error_callback, tablenames, progress)
@classmethod
def validate_submission_dcc_table(cls, sqlite_filename, submitting_dcc):
"""Validate that the dcc table in sqlite has exactly one row matching the submitting_dcc"""
with sqlite3.connect(sqlite_filename) as conn:
cur = conn.cursor()
cur.execute("""SELECT count(*) FROM dcc;""")
cnt = cur.fetchone()[0]
if cnt != 1:
raise exception.InvalidDatapackage('The CFDE submission must have one entry in the dcc table, not %d.' % cnt)
cur.execute("""SELECT id FROM dcc;""")
dcc_id = cur.fetchone()[0]
if dcc_id != submitting_dcc:
raise exception.InvalidDatapackage('Submission dcc.id = %s does not match submitting DCC %s' % (dcc_id, submitting_dcc,))
cur.execute("""
SELECT
i.id,
p.local_id,
pr.nid IS NOT NULL AS is_project_root
FROM dcc d
JOIN project p ON (d.project = p.nid)
JOIN id_namespace i ON (p.id_namespace = i.nid)
LEFT OUTER JOIN project_root pr ON (d.project = pr.project);
""")
id_namespace, local_id, is_root = cur.fetchone()
if not is_root:
raise exception.InvalidDatapackage('DCC project identifier (%s, %s) does not designate a root in the project hierarchy' % (
id_namespace,
local_id
))
# TODO: revisit if DCCs permitted to submit records not attributed to themselves!
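# The query below lists projects that are NOT transitive members under the submitting DCC's
# root project (project minus project_in_project_transitive members led by the DCC's project);
# any surviving row means the submission references projects not attributed to that DCC.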
cur.execute("""
SELECT
i.id,
p.local_id
FROM (
SELECT p.nid FROM project p
EXCEPT
SELECT pipt.member_project
FROM project_in_project_transitive pipt
JOIN dcc d ON (d.project = pipt.leader_project)
) s
JOIN project p ON (s.nid = p.nid)
JOIN id_namespace i ON (p.id_namespace = i.nid);
""")
rows = list(cur)
if rows:
id_namespace2, local_id2 = rows[0]
raise exception.InvalidDatapackage('Project identifier (%r, %r) and %d others not connected as a sub-project of the root DCC project (%r, %r)' % (
id_namespace2,
local_id2,
len(rows) - 1,
id_namespace,
local_id,
))
@classmethod
def validate_collection_names(cls, sqlite_filename):
"""Validate that collection.name is unique within this submission"""
with sqlite3.connect(sqlite_filename) as conn:
cur = conn.cursor()
cur.execute("""
SELECT "name", count(*)
FROM collection
WHERE "name" IS NOT NULL
GROUP BY "name"
HAVING count(*) > 1
ORDER BY count(*) DESC;
""")
for row in cur:
nm, cnt = row
raise exception.InvalidDatapackage('Submission collection.name = %r occurs %d times, but must be unique within a single submission' % (nm, cnt))
@classmethod
def _test_get_sqlite_etl_sql(cls):
submission_dp = CfdeDataPackage(submission_schema_json)
prep_dp = CfdeDataPackage(portal_prep_schema_json)
return [
prep_dp.generate_resource_etl_sql(submission_dp, 'submission', resource)
for resource in prep_dp.package_def['resources']
if 'derivation_sql_path' in resource
]
@classmethod
def prepare_sqlite_derived_data(cls, sqlite_filename, progress=None, attach={}):
"""Prepare derived content via embedded SQL ETL
This method will clear and recompute the derived results
each time it is invoked.
"""
if progress is None:
progress = dict()
submission_dp = CfdeDataPackage(submission_schema_json)
prep_dp = CfdeDataPackage(portal_prep_schema_json)
def array_join(j, sep):
if j is None:
return None
try:
a = json.loads(j)
except Exception as e:
logger.error('array_join(%r) JSON decode failed: %s' % (j, e))
raise
if not isinstance(a, list):
logger.error('array_join unexpected array input %r' % (j,))
raise ValueError(j)
if not isinstance(sep, str):
logger.error('array_join unexpected separator input %r' % (sep,))
raise ValueError(sep)
return sep.join(a)
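# e.g. array_join('["a", "b", "c"]', ', ') returns 'a, b, c'; these helpers are presumably
# registered as SQLite functions for use by the derivation SQL later in this method.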
def json_sorted(j):
if j is None:
return None
try:
v = json.loads(j)
except Exception as e:
logger.error('json_sorted(%r) JSON decode failed: %s' % (j, e))
raise
if not isinstance(v, list):
logger.error('json_sorted unexpected input %r' % (j,))
raise ValueError(j)