import click CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) def enable_debug(): import logging logger = logging.getLogger('peewee') logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) @click.group(context_settings = CONTEXT_SETTINGS) @click.option('--debug', '-d', is_flag=True, default=False) def cli(debug): if debug: enable_debug() @cli.group() def db(): """Database Management""" pass @db.command() def init(): """Initialize the database, if not done already.""" from .model import create_tables, db from .prefixes import register_prefixes create_tables() with db.atomic(): register_prefixes() @db.command() @click.confirmation_option(prompt="Are you sure you want to drop the database?") def drop(): """Completely drop all tables.""" from .model import drop_tables drop_tables() @cli.group() def tag(): """Handling of tags""" pass @cli.group() def prefix(): """Handling of prefixes of tags""" pass @tag.command('list') @click.argument('pattern', required = False) def list_tags(pattern): from .model import Tag print("Tags") print("====") print() query = Tag.select().where(~Tag.default) if pattern: query = query.where(Tag.name.contains(pattern) | Tag.prefix.contains(pattern)) for t in query: descr = t.description or '' if descr: descr = '-- ' + descr print(' *', t, descr) @prefix.command('list') def list_prefixes(): from .model import Prefix print("Prefixes") print("========") print() for p in Prefix.select(): print(" * %s" % p) @prefix.command('add') @click.argument('name') @click.argument('description', required = False) def add_prefix(name, description): from .model import Prefix prefix, created = Prefix.create_or_get(name = name, description = description) if not created: print("Prefix already existed:", prefix) def prefix_tag_name(name, prefix=None): if prefix: return "%s:%s" % (prefix, name) else: return name class PrefixTag: def __init__(self, tag, prefix = None): self.tag = tag self.prefix = prefix def __str__(self): if self.prefix: return '%s:%s' % (self.prefix, self.tag) else: return self.tag def __iter__(self): yield self.prefix yield self.tag def prefixed_name(self): return prefix_tag_name(self.tag, self.prefix) def __str__(self): return self.prefixed_name() class PrefixTagType(click.ParamType): name = 'prefixed tag' def convert(self, value, param, ctx): try: prefix, tag = value.split(':', 1) except ValueError: return PrefixTag(value) return PrefixTag(tag, prefix) TAG = PrefixTagType() @prefix.command('edit') @click.argument('name') @click.argument('implies', type = TAG, nargs = -1) @click.option('--description') def edit_prefix(name, implies, description): from .model import Prefix, db try: prefix = Prefix.get(name = name) except Prefix.DoesNotExist: raise click.UsageError("Prefix '%s' does not exist." % name) with db.atomic(): add_implications(prefix.default_tag, implies) if description: prefix.description = description prefix.save() def create_tag(name, description=None): from .model import Tag, Prefix, db with db.atomic(): if name.prefix: prefix, created = Prefix.get_or_create(name = name.prefix) if not created and prefix.virtual: raise click.UsageError("Prefix '%s' is not allowed to carry additional tags." % name.prefix) return Tag.create_or_get(name = name.tag, prefix = name.prefix, description = description) def fetch_tags(tag_list, ignore_missing=False): if not tag_list: return [] from .model import Tag from .peewee_ext import sqlite_tuple_in fetched_tags = Tag.select().where(sqlite_tuple_in((Tag.prefix, Tag.name), tag_list)).execute() if len(fetched_tags) < len(tag_list): print("Some tags were not present:", ', '.join(set(map(str, tag_list)).difference(map(str, fetched_tags)))) if not ignore_missing: raise click.ClickException("Not all tags present") return fetched_tags @tag.command('add') @click.argument('name', type = TAG) @click.argument('description', required = False) def add_tag(name, description): created, tag = create_tag(name, description) if not created: print("Tag already existed:", tag) def add_implications(tag, implies): from .model import TagImplications from peewee import IntegrityError for i in fetch_tags(implies): try: TagImplications.create(tag = tag, implies_tag = i) except IntegrityError: print("Implication onto '%s' already existing. Skipping." % i) @tag.command('edit') @click.argument('name', type = TAG) @click.argument('implies', type = TAG, nargs = -1) @click.option('--description') def edit_tag(name, implies, description): from .model import Tag, db try: tag = Tag.get(name = name.tag, prefix = name.prefix) except Tag.DoesNotExist: raise click.UsageError("Tag '%s' does not exist." % name) with db.atomic(): add_implications(tag, implies) if description: tag.description = description tag.save() @cli.group() def doc(): """Document handling""" pass @doc.command('add') @click.argument('file', type=click.File(mode = 'rb')) @click.argument('tags', type = TAG, nargs = -1) @click.option('--create-tags', '-c', is_flag = True) @click.option('--ignore-missing-tags', '-i', is_flag = True) def add_doc(file, tags, create_tags, ignore_missing_tags): """Add a new document together with the given tags.""" from .model import Document, Tag, db, DocumentTag import magic with db.atomic(): if tags: if create_tags: tags = [create_tag(tag)[0] for tag in tags] else: tags = fetch_Tags(tags, ignore_missing_tags) mimetype = magic.from_file(file.name, mime=True) doc = Document.create(content = file.read(), file_type = mimetype, original_path = file.name, direction = Document.Direction.IN) for t in tags: DocumentTag.create(document = doc, tag = t)