Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update advanced-csv-import #891

Merged
merged 8 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions OpenOversight/app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ def bulk_add_officers(filename, no_create, update_by_name, update_static_fields)
@click.option("--links-csv", type=click.Path(exists=True))
@click.option("--incidents-csv", type=click.Path(exists=True))
@click.option("--force-create", is_flag=True, help="Only for development/testing!")
@click.option("--overwrite-assignments", is_flag=True)
@with_appcontext
def advanced_csv_import(
department_name,
Expand All @@ -502,6 +503,7 @@ def advanced_csv_import(
links_csv,
incidents_csv,
force_create,
overwrite_assignments,
):
"""
Add or update officers, assignments, salaries, links and incidents from csv
Expand All @@ -524,6 +526,7 @@ def advanced_csv_import(
links_csv,
incidents_csv,
force_create,
overwrite_assignments
)


Expand Down
152 changes: 123 additions & 29 deletions OpenOversight/app/csv_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def _create_or_update_model(
update_method,
force_create=False,
model=None,
always_create=False,
):
if not row["id"]:
if (always_create and not force_create) or not row["id"]:
return create_method(row)
else:
if not force_create:
Expand Down Expand Up @@ -102,7 +103,7 @@ def _handle_officers_csv(
with _csv_reader(officers_csv) as csv_reader:
_check_provided_fields(
csv_reader,
required_fields=["id", "department_name"],
required_fields=["id", "department_name"] if not force_create else ["id"],
optional_fields=[
"last_name",
"first_name",
Expand All @@ -113,8 +114,10 @@ def _handle_officers_csv(
"employment_date",
"birth_year",
"unique_internal_identifier",
"department_name",
# the following are unused, but allowed since they are included in the csv output
"badge_number",
"unique_identifier",
"job_title",
"most_recent_salary",
"last_employment_date",
Expand All @@ -125,7 +128,8 @@ def _handle_officers_csv(

for row in csv_reader:
# can only update department with given name
assert row["department_name"] == department_name
if not force_create:
assert row["department_name"] == department_name
row["department_id"] = department_id
connection_id = row["id"]
if row["id"].startswith("#"):
Expand All @@ -148,24 +152,39 @@ def _handle_officers_csv(


def _handle_assignments_csv(
incidents_csv: str,
assignments_csv: str,
department_id: int,
all_officers: Dict[str, Officer],
force_create: bool,
overwrite_assignments: bool,
) -> None:
counter = 0
with _csv_reader(incidents_csv) as csv_reader:
with _csv_reader(assignments_csv) as csv_reader:
field_names = csv_reader.fieldnames
if "start_date" in field_names:
field_names[field_names.index("start_date")] = "star_date"
if "badge_number" in field_names:
field_names[field_names.index("badge_number")] = "star_no"
if "end_date" in field_names:
field_names[field_names.index("end_date")] = "resign_date"
if "unit_description" in field_names:
field_names[field_names.index("unit_description")] = "unit_name"
required_fields = ["officer_id", "job_title"]
if not overwrite_assignments:
required_fields.append("id")

_check_provided_fields(
csv_reader,
required_fields=["id", "officer_id", "job_title"],
optional_fields=["star_no", "unit_id", "star_date", "resign_date"],
required_fields=required_fields,
optional_fields=[
"id",
"star_no",
"unit_id",
"unit_name",
"star_date",
"resign_date",
"officer_unique_identifier",
],
csv_name="assignments",
)
jobs_for_department = list(
Expand All @@ -174,33 +193,101 @@ def _handle_assignments_csv(
job_title_to_id = {
job.job_title.strip().lower(): job.id for job in jobs_for_department
}
existing_assignments = (
Assignment.query.join(Assignment.baseofficer)
.filter(Officer.department_id == department_id)
.all()
)
id_to_assignment = {
assignment.id: assignment for assignment in existing_assignments
unit_descrip_to_id = {
unit.descrip.strip().lower(): unit.id
for unit in Unit.query.filter_by(department_id=department_id).all()
}
for row in csv_reader:
if row.get("unit_id"):
assert (
Unit.query.filter_by(id=int(row.get("unit_id"))).one().department.id
== department_id
)
job_id = job_title_to_id.get(row["job_title"].strip().lower())
if job_id is None:
if overwrite_assignments:
id_to_assignment = {}
rows = []
all_rel_officers = set()
for row in csv_reader:
rows.append(row)
officer_id = row["officer_id"]
if officer_id != "" and officer_id[0] != "#":
all_rel_officers.add(int(officer_id))
wrong_department = all_rel_officers - set(
[int(oid) for oid in all_officers.keys() if oid[0] != "#"]
)
if len(wrong_department) > 0:
raise Exception(
"Job title {} not found for department.".format(row["job_title"])
"Referenced {} officers in assignment csv that belong to different department. Example ids: {}".format(
len(wrong_department),
", ".join(map(str, list(wrong_department)[:3])),
)
)
row["job_id"] = job_id
print(
"Deleting assignments from {} officers to overwrite.".format(
len(all_rel_officers)
)
)
(
db.session.query(Assignment)
.filter(Assignment.officer_id.in_(all_rel_officers))
.delete(synchronize_session=False)
)
db.session.flush()
# assign rows to csv_reader since we already iterated over reader
csv_reader = rows
else:
existing_assignments = (
Assignment.query.join(Assignment.baseofficer)
.filter(Officer.department_id == department_id)
.all()
)
id_to_assignment = {
assignment.id: assignment for assignment in existing_assignments
}
for row in csv_reader:
officer = all_officers.get(row["officer_id"])
if not officer:
raise Exception(
"Officer with id {} does not exist (in this department)".format(
row["officer_id"]
)
)
if row.get("unit_id"):
assert (
Unit.query.filter_by(id=int(row.get("unit_id"))).one().department.id
== department_id
)
elif row.get("unit_name"):
unit_name = row["unit_name"].strip()
descrip = unit_name.lower()
unit_id = unit_descrip_to_id.get(descrip)
if unit_id is None:
unit = Unit(
descrip=unit_name,
department_id=officer.department_id,
)
db.session.add(unit)
db.session.flush()
unit_id = unit.id
unit_descrip_to_id[descrip] = unit_id
row["unit_id"] = unit_id
job_title = row["job_title"].strip().lower()
job_id = job_title_to_id.get(job_title)
if job_id is None:
num_existing_ranks = len(
Job.query.filter_by(department_id=officer.department_id).all()
)
if num_existing_ranks > 0:
auto_order = num_existing_ranks + 1
else:
auto_order = 0
# create new job
job = Job(
job_title=row["job_title"].strip(),
is_sworn_officer=False,
department_id=officer.department_id,
order=auto_order,
)
db.session.add(job)
db.session.flush()
job_id = job.id
job_title_to_id[job_title] = job_id

row["job_id"] = job_id
row["officer_id"] = officer.id
_create_or_update_model(
row=row,
Expand All @@ -209,6 +296,7 @@ def _handle_assignments_csv(
update_method=update_assignment_from_dict,
force_create=force_create,
model=Assignment,
always_create=overwrite_assignments,
)
counter += 1
if counter % 1000 == 0:
Expand Down Expand Up @@ -397,26 +485,32 @@ def import_csv_files(
links_csv: Optional[str],
incidents_csv: Optional[str],
force_create: bool = False,
overwrite_assignments: bool = False,
):
department = Department.query.filter_by(name=department_name).one_or_none()
if department is None:
raise Exception("Department with name '{}' does not exist!".format(department_name))
raise Exception(
"Department with name '{}' does not exist!".format(department_name)
)
department_id = department.id

existing_officers = Officer.query.filter_by(department_id=department_id).all()
id_to_officer = {officer.id: officer for officer in existing_officers}
all_officers = {str(k): v for k, v in id_to_officer.items()}

if officers_csv is not None:
new_officers = _handle_officers_csv(
officers_csv, department_name, department_id, id_to_officer, force_create
)

all_officers = {str(k): v for k, v in id_to_officer.items()}
all_officers.update(new_officers)
all_officers.update(new_officers)

if assignments_csv is not None:
_handle_assignments_csv(
assignments_csv, department_id, all_officers, force_create
assignments_csv,
department_id,
all_officers,
force_create,
overwrite_assignments,
)

if salaries_csv is not None:
Expand Down
98 changes: 98 additions & 0 deletions OpenOversight/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,104 @@ def test_advanced_csv_import__force_create(session, department_with_ranks, tmp_p
assert cop1.links[0] == link


def test_advanced_csv_import__overwrite_assignments(
session, department_with_ranks, tmp_path
):
tmp_path = str(tmp_path)

department_name = department_with_ranks.name

other_department = Department(name="Other department", short_name="OPD")
session.add(other_department)

officer = Officer(
id=99001,
department_id=department_with_ranks.id,
first_name="Already",
last_name="InDatabase",
)
officer2 = Officer(
id=99002,
department_id=department_with_ranks.id,
first_name="Also",
last_name="InDatabase",
)
assignment = Assignment(
id=123,
officer_id=officer.id,
job_id=Job.query.filter_by(job_title="Police Officer").first().id,
)
assignment2 = Assignment(
id=124,
officer_id=officer2.id,
job_id=Job.query.filter_by(job_title="Police Officer").first().id,
)
session.add(officer)
session.add(assignment)
session.add(officer2)
session.add(assignment2)
session.flush()

# create temporary csv files
officers_data = [
{
"id": "#1",
"department_name": department_name,
"last_name": "Test",
"first_name": "Second",
},
]

officers_csv = _create_csv(officers_data, tmp_path, "officers.csv")

assignments_data = [
{
"officer_id": 99001,
"job title": "Captain",
"badge number": "12345",
"start date": "2020-07-24",
},
{
"officer_id": "#1",
"job title": "Police Officer",
"badge number": "999",
"start date": "2020-07-21",
},
]
assignments_csv = _create_csv(assignments_data, tmp_path, "assignments.csv")

# run command with --overwrite-assignments
result = run_command_print_output(
advanced_csv_import,
[
str(department_with_ranks.name),
"--officers-csv",
officers_csv,
"--assignments-csv",
assignments_csv,
"--overwrite-assignments",
],
)

# make sure command did not fail
assert result.exception is None
assert result.exit_code == 0

# make sure all the data is imported as expected
cop1 = Officer.query.get(99001)
assert len(cop1.assignments.all()) == 1
assert cop1.assignments[0].star_no == "12345"

cop2 = Officer.query.get(99002)
assert len(cop2.assignments.all()) == 1
assert cop2.assignments[0] == Assignment.query.get(124)

cop3 = Officer.query.filter_by(first_name="Second").first()
assert len(cop3.assignments.all()) == 1
assert cop3.assignments[0].star_no == "999"
assert cop3.assignments[0].job.job_title == "Police Officer"


def test_advanced_csv_import__extra_fields_officers(
session, department_with_ranks, tmp_path
):
Expand Down
Loading