diff --git a/acd_cli.py b/acd_cli.py index b611c03..cd8b5b0 100755 --- a/acd_cli.py +++ b/acd_cli.py @@ -1,35 +1,33 @@ #!/usr/bin/env python3 -import sys -import os -import json import argparse +import json import logging import logging.handlers +import os +import re import signal +import sys import time -import re -import appdirs - from collections import namedtuple from configparser import ConfigParser from functools import partial from multiprocessing import Event - from pkgutil import walk_packages + +import appdirs from pkg_resources import iter_entry_points import acdcli +from acdcli import plugins from acdcli.api import client from acdcli.api.common import RequestError, is_valid_id from acdcli.cache import format, db +from acdcli.cache.db import CacheConsts from acdcli.utils import hashing, progress from acdcli.utils.conf import get_conf from acdcli.utils.threading import QueuedLoader from acdcli.utils.time import * -# load local plugin modules (default ones, for developers) -from acdcli import plugins - for importer, modname, ispkg in walk_packages(path=plugins.__path__, prefix=plugins.__name__ + '.', onerror=lambda x: None): if not ispkg: @@ -121,11 +119,13 @@ def pprint(d: dict): # Glue functions (API, cache) # - -class CacheConsts(object): - CHECKPOINT_KEY = 'checkpoint' - LAST_SYNC_KEY = 'last_sync' - MAX_AGE = 30 +def sync_owner_id(): + global cache + owner_id = cache.KeyValueStorage.get(CacheConsts.OWNER_ID) + if not owner_id: + owner_id = acd_client.get_owner_id() + cache.KeyValueStorage[CacheConsts.OWNER_ID] = owner_id + return owner_id def sync_node_list(full=False, to_file=None, from_file=None) -> 'Union[int, None]': @@ -194,12 +194,14 @@ def sync_node_list(full=False, to_file=None, from_file=None) -> 'Union[int, None print() if to_file: out.close() + sync_owner_id() def old_sync() -> 'Union[int, None]': global cache cache.drop_all() cache = db.NodeCache(CACHE_PATH) + sync_owner_id() try: folders = acd_client.get_folder_list() folders.extend(acd_client.get_trashed_folders()) @@ -1154,11 +1156,14 @@ def mount_action(args: argparse.Namespace): import acdcli.acd_fuse acdcli.acd_fuse.mount(args.path, dict(acd_client=acd_client, cache=cache, nlinks=args.nlinks, autosync=asp, - settings_path=SETTINGS_PATH), + settings_path=SETTINGS_PATH, + umask = args.umask, + uid = args.uid, + gid = args.gid + ), ro=args.read_only, foreground=args.foreground, nothreads=args.single_threaded, nonempty=args.nonempty, modules=args.modules, - umask=args.umask,gid=args.gid,uid=args.uid, allow_root=args.allow_root, allow_other=args.allow_other, volname=args.volname) diff --git a/acdcli/acd_fuse.py b/acdcli/acd_fuse.py index 2d30158..2817e10 100644 --- a/acdcli/acd_fuse.py +++ b/acdcli/acd_fuse.py @@ -2,18 +2,30 @@ import configparser import errno +import io +import json import logging import os import stat import sys +import tempfile from collections import deque, defaultdict from multiprocessing import Process -from queue import Queue, Full as QueueFull -from threading import Thread, Lock, Event +from threading import Lock, Thread from time import time, sleep import ctypes.util +import binascii + +import requests + +from acdcli.cache.db import CacheConsts + +from fuse import FUSE, FuseOSError as FuseError, Operations +from acdcli.api.common import RequestError +from acdcli.utils.conf import get_conf + ctypes.util.__find_library = ctypes.util.find_library def find_library(*args): @@ -25,12 +37,6 @@ def find_library(*args): return ctypes.util.__find_library(*args) ctypes.util.find_library = 
find_library - -from fuse import FUSE, FuseOSError as FuseError, Operations -from acdcli.api.common import RequestError -from acdcli.utils.conf import get_conf -from acdcli.utils.time import * - logger = logging.getLogger(__name__) try: @@ -43,11 +49,18 @@ def find_library(*args): errno.EREMOTEIO = errno.EIO _SETTINGS_FILENAME = 'fuse.ini' +_XATTR_PROPERTY_NAME = 'xattrs' +_XATTR_MTIME_OVERRIDE_NAME = 'fuse.mtime' +_XATTR_MODE_OVERRIDE_NAME = 'fuse.mode' +_XATTR_UID_OVERRIDE_NAME = 'fuse.uid' +_XATTR_GID_OVERRIDE_NAME = 'fuse.gid' +_XATTR_SYMLINK_OVERRIDE_NAME = 'fuse.symlink' +_XATTR_DELAY = 2 # seconds to wait for additional xattr changes before flushing to amazon _def_conf = configparser.ConfigParser() -_def_conf['fs'] = dict(block_size=512) -_def_conf['read'] = dict(open_chunk_limit=10, timeout=5) -_def_conf['write'] = dict(buffer_size = 32, timeout=30) +_def_conf['fs'] = dict(block_size=io.DEFAULT_BUFFER_SIZE) +_def_conf['read'] = dict(open_chunk_limit=10, timeout=5, cache_small_file_size=1024) +_def_conf['write'] = dict(buffer_size=int(1e9), timeout=30) class FuseOSError(FuseError): @@ -206,139 +219,82 @@ class WriteProxy(object): def __init__(self, acd_client, cache, buffer_size, timeout): self.acd_client = acd_client self.cache = cache - self.files = defaultdict(lambda: WriteProxy.WriteStream(buffer_size, timeout)) - - class WriteStream(object): - """A WriteStream is a binary file-like object that is backed by a Queue. - It will remember its current offset.""" - - __slots__ = ('q', 'offset', 'error', 'closed', 'done', 'timeout') - - def __init__(self, buffer_size, timeout): - self.q = Queue(maxsize=buffer_size) - """a queue that buffers written blocks""" - self.offset = 0 - """the beginning fpos""" - self.error = False - """whether the read or write failed""" - self.closed = False - self.done = Event() - """done event is triggered when file is successfully read and transferred""" - self.timeout = timeout - - def write(self, data: bytes): - """Writes data into queue. - - :raises: FuseOSError on timeout""" - - if self.error: - raise FuseOSError(errno.EREMOTEIO) - try: - self.q.put(data, timeout=self.timeout) - except QueueFull: - logger.error('Write timeout.') - raise FuseOSError(errno.ETIMEDOUT) - self.offset += len(data) - - def read(self, ln=0) -> bytes: - """Returns as much byte data from queue as possible. - Returns empty bytestring (EOF) if queue is empty and file was closed. - - :raises: IOError""" - - if self.error: - raise IOError(errno.EIO, errno.errorcode[errno.EIO]) - - if self.closed and self.q.empty(): - return b'' - - b = [self.q.get()] - self.q.task_done() - while not self.q.empty(): - b.append(self.q.get()) - self.q.task_done() - - return b''.join(b) - - def flush(self): - """Waits until the queue is emptied. - - :raises: FuseOSError""" - - while True: - if self.error: - raise FuseOSError(errno.EREMOTEIO) - if self.q.empty(): - return - sleep(1) - - def close(self): - """Sets the closed flag to signal 'EOF' to the read function. - Then, waits until :attr:`done` event is triggered. - - :raises: FuseOSError""" - - self.closed = True - # prevent read deadlock - self.q.put(b'') + self.buffers = defaultdict(lambda: WriteProxy.WriteBuffer(buffer_size)) - # wait until read is complete - while True: - if self.error: - raise FuseOSError(errno.EREMOTEIO) - if self.done.wait(1): - return - - def write_n_sync(self, stream: WriteStream, node_id: str): - """Try to overwrite file with id ``node_id`` with content from ``stream``. 
- Triggers the :attr:`WriteStream.done` event on success. + class WriteBuffer(object): + def __init__(self, buffer_size): + self.f = tempfile.SpooledTemporaryFile(max_size=buffer_size) + self.lock = Lock() + self.dirty = True + self.len = 0 - :param stream: a file-like object""" + def read(self, offset, length: int): + with self.lock: + self.f.seek(offset) + return self.f.read(length) + def write(self, offset, bytes_: bytes): + with self.lock: + self.dirty = True + self.f.seek(offset) + ret = self.f.write(bytes_) + self.f.seek(0, os.SEEK_END) + self.len = self.f.tell() + return ret + + def length(self): + return self.len + + def get_file(self): + """Return the file for direct access. Be sure to lock from the outside when doing so""" + self.f.seek(0) + return self.f + + def _write_and_sync(self, buffer: WriteBuffer, node_id: str): try: - r = self.acd_client.overwrite_stream(stream, node_id) + with buffer.lock: + if not buffer.dirty: + return + r = self.acd_client.overwrite_tempfile(node_id, buffer.get_file()) + buffer.dirty = False except (RequestError, IOError) as e: - stream.error = True logger.error('Error writing node "%s". %s' % (node_id, str(e))) else: - self.cache.insert_node(r) - stream.done.set() + self.cache.insert_node(r, flush_resolve_cache=False) - def write(self, node_id, fh, offset, bytes_): - """Gets WriteStream from defaultdict. Creates overwrite thread if offset is 0, - tries to continue otherwise. + def read(self, node_id, fh, offset, length: int): + b = self.buffers.get(node_id) + if b: + return b.read(offset, length) + + def write(self, node_id, fh, offset, bytes_: bytes): + """Gets WriteBuffer from defaultdict. :raises: FuseOSError: wrong offset or writing failed""" - f = self.files[fh] + b = self.buffers[node_id] + b.write(offset, bytes_) - if f.offset == offset: - f.write(bytes_) - else: - f.error = True # necessary? - logger.error('Wrong offset for writing to fh %s.' 
% fh) - raise FuseOSError(errno.ESPIPE) + def length(self, node_id, fh): + b = self.buffers.get(node_id) + if b: + return b.length() - if offset == 0: - t = Thread(target=self.write_n_sync, args=(f, node_id)) - t.daemon = True - t.start() + def flush(self, node_id, fh): + b = self.buffers.get(node_id) + if b: + self._write_and_sync(b, node_id) - def flush(self, fh): - f = self.files.get(fh) - if f: - f.flush() + def release(self, node_id, fh): + b = self.buffers.get(node_id) + if b: + self._write_and_sync(b, node_id) + del self.buffers[node_id] - def release(self, fh): - """:raises: FuseOSError""" - f = self.files.get(fh) - if f: - try: - f.close() - except: - raise - finally: - del self.files[fh] + def remove(self, node_id, fh): + b = self.buffers.get(node_id) + if b: + del self.buffers[node_id] class LoggingMixIn(object): @@ -353,6 +309,8 @@ def __call__(self, op, path, *args): targs = (len(args[0]),) + args[1:] elif op == 'chmod': targs = (oct(args[0]),) + args[1:] + elif op == 'setxattr': + targs = (args[0],) + (len(args[1]),) logger.debug('-> %s %s %s', op, path, repr(args if not targs else targs)) @@ -366,6 +324,8 @@ def __call__(self, op, path, *args): finally: if op == 'read': ret = len(ret) + elif op == 'getxattr' and ret and ret != '[Errno 61] No data available': + ret = len(ret) logger.debug('<- %s %s', op, repr(ret)) @@ -378,8 +338,13 @@ def __init__(self, **kwargs): :param kwargs: cache (NodeCache), acd_client (ACDClient), autosync (partial)""" + self.xattr_cache = {} + self.xattr_dirty = set() + self.xattr_cache_lock = Lock() + self.cache = kwargs['cache'] self.acd_client = kwargs['acd_client'] + self.acd_client_owner = self.cache.KeyValueStorage.get(CacheConsts.OWNER_ID) autosync = kwargs['autosync'] conf = kwargs['conf'] self.conf = conf @@ -403,12 +368,22 @@ def __init__(self, **kwargs): """manually calculated available disk space""" self.fh = 1 """file handle counter\n\n :type: int""" - self.handles = {} - """map fh->node\n\n :type: dict""" + self.fh_to_node = {} + """map fh->node_id\n\n :type: dict""" + self.node_to_fh = defaultdict(lambda: set()) + """map node_id to list of interested file handles""" self.fh_lock = Lock() """lock for fh counter increment and handle dict writes""" self.nlinks = kwargs.get('nlinks', False) """whether to calculate the number of hardlinks for folders""" + self.uid = kwargs['uid'] + """sets the default uid""" + self.gid = kwargs['gid'] + """sets the default gid""" + self.umask = kwargs['umask'] + """sets the default umask""" + self.cache_small_file_size = conf.getint('read', 'cache_small_file_size') + """size of files under which we cache the contents automatically""" self.destroyed = autosync.keywords['stop'] """:type: multiprocessing.Event""" @@ -430,55 +405,201 @@ def readdir(self, path, fh) -> 'List[str]': if not node.type == 'folder': raise FuseOSError(errno.ENOTDIR) - return [_ for _ in ['.', '..'] + [c for c in self.cache.childrens_names(node.id)]] + folders, files = self.cache.list_children(folder_id=node.id, folder_path=path) + return [_ for _ in ['.', '..'] + [c.name for c in folders + files]] def getattr(self, path, fh=None) -> dict: """Creates a stat-like attribute dict, see :manpage:`stat(2)`. 
Calculates correct number of links for folders if :attr:`nlinks` is set.""" if fh: - node = self.handles[fh] + node_id = self.fh_to_node[fh] + node = self.cache.get_node(node_id) else: node = self.cache.resolve(path) if not node: raise FuseOSError(errno.ENOENT) + return self._getattr(node, fh) - times = dict(st_atime=time(), - st_mtime=node.modified.timestamp(), - st_ctime=node.created.timestamp()) + def _getattr(self, node, fh=None) -> dict: + try: mtime = self._getxattr(node.id, _XATTR_MTIME_OVERRIDE_NAME) + except: mtime = node.modified.timestamp() + + size = self.wp.length(node.id, fh) + if size is None: size = node.size + + try: uid = self._getxattr(node.id, _XATTR_UID_OVERRIDE_NAME) + except: uid = self.uid + + try: gid = self._getxattr(node.id, _XATTR_GID_OVERRIDE_NAME) + except: gid = self.gid + + attrs = dict(st_atime=time(), + st_mtime=mtime, + st_ctime=node.created.timestamp(), + st_uid=uid, + st_gid=gid) + + try: mode = self._getxattr(node.id, _XATTR_MODE_OVERRIDE_NAME) + except: mode = None if node.is_folder: - return dict(st_mode=stat.S_IFDIR | 0o0777, + # directory + mode = stat.S_IFDIR | (stat.S_IMODE(mode) if mode else 0o0777 & ~self.umask) + + return dict(st_mode=mode, st_nlink=self.cache.num_children(node.id) if self.nlinks else 1, - **times) + **attrs) elif node.is_file: - return dict(st_mode=stat.S_IFREG | 0o0666, + # symlink + if mode and stat.S_ISLNK(mode): mode = stat.S_IFLNK | 0o0777 + # file + else: mode = stat.S_IFREG | (stat.S_IMODE(mode) if mode else 0o0666 & ~self.umask) + + return dict(st_mode=mode, st_nlink=self.cache.num_parents(node.id) if self.nlinks else 1, - st_size=node.size, + st_size=size, st_blksize=self.conf.getint('fs', 'block_size'), - st_blocks=(node.size+511)//512, - **times) + st_blocks=(size + 511) // 512, # this field always expects a 512 block size + **attrs) - def read(self, path, length, offset, fh) -> bytes: + def listxattr(self, path): + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + return self._listxattr(node_id) + + def _listxattr(self, node_id): + self._xattr_load(node_id) + with self.xattr_cache_lock: + try: + return [k for k, v in self.xattr_cache[node_id].items()] + except: + return [] + + def getxattr(self, path, name, position=0): + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + return self._getxattr_bytes(node_id, name) + + def _getxattr(self, node_id, name): + self._xattr_load(node_id) + with self.xattr_cache_lock: + try: + ret = self.xattr_cache[node_id][name] + if ret is not None: + return ret + except: + pass + raise FuseOSError(errno.ENODATA) # should be ENOATTR + + def _getxattr_bytes(self, node_id, name): + return binascii.a2b_base64(self._getxattr(node_id, name)) + + def removexattr(self, path, name): + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + self._removexattr(node_id, name) + + def _removexattr(self, node_id, name): + self._xattr_load(node_id) + with self.xattr_cache_lock: + if name in self.xattr_cache[node_id]: + del self.xattr_cache[node_id][name] + self.xattr_dirty.add(node_id) + + def setxattr(self, path, name, value, options, position=0): + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + self._setxattr_bytes(node_id, name, value) + + def _setxattr(self, node_id, name, value): + self._xattr_load(node_id) + with self.xattr_cache_lock: + try: + self.xattr_cache[node_id][name] = value + self.xattr_dirty.add(node_id) + except: + raise 
FuseOSError(errno.ENOTSUP) + + def _setxattr_bytes(self, node_id, name, value: bytes): + self._setxattr(node_id, name, binascii.b2a_base64(value).decode("utf-8")) + + def _xattr_load(self, node_id): + with self.xattr_cache_lock: + if node_id not in self.xattr_cache: + xattrs_str = self.cache.get_property(node_id, self.acd_client_owner, _XATTR_PROPERTY_NAME) + try: self.xattr_cache[node_id] = json.loads(xattrs_str) + except: self.xattr_cache[node_id] = {} + + def _xattr_flush(self, node_id): + # collect all xattr changes while any fh's are open so we talk to amazon less + with self.fh_lock: + if self.node_to_fh.get(node_id): + return + Thread(target=self._xattr_write_and_sync, args=(node_id,)).start() + + def _xattr_write_and_sync(self, node_id): + # try to collect many xattr changes at once so we talk to amazon less + sleep(_XATTR_DELAY) + with self.xattr_cache_lock: + if node_id in self.xattr_dirty: + try: + xattrs_str = json.dumps(self.xattr_cache[node_id]) + self.acd_client.add_property(node_id, self.acd_client_owner, _XATTR_PROPERTY_NAME, + xattrs_str) + except (RequestError, IOError) as e: + logger.error('Error writing node xattrs "%s". %s' % (node_id, str(e))) + try: del self.xattr_cache[node_id] + except: pass + else: + self.cache.insert_property(node_id, self.acd_client_owner, _XATTR_PROPERTY_NAME, xattrs_str) + self.xattr_dirty.discard(node_id) + + def read(self, path, length, offset, fh=None) -> bytes: """Read ```length`` bytes from ``path`` at ``offset``.""" if fh: - node = self.handles[fh] + node_id = self.fh_to_node[fh] + node = self.cache.get_node(node_id) else: - node = self.cache.resolve(path, trash=False) + node = self.cache.resolve(path) if not node: raise FuseOSError(errno.ENOENT) - if node.size <= offset: + size = self.wp.length(node.id, fh) + if size is None: size = node.size + + if size <= offset: return b'' - if node.size < offset + length: - length = node.size - offset + if size < offset + length: + length = size - offset + + """If we attempt to read something we just wrote, give it back""" + ret = self.wp.read(node.id, fh, offset, length) + if ret is not None: + return ret + + """Next, check our local cache""" + content = self.cache.get_content(node.id, node.version) + if content is not None: + return content[offset:offset+length] + """For small files, read and cache the whole file""" + if node.size <= self.cache_small_file_size: + content = self.acd_client.download_chunk(node.id, 0, node.size) + self.cache.insert_content(node.id, node.version, content) + return content[offset:offset+length] + + """For all other files, stream from amazon""" return self.rp.get(node.id, offset, length, node.size) def statfs(self, path) -> dict: - """Gets some filesystem statistics as specified in :manpage:`stat(2)`.""" + """Gets some filesystem statistics as specified in :manpage:`statfs(2)`.""" bs = self.conf.getint('fs', 'block_size') return dict(f_bsize=bs, @@ -490,22 +611,25 @@ def statfs(self, path) -> dict: ) def mkdir(self, path, mode): - """Creates a directory at ``path`` (see :manpage:`mkdir(2)`). 
- - :param mode: not used""" + """Creates a directory at ``path`` (see :manpage:`mkdir(2)`).""" name = os.path.basename(path) ppath = os.path.dirname(path) - p = self.cache.resolve(ppath) - if not p: + p_id = self.cache.resolve_id(ppath) + if not p_id: raise FuseOSError(errno.ENOTDIR) try: - r = self.acd_client.create_folder(name, p.id) + r = self.acd_client.create_folder(name, p_id) except RequestError as e: FuseOSError.convert(e) else: - self.cache.insert_node(r) + self.cache.insert_node(r, flush_resolve_cache=False) + node_id = r['id'] + self.cache.resolve_cache_add(path, node_id) + if mode is not None: + self._setxattr(node_id, _XATTR_MODE_OVERRIDE_NAME, stat.S_IFDIR | (stat.S_IMODE(mode))) + self._xattr_flush(node_id) def _trash(self, path): logger.debug('trash %s' % path) @@ -522,7 +646,8 @@ def _trash(self, path): except RequestError as e: FuseOSError.convert(e) else: - self.cache.insert_node(r) + self.cache.insert_node(r, flush_resolve_cache=False) + self.cache.resolve_cache_del(path) def rmdir(self, path): """Moves a directory into ACD trash.""" @@ -532,28 +657,45 @@ def unlink(self, path): """Moves a file into ACD trash.""" self._trash(path) - def create(self, path, mode) -> int: - """Creates an empty file at ``path``. + def create(self, path, mode, **kwargs) -> int: + """Creates an empty file at ``path`` with access ``mode``. - :param mode: not used + :param mode: + :param path: :returns int: file handle""" name = os.path.basename(path) ppath = os.path.dirname(path) - p = self.cache.resolve(ppath, False) - if not p: + p_id = self.cache.resolve_id(ppath, False) + if not p_id: raise FuseOSError(errno.ENOTDIR) try: - r = self.acd_client.create_file(name, p.id) - self.cache.insert_node(r) - node = self.cache.get_node(r['id']) + r = self.acd_client.create_file(name, p_id) + self.cache.insert_node(r, flush_resolve_cache=False) + node_id = r['id'] + self.cache.resolve_cache_add(path, node_id) except RequestError as e: + # file all ready exists, see what we know about it since the + # cache may be out of sync or amazon missed a rename + if e.status_code == requests.codes.conflict: + prior_node_id = json.loads(e.msg)["info"]["nodeId"] + logger.error('create: duplicate name: %s prior_node_id: %s' % (name, prior_node_id)) + prior_node_amazon = self.acd_client.get_metadata(prior_node_id, False, False) + logger.error('create: prior_node(amazon): %s' % str(prior_node_amazon)) + prior_node_cache = self.cache.get_node(prior_node_id) + logger.error('create: prior_node(cache): %s' % str(prior_node_cache)) + # if prior_node_cache.name != prior_node_amazon["name"]: + # self._rename(prior_node_id, prior_node_cache.name) FuseOSError.convert(e) + if mode is not None: + self._setxattr(node_id, _XATTR_MODE_OVERRIDE_NAME, stat.S_IFREG | (stat.S_IMODE(mode))) + with self.fh_lock: self.fh += 1 - self.handles[self.fh] = node + self.fh_to_node[self.fh] = node_id + self.node_to_fh[node_id].add(self.fh) return self.fh def rename(self, old, new): @@ -581,47 +723,56 @@ def rename(self, old, new): else: raise FuseOSError(errno.EEXIST) + self.cache.resolve_cache_del(old) + if new_bn != old_bn: - self._rename(node.id, new_bn) + self._rename(node, new_bn) if new_dn != old_dn: # odir_id = self.cache.resolve_path(old_dn, False) ndir = self.cache.resolve(new_dn, False) if not ndir: raise FuseOSError(errno.ENOTDIR) - self._move(node.id, ndir.id) + self._move(node, ndir.id) - def _rename(self, id, name): + self.cache.resolve_cache_add(new, node.id) + + def _rename(self, node, name): try: - r = 
self.acd_client.rename_node(id, name) + r = self.acd_client.rename_node(node.id, name) except RequestError as e: FuseOSError.convert(e) else: - self.cache.insert_node(r) + self.cache.insert_node(r, flush_resolve_cache=node.is_folder) - def _move(self, id, new_folder): + def _move(self, node, new_folder): try: - r = self.acd_client.move_node(id, new_folder) + r = self.acd_client.move_node(node.id, new_folder) except RequestError as e: FuseOSError.convert(e) else: - self.cache.insert_node(r) + self.cache.insert_node(r, flush_resolve_cache=node.is_folder) def open(self, path, flags) -> int: """Opens a file. + :param path: :param flags: flags defined as in :manpage:`open(2)` :returns: file handle""" if (flags & os.O_APPEND) == os.O_APPEND: raise FuseOSError(errno.EFAULT) - node = self.cache.resolve(path, False) - if not node: + node_id = self.cache.resolve_id(path, False) + if not node_id: raise FuseOSError(errno.ENOENT) + return self._open(node_id) + + def _open(self, node_id): with self.fh_lock: self.fh += 1 - self.handles[self.fh] = node + self.fh_to_node[self.fh] = node_id + self.node_to_fh[node_id].add(self.fh) return self.fh def write(self, path, data, offset, fh) -> int: @@ -629,59 +780,109 @@ def write(self, path, data, offset, fh) -> int: :returns: number of bytes written""" - node_id = self.handles[fh].id + if fh: + node_id = self.fh_to_node[fh] + else: + # This is not resolving by path on purpose, since flushing to + # amazon is done on closing all interested file handles. + node_id = None + if not node_id: + raise FuseOSError(errno.ENOENT) + self.wp.write(node_id, fh, offset, data) return len(data) def flush(self, path, fh): - """Flushes ``fh`` in WriteProxy.""" - self.wp.flush(fh) + if fh: + node_id = self.fh_to_node[fh] + else: + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + self.wp.flush(node_id, fh) def truncate(self, path, length, fh=None): - """Pseudo-truncates a file, i.e. clears content if ``length``==0 or does nothing - if ``length`` is equal to current file size. + """Pseudo-truncates a file, i.e. clears content if ``length``==0 or grows + newly created nodes if ``length`` is greater than the write-back cache size. 
:raises FuseOSError: if pseudo-truncation to length is not supported""" if fh: - node = self.handles[fh] + node_id = self.fh_to_node[fh] + node = self.cache.get_node(node_id) else: node = self.cache.resolve(path) if not node: raise FuseOSError(errno.ENOENT) + # cut file size to 0 if length == 0: - try: - r = self.acd_client.clear_file(node.id) - except RequestError as e: - raise FuseOSError.convert(e) - else: - self.cache.insert_node(r) - elif length > 0: - if node.size != length: - raise FuseOSError(errno.ENOSYS) + if node.size: + try: + r = self.acd_client.clear_file(node.id) + except RequestError as e: + raise FuseOSError.convert(e) + else: + self.cache.insert_node(r, flush_resolve_cache=False) + self.wp.remove(node.id, None) + return 0 + + # grow newly created files + if node.size == 0 and length: + size = self.wp.length(node.id, fh) + if size is None: size = node.size + if length > size: + # amazon doesn't understand sparse files, so we send zeros + internal_fh = self._open(node.id) + self.wp.write(node.id, fh, length-1, bytes(1)) + self.release(path, internal_fh) + return 0 + + # throw until there's an api for modifying existing files' length + raise FuseOSError(errno.ENOSYS) def release(self, path, fh): """Releases an open ``path``.""" if fh: - node = self.handles[fh] - else: - node = self.cache.resolve(path, trash=False) - if node: - self.rp.release(node.id) - self.wp.release(fh) - with self.fh_lock: - del self.handles[fh] + node_id = self.fh_to_node[fh] else: + node_id = self.cache.resolve_id(path) + if not node_id: raise FuseOSError(errno.ENOENT) + last_handle = False + with self.fh_lock: + """release the writer if there's no more interest. This allows many file + handles to write to a single node provided they do it in order. + """ + interest = self.node_to_fh.get(node_id) + if interest: + interest.discard(fh) + if not interest: + last_handle = True + del self.node_to_fh[node_id] + del self.fh_to_node[fh] + + if last_handle: + self.rp.release(node_id) + self.wp.release(node_id, None) + self._xattr_flush(node_id) + + return 0 + def utimens(self, path, times=None): - """Not functional. Should set node atime and mtime to values as passed in ``times`` - or current time (see :manpage:`utimesat(2)`). + """Should set node atime and mtime to values as passed in ``times`` + or current time (see :manpage:`utimensat(2)`). + Note that this is only implemented for modified time. 
+ :param path: :param times: [atime, mtime]""" + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + if times: # atime = times[0] mtime = times[1] @@ -689,18 +890,73 @@ def utimens(self, path, times=None): # atime = time() mtime = time() + try: + self._setxattr(node_id, _XATTR_MTIME_OVERRIDE_NAME, mtime) + self._xattr_flush(node_id) + except: + raise FuseOSError(errno.ENOTSUP) + + return 0 + def chmod(self, path, mode): - """Not implemented.""" - pass + node = self.cache.resolve(path) + if not node: + raise FuseOSError(errno.ENOENT) + return self._chmod(node, mode) + + def _chmod(self, node, mode): + mode_perms = stat.S_IMODE(mode) + mode_type = stat.S_IFMT(self._getattr(node)['st_mode']) + self._setxattr(node.id, _XATTR_MODE_OVERRIDE_NAME, mode_type | mode_perms) + self._xattr_flush(node.id) + return 0 def chown(self, path, uid, gid): - """Not implemented.""" - pass + node_id = self.cache.resolve_id(path) + if not node_id: + raise FuseOSError(errno.ENOENT) + return self._chown(node_id, uid, gid) + + def _chown(self, node_id, uid, gid): + if uid != -1: self._setxattr(node_id, _XATTR_UID_OVERRIDE_NAME, uid) + if gid != -1: self._setxattr(node_id, _XATTR_GID_OVERRIDE_NAME, gid) + self._xattr_flush(node_id) + return 0 + + def symlink(self, target, source): + source_bytes = source.encode('utf-8') + fh = self.create(target, None) + node_id = self.fh_to_node[fh] + self._setxattr(node_id, _XATTR_MODE_OVERRIDE_NAME, stat.S_IFLNK | 0o0777) + self.write(target, source_bytes, 0, fh) + self.release(target, fh) + return 0 + + def readlink(self, path): + node = self.cache.resolve(path) + if not node: + raise FuseOSError(errno.ENOENT) + + source = None + + if source is None: + source_bytes = self.cache.get_content(node.id, node.version) + if source_bytes is not None: + source = source_bytes.decode('utf-8') + + if source is None: + size = self.wp.length(node.id, None) + if size is None: size = node.size + source_bytes = self.read(path, size, 0) + source = source_bytes.decode('utf-8') + self.cache.insert_content(node.id, node.version, source_bytes) + return source def mount(path: str, args: dict, **kwargs) -> 'Union[int, None]': """Fusermounts Amazon Cloud Drive to specified mountpoint. 
+ :param path: :raises: RuntimeError :param args: args to pass on to ACDFuse init :param kwargs: fuse mount options as described in :manpage:`fuse(8)`""" diff --git a/acdcli/api/common.py b/acdcli/api/common.py index 0a4d71c..505f9a3 100644 --- a/acdcli/api/common.py +++ b/acdcli/api/common.py @@ -1,6 +1,6 @@ -import requests import re +import requests from requests.exceptions import ConnectionError try: @@ -14,6 +14,13 @@ class ReadTimeoutError(Exception): # status codes that indicate request success OK_CODES = [requests.codes.OK] +RETRY_CODES = [requests.codes.server_error, # 500 + requests.codes.service_unavailable, # 503 + requests.codes.gateway_timeout, # 504 + requests.codes.bad_request, # 400 + requests.codes.request_timeout, # 408 + requests.codes.too_many_requests, # 429 + ] class RequestError(Exception): diff --git a/acdcli/api/content.py b/acdcli/api/content.py index 901ab43..710b13a 100644 --- a/acdcli/api/content.py +++ b/acdcli/api/content.py @@ -1,11 +1,12 @@ import http.client as http -import os -import json import io +import json +import logging import mimetypes +import os from collections import OrderedDict -import logging from urllib.parse import quote_plus + from requests import Response from requests_toolbelt import MultipartEncoder @@ -61,57 +62,63 @@ class ContentMixin(object): """Implements content portion of the ACD API.""" def create_folder(self, name: str, parent=None) -> dict: - body = {'kind': 'FOLDER', 'name': name} - if parent: - body['parents'] = [parent] - body_str = json.dumps(body) + while True: + body = {'kind': 'FOLDER', 'name': name} + if parent: + body['parents'] = [parent] + body_str = json.dumps(body) - acc_codes = [http.CREATED] + acc_codes = [http.CREATED] - r = self.BOReq.post(self.metadata_url + 'nodes', acc_codes=acc_codes, data=body_str) + r = self.BOReq.post(self.metadata_url + 'nodes', acc_codes=acc_codes, data=body_str) + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon - if r.status_code not in acc_codes: - raise RequestError(r.status_code, r.text) + if r.status_code not in acc_codes: + raise RequestError(r.status_code, r.text) - return r.json() + return r.json() def create_file(self, file_name: str, parent: str = None) -> dict: - params = {'suppress': 'deduplication'} + while True: + params = {'suppress': 'deduplication'} - basename = os.path.basename(file_name) - metadata = {'kind': 'FILE', 'name': basename} - if parent: - metadata['parents'] = [parent] - mime_type = _get_mimetype(basename) - f = io.BytesIO() + basename = os.path.basename(file_name) + metadata = {'kind': 'FILE', 'name': basename} + if parent: + metadata['parents'] = [parent] + mime_type = _get_mimetype(basename) + f = io.BytesIO() - # basename is ignored - m = MultipartEncoder(fields=OrderedDict([('metadata', json.dumps(metadata)), - ('content', (quote_plus(basename), f, mime_type))]) - ) + # basename is ignored + m = MultipartEncoder(fields=OrderedDict([('metadata', json.dumps(metadata)), + ('content', (quote_plus(basename), f, mime_type))]) + ) - ok_codes = [http.CREATED] - r = self.BOReq.post(self.content_url + 'nodes', params=params, data=m, - acc_codes=ok_codes, headers={'Content-Type': m.content_type}) + ok_codes = [http.CREATED] + r = self.BOReq.post(self.content_url + 'nodes', params=params, data=m, + acc_codes=ok_codes, headers={'Content-Type': m.content_type}) + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon - if r.status_code not in ok_codes: - raise 
RequestError(r.status_code, r.text) - return r.json() + if r.status_code not in ok_codes: + raise RequestError(r.status_code, r.text) + return r.json() def clear_file(self, node_id: str) -> dict: """Clears a file's content by overwriting it with an empty BytesIO. :param node_id: valid file node ID""" - m = MultipartEncoder(fields={('content', (' ', io.BytesIO(), _get_mimetype()))}) + while True: + m = MultipartEncoder(fields={('content', (' ', io.BytesIO(), _get_mimetype()))}) - r = self.BOReq.put(self.content_url + 'nodes/' + node_id + '/content', params={}, - data=m, stream=True, headers={'Content-Type': m.content_type}) + r = self.BOReq.put(self.content_url + 'nodes/' + node_id + '/content', params={}, + data=m, stream=True, headers={'Content-Type': m.content_type}) + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon - if r.status_code not in OK_CODES: - raise RequestError(r.status_code, r.text) + if r.status_code not in OK_CODES: + raise RequestError(r.status_code, r.text) - return r.json() + return r.json() def upload_file(self, file_name: str, parent: str = None, read_callbacks=None, deduplication=False) -> dict: @@ -209,6 +216,41 @@ def overwrite_file(self, node_id: str, file_name: str, return r.json() + def overwrite_tempfile(self, node_id: str, file, + read_callbacks: list = None) -> dict: + """Overwrite content of node with ID *node_id* with content of *file*. + + :param file: readable and seekable object""" + + # If we're writing 0 bytes, clear instead + file.seek(0, os.SEEK_END) + if file.tell() == 0: + return self.clear_file(node_id) + + while True: + # logger.debug('OVERWRITE: node_id: %s' % node_id) + file.seek(0) + + metadata = {} + import uuid + boundary = uuid.uuid4().hex + + try: + r = self.BOReq.put(self.content_url + 'nodes/' + node_id + '/content', + data=self._multipart_stream(metadata, file, boundary, read_callbacks), + headers={'Content-Type': 'multipart/form-data; boundary=%s' + % boundary}) + except RequestError as e: + if e.status_code == RequestError.CODE.CONN_EXCEPTION: continue + raise + + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon + + if r.status_code not in OK_CODES: + raise RequestError(r.status_code, r.text) + + return r.json() + def overwrite_stream(self, stream, node_id: str, read_callbacks: list = None) -> dict: """Overwrite content of node with ID *node_id* with content of *stream*. 
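Reviewer note: the new write path (WriteProxy.WriteBuffer in acd_fuse.py feeding overwrite_tempfile here) replaces the queue-backed WriteStream with a spill-to-disk buffer that is uploaded once, when the last interested file handle is released. A minimal standalone sketch of that buffering strategy follows; the class and method names are illustrative and not part of this patch.

```python
import os
import tempfile
from threading import Lock


class SpooledWriteBuffer:
    """Collects possibly out-of-order writes locally; spills to disk past max_size."""

    def __init__(self, max_size: int = 32 * 1024 * 1024):
        self._f = tempfile.SpooledTemporaryFile(max_size=max_size)
        self._lock = Lock()
        self.dirty = False

    def write(self, offset: int, data: bytes) -> int:
        with self._lock:
            self.dirty = True
            self._f.seek(offset)
            return self._f.write(data)

    def read(self, offset: int, length: int) -> bytes:
        with self._lock:
            self._f.seek(offset)
            return self._f.read(length)

    def length(self) -> int:
        with self._lock:
            self._f.seek(0, os.SEEK_END)
            return self._f.tell()

    def flush_to(self, upload) -> None:
        """Hands the whole buffer to an uploader callable, once per set of changes."""
        with self._lock:
            if not self.dirty:
                return
            self._f.seek(0)
            upload(self._f)  # e.g. a call like overwrite_tempfile(node_id, f) above
            self.dirty = False
```

Accepting out-of-order writes into a seekable temporary file is what makes rsync-style transfers possible; the trade-off is that the full contents stay in memory or on local disk until release.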
@@ -355,21 +397,26 @@ def chunked_download(self, node_id: str, file: io.BufferedWriter, **kwargs): return def response_chunk(self, node_id: str, offset: int, length: int, **kwargs) -> Response: - ok_codes = [http.PARTIAL_CONTENT] - end = offset + length - 1 - logger.debug('chunk o %d l %d' % (offset, length)) - - r = self.BOReq.get(self.content_url + 'nodes/' + node_id + '/content', - acc_codes=ok_codes, stream=True, - headers={'Range': 'bytes=%d-%d' % (offset, end)}, **kwargs) - # if r.status_code == http.REQUESTED_RANGE_NOT_SATISFIABLE: - # return - if r.status_code not in ok_codes: - raise RequestError(r.status_code, r.text) + while True: + ok_codes = [http.PARTIAL_CONTENT] + end = offset + length - 1 + logger.debug('chunk o %d l %d' % (offset, length)) - return r + try: + r = self.BOReq.get(self.content_url + 'nodes/' + node_id + '/content', + acc_codes=ok_codes, stream=True, + headers={'Range': 'bytes=%d-%d' % (offset, end)}, **kwargs) + except RequestError as e: + if e.status_code == RequestError.CODE.CONN_EXCEPTION: continue + raise + # if r.status_code == http.REQUESTED_RANGE_NOT_SATISFIABLE: + # return + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon + if r.status_code not in ok_codes: + raise RequestError(r.status_code, r.text) + return r - def download_chunk(self, node_id: str, offset: int, length: int, **kwargs) -> bytearray: + def download_chunk(self, node_id: str, offset: int, length: int, **kwargs) -> bytes: """Load a file chunk into memory. :param length: the length of the download chunk""" @@ -385,7 +432,7 @@ def download_chunk(self, node_id: str, offset: int, length: int, **kwargs) -> by buffer.extend(chunk) finally: r.close() - return buffer + return bytes(buffer) def download_thumbnail(self, node_id: str, file_name: str, max_dim=128): """Download a movie's or picture's thumbnail into a file. 
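Reviewer note: requests in this file now loop on common.RETRY_CODES ('the fault lies not in our stars, but in amazon'). The inline while-True loops retry immediately and without bound, and 400 Bad Request is among the retried codes, so a permanent client error could spin forever. A factored-out helper with bounded, delayed retries is one possible refinement; the sketch below is an assumption on my part, not what the patch does.

```python
import time
from functools import wraps

# mirrors common.RETRY_CODES introduced in this PR
TRANSIENT_CODES = {400, 408, 429, 500, 503, 504}


def retry_transient(max_tries: int = 8, base_delay: float = 0.5):
    """Retries a callable that returns a requests.Response while the status looks transient."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            response = None
            for attempt in range(max_tries):
                response = fn(*args, **kwargs)
                if response.status_code not in TRANSIENT_CODES:
                    return response
                time.sleep(base_delay * (attempt + 1))  # linear backoff between attempts
            return response  # give up; the caller's status check raises RequestError
        return wrapper
    return decorator
```

Applied to, say, the request inside response_chunk, this would keep the retry behaviour in one place and make the back-off policy easy to tune.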
diff --git a/acdcli/api/metadata.py b/acdcli/api/metadata.py index 494d6fc..c5edcd3 100644 --- a/acdcli/api/metadata.py +++ b/acdcli/api/metadata.py @@ -1,8 +1,8 @@ """Node metadata operations""" +import http.client import json import logging -import http.client import tempfile from collections import namedtuple @@ -53,7 +53,7 @@ def get_changes(self, checkpoint='', include_purged=False, silent=True, file=Non if file: tmp = open(file, 'w+b') else: - tmp = tempfile.TemporaryFile('w+b') + tmp = tempfile.SpooledTemporaryFile(max_size=1e9, mode='w+b') try: for line in r.iter_lines(chunk_size=10 * 1024 ** 2, decode_unicode=False): if line: @@ -153,11 +153,13 @@ def get_metadata(self, node_id: str, assets=False, temp_link=True) -> dict: # this will increment the node's version attribute def update_metadata(self, node_id: str, properties: dict) -> dict: """Update a node's properties like name, description, status, parents, ...""" - body = json.dumps(properties) - r = self.BOReq.patch(self.metadata_url + 'nodes/' + node_id, data=body) - if r.status_code not in OK_CODES: - raise RequestError(r.status_code, r.text) - return r.json() + while True: + body = json.dumps(properties) + r = self.BOReq.patch(self.metadata_url + 'nodes/' + node_id, data=body) + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon + if r.status_code not in OK_CODES: + raise RequestError(r.status_code, r.text) + return r.json() def get_root_node(self) -> dict: """Gets the root node metadata""" @@ -225,11 +227,22 @@ def move_node_from(self, node_id: str, old_parent_id: str, new_parent_id: str) - return r.json() def move_node(self, node_id: str, parent_id: str) -> dict: - return self.update_metadata(node_id, {'parents': [parent_id]}) + properties = {'parents': [parent_id]} + logger.debug('move_node: node_id: %s parents: %s' % (node_id, str([parent_id]))) + while True: + ret = self.update_metadata(node_id, properties) + logger.debug('move_node: metadata: %s' % str(ret)) + if ret['parents'] == [parent_id]: break + return ret def rename_node(self, node_id: str, new_name: str) -> dict: properties = {'name': new_name} - return self.update_metadata(node_id, properties) + logger.debug('rename_node: node_id: %s new_name: %s' % (node_id, new_name)) + while True: + ret = self.update_metadata(node_id, properties) + logger.debug('rename_node: metadata: %s' % str(ret)) + if ret['name'] == new_name: break + return ret def set_available(self, node_id: str) -> dict: """Sets node status from 'PENDING' to 'AVAILABLE'.""" @@ -260,13 +273,15 @@ def add_property(self, node_id: str, owner_id: str, key: str, value: str) -> dic :returns dict: {'key': '', 'location': '/properties/', 'value': ''}""" - ok_codes = [requests.codes.CREATED] - r = self.BOReq.put(self.metadata_url + 'nodes/' + node_id + - '/properties/' + owner_id + '/' + key, - data=json.dumps({'value': value}), acc_codes=ok_codes) - if r.status_code not in ok_codes: - raise RequestError(r.status_code, r.text) - return r.json() + while True: + ok_codes = [requests.codes.CREATED] + r = self.BOReq.put(self.metadata_url + 'nodes/' + node_id + + '/properties/' + owner_id + '/' + key, + data=json.dumps({'value': value}), acc_codes=ok_codes) + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon + if r.status_code not in ok_codes: + raise RequestError(r.status_code, r.text) + return r.json() def delete_property(self, node_id: str, owner_id: str, key: str): """Deletes *key* property from node with ID *node_id*.""" diff --git 
a/acdcli/api/trash.py b/acdcli/api/trash.py index 36c7186..904f203 100644 --- a/acdcli/api/trash.py +++ b/acdcli/api/trash.py @@ -12,10 +12,12 @@ def list_trash(self) -> list: return self.BOReq.paginated_get(self.metadata_url + 'trash') def move_to_trash(self, node_id: str) -> dict: - r = self.BOReq.put(self.metadata_url + 'trash/' + node_id) - if r.status_code not in OK_CODES: - raise RequestError(r.status_code, r.text) - return r.json() + while True: + r = self.BOReq.put(self.metadata_url + 'trash/' + node_id) + if r.status_code in RETRY_CODES: continue # the fault lies not in our stars, but in amazon + if r.status_code not in OK_CODES: + raise RequestError(r.status_code, r.text) + return r.json() def restore(self, node_id: str) -> dict: r = self.BOReq.post(self.metadata_url + 'trash/' + node_id + '/restore') diff --git a/acdcli/cache/db.py b/acdcli/cache/db.py index efdac5e..d72dbae 100644 --- a/acdcli/cache/db.py +++ b/acdcli/cache/db.py @@ -4,7 +4,7 @@ import re import sqlite3 import sys -from threading import local +from threading import local, RLock from acdcli.utils.conf import get_conf @@ -25,7 +25,11 @@ _def_conf['sqlite'] = dict(filename='nodes.db', busy_timeout=30000, journal_mode='wal') _def_conf['blacklist'] = dict(folders=[]) - +class CacheConsts(object): + CHECKPOINT_KEY = 'checkpoint' + LAST_SYNC_KEY = 'last_sync' + OWNER_ID = 'owner_id' + MAX_AGE = 30 class IntegrityError(Exception): def __init__(self, msg): @@ -65,6 +69,12 @@ def __init__(self, cache_path: str='', settings_path='', check=IntegrityCheckTyp self._conn.create_function('REGEXP', _regex_match.__code__.co_argcount, _regex_match) + self.node_id_to_node_cache = {} + self.path_to_node_id_cache = {} + self.node_cache_lock = RLock() + """There are a huge number of repeated path lookups, + so cache results and selectively invalidate.""" + with cursor(self._conn) as c: c.execute(_ROOT_ID_SQL) row = c.fetchone() diff --git a/acdcli/cache/query.py b/acdcli/cache/query.py index 469b953..b7c66fe 100644 --- a/acdcli/cache/query.py +++ b/acdcli/cache/query.py @@ -1,4 +1,5 @@ import logging +import os from datetime import datetime from .cursors import cursor @@ -25,6 +26,12 @@ def datetime_from_string(dt: str) -> datetime: WHERE p.parent = (?) ORDER BY n.name""" +PARENTS_SQL = """SELECT n.*, f.* FROM nodes n + JOIN parentage p ON n.id = p.parent + LEFT OUTER JOIN files f ON n.id = f.id + WHERE p.child = (?) + ORDER BY n.name""" + CHILDRENS_NAMES_SQL = """SELECT n.name FROM nodes n JOIN parentage p ON n.id = p.child WHERE p.parent = (?) AND n.status == 'AVAILABLE' @@ -51,6 +58,11 @@ def datetime_from_string(dt: str) -> datetime: NODE_BY_ID_SQL = """SELECT n.*, f.* FROM nodes n LEFT OUTER JOIN files f ON n.id = f.id WHERE n.id = (?)""" +PROPERTY_BY_ID_SQL = """SELECT * FROM properties WHERE id=? AND owner=? AND key=?""" + +CONTENT_BY_ID_SQL = """SELECT * FROM content WHERE id=? AND version=?""" +CONTENT_ACCESSED_SQL = """UPDATE content SET accessed=? 
WHERE id=?""" + USAGE_SQL = 'SELECT SUM(size) FROM files' FIND_BY_NAME_SQL = """SELECT n.*, f.* FROM nodes n @@ -98,6 +110,10 @@ def __init__(self, row): self.size = row['size'] except IndexError: self.size = 0 + try: + self.version = row['version'] + except IndexError: + self.version = 0 def __lt__(self, other): return self.name < other.name @@ -126,11 +142,15 @@ def is_trashed(self): @property def created(self): - return datetime_from_string(self.cre) + if isinstance(self.cre, str): + self.cre = datetime_from_string(self.cre) + return self.cre @property def modified(self): - return datetime_from_string(self.mod) + if isinstance(self.mod, str): + self.mod = datetime_from_string(self.mod) + return self.mod @property def simple_name(self): @@ -141,11 +161,20 @@ def simple_name(self): class QueryMixin(object): def get_node(self, id) -> 'Union[Node|None]': + with self.node_cache_lock: + try: + return self.node_id_to_node_cache[id] + except: + pass with cursor(self._conn) as c: c.execute(NODE_BY_ID_SQL, [id]) r = c.fetchone() if r: - return Node(r) + n = Node(r) + if n.is_available: + with self.node_cache_lock: + self.node_id_to_node_cache[n.id] = n + return n def get_root_node(self): return self.get_node(self.root_id) @@ -158,41 +187,51 @@ def get_conflicting_node(self, name: str, parent_id: str): if r: return Node(r) + def resolve_id(self, path: str, trash=False) -> 'Union[str|None]': + n = self.resolve(path, trash) + if n: + return n.id + def resolve(self, path: str, trash=False) -> 'Union[Node|None]': - segments = list(filter(bool, path.split('/'))) - if not segments: - if not self.root_id: - return - with cursor(self._conn) as c: - c.execute(NODE_BY_ID_SQL, [self.root_id]) - r = c.fetchone() - return Node(r) + path = path.rstrip('/') + with self.node_cache_lock: + try: + return self.get_node(self.path_to_node_id_cache[path]) + except: + pass + + parent_path, name = os.path.split(path) + if not name: + r = self.get_root_node() + with self.node_cache_lock: + self.node_id_to_node_cache[r.id] = r + self.path_to_node_id_cache[path] = r.id + return r + + parent = self.resolve(parent_path, trash=trash) + if not parent: + return - parent = self.root_id - for i, segment in enumerate(segments): - with cursor(self._conn) as c: - c.execute(CHILD_OF_SQL, [segment, parent]) - r = c.fetchone() - r2 = c.fetchone() + with cursor(self._conn) as c: + c.execute(CHILD_OF_SQL, [name, parent.id]) + r = c.fetchone() + r2 = c.fetchone() + if not r: + return + r = Node(r) - if not r: + if not r.is_available: + if not trash: return - r = Node(r) - - if not r.is_available: - if not trash: - return - if r2: - logger.debug('None-unique trash name "%s" in %s.' % (segment, parent)) - return - if i + 1 == len(segments): - return r - if r.is_folder: - parent = r.id - continue - else: + if r2: + logger.debug('None-unique trash name "%s" in %s.' 
% (name, parent)) return + with self.node_cache_lock: + self.node_id_to_node_cache[r.id] = r + self.path_to_node_id_cache[path] = r.id + return r + def childrens_names(self, folder_id) -> 'List[str]': with cursor(self._conn) as c: c.execute(CHILDRENS_NAMES_SQL, [folder_id]) @@ -248,7 +287,7 @@ def get_child(self, folder_id, child_name) -> 'Union[Node|None]': if r.is_available: return r - def list_children(self, folder_id, trash=False) -> 'Tuple[List[Node], List[Node]]': + def list_children(self, folder_id, trash=False, folder_path=None) -> 'Tuple[List[Node], List[Node]]': files = [] folders = [] @@ -264,6 +303,17 @@ def list_children(self, folder_id, trash=False) -> 'Tuple[List[Node], List[Node] folders.append(node) node = c.fetchone() + """If the caller provides the folder_path, we can add all the children to the + path->node_id cache for faster lookup after a directory listing""" + if folder_path: + folder_path = folder_path.rstrip('/') + with self.node_cache_lock: + for c in folders + files: + if c.is_available: + self.node_id_to_node_cache[c.id] = c + if folder_path: + self.path_to_node_id_cache[folder_path + '/' + c.name] = c.id + return folders, files def list_trashed_children(self, folder_id) -> 'Tuple[List[Node], List[Node]]': @@ -283,6 +333,29 @@ def first_path(self, node_id: str) -> str: return node.simple_name return self.first_path(node.id) + node.name + '/' + def all_path(self, node_id: str, path_suffix=None) -> 'List[str]': + if node_id == self.root_id: + return ['/' + path_suffix] + + n = self.get_node(node_id) + if not n: + return [] + if path_suffix: + path_suffix = os.path.join(n.name, path_suffix) + else: + path_suffix = n.name + + ret = [] + with cursor(self._conn) as c: + c.execute(PARENTS_SQL, [n.id]) + parent = c.fetchone() + while parent: + parent = Node(parent) + if parent.is_available: + ret += self.all_path(parent.id, path_suffix) + parent = c.fetchone() + return ret + def find_by_name(self, name: str) -> 'List[Node]': nodes = [] with cursor(self._conn) as c: @@ -319,3 +392,21 @@ def file_size_exists(self, size) -> bool: no = c.fetchone()[0] return bool(no) + + def get_property(self, node_id, owner_id, key) -> 'Union[str|None]': + with cursor(self._conn) as c: + c.execute(PROPERTY_BY_ID_SQL, [node_id, owner_id, key]) + r = c.fetchone() + if r: + return r['value'] + return None + + def get_content(self, node_id:str, version:int) -> 'Union[bytes|None]': + if version == 0: return None + with cursor(self._conn) as c: + # Uncomment if/when we want to purge the cache based on LRU. Until then reduce the db load. 
+ # c.execute(CONTENT_ACCESSED_SQL, [datetime.utcnow(), node_id]) + c.execute(CONTENT_BY_ID_SQL, [node_id, version]) + r = c.fetchone() + if r: + return r['value'] diff --git a/acdcli/cache/schema.py b/acdcli/cache/schema.py index 92a5512..6c278c1 100755 --- a/acdcli/cache/schema.py +++ b/acdcli/cache/schema.py @@ -28,6 +28,15 @@ CHECK (status IN ('AVAILABLE', 'TRASH', 'PURGED', 'PENDING')) ); + CREATE TABLE properties ( + id VARCHAR(50) NOT NULL, + owner TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT, + PRIMARY KEY (id, owner, key), + FOREIGN KEY(id) REFERENCES nodes (id) + ); + CREATE TABLE labels ( id VARCHAR(50) NOT NULL, name VARCHAR(256) NOT NULL, @@ -39,6 +48,7 @@ id VARCHAR(50) NOT NULL, md5 VARCHAR(32), size BIGINT, + version BIGINT, PRIMARY KEY (id), UNIQUE (id), FOREIGN KEY(id) REFERENCES nodes (id) @@ -48,13 +58,27 @@ parent VARCHAR(50) NOT NULL, child VARCHAR(50) NOT NULL, PRIMARY KEY (parent, child), - FOREIGN KEY(parent) REFERENCES folders (id), + FOREIGN KEY(parent) REFERENCES nodes (id), FOREIGN KEY(child) REFERENCES nodes (id) ); + CREATE TABLE content ( + id VARCHAR(50) NOT NULL, + value BLOB, + size BIGINT, + version BIGINT, + accessed DATETIME, + PRIMARY KEY (id), + UNIQUE (id), + FOREIGN KEY(id) REFERENCES nodes (id) + ); + + CREATE INDEX ix_content_size ON content(size); + CREATE INDEX ix_content_accessed ON content(accessed); CREATE INDEX ix_parentage_child ON parentage(child); + CREATE INDEX ix_parentage_parent ON parentage(parent); CREATE INDEX ix_nodes_names ON nodes(name); - PRAGMA user_version = 3; + PRAGMA user_version = 4; """ _GEN_DROP_TABLES_SQL = \ @@ -101,8 +125,35 @@ def _2_to_3(conn): conn.commit() +@_migration +def _3_to_4(conn): + conn.executescript( + # For people upgrading from the main branch to PR374, this line should make the db queries work. + # The user would also need to old-sync if they had multiple databases *and* were all ready using + # properties in some of them. It's not clear how to do that from here aside from dropping all data. + 'CREATE TABLE IF NOT EXISTS properties (id VARCHAR(50) NOT NULL, owner TEXT NOT NULL, ' + 'key TEXT NOT NULL, value TEXT, PRIMARY KEY (id, owner, key), FOREIGN KEY(id) REFERENCES nodes (id));' + + 'ALTER TABLE files ADD version BIGINT;' + 'DROP TABLE IF EXISTS content;' + 'CREATE TABLE content (id VARCHAR(50) NOT NULL, value BLOB, size BIGINT, version BIGINT, accessed DATETIME,' + 'PRIMARY KEY (id), UNIQUE (id), FOREIGN KEY(id) REFERENCES nodes (id)); ' + + 'CREATE INDEX IF NOT EXISTS ix_content_size ON content(size);' + 'CREATE INDEX IF NOT EXISTS ix_content_accessed ON content(accessed);' + 'CREATE INDEX IF NOT EXISTS ix_parentage_parent ON parentage(parent);' + + # Having changed the schema, the queries can be optimised differently. + # In order to be aware of that, re-analyze the type of data and indexes, + # allowing SQLite3 to make better decisions. 
+ 'ANALYZE;' + 'PRAGMA user_version = 4;' + ) + conn.commit() + + class SchemaMixin(object): - _DB_SCHEMA_VER = 3 + _DB_SCHEMA_VER = 4 def init(self): try: diff --git a/acdcli/cache/sync.py b/acdcli/cache/sync.py index d6dbe80..c9cfae1 100644 --- a/acdcli/cache/sync.py +++ b/acdcli/cache/sync.py @@ -5,6 +5,8 @@ import logging from datetime import datetime from itertools import islice + +from acdcli.cache.query import Node from .cursors import mod_cursor import dateutil.parser as iso_date @@ -40,14 +42,30 @@ def remove_purged(self, purged: list): with mod_cursor(self._conn) as c: c.execute('DELETE FROM nodes WHERE id IN %s' % placeholders(slice_), slice_) c.execute('DELETE FROM files WHERE id IN %s' % placeholders(slice_), slice_) + c.execute('DELETE FROM content WHERE id IN %s' % placeholders(slice_), slice_) c.execute('DELETE FROM parentage WHERE parent IN %s' % placeholders(slice_), slice_) c.execute('DELETE FROM parentage WHERE child IN %s' % placeholders(slice_), slice_) + c.execute('DELETE FROM properties WHERE id IN %s' % placeholders(slice_), slice_) c.execute('DELETE FROM labels WHERE id IN %s' % placeholders(slice_), slice_) logger.info('Purged %i node(s).' % len(purged)) - def insert_nodes(self, nodes: list, partial=True): + def resolve_cache_add(self, path:str, node_id:str): + with self.node_cache_lock: + self.path_to_node_id_cache[path] = node_id + + def resolve_cache_del(self, path:str): + with self.node_cache_lock: + try: del self.path_to_node_id_cache[path] + except:pass + + def insert_nodes(self, nodes: list, partial:bool=True, flush_resolve_cache:bool=False): """Inserts mixed list of files and folders into cache.""" + + if flush_resolve_cache: + with self.node_cache_lock: + self.path_to_node_id_cache.clear() + files = [] folders = [] for node in nodes: @@ -72,12 +90,13 @@ def insert_nodes(self, nodes: list, partial=True): self.insert_files(files) self.insert_parentage(files + folders, partial) + self.insert_properties(files + folders) - def insert_node(self, node: dict): + def insert_node(self, node:dict, flush_resolve_cache:bool=False): """Inserts single file or folder into cache.""" if not node: return - self.insert_nodes([node]) + self.insert_nodes([node], flush_resolve_cache=flush_resolve_cache) def insert_folders(self, folders: list): """ Inserts list of folders into cache. Sets 'update' column to current date. 
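Reviewer note: db.py now keeps two in-process caches (node_id to Node, and path to node_id) behind an RLock, and the sync code invalidates them selectively: single-node inserts add or drop one path entry, while operations that may move whole subtrees pass flush_resolve_cache=True and clear the path map entirely. A condensed sketch of that contract, with illustrative names only:

```python
from threading import RLock


class ResolveCache:
    """Illustrative path -> node_id map with the same invalidation rules as db.py."""

    def __init__(self):
        self._lock = RLock()
        self._path_to_id = {}

    def add(self, path: str, node_id: str) -> None:
        with self._lock:
            self._path_to_id[path.rstrip('/')] = node_id

    def remove(self, path: str) -> None:
        with self._lock:
            self._path_to_id.pop(path.rstrip('/'), None)

    def flush(self) -> None:
        # A folder rename or move can invalidate any number of descendant paths,
        # so the whole map is dropped instead of being walked.
        with self._lock:
            self._path_to_id.clear()

    def lookup(self, path: str):
        with self._lock:
            return self._path_to_id.get(path.rstrip('/'))
```

The FUSE layer follows the same rule: _rename and _move above flush only when node.is_folder, since renaming a single file cannot invalidate any other cached path.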
@@ -89,14 +108,34 @@ def insert_folders(self, folders: list): with mod_cursor(self._conn) as c: for f in folders: + n = Node(dict(id=f['id'], + type="folder", + name=f.get('name'), + description=f.get('description'), + created=iso_date.parse(f['createdDate']), + modified=iso_date.parse(f['modifiedDate']), + updated=datetime.utcnow(), + status=f['status'], + md5=None, + size=0, + version=0, + )) + + with self.node_cache_lock: + if n.is_available: + self.node_id_to_node_cache[n.id] = n + else: + try: del self.node_id_to_node_cache[n.id] + except: pass + c.execute( 'INSERT OR REPLACE INTO nodes ' '(id, type, name, description, created, modified, updated, status) ' - 'VALUES (?, "folder", ?, ?, ?, ?, ?, ?)', - [f['id'], f.get('name'), f.get('description'), - iso_date.parse(f['createdDate']), iso_date.parse(f['modifiedDate']), - datetime.utcnow(), - f['status'] + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?)', + [n.id, n.type, n.name, n.description, + n.created, n.modified, + n.updated, + n.status ] ) @@ -108,22 +147,47 @@ def insert_files(self, files: list): with mod_cursor(self._conn) as c: for f in files: - c.execute('INSERT OR REPLACE INTO nodes ' - '(id, type, name, description, created, modified, updated, status)' - 'VALUES (?, "file", ?, ?, ?, ?, ?, ?)', - [f['id'], f.get('name'), f.get('description'), - iso_date.parse(f['createdDate']), iso_date.parse(f['modifiedDate']), - datetime.utcnow(), - f['status'] - ] - ) - c.execute('INSERT OR REPLACE INTO files (id, md5, size) VALUES (?, ?, ?)', - [f['id'], - f.get('contentProperties', {}).get('md5', - 'd41d8cd98f00b204e9800998ecf8427e'), - f.get('contentProperties', {}).get('size', 0) - ] - ) + n = Node(dict(id=f['id'], + type="file", + name=f.get('name'), + description=f.get('description'), + created=iso_date.parse(f['createdDate']), + modified=iso_date.parse(f['modifiedDate']), + updated=datetime.utcnow(), + status=f['status'], + md5=f.get('contentProperties', {}).get('md5', 'd41d8cd98f00b204e9800998ecf8427e'), + size=f.get('contentProperties', {}).get('size', 0), + version=f.get('contentProperties', {}).get('version', 0), + )) + + with self.node_cache_lock: + if n.is_available: + self.node_id_to_node_cache[n.id] = n + else: + try: del self.node_id_to_node_cache[n.id] + except: pass + + if not n.is_available: + self.remove_content(n.id) + + c.execute( + 'INSERT OR REPLACE INTO nodes ' + '(id, type, name, description, created, modified, updated, status) ' + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?)', + [n.id, n.type, n.name, n.description, + n.created, n.modified, + n.updated, + n.status + ] + ) + c.execute( + 'INSERT OR REPLACE INTO files (id, md5, size, version) VALUES (?, ?, ?, ?)', + [n.id, + n.md5, + n.size, + n.version, + ] + ) logger.info('Inserted/updated %d file(s).' % len(files)) @@ -143,3 +207,44 @@ def insert_parentage(self, nodes: list, partial=True): c.execute('INSERT OR IGNORE INTO parentage VALUES (?, ?)', [p, n['id']]) logger.info('Parented %d node(s).' % len(nodes)) + + def insert_properties(self, nodes: list): + if not nodes: + return + + with mod_cursor(self._conn) as c: + for n in nodes: + if 'properties' not in n: + continue + id = n['id'] + for owner_id, key_value in n['properties'].items(): + for key, value in key_value.items(): + c.execute('INSERT OR REPLACE INTO properties ' + '(id, owner, key, value) ' + 'VALUES (?, ?, ?, ?)', + [id, owner_id, key, value] + ) + + logger.info('Applied properties to %d node(s).' 
% len(nodes))
+
+    def insert_property(self, node_id, owner_id, key, value):
+        with mod_cursor(self._conn) as c:
+            c.execute('INSERT OR REPLACE INTO properties '
+                      '(id, owner, key, value) '
+                      'VALUES (?, ?, ?, ?)',
+                      [node_id, owner_id, key, value]
+                      )
+
+    def insert_content(self, node_id:str, version:int, value:bytes):
+        with mod_cursor(self._conn) as c:
+            c.execute('INSERT OR REPLACE INTO content '
+                      '(id, value, size, version, accessed) '
+                      'VALUES (?, ?, ?, ?, ?)',
+                      [node_id, value, len(value), version, datetime.utcnow()]
+                      )
+
+    def remove_content(self, node_id:str):
+        with mod_cursor(self._conn) as c:
+            c.execute('DELETE FROM content WHERE id=?',
+                      [node_id]
+                      )
diff --git a/docs/configuration.rst b/docs/configuration.rst
index 4d8658b..1f8540b 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -81,7 +81,7 @@ fuse.ini
 ::
     [fs]
     ;block size used for size info
-    block_size = 512
+    block_size = 8192
 
     [read]
     ;maximal number of simultaneously opened chunks per file
diff --git a/docs/contributors.rst b/docs/contributors.rst
index 89cd829..8f05437 100644
--- a/docs/contributors.rst
+++ b/docs/contributors.rst
@@ -23,6 +23,8 @@ Thanks to
 
 - `memoz `_ for amending proxy documentation
 
+- `bgemmill `_ for fuse write-back caching, xattrs, symlinks, and rsync support
+
 - `gerph `_ for making file searches faster, particularly on large repositories
 
 Also thanks to
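Reviewer note: the xattr support in acd_fuse.py keeps one JSON dictionary per node and stores it under the ACD node property named 'xattrs' for the account's owner id. Override keys such as fuse.mode, fuse.uid, fuse.gid and fuse.mtime hold plain values, while arbitrary user attributes are base64-wrapped because their values are raw bytes. A minimal sketch of that round trip; 'user.comment' is an invented example attribute, not something the patch defines.

```python
import binascii
import json

# xattrs as the FUSE layer would hold them in memory for one node
xattrs = {}
xattrs['fuse.mode'] = 0o100644  # override keys store plain values
xattrs['user.comment'] = binascii.b2a_base64(b'hello').decode('utf-8')  # raw xattr bytes are base64-wrapped

# what ends up in the single node property named 'xattrs'
property_value = json.dumps(xattrs)

# reading it back, e.g. after a fresh mount
restored = json.loads(property_value)
assert binascii.a2b_base64(restored['user.comment']) == b'hello'
```

On the wire this amounts to a single add_property call per node, delayed by _XATTR_DELAY seconds so that bursts of attribute changes (chmod, chown, utimens during an rsync) are coalesced into one request.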