From 8c2a11ded68d5febb48f2cacf3fbc545b4b96e51 Mon Sep 17 00:00:00 2001
From: Eduardo Cuducos
Date: Sat, 2 Nov 2019 12:36:32 -0400
Subject: [PATCH 01/10] Add company dataset abstraction

This commit adds an abstraction to build the company dataset. It includes:

- Interface to download data from Brasil.IO (Receita Federal CNPJ dataset)
- Interface to get CNAE (economic activity) descriptions from the IBGE website
- Interface to get geo-coordinates from Open Street Maps's Nominatim API
---
 serenata_toolbox/companies/__init__.py     |   0
 serenata_toolbox/companies/cnae.py         |  67 ++++++++
 serenata_toolbox/companies/dataset.py      | 162 ++++++++++++++++++
 serenata_toolbox/companies/db.py           | 117 +++++++++++++
 serenata_toolbox/companies/google_drive.py |  70 ++++++++
 .../companies/open_street_maps.py          |  23 +++
 setup.py                                   |   8 +-
 7 files changed, 445 insertions(+), 2 deletions(-)
 create mode 100644 serenata_toolbox/companies/__init__.py
 create mode 100644 serenata_toolbox/companies/cnae.py
 create mode 100644 serenata_toolbox/companies/dataset.py
 create mode 100644 serenata_toolbox/companies/db.py
 create mode 100644 serenata_toolbox/companies/google_drive.py
 create mode 100644 serenata_toolbox/companies/open_street_maps.py

diff --git a/serenata_toolbox/companies/__init__.py b/serenata_toolbox/companies/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/serenata_toolbox/companies/cnae.py b/serenata_toolbox/companies/cnae.py
new file mode 100644
index 0000000..27362aa
--- /dev/null
+++ b/serenata_toolbox/companies/cnae.py
@@ -0,0 +1,67 @@
+import re
+from tempfile import NamedTemporaryFile
+
+import requests
+from openpyxl import load_workbook
+
+from serenata_toolbox import log
+
+
+class Cnae:
+    """This database abstraction complements the CNPJ dataset with economic
+    activities (CNAE) descriptions that come from a separate file from the
+    Federal Revenue."""
+
+    CHUNK = 2 ** 12
+    CNAE_DESCRIPTION_FILE = (
+        "https://cnae.ibge.gov.br"
+        "/images/concla/documentacao/"
+        "CNAE_Subclasses_2_3_Estrutura_Detalhada.xlsx"
+    )
+
+    def __init__(self):
+        self._activities = dict()  # cache
+
+    @staticmethod
+    def parse_code(code):
+        if not code:
+            return
+
+        cleaned = re.sub(r"\D", "", code)
+        try:
+            return int(cleaned)
+        except ValueError:
+            return
+
+    def _load_activities(self):
+        log.info("Fetching CNAE descriptions…")
+        with NamedTemporaryFile(suffix=".xlsx") as tmp:
+            response = requests.get(self.CNAE_DESCRIPTION_FILE)
+
+            with open(tmp.name, "wb") as fobj:
+                log.debug(f"Downloading {response.url} to {tmp.name}…")
+                for chunk in response.iter_content(self.CHUNK):
+                    if chunk:
+                        fobj.write(chunk)
+
+            wb = load_workbook(tmp.name)
+            for row in wb.active.rows:
+                code = self.parse_code(row[4].value)
+                description = row[5].value
+                if not all((code, description)):
+                    continue
+
+                self._activities[code] = description
+
+    @property
+    def activities(self):
+        """Dictionary with the descriptions of the economic activity (CNAE)
+        not included in the Receita Federal dataset."""
+        if self._activities:
+            return self._activities
+
+        self._load_activities()
+        return self._activities
+
+    def __call__(self, code):
+        return self.activities.get(code)
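For reference, the `Cnae` helper above is self-contained and can be exercised on its own. A minimal sketch of the intended usage (the CNAE code below is illustrative):

.. code-block:: python

   from serenata_toolbox.companies.cnae import Cnae

   cnae = Cnae()
   code = Cnae.parse_code("6202-3/00")  # keeps digits only: 6202300
   print(cnae(code))  # the descriptions XLSX is downloaded on the first lookup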
diff --git a/serenata_toolbox/companies/dataset.py b/serenata_toolbox/companies/dataset.py
new file mode 100644
index 0000000..0002bd9
--- /dev/null
+++ b/serenata_toolbox/companies/dataset.py
@@ -0,0 +1,162 @@
+import asyncio
+import json
+import os
+import re
+from datetime import date, datetime
+
+import aiohttp
+import numpy as np
+import pandas as pd
+
+from serenata_toolbox import log
+from serenata_toolbox.companies.db import Database
+
+
+class Dataset:
+    TRANSLATION = {
+        "abertura": "opening",
+        "atividade_principal": "main_activity",
+        "atividades_secundarias": "secondary_activities",
+        "bairro": "neighborhood",
+        "capital_social": "share_capital",
+        "cep": "zip_code",
+        "cnae_fiscal": "main_activity_code",
+        "codigo_municipio": "city_code",
+        "codigo_natureza_juridica": "judicial_nature_code",
+        "complemento": "additional_address_details",
+        "data_exclusao_do_simples": "removed_from_simples_since",
+        "data_inicio_atividade": "opened_in",
+        "data_opcao_pelo_simples": "opted_for_simples_in",
+        "data_situacao_cadastral": "situation_date",
+        "data_situacao_especial": "special_situation_date",
+        "ddd_fax": "fax_area_code",
+        "ddd_telefone_1": "phone1_area_code",
+        "ddd_telefone_2": "phone2_area_code",
+        "descricao_tipo_logradouro": "address_type",
+        "efr": "responsible_federative_entity",
+        "identificador_matriz_filial": "hq_or_subsidiary_code",
+        "logradouro": "address",
+        "motivo_situacao_cadastral": "situation_reason",
+        "municipio": "city",
+        "natureza_juridica": "legal_entity",
+        "nome_cidade_exterior": "city_abroad_name",
+        "nome_fantasia": "trade_name",
+        "numero": "number",
+        "opcao_pelo_mei": "mei",
+        "opcao_pelo_simples": "simples",
+        "porte": "size",
+        "qualificacao_do_responsavel": "administrative_person_category",
+        "razao_social": "name",
+        "situacao_cadastral": "situation",
+        "situacao_especial": "special_situation",
+        "telefone": "phone",
+        "tipo": "type",
+        "uf": "state",
+        "ultima_atualizacao": "last_updated",
+    }
+    TRANSLATION_PARTNER = {
+        "identificador_de_socio": "id",
+        "nome_socio": "name",
+        "cnpj_cpf_do_socio": "cnpj_cpf",
+        "codigo_qualificacao_socio": "type",
+        "percentual_capital_social": "percent_shares",
+        "data_entrada_sociedade": "partner_since",
+        "cpf_representante_legal": "legal_representative_document",
+        "nome_representante_legal": "legal_representative_name",
+        "codigo_qualificacao_representante_legal": "legal_representative_code",
+    }
+
+    def __init__(self, datasets, path="data", db=None, header="cnpj_cpf"):
+        """The `datasets` parameter expects a list of paths to datasets (CSV or
+        LZMA) containing the `header` column."""
+        if not os.path.isdir(path):
+            os.mkdir(os.path.join(path))
+
+        self.path = path
+        self.header = header
+        self.db = Database(db)
+        self.datasets = (datasets,) if isinstance(datasets, str) else datasets
+
+        self.last_count_at = datetime.now()
+        self.count = 0
+
+    @staticmethod
+    def is_cnpj(number):
+        return len(re.sub(r"\D", "", number)) == 14
+
+    @property
+    def documents(self):
+        numbers = []
+        for dataset in self.datasets:
+            log.info(f"Reading {dataset}…")
+            df = pd.read_csv(
+                dataset,
+                dtype={self.header: np.str},
+                encoding="utf-8",
+                low_memory=False,
+                usecols=(self.header,),
+            )
+            log.info(f"Filtering unique CNPJs from {dataset}…")
+            numbers.extend(df[self.header].unique())
+
+        yield from (num for num in set(numbers) if self.is_cnpj(num))
+
+    def translate_dict_keys(self, obj, translations=None):
+        translations = translations or self.TRANSLATION
+        for pt, en in translations.items():
+            obj[en] = obj.pop(pt, None)
+        return obj
+
+    def serialize(self, company):
+        if company["partners"]:
+            company["partners"] = tuple(
+                self.translate_dict_keys(partner, self.TRANSLATION_PARTNER)
+                for partner in company["partners"]
+            )
+
+        to_json = ("partners", "secondary_activities")
+        for key in to_json:
+            company[key] = json.dumps(company[key])
+
+        return self.translate_dict_keys(company)
+
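+    # Note on the translation helpers above (hypothetical partner record):
+    # translating pops every known key and re-inserts it under its English
+    # name, so source keys missing from the record come back as explicit
+    # `None` values:
+    #
+    #     >>> self.translate_dict_keys({"nome_socio": "JANE DOE"},
+    #     ...                          self.TRANSLATION_PARTNER)
+    #     {'id': None, 'name': 'JANE DOE', 'cnpj_cpf': None, ...}
+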
+    async def companies(self):
+        companies = []
+        semaphore = asyncio.Semaphore(2 ** 12)
+
+        async with semaphore, aiohttp.ClientSession() as session:
+            for cnpj in self.documents:
+                company = await self.db.get_company(session, cnpj)
+                if not company:
+                    continue
+
+                company = self.serialize(company)
+                self.count += 1
+                companies.append(company)
+
+                if self.count % 100 == 0:
+                    self.log_count()
+
+        if self.count % 100 != 0:
+            self.log_count()
+
+        return companies
+
+    def log_count(self):
+        now = datetime.now()
+        delta = now - self.last_count_at
+        ratio = self.count / delta.total_seconds()
+
+        msg = f"{self.count:,} companies fetched ({ratio:.2f} companies/s)"
+        log.info(msg)
+
+        self.last_count_at = now
+
+    def __call__(self):
+        timestamp = str(date.today())
+        path = os.path.join(self.path, f"{timestamp}-companies.csv.gz")
+
+        companies = asyncio.run(self.companies())
+        df = pd.DataFrame(companies)
+        df.to_csv(path, index=False, compression="xz")
+        log.info(f"Companies dataset saved to {path}!")
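Putting the pieces together, this first version of `Dataset` reads CNPJ numbers from input CSVs, queries each one against the local copy of the Brasil.IO database, and dumps the merged records. A minimal sketch of the intended entry point (file names here are hypothetical):

.. code-block:: python

   from serenata_toolbox.companies.dataset import Dataset

   dataset = Dataset(
       "data/reimbursements.csv",  # hypothetical CSV with a cnpj_cpf column
       path="data",
       db="data/socios-brasil.sqlite",  # hypothetical local copy of the DB
   )
   dataset()  # writes data/<today>-companies.csv.gz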
diff --git a/serenata_toolbox/companies/db.py b/serenata_toolbox/companies/db.py
new file mode 100644
index 0000000..9669a67
--- /dev/null
+++ b/serenata_toolbox/companies/db.py
@@ -0,0 +1,117 @@
+import os
+import re
+import sqlite3
+from contextlib import contextmanager
+from functools import partial
+from gzip import GzipFile
+from tempfile import NamedTemporaryFile
+
+import requests
+from async_lru import alru_cache
+from openpyxl import load_workbook
+
+from serenata_toolbox import log
+from serenata_toolbox.companies.cnae import Cnae
+from serenata_toolbox.companies.google_drive import GoogleDriveFile
+from serenata_toolbox.companies.open_street_maps import Nominatim
+
+
+class Database:
+    """This database abstraction downloads the SQLite file from Brasil.IO and
+    offersa wrapper to make queries easier. Data is complemented with
+    economic activities (CNAE) descriptions that come from a separate file from
+    the Federal Revenue, and with geo-coordinates from Open Street Maps."""
+
+    def __init__(self, file_id=None):
+        """`file_id` is the Google Drive file ID for the SQLite version of the
+        database maintained by Brasil.IO. If the `file_id` is an existing local
+        path, the database is initiated loading this local file instead."""
+        self._to_close = []  # objects to close on self.close() method
+        self._cursor = None  # cache
+        self.cnae = Cnae()
+        self.nominatim = Nominatim()
+
+        if file_id and os.path.exists(file_id):
+            self.file = file_id
+        else:
+            google_drive = GoogleDriveFile(file_id)
+            self._to_close.append(google_drive.file)
+            self.file = google_drive.download()
+
+    @property
+    def cursor(self):
+        if self._cursor:
+            return self._cursor
+
+        conn = sqlite3.connect(self.file)
+        self._to_close.append(conn)
+        self._cursor = conn.cursor()
+        self.assure_indexes()
+        return self._cursor
+
+    def assure_indexes(self):
+        log.debug("Creating database indexes (if needed)…")
+        tables = ("empresa", "socio", "cnae_secundaria")
+        for table in tables:
+            sql = f"""
+                CREATE INDEX IF NOT EXISTS idx_{table}_cnpj
+                ON {table} (cnpj);
+            """
+            self.cursor.execute(sql)
+
+    @staticmethod
+    def _row_to_dict(keys, row, include_cnpj=False):
+        obj = dict(zip(keys, row))
+        if include_cnpj:
+            return obj
+
+        return {k: v for k, v in obj.items() if k != "cnpj"}
+
+    def _get_by_cnpj(self, cnpj, table, unique=False):
+        cnpj = re.sub(r"\D", "", cnpj)
+        self.cursor.execute(f"SELECT * FROM {table} WHERE cnpj = '{cnpj}'")
+        result = self.cursor.fetchone() if unique else self.cursor.fetchall()
+        if not result:
+            return
+
+        keys = tuple(obj[0] for obj in self.cursor.description)
+        if unique:
+            return self._row_to_dict(keys, result, include_cnpj=True)
+
+        return tuple(self._row_to_dict(keys, row) for row in result)
+
+    async def get_company(self, session, cnpj):
+        get = partial(self._get_by_cnpj, cnpj)
+        company = get("empresa", unique=True)
+        if not company:
+            return
+
+        company["partners"] = get("socio")
+        company["secondary_activities"] = get("cnae_secundaria")
+
+        # add secondary activities descriptions
+        if company["secondary_activities"]:
+            for activity in company["secondary_activities"]:
+                activity["code"] = activity.pop("cnae", None)
+                activity["name"] = self.cnae(activity["code"])
+
+        # add latitude/longitude
+        street = (
+            company["numero"],
+            company["descricao_tipo_logradouro"],
+            company["logradouro"],
+        )
+        cep = str(company["cep"])
+        params = {
+            "street": " ".join(street),
+            "city": company["municipio"],
+            "state": company["uf"],
+            "postalcode": "-".join((cep[:5], cep[5:])),
+        }
+        log.debug(f"Getting {company['cnpj']} coordinates…")
+        coordinates = await self.nominatim.coordinates(session, **params)
+        company.update(coordinates)
+        return company
+
+    def close(self):
+        for obj in self._to_close[::-1]:
+            obj.close()
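Since `get_company` is a coroutine (the geocoding request is asynchronous), it has to be awaited inside an event loop. A minimal sketch of querying a single company, using an illustrative CNPJ:

.. code-block:: python

   import asyncio

   import aiohttp

   from serenata_toolbox.companies.db import Database

   async def main(cnpj):
       db = Database()  # downloads the SQLite file on first use
       async with aiohttp.ClientSession() as session:
           company = await db.get_company(session, cnpj)
       db.close()
       return company

   print(asyncio.run(main("00.000.000/0001-91")))  # illustrative CNPJ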
diff --git a/serenata_toolbox/companies/google_drive.py b/serenata_toolbox/companies/google_drive.py
new file mode 100644
index 0000000..898b3cb
--- /dev/null
+++ b/serenata_toolbox/companies/google_drive.py
@@ -0,0 +1,70 @@
+from contextlib import contextmanager
+from gzip import GzipFile
+from tempfile import NamedTemporaryFile
+
+import requests
+
+from serenata_toolbox import log
+
+
+class GoogleDriveTokenNotFound(Exception):
+    pass
+
+
+class GoogleDriveFile:
+    """This abstraction downloads the SQLite file from Brasil.IO"""
+
+    CHUNK = 2 ** 12
+    URL = "https://docs.google.com/uc?export=download"
+    DEFAULT_GOOGLE_DRIVE_FILE_ID = "19DU3bi_XycAPISWrMniMYbwoiLpAO3D4"
+
+    def __init__(self, file_id=None):
+        """`file_id` is the Google Drive file ID for the SQLite version of the
+        database maintained by Brasil.IO."""
+        self.file_id = file_id or self.DEFAULT_GOOGLE_DRIVE_FILE_ID
+        self.file = NamedTemporaryFile(suffix=".sqlite3")
+
+    def save(self, response, target):
+        log.debug(f"Downloading {response.url} to {target}…")
+        with open(target, "wb") as fobj:
+            for chunk in response.iter_content(self.CHUNK):
+                if chunk:
+                    fobj.write(chunk)
+
+    @contextmanager
+    def token(self, session):
+        params = {"id": self.file_id}
+        log.debug(f"Requesting token for {self.file_id}…")
+        response = session.get(self.URL, params=params, stream=True)
+        token = None
+        for key, value in response.cookies.items():
+            if key.startswith("download_warning"):
+                token = value
+                break
+
+        if not token:
+            raise GoogleDriveTokenNotFound(f"Cannot get the token from {self.URL}")
+
+        yield token
+
+    def decompress(self, path):
+        log.info(f"Decompressing {path} to {self.file.name}…")
+        with GzipFile(path, mode="rb") as gzip:
+            with open(self.file.name, mode="wb") as fobj:
+                chunk = gzip.read(self.CHUNK)
+                while chunk:
+                    fobj.write(chunk)
+                    chunk = gzip.read(self.CHUNK)
+
+    def download(self):
+        session = requests.Session()
+        with self.token(session) as token:
+            params = {"id": self.file_id, "confirm": token}
+            response = session.get(self.URL, params=params, stream=True)
+
+        with NamedTemporaryFile(suffix=".gz") as tmp:
+            self.save(response, tmp.name)
+            self.decompress(tmp.name)
+
+        log.info(f"Database file ready at {self.file.name}")
+        return self.file.name
diff --git a/serenata_toolbox/companies/open_street_maps.py b/serenata_toolbox/companies/open_street_maps.py
new file mode 100644
index 0000000..0da7455
--- /dev/null
+++ b/serenata_toolbox/companies/open_street_maps.py
@@ -0,0 +1,23 @@
+from async_lru import alru_cache
+
+from serenata_toolbox import log
+
+
+class Nominatim:
+    """This abstraction wraps Open Street Maps's Nominatim API"""
+
+    URL = "https://nominatim.openstreetmap.org/search/"
+
+    @alru_cache(maxsize=2 ** 16)
+    async def coordinates(self, session, **params):
+        """Expected Nominatim params: street, city, state and postalcode"""
+        params.update({"country": "Brazil", "format": "json"})
+        async with session.get(self.URL, params=params) as response:
+            log.debug(f"Getting coordinates for {response.url}…")
+            data = await response.json()
+
+        if not data:
+            return {"latitude": None, "longitude": None}
+
+        result, *_ = data
+        return {"latitude": result["lat"], "longitude": result["lon"]}
diff --git a/setup.py b/setup.py
index e6aaf4d..fcedb3a 100644
--- a/setup.py
+++ b/setup.py
@@ -19,11 +19,14 @@
     zip_safe=False,
     install_requires=[
         'aiofiles',
+        'async_lru',
         'aiohttp',
         'beautifulsoup4>=4.4',
         'lxml>=3.6',
+        'openpyxl',
         'pandas>=0.18',
         'python-decouple>=3.1',
+        'requests',
         'tqdm'
     ],
     keywords='serenata de amor, data science, brazil, corruption',
@@ -32,9 +35,10 @@
     name='serenata-toolbox',
     packages=[
        'serenata_toolbox',
-        'serenata_toolbox.federal_senate',
        'serenata_toolbox.chamber_of_deputies',
-        'serenata_toolbox.datasets'
+        'serenata_toolbox.companies',
+        'serenata_toolbox.datasets',
+        'serenata_toolbox.federal_senate',
     ],
     scripts=['serenata_toolbox/serenata-toolbox'],
     url=REPO_URL,

From 8c268dea5549d4274350a35f36303717156a0d3f Mon Sep 17 00:00:00 2001
From: Eduardo Cuducos
Date: Sun, 17 Nov 2019 09:31:53 -0500
Subject: [PATCH 02/10] Minor edits

---
 serenata_toolbox/companies/dataset.py | 8 +++++---
 setup.py                              | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/serenata_toolbox/companies/dataset.py b/serenata_toolbox/companies/dataset.py
index 0002bd9..97abdd3 100644
--- a/serenata_toolbox/companies/dataset.py
+++ b/serenata_toolbox/companies/dataset.py
@@ -86,7 +86,7 @@ def is_cnpj(number):
 
     @property
     def 
documents(self): - numbers = [] + numbers = set() for dataset in self.datasets: log.info(f"Reading {dataset}…") df = pd.read_csv( @@ -97,9 +97,11 @@ def documents(self): usecols=(self.header,), ) log.info(f"Filtering unique CNPJs from {dataset}…") - numbers.extend(df[self.header].unique()) + for number in df[self.header].unique(): + if self.is_cnpj(number): + numbers.add(number) - yield from (num for num in set(numbers) if self.is_cnpj(num)) + yield from numbers def translate_dict_keys(self, obj, translations=None): translations = translations or self.TRANSLATION diff --git a/setup.py b/setup.py index fcedb3a..99dbbf9 100644 --- a/setup.py +++ b/setup.py @@ -19,8 +19,8 @@ zip_safe=False, install_requires=[ 'aiofiles', - 'async_lru', 'aiohttp', + 'async_lru', 'beautifulsoup4>=4.4', 'lxml>=3.6', 'openpyxl', From a90b47c4381130fd4e201f2546fb81c1c358fa69 Mon Sep 17 00:00:00 2001 From: Eduardo Cuducos Date: Sun, 17 Nov 2019 10:03:23 -0500 Subject: [PATCH 03/10] Clean-up and update README --- README.rst | 66 +++++++++++++++++++++++++++--------------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/README.rst b/README.rst index 2362393..13f0d3a 100644 --- a/README.rst +++ b/README.rst @@ -2,14 +2,6 @@ :target: https://travis-ci.org/okfn-brasil/serenata-toolbox :alt: Travis CI build status (Linux) -.. image:: https://readthedocs.org/projects/serenata-toolbox/badge/?version=latest - :target: http://serenata-toolbox.readthedocs.io/en/latest/?badge=latest - :alt: Documentation Status - -.. image:: https://landscape.io/github/okfn-brasil/serenata-toolbox/master/landscape.svg?style=flat - :target: https://landscape.io/github/okfn-brasil/serenata-toolbox/master - :alt: Code Health - .. image:: https://coveralls.io/repos/github/okfn-brasil/serenata-toolbox/badge.svg?branch=master :target: https://coveralls.io/github/okfn-brasil/serenata-toolbox?branch=master :alt: Coveralls @@ -17,6 +9,9 @@ .. image:: https://badge.fury.io/py/serenata-toolbox.svg :alt: PyPI package version +.. image:: https://img.shields.io/pypi/pyversions/serenata_toolbox + :alt: PyPI - Python Version + .. image:: https://img.shields.io/badge/donate-apoia.se-EB4A3B.svg :target: https://apoia.se/serenata :alt: Donation Page @@ -24,10 +19,7 @@ Serenata de Amor Toolbox ======================== -`pip `_ installable package to support `Serenata de Amor `_ -and `Rosie `_ development. - -Serenata_toolbox is compatible with Python 3.6+ +Python package to support `Serenata de Amor `_ development. ``serenata_toolbox`` is compatible with Python 3.6+. Installation ------------ @@ -36,39 +28,48 @@ Installation $ pip install -U serenata-toolbox -If you are a regular user you are ready to get started after `pip install`. - -If you are a core developer willing to upload datasets to the cloud you need to configure `AMAZON_ACCESS_KEY` and `AMAZON_SECRET_KEY` environment variables before running the toolbox. - Usage ----- -We have `plenty of them `_ ready for you to download from our servers. And this toolbox helps you get them. Here some examples: +This toolbox helps you get datasets used in `Serenata de Amor services `_ and `notebooks `_. + +Example 1: Using the CLI +^^^^^^^^^^^^^^^^^^^^^^^^ -Example 1: Using the command line wrapper -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Without any arguments, it will download our pre-processed datasets and store into ``data`` folder: .. 
code-block:: bash
 
-   # without any arguments will download our pre-processed datasets and store into data/ folder
    $ serenata-toolbox
 
-   # will download these specific datasets and store into /tmp/serenata-data folder
+But you can specify which datasets to download and where to save them. For example, to download ``chamber_of_deputies`` and ``federal_senate`` datasets to ``/tmp/serenata-data``:
+
+.. code-block:: bash
+
    $ serenata-toolbox /tmp/serenata-data --module federal_senate chamber_of_deputies
 
-   # you can specify a dataset and a year
+You can also restrict the download to a specific year:
+
+.. code-block:: bash
+
    $ serenata-toolbox --module chamber_of_deputies --year 2009
 
-   # or specify all options simultaneously
+Or use it all together:
+
+.. code-block:: bash
+
    $ serenata-toolbox /tmp/serenata-data --module federal_senate --year 2017
 
-   # getting help
+Finally, you might want to get help:
+
+.. code-block:: bash
+
    $ serenata-toolbox --help
 
-Example 2: How do I download the datasets?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Example 2: Using Python
+^^^^^^^^^^^^^^^^^^^^^^^
 
-Another option is creating your own Python script:
+Another option is creating your own Python scripts:
 
 .. code-block:: python
 
@@ -100,7 +101,7 @@ If the last example doesn't look that simple, there are some fancy shortcuts ava
 Example 4: Generating datasets
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-If you ever wonder how did we generated these datasets, this toolbox can help you too (at least with the more used ones — the other ones are generated `in our main repo `_):
+If you ever wonder how did we generated these datasets, this toolbox can help you too (at least with the most used used ones — the other ones are generated `in our main repo `_):
 
 .. code-block:: python
 
@@ -128,8 +129,7 @@ The `full documentation `_ is still a w
 Contributing
 ------------
 
-Firstly, you should create a development environment with Python's `venv `_ module to isolate your development.
-Then clone the repository and build the package by running:
+Firstly, you should create a development environment with Python's `venv `_ module to isolate your development. Then clone the repository and build the package by running:
 
 .. code-block:: bash
 
@@ -137,21 +137,21 @@
     $ cd serenata-toolbox
     $ python setup.py develop
 
-Always add tests to your contribution — if you want to test it locally before opening the PR:
+Always add tests to your contribution — if you want to test it locally before opening the PR:
 
 .. code-block:: bash
 
     $ pip install tox
     $ tox
 
-When the tests are passing, also check for coverage of the modules you edited or added — if you want to check it before opening the PR:
+When the tests are passing, also check for coverage of the modules you edited or added — if you want to check it before opening the PR:
 
 .. code-block:: bash
 
     $ tox
     $ open htmlcov/index.html
 
-Follow `PEP8 `_ and best practices implemented by `Landscape `_ in the `veryhigh` strictness level — if you want to check them locally before opening the PR:
+Follow `PEP8 `_ and its best practices implemented by `Landscape `_ in the `veryhigh` strictness level — if you want to check them locally before opening the PR:
 
 .. 
code-block:: bash From 3e2e3789c780660976ec3632295a8ffc191e57ab Mon Sep 17 00:00:00 2001 From: Eduardo Cuducos Date: Sun, 17 Nov 2019 11:11:53 -0500 Subject: [PATCH 04/10] Simplify companies API using data directory --- .gitignore | 1 + README.rst | 6 ++- serenata_toolbox/companies/dataset.py | 56 ++++++++++++++-------- serenata_toolbox/companies/db.py | 45 +++++++++-------- serenata_toolbox/companies/google_drive.py | 30 ++++-------- 5 files changed, 76 insertions(+), 62 deletions(-) diff --git a/.gitignore b/.gitignore index 559d677..6235ef2 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,6 @@ .tox/ __pycache__/ build/ +data/ dist/ htmlcov/ diff --git a/README.rst b/README.rst index 13f0d3a..955b389 100644 --- a/README.rst +++ b/README.rst @@ -105,8 +105,9 @@ If you ever wonder how did we generated these datasets, this toolbox can help yo .. code-block:: python - from serenata_toolbox.federal_senate.dataset import Dataset as SenateDataset from serenata_toolbox.chamber_of_deputies.reimbursements import Reimbursements as ChamberDataset + from serenata_toolbox.companies.dataset import Dataset as CompaniesDataset + from serenata_toolbox.federal_senate.dataset import Dataset as SenateDataset chamber = ChamberDataset('2018', 'data/') chamber() @@ -116,6 +117,9 @@ If you ever wonder how did we generated these datasets, this toolbox can help yo senate.translate() senate.clean() + companies = CompaniesDataset('data/') + companies() + Documentation (WIP) ------------------- diff --git a/serenata_toolbox/companies/dataset.py b/serenata_toolbox/companies/dataset.py index 97abdd3..8174fb4 100644 --- a/serenata_toolbox/companies/dataset.py +++ b/serenata_toolbox/companies/dataset.py @@ -1,8 +1,8 @@ import asyncio import json -import os import re from datetime import date, datetime +from pathlib import Path import aiohttp import numpy as np @@ -66,19 +66,35 @@ class Dataset: "codigo_qualificacao_representante_legal": "legal_representative_code", } - def __init__(self, datasets, path="data", db=None, header="cnpj_cpf"): + def __init__(self, path="data", header="cnpj_cpf"): """The `datasets` parameter expects a list of paths to datasets (CSV or LZMA) containing the `header` column.""" - if not os.path.isdir(path): - os.mkdir(os.path.join(path)) + self.path = Path(path) + if not self.path.exists(): + self.path.mkdir() - self.path = path + self.output = self.path / f"{date.today()}-companies.csv.xz" self.header = header - self.db = Database(db) - self.datasets = (datasets,) if isinstance(datasets, str) else datasets + self.db = Database(path) self.last_count_at = datetime.now() self.count = 0 + self._datasets = None # cache + + @property + def datasets(self): + if self._datasets: + return self._datasets + + data = Path(self.path) + extensions = ("csv", "xz") + self._datasets = tuple( + str(dataset.resolve()) + for extension in extensions + for dataset in data.glob(f"*.{extension}") + if not dataset.name.endswith("-companies.csv.xz") + ) + return self._datasets @staticmethod def is_cnpj(number): @@ -89,13 +105,18 @@ def documents(self): numbers = set() for dataset in self.datasets: log.info(f"Reading {dataset}…") - df = pd.read_csv( - dataset, - dtype={self.header: np.str}, - encoding="utf-8", - low_memory=False, - usecols=(self.header,), - ) + try: + df = pd.read_csv( + dataset, + dtype={self.header: np.str}, + encoding="utf-8", + low_memory=False, + usecols=(self.header,), + ) + except ValueError: + log.info(f"Skipping {dataset} (no `{self.header}` column)") + continue + log.info(f"Filtering unique 
CNPJs from {dataset}…") for number in df[self.header].unique(): if self.is_cnpj(number): @@ -155,10 +176,7 @@ def log_count(self): self.last_count_at = now def __call__(self): - timestamp = str(date.today()) - path = os.path.join(self.path, f"{timestamp}-companies.csv.gz") - companies = asyncio.run(self.companies()) df = pd.DataFrame(companies) - df.to_csv(path, index=False, compression="xz") - log.info("Comanies dataset saved to {path}!") + df.to_csv(self.output, index=False, compression="xz") + log.info(f"Comanies dataset saved to {self.output}!") diff --git a/serenata_toolbox/companies/db.py b/serenata_toolbox/companies/db.py index 9669a67..33bc201 100644 --- a/serenata_toolbox/companies/db.py +++ b/serenata_toolbox/companies/db.py @@ -1,14 +1,8 @@ -import os import re import sqlite3 -from contextlib import contextmanager from functools import partial from gzip import GzipFile -from tempfile import NamedTemporaryFile - -import requests -from async_lru import alru_cache -from openpyxl import load_workbook +from pathlib import Path from serenata_toolbox import log from serenata_toolbox.companies.cnae import Cnae @@ -22,28 +16,39 @@ class Database: economic activities (CNAE) description that comes from a separate file from the Federal Revenue, and with geo-coordinates from Open Street Maps.""" - def __init__(self, file_id=None): - """`file_id` is the Google Drive file ID for the SQLite version of the - database maintaned by Brasil.IO. If the `file_id` is an existing local - path, the database is initiated loading this local file instead.""" - self._to_close = [] # objects to close on self.close() method - self._cursor = None # cache + DEFAULT_FILENAME = "socios-brasil.sqlite" + + def __init__(self, path="data"): + self.compressed = Path(path) / f"{self.DEFAULT_FILENAME}.gz" + self.decompressed = Path(path) / self.DEFAULT_FILENAME + + if not self.compressed.exists(): + GoogleDriveFile(self.compressed).download() + + if not self.decompressed.exists(): + self.decompress() + self.cnae = Cnae() self.nominatim = Nominatim() - if file_id and os.path.exists(file_id): - self.file = file_id - else: - google_drive = GoogleDriveFile(file_id) - self._to_close.append(self.google_drive.file) - self.file = google_drive.download() + self._to_close = [] # objects to close on self.close() + self._cursor = None # cache + + def decompress(self): + log.info(f"Decompressing {self.compressed} to {self.decompressed}…") + with GzipFile(self.compressed, mode="rb") as compressed: + with self.decompressed.open("wb") as decompressed: + chunck = compressed.read(self.CHUNK) + while chunck: + decompressed.write(chunck) + chunck = compressed.read(self.CHUNK) @property def cursor(self): if self._cursor: return self._cursor - conn = sqlite3.connect(self.file) + conn = sqlite3.connect(str(self.decompressed)) self._to_close.append(conn) self._cursor = conn.cursor() self.assure_indexes() diff --git a/serenata_toolbox/companies/google_drive.py b/serenata_toolbox/companies/google_drive.py index 898b3cb..6ba6813 100644 --- a/serenata_toolbox/companies/google_drive.py +++ b/serenata_toolbox/companies/google_drive.py @@ -1,6 +1,4 @@ from contextlib import contextmanager -from gzip import GzipFile -from tempfile import NamedTemporaryFile import requests @@ -18,15 +16,15 @@ class GoogleDriveFile: URL = "https://docs.google.com/uc?export=download" DEFAULT_GOOGLE_DRIVE_FILE_ID = "19DU3bi_XycAPISWrMniMYbwoiLpAO3D4" - def __init__(self, file_id=None): + def __init__(self, target, file_id=None): """`file_id` is the Google Drive file ID for 
diff --git a/serenata_toolbox/companies/google_drive.py b/serenata_toolbox/companies/google_drive.py
index 898b3cb..6ba6813 100644
--- a/serenata_toolbox/companies/google_drive.py
+++ b/serenata_toolbox/companies/google_drive.py
@@ -1,6 +1,4 @@
 from contextlib import contextmanager
-from gzip import GzipFile
-from tempfile import NamedTemporaryFile
 
 import requests
 
@@ -18,15 +16,15 @@ class GoogleDriveFile:
     URL = "https://docs.google.com/uc?export=download"
     DEFAULT_GOOGLE_DRIVE_FILE_ID = "19DU3bi_XycAPISWrMniMYbwoiLpAO3D4"
 
-    def __init__(self, file_id=None):
+    def __init__(self, target, file_id=None):
         """`file_id` is the Google Drive file ID for the SQLite version of the
         database maintained by Brasil.IO."""
         self.file_id = file_id or self.DEFAULT_GOOGLE_DRIVE_FILE_ID
-        self.file = NamedTemporaryFile(suffix=".sqlite3")
+        self.target = target
 
-    def save(self, response, target):
-        log.debug(f"Downloading {response.url} to {target}…")
-        with open(target, "wb") as fobj:
+    def save(self, response):
+        log.debug(f"Downloading {response.url} to {self.target}…")
+        with self.target.open("wb") as fobj:
             for chunk in response.iter_content(self.CHUNK):
                 if chunk:
                     fobj.write(chunk)
 
@@ -47,24 +45,12 @@ def token(self, session):
 
         yield token
 
-    def decompress(self, path):
-        log.info(f"Decompressing {path} to {self.file.name}…")
-        with GzipFile(path, mode="rb") as gzip:
-            with open(self.file.name, mode="wb") as fobj:
-                chunk = gzip.read(self.CHUNK)
-                while chunk:
-                    fobj.write(chunk)
-                    chunk = gzip.read(self.CHUNK)
-
     def download(self):
         session = requests.Session()
         with self.token(session) as token:
             params = {"id": self.file_id, "confirm": token}
             response = session.get(self.URL, params=params, stream=True)
+            self.save(response)
 
-        with NamedTemporaryFile(suffix=".gz") as tmp:
-            self.save(response, tmp.name)
-            self.decompress(tmp.name)
-
-        log.info(f"Database file ready at {self.file.name}")
-        return self.file.name
+        log.info(f"Database file ready at {self.target}")
+        return self.target

From ea6a25c042573d80e6e517cba0192c1955446c11 Mon Sep 17 00:00:00 2001
From: Eduardo Cuducos
Date: Sun, 17 Nov 2019 11:18:56 -0500
Subject: [PATCH 05/10] Clean-up

Landscape (free) is down for ages, so let's drop it
---
 .landscape.yml | 6 ------
 1 file changed, 6 deletions(-)
 delete mode 100644 .landscape.yml

diff --git a/.landscape.yml b/.landscape.yml
deleted file mode 100644
index 956e640..0000000
--- a/.landscape.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-python-targets: 3
-max-line-length: 120
-strictness: veryhigh
-doc-warnings: true
-ignore-paths:
-  - docs

From 6fe22c0bba953f14a826243f3e954776c7fbc148 Mon Sep 17 00:00:00 2001
From: Eduardo Cuducos
Date: Sun, 17 Nov 2019 11:20:24 -0500
Subject: [PATCH 06/10] Add companies module to the CLI

---
 README.rst                        | 4 +++-
 serenata_toolbox/serenata-toolbox | 7 +++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index 955b389..9824402 100644
--- a/README.rst
+++ b/README.rst
@@ -48,6 +48,8 @@ But you can specify which datasets to download and where to save them. For examp
 
    $ serenata-toolbox /tmp/serenata-data --module federal_senate chamber_of_deputies
 
+Available modules are ``chamber_of_deputies``, ``companies`` and ``federal_senate``.
+
 You can also restrict the download to a specific year:
 
 .. code-block:: bash
 
@@ -58,7 +60,7 @@ Or use it all together:
 
 .. 
code-block:: bash - $ serenata-toolbox /tmp/serenata-data --module federal_senate --year 2017 + $ serenata-toolbox /tmp/serenata-data --module federal_senate companies --year 2017 Finally, you might want to get help: diff --git a/serenata_toolbox/serenata-toolbox b/serenata_toolbox/serenata-toolbox index c6b9049..b2d382b 100755 --- a/serenata_toolbox/serenata-toolbox +++ b/serenata_toolbox/serenata-toolbox @@ -6,6 +6,7 @@ from argparse import ArgumentParser from serenata_toolbox import log from serenata_toolbox.chamber_of_deputies.reimbursements import Reimbursements as ChamberDataset +from serenata_toolbox.companies.dataset import Dataset as CompaniesDataset from serenata_toolbox.datasets import Datasets from serenata_toolbox.federal_senate.dataset import Dataset as SenateDataset @@ -26,6 +27,7 @@ def main(): datasets = Datasets(path) datasets.downloader.download(datasets.downloader.LATEST) return + if 'chamber_of_deputies' in args.module: if args.year: for year in args.year: @@ -33,6 +35,7 @@ def main(): else: chamber = ChamberDataset(path=path) chamber() + if 'federal_senate' in args.module: if args.year: for year in args.year: @@ -43,6 +46,10 @@ def main(): senate.translate() senate.clean() + if 'companies' in args.module: + companies = CompaniesDataset(path) + companies() + if __name__ == '__main__': main() From a13471db638c5589f1fce027f3c4626fe6e44908 Mon Sep 17 00:00:00 2001 From: Eduardo Cuducos Date: Sun, 17 Nov 2019 11:42:26 -0500 Subject: [PATCH 07/10] Add progress bar to companies --- serenata_toolbox/companies/dataset.py | 31 +++++++-------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/serenata_toolbox/companies/dataset.py b/serenata_toolbox/companies/dataset.py index 8174fb4..7209da0 100644 --- a/serenata_toolbox/companies/dataset.py +++ b/serenata_toolbox/companies/dataset.py @@ -7,6 +7,7 @@ import aiohttp import numpy as np import pandas as pd +from tqdm import tqdm from serenata_toolbox import log from serenata_toolbox.companies.db import Database @@ -76,10 +77,7 @@ def __init__(self, path="data", header="cnpj_cpf"): self.output = self.path / f"{date.today()}-companies.csv.xz" self.header = header self.db = Database(path) - - self.last_count_at = datetime.now() - self.count = 0 - self._datasets = None # cache + self._datasets, self._documents = None, None # cache @property def datasets(self): @@ -102,6 +100,9 @@ def is_cnpj(number): @property def documents(self): + if self._documents: + return self._documents + numbers = set() for dataset in self.datasets: log.info(f"Reading {dataset}…") @@ -122,7 +123,8 @@ def documents(self): if self.is_cnpj(number): numbers.add(number) - yield from numbers + self._documents = tuple(numbers) + return self._documents def translate_dict_keys(self, obj, translations=None): translations = translations or self.TRANSLATION @@ -148,33 +150,16 @@ async def companies(self): semaphore = asyncio.Semaphore(2 ** 12) async with semaphore, aiohttp.ClientSession() as session: - for cnpj in self.documents: + for cnpj in tqdm(self.documents, unit="companies"): company = await self.db.get_company(session, cnpj) if not company: continue company = self.serialize(company) - self.count += 1 companies.append(company) - if self.count % 100 == 0: - self.log_count() - - if self.count % 100 != 0: - self.log_count() - return companies - def log_count(self): - now = datetime.now() - delta = now - self.last_count_at - ratio = self.count / delta.total_seconds() - - msg = f"{self.count:,} companies fetched ({ratio:.2f} companies/s)" - 
log.info(msg)
-
-        self.last_count_at = now
-
     def __call__(self):
         companies = asyncio.run(self.companies())

From 0dc1dc9b1a3b0ed5deed112494dacb51baa6d8ba Mon Sep 17 00:00:00 2001
From: Eduardo Cuducos
Date: Sun, 17 Nov 2019 12:39:54 -0500
Subject: [PATCH 08/10] Enhance command output

---
 serenata_toolbox/companies/cnae.py         |  4 ++--
 serenata_toolbox/companies/dataset.py      | 13 +++++--------
 serenata_toolbox/companies/db.py           |  2 ++
 serenata_toolbox/companies/google_drive.py |  3 +--
 4 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/serenata_toolbox/companies/cnae.py b/serenata_toolbox/companies/cnae.py
index 27362aa..d7bfede 100644
--- a/serenata_toolbox/companies/cnae.py
+++ b/serenata_toolbox/companies/cnae.py
@@ -33,7 +33,7 @@ def parse_code(code):
         except ValueError:
             return
 
-    def _load_activities(self):
+    def load_activities(self):
         log.info("Fetching CNAE descriptions…")
         with NamedTemporaryFile(suffix=".xlsx") as tmp:
             response = requests.get(self.CNAE_DESCRIPTION_FILE)
@@ -60,7 +60,7 @@ def activities(self):
         if self._activities:
             return self._activities
 
-        self._load_activities()
+        self.load_activities()
         return self._activities
 
     def __call__(self, code):
diff --git a/serenata_toolbox/companies/dataset.py b/serenata_toolbox/companies/dataset.py
index 7209da0..79ed5bc 100644
--- a/serenata_toolbox/companies/dataset.py
+++ b/serenata_toolbox/companies/dataset.py
@@ -94,10 +94,6 @@ def datasets(self):
         )
         return self._datasets
 
-    @staticmethod
-    def is_cnpj(number):
-        return len(re.sub(r"\D", "", number)) == 14
-
     @property
     def documents(self):
         if self._documents:
@@ -119,11 +115,12 @@ def documents(self):
                 continue
 
             log.info(f"Filtering unique CNPJs from {dataset}…")
-            for number in df[self.header].unique():
-                if self.is_cnpj(number):
-                    numbers.add(number)
+            for num in df[self.header].unique():
+                if isinstance(num, str) and len(re.sub(r"\D", "", num)) == 14:
+                    numbers.add(num)
 
-        self._documents = tuple(numbers)
+        log.info(f"Consolidating {len(numbers):,} different CNPJ numbers…")
+        self._documents = tuple(numbers)  # tuple is way smaller than set
         return self._documents
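The inline check keeps only strings whose digits amount to a 14-digit CNPJ, so 11-digit CPFs (personal documents) and missing values are dropped. A quick illustration with made-up numbers:

.. code-block:: python

   import re

   def looks_like_cnpj(num):  # hypothetical helper mirroring the check above
       return isinstance(num, str) and len(re.sub(r"\D", "", num)) == 14

   print(looks_like_cnpj("12.345.678/0001-95"))  # True: 14 digits (CNPJ)
   print(looks_like_cnpj("123.456.789-09"))      # False: 11 digits (CPF)
   print(looks_like_cnpj(None))                  # False: not a string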
{self.target}") return self.target From 2b3061758882adbc14ee9fdff436009ef1b6611d Mon Sep 17 00:00:00 2001 From: Eduardo Cuducos Date: Mon, 18 Nov 2019 06:39:42 -0500 Subject: [PATCH 09/10] Handles OSM Nominatim HTTP error --- serenata_toolbox/companies/open_street_maps.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/serenata_toolbox/companies/open_street_maps.py b/serenata_toolbox/companies/open_street_maps.py index 0da7455..e6c7de7 100644 --- a/serenata_toolbox/companies/open_street_maps.py +++ b/serenata_toolbox/companies/open_street_maps.py @@ -1,5 +1,7 @@ from async_lru import alru_cache +from aiohttp.client_exceptions import ContentTypeError + from serenata_toolbox import log @@ -11,13 +13,17 @@ class Nominatim: @alru_cache(maxsize=2 ** 16) async def coordinates(self, session, **params): """Expected Nominatim params: street, city, state and postal""" + empty_result = {"latitude": None, "longitude": None} params.update({"country": "Brazil", "format": "json"}) async with session.get(self.URL, params=params) as response: log.debug(f"Getting coordinates for {response.url}…") - data = await response.json() + try: + data = await response.json() + except ContentTypeError: + return empty_result if not data: - return {"latitude": None, "longitude": None} + return empty_result result, *_ = data return {"latitude": result["lat"], "longitude": result["lon"]} From 9f4394b30ca70abc3cf46261ebb19fa2ae949df3 Mon Sep 17 00:00:00 2001 From: Eduardo Cuducos Date: Wed, 15 Jan 2020 12:04:33 -0500 Subject: [PATCH 10/10] Typos --- README.rst | 2 +- serenata_toolbox/companies/dataset.py | 2 +- serenata_toolbox/companies/db.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 9824402..eee50d9 100644 --- a/README.rst +++ b/README.rst @@ -103,7 +103,7 @@ If the last example doesn't look that simple, there are some fancy shortcuts ava Example 4: Generating datasets ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -If you ever wonder how did we generated these datasets, this toolbox can help you too (at least with the most used used ones — the other ones are generated `in our main repo `_): +If you ever wonder how did we generate these datasets, this toolbox can help you too (at least with the most used ones — the other ones are generated `in our main repo `_): .. code-block:: python diff --git a/serenata_toolbox/companies/dataset.py b/serenata_toolbox/companies/dataset.py index 79ed5bc..92b9691 100644 --- a/serenata_toolbox/companies/dataset.py +++ b/serenata_toolbox/companies/dataset.py @@ -68,7 +68,7 @@ class Dataset: } def __init__(self, path="data", header="cnpj_cpf"): - """The `datasets` parameter expects a list of paths to datasets (CSV or + """The `path` parameter expects directory with datasets (CSV or LZMA) containing the `header` column.""" self.path = Path(path) if not self.path.exists(): diff --git a/serenata_toolbox/companies/db.py b/serenata_toolbox/companies/db.py index 193c404..4df63cb 100644 --- a/serenata_toolbox/companies/db.py +++ b/serenata_toolbox/companies/db.py @@ -12,7 +12,7 @@ class Database: """This database abstraction downloads the SQLite file from Brasil.IO and - offersa wrapper to make queries easier. Data is complemented with + offers a wrapper to make queries easier. Data is complemented with economic activities (CNAE) description that comes from a separate file from the Federal Revenue, and with geo-coordinates from Open Street Maps."""