Very Wide Observation Rows + Experiment generation (#17)
As part of this PR I did a major rework of the observation generation
system.

tl;dr
* Observations are being generated at a rate of ~5k measurements per
second (i.e. it should be possible to reprocess the full dataset in less
than 3 days)
* Bodies from the measurements are being archived in WAR files (this is
currently what is slowing down the observation generation the most and
is what leads to the process stalling at the end with a huge queue of
bodies to archive). This needs some work to be optimised further.
* We are now able to generate Experiment results from the observations using
ground truths at a rate of 15k measurements per second (i.e. it should be
possible to re-analyse the full OONI dataset in less than a day)

If you care to read more details, see below:

## Very Wide Observation Rows

Each Web Connectivity measurement ends up producing observations that
are all of the same type and are written to the same DB table.

This has the benefit that we don't need to look up the observations we
care about in several disparate tables, but can do it all in the same
one, which is incredibly fast.

A side effect is that we end up with tables that can be a bit sparse
(several columns are NULL), but this doesn't seem to present major
difficulties.

The biggest challenge in this approach is figuring out which
observations are related to each other so that they can be packed into
the same row. In order to do this I kept the original observation model
in place, which gave me guarantees that the data structures were
properly filled out, and then for each of them I tried to look up the
related ones.

Any observation that doesn't have a friend just ends up on its own
database row, all alone.
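
To illustrate the packing idea, here is a minimal sketch that groups simplified
observation types by IP and port (all class and field names are illustrative,
not the actual models in this PR):

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Illustrative stand-ins for the per-protocol observations (not the real classes).
@dataclass
class DNSObservation:
    hostname: str
    answer_ip: str

@dataclass
class TCPObservation:
    ip: str
    port: int
    failure: Optional[str] = None

@dataclass
class TLSObservation:
    ip: str
    port: int
    failure: Optional[str] = None

@dataclass
class WideWebObservation:
    # Sparse by design: any of these columns may stay None.
    dns_hostname: Optional[str] = None
    dns_answer_ip: Optional[str] = None
    tcp_failure: Optional[str] = None
    tls_failure: Optional[str] = None

def pack_observations(
    dns_obs: List[DNSObservation],
    tcp_obs: List[TCPObservation],
    tls_obs: List[TLSObservation],
) -> List[WideWebObservation]:
    """Group related per-protocol observations into wide rows keyed by (ip, port)."""
    rows: Dict[Tuple[str, int], WideWebObservation] = {}
    for tcp in tcp_obs:
        rows[(tcp.ip, tcp.port)] = WideWebObservation(tcp_failure=tcp.failure)
    for tls in tls_obs:
        row = rows.setdefault((tls.ip, tls.port), WideWebObservation())
        row.tls_failure = tls.failure
    lonely: List[WideWebObservation] = []
    for dns in dns_obs:
        # Attach the DNS answer to every endpoint row that used the resolved IP...
        matching = [row for (ip, _), row in rows.items() if ip == dns.answer_ip]
        for row in matching:
            row.dns_hostname = dns.hostname
            row.dns_answer_ip = dns.answer_ip
        # ...while observations without a "friend" end up on a row of their own.
        if not matching:
            lonely.append(
                WideWebObservation(dns_hostname=dns.hostname, dns_answer_ip=dns.answer_ip)
            )
    return list(rows.values()) + lonely
```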

## WAR Body writer

I worked on separating the process of archiving bodies from that of finding
blocking fingerprints in them. Basically, during processing we create a WAR
file containing the raw bodies and write a record to a dedicated database (or
potentially the same database in its own table, but I need to figure out how
to get that to perform well).

We are then able to separately scan through all these WAR files hunting
for blockpage fingerprints, which is actually pretty fast. If we add new
blockpage fingerprints we can just re-scan the WAR files looking for
them and update the database column with what we found.
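
As a rough sketch of the dedup-and-record step (a plain append-only file stands
in for the real WAR writer, and the table layout is illustrative rather than
the schema used in this PR):

```python
import hashlib
import sqlite3
from pathlib import Path

SCHEMA = """
CREATE TABLE IF NOT EXISTS body_archive (
    body_sha1 TEXT PRIMARY KEY,
    archive_filename TEXT,
    offset INTEGER,
    length INTEGER,
    matched_fingerprint_id TEXT  -- filled in later by the fingerprint scan
)
"""

def archive_body(db: sqlite3.Connection, archive_path: Path, body: bytes) -> None:
    """Write a response body to the archive once, keyed by its sha1, and record where it lives."""
    db.execute(SCHEMA)
    body_sha1 = hashlib.sha1(body).hexdigest()
    # Skip bodies we have already archived, so exact duplicates are never stored twice.
    if db.execute("SELECT 1 FROM body_archive WHERE body_sha1 = ?", (body_sha1,)).fetchone():
        return
    with archive_path.open("ab") as f:
        offset = f.tell()
        f.write(body)
    db.execute(
        "INSERT INTO body_archive (body_sha1, archive_filename, offset, length) VALUES (?, ?, ?, ?)",
        (body_sha1, archive_path.name, offset, len(body)),
    )
    db.commit()
```

A later fingerprint scan can then simply update `matched_fingerprint_id` on the
rows whose bodies match a newly added blockpage fingerprint.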

## Misc performance improvements

It turns out clickhouse is not too happy when you do many writes per
second to it. In their docs they state you shouldn't be making more than
1 request per second
(https://clickhouse.com/docs/en/about-us/performance/#performance-when-inserting-data).

I encountered this issue once I had optimised the processor to the point that
I was hitting this limit. When that happens, the clickhouse process starts
consuming a lot of CPU and memory and eventually just drops any connection
attempt made to it.

To overcome this, I added to the ClickhouseConnection database abstraction
the concept of a row buffer, which waits until a certain number of rows have
accumulated before flushing them to the clickhouse connection. This worked
surprisingly well and improved the overall performance of the reprocessing
task by an order of magnitude.
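
For illustration, a minimal sketch of the row-buffer idea (the class name,
buffer size and the use of the clickhouse-driver client are assumptions, not
the actual ClickhouseConnection code in this PR):

```python
from typing import Any, Dict, List, Tuple

from clickhouse_driver import Client  # assumed driver; illustrative only

class BufferedClickhouse:
    """Buffer rows per insert query and flush them in batches to avoid many tiny inserts."""

    def __init__(self, dsn: str = "clickhouse://localhost/", buffer_size: int = 10_000):
        self.client = Client.from_url(dsn)
        self.buffer_size = buffer_size
        self._buffers: Dict[str, List[Tuple[Any, ...]]] = {}

    def write_row(self, insert_query: str, row: Tuple[Any, ...]) -> None:
        rows = self._buffers.setdefault(insert_query, [])
        rows.append(row)
        if len(rows) >= self.buffer_size:
            self.flush(insert_query)

    def flush(self, insert_query: str) -> None:
        rows = self._buffers.pop(insert_query, [])
        if rows:
            # One batched INSERT instead of thousands of single-row ones.
            self.client.execute(insert_query, rows)

    def close(self) -> None:
        for query in list(self._buffers):
            self.flush(query)
        self.client.disconnect()
```

The point is simply that rows reach clickhouse in batches of thousands instead
of one request per row.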

Quite a few additional changes were made to how multiprocessing is done, plus
small tweaks here and there based on iteration.

## Experiment result generation

I have added support for generating experiment results from the Very Wide
Observation Rows. Basically we process data in batches of one day. For each
day we first generate a ground truth database, which tells us what we should
expect to see by looking at all the other web connectivity control
measurements (in the future possibly other measurements too).
The process of generating the ground truths is actually pretty expensive
(it used to be the most expensive task) and takes about 80-90 seconds
for a given day.

We then need to efficiently lookup the ground truths that are related to
a specific measurement so that we can correlate them to what we are
seeing in the data.

In the beginning I went for the most naive solution of just putting them all
in a list and then doing a full scan of it for the relevant ground truths. As
the ground truths for a given day can be on the order of hundreds of
thousands, this obviously turned out to be incredibly expensive.

I briefly experimented with creating some hash maps over the data so that
these lookups would be faster, but quickly realised I needed multiple indexes
and was basically re-inventing a database. I obviously could not use
clickhouse for this purpose, because doing many small queries per second is
not what it's made for.

I then realised that I actually already had a database right inside of
the standard library of python: SQLite!

So I quickly put together an in-memory ground truth database to hold all the
ground truths and then do the lookups against it.

This made things significantly faster.
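
A minimal sketch of that idea, with an illustrative schema (the real ground
truth tables carry many more columns):

```python
import sqlite3
from typing import Iterable, Optional, Tuple

# Illustrative row layout: (hostname, ip, port, tcp_failure)
GroundTruth = Tuple[str, str, int, Optional[str]]

def build_groundtruth_db(ground_truths: Iterable[GroundTruth]) -> sqlite3.Connection:
    """Load a full day of ground truths into an in-memory SQLite DB with indexes."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE ground_truth (hostname TEXT, ip TEXT, port INTEGER, tcp_failure TEXT)"
    )
    db.executemany("INSERT INTO ground_truth VALUES (?, ?, ?, ?)", ground_truths)
    # The indexes are what turn full scans into cheap per-measurement lookups.
    db.execute("CREATE INDEX idx_gt_hostname ON ground_truth(hostname)")
    db.execute("CREATE INDEX idx_gt_ip_port ON ground_truth(ip, port)")
    db.commit()
    return db
```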

Yet this was not enough, because when you are processing a measurement,
you don't actually care to look at all the ground truths for the full
day, but only those which are for that specific measurement. It's pretty
easy to figure out which subset of the ground truths you care about, so I
implemented a system that does some pre-filtering and reduction of the full
day of ground truths into only those related to a particular measurement.
Note: this part of the code was put together very quickly and is
currently a bit racy and not so nice to look at, so it needs some
refactoring (the goal was just to see if it would work at all).
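
As an illustration of that reduction step (again with illustrative names, and
assuming the in-memory database from the previous sketch):

```python
import sqlite3
from typing import List, Set, Tuple

def reduce_ground_truths(
    db: sqlite3.Connection,
    hostnames: Set[str],
    endpoints: Set[Tuple[str, int]],
) -> List[tuple]:
    """Keep only the ground truths touching the hostnames and ip:port pairs of one measurement."""
    relevant: List[tuple] = []
    for hostname in hostnames:
        relevant.extend(
            db.execute(
                "SELECT hostname, ip, port, tcp_failure FROM ground_truth WHERE hostname = ?",
                (hostname,),
            )
        )
    for ip, port in endpoints:
        relevant.extend(
            db.execute(
                "SELECT hostname, ip, port, tcp_failure FROM ground_truth WHERE ip = ? AND port = ?",
                (ip, port),
            )
        )
    return relevant
```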

After this last improvement, the performance went up by 1 order of
magnitude.

All in all I'm glad to see that it's starting to come together and it
offers the prospect of being a much more efficient and iterative way of
doing analysis on OONI data.

The current state of things is that Experiment Result generation happens at a
rate of about 20k results per second, mostly bottlenecked by the database
writes.

Some significant amount of work needs to happen on validating the data
outputs so that we can check if the analysis logic is good (I didn't
spend much time working on this after the big ground truth refactor, so
it likely has some bugs).

It's nice that the results are explainable and you can easily figure out
which part of the analysis code generated a particular outcome through
the blocking_meta key.
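
For the sake of illustration (field names follow the BlockingEvent model
sketched in the Readme; the values and fingerprint ID are made up), an
explainable result might carry something like:

```python
# Illustrative only: shows how blocking_meta can explain which part of the
# analysis produced the outcome. This is not the exact schema used in the code.
blocking_event = {
    "blocking_type": "BLOCKED",
    "blocking_subject": "facebook.com",
    "blocking_detail": "dns.blockpage",
    "blocking_meta": {
        "why": "DNS answer matched a known blockpage fingerprint",
        "fingerprint_id": "example_blockpage_fp",  # hypothetical fingerprint ID
    },
    "confidence": 0.9,
}
```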
hellais authored Dec 1, 2022
1 parent d8341c9 commit ad1a529
Showing 36 changed files with 4,732 additions and 2,202 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
@@ -0,0 +1,3 @@
[run]
concurrency = multiprocessing,thread
parallel = True
17 changes: 14 additions & 3 deletions .github/workflows/tests.yml
@@ -12,7 +12,7 @@ jobs:
runs-on: ${{ matrix.os }}-latest
strategy:
matrix:
os: [Ubuntu, MacOS]
os: [Ubuntu]
python-version: [3.7, 3.8, 3.9, "3.10"]
defaults:
run:
@@ -50,12 +50,23 @@ jobs:
path: tests/data/
key: tests-data-${{ hashFiles('tests/conftest.py') }}

- name: Install clickhouse
run: |
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
- name: start clickhouse
run: sudo service clickhouse-server start

- name: Install dependencies
run: poetry install

- name: Run all tests
run: poetry run pytest --cov=./ --cov-report=xml -q tests
run: poetry run pytest --cov=./ --cov-report=xml --cov-report=term tests

- name: Upload coverage to codecov
uses: codecov/codecov-action@v3

3 changes: 2 additions & 1 deletion .gitignore
@@ -1,5 +1,5 @@
__pycache__
/.coverage*
/.coverage
/coverage.xml
/tests/data/datadir/*
/tests/data/raw_measurements/*
@@ -8,3 +8,4 @@ __pycache__
/datadir
/output
/attic
/prof
293 changes: 139 additions & 154 deletions Readme.md
@@ -1,20 +1,68 @@
## OONI Data

## Using this repo
OONI Data is a collection of tooling for downloading, analyzing and interpreting
OONI network measurement data.

To get yourself started with using this repo, run the following:
Most users will likely be interested in using this as a CLI tool for downloading
measurements.

If that is your goal, getting started is easy, run:
```
poetry install
mkdir output/
poetry run python oonidata/processing.py --csv-dir output/ --geoip-dir ../historical-geoip/country-asn-databases --asn-map ../historical-geoip/as-orgs/all_as_org_map.json
pip install oonidata
```

You will then be able to download measurements via:
```
oonidata sync --probe-cc IT --start-day 2022-10-01 --end-day 2022-10-02 --output-dir measurements/
```

This will download all OONI measurements for Italy into the directory
`./measurements` that were uploaded between 2022-10-01 and 2022-10-02.

If you are interested in learning more about the design of the analysis tooling,
please read on.

## Developer setup

This project makes use of [poetry](https://python-poetry.org/) for dependency
management. Follow [their
instructions](https://python-poetry.org/docs/#installation) on how to set it up.

Once you have done that you should be able to run:
```
poetry install
poetry run python -m oonidata --help
```
## Architecture overview

This data pipeline works by dealing with the data in two different stages:
The analysis engine is made up of several components:
* Observation generation
* Verdict generation
* Response body archiving
* Ground truth generation
* Experiment result generation

Below we explain each step of this process in detail

At a high level the pipeline looks like this:
```mermaid
graph
M{{Measurement}} --> OGEN[[make_observations]]
OGEN --> |many| O{{Observations}}
NDB[(NetInfoDB)] --> OGEN
OGEN --> RB{{ResponseBodies}}
RB --> BA[(BodyArchive)]
FDB[(FingerprintDB)] --> FPH
FPH --> BA
RB --> FPH[[fingerprint_hunter]]
O --> ODB[(ObservationTables)]
ODB --> MKGT[[make_ground_truths]]
MKGT --> GTDB[(GroundTruthDB)]
GTDB --> MKER
BA --> MKER
ODB --> MKER[[make_experiment_results]]
MKER --> |one| ER{{ExperimentResult}}
```

### Observation generation

@@ -32,170 +80,107 @@ that is to be determined when looking at data in aggregate and is the
responsibility of the Verdict generation stage.

During this stage we are also going to enrich observations with metadata about
IP addresses (using the IPInfoDB) and detecting known fingerprints of
blockpages or DNS responses using the FingerprintDB.
IP addresses (using the IPInfoDB).

The data flow of the observation generation pipeline looks as follows:
Each measurement ends up producing observations that are all of the same
type and are written to the same DB table.

```mermaid
graph TD
IPInfoDB[(IPInfoDB)] --> MsmtProcessor
FingerprintDB[(FingerprintDB)] --> MsmtProcessor
Msmt[Raw Measurement] --> MsmtProcessor{{"measurement_processor()"}}
MsmtProcessor --> TCPObservations
MsmtProcessor --> DNSObservations
MsmtProcessor --> TLSObservations
MsmtProcessor --> HTTPObservations
```
This has the benefit that we don't need to lookup the observations we care about
in several disparate tables, but can do it all in the same one, which is
incredibly fast.

A side effect is that we end up with tables that can be a bit sparse (several
columns are NULL).

The `measurement_processor` stage can be run either in a streaming fashion as
measurements are uploaded to the collector or in batch mode by reprocessing
existing raw measurements.
The tricky part, in the case of complex tests like web_connectivity, is to
figure out which individual sub measurements fit into the same observation row.
For example, we would like the TCP connect result to appear in the same row
as the DNS query that led to it, together with the TLS handshake towards that
IP, port combination.

```mermaid
graph LR
P((Probe)) --> M{{Measurement}}
BE --> P
M --> PL[(Analysis)]
PL --> O{{Observations}}
O --> PL
PL --> BE{{ExperimentResult}}
BE --> E((Explorer))
O --> E
```

You can run the observation generation with a clickhouse backend like so:
```
poetry run python -m oonidata mkobs --clickhouse clickhouse://localhost/ --data-dir tests/data/datadir/ --start-day 2022-08-01 --end-day 2022-10-01 --create-tables --parallelism 20
```

Here is the list of supported observations so far:
* [x] WebObservation, which has information about DNS, TCP, TLS and HTTP(s)
* [x] WebControlObservation, which has the control measurements run by web connectivity (used to generate ground truths)
* [ ] CircumventionToolObservation, still needs to be designed and implemented
(ideally we would use the same for OpenVPN, Psiphon, VanillaTor)

### ExperimentResult generation

The data flow of the blocking event generation pipeline looks as follows:
```mermaid
classDiagram
direction RL

ExperimentResult --* WebsiteExperimentResult
ExperimentResult --* WhatsAppExperimentResult
ExperimentResult : +String measurement_uid
ExperimentResult : +datetime timestamp
ExperimentResult : +int probe_asn
ExperimentResult : +String probe_cc
ExperimentResult : +String network_type
ExperimentResult : +struct resolver
ExperimentResult : +List[str] observation_ids
ExperimentResult : +List[BlockingEvent] blocking_events
ExperimentResult : +float ok_confidence
ExperimentResult : +bool anomaly
ExperimentResult : +bool confirmed

class WebsiteExperimentResult {
  +String domain_name
  +String website_name
}

class WhatsAppExperimentResult {
  +float web_ok_confidence
  +String web_blocking_detail
  +float registration_ok_confidence
  +String registration_blocking_detail
  +float endpoints_ok_confidence
  +String endpoints_blocking_detail
}

class BlockingEvent {
  blocking_type: +BlockingType
  blocking_subject: +String
  blocking_detail: +String
  blocking_meta: +json
  confidence: +float
}

class BlockingType {
  <<enumeration>>
  OK
  BLOCKED
  NATIONAL_BLOCK
  ISP_BLOCK
  LOCAL_BLOCK
  SERVER_SIDE_BLOCK
  DOWN
  THROTTLING
}
```

### Response body archiving

It is optionally possible to also create WAR archives of HTTP response bodies
when running the observation generation.

This is enabled by passing the extra command line argument `--archives-dir`.

Whenever a response body is detected in a measurement it is sent to the
archiving queue, which takes the response body and looks up in the database
whether it has already been seen (so we don't store exact duplicate bodies).
If we haven't archived it yet, we write the body to a WAR file and record its
sha1 hash, together with the filename we wrote it to, in a database.

These WAR archives can then be mined asynchronously for blockpages using the
fingerprint hunter command:
```
oonidata fphunt --data-dir tests/data/datadir/ --archives-dir warchives/ --parallelism 20
```

When a blockpage matching the fingerprint is detected, the relevant database row
for that fingerprint is updated with the ID of the fingerprint which was
detected.
### Ground Truth generation

```mermaid
graph
M{{Measurement}} --> OGEN[[observationGen]]
OGEN --> |many| O{{Observations}}
O --> CGEN[[controlGen]]
O --> ODB[(ObservationDB)]
ODB --> CGEN
CGEN --> |many| CTRL{{Controls}}
CTRL --> A[[Analysis]]
FDB[(FingerprintDB)] --> A
NDB[(NetInfoDB)] --> A
O --> A
A --> |one| ER{{ExperimentResult}}
ER --> |many| BE{{BlockingEvents}}
```
In order to establish if something is being blocked or not, we need some ground truth for comparison.

The goal of the ground truth generation task is to build a ground truth
database, which contains all the ground truths for every target that has been
tested in a particular day.

Currently it's implemented using the WebControlObservations, but in the future
we could also use other WebObservations.

Some precautions need to be taken when running the `verdict_generator()` in
batch compared to running it in streaming mode.
The challenge is that you don't want to have to regenerate baselines that often
because it's an expensive process.

Let us first discuss the usage of the Verdict generation in the context of a
batch workflow. When in batch mode, we will take all the Observations in the desired
`time_interval` and `target`. In practice what we would do is process the data
in daily batches and apply the `GROUP BY` clause to a particular target.
It is possible to parallelise these tasks across multiple cores (and possibly
even across multiple nodes).

A baseline is some ground truth information about the target on that given day,
we generate this once and then apply it to all the observations for that target
from every testing session to establish the outcome of the verdict.

It's reasonable to do this over a time window of a day, because that will mean
that the baseline will be pertaining to at most 24h from the observation.

The challenge is when you want to do something similar for data as it comes in.
The problem there is that if you use data from the last day, you will end up
with a delta from the observation that can be up to 48h, which might be too much.
OTOH if you use data from the current day, you may not have enough data.
Moreover, it means that the result of the `verdict_generator` in batch mode
will differ from a run in streaming, which can lead to inconsistent results.

I think we would like to have the property of having results be as close as
possible to the batch run, while in streaming mode, and have some way of getting
eventual consistency.

The proposed solution is to generate baselines for all the targets (which is a
small set and can even be kept in memory) on a rolling 1h basis. This way
verdicts can be calculated based on a baseline that will be from a delta of at
most 24h.

Once the day is finished, we can re-run the verdict generation using the batch
workflow and mark for deletion all the verdicts generated in streaming mode, leading
to an eventual consistency.

The possible outcomes for the verdict are:

* dns.blockpage
* dns.bogon
* dns.nxdomain
* dns.{failure}
* dns.inconsistent
* tls.mitm
* tls.{failure}
* http.{failure}
* https.{failure}
* http.blockpage
* http.bodydiff
* tcp.{failure}
Each ground truth database is actually just a sqlite3 database. For a given
day it's approximately 150MB in size, and we load it in memory when running
the analysis workflow.

### ExperimentResult generation

An experiment result is the interpretation of one or more observations with a
determination of whether the target is `BLOCKED`, `DOWN` or `OK`.

For each of these states a confidence indicator is given, which is an estimate of the
likelihood of that result being accurate.

For each of the 3 states, it's also possible to specify a `blocking_detail`, which
gives more information as to why the block might be occurring.

It's important to note that for a given measurement, multiple experiment results
can be generated, because a target might be blocked in multiple ways or be OK in
some regards, but not in others.

This is best explained through a concrete example. Let's say a censor is
blocking https://facebook.com/ with the following logic:
* any DNS query for facebook.com gets "127.0.0.1" as the answer
* any TCP connect request to 157.240.231.35 gets a RST
* any TLS handshake with SNI facebook.com gets a RST

In this scenario, assuming the probe has discovered other IPs for facebook.com
through other means (ex. through the test helper or DoH as web_connectivity 0.5
does), we would like to emit the following experiment results:
* BLOCKED, `dns.bogon`, `facebook.com`
* BLOCKED, `tcp.rst`, `157.240.231.35:80`
* BLOCKED, `tcp.rst`, `157.240.231.35:443`
* OK, `tcp.ok`, `157.240.231.100:80`
* OK, `tcp.ok`, `157.240.231.100:443`
* BLOCKED, `tls.rst`, `157.240.231.35:443`
* BLOCKED, `tls.rst`, `157.240.231.100:443`

This way we are fully characterising the block in all the methods through which
it is implemented.

### Current pipeline
