-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexport_kakuma_advert_contacts.py
198 lines (171 loc) · 9.34 KB
/
export_kakuma_advert_contacts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import argparse
import csv
import json
from collections import OrderedDict
from core_data_modules.logging import Logger
from core_data_modules.cleaners import Codes
from core_data_modules.traced_data.io import TracedDataJsonIO
from id_infrastructure.firestore_uuid_table import FirestoreUuidTable
from storage.google_cloud import google_cloud_utils
from src.lib import PipelineConfiguration
from configurations.code_schemes import CodeSchemes
Logger.set_project_name("WUSC-KEEP-II")
log = Logger(__name__)

# Textit ISO 639-3 language codes the adverts are sent in, in export order.
ADVERT_LANGUAGE_CODES = ["orm", "apd", "tuv", "som", "eng", "swh"]
# Manually-labelled household-language string_value -> advert language code.
# Anything not explicitly mapped falls back to Swahili ("swh"), which reproduces
# the original else-branch behaviour.
HOUSEHOLD_LANGUAGE_TO_ADVERT_LANGUAGE = {
    "oromo": "orm",
    "sudanese": "apd",
    "turkana": "tuv",
    "somali": "som",
    "english": "eng",
}
DEFAULT_ADVERT_LANGUAGE = "swh"

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generates a CSV of phone numbers and their respective Textit ISO 639 "
                                                 "language code based on their manually labeled demographic response")

    parser.add_argument("google_cloud_credentials_file_path", metavar="google-cloud-credentials-file-path",
                        help="Path to a Google Cloud service account credentials file to use to access the "
                             "credentials bucket")
    parser.add_argument("pipeline_configuration_file_path", metavar="pipeline-configuration-file",
                        help="Path to WUSC-KEEP-II-KAKUMA pipeline configuration json file")
    parser.add_argument("listening_group_data_dir", metavar="listening-group-data-dir",
                        help="Directory path to read listening group CSV files to extract listening group data from,")
    parser.add_argument("messages_traced_data_paths", metavar="messages-traced-data-paths", nargs="+",
                        help="Paths to the messages traced data files to extract phone numbers from")
    parser.add_argument("contacts_csv_path", metavar="contacts-csv-path",
                        help="CSV file path to write the contacts data to")

    args = parser.parse_args()

    google_cloud_credentials_file_path = args.google_cloud_credentials_file_path
    pipeline_configuration_file_path = args.pipeline_configuration_file_path
    listening_group_data_dir = args.listening_group_data_dir
    messages_traced_data_paths = args.messages_traced_data_paths
    contacts_csv_path = args.contacts_csv_path

    # Read the settings from the configuration file
    log.info("Loading Pipeline Configuration File...")
    with open(pipeline_configuration_file_path) as f:
        pipeline_configuration = PipelineConfiguration.from_configuration_file(f)
    assert pipeline_configuration.pipeline_name in [
        "kakuma_s01_pipeline", "kakuma_s02_pipeline", "kakuma_s03_pipeline", "kakuma_all_seasons_pipeline"
    ], "PipelineName must be either a 'seasonal pipeline' or 'all seasons pipeline'"

    log.info("Downloading Firestore UUID Table credentials...")
    firestore_uuid_table_credentials = json.loads(google_cloud_utils.download_blob_to_string(
        google_cloud_credentials_file_path,
        pipeline_configuration.phone_number_uuid_table.firebase_credentials_file_url
    ))

    phone_number_uuid_table = FirestoreUuidTable(
        pipeline_configuration.phone_number_uuid_table.table_name,
        firestore_uuid_table_credentials,
        "avf-phone-uuid-"
    )
    log.info("Initialised the Firestore UUID table")

    # Search the messages TracedData and the listening group CSVs for participant uuids, grouped
    # by advert language. The traced data is processed first because a manually-labelled household
    # language takes precedence over the language recorded in a listening group CSV.
    uuids_by_language = {language: set() for language in ADVERT_LANGUAGE_CODES}
    all_uuids = set()
    # Participants who withdrew consent must never be exported, including via a listening group
    # CSV row for the same uuid (the original code only skipped their traced-data messages, so a
    # withdrawn participant could be re-added from a listening group file).
    withdrawn_uuids = set()

    for path in messages_traced_data_paths:
        # Load the traced data
        log.info(f"Loading previous traced data from file '{path}'...")
        with open(path) as f:
            messages = TracedDataJsonIO.import_jsonl_to_traced_data_iterable(f)
        log.info(f"Loaded {len(messages)} traced data objects")

        log.info("Searching for the participants uuids vis-a-vis their manually labelled "
                 "demographic language response")
        for msg in messages:
            if msg["consent_withdrawn"] == Codes.TRUE:
                withdrawn_uuids.add(msg["uid"])
                continue
            if msg["uid"] in all_uuids:
                continue
            all_uuids.add(msg["uid"])

            household_language = CodeSchemes.KAKUMA_HOUSEHOLD_LANGUAGE.get_code_with_code_id(
                msg["household_language_coded"]["CodeID"]).string_value
            advert_language = HOUSEHOLD_LANGUAGE_TO_ADVERT_LANGUAGE.get(household_language, DEFAULT_ADVERT_LANGUAGE)
            uuids_by_language[advert_language].add(msg["uid"])

    # Load all listening group de-identified CSV files and assign each participant not already
    # seen in the traced data to the language recorded in the CSV.
    listening_group_csvs = [url.split("/")[-1] for url in pipeline_configuration.listening_group_csv_urls]
    for listening_group_csv in listening_group_csvs:
        csv_path = f"{listening_group_data_dir}/Raw Data/{listening_group_csv}"
        with open(csv_path, "r", encoding="utf-8-sig") as f:
            data = list(csv.DictReader(f))
        log.info(f"Loaded {len(data)} listening group participants from {csv_path}")

        # Add the listening group avf-phone-uuids to their respective language set
        for row in data:
            uuid = row["avf-phone-uuid"]
            # Skip uuids already classified from the traced data (manual labels take precedence)
            # and participants who withdrew consent.
            if uuid in all_uuids or uuid in withdrawn_uuids:
                continue
            all_uuids.add(uuid)
            # Unrecognised language codes fall back to Swahili, matching the traced-data handling.
            language = row["Language"] if row["Language"] in uuids_by_language else DEFAULT_ADVERT_LANGUAGE
            uuids_by_language[language].add(uuid)

    # Defensively drop anyone who withdrew consent in any message, in case a uuid was classified
    # from an earlier message before a later message recorded the withdrawal.
    all_uuids -= withdrawn_uuids
    for uuid_set in uuids_by_language.values():
        uuid_set -= withdrawn_uuids

    # Convert the uuids to phone numbers in one batched Firestore call
    log.info("Converting the uuids to phone numbers...")
    uuids_to_phone_numbers = phone_number_uuid_table.uuid_to_data_batch(list(all_uuids))

    # Export the phone number and language pairs to a CSV
    # TODO upload this to keep_ii_kakuma textit instance through API?
    advert_contacts = OrderedDict()
    for language in ADVERT_LANGUAGE_CODES:
        for uuid in uuids_by_language[language]:
            phone_number = f"+{uuids_to_phone_numbers[uuid]}"
            advert_contacts[phone_number] = {
                "URN:Tel": phone_number,
                "Name": None,
                "Language": language
            }

    log.warning(f"Exporting {len(advert_contacts)} contacts to {contacts_csv_path}")
    with open(contacts_csv_path, "w") as f:
        headers = ["URN:Tel", "Name", "Language"]
        writer = csv.DictWriter(f, fieldnames=headers, lineterminator="\n")
        writer.writeheader()
        for contact in advert_contacts.values():
            writer.writerow(contact)
    log.info(f"Wrote {len(advert_contacts)} contacts to {contacts_csv_path}")