Skip to content

Commit

Permalink
Update advanced-csv-import (lucyparsons#891)
Browse files Browse the repository at this point in the history
* Backport csv-import.

* Fix reference issue with upload script if officers aren't defined

* backport csv_import

* Add unit/job creation on upload for the advanced assignments importer

* Update docs

* Update required/optional fields for csv_import.

* Allow field named unit_name.

* Add assignment overwrite functionality.

* Add unit test for assignment overwriting command.

* Update documentation with added flag.

* Pin package versions.

Co-authored-by: Madison Swain-Bowden <[email protected]>
  • Loading branch information
2 people authored and sea-kelp committed Oct 5, 2023
1 parent 7a32d14 commit 2210a80
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 37 deletions.
3 changes: 3 additions & 0 deletions OpenOversight/app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ def bulk_add_officers(filename, no_create, update_by_name, update_static_fields)
@click.option("--links-csv", type=click.Path(exists=True))
@click.option("--incidents-csv", type=click.Path(exists=True))
@click.option("--force-create", is_flag=True, help="Only for development/testing!")
@click.option("--overwrite-assignments", is_flag=True)
@with_appcontext
def advanced_csv_import(
department_name,
Expand All @@ -567,6 +568,7 @@ def advanced_csv_import(
links_csv,
incidents_csv,
force_create,
overwrite_assignments,
):
"""
Add or update officers, assignments, salaries, links and incidents from csv
Expand All @@ -589,6 +591,7 @@ def advanced_csv_import(
links_csv,
incidents_csv,
force_create,
overwrite_assignments
)


Expand Down
93 changes: 73 additions & 20 deletions OpenOversight/app/csv_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def _create_or_update_model(
update_method,
force_create=False,
model=None,
always_create=False,
):
if not row["id"]:
if (always_create and not force_create) or not row["id"]:
return create_method(row)
else:
if not force_create:
Expand Down Expand Up @@ -102,7 +103,7 @@ def _handle_officers_csv(
with _csv_reader(officers_csv) as csv_reader:
_check_provided_fields(
csv_reader,
required_fields=["id", "department_name"],
required_fields=["id", "department_name"] if not force_create else ["id"],
optional_fields=[
"last_name",
"first_name",
Expand All @@ -113,8 +114,10 @@ def _handle_officers_csv(
"employment_date",
"birth_year",
"unique_internal_identifier",
"department_name",
# the following are unused, but allowed since they are included in the csv output
"badge_number",
"unique_identifier",
"job_title",
"most_recent_salary",
"last_employment_date",
Expand All @@ -125,7 +128,8 @@ def _handle_officers_csv(

for row in csv_reader:
# can only update department with given name
assert row["department_name"] == department_name
if not force_create:
assert row["department_name"] == department_name
row["department_id"] = department_id
connection_id = row["id"]
if row["id"].startswith("#"):
Expand All @@ -148,29 +152,38 @@ def _handle_officers_csv(


def _handle_assignments_csv(
incidents_csv: str,
assignments_csv: str,
department_id: int,
all_officers: Dict[str, Officer],
force_create: bool,
overwrite_assignments: bool,
) -> None:
counter = 0
with _csv_reader(incidents_csv) as csv_reader:
with _csv_reader(assignments_csv) as csv_reader:
field_names = csv_reader.fieldnames
if "start_date" in field_names:
field_names[field_names.index("start_date")] = "star_date"
if "badge_number" in field_names:
field_names[field_names.index("badge_number")] = "star_no"
if "end_date" in field_names:
field_names[field_names.index("end_date")] = "resign_date"
if "unit_description" in field_names:
field_names[field_names.index("unit_description")] = "unit_name"
required_fields = ["officer_id", "job_title"]
if not overwrite_assignments:
required_fields.append("id")

_check_provided_fields(
csv_reader,
required_fields=["id", "officer_id", "job_title"],
required_fields=required_fields,
optional_fields=[
"id",
"star_no",
"unit_id",
"unit_description",
"unit_name",
"star_date",
"resign_date",
"officer_unique_identifier",
],
csv_name="assignments",
)
Expand All @@ -184,14 +197,47 @@ def _handle_assignments_csv(
unit.descrip.strip().lower(): unit.id
for unit in Unit.query.filter_by(department_id=department_id).all()
}
existing_assignments = (
Assignment.query.join(Assignment.baseofficer)
.filter(Officer.department_id == department_id)
.all()
)
id_to_assignment = {
assignment.id: assignment for assignment in existing_assignments
}
if overwrite_assignments:
id_to_assignment = {}
rows = []
all_rel_officers = set()
for row in csv_reader:
rows.append(row)
officer_id = row["officer_id"]
if officer_id != "" and officer_id[0] != "#":
all_rel_officers.add(int(officer_id))
wrong_department = all_rel_officers - set(
[int(oid) for oid in all_officers.keys() if oid[0] != "#"]
)
if len(wrong_department) > 0:
raise Exception(
"Referenced {} officers in assignment csv that belong to different department. Example ids: {}".format(
len(wrong_department),
", ".join(map(str, list(wrong_department)[:3])),
)
)
print(
"Deleting assignments from {} officers to overwrite.".format(
len(all_rel_officers)
)
)
(
db.session.query(Assignment)
.filter(Assignment.officer_id.in_(all_rel_officers))
.delete(synchronize_session=False)
)
db.session.flush()
# assign rows to csv_reader since we already iterated over reader
csv_reader = rows
else:
existing_assignments = (
Assignment.query.join(Assignment.baseofficer)
.filter(Officer.department_id == department_id)
.all()
)
id_to_assignment = {
assignment.id: assignment for assignment in existing_assignments
}
for row in csv_reader:
officer = all_officers.get(row["officer_id"])
if not officer:
Expand All @@ -205,12 +251,13 @@ def _handle_assignments_csv(
Unit.query.filter_by(id=int(row.get("unit_id"))).one().department.id
== department_id
)
elif row.get("unit_description"):
descrip = row["unit_description"].strip().lower()
elif row.get("unit_name"):
unit_name = row["unit_name"].strip()
descrip = unit_name.lower()
unit_id = unit_descrip_to_id.get(descrip)
if unit_id is None:
unit = Unit(
descrip=row["unit_description"],
descrip=unit_name,
department_id=officer.department_id,
)
db.session.add(unit)
Expand All @@ -230,7 +277,7 @@ def _handle_assignments_csv(
auto_order = 0
# create new job
job = Job(
job_title=row["job_title"],
job_title=row["job_title"].strip(),
is_sworn_officer=False,
department_id=officer.department_id,
order=auto_order,
Expand All @@ -249,6 +296,7 @@ def _handle_assignments_csv(
update_method=update_assignment_from_dict,
force_create=force_create,
model=Assignment,
always_create=overwrite_assignments,
)
counter += 1
if counter % 1000 == 0:
Expand Down Expand Up @@ -437,6 +485,7 @@ def import_csv_files(
links_csv: Optional[str],
incidents_csv: Optional[str],
force_create: bool = False,
overwrite_assignments: bool = False,
):
department = Department.query.filter_by(name=department_name).one_or_none()
if department is None:
Expand All @@ -457,7 +506,11 @@ def import_csv_files(

if assignments_csv is not None:
_handle_assignments_csv(
assignments_csv, department_id, all_officers, force_create
assignments_csv,
department_id,
all_officers,
force_create,
overwrite_assignments,
)

if salaries_csv is not None:
Expand Down
98 changes: 98 additions & 0 deletions OpenOversight/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,104 @@ def test_advanced_csv_import__force_create(session, department_with_ranks, tmp_p
assert cop1.links[0] == link


def test_advanced_csv_import__overwrite_assignments(
session, department_with_ranks, tmp_path
):
tmp_path = str(tmp_path)

department_name = department_with_ranks.name

other_department = Department(name="Other department", short_name="OPD")
session.add(other_department)

officer = Officer(
id=99001,
department_id=department_with_ranks.id,
first_name="Already",
last_name="InDatabase",
)
officer2 = Officer(
id=99002,
department_id=department_with_ranks.id,
first_name="Also",
last_name="InDatabase",
)
assignment = Assignment(
id=123,
officer_id=officer.id,
job_id=Job.query.filter_by(job_title="Police Officer").first().id,
)
assignment2 = Assignment(
id=124,
officer_id=officer2.id,
job_id=Job.query.filter_by(job_title="Police Officer").first().id,
)
session.add(officer)
session.add(assignment)
session.add(officer2)
session.add(assignment2)
session.flush()

# create temporary csv files
officers_data = [
{
"id": "#1",
"department_name": department_name,
"last_name": "Test",
"first_name": "Second",
},
]

officers_csv = _create_csv(officers_data, tmp_path, "officers.csv")

assignments_data = [
{
"officer_id": 99001,
"job title": "Captain",
"badge number": "12345",
"start date": "2020-07-24",
},
{
"officer_id": "#1",
"job title": "Police Officer",
"badge number": "999",
"start date": "2020-07-21",
},
]
assignments_csv = _create_csv(assignments_data, tmp_path, "assignments.csv")

# run command with --overwrite-assignments
result = run_command_print_output(
advanced_csv_import,
[
str(department_with_ranks.name),
"--officers-csv",
officers_csv,
"--assignments-csv",
assignments_csv,
"--overwrite-assignments",
],
)

# make sure command did not fail
assert result.exception is None
assert result.exit_code == 0

# make sure all the data is imported as expected
cop1 = Officer.query.get(99001)
assert len(cop1.assignments.all()) == 1
assert cop1.assignments[0].star_no == "12345"

cop2 = Officer.query.get(99002)
assert len(cop2.assignments.all()) == 1
assert cop2.assignments[0] == Assignment.query.get(124)

cop3 = Officer.query.filter_by(first_name="Second").first()
assert len(cop3.assignments.all()) == 1
assert cop3.assignments[0].star_no == "999"
assert cop3.assignments[0].job.job_title == "Police Officer"


def test_advanced_csv_import__extra_fields_officers(
session, department_with_ranks, tmp_path
):
Expand Down
Loading

0 comments on commit 2210a80

Please sign in to comment.