From dcd43110e42e8f2c568a51c115fa4324a026d4c1 Mon Sep 17 00:00:00 2001 From: AaronHForgeFlow Date: Mon, 24 Feb 2025 13:50:43 +0100 Subject: [PATCH] [IMP] l10n_es_aeat: use certificate module --- l10n_es_aeat/README.rst | 6 + l10n_es_aeat/__manifest__.py | 3 +- .../migrations/18.0.1.0.0/post-migrate.py | 64 +++++++ .../migrations/18.0.1.0.0/pre-migrate.py | 50 ++++++ l10n_es_aeat/models/aeat_certificate.py | 54 +++--- l10n_es_aeat/models/aeat_mixin.py | 23 ++- l10n_es_aeat/readme/ROADMAP.md | 4 + l10n_es_aeat/security/ir.model.access.csv | 1 - l10n_es_aeat/static/description/index.html | 6 + .../tests/test_l10n_es_aeat_certificate.py | 169 +++++++----------- l10n_es_aeat/views/aeat_certificate_view.xml | 10 +- l10n_es_aeat/wizard/__init__.py | 1 - .../wizard/aeat_certificate_password.py | 120 ------------- .../wizard/aeat_certificate_password_view.xml | 24 --- 14 files changed, 231 insertions(+), 304 deletions(-) create mode 100644 l10n_es_aeat/migrations/18.0.1.0.0/post-migrate.py create mode 100644 l10n_es_aeat/migrations/18.0.1.0.0/pre-migrate.py delete mode 100644 l10n_es_aeat/wizard/aeat_certificate_password.py delete mode 100644 l10n_es_aeat/wizard/aeat_certificate_password_view.xml diff --git a/l10n_es_aeat/README.rst b/l10n_es_aeat/README.rst index c49da59de26..440c8acf0e3 100644 --- a/l10n_es_aeat/README.rst +++ b/l10n_es_aeat/README.rst @@ -126,6 +126,12 @@ Known issues / Roadmap auto-selecciona por fechas de validez. - Las partes específicas de las Diputaciones Forales no están incluidas. +- El módulo de certificate no incluye la funcionalidad de especificar + la carpeta dónde se almacena el certificado, se opta por eliminar la + funcionalidad en v18 +- El módulo de certificate guarda el password en el certificado, esa + información no está disponible antes de migrar por lo que se dejará + vacía Bug Tracker =========== diff --git a/l10n_es_aeat/__manifest__.py b/l10n_es_aeat/__manifest__.py index 3987e51b191..7e44a73fa78 100644 --- a/l10n_es_aeat/__manifest__.py +++ b/l10n_es_aeat/__manifest__.py @@ -22,7 +22,7 @@ "website": "https://github.com/OCA/l10n-spain", "category": "Accounting & Finance", "development_status": "Mature", - "depends": ["l10n_es", "account_tax_balance"], + "depends": ["l10n_es", "account_tax_balance", "certificate"], # odoo_test_helper is needed for the tests "external_dependencies": {"python": ["unidecode"]}, "data": [ @@ -33,7 +33,6 @@ "data/aeat_tax_agency_data.xml", "wizard/export_to_boe_wizard.xml", "wizard/compare_boe_file_views.xml", - "wizard/aeat_certificate_password_view.xml", "views/aeat_menuitem.xml", # it should be before the other views "views/aeat_map_tax_views.xml", "views/aeat_report_view.xml", diff --git a/l10n_es_aeat/migrations/18.0.1.0.0/post-migrate.py b/l10n_es_aeat/migrations/18.0.1.0.0/post-migrate.py new file mode 100644 index 00000000000..cba0ad6ae37 --- /dev/null +++ b/l10n_es_aeat/migrations/18.0.1.0.0/post-migrate.py @@ -0,0 +1,64 @@ +import base64 +import binascii +import json +import logging + +from odoo import SUPERUSER_ID, api + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + """Restores AEAT certificate data into the new schema.""" + env = api.Environment(cr, SUPERUSER_ID, {}) + migration_data = env["ir.config_parameter"].get_param( + "l10n_es_aeat_certificate_migration_data" + ) + if not migration_data: + raise ValueError("No AEAT certificate migration data found") + + migration_data = json.loads(migration_data) + + for cert_id, cert_data in migration_data.items(): + private_key_data = cert_data.get("private_key", "") + date_start = cert_data.get("date_start") + date_end = cert_data.get("date_end") + public_key_path = cert_data.get("public_key", "") + try: + private_key_bytes = private_key_data.encode() if private_key_data else None + except binascii.Error: + private_key_bytes = None + if public_key_path: + try: + with open(public_key_path, "rb") as f: + public_key_content = f.read() + except FileNotFoundError: + _logger.info("Public key file not found") + key_id = ( + env["certificate.key"].create( + { + "name": f"Key for AEAT Cert {cert_id}", + "content": private_key_bytes or b"", + } + ) + if private_key_bytes + else None + ) + cert_id_new = env["certificate.certificate"].create( + { + "name": f"AEAT Cert {cert_id}", + "private_key_id": key_id.id if key_id else False, + "date_start": date_start, + "date_end": date_end, + "content": base64.b64encode(public_key_content).decode() + if public_key_content + else "", + } + ) + env["l10n.es.aeat.certificate"].browse(int(cert_id)).write( + {"certificate_id": cert_id_new.id} + ) + # Clean up stored migration data + env["ir.config_parameter"].sudo().set_param( + "l10n_es_aeat_certificate_migration_data", "" + ) diff --git a/l10n_es_aeat/migrations/18.0.1.0.0/pre-migrate.py b/l10n_es_aeat/migrations/18.0.1.0.0/pre-migrate.py new file mode 100644 index 00000000000..b8b2ed0be9b --- /dev/null +++ b/l10n_es_aeat/migrations/18.0.1.0.0/pre-migrate.py @@ -0,0 +1,50 @@ +import base64 +import json + +from openupgradelib import openupgrade + + +def _save_certificates(env): + cr = env.cr + cr.execute(""" + SELECT id, company_id, name, state, date_start, date_end, + public_key, private_key + FROM l10n_es_aeat_certificate + """) + certificates = cr.fetchall() + stored_data = {} + for cert in certificates: + ( + cert_id, + company_id, + name, + state, + date_start, + date_end, + public_key, + private_key_path, + ) = cert + private_key_content = "" + if private_key_path: + try: + with open(private_key_path, "rb") as f: + private_key_content = base64.b64encode(f.read()).decode() + except FileNotFoundError: + private_key_content = "" + stored_data[str(cert_id)] = { + "company_id": company_id, + "name": name, + "state": state, + "date_start": str(date_start) if date_start else None, + "date_end": str(date_end) if date_end else None, + "public_key": public_key or "", + "private_key": private_key_content, + } + env["ir.config_parameter"].set_param( + "l10n_es_aeat_certificate_migration_data", json.dumps(stored_data) + ) + + +@openupgrade.migrate() +def migrate(env, version): + _save_certificates(env) diff --git a/l10n_es_aeat/models/aeat_certificate.py b/l10n_es_aeat/models/aeat_certificate.py index 20bca1d1e96..a758a22493f 100644 --- a/l10n_es_aeat/models/aeat_certificate.py +++ b/l10n_es_aeat/models/aeat_certificate.py @@ -15,29 +15,18 @@ class L10nEsAeatCertificate(models.Model): selection=[("draft", "Draft"), ("active", "Active")], default="draft", ) - file = fields.Binary(required=True) - folder = fields.Char(string="Folder Name", required=True) - date_start = fields.Date(string="Start Date") - date_end = fields.Date(string="End Date") - public_key = fields.Char(readonly=True) - private_key = fields.Char(readonly=True) + certificate_id = fields.Many2one( + comodel_name="certificate.certificate", + string="Certificate", + ) company_id = fields.Many2one( comodel_name="res.company", string="Company", required=True, default=lambda self: self.env.company, ) - - def load_password_wizard(self): - self.ensure_one() - return { - "type": "ir.actions.act_window", - "name": self.env._("Insert Password"), - "res_model": "l10n.es.aeat.certificate.password", - "view_mode": "form", - "views": [(False, "form")], - "target": "new", - } + date_start = fields.Datetime(related="certificate_id.date_start") + date_end = fields.Datetime(related="certificate_id.date_end") def action_active(self): self.ensure_one() @@ -55,28 +44,25 @@ def get_certificates(self, company=False): aeat_certificate = self.search( [ ("company_id", "=", company.id), - ("public_key", "!=", False), - ("private_key", "!=", False), + ("certificate_id", "!=", False), "|", - ("date_start", "=", False), - ("date_start", "<=", today), + ("certificate_id.date_start", "=", False), + ("certificate_id.date_start", "<=", today), "|", - ("date_end", "=", False), - ("date_end", ">=", today), + ("certificate_id.date_end", "=", False), + ("certificate_id.date_end", ">=", today), ("state", "=", "active"), ], limit=1, ) - if aeat_certificate: - public_crt = aeat_certificate.public_key - private_key = aeat_certificate.private_key - else: - public_crt = self.env["ir.config_parameter"].get_param( - "l10n_es_aeat_certificate.publicCrt", False - ) - private_key = self.env["ir.config_parameter"].get_param( - "l10n_es_aeat_certificate.privateKey", False - ) - if not public_crt or not private_key: + if not aeat_certificate: raise exceptions.UserError(self.env._("Error! There aren't certificates.")) + + public_crt = aeat_certificate.certificate_id.pem_certificate + private_key_record = aeat_certificate.certificate_id.private_key_id + if not private_key_record or not private_key_record.pem_key: + raise exceptions.UserError(self.env._("Private key is missing or invalid.")) + + private_key = private_key_record.pem_key + return public_crt, private_key diff --git a/l10n_es_aeat/models/aeat_mixin.py b/l10n_es_aeat/models/aeat_mixin.py index 34e5b905d9f..d4188f6c804 100644 --- a/l10n_es_aeat/models/aeat_mixin.py +++ b/l10n_es_aeat/models/aeat_mixin.py @@ -5,6 +5,7 @@ # Copyright 2023-2024 Aures Tic - Jose Zambudio # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). import logging +import tempfile from requests import Session @@ -138,12 +139,22 @@ def _connect_aeat(self, mapping_key): company=self.company_id ) params = self._connect_params_aeat(mapping_key) - session = Session() - session.cert = (public_crt, private_key) - transport = Transport(session=session) - history = HistoryPlugin() - client = Client(wsdl=params["wsdl"], transport=transport, plugins=[history]) - return self._bind_service(client, params["port_name"], params["address"]) + # Create temporary files to store the certificate and key + with ( + tempfile.NamedTemporaryFile(delete=False, suffix=".crt") as cert_file, + tempfile.NamedTemporaryFile(delete=False, suffix=".key") as key_file, + ): + cert_file.write(public_crt) + key_file.write(private_key) + cert_file.flush() + key_file.flush() + # Set up session with certificate and key file paths + session = Session() + session.cert = (cert_file.name, key_file.name) # Provide file paths + transport = Transport(session=session) + history = HistoryPlugin() + client = Client(wsdl=params["wsdl"], transport=transport, plugins=[history]) + return self._bind_service(client, params["port_name"], params["address"]) def _get_aeat_country_code(self): self.ensure_one() diff --git a/l10n_es_aeat/readme/ROADMAP.md b/l10n_es_aeat/readme/ROADMAP.md index 865323be490..b61a51baf82 100644 --- a/l10n_es_aeat/readme/ROADMAP.md +++ b/l10n_es_aeat/readme/ROADMAP.md @@ -1,3 +1,7 @@ - La configuración de exportación a BOE no se filtran ni se auto-selecciona por fechas de validez. - Las partes específicas de las Diputaciones Forales no están incluidas. +- El módulo de certificate no incluye la funcionalidad de especificar la carpeta + dónde se almacena el certificado, se opta por eliminar la funcionalidad en v18 +- El módulo de certificate guarda el password en el certificado, esa información + no está disponible antes de migrar por lo que se dejará vacía \ No newline at end of file diff --git a/l10n_es_aeat/security/ir.model.access.csv b/l10n_es_aeat/security/ir.model.access.csv index 89ba628cd0f..f422e7a5f92 100644 --- a/l10n_es_aeat/security/ir.model.access.csv +++ b/l10n_es_aeat/security/ir.model.access.csv @@ -19,4 +19,3 @@ access_l10n_es_aeat_soap,access_l10n_es_aeat_soap,model_l10n_es_aeat_soap,group_ access_l10n_es_aeat_report_compare_boe_file,access_l10n_es_aeat_report_compare_boe_file,model_l10n_es_aeat_report_compare_boe_file,group_account_aeat,1,1,1,0 access_l10n_es_aeat_report_compare_boe_file_line,access_l10n_es_aeat_report_compare_boe_file_line,model_l10n_es_aeat_report_compare_boe_file_line,group_account_aeat,1,1,1,0 access_l10n_es_aeat_report_export_to_boe,access_l10n_es_aeat_report_export_to_boe,model_l10n_es_aeat_report_export_to_boe,group_account_aeat,1,1,1,0 -access_l10n_es_aeat_certificate_password,access_l10n_es_aeat_certificate_password,model_l10n_es_aeat_certificate_password,group_account_aeat,1,1,1,0 diff --git a/l10n_es_aeat/static/description/index.html b/l10n_es_aeat/static/description/index.html index 52b588d50b0..09f787da129 100644 --- a/l10n_es_aeat/static/description/index.html +++ b/l10n_es_aeat/static/description/index.html @@ -473,6 +473,12 @@

