Skip to content

Commit

Permalink
Fix transformers based on schema changes
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 9, 2024
1 parent c7ac80a commit eb37cb2
Show file tree
Hide file tree
Showing 19 changed files with 225 additions and 154 deletions.
2 changes: 0 additions & 2 deletions oonidata/src/oonidata/models/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class WebAnalysis:
analysis_id: str
observation_id: str

created_at: datetime

# This is the domain name associated with the target, for example for
# facebook it will be www.facebook.com, but also edge-mqtt.facebook.com
target_domain_name: str
Expand Down
2 changes: 2 additions & 0 deletions oonidata/src/oonidata/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class ProcessingMeta:
UInt16 = Annotated[int, "UInt16"]
UInt32 = Annotated[int, "UInt32"]

Float64 = Annotated[float, "Float64"]

ArrayString = Annotated[List[str], "Array(String)"]

OptionalDatetime64_3 = Annotated[Optional[datetime], "Nullable(DateTime64(3, 'UTC'))"]
Expand Down
74 changes: 45 additions & 29 deletions oonidata/src/oonidata/models/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from oonidata.models.base import (
ArrayString,
Float64,
OptionalDatetime,
UInt16,
UInt32,
Expand All @@ -34,7 +35,9 @@ class MeasurementMeta:
software_version: str
test_name: str
test_version: str
test_runtime: Float64

test_helper_address: str = ""
input: str = ""


Expand Down Expand Up @@ -117,37 +120,38 @@ class TLSObservation:
timestamp: datetime

failure: Failure
success: bool

server_name: str
version: str
cipher_suite: str

ip: Optional[str] = None
port: Optional[int] = None
ip: str = ""
port: int = 0

is_certificate_valid: Optional[bool] = None
is_certificate_valid: bool = False

end_entity_certificate_fingerprint: Optional[str] = None
end_entity_certificate_subject: Optional[str] = None
end_entity_certificate_subject_common_name: Optional[str] = None
end_entity_certificate_issuer: Optional[str] = None
end_entity_certificate_issuer_common_name: Optional[str] = None
end_entity_certificate_fingerprint: str = ""
end_entity_certificate_subject: str = ""
end_entity_certificate_subject_common_name: str = ""
end_entity_certificate_issuer: str = ""
end_entity_certificate_issuer_common_name: str = ""
end_entity_certificate_san_list: List[str] = field(default_factory=list)
end_entity_certificate_not_valid_after: Optional[datetime] = None
end_entity_certificate_not_valid_before: Optional[datetime] = None
peer_certificates: List[bytes] = field(default_factory=list)
certificate_chain_length: Optional[int] = None
certificate_chain_length: int = 0
certificate_chain_fingerprints: List[str] = field(default_factory=list)

handshake_read_count: Optional[int] = None
handshake_write_count: Optional[int] = None
handshake_read_bytes: Optional[float] = None
handshake_write_bytes: Optional[float] = None
handshake_last_operation: Optional[str] = None
handshake_time: Optional[float] = None
handshake_read_count: int = 0
handshake_write_count: int = 0
handshake_read_bytes: float = 0
handshake_write_bytes: float = 0
handshake_last_operation: str = ""
handshake_time: float = 0

transaction_id: Optional[int] = None
t: Optional[float] = None
transaction_id: int = 0
t: float = 0


@dataclass
Expand Down Expand Up @@ -186,7 +190,13 @@ class TCPObservation:

@table_model(
table_name="obs_web_ctrl",
table_index=("measurement_uid", "observation_id", "measurement_start_time"),
table_index=(
"measurement_start_time",
"fqdn",
"ip",
"measurement_uid",
"observation_idx",
),
)
@dataclass
class WebControlObservation:
Expand All @@ -204,7 +214,6 @@ class WebControlObservation:
ip_as_cc: str = ""
ip_cc: str = ""
ip_is_bogon: bool = False
# Changed in > v5.0.0-alpha.4

dns_failure: str = ""
dns_success: bool = False
Expand All @@ -224,7 +233,14 @@ class WebControlObservation:

@table_model(
table_name="obs_web",
table_index=("measurement_uid", "observation_id", "measurement_start_time"),
table_index=(
"measurement_start_time",
"probe_cc",
"probe_asn",
"fqdn",
"observation_idx",
"measurement_uid",
),
)
@dataclass
class WebObservation:
Expand Down Expand Up @@ -260,15 +276,16 @@ class WebObservation:
# from the probe
dns_answer_asn: UInt32 = 0
dns_answer_as_org_name: str = ""
dns_t: float = 0
dns_t: Float64 = 0

# TCP related observation
tcp_failure: str = ""
tcp_success: bool = False
tcp_t: float = 0
tcp_t: Float64 = 0

# TLS related observation
tls_failure: str = ""
tls_success: bool = False

tls_server_name: str = ""
tls_version: str = ""
Expand All @@ -291,8 +308,8 @@ class WebObservation:
tls_handshake_read_bytes: UInt32 = 0
tls_handshake_write_bytes: UInt32 = 0
tls_handshake_last_operation: str = ""
tls_handshake_time: float = 0
tls_t: float = 0
tls_handshake_time: Float64 = 0
tls_t: Float64 = 0

# HTTP related observation
http_request_url: str = ""
Expand All @@ -305,7 +322,7 @@ class WebObservation:
http_request_body_length: UInt32 = 0
http_request_method: str = ""

http_runtime: float = 0
http_runtime: Float64 = 0

http_response_body_length: UInt32 = 0
http_response_body_is_truncated: bool = False
Expand All @@ -316,7 +333,7 @@ class WebObservation:
http_response_header_server: str = ""
http_request_redirect_from: str = ""
http_request_body_is_truncated: bool = False
http_t: float = 0
http_t: Float64 = 0

# probe level analysis
probe_analysis: str = ""
Expand Down Expand Up @@ -344,17 +361,16 @@ class WebObservation:

@table_model(
table_name="obs_http_middlebox",
table_index=("measurement_uid", "measurement_start_time"),
table_index=("measurement_uid", "observation_idx", "measurement_start_time"),
)
@dataclass
class HTTPMiddleboxObservation:
measurement_meta: MeasurementMeta
probe_meta: ProbeMeta
processing_meta: ProcessingMeta

observation_idx: UInt16 = 0

created_at: OptionalDatetime = None

# Set the payload returned by the HTTP Invalid Request Line test
hirl_sent_0: str = ""
hirl_sent_1: str = ""
Expand Down
14 changes: 7 additions & 7 deletions oonipipeline/src/oonipipeline/analysis/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class WebGroundTruth(NamedTuple):
vp_cc: str
is_trusted_vp: bool

hostname: str
fqdn: str
ip: Optional[str]
port: Optional[int]

Expand Down Expand Up @@ -66,7 +66,7 @@ def iter_ground_truths_from_web_control(
vp_cc="ZZ",
timestamp=obs.measurement_meta.measurement_start_time,
is_trusted_vp=True,
hostname=obs.fqdn,
fqdn=obs.fqdn,
ip=obs.ip,
ip_asn=ip_asn,
ip_as_org_name=ip_as_org_name,
Expand Down Expand Up @@ -95,7 +95,7 @@ def iter_web_ground_truths(
end_day = (measurement_day + timedelta(days=1)).strftime("%Y-%m-%d")
column_names = [
"timestamp",
"hostname",
"fqdn",
"ip",
"port",
"dns_failure",
Expand All @@ -112,7 +112,7 @@ def iter_web_ground_truths(
q = """
SELECT (
toStartOfDay(measurement_start_time) as timestamp,
hostname,
fqdn,
ip,
port,
dns_failure,
Expand Down Expand Up @@ -171,7 +171,7 @@ class WebGroundTruthDB:
"""

_indexes = (
("hostname_idx", "hostname, vp_asn, vp_cc"),
("fqdn_idx", "fqdn, vp_asn, vp_cc"),
("ip_port_idx", "ip, port, vp_asn, vp_cc"),
("http_request_url_idx", "http_request_url, vp_asn, vp_cc"),
)
Expand Down Expand Up @@ -230,7 +230,7 @@ def create_query(self):
timestamp TEXT,
hostname TEXT,
fqdn TEXT,
ip TEXT,
ip_asn INT,
ip_as_org_name TEXT,
Expand Down Expand Up @@ -295,7 +295,7 @@ def select_query(
# to DNS resolutions, so we only get DNS failure or DNS success
# rows
[
" hostname = ? AND (dns_success = 1 OR dns_failure IS NOT NULL) "
" fqdn = ? AND (dns_success = 1 OR dns_failure IS NOT NULL) "
for _ in range(len(hostnames))
]
)
Expand Down
20 changes: 8 additions & 12 deletions oonipipeline/src/oonipipeline/analysis/web_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ def make_tcp_analysis(
ground_truth_trusted_ok_count=None,
)

assert (
web_o.tcp_failure is not None
), "inconsistency between tcp_success and tcp_failure"
assert web_o.tcp_failure != "", "inconsistency between tcp_success and tcp_failure"

ground_truths = filter(
lambda gt: gt.ip == web_o.ip and gt.port == web_o.port, web_ground_truths
Expand Down Expand Up @@ -202,15 +200,15 @@ def make_dns_ground_truth(ground_truths: Iterable[WebGroundTruth]):
failure_count = 0
nxdomain_count = 0
for gt in ground_truths:
if gt.dns_success is None and gt.dns_failure is None:
if not gt.dns_success and not gt.dns_failure:
continue

if gt.dns_failure == "dns_nxdomain_error":
nxdomain_count += gt.count
nxdomain_cc_asn.add((gt.vp_cc, gt.vp_asn))
continue

if gt.dns_failure is not None:
if gt.dns_failure != "":
failure_count += gt.count
failure_cc_asn.add((gt.vp_cc, gt.vp_asn))
continue
Expand Down Expand Up @@ -308,7 +306,7 @@ def check_dns_consistency(
ground_truth_as_org_names.add(gt.ip_as_org_name.lower())

for web_o in dns_observations:
if web_o.dns_failure == None and web_o.dns_answer:
if web_o.dns_failure == "" and web_o.dns_answer:
consistency_results.success = True
consistency_results.answers.append(web_o.dns_answer)
consistency_results.answer_count += 1
Expand Down Expand Up @@ -396,7 +394,7 @@ def make_dns_analysis(
) -> DNSAnalysis:
dns_ground_truth = make_dns_ground_truth(
ground_truths=filter(
lambda gt: gt.hostname == hostname,
lambda gt: gt.fqdn == hostname,
web_ground_truths,
)
)
Expand Down Expand Up @@ -537,7 +535,7 @@ def make_http_analysis(
assert web_o.http_request_url

http_analysis = HTTPAnalysis(
success=web_o.http_failure == None,
success=web_o.http_failure == "",
failure=web_o.http_failure,
is_http_request_encrypted=web_o.http_request_url.startswith("https://"),
)
Expand Down Expand Up @@ -665,11 +663,11 @@ def make_web_analysis(
tcp_analysis = None
tls_analysis = None
http_analysis = None
if web_o.tcp_success is not None:
if web_o.tcp_success or web_o.tcp_failure:
tcp_analysis = make_tcp_analysis(
web_o=web_o, web_ground_truths=web_ground_truths
)
if web_o.tls_failure or web_o.tls_cipher_suite is not None:
if web_o.tls_failure or web_o.tls_cipher_suite != "":
tls_analysis = make_tls_analysis(
web_o=web_o, web_ground_truths=web_ground_truths
)
Expand All @@ -682,13 +680,11 @@ def make_web_analysis(
fingerprintdb=fingerprintdb,
)

created_at = datetime.now(timezone.utc).replace(tzinfo=None)
website_analysis = WebAnalysis(
measurement_meta=web_o.measurement_meta,
probe_meta=web_o.probe_meta,
processing_meta=ProcessingMeta(created_at=datetime.now(timezone.utc)),
observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}",
created_at=created_at,
analysis_id=f"{web_o.measurement_meta.measurement_uid}_{idx}",
target_domain_name=domain_name,
target_detail=target_detail,
Expand Down
2 changes: 1 addition & 1 deletion oonipipeline/src/oonipipeline/db/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ def write_table_model_rows(
self.row_buffer[table_name] += row_list

if len(self.row_buffer[table_name]) >= self.write_batch_size:
log.debug(f"flushing table {table_name}")
self.flush(table_name)

def flush(self, table_name=None):
Expand All @@ -175,6 +174,7 @@ def flush(self, table_name=None):
self._flush_table(t_name)

def _flush_table(self, table_name):
log.debug(f"flushing table {table_name}")
if table_name in self.row_buffer and self.row_buffer[table_name]:
rows_to_flush = self.row_buffer[table_name]
if rows_to_flush:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def make_observations_for_file_entry(
obs_tuple = measurement_to_observations(
msmt=msmt,
netinfodb=netinfodb,
bucket_datetime=datetime.strptime("%Y-%m-%d", bucket_date),
bucket_datetime=datetime.strptime(bucket_date, "%Y-%m-%d"),
)
for obs_list in obs_tuple:
db.write_table_model_rows(obs_list, use_buffer_table=False)
Expand Down Expand Up @@ -264,7 +264,7 @@ def get_previous_range(params: GetPreviousRangeParams) -> List[PrevRange]:
get_prev_range(
db=db,
table_name=table_name,
bucket_date=params.bucket_date,
bucket_datetime=params.bucket_date,
test_name=params.test_name,
probe_cc=params.probe_cc,
),
Expand Down
Loading

0 comments on commit eb37cb2

Please sign in to comment.