Commit
Merge pull request #27 from beebeeep/develop
Develop
beebeeep authored Sep 14, 2017
2 parents 9ec7061 + 00f8ef3 commit 84414a7
Showing 10 changed files with 173 additions and 34 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,8 @@
## [Version 0.7.18](https://github.com/beebeeep/cacus/tree/v0.7.17) (2017-09-04)
## [Version 0.7.19](https://github.com/beebeeep/cacus/tree/v0.7.19) (2017-09-14)
* Added compatibility with Azure CosmosDB (MongoDB API)
* Added support for locks in Consul

## [Version 0.7.18](https://github.com/beebeeep/cacus/tree/v0.7.18) (2017-09-04)
* Added optional file name for api/v1/package/upload
* Fixed uploading to non-existent components

132 changes: 118 additions & 14 deletions cacus/common.py
@@ -2,11 +2,13 @@
# -*- coding: utf-8 -*-
import os
import sys
import imp
import time
import yaml
import uuid
import gnupg
import base64
import socket
import pymongo
import hashlib
import requests
@@ -25,6 +27,8 @@

import plugin

consul = None # optional module


class _ColorFormatter(logging.Formatter):

@@ -142,6 +146,21 @@ def filter(self, record):

self.log = logging.getLogger('cacus.{}'.format(type(self).__name__))

lock_method = self.config['lock']['method']
if lock_method == 'mongo':
pass
elif lock_method == 'consul':
try:
global consul
module_info = imp.find_module('consul')
consul = imp.load_module('consul', *module_info)
except ImportError:
self.log.critical("Cannot find 'consul' module. Check if 'python-consul' module is installed")
sys.exit(1)
else:
self.log.critical("Unkonwn lock method '%s'", lock_method)
sys.exit(1)

# GPG
try:
self.gpg = gnupg.GPG(homedir=self.config['gpg']['home'])
@@ -156,7 +175,9 @@ def filter(self, record):
# mongo
if not mongo:
self.config['db']['connect'] = False
implementation = self.config['db'].pop('implementation', 'mongo')
self.db = pymongo.MongoClient(**(self.config['db']))
self.db.implementation = implementation
else:
self.db = mongo

@@ -180,7 +201,18 @@ def load_config(config_file):
config = yaml.load(cfg)
return config

def lock(self, *args, **kwargs):
lock_method = self.config['lock']['method']
if lock_method == 'mongo':
return _MongoLock(self.db, *args, **kwargs)
elif lock_method == 'consul':
return _ConsulLock(self.db, self.config['lock'], *args, **kwargs)

def create_cacus_indexes(self):
if self.db.implementation == 'cosmos':
self.log.warning("Using Azure CosmosDB, skipping index creation")
return

self.log.info("Creating indexes for cacus.distros...")
self.db.cacus.distros.create_index('distro', unique=True)
self.db.cacus.distros.create_index('snapshot')
@@ -204,14 +236,18 @@ def create_cacus_indexes(self):
('distro', pymongo.DESCENDING),
('comp', pymongo.DESCENDING)],
unique=True)
self.db.cacus.locks.create_index('modified', expireAfterSeconds=self.config['lock_cleanup_timeout'])
self.db.cacus.locks.create_index('modified', expireAfterSeconds=self.config['lock']['ttl'])

self.log.info("Creating indexes for cacus.access_tokens...")
self.db.cacus.access_tokens.create_index('jti', unique=True)
# remove expired tokens from DB
self.db.cacus.access_tokens.create_index('exp', expireAfterSeconds=300)

def create_packages_indexes(self, distros=None):
if self.db.implementation == 'cosmos':
self.log.warning("Using Azure CosmosDB, skipping index creation")
return

if not distros:
distros = self.db.packages.collection_names()

@@ -391,10 +427,8 @@ def write(self, data):
raise IOError("Client has closed connection")


class DistroLock(object):
""" Poor man's implementation of distributed lock in mongodb.
Ostrich algorithm used for dealing with deadlocks. You can always add some retries if returning 409 is not an option
"""
class _DistroLock(object):
""" TODO move to plugins mb? """

def __init__(self, db, distro, comps=None, timeout=30, already_locked=False):
self.db = db
@@ -406,7 +440,29 @@ def __init__(self, db, distro, comps=None, timeout=30, already_locked=False):
else:
self.comps = comps
self.timeout = timeout
self.log = logging.getLogger("cacus.RepoLock")
self.log = logging.getLogger("cacus.{}".format(type(self).__name__))

def _unlock(self, comps):
raise NotImplementedError()

def _lock(self):
raise NotImplementedError()

def __enter__(self):
if self.already_locked:
self.log.debug("%s/%s is already locked", self.distro, self.comps)
return
self._lock()

def __exit__(self, exc_type, exc_value, traceback):
if not self.already_locked:
self._unlock(self.comps)


class _MongoLock(_DistroLock):
""" Poor man's implementation of distributed lock in mongodb.
Ostrich algorithm used for dealing with deadlocks. You can always add some retries if returning 409 is not an option
"""

def _unlock(self, comps):
for comp in comps:
@@ -424,11 +480,7 @@ def _unlock(self, comps):
except:
self.log.error("Error while unlocking %s/%s: %s", self.distro, comp, sys.exc_info())

def __enter__(self):
if self.already_locked:
self.log.debug("%s/%s is already locked", self.distro, self.comps)
return

def _lock(self):
self.log.debug("Trying to lock %s/%s", self.distro, self.comps)
while True:
locked = []
@@ -458,9 +510,61 @@ def __enter__(self):
else:
break # good, we just locked all comps

def __exit__(self, exc_type, exc_value, traceback):
if not self.already_locked:
self._unlock(self.comps)

class _ConsulLock(_DistroLock):

def __init__(self, db, settings, *args, **kwargs):
self.settings = settings
self.consul = consul.Consul(**settings.get('connection', {}))
super(_ConsulLock, self).__init__(db, *args, **kwargs)

def _unlock(self, comps):
for comp in comps:
try:
result = self.consul.kv.put('locks/{}/{}'.format(self.distro, comp), None, release=self.session)
if result:
self.log.debug("%s/%s unlocked", self.distro, comp)
else:
self.log.warning("Cannot unlock %s/%s", self.distro, comp)
except:
self.log.error("Error while unlocking %s/%s: %s", self.distro, comp, sys.exc_info())

try:
self.consul.session.destroy(self.session)
self.log.debug("Destroyed consul session %s", self.session)
except:
self.log.error("Error while destrying consul session: %s", sys.exc_info())

def _lock(self):
try:
self.session = self.consul.session.create(name=socket.getfqdn(), ttl=self.settings['ttl'])
self.log.debug("Created consul session %s", self.session)
while True:
locked = []
for comp in self.comps:
try:
result = self.consul.kv.put('locks/{}/{}'.format(self.distro, comp), 'YOU SHALL NOT PASS', acquire=self.session)
if result:
self.log.debug("%s/%s locked", self.distro, comp)
locked.append(comp)
else:
self.log.debug("Failed to lock %s/%s, waiting...", self.distro, comp)
self._unlock(locked)
time.sleep(1)
self.timeout -= 1
if self.timeout > 0:
break # try to lock all comps once again
else:
raise DistroLockTimeout("Timeout while trying to lock distro {0}/{1}".format(self.distro, comp))
except:
self.log.error("Error while locking %s/%s: %s", self.distro, comp, sys.exc_info())
self._unlock(locked)
raise FatalError("Error while locking {}/{}: {}", self.distro, comp, sys.exc_info())
else:
break # good, we just locked all comps
except:
self.log.error("Error while creating session: %s", sys.exc_info())
raise FatalError("Error while creating consul session: {}", sys.exc_info())


def _setup_log_handlers(config):
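For context, _ConsulLock above relies on Consul sessions and the KV acquire/release semantics exposed by the python-consul package. A minimal standalone sketch of that pattern follows; the host, port and key name are illustrative placeholders, not values taken from this commit:

import socket

import consul

c = consul.Consul(host='127.0.0.1', port=8500)              # same kwargs as config['lock']['connection']
session = c.session.create(name=socket.getfqdn(), ttl=600)  # session expires unless renewed within ttl

# kv.put(..., acquire=session) succeeds only if no other session holds the key
if c.kv.put('locks/jessie/unstable', 'YOU SHALL NOT PASS', acquire=session):
    try:
        pass  # critical section: update packages, indices, metadata, ...
    finally:
        c.kv.put('locks/jessie/unstable', None, release=session)  # release the key
        c.session.destroy(session)                                # drop the session explicitly
else:
    print('lock is held by another session; retry later or give up')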
3 changes: 1 addition & 2 deletions cacus/distro_import.py
@@ -19,7 +19,6 @@
from debian import deb822

import repo_manage
import common


class DistroImporter(repo_manage.RepoManager):
@@ -54,7 +53,7 @@ def import_distro(self, base_url, distro, components=None, arches=None):
release = deb822.Release(f)

# since we don't know list of components in distro, lock distro on some fake component name
with common.DistroLock(self.db, distro, ['__cacusimport']):
with self.lock(self.db, distro, ['__cacusimport']):
# remove all packages imported - we will recreate distro collection from scratch
# note that this does not affect APT API of distro - indices are still in place i
# and will be updated once we finish import
29 changes: 17 additions & 12 deletions cacus/repo_manage.py
@@ -45,18 +45,23 @@ def create_distro(self, distro, description, components, simple, strict=None, q

to_delete = set(old_components) - set(components)
if to_delete:
with common.DistroLock(self.db, distro, to_delete):
with self.lock(distro, to_delete):
for deleted in to_delete:
self._delete_component(distro, deleted)

# even empty distro deserves to have proper Release file and Package&Sources indices
self.update_distro_metadata(distro)
self.create_packages_indexes(distros=[distro])

# even empty distro deserves to have proper Release file and Package&Sources indices
try:
with self.lock(distro):
self.update_distro_metadata(distro)
except common.DistroLockTimeout as e:
raise common.TemporaryError(e.message)

return old_distro

def remove_distro(self, distro):
with common.DistroLock(self.db, distro):
with self.lock(distro):
# Transactions? Bitch, please!
self.log.info("Removing distro '%s'", distro)
result = self.db.cacus.distros.delete_one({'distro': distro, 'snapshot': {'$exists': False}})
@@ -320,7 +325,7 @@ def upload_package(self, distro, comp, files, changes, skipUpdateMeta=False):
# critical section. updating meta DB
try:
# block whole distro since we will possibly update not only 'comp' component
with common.DistroLock(self.db, distro):
with self.lock(distro):
components_to_update = set([comp])
if src_pkg:
src = self.db.sources[distro].find_one_and_update(
@@ -440,8 +445,8 @@ def update_distro_metadata(self, distro, comps=None, arches=None):
if not comps:
comps = [x['component'] for x in self.db.cacus.components.find({'distro': distro})]
if not arches:
arches = self.db.cacus.repos.find({'distro': distro}).distinct('architecture')
arches.extend(self.default_arches)
arches = set(x['architecture'] for x in self.db.cacus.repos.find({'distro': distro}, {'architecture': 1}))
arches.update(self.default_arches)

if not comps or not arches:
raise common.NotFound("Distro {} is not found or empty".format(distro))
@@ -533,7 +538,7 @@ def remove_package(self, pkg=None, ver=None, arch=None, distro=None, comp=None,

affected_arches = []
try:
with common.DistroLock(self.db, distro, [comp], already_locked=locked):
with self.lock(distro, [comp], already_locked=locked):
if source_pkg:
# remove source package (if any) and all binary packages within it from component
result = self.db.sources[distro].find_one_and_update(
@@ -590,7 +595,7 @@ def purge_package(self, pkg=None, ver=None, arch=None, distro=None, skipUpdateM
affected_arches = set()
affected_comps = set()
try:
with common.DistroLock(self.db, distro, comps=None, already_locked=locked):
with self.lock(distro, comps=None, already_locked=locked):
result = self.db.sources[distro].find_one_and_delete({'Package': pkg, 'Version': ver})
if result:
# found source package matching query, remove this package, its non-deb files and all debs it consists of
@@ -634,7 +639,7 @@ def copy_package(self, pkg=None, ver=None, arch=None, distro=None, src=None, dst
raise common.NotFound("Component '{}' was not found in distro '{}'".format(dst, distro))

try:
with common.DistroLock(self.db, distro, [src, dst]):
with self.lock(distro, [src, dst]):
if source_pkg:
# move source package (if any) and all binary packages within it
result = self.db.sources[distro].find_one_and_update(
@@ -716,7 +721,7 @@ def delete_snapshot(self, distro, name):
raise common.NotFound("Snapshot '{}' does not exist".format(name))

try:
with common.DistroLock(self.db, distro):
with self.lock(distro):
# XXX: Packages and Sources indices are not being cleaned up here, source of garbage in storage:
self.db.cacus.components.remove({'snapshot': {'origin': distro, 'name': name}})
self.db.cacus.repos.remove({'snapshot': {'origin': distro, 'name': name}})
@@ -755,7 +760,7 @@ def create_snapshot(self, distro, name, from_snapshot=None, allow_update=True):
raise common.NotFound("Distro or snapshot '{}' not found".format(distro))

try:
with common.DistroLock(self.db, distro):
with self.lock(distro):
for component in self.db.cacus.components.find({'distro': distro}):
component['distro'] = snapshot_name
component['snapshot'] = snapshot_info
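Call sites in repo_manage.py now take locks through the lock() factory added to common.py instead of constructing common.DistroLock directly, and convert lock timeouts into retryable errors. A minimal sketch of that pattern, assuming a RepoManager instance; the helper function itself is illustrative, while lock(), update_distro_metadata(), DistroLockTimeout and TemporaryError come from this diff:

import common  # cacus.common, as already imported by repo_manage.py


def _update_under_lock(manager, distro, comps):
    """Illustrative helper: refresh distro metadata while holding the per-distro lock."""
    try:
        # lock() returns _MongoLock or _ConsulLock depending on config['lock']['method']
        with manager.lock(distro, comps):
            manager.update_distro_metadata(distro, comps=comps)
    except common.DistroLockTimeout as e:
        # surface lock contention as a temporary (retryable) error, as create_distro() does
        raise common.TemporaryError(e.message)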
5 changes: 4 additions & 1 deletion contrib/cacus-default.yml
@@ -50,7 +50,10 @@ gpg:

retry_count: 3
retry_delays: [2, 5, 10, 30, 60, 90]
lock_cleanup_timeout: 3600

lock:
method: mongo
ttl: 600

# additional plugins
plugin_path:
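The default above keeps the MongoDB-backed lock. Switching to Consul would look roughly like the sketch below; the connection mapping is passed verbatim to python-consul's Consul() constructor, so the host and port shown here are illustrative placeholders rather than defaults shipped with this commit:

lock:
  method: consul
  ttl: 600
  connection:
    host: consul.example.com
    port: 8500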
7 changes: 7 additions & 0 deletions debian/changelog
@@ -1,3 +1,10 @@
cacus (0.7.19) unstable; urgency=medium

* Added compatibility with Azure CosmosDB (MongoDB API)
* Added support for locks in Consul

-- Cacus maintainer entity <[email protected]> Thu, 14 Sep 2017 14:19:08 +0000

cacus (0.7.18) unstable; urgency=medium

* Added optional file name for api/v1/package/upload
2 changes: 1 addition & 1 deletion setup.py
@@ -4,7 +4,7 @@

setup(
name="cacus",
version="0.7.18",
version="0.7.19",
author="Danila Migalin",
author_email="[email protected]",
url="https://github.com/beebeeep/cacus",