Known issues / Roadmap

auto-selecciona por fechas de validez.
  • Las partes específicas de las Diputaciones Forales no están incluidas.
  • +
  • El módulo de certificate no incluye la funcionalidad de especificar +la carpeta dónde se almacena el certificado, se opta por eliminar la +funcionalidad en v18
  • +
  • El módulo de certificate guarda el password en el certificado, esa +información no está disponible antes de migrar por lo que se dejará +vacía
  • diff --git a/l10n_es_aeat/tests/test_l10n_es_aeat_certificate.py b/l10n_es_aeat/tests/test_l10n_es_aeat_certificate.py index e71faacfa6a..eeb65981b8b 100644 --- a/l10n_es_aeat/tests/test_l10n_es_aeat_certificate.py +++ b/l10n_es_aeat/tests/test_l10n_es_aeat_certificate.py @@ -2,133 +2,88 @@ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html) import base64 -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone -import cryptography from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives.serialization import BestAvailableEncryption, pkcs12 -from cryptography.x509 import oid -from odoo import exceptions +from odoo.exceptions import UserError +from odoo.tests import TransactionCase, tagged -from odoo.addons.base.tests.common import BaseCommon -CRYPTOGRAPHY_VERSION_3 = tuple(map(int, cryptography.__version__.split("."))) >= (3, 0) -if not CRYPTOGRAPHY_VERSION_3: - from cryptography.hazmat.backends import default_backend - - def generate_private_key(public_exponent, key_size): - return rsa.generate_private_key( - public_exponent=public_exponent, - key_size=key_size, - backend=default_backend(), - ) - - from OpenSSL import crypto +@tagged("post_install", "-at_install") +class TestKeysCertificates(TransactionCase): + @classmethod + def setUpClass(cls): + super().setUpClass() - def serialize_key_and_certificates(private_key, certificate, password): - p12 = crypto.PKCS12() - p12.set_privatekey( - crypto.load_privatekey( - crypto.FILETYPE_PEM, - private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ), - ) - ) - p12.set_certificate( - crypto.load_certificate( - crypto.FILETYPE_PEM, - certificate.public_bytes( - encoding=serialization.Encoding.PEM, + cls.subject = cls.issuer = x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "BE"), + x509.NameAttribute( + x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Brabant wallon" ), - ) + x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, "Grand Rosière"), + x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, "Odoo S.A."), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "odoo.com"), + ] ) - p12data = p12.export(password) - return p12data - -else: - generate_private_key = rsa.generate_private_key - def serialize_key_and_certificates(private_key, certificate, password): - return pkcs12.serialize_key_and_certificates( - None, - private_key, - certificate, - None, - BestAvailableEncryption(password), + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + cls.test_key_1 = cls.env["certificate.key"].create( + { + "name": "Test key", + "content": base64.b64encode( + private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + ), + } ) - -class TestL10nEsAeatCertificateBase(BaseCommon): - @classmethod - def setUpClass(cls): - super().setUpClass() - cls.certificate_password = b"794613" - private_key = generate_private_key(public_exponent=65537, key_size=2048) - public_key = private_key.public_key() - builder = x509.CertificateBuilder() - cls.certificate_name = "Test Certificate" - one_day = timedelta(1, 0, 0) - builder = ( - builder.subject_name( - x509.Name( - [x509.NameAttribute(oid.NameOID.COMMON_NAME, cls.certificate_name)] - ) - ) - .issuer_name( - x509.Name( - [ - x509.NameAttribute(oid.NameOID.COMMON_NAME, "cryptography.io"), - ] - ) - ) - .not_valid_before(datetime.today() - one_day) - .not_valid_after(datetime.today() + (one_day * 30)) + cls.certificate_1 = ( + x509.CertificateBuilder() + .subject_name(cls.subject) + .issuer_name(cls.issuer) + .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) - .public_key(public_key) + .not_valid_before(datetime.now(timezone.utc) - timedelta(days=10)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=10)) + .add_extension( + x509.SubjectAlternativeName([x509.DNSName("localhost")]), + critical=False, + ) + .sign(private_key, hashes.SHA256()) ) - sign_params = {"private_key": private_key, "algorithm": hashes.SHA256()} - if not CRYPTOGRAPHY_VERSION_3: - sign_params["backend"] = default_backend() - certificate = builder.sign(**sign_params) - content = serialize_key_and_certificates( - private_key, - certificate, - cls.certificate_password, + + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + certificate = cls.env["certificate.certificate"].create( + { + "name": "Test AEAT Certificate", + "content": base64.b64encode( + cls.certificate_1.public_bytes(encoding=serialization.Encoding.PEM) + ), + "private_key_id": cls.test_key_1.id, + } ) cls.sii_cert = cls.env["l10n.es.aeat.certificate"].create( { - "folder": "Test folder", - "file": base64.b64encode(content), + "certificate_id": certificate.id, + "state": "active", } ) - def _activate_certificate(self, passwd=None): - """Obtain Keys from .pfx and activate the cetificate""" - if not passwd: - passwd = self.certificate_password - wizard = self.env["l10n.es.aeat.certificate.password"].create( - {"password": passwd} + def test_get_certificates(self): + pem_certificate, private_key = self.sii_cert.get_certificates() + self.assertEqual(pem_certificate, self.sii_cert.certificate_id.pem_certificate) + self.assertEqual( + private_key, self.sii_cert.certificate_id.private_key_id.pem_key ) - wizard.with_context(active_id=self.sii_cert.id).get_keys() - self.sii_cert.action_active() - self.sii_cert.company_id.write( - {"name": "ENTIDAD FICTICIO ACTIVO", "vat": "ESJ7102572J"} - ) - self.assertEqual(self.certificate_name, self.sii_cert.name) - -class TestL10nEsAeatCertificate(TestL10nEsAeatCertificateBase): - def test_activate_certificate(self): - self.assertRaises( - exceptions.ValidationError, - self._activate_certificate, - b"Wrong passwd", - ) - self._activate_certificate(self.certificate_password) - self.assertEqual(self.sii_cert.state, "active") + # Test that an error is raised when no valid certificates exist + self.sii_cert.state = "draft" + with self.assertRaises(UserError): + self.sii_cert.get_certificates() diff --git a/l10n_es_aeat/views/aeat_certificate_view.xml b/l10n_es_aeat/views/aeat_certificate_view.xml index 7161fc884f1..1f42c15f477 100644 --- a/l10n_es_aeat/views/aeat_certificate_view.xml +++ b/l10n_es_aeat/views/aeat_certificate_view.xml @@ -6,11 +6,6 @@
    -