diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dca6ea3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.pyc +*~ +*# diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..0f3d172 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include COPYING +include MANIFEST.in +include README.md +include *.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..749adc1 --- /dev/null +++ b/Makefile @@ -0,0 +1,49 @@ +#! /usr/bin/make + +default: + python setup.py check build + +.PHONY: register dist inspect upload clean docs test + +register: + if [ ! -f ~/.pypirc ]; then \ + echo "Missing ~/.pypirc file"; \ + exit 1; \ + fi; \ + python setup.py register + +dist: + python setup.py sdist + +inspect: + python setup.py clean + rm -rf build/ + rm -rf dist/ + rm -rf *.egg/ + rm -rf *.egg-info/ + rm -f MANIFEST + python setup.py sdist + cd dist/ && tar xzvf *.tar.gz + +upload: + if [ ! -f ~/.pypirc ]; then \ + echo "Missing ~/.pypirc file"; \ + exit 1; \ + fi; \ + python setup.py sdist upload + +clean: + python setup.py clean + rm -rf build/ + rm -rf dist/ + rm -rf *.egg/ + rm -rf *.egg-info/ + rm -rf __pycache__/ + rm -f MANIFEST + rm -rf docs/_*/ + +docs: + cd docs && make html + +test: + nosetests -c tests/nose.cfg diff --git a/README.md b/README.md index fb7b5f4..526381d 100644 --- a/README.md +++ b/README.md @@ -211,9 +211,8 @@ Commands * glacier vault create vault-name * glacier vault sync [--wait] [--fix] [--max-age hours] vault-name * glacier archive list vault-name -* glacier archive upload [--name archive-name] vault-name filename -* glacier archive retrieve [--wait] [-o filename] [--multipart-size bytes] vault-name archive-name -* glacier archive retrieve [--wait] [--multipart-size bytes] vault-name archive-name [archive-name...] +* glacier archive upload [--encrypt] [--concurrent [--part-size size] [--num-threads count]] [--name archive-name] vault-name filename +* glacier archive retrieve [--wait] [--decrypt] [-o filename] [--part-size bytes] vault-name archive-name [archive-name...] * glacier archive delete vault-name archive-name * glacier job list diff --git a/glacier.py b/glacier.py index e13d702..69453fb 100755 --- a/glacier.py +++ b/glacier.py @@ -24,21 +24,29 @@ from __future__ import print_function from __future__ import unicode_literals +from functools import partial import argparse import calendar import errno import itertools +import logging import os import os.path import sys +import tempfile import time +from boto.glacier.concurrent import ConcurrentDownloader +from boto.glacier.concurrent import ConcurrentUploader +DEFAULT_NUM_THREADS = 10 + import boto.glacier import iso8601 import sqlalchemy import sqlalchemy.ext.declarative import sqlalchemy.orm +from gpg import Encryptor # There is a lag between an archive being created and the archive # appearing on an inventory. Even if the inventory has an InventoryDate @@ -50,12 +58,23 @@ PROGRAM_NAME = 'glacier' +DEFAULT_PART_SIZE = 4194304 + +DEFAULT_REGION = 'us-east-1' + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + +logger = logging.getLogger('glacier-cli') + class ConsoleError(RuntimeError): def __init__(self, m): self.message = m -class RetryConsoleError(ConsoleError): pass +class RetryConsoleError(ConsoleError): + pass def info(message): @@ -349,7 +368,17 @@ def recent_enough(job): def find_complete_job(jobs): - for job in sorted(filter(lambda job: job.completed, jobs), key=lambda job: iso8601.parse_date(job.completion_date), reverse=True): + complete_jobs = filter(lambda job: job.completed, jobs) + + def sortable_date(job): + return iso8601.parse_date(job.completion_date) + + sorted_completed_jobs = sorted( + complete_jobs, + key=sortable_date, reverse=True + ) + + for job in sorted_completed_jobs: return job @@ -478,7 +507,7 @@ def archive_list(self): if archive_list: print(*archive_list, sep="\n") - def archive_upload(self): + def archive_upload(self, encryptor=None): # XXX: "Leading whitespace in archive descriptions is removed." # XXX: "The description must be less than or equal to 1024 bytes. The # allowable characters are 7 bit ASCII without control codes, @@ -490,55 +519,91 @@ def archive_upload(self): try: full_name = self.args.file.name except: - raise RuntimeError('Archive name not specified. Use --name') + raise RuntimeError("Archive name not specified. Use --name.") name = os.path.basename(full_name) + if self.args.encrypt: + file_obj = tempfile.NamedTemporaryFile() + file_name = file_obj.name + logger.info("Encrypting %s to %s." % (self.args.file, file_name)) + encryptor.encrypt_file(self.args.file, file_name) + logger.info("Encryption complete: %s." % file_name) + else: + file_obj = self.args.file + file_name = file_obj.name + vault = self.connection.get_vault(self.args.vault) - archive_id = vault.create_archive_from_file( - file_obj=self.args.file, description=name) + + if not self.args.concurrent: + logger.info("Uploading in a single part: %s to %s." + % (file_name, vault)) + archive_id = vault.create_archive_from_file( + file_obj = file_obj, description=name + ) + else: + logger.info("Uploading concurrent: %s to %s" + % (file_name, vault)) + uploader = ConcurrentUploader(self.connection.layer1, + vault.name, + part_size=self.args.part_size, + num_threads=self.args.num_threads) + archive_id = uploader.upload(file_name, description=name) + + logger.info("Upload complete.") + logger.info("New Archive ID: %s" % archive_id) + self.cache.add_archive(self.args.vault, name, archive_id) @staticmethod - def _write_archive_retrieval_job(f, job, multipart_size): - if job.archive_size > multipart_size: - def fetch(start, end): - byte_range = start, end-1 - f.write(job.get_output(byte_range).read()) - - whole_parts = job.archive_size // multipart_size - for first_byte in xrange(0, whole_parts * multipart_size, - multipart_size): - fetch(first_byte, first_byte + multipart_size) - remainder = job.archive_size % multipart_size - if remainder: - fetch(job.archive_size - remainder, job.archive_size) + def _write_archive_retrieval_job(f, job, part_size, + encryptor=None): + + if encryptor: + destfile = tempfile.NamedTemporaryFile() + else: + destfile = f + + if job.archive_size > part_size: + downloader = ConcurrentDownloader( + job=job, + part_size=part_size, + num_threads=DEFAULT_NUM_THREADS + ) + downloader.download(destfile.name) else: - f.write(job.get_output().read()) + destfile.write(job.get_output().read()) # Make sure that the file now exactly matches the downloaded archive, # even if the file existed before and was longer. try: - f.truncate(job.archive_size) + destfile.truncate(job.archive_size) except IOError, e: - # Allow ESPIPE, since the "file" couldn't have existed before in - # this case. + # Allow ESPIPE, since the "file" couldn't have existed + # before in this case. if e.errno != errno.ESPIPE: raise + # Decrypt file if encryptor is given + if encryptor: + encryptor.decrypt_file(destfile.name, f.name) + destfile.close() + @classmethod - def _archive_retrieve_completed(cls, args, job, name): + def _archive_retrieve_completed(cls, args, job, name, encryptor=None): if args.output_filename == '-': cls._write_archive_retrieval_job( - sys.stdout, job, args.multipart_size) + sys.stdout, job, args.part_size, + encryptor=encryptor) else: if args.output_filename: filename = args.output_filename else: filename = os.path.basename(name) with open(filename, 'wb') as f: - cls._write_archive_retrieval_job(f, job, args.multipart_size) + cls._write_archive_retrieval_job(f, job, args.part_size, + encryptor=encryptor) - def archive_retrieve_one(self, name): + def archive_retrieve_one(self, name, encryptor=None): try: archive_id = self.cache.get_archive_id(self.args.vault, name) except KeyError: @@ -549,30 +614,42 @@ def archive_retrieve_one(self, name): complete_job = find_complete_job(retrieval_jobs) if complete_job: - self._archive_retrieve_completed(self.args, complete_job, name) + self._archive_retrieve_completed(self.args, complete_job, name, + encryptor=encryptor) elif has_pending_job(retrieval_jobs): if self.args.wait: complete_job = wait_until_job_completed(retrieval_jobs) - self._archive_retrieve_completed(self.args, complete_job, name) + self._archive_retrieve_completed(self.args, complete_job, name, + encryptor=encryptor) else: - raise RetryConsoleError('job still pending for archive %r' % name) + raise RetryConsoleError('job still pending for archive %r' + % name) else: # create an archive retrieval job job = vault.retrieve_archive(archive_id) if self.args.wait: wait_until_job_completed([job]) - self._archive_retrieve_completed(self.args, job, name) + self._archive_retrieve_completed(self.args, job, name, + encryptor=encryptor) else: - raise RetryConsoleError('queued retrieval job for archive %r' % name) + raise RetryConsoleError('queued retrieval job for archive %r' + % name) - def archive_retrieve(self): + def archive_retrieve(self, encryptor=None): if len(self.args.names) > 1 and self.args.output_filename: - raise ConsoleError('cannot specify output filename with multi-archive retrieval') + raise ConsoleError("cannot specify output filename with " + "multi-archive retrieval") success_list = [] retry_list = [] + + # Let called functions know that encryption was disabled from + # command-line. + if not self.args.decrypt: + encryptor = None + for name in self.args.names: try: - self.archive_retrieve_one(name) + self.archive_retrieve_one(name, encryptor=encryptor) except RetryConsoleError, e: retry_list.append(e.message) else: @@ -640,10 +717,10 @@ def too_old(last_seen): print(self.args.name) - def parse_args(self, args=None): parser = argparse.ArgumentParser() - parser.add_argument('--region', default='us-east-1') + parser.add_argument('--region', default=DEFAULT_REGION) + subparsers = parser.add_subparsers() vault_subparser = subparsers.add_parser('vault').add_subparsers() vault_subparser.add_parser('list').set_defaults(func=self.vault_list) @@ -662,30 +739,74 @@ def parse_args(self, args=None): archive_list_subparser.set_defaults(func=self.archive_list) archive_list_subparser.add_argument('--force-ids', action='store_true') archive_list_subparser.add_argument('vault') + + encryptor = Encryptor() + + # Upload command + archive_upload_func = partial(self.archive_upload, encryptor=encryptor) archive_upload_subparser = archive_subparser.add_parser('upload') - archive_upload_subparser.set_defaults(func=self.archive_upload) + archive_upload_subparser.set_defaults(func=archive_upload_func) archive_upload_subparser.add_argument('vault') archive_upload_subparser.add_argument('file', type=argparse.FileType('rb')) - archive_upload_subparser.add_argument('--name') + archive_upload_subparser.add_argument( + '--name', + help='The description of the archive.' + ) + archive_upload_subparser.add_argument( + '--encrypt', default=False, action="store_true", + help="Encrypt before uploading using default GNUPG encryption." + ) + archive_upload_subparser.add_argument( + '--concurrent', default=False, action="store_true", + dest="concurrent", + help='Break the upload into multiple pieces ' + '(required for large uploads).' + ) + archive_upload_subparser.add_argument( + '--part-size', + default=DEFAULT_PART_SIZE, + dest="part_size", + help=("For --concurrent uploads, change the " + "part size from the default of %d." + % DEFAULT_PART_SIZE) + ) + archive_upload_subparser.add_argument( + '--num-threads', + default=DEFAULT_NUM_THREADS, + dest="num_threads", + help=("For --concurrent uploads, change the " + "num threads from the default of %d." + % DEFAULT_NUM_THREADS) + ) + + # Retrieve command + archive_retrieve_func = partial( + self.archive_retrieve, encryptor=encryptor) archive_retrieve_subparser = archive_subparser.add_parser('retrieve') - archive_retrieve_subparser.set_defaults(func=self.archive_retrieve) + archive_retrieve_subparser.set_defaults(func=archive_retrieve_func) archive_retrieve_subparser.add_argument('vault') archive_retrieve_subparser.add_argument('names', nargs='+', metavar='name') - archive_retrieve_subparser.add_argument('--multipart-size', type=int, - default=(8*1024*1024)) + archive_retrieve_subparser.add_argument('--part-size', type=int, + default=(8 * 1024 * 1024)) archive_retrieve_subparser.add_argument('-o', dest='output_filename', metavar='OUTPUT_FILENAME') archive_retrieve_subparser.add_argument('--wait', action='store_true') + archive_retrieve_subparser.add_argument( + '--decrypt', default=False, action="store_true") + + # Delete command archive_delete_subparser = archive_subparser.add_parser('delete') archive_delete_subparser.set_defaults(func=self.archive_delete) archive_delete_subparser.add_argument('vault') archive_delete_subparser.add_argument('name') + + # Checkpresent command archive_checkpresent_subparser = archive_subparser.add_parser( - 'checkpresent') + 'checkpresent') archive_checkpresent_subparser.set_defaults( - func=self.archive_checkpresent) + func=self.archive_checkpresent) archive_checkpresent_subparser.add_argument('vault') archive_checkpresent_subparser.add_argument('name') archive_checkpresent_subparser.add_argument('--wait', @@ -693,10 +814,24 @@ def parse_args(self, args=None): archive_checkpresent_subparser.add_argument('--quiet', action='store_true') archive_checkpresent_subparser.add_argument( - '--max-age', type=int, default=80, dest='max_age_hours') + '--max-age', type=int, default=80, dest='max_age_hours') + job_subparser = subparsers.add_parser('job').add_subparsers() job_subparser.add_parser('list').set_defaults(func=self.job_list) - return parser.parse_args(args) + parsed = parser.parse_args(args) + + if (parsed.func == archive_upload_func + and parsed.file is sys.stdin): + if parsed.concurrent: + raise ConsoleError( + "concurrent uploads do not support streaming stdin" + ) + if parsed.encrypt: + raise ConsoleError( + "encrypted uploads do not support streaming stdin" + ) + + return parsed def __init__(self, args=None, connection=None, cache=None): args = self.parse_args(args) @@ -712,6 +847,7 @@ def __init__(self, args=None, connection=None, cache=None): self.args = args def main(self): + try: self.args.func() except RetryConsoleError, e: diff --git a/glacier_test.input b/glacier_test.input new file mode 100644 index 0000000..190a180 --- /dev/null +++ b/glacier_test.input @@ -0,0 +1 @@ +123 diff --git a/glacier_test.py b/glacier_test.py old mode 100644 new mode 100755 index bc93f9c..bc3d4ee --- a/glacier_test.py +++ b/glacier_test.py @@ -23,15 +23,18 @@ from __future__ import print_function +import os import sys import unittest import mock from mock import Mock, patch, sentinel import nose.tools +from tempfile import gettempdir import glacier +os.environ['GNUPGHOME'] = 'gnupg_test_home' EX_TEMPFAIL = 75 @@ -106,22 +109,79 @@ def test_archive_list_force_ids(self): ) def test_archive_upload(self): - file_obj = Mock() - file_obj.name = 'filename' - open_mock = Mock(return_value=file_obj) - with patch('__builtin__.open', open_mock): - self.run_app(['archive', 'upload', 'vault_name', 'filename']) - self.connection.get_vault.assert_called_with('vault_name') - mock_vault = self.connection.get_vault.return_value - mock_vault.create_archive_from_file.assert_called_once_with( - file_obj=file_obj, description='filename') + for encrypt in (False, True): + args = ['archive', + 'upload', + 'vault_name', + 'glacier_test.input', + '--name', + 'glacier_test.input'] + if encrypt: + args.append('--encrypt') + + open_mock = Mock(wraps=open) + with patch('__builtin__.open', open_mock): + self.run_app(args) + + self.connection.get_vault.assert_called_with('vault_name') + + mock_vault = self.connection.get_vault.return_value + + call_args = mock_vault.create_archive_from_file.call_args + uploaded_name = call_args[1]['file_obj'].name + if encrypt: + nose.tools.assert_equals(uploaded_name.find(gettempdir()), 0) + else: + nose.tools.assert_equals(uploaded_name, 'glacier_test.input') + + nose.tools.assert_equals(call_args[1]['description'], + 'glacier_test.input') + + def test_archive_upload_concurrent(self): + for encrypt in (False, True): + args = ['archive', 'upload', 'vault_name', 'glacier_test.input', + '--name', 'glacier_test.input', + '--concurrent'] + if encrypt: + args.append('--encrypt') + + open_mock = Mock(wraps=open) + upload_mock = Mock(return_value='fake_archive_id') + with patch('__builtin__.open', open_mock), \ + patch('boto.glacier.concurrent.ConcurrentUploader.upload', + upload_mock): + self.run_app(args) + + self.connection.get_vault.assert_called_with('vault_name') + call_args = upload_mock.call_args + if encrypt: + nose.tools.assert_equals(call_args[0][0].find(gettempdir()), 0) + else: + nose.tools.assert_equals(call_args[0][0], 'glacier_test.input') + nose.tools.assert_equals(call_args[1]['description'], + 'glacier_test.input') def test_archive_stdin_upload(self): - self.run_app(['archive', 'upload', 'vault_name', '-']) - self.connection.get_vault.assert_called_once_with('vault_name') - vault = self.connection.get_vault.return_value - vault.create_archive_from_file.assert_called_once_with( - file_obj=sys.stdin, description='') + for concurrent in (False, True): + for encrypt in (False, True): + args = ['archive', 'upload', 'vault_name', '-'] + if concurrent: + args.append('--concurrent') + if encrypt: + args.append('--encrypt') + if concurrent or encrypt: + # NOTE: concurrency and encryption are not + # currently supported when streaming fron stdin. + nose.tools.assert_raises(Exception, + self.run_app, args) + else: + self.run_app(args) + self.connection.get_vault.assert_called_once_with( + 'vault_name' + ) + vault = self.connection.get_vault.return_value + vault.create_archive_from_file.assert_called_once_with( + file_obj=sys.stdin, description='') def test_archive_retrieve_no_job(self): self.init_app(['archive', 'retrieve', 'vault_name', 'archive_name']) diff --git a/gnupg_test_home/pubring.gpg b/gnupg_test_home/pubring.gpg new file mode 100644 index 0000000..91278d5 Binary files /dev/null and b/gnupg_test_home/pubring.gpg differ diff --git a/gnupg_test_home/pubring.gpg~ b/gnupg_test_home/pubring.gpg~ new file mode 100644 index 0000000..d22072b Binary files /dev/null and b/gnupg_test_home/pubring.gpg~ differ diff --git a/gnupg_test_home/random_seed b/gnupg_test_home/random_seed new file mode 100644 index 0000000..aa5ff6f Binary files /dev/null and b/gnupg_test_home/random_seed differ diff --git a/gnupg_test_home/secring.gpg b/gnupg_test_home/secring.gpg new file mode 100644 index 0000000..ddb0eb3 Binary files /dev/null and b/gnupg_test_home/secring.gpg differ diff --git a/gnupg_test_home/trustdb.gpg b/gnupg_test_home/trustdb.gpg new file mode 100644 index 0000000..d8725d6 Binary files /dev/null and b/gnupg_test_home/trustdb.gpg differ diff --git a/gpg.py b/gpg.py new file mode 100644 index 0000000..56e55bb --- /dev/null +++ b/gpg.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python + +# The MIT License (MIT) +# +# Copyright (c) 2013 Piotr Kaleta and Counsyl +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +import gnupg +import os + +GNUPG_KEY_TYPE = "RSA" +GNUPG_KEY_LENGTH = 2048 + + +class Encryptor(object): + + def __init__(self): + self.gpg = gnupg.GPG() + self.gpg.encoding = "utf-8" + if self.gpg.list_keys() == []: + self.generate_keypair() + + def generate_keypair(self): + """Generate the RSA keypair.""" + input_data = self.gpg.gen_key_input(key_type=GNUPG_KEY_TYPE, + key_length=GNUPG_KEY_LENGTH) + key = self.gpg.gen_key(input_data) + + return key + + def export_keypair(self, key): + public_key = self.gpg.export_keys(key) + private_key = self.gpg.export_keys(key, True) + + return public_key, private_key + + def _get_fingerprint(self): + """Return the fingerprint of the default key.""" + return self.gpg.list_keys()[0]["fingerprint"] + + def encrypt_file(self, input_file, output_filename): + """Encrypt `input_file` (handle) and save results as `output_filename`.""" + fingerprint = self._get_fingerprint() + + self.gpg.encrypt_file(input_file, fingerprint, + output=output_filename) + + def encrypt(self, data): + """Return the encrypted version of `data`.""" + return self.gpg.encrypt(data) + + def decrypt_file(self, input_filename, output_filename): + """Decrypt `input_filename` and save results as `output_filename`.""" + with open(input_filename, "r") as input_file: + self.gpg.decrypt_file(input_file, output=output_filename) + + def decrypt(self, data): + """Return decrypted version of `data`.""" + return self.gpg.decrypt(data) diff --git a/koality.yml b/koality.yml new file mode 100644 index 0000000..722d71d --- /dev/null +++ b/koality.yml @@ -0,0 +1,14 @@ +languages: + python: 2.7 +setup: +- packages: + - pip: + - install requirements: requirements.txt + - install requirements: requirements-dev.txt +test: + machines: 1 + scripts: + - unit tests: + script: nosetests --with-xunit glacier_test.py + xunit: nosetests.xml + timeout: 300 diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..6955664 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,7 @@ +--index-url http://pypi.counsyl.com/counsyl/prod/+simple/ + +codequality==0.2.2.vcounsyl +mock==1.0.1 +nose==1.2.1 +pep8==1.4.4 +pyflakes==0.5.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..13d3b35 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +--index-url http://pypi.counsyl.com/counsyl/prod/+simple/ +SQLAlchemy>=0.8.4 +boto==2.19.0 +iso8601==0.1.8 +python-gnupg==0.3.5 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..1129ab1 --- /dev/null +++ b/setup.py @@ -0,0 +1,31 @@ +from setuptools import setup +from pip.req import parse_requirements + +setup( + name='counsyl-glacier-cli', + version='0.1.1.alpha', + author='Counsyl', + author_email='root@counsyl.com', + maintainer='Counsyl', + maintainer_email='root@counsyl.com', + description='Command-line interface to Amazon Glacier', + long_description=open('README.md', 'rt').read(), + url='https://github.counsyl.com/dev/glacier-cli', + + py_modules=['glacier', 'gpg'], + zip_safe=False, + install_requires=[ + str(req.req) for req in parse_requirements('requirements.txt') + ], + tests_require=[ + str(req.req) for req in parse_requirements('requirements-dev.txt') + ], + classifiers=( + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 2.7", + "Topic :: System :: Logging", + ), +)