Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use papis add for import #43

Merged
merged 9 commits into from
Mar 6, 2024
2 changes: 1 addition & 1 deletion papis_zotero/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def do_importer(from_bibtex: Optional[str], from_sql: Optional[str],
elif from_sql is not None:
import papis_zotero.sql
try:
papis_zotero.sql.add_from_sql(from_sql, outfolder)
papis_zotero.sql.add_from_sql(from_sql, outfolder, link)
except Exception as exc:
logger.error("Failed to import from file: %s",
from_sql,
Expand Down
81 changes: 26 additions & 55 deletions papis_zotero/sql.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import glob
import os
import re
import shutil
import sqlite3
from datetime import datetime
from typing import Any, Dict, List, Optional

import papis.yaml
import papis.config
import papis.bibtex
import papis.strings
import papis.document
import papis.logging
import papis.commands.add

import papis_zotero.utils

Expand Down Expand Up @@ -131,29 +129,20 @@ def get_creators(connection: sqlite3.Connection,


def get_files(connection: sqlite3.Connection, item_id: str, item_key: str,
input_path: str, output_path: str) -> List[str]:
input_path: str, out_folder: str) -> List[str]:
cursor = connection.cursor()
cursor.execute(
ZOTERO_QUERY_ITEM_ATTACHMENTS,
(item_id,) + tuple(papis_zotero.utils.ZOTERO_SUPPORTED_MIMETYPES_TO_EXTENSION))

files = []
for key, path, mime in cursor:
import_paths = glob.glob(os.path.join(input_path, "storage", key, "*.*"))
if not import_paths:
continue

import_path = import_paths[0]
_, ext = os.path.splitext(import_path)
file_name = "{}{}".format(key, ext)
local_path = os.path.join(output_path, item_key, file_name)

try:
shutil.copyfile(import_path, local_path)
files.append(file_name)
except Exception as exc:
logger.error("Failed to export attachment '%s': '%s' (%s).", key,
path, mime, exc_info=exc)
for key, path, mime_type in cursor:
if match := re.match("storage:(.*)", path):
file_name = match.group(1)
files.append(os.path.join(input_path, "storage", key, file_name))
else:
logger.error("Failed to export attachment %s (with type %s) from path '%s'",
key, mime_type, path)

return files

Expand Down Expand Up @@ -229,25 +218,25 @@ def get_collections(connection: sqlite3.Connection,
""".format(",".join(["?"] * len(papis_zotero.utils.ZOTERO_EXCLUDED_ITEM_TYPES)))


def add_from_sql(input_path: str, output_path: Optional[str] = None) -> None:
def add_from_sql(input_path: str, out_folder: Optional[str] = None,
link: bool = False) -> None:
"""
:param input_path: path to the Zotero data directory containing the
SQLite database "zotero.sqlite" and the "storage" folder to be imported
:param out_folder: path where all items will be exported to; it must
already exist (a FileNotFoundError is raised otherwise)
"""
import yaml

if output_path is None:
output_path = papis.config.get_lib_dirs()[0]
if out_folder is None:
out_folder = papis.config.get_lib_dirs()[0]

if not os.path.exists(input_path):
raise FileNotFoundError(
"[Errno 2] No such file or directory: '{}'".format(input_path))

if not os.path.exists(output_path):
if not os.path.exists(out_folder):
raise FileNotFoundError(
"[Errno 2] No such file or directory: '{}'".format(output_path))
"[Errno 2] No such file or directory: '{}'".format(out_folder))

zotero_sqlite_file = os.path.join(input_path, "zotero.sqlite")
if not os.path.exists(zotero_sqlite_file):
Expand All @@ -264,11 +253,10 @@ def add_from_sql(input_path: str, output_path: Optional[str] = None) -> None:

cursor.execute(ZOTERO_QUERY_ITEMS,
papis_zotero.utils.ZOTERO_EXCLUDED_ITEM_TYPES)
for i, (item_id, item_type, item_key, date_added) in enumerate(cursor, start=1):
path = os.path.join(output_path, item_key)
if not os.path.exists(path):
os.makedirs(path)
if out_folder is not None:
papis.config.set_lib_from_name(out_folder)

for i, (item_id, item_type, item_key, date_added) in enumerate(cursor, start=1):
# convert fields
date_added = (
datetime.strptime(date_added, "%Y-%m-%d %H:%M:%S")
Expand All @@ -281,37 +269,20 @@ def add_from_sql(input_path: str, output_path: Optional[str] = None) -> None:
item_id,
item_key,
input_path=input_path,
output_path=output_path)
out_folder=out_folder)

item = {"type": item_type, "time-added": date_added, "files": files}
item.update(fields)
item.update(get_creators(connection, item_id))
item.update(get_tags(connection, item_id))
item.update(get_collections(connection, item_id))

# create a reference
ref = None
extra = item.get("extra", None)
if extra:
matches = re.search(r".*Citation Key: (\w+)", extra)
if matches:
ref = matches.group(1)

if ref is None:
ref = papis.bibtex.create_reference(item)

item["ref"] = ref
logger.info("[%4d/%-4d] Exporting item '%s' with ref '%s' to folder '%s'.",
i, items_count, item_key, ref, path)

# write out the info file
# FIXME: should use papis.yaml.data_to_yaml, but blocked by
# https://github.com/papis/papis/pull/571
with open(os.path.join(path, "info.yaml"), "w+", encoding="utf-8") as fd:
yaml.dump(item,
stream=fd,
allow_unicode=True,
default_flow_style=False)
logger.info("[%4d/%-4d] Exporting item '%s' to library '%s'.",
i, items_count, item_key, out_folder)

papis.commands.add.run(paths=files, data=item, link=link,
folder_name=papis.config.getstring("add-folder-name")
)

logger.info("Finished exporting from '%s'.", input_path)
logger.info("Exported files can be found at '%s'.", output_path)
logger.info("Exported files can be found at '%s'.", out_folder)
11 changes: 9 additions & 2 deletions tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,30 @@
from .testlib import TemporaryLibrary


@pytest.mark.skipif(os.name == "nt", reason="encoding is incorrect on windows")
@pytest.mark.library_setup(populate=False)
def test_simple(tmp_library: TemporaryLibrary) -> None:
sqlpath = os.path.join(os.path.dirname(__file__), "resources", "sql")
papis.config.set("add-folder-name", "{doc[author]}")
papis_zotero.sql.add_from_sql(sqlpath)

folders = os.listdir(tmp_library.libdir)
assert len(folders) == 5
assert len(glob.glob(tmp_library.libdir + "/**/*.pdf")) == 4

doc = papis.document.from_folder(os.path.join(tmp_library.libdir, "IH8J2JJP"))
doc = papis.document.from_folder(
os.path.join(
tmp_library.libdir,
"svard-magnus-and-nordstrom-jan"
)
)

info_name = os.path.join(os.path.dirname(__file__), "resources", "sql_out.yaml")
with open(info_name, encoding="utf-8") as fd:
data = yaml.load(fd, Loader=papis.yaml.Loader) # type: ignore[attr-defined]
expected_doc = papis.document.from_data(data)

assert expected_doc["ref"] == doc["ref"]
assert expected_doc["author"] == doc["author"]

# FIXME: currently fails on windows
# assert doc.get_files()
Loading