Added the fn-archive command. It lets you create, push to, and pull from non-versioned (i.e. non-Mercurial) incremental file archives in Freenet.

See: hg help fn-archive for minimal help.

BE CAREFUL when you run --pull, i.e. either of these:
hg fn-archive
hg fn-archive --pull

Either one will DELETE any local files/directories that are not stored in the archive in Freenet.
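For reference, a typical session might look something like this (a rough sketch only; the URIs are placeholders, not real keys, and the option names are the ones registered in the fn-archive command table in the diff below):

cd ~/some_dir
hg fn-archive --create --uri 'USK@<insert key>/my.archive/0'    # first insert of some_dir
# ... change files ...
hg fn-archive --push                                            # push an incremental update (reuses the stored insert URI)

cd ~/other_dir
hg fn-archive --pull --uri 'USK@<request key>/my.archive/0'     # pull the archive into this directory
hg fn-archive                                                   # later: same as --pull, reuses the stored request URI
hg fn-archive --reinsert                                        # re-insert all blocks of the cached archive

Again, --pull (and a bare hg fn-archive, which defaults to pull) deletes local files and directories that aren't in the archive.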
diff --git a/infocalypse/__init__.py b/infocalypse/__init__.py --- a/infocalypse/__init__.py +++ b/infocalypse/__init__.py @@ -290,6 +290,28 @@ If you see 'abort: Connection refused' w fn-fmsread or fn-fmsnotify, check fms_host and fms_port in the config file. +EXPERIMENTAL STUFF: +The following commands are experimental and undocumented: +fn-wiki +fn-archive + +See: +USK@kRM~jJVREwnN2qnA8R0Vt8HmpfRzBZ0j4rHC2cQ-0hw, \\ +2xcoQVdQLyqfTpF2DpkdUIbHFCeL4W~2X1phUYymnhM,AQACAAE/fniki_demo/-3/ + +Also, see: +hg help fn-wiki +hg help fn-archive + +BE CAREFUL, these commands may delete local files and directories: +hg fn-archive +hg fn-archive --pull + + +See: +USK@kRM~jJVREwnN2qnA8R0Vt8HmpfRzBZ0j4rHC2cQ-0hw, \\ +2xcoQVdQLyqfTpF2DpkdUIbHFCeL4W~2X1phUYymnhM,AQACAAE/fniki_demo/3/ + MORE DOCUMENTATION: See doc/infocalypse_howto.html in the directory this extension was installed into. @@ -343,6 +365,8 @@ from fmscmds import execute_fmsread, exe from sitecmds import execute_putsite, execute_genkey from wikicmds import execute_wiki +from arccmds import execute_arc_create, execute_arc_pull, execute_arc_push, \ + execute_arc_reinsert from config import read_freesite_cfg from validate import is_hex_string, is_fms_id @@ -655,6 +679,97 @@ def infocalypse_setup(ui_, **opts): opts['fcpport'], opts['tmpdir']) +#----------------------------------------------------------" +def do_archive_create(ui_, opts, params, stored_cfg): + """ fn-archive --create.""" + insert_uri = opts['uri'] + if insert_uri == '': + raise util.Abort("Please set the insert URI with --uri.") + + params['INSERT_URI'] = insert_uri + params['FROM_DIR'] = os.getcwd() + execute_arc_create(ui_, params, stored_cfg) + +def do_archive_push(ui_, opts, params, stored_cfg): + """ fn-archive --push.""" + insert_uri = opts['uri'] + if insert_uri == '': + insert_uri = ( + stored_cfg.get_dir_insert_uri(params['ARCHIVE_CACHE_DIR'])) + if not insert_uri: + ui_.warn("There is no stored insert URI for this archive.\n" + "Please set one with the --uri option.\n") + raise util.Abort("No Insert URI.") + + params['INSERT_URI'] = insert_uri + params['FROM_DIR'] = os.getcwd() + + execute_arc_push(ui_, params, stored_cfg) + +def do_archive_pull(ui_, opts, params, stored_cfg): + """ fn-archive --pull.""" + request_uri = opts['uri'] + + if request_uri == '': + request_uri = ( + stored_cfg.get_request_uri(params['ARCHIVE_CACHE_DIR'])) + if not request_uri: + ui_.warn("There is no stored request URI for this archive.\n" + "Please set one with the --uri option.\n") + raise util.Abort("No request URI.") + + params['REQUEST_URI'] = request_uri + params['TO_DIR'] = os.getcwd() + execute_arc_pull(ui_, params, stored_cfg) + +ILLEGAL_FOR_REINSERT = ('uri', 'aggressive', 'nosearch') +def do_archive_reinsert(ui_, opts, params, stored_cfg): + """ fn-archive --reinsert.""" + illegal = [value for value in ILLEGAL_FOR_REINSERT + if value in opts and opts[value]] + if illegal: + raise util.Abort("--uri, --aggressive, --nosearch illegal " + + "for reinsert.") + request_uri = stored_cfg.get_request_uri(params['ARCHIVE_CACHE_DIR']) + if request_uri is None: + ui_.warn("There is no stored request URI for this archive.\n" + + "Run fn-archive --pull first!.\n") + raise util.Abort(" No request URI, can't re-insert") + + insert_uri = stored_cfg.get_dir_insert_uri(params['ARCHIVE_CACHE_DIR']) + params['REQUEST_URI'] = request_uri + params['INSERT_URI'] = insert_uri + params['FROM_DIR'] = os.getcwd() # hmmm not used. 
+ params['REINSERT_LEVEL'] = 3 + execute_arc_reinsert(ui_, params, stored_cfg) + +ARCHIVE_SUBCMDS = {'create':do_archive_create, + 'push':do_archive_push, + 'pull':do_archive_pull, + 'reinsert':do_archive_reinsert} +ARCHIVE_CACHE_DIR = '.ARCHIVE_CACHE' +def infocalypse_archive(ui_, **opts): + """ Commands to maintain a non-hg incremental archive.""" + subcmd = [value for value in ARCHIVE_SUBCMDS if opts[value]] + if len(subcmd) > 1: + raise util.Abort("--create, --pull, --push are mutally exclusive. " + + "Only specify one.") + if len(subcmd) > 0: + subcmd = subcmd[0] + else: + subcmd = "pull" + + params, stored_cfg = get_config_info(ui_, opts) + params['ARCHIVE_CACHE_DIR'] = os.path.join(os.getcwd(), ARCHIVE_CACHE_DIR) + + if not subcmd in ARCHIVE_SUBCMDS: + raise util.Abort("Unhandled subcommand: " + subcmd) + + # 2 qt? + ARCHIVE_SUBCMDS[subcmd](ui_, opts, params, stored_cfg) + +#----------------------------------------------------------" + # Can't use None as a default? Means "takes no argument'? FCP_OPTS = [('', 'fcphost', '', 'fcp host'), ('', 'fcpport', 0, 'fcp port'), @@ -758,9 +873,27 @@ cmdtable = { [('', 'tmpdir', '~/infocalypse_tmp', 'temp directory'),] + FCP_OPTS, "[options]"), + + + "fn-archive": (infocalypse_archive, + [('', 'uri', '', 'Request URI for --pull, Insert URI ' + + 'for --create, --push'), + ('', 'create', None, 'Create a new archive using the ' + + 'Insert URI --uri'), + ('', 'push', None, 'Push incremental updates into the ' + + 'archive in Freenet'), + ('', 'pull', None, 'Pull incremental updates from the ' + + 'archive in Freenet'), + ('', 'reinsert', None, 'Re-insert the entire archive. '), + ] + + FCP_OPTS + + NOSEARCH_OPT + + AGGRESSIVE_OPT, + "[options]"), + } commands.norepo += ' fn-setup' commands.norepo += ' fn-genkey' - +commands.norepo += ' fn-archive' diff --git a/infocalypse/arccmds.py b/infocalypse/arccmds.py new file mode 100644 --- /dev/null +++ b/infocalypse/arccmds.py @@ -0,0 +1,307 @@ +""" Implementation of fn-archive command. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +from mercurial import util + +from fcpclient import get_version, get_usk_for_usk_version, is_usk_file, is_usk + +from config import Config +from infcmds import setup, do_key_setup, is_redundant, run_until_quiescent +from updatesm import QUIESCENT, FINISHING +from archivesm import create_dirs, ArchiveUpdateContext, \ + start_inserting_blocks, start_requesting_blocks, cleanup_dirs, \ + ARC_INSERTING_URI, ARC_REQUESTING_URI, ARC_CACHING_TOPKEY + +from arclocal import local_create, local_synch, local_update, local_reinsert + + +def arc_cleanup(update_sm, top_key_state=None): + """ INTERNAL: Cleanup after running an archive command. """ + + if update_sm is None: + return + + # Cleanup archive temp files. 
+ top_key = None + if not top_key_state is None: + top_key = update_sm.get_state(top_key_state).get_top_key_tuple() + + ctx = update_sm.ctx + if (not ctx is None and + 'ARCHIVE_CACHE_DIR' in update_sm.ctx and + 'REQUEST_URI' in update_sm.ctx): + cleanup_dirs(ctx.ui_, + ctx['ARCHIVE_CACHE_DIR'], + ctx['REQUEST_URI'], + top_key) + + # Previous cleanup code. + if not update_sm.runner is None: + update_sm.runner.connection.close() + + if not update_sm.ctx.bundle_cache is None: + update_sm.ctx.bundle_cache.remove_files() # Unreachable??? + + +def arc_handle_updating_config(update_sm, params, stored_cfg, + is_pulling=False): + """ INTERNAL: Write updates into the config file IFF the previous + command succeeded. """ + + base_dir = params['ARCHIVE_CACHE_DIR'] + + if not is_pulling: + if not update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + return + + if (params['INSERT_URI'] is None or # <- re-insert w/o insert uri + not is_usk_file(params['INSERT_URI'])): + return + + inverted_uri = params['INVERTED_INSERT_URI'] + + # Cache the request_uri - insert_uri mapping. + stored_cfg.set_insert_uri(inverted_uri, update_sm.ctx['INSERT_URI']) + + # Cache the updated index for the insert. + version = get_version(update_sm.ctx['INSERT_URI']) + stored_cfg.update_index(inverted_uri, version) + stored_cfg.update_dir(base_dir, inverted_uri) + + # Hmmm... if we wanted to be clever we could update the request + # uri too when it doesn't match the insert uri. Ok for now. + # Only for usks and only on success. + #print "UPDATED STORED CONFIG(0)" + Config.to_file(stored_cfg) + + else: + # Only finishing required. same. REDFLAG: look at this again + if not update_sm.get_state(QUIESCENT).arrived_from((FINISHING,)): + return + + if not is_usk(params['REQUEST_URI']): + return + + state = update_sm.get_state(ARC_REQUESTING_URI) + updated_uri = state.get_latest_uri() + version = get_version(updated_uri) + stored_cfg.update_index(updated_uri, version) + stored_cfg.update_dir(base_dir, updated_uri) + #print "UPDATED STORED CONFIG(1)" + Config.to_file(stored_cfg) + + +def execute_arc_create(ui_, params, stored_cfg): + """ Create a new incremental archive. """ + update_sm = None + top_key_state = None + try: + assert 'ARCHIVE_CACHE_DIR' in params + assert 'FROM_DIR' in params + update_sm = setup(ui_, None, params, stored_cfg) + request_uri, dummy = do_key_setup(ui_, update_sm, params, stored_cfg) + create_dirs(ui_, params['ARCHIVE_CACHE_DIR'], request_uri) + ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), + params['INSERT_URI'])) + + # Create the local blocks. + files, top_key = local_create(params['ARCHIVE_CACHE_DIR'], + request_uri, + params['FROM_DIR']) + + for block in top_key[0]: + if block[1][0] == 'CHK@': + ui_.status("Created new %i byte block.\n" % block[0]) + + # Insert them into Freenet. + ctx = ArchiveUpdateContext(update_sm, ui_) + ctx.update({'REQUEST_URI':request_uri, + 'INSERT_URI':params['INSERT_URI'], + 'ARCHIVE_CACHE_DIR':params['ARCHIVE_CACHE_DIR'], + 'PROVISIONAL_TOP_KEY':top_key, + 'ARCHIVE_BLOCK_FILES':files}) + + start_inserting_blocks(update_sm, ctx) + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + ui_.status("Inserted to:\n%s\n" % + '\n'.join(update_sm.get_state(ARC_INSERTING_URI). 
+ get_request_uris())) + top_key_state = ARC_INSERTING_URI + else: + ui_.status("Archive create failed.\n") + + arc_handle_updating_config(update_sm, params, stored_cfg) + finally: + arc_cleanup(update_sm, top_key_state) + +def execute_arc_pull(ui_, params, stored_cfg): + """ Update from an existing incremental archive in Freenet. """ + update_sm = None + top_key_state = None + try: + assert 'ARCHIVE_CACHE_DIR' in params + assert not params['REQUEST_URI'] is None + if not params['NO_SEARCH'] and is_usk_file(params['REQUEST_URI']): + index = stored_cfg.get_index(params['REQUEST_URI']) + if not index is None: + if index >= get_version(params['REQUEST_URI']): + # Update index to the latest known value + # for the --uri case. + params['REQUEST_URI'] = get_usk_for_usk_version( + params['REQUEST_URI'], index) + else: + ui_.status(("Cached index [%i] < index in USK [%i]. " + + "Using the index from the USK.\n" + + "You're sure that index exists, right?\n") % + (index, get_version(params['REQUEST_URI']))) + + update_sm = setup(ui_, None, params, stored_cfg) + ui_.status("%sRequest URI:\n%s\n" % ( + is_redundant(params['REQUEST_URI']), + params['REQUEST_URI'])) + + # Pull changes into the local block cache. + ctx = ArchiveUpdateContext(update_sm, ui_) + ctx.update({'REQUEST_URI':params['REQUEST_URI'], + 'ARCHIVE_CACHE_DIR':params['ARCHIVE_CACHE_DIR']}) + start_requesting_blocks(update_sm, ctx) + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + uri = update_sm.get_state(ARC_REQUESTING_URI).get_latest_uri() + blocks = update_sm.get_state(ARC_CACHING_TOPKEY).get_blocks() + plural = '' + if len(blocks) != 1: + plural = 's' + ui_.status("Fetched %i bytes in %i CHK%s from:\n%s\n" % + (sum([block[0] for block in blocks]), + len(blocks), plural, uri)) + ui_.status("Updating local directory...\n") + local_synch(ui_, + params['ARCHIVE_CACHE_DIR'], + # Use the updated URI below so we get the + # right cached topkey. + uri, + params['TO_DIR']) + top_key_state = ARC_REQUESTING_URI + else: + ui_.status("Synchronize failed.\n") + + arc_handle_updating_config(update_sm, params, stored_cfg, True) + finally: + arc_cleanup(update_sm, top_key_state) + + +def execute_arc_push(ui_, params, stored_cfg): + """ Push an update into an incremental archive in Freenet. """ + assert params.get('REQUEST_URI', None) is None # REDFLAG: why ? + update_sm = None + top_key_state = None + try: + update_sm = setup(ui_, None, params, stored_cfg) + request_uri, dummy_is_keypair = do_key_setup(ui_, update_sm, params, + stored_cfg) + create_dirs(ui_, params['ARCHIVE_CACHE_DIR'], request_uri) + ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), + params['INSERT_URI'])) + + + # Update the local archive. + files, top_key = local_update(params['ARCHIVE_CACHE_DIR'], + request_uri, + params['FROM_DIR']) + + if files is None: + raise util.Abort("There are no local changes to add.") + + for block in top_key[0]: + if block[1][0] == 'CHK@': + ui_.status("Created new %i byte block.\n" % block[0]) + + # Insert them into Freenet. 
+ ctx = ArchiveUpdateContext(update_sm, ui_) + ctx.update({'REQUEST_URI':request_uri, + 'INSERT_URI':params['INSERT_URI'], + 'ARCHIVE_CACHE_DIR':params['ARCHIVE_CACHE_DIR'], + 'PROVISIONAL_TOP_KEY':top_key, + 'ARCHIVE_BLOCK_FILES':files}) + + start_inserting_blocks(update_sm, ctx) + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + ui_.status("Inserted to:\n%s\n" % + '\n'.join(update_sm.get_state(ARC_INSERTING_URI). + get_request_uris())) + top_key_state = ARC_INSERTING_URI + else: + ui_.status("Push to archive failed.\n") + + arc_handle_updating_config(update_sm, params, stored_cfg) + finally: + arc_cleanup(update_sm, top_key_state) + +def execute_arc_reinsert(ui_, params, stored_cfg): + """ Reinsert the archive into Freenet. """ + assert not params.get('REQUEST_URI', None) is None + assert params.get('REINSERT_LEVEL', 0) > 0 + + update_sm = None + try: + update_sm = setup(ui_, None, params, stored_cfg) + request_uri, dummy_is_keypair = do_key_setup(ui_, update_sm, params, + stored_cfg) + create_dirs(ui_, params['ARCHIVE_CACHE_DIR'], request_uri) + + ui_.status("%sRequest URI:\n%s\n" % (is_redundant(request_uri), + request_uri)) + + # Get the blocks to re-insert. + files, top_key = local_reinsert(params['ARCHIVE_CACHE_DIR'], + request_uri) + + # Tell the user about them. + for block in top_key[0]: + if block[1][0] == 'CHK@': + ui_.status("Re-inserting %i byte block.\n" % block[0]) + + # Start re-inserting them. + ctx = ArchiveUpdateContext(update_sm, ui_) + ctx.update({'REQUEST_URI':request_uri, + 'INSERT_URI':params['INSERT_URI'], + 'ARCHIVE_CACHE_DIR':params['ARCHIVE_CACHE_DIR'], + 'PROVISIONAL_TOP_KEY':top_key, + 'ARCHIVE_BLOCK_FILES':files, + 'REINSERT':params['REINSERT_LEVEL']}) + + start_inserting_blocks(update_sm, ctx) + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + ui_.status("Re-insert finished.\n") + else: + ui_.status("Re-insert failed.\n") + + arc_handle_updating_config(update_sm, params, stored_cfg) + finally: + arc_cleanup(update_sm, None) # Don't prune cache. diff --git a/infocalypse/archive_block_redundancy.txt b/infocalypse/archive_block_redundancy.txt new file mode 100644 --- /dev/null +++ b/infocalypse/archive_block_redundancy.txt @@ -0,0 +1,44 @@ +djk20091209 -- Block redundancy in InsertingRedundantBlocks, RequestingRedundantBlocks + (for hg fn-archive, infocalypse stuff, works somewhat differently) + +See: archivesm.py + test_block_redundancy.py + +m == block size == 32K +h == max single block top key saltable splitfile size ~= 7M +l = actual data length + +INSERTION: +CASE l < m: +0) Insert first block unmodified with no metadata +1) Insert second block with a trailing '\xff' pad byte with no metadata +CASE l == m: +0) Insert first block with a trailing '\xff' pad byte, and metadata + with mime type: 'application/archive-block;0' +1) Request the splitfile top block from 0) and re-insert it twiddling + the embedded metadata string to: 'application/archive-block;1' +CASE l > m < h: +0) Insert first block unmodified and metadata + with mime type: 'application/archive-block;0' +1) Request the splitfile top block from 0) and re-insert it twiddling + the embedded metadata string to: 'application/archive-block;1' + +Case l >= h: +0) Insert first block unmodified and metadata + with mime type: 'application/archive-block;0' +1) Don't insert a second redundant block. 
+ +REQUEST: +CASE l < m: + Request both blocks in parallel + TRUNCATE the redundant block to l. +CASE l == m: + Randomly pick on full request, just request the top block + for the other block. + TRUNCATE the both blocks to l. +CASE l > m < h: + Randomly pick on full request, just request the top block + for the other block. + Don't need to truncate (but I do anyway in the impl.) +Case l >= h: + Normal full request. diff --git a/infocalypse/archivesm.py b/infocalypse/archivesm.py new file mode 100644 --- /dev/null +++ b/infocalypse/archivesm.py @@ -0,0 +1,958 @@ +""" Classes to asynchronously create, push and pull incremental archives. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import os +import random +import shutil + +import archivetop + +from fcpconnection import make_id, SUCCESS_MSGS, sha1_hexdigest +from fcpclient import get_version, get_usk_hash, get_usk_for_usk_version, \ + is_usk +from fcpmessage import GET_DEF, PUT_FILE_DEF + +from statemachine import StateMachine, State, DecisionState, \ + RetryingRequestList, CandidateRequest +from updatesm import UpdateStateMachine, QUIESCENT, FAILING, FINISHING, \ + RequestingUri, InsertingUri, UpdateContextBase, PAD_BYTE + +from archivetop import top_key_tuple_to_bytes, default_out + +from chk import clear_control_bytes +from graph import FREENET_BLOCK_LEN, MAX_METADATA_HACK_LEN + +TMP_DIR = "__TMP__" +BLOCK_DIR = "__TMP_BLOCKS__" + +ARC_MIME_TYPE = 'application/archive-block' +ARC_MIME_TYPE_FMT = ARC_MIME_TYPE + ';%i' +ARC_METADATA_MARKER = ARC_MIME_TYPE + ';' + +# Careful when changing. There is code that depends on '_' chars. +TOP_KEY_NAME_PREFIX = "_top_" +TOP_KEY_NAME_FMT = TOP_KEY_NAME_PREFIX + "%i_.bin" +CHK_NAME_PREFIX = "_chk_" +CHK_NAME_FMT = CHK_NAME_PREFIX + "%s.bin" + +ARC_REQUESTING_URI = 'ARC_REQUESTING_URI' +ARC_CACHING_TOPKEY = 'ARC_CACHING_TOPKEY' +ARC_REQUESTING_BLOCKS = 'ARC_REQUESTING_BLOCKS' + +ARC_INSERTING_BLOCKS = 'ARC_INSERTING_BLOCKS' +ARC_FIXING_UP_TOP_KEY = 'ARC_FIXING_UP_TOP_KEY' +ARC_INSERTING_URI = 'ARC_INSERTING_URI' +ARC_CACHING_INSERTED_TOPKEY = 'ARC_CACHING_INSERTED_TOPKEY' + +# File name that 0) is tagged for deletion and +# 1) doesn't have weird chars in it. e.g. '~' +# Hmmm... it would be more studly to extract the (SHA256?) +# hash from the CHK and use that +def chk_file_name(chk): + """ Return a file name for the CHK. """ + return CHK_NAME_FMT % sha1_hexdigest(chk) + +class ArchiveUpdateContext(UpdateContextBase): + """ An UpdateContextBase for running incremental archive commands. """ + def __init__(self, parent=None, ui_=None): + UpdateContextBase.__init__(self, parent) + self.ui_ = ui_ + + def arch_cache_top_key(self, uri, top_key_tuple): + """ Store top key in local archive cache. 
""" + out_file = open(os.path.join(self.arch_cache_dir(), + TOP_KEY_NAME_FMT % get_version(uri)), + 'wb') + try: + out_file.write(top_key_tuple_to_bytes(top_key_tuple)) + finally: + out_file.close() + + # Might rename file_name. Might not. Hmmm.... + def arch_cache_block(self, chk, file_name, length=None): + """ Store block in local cache. """ + dest = os.path.join(self.arch_cache_dir(), + chk_file_name(chk)) + + if os.path.exists(dest): + return + + if not os.path.exists(file_name): + print "DOESN'T EXIST: ", file_name + return + + if not length is None: + #print length, os.path.getsize(file_name) + assert length <= os.path.getsize(file_name) + if length < os.path.getsize(file_name): + out_file = open(file_name, 'ab') + try: + out_file.truncate(length) + finally: + out_file.close() + assert length == os.path.getsize(file_name) + + os.rename(file_name, dest) + + def required_blocks(self, top_key_tuple): + """ Return ((block_len, (chk0, ..), ...) for + non-locally-cached blocks. """ + ret = [] + for block in top_key_tuple[0]: + required_chks = [] + cached = 0 + for chk in block[1]: + if not os.path.exists(os.path.join(self.arch_cache_dir(), + chk_file_name(chk))): + #print "NEEDS: ", chk + required_chks.append(chk) + else: + #print "HAS: ", chk + cached += 1 + if cached == 0: + assert len(required_chks) > 0 + ret.append((block[0], tuple(required_chks))) + return ret + + def arch_cache_dir(self): + """ Return the local cache directory. """ + return os.path.join(self['ARCHIVE_CACHE_DIR'], + get_usk_hash(self['REQUEST_URI'])) + + +class RequestingArchiveUri(RequestingUri): + """ A state to request the top level URI for an archive. """ + def __init__(self, parent, name, success_state, failure_state): + RequestingUri.__init__(self, parent, name, success_state, + failure_state) + self.topkey_funcs = archivetop + +class InsertingArchiveUri(InsertingUri): + """ A state to insert the top level URI for an archive into Freenet.""" + def __init__(self, parent, name, success_state, failure_state): + InsertingUri.__init__(self, parent, name, success_state, failure_state) + self.topkey_funcs = archivetop + + # Why didn't I do this in the base class? Can't remember. + def leave(self, to_state): + """ Override to update REQUEST_URI in the parent's context. """ + InsertingUri.leave(self, to_state) + + if to_state.name != self.success_state: + return + + if self.parent.ctx['INSERT_URI'] is None: + # Assert reinserting??? + return # i.e. for reinserting. + + if not (is_usk(self.parent.ctx['INSERT_URI']) and + is_usk(self.parent.ctx['REQUEST_URI'])): + return + + if (get_version(self.parent.ctx['INSERT_URI']) > + get_version(self.parent.ctx['REQUEST_URI'])): + + version = get_version(self.parent.ctx['INSERT_URI']) + + self.parent.ctx['REQUEST_URI'] = ( + get_usk_for_usk_version(self.parent.ctx['REQUEST_URI'], + version)) + +def break_top_key(top_key, failure_probability=0.5, max_broken=3, max_keys=1): + """ Debugging helper function to test block failures. 
""" + ordinal = ord('A') + blocks = [] + for block in top_key[0]: + chks = [] + count = 0 + for chk in block[1]: + if random.random() < failure_probability and count < max_broken: + chk = (('CHK@badroutingkey0%s0JblbGup0yNSpoDJgVPnL8E5WXoc,' % + chr(ordinal)) + + 'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8') + ordinal += 1 + count += 1 + chks.append(chk) + if len(chks) >= max_keys: + break + blocks.append((block[0], tuple(chks), block[2])) + + top_key = list(top_key) + top_key[0] = blocks + top_key = tuple(top_key) + return top_key + +class CachingTopKey(DecisionState): + """ State to locally cache the archive top key. """ + def __init__(self, parent, name, has_blocks_state, + needs_blocks_state): + DecisionState.__init__(self, parent, name) + self.has_blocks_state = has_blocks_state + self.needs_blocks_state = needs_blocks_state + self.cached_blocks = None + + def reset(self): + """ State override. """ + self.cached_blocks = None + + def decide_next_state(self, from_state): + """ DecisionState implementation. """ + + if not hasattr(from_state, 'get_top_key_tuple'): + raise Exception("Illegal Transition from: %s" % from_state.name) + top_key = from_state.get_top_key_tuple() + + #top_key = break_top_key(top_key) + #archivetop.dump_top_key_tuple(top_key) + + uri = self.parent.ctx['REQUEST_URI'] # WRONG FOR INSERT (+1) + + # Hmmmm... push this into the context? ctx.request_uri() + insert_uri = self.parent.ctx.get('INSERT_URI', None) + if not insert_uri is None and insert_uri != 'CHK@': + version = get_version(insert_uri) + uri = get_usk_for_usk_version(uri, max(version, get_version(uri))) + + self.parent.ctx.arch_cache_top_key(uri, top_key) + self.cached_blocks = self.parent.ctx.required_blocks(top_key) + if len(self.cached_blocks) > 0: + #print "NEEDS BLOCKS: ", len(self.cached_blocks) + return self.needs_blocks_state + #print "HAS BLOCKS" + return self.has_blocks_state + + def get_blocks(self): + """ Return the blocks from the previous state. """ + return self.cached_blocks + +def twiddle_metadata_salting(raw_bytes, marker): + """ INTERNAL: Flip the embedded salting string in splitfile metadata. """ + assert len(raw_bytes) <= FREENET_BLOCK_LEN + pos = raw_bytes.find(marker) + if pos == -1 or len(raw_bytes) < pos + len(marker) + 1: + raise Exception("Couldn't read marker string.") + + salted_pos = pos + len(marker) + old_salt = raw_bytes[salted_pos] + if old_salt != '0': + raise Exception("Unexpected salt byte: %s" % old_salt) + + twiddled_bytes = raw_bytes[:salted_pos] + '1' \ + + raw_bytes[salted_pos + 1:] + assert len(raw_bytes) == len(twiddled_bytes) + + return twiddled_bytes + + +class InsertingRedundantBlocks(RetryingRequestList): + """ State to redundantly insert CHK blocks. """ + def __init__(self, parent, name, success_state, failure_state): + RetryingRequestList.__init__(self, parent, name) + # [file_name, file_len, [CHK0, CHK1], raw_top_key_data] + self.files = [] + + # Candidate is: + # (index, ordinal) + # index is an index into self.files + # ordinal 2 means request topkey data + # ordinal 0, 1 insert + self.success_state = success_state + self.failure_state = failure_state + + def enter(self, dummy_from_state): + """ State implementation. """ + self.files = [] + if len(self.parent.ctx['ARCHIVE_BLOCK_FILES']) == 0: + raise ValueError("No files to in ctx['ARCHIVE_BLOCK_FILES'].") + + for value in self.parent.ctx['ARCHIVE_BLOCK_FILES']: + # REDFLAG: LATER: partial redundant insert handling ? 
+ length = os.path.getsize(value) + self.files.append([value, length, + [None, None], None]) + # Unsalted. + self.current_candidates.insert(0, (len(self.files) - 1, 0)) + #print "QUEUED NORMAL: ", self.current_candidates[0], + # self.files[-1][1] + if length < FREENET_BLOCK_LEN: + # Padded. + self.current_candidates.insert(0, (len(self.files) - 1, 1)) + #print "QUEUED PADDED: ", self.current_candidates[0], + # self.files[-1][1] + # else: + # candidate_done() will queue a salted insert. + + def leave(self, to_state): + """ State implementation. """ + if to_state.name == self.success_state: + #print "SUCCEEDED: ", self.name + # Dump the inserted blocks into the cache. + for entry in self.files: + assert not entry[2][0] is None + self.parent.ctx.arch_cache_block(entry[2][0], + entry[0], entry[1]) + #print "CACHED: ", entry[2][0] + #else: + # print "FAILED: ", self.name + + def reset(self): + """ State implementation. """ + RetryingRequestList.reset(self) + self.files = [] + + def get_blocks(self): + """ Return the block definitions. """ + return self.files[:] + + def make_request(self, candidate): + """ RetryingRequestList implementation. """ + #print "CREATED: ", candidate + entry = self.files[candidate[0]] + request = CandidateRequest(self.parent) + request.tag = str(candidate) # Hmmm + request.candidate = candidate + request.in_params.fcp_params = self.parent.params.copy() + + request.in_params.definition = PUT_FILE_DEF + request.in_params.fcp_params['URI'] = 'CHK@' + + if candidate[1] == 0: + # Simple insert. + request.in_params.file_name = entry[0] + request.in_params.send_data = True + # IMPORTANT: Don't add metadata to < 32K blocks to avoid redirect. + if entry[1] >= FREENET_BLOCK_LEN: + request.in_params.fcp_params['Metadata.ContentType'] = ( + ARC_MIME_TYPE_FMT % 0) + + if entry[1] == FREENET_BLOCK_LEN: + # UT hits this code path. + #print "HIT len==FREENET_BLOCK_LEN case" + # IMPORTANT: Special case len == FREENET_BLOCK_LEN + # PAD to force splitfile insertion, so that we can salt. + in_file = open(entry[0],'rb') + try: + # Read raw data and add one zero pad byte. + request.in_params.send_data = in_file.read() + PAD_BYTE + assert (len(request.in_params.send_data) == + FREENET_BLOCK_LEN + 1) + request.in_params.file_name = None # i.e. from string above. + finally: + in_file.close() + + elif candidate[1] == 1: + # Redundant insert. + if entry[1] < FREENET_BLOCK_LEN: + in_file = open(entry[0],'rb') + try: + # Read raw data and add one zero pad byte. + request.in_params.send_data = in_file.read() + PAD_BYTE + finally: + in_file.close() + else: + # Salted metadata. + assert not entry[3] is None + request.in_params.send_data = ( + twiddle_metadata_salting(entry[3], ARC_METADATA_MARKER)) + elif candidate[1] == 2: + # Raw topkey request + assert entry[2][0] != None + request.in_params.definition = GET_DEF + request.in_params.fcp_params['MaxSize'] = FREENET_BLOCK_LEN + request.in_params.fcp_params['URI'] = ( + clear_control_bytes(entry[2][0])) + else: + raise ValueError("Bad candidate: " + candidate) + + self.parent.ctx.set_cancel_time(request) + return request + + def candidate_done(self, client, msg, candidate): + """ RetryingRequestList implementation. """ + #print "DONE: ", candidate + if not msg[0] in SUCCESS_MSGS: + # LATER: Retry on failure??? + # REDFLAG: message to ui? + self.parent.transition(self.failure_state) + return + + # Keep UpdateStateMachine.request_done() from deleting. 
+ client.in_params.file_name = None + + index, chk_ordinal = candidate + if chk_ordinal < 2: + # Stash inserted URI. + chk = msg[1]['URI'] + if chk_ordinal == 0: + self.files[index][2][0] = chk + else: + if self.files[index][1] >= FREENET_BLOCK_LEN: + # HACK HACK HACK + # TRICKY: + # Scrape the control bytes from the full request + # to enable metadata handling. + chk0 = self.files[index][2][0] + chk0_fields = chk0.split(',') + chk1_fields = chk.split(',') + # Hmmm... also no file names. + assert len(chk0_fields) == len(chk1_fields) + chk = ','.join(chk1_fields[:-1] + chk0_fields[-1:]) + + self.files[index][2][1] = chk + + if chk_ordinal == 0 and (self.files[index][1] >= FREENET_BLOCK_LEN and + self.files[index][1] <= MAX_METADATA_HACK_LEN): + # Queue a top block only request for the inserted splitfile + # metadata so that we can salt it. + self.current_candidates.append((index, 2)) # LIFO + + if chk_ordinal == 2: + # Insert a salted alias for the splitfile metadata. + assert self.files[index][1] >= FREENET_BLOCK_LEN + self.files[index][3] = msg[2] + self.current_candidates.append((index, 1)) # LIFO + + if self.is_stalled(): + # Nothing more to do, so we succeeded. + self.parent.transition(self.success_state) + +def chk_iter(inserter_files): + """ INTERNAL: Iterator which yields CHKs from + InsertingRedundantBlocks.files. """ + for block in inserter_files: + for chk in block[2]: + if not chk is None: + yield chk + +class FixingUpTopKey(State): + """ State to fix up missing CHKs in a top key with inserted values. """ + def __init__(self, parent, name, success_state): + State.__init__(self, parent, name) + self.success_state = success_state + self.fixed_up_top_key = None + + def reset(self): + """ State implementation. """ + self.fixed_up_top_key = None + + def enter(self, from_state): + """ State implementation. """ + if not hasattr(from_state, 'get_blocks'): + raise Exception("Illegal transition from: %s" % from_state.name) + + assert 'PROVISIONAL_TOP_KEY' in self.parent.ctx + top_key = self.parent.ctx['PROVISIONAL_TOP_KEY'] + # Hmmm... Opaque. Fails with StopIteration if something goes wrong. + chks = chk_iter(from_state.get_blocks()) + updated_blocks = [] + for block in top_key[0]: + new_block = list(block) + new_block[1] = list(new_block[1]) + for index, chk in enumerate(block[1]): + if chk == 'CHK@': + # Use the CHK's inserted by the previous state + # to fixup the CHK values in the provisional top key tuple. + new_block[1][index] = chks.next() + new_block[1] = tuple(new_block[1]) + new_block = tuple(new_block) + updated_blocks.append(new_block) + + top_key = list(top_key) + top_key[0] = tuple(updated_blocks) + top_key = tuple(top_key) + self.fixed_up_top_key = top_key + self.parent.transition(self.success_state) + + def get_top_key_tuple(self): + """ Return the fixed up top key. """ + assert not self.fixed_up_top_key is None + return self.fixed_up_top_key + +# REDFLAG: feels like way too much code. Really this complicated? +IS_RUNNING = 'running' # SENTINAL +class RequestHistory: + """ INTERNAL: Helper class to keep track of redundant block request + state. """ + def __init__(self): + self.history = {} + + def reset(self): + """ Reset. """ + self.history = {} + + def dump(self, out_func=default_out): + """ Debugging dump function. 
""" + keys = self.history.keys() + keys.sort() + out_func("--- dumping request history ---\n") + for key in keys: + out_func("%s->%s\n" % (str(key), str(self.history[key]))) + out_func("---\n") + + def started_request(self, candidate): + """ Record that a request for the candidate was started. """ + #print "started_request -- ", candidate + #self.dump() + assert not candidate in self.history + self.history[candidate] = IS_RUNNING + + def finished_request(self, candidate, result): + """ Record that a request for the candidate finished. """ + #print "finished_request -- ", candidate, result + #self.dump() + assert candidate in self.history + assert self.history[candidate] is IS_RUNNING + self.history[candidate] = bool(result) + + def is_running (self, candidate): + """ Return True if a request for the candidate is running. """ + return self.history.get(candidate, None) is IS_RUNNING + + def tried(self, candidate): + """ Return True if the candidate is was ever tried. """ + return candidate in self.history + + def succeeded(self, candidate): + """ Return True if a request for the candidate succeeded. """ + if not candidate in self.history: + return False + + return self.history[candidate] is True + + def finished(self, candidate): + """ Return True if a request for the candidate finished. """ + return (candidate in self.history and + (not self.history[candidate] is IS_RUNNING)) + + def subblock_failed(self, index, ordinal, count): + """ Return True if a full or partial request for the sub-block + failed. """ + assert not self.succeeded(index) + assert count <= 2 + for flipflop in range(0, count): + candidate = (index, ordinal, flipflop) + if self.finished(candidate) and not self.succeeded(candidate): + return True + return False + + def block_finished(self, index, count): + """ Return True if the block finished. """ + if self.block_succeeded(index): + return True + + # If any subblock isn't finished, the index can't be finished. + for ordinal in range(0, count): + if not self.subblock_failed(index, ordinal, count): + return False + return True + + def block_succeeded(self, index): + """ Return True if the block succeeded. """ + return (self.succeeded((index, 0, False)) or + self.succeeded((index, 1, False))) + + def block_failed(self, index, count): + """ Return True if the block failed. """ + if self.block_succeeded(index): + return False + return self.block_finished(index, count) + +# REDFLAG: Really no library func. to do this? RTFM +def choose_word(condition, true_word, false_word): + """ Return true_word, if condition, false_word otherwise. """ + if condition: + return true_word + return false_word + +# DESIGN INTENT: Keep BOTH top keys for each block in the network +# ALWAYS do a top block only request for the other +# redudant key when making a full request for the +# other block. +class RequestingRedundantBlocks(RetryingRequestList): + """ A State to request redundant block CHKs. """ + def __init__(self, parent, name, success_state, failure_state): + RetryingRequestList.__init__(self, parent, name) + # block -> (length, (CHK, CHK, ...)) + self.success_state = success_state + self.failure_state = failure_state + self.blocks = () + self.history = RequestHistory() + + def enter(self, from_state): + """ State implementation. """ + if not hasattr(from_state, 'get_blocks'): + raise Exception("Illegal Transition from: %s" % from_state.name) + + # Deep copy. 
+ self.blocks = [] + for block in from_state.get_blocks(): + self.blocks.append((block[0], tuple(block[1]))) + self.blocks = tuple(self.blocks) + assert len(self.blocks) > 0 + self.queue_initial_requests() + + def reset(self): + """ State implementation. """ + RetryingRequestList.reset(self) + self.blocks = () + self.history.reset() + + def queue_initial_requests(self): + """ INTERNAL: Queue initial candidates. """ + self.current_candidates = [] + self.next_candidates = [] + for block_ordinal, block in enumerate(self.blocks): + if len(block[1]) == 0: + continue + + chk_ordinals = range(0, len(block[1])) + # DESIGN INTENT: Don't favor primary over redundant. + random.shuffle(chk_ordinals) + ordinal = chk_ordinals.pop() + # Randomly enqueue one full request. + self.current_candidates.append((block_ordinal, ordinal, False)) + + # Only handle single redudancy! + assert len(chk_ordinals) <= 1 + + while len(chk_ordinals) > 0: + ordinal = chk_ordinals.pop() + # Hmmmm... full requests for data under 32K + self.current_candidates.append((block_ordinal, ordinal, + block[0] >= FREENET_BLOCK_LEN)) + # DESIGN INTENT: Don't any particular block. + random.shuffle(self.current_candidates) + # REDFLAG: avoid pending / history same state in two places? + def queue_single_full_request(self, candidate): + """ INTERNAL: Queue a single full request for the block if + possible. """ + assert candidate[2] + + pending = self.pending_candidates() + for value in pending: + assert self.history.is_running(value) + + #print "PENDING: ", pending + #print "CURRENT: ", self.current_candidates + #print "NEXT: ", self.next_candidates + + assert not candidate in pending + full = (candidate[0], candidate[1], False) + if self.history.is_running(full) or self.history.tried(full): + self.parent.ctx.ui_.status("Didn't requeue, full request " + + "already %s.\n" % + choose_word(self.history. + is_running(full), + 'running', 'queued')) + return + + assert not full in pending + + alternate = (candidate[0], int(not candidate[1]), False) + if self.history.is_running(alternate): + self.parent.ctx.ui_.status("Didn't requeue, other salted key " + + "already running.\n") + assert alternate in pending + return + + if alternate in self.current_candidates: + self.parent.ctx.ui_.status("Didn't requeue, other salted key " + + "already queued.\n") + return + + if full in self.current_candidates: + self.current_candidates.remove(full) + + assert not full in self.current_candidates + #print "QUEUED: ", full + self.current_candidates.insert(0, full) # FIFO + + def _finished(self): + """ INTERNAL: Return True if finished requesting. """ + for index, block in enumerate(self.blocks): + if not self.history.block_finished(index, len(block[1])): + return False + return True + + def _block_finished(self, candidate): + """ INTERNAL: Return True if the block is finished. """ + return self.history.block_finished(candidate[0], + len(self.blocks[candidate[0]][1])) + + def queue_alternate(self, candidate): + """ INTERNAL: Queue an alternate full request if possible. """ + #print "BLOCKS:", self.blocks + if len(self.blocks[candidate[0]][1]) < 2: + return False # No alternate key. We're toast. + + assert len(self.blocks[candidate[0]][1]) == 2 + if self.history.block_failed(candidate[0], 2): + return False # Both CHKs already failed. We're toast. 
+ + alternate = (candidate[0], int(not candidate[1]), False) + alternate_partial = (candidate[0], int(not candidate[1]), True) + assert not self.history.subblock_failed(alternate[0], alternate[1], 2) + + if (self.history.is_running(alternate) or + self.history.is_running(alternate_partial)): + self.parent.ctx.ui_.status("%s failed but %s is already running.\n" + % (str(candidate), choose_word( + self.history.is_running(alternate), + str(alternate), + str(alternate_partial)))) + return True + + if self.history.tried(alternate): + self.parent.ctx.ui_.status("Already tried running alternate %s.\n" + % str(alternate)) + return False + + if alternate_partial in self.current_candidates: + self.current_candidates.remove(alternate_partial) + self.parent.ctx.ui_.status("Removed %s from the queue.\n" % + str(alternate_partial)) + assert not alternate_partial in self.current_candidates + + if alternate in self.current_candidates: + self.parent.ctx.ui_.status("%s failed but %s already queued.\n" % + (str(candidate), str(alternate))) + return True + + self.current_candidates.insert(0, alternate) # FIFO + return True + + def candidate_done(self, client, msg, candidate): + """ RetryingRequestList implementation. """ + #print "CANDIDATE_DONE: ", msg[0], candidate + assert not self._finished() + succeeded = msg[0] in SUCCESS_MSGS + self.history.finished_request(candidate, succeeded) + if succeeded: + # Success + if (candidate[2] and not self._block_finished(candidate)): + self.queue_single_full_request(candidate) + elif not candidate[2]: + #print "FINISHED: ", candidate + # Dump the block data into the local cache. + self.parent.ctx.arch_cache_block( + self.blocks[candidate[0]][1][candidate[1]], # CHK + client.in_params.file_name, + self.blocks[candidate[0]][0]) # length + if self._finished(): + self.parent.transition(self.success_state) + return + + if ((not self._block_finished(candidate)) and + (not self.queue_alternate(candidate))): + self.parent.ctx.ui_.status("Download failed:\n" + + '\n'.join(self.blocks[candidate[0]][1]) + + '\n') + self.parent.transition(self.failure_state) + return + + if self.is_stalled(): # REDFLAG: I think this is now unreachable??? + self.parent.transition(self.failure_state) + + def make_request(self, candidate): + """ RetryingRequestList implementation. """ + uri = self.blocks[candidate[0]][1][candidate[1]] + if candidate[2]: + # Just top block. + uri = clear_control_bytes(uri) + + request = CandidateRequest(self.parent) + request.tag = str(candidate) # Hmmm + request.candidate = candidate + request.in_params.fcp_params = self.parent.params.copy() + + request.in_params.definition = GET_DEF + request.in_params.fcp_params['URI'] = uri + out_file = os.path.join( + os.path.join(self.parent.ctx['ARCHIVE_CACHE_DIR'], + TMP_DIR), make_id()) + request.in_params.file_name = out_file + + self.parent.ctx.set_cancel_time(request) + self.history.started_request(candidate) + return request + +# ctx has ui, but not bundlecache or repo +class ArchiveStateMachine(UpdateStateMachine): + """ An UpdateStateMachine subclass for creating, pull and pushing + incremental archives. """ + def __init__(self, runner, ctx): + UpdateStateMachine.__init__(self, runner, ctx) + self.states.update(self.new_states()) + + # REDFLAG: Fix base class + def reset(self): + """ UpdateStateMachin override. + + """ + StateMachine.reset(self) + if len(self.ctx.orphaned) > 0: + print "BUG?: Abandoning orphaned requests." 
+ self.ctx.orphaned.clear() + self.ctx = ArchiveUpdateContext(self, self.ctx.ui_) + + def new_states(self): + """ INTERNAL: Create the new states and transitions. """ + return { + + # Requesting + ARC_REQUESTING_URI:RequestingArchiveUri(self, + ARC_REQUESTING_URI, + ARC_CACHING_TOPKEY, + FAILING), + + ARC_CACHING_TOPKEY:CachingTopKey(self, + ARC_CACHING_TOPKEY, + FINISHING, + ARC_REQUESTING_BLOCKS), + + ARC_REQUESTING_BLOCKS: + RequestingRedundantBlocks(self, + ARC_REQUESTING_BLOCKS, + FINISHING, + FAILING), + + + # Inserting + ARC_INSERTING_BLOCKS:InsertingRedundantBlocks(self, + ARC_INSERTING_BLOCKS, + ARC_FIXING_UP_TOP_KEY, + FAILING), + ARC_FIXING_UP_TOP_KEY:FixingUpTopKey(self, + ARC_FIXING_UP_TOP_KEY, + ARC_INSERTING_URI), + + ARC_INSERTING_URI:InsertingArchiveUri(self, + ARC_INSERTING_URI, + ARC_CACHING_INSERTED_TOPKEY, + FAILING), + + ARC_CACHING_INSERTED_TOPKEY: + CachingTopKey(self, + ARC_CACHING_INSERTED_TOPKEY, + FINISHING, + FAILING), # hmmm + } + + + +def create_dirs(ui_, cache_dir, uri): + """ Create cache and temp directories for an archive. """ + full_path = os.path.join(cache_dir, get_usk_hash(uri)) + if not os.path.exists(full_path): + ui_.status("Creating cache dir:\n%s\n" % full_path) + os.makedirs(full_path) + + tmp_dir = os.path.join(cache_dir, TMP_DIR) + if not os.path.exists(tmp_dir): + ui_.status("Creating temp dir:\n%s\n" % tmp_dir) + os.makedirs(tmp_dir) + +def cleanup_dirs(ui_, cache_dir, uri, top_key=None): + """ Remove unneeded files from the archive cache dir. """ + + # Remove temp dir + tmp_dir = os.path.join(cache_dir, TMP_DIR) + if os.path.exists(tmp_dir): + ui_.status("Removing: %s\n" % tmp_dir) + shutil.rmtree(tmp_dir) + + # Remove block dir + block_dir = os.path.join(cache_dir, BLOCK_DIR) + if os.path.exists(block_dir): + ui_.status("Removing: %s\n" % block_dir) + shutil.rmtree(block_dir) + + if top_key is None: + return + + # Remove old cached top keys and unneeded cached CHKs. + survivors = set([]) + survivors.add(TOP_KEY_NAME_FMT % get_version(uri)) + for block in top_key[0]: + for chk in block[1]: + survivors.add(chk_file_name(chk)) + + archive_dir = os.path.join(cache_dir, get_usk_hash(uri)) + for name in os.listdir(archive_dir): + if not (name.startswith(CHK_NAME_PREFIX) or + name.startswith(TOP_KEY_NAME_PREFIX)): + # Hmmm leave other files alone. Too paranoid? + continue + + if not name in survivors: + full_path = os.path.join(archive_dir, name) + ui_.status("Removing: %s\n" % full_path) + os.remove(full_path) + if len(survivors) > 0: + ui_.status("Leaving %i file%s in : %s\n" % ( + len(survivors), + choose_word(len(survivors) == 1, '','s'), + archive_dir)) + +# LATER: Add "START_STATE" to context, get rid of +# most start_* members on UpdateStateMachine +# and replace them with a generic start(ctx) function. + + +def check_keys(ctx, required_keys): + """ Raise a KeyError if all keys in required_keys are not in ctx. """ + # Just let it raise a KeyError + # Better but causes W0104 + # [ctx[key] for key in required_keys] + # + # Grrr... hacking to avoid pylint W0104 + for key in required_keys: + if not key in ctx and ctx[key]: # Let it raise KeyError + print "You just executed unreachable code???" + +def start(update_sm, ctx): + """ Start running a context on a state machine. """ + update_sm.require_state(QUIESCENT) + assert 'START_STATE' in ctx + update_sm.reset() + update_sm.set_context(ctx) + update_sm.transition(ctx['START_STATE']) + +def start_requesting_blocks(update_sm, ctx): + """ Start requesting redundant archive blocks. 
""" + check_keys(ctx, ('REQUEST_URI', 'ARCHIVE_CACHE_DIR')) + create_dirs(ctx.ui_, + ctx['ARCHIVE_CACHE_DIR'], + ctx['REQUEST_URI']) + ctx['START_STATE'] = ARC_REQUESTING_URI + start(update_sm, ctx) + +# Doesn't check! Just fails w/ collision +def start_inserting_blocks(update_sm, ctx): + """ Start inserting redundant archive blocks. """ + check_keys(ctx, ('REQUEST_URI', 'INSERT_URI', 'ARCHIVE_CACHE_DIR', + 'PROVISIONAL_TOP_KEY', 'ARCHIVE_BLOCK_FILES')) + create_dirs(ctx.ui_, + ctx['ARCHIVE_CACHE_DIR'], + ctx['REQUEST_URI']) + ctx['START_STATE'] = ARC_INSERTING_BLOCKS + start(update_sm, ctx) + diff --git a/infocalypse/archivetop.py b/infocalypse/archivetop.py new file mode 100644 --- /dev/null +++ b/infocalypse/archivetop.py @@ -0,0 +1,130 @@ +""" +Helper functions used to read and write the binary rep. of incremental +archive top keys. + +Eccentricities: +0) There is a salt byte whose only purpose is to perturb the hash + value of the binary rep. +1) There are multiple CHKS per block. + +This allows us to do top key redundancy like in Infocalypse. +""" + +import struct + +from fcpconnection import sha1_hexdigest +from chk import CHK_SIZE, bytes_to_chk, chk_to_bytes +from topkey import default_out + +MAJOR_VERSION = '01' +MINOR_VERSION = '02' + +HDR_VERSION = MAJOR_VERSION + MINOR_VERSION +HDR_PREFIX = 'WORM' +HDR_BYTES = HDR_PREFIX + HDR_VERSION + +# Header length 'WORM0100' +HDR_SIZE = 8 +assert len(HDR_BYTES) == HDR_SIZE + +KNOWN_VERS = (HDR_BYTES, ) +EXPECTED_VER = KNOWN_VERS[-1] + +# <header><salt_byte><age><num_blocks><num_root_objects> +BASE_FMT = "!%isBIBB" % HDR_SIZE +BASE_LEN = struct.calcsize(BASE_FMT) + +# <root_object> = <20 byte SHA1><kind> +ROOT_OBJ_FMT = "!20sI" +ROOT_OBJ_LEN = struct.calcsize(ROOT_OBJ_FMT) + +BLOCK_BASE_FMT = "!qLB" +BLOCK_BASE_LEN = struct.calcsize(BLOCK_BASE_FMT) + +def check_version(version): + """ Raises a ValueError if the format version isn't parsable. """ + if version != EXPECTED_VER: + raise ValueError("Can't parse format version. Saw: %s, expected: %s" % + (version, EXPECTED_VER)) + + +def bytes_to_top_key_tuple(bytes): + """ Writes the binary rep of an archive top key. + + Where top_key_tuple is: + ( ((length, (chk,), max_age), .. ), ((root_sha, kind), ..), age ) + """ + + if len(bytes) < BASE_LEN: + raise ValueError("Not enough data to parse static fields.") + + if not bytes.startswith(HDR_PREFIX): + raise ValueError("Doesn't look like %s top key binary data." % + HDR_PREFIX) + + + version, salt, age, num_blocks, num_obs = struct.unpack(BASE_FMT, + bytes[:BASE_LEN]) + + check_version(version) + + bytes = bytes[BASE_LEN:] + + blocks = [] + for dummy0 in range(0, num_blocks): + length, age, chk_count = struct.unpack(BLOCK_BASE_FMT, + bytes[:BLOCK_BASE_LEN]) + bytes = bytes[BLOCK_BASE_LEN:] + + chks = [] + for dummy1 in range(0, chk_count): + chks.append(bytes_to_chk(bytes[:CHK_SIZE])) + bytes = bytes[CHK_SIZE:] + + blocks.append((length, tuple(chks), age)) + + + root_objs = [] + for dummy2 in range(0, num_obs): + root_objs.append(struct.unpack(ROOT_OBJ_FMT, + bytes[:ROOT_OBJ_LEN])) + bytes = bytes[ROOT_OBJ_LEN:] + + return ((tuple(blocks), tuple(root_objs), age), salt) + +def top_key_tuple_to_bytes(values, salt_byte=0): + """ Reads the binary rep of an archive top key. 
""" + + ret = struct.pack(BASE_FMT, HDR_BYTES, + salt_byte, + values[2], + len(values[0]), + len(values[1])) + + for block in values[0]: + ret += struct.pack(BLOCK_BASE_FMT, + block[0], block[2], len(block[1])) + for chk in block[1]: + ret += chk_to_bytes(chk) + + for obj in values[1]: + ret += struct.pack(ROOT_OBJ_FMT, + obj[0], + obj[1]) + return ret + +def dump_top_key_tuple(top_key_tuple, out_func=default_out): + """ Debugging function to print a top_key_tuple. """ + out_func("--- %s top key tuple---\n" % HDR_BYTES) + out_func("age: %i\n" % top_key_tuple[2]) + for index, block in enumerate(top_key_tuple[0]): + out_func("block[%i]\n len: %i max age: %i\n" % + (index, block[0], block[2])) + for chk in block[1]: + out_func(" %s\n" % chk) + for index, obj in enumerate(top_key_tuple[1]): + out_func("root_sha[%i]: %s %i\n" % (index, sha1_hexdigest(obj[0]), + obj[1])) + + out_func("---\n") + diff --git a/infocalypse/arclocal.py b/infocalypse/arclocal.py new file mode 100644 --- /dev/null +++ b/infocalypse/arclocal.py @@ -0,0 +1,297 @@ +""" Classes and functions to manage a local incremental archive cache. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import os +import re +import shutil +import random + +import archivetop +from fcpclient import get_version, get_usk_hash +from graph import MAX_METADATA_HACK_LEN +from archivesm import choose_word, chk_file_name, BLOCK_DIR, TMP_DIR, \ + TOP_KEY_NAME_FMT + +# Archive stuff +from pathhacks import add_parallel_sys_path +add_parallel_sys_path('wormarc') +from blocks import BlockStorage, ITempFileManager +from archive import WORMBlockArchive, UpToDateException +from deltacoder import DeltaCoder +from filemanifest import FileManifest, entries_from_dir, manifest_to_dir + +BLOCK_NAME = "block" + +# REDFLAG: move to manifest.py ??? +# An integer type constant for the root object in the archive. +# i.e. so that we know how to read it. +KIND_MANIFEST = 1 # 32 bit unsigned id value. + +# MUST match blocknames.BLOCK_SUFFIX +BLOCK_NAME_FMT = BLOCK_NAME +"_%i.bin" + +class HandleTemps(ITempFileManager): + """ Delegate to handle temp file creation and deletion. """ + def __init__(self, base_dir): + ITempFileManager.__init__(self) + self.base_dir = base_dir + + def make_temp_file(self): + """ Return a new unique temp file name including full path. """ + return os.path.join(self.base_dir, "__TMP__%s" % + str(random.random())[2:]) + + def remove_temp_file(self, full_path): + """ Remove and existing temp file. """ + + if not full_path: + return # Allowed. + + if not os.path.split(full_path)[-1].startswith("__TMP__"): + raise IOError("Didn't create: %s" % full_path) + + if not os.path.exists(full_path): + return + os.remove(full_path) + +def cache_dir_name(cache_dir, uri): + """ Return the name of the cache directory. 
""" + return os.path.join(cache_dir, get_usk_hash(uri)) + +def cached_block(cache_dir, uri, block): + """ Return the file name of a cached block. """ + for chk in block[1]: + full_path = os.path.join(cache_dir_name(cache_dir, uri), + chk_file_name(chk)) + if os.path.exists(full_path): + if os.path.getsize(full_path) != block[0]: + raise IOError("Wrong size: %s, expected: %i, got: %i" % + (full_path, block[0], + os.path.getsize(full_path))) + #print "FOUND: ", chk + return full_path + #else: + # print "MISSING: ", chk + + raise IOError("Not cached: %s" % str(block)) + +def load_cached_top_key(cache_dir, uri): + """ Return a top key tuple from a cached top key. """ + full_path = os.path.join(cache_dir_name(cache_dir, uri), + TOP_KEY_NAME_FMT % get_version(uri)) + + in_file = open(full_path, 'rb') + try: + try: + return archivetop.bytes_to_top_key_tuple(in_file.read())[0] + except ValueError: + # Remove the corrupt file from the cache. + in_file.close() + if os.path.exists(full_path): + os.remove(full_path) + raise + finally: + in_file.close() + +# Means retrievable, NOT that every block is cached. Change name? +def verify_fully_cached(cache_dir, uri, top_key): + """ Raise an IOError if all blocks in top_key aren't + retrievable. """ + for block in top_key[0]: + cached_block(cache_dir, uri, block) + +def setup_block_dir(cache_dir, uri, top_key=None, copy_blocks=False, + pad_to=4): + """ Create a temporary block directory for reading and writing + archive blocks. """ + block_dir = os.path.join(cache_dir, BLOCK_DIR) + if os.path.exists(block_dir): + shutil.rmtree(block_dir) # Hmmmm... + os.makedirs(block_dir) + + if copy_blocks: + for index, block in enumerate(top_key[0]): + src = cached_block(cache_dir, uri, block) + dest = os.path.join(block_dir, + BLOCK_NAME_FMT % index) + shutil.copyfile(src, dest) + # 'pad' with empty block files. + for index in range(len(top_key[0]), pad_to): + dest = os.path.join(block_dir, + BLOCK_NAME_FMT % index) + out_file = open(dest, 'wb') + out_file.close() + + return block_dir + + +def create_archive(cache_dir, uri): + """ Create a new archive. """ + block_dir = setup_block_dir(cache_dir, uri) + + tmps = HandleTemps(os.path.join(cache_dir, TMP_DIR)) + archive = WORMBlockArchive(DeltaCoder(), BlockStorage(tmps)) + archive.create(block_dir, BLOCK_NAME) + + return archive + +def load_cached_archive(cache_dir, uri): + """ Load an archive from the cache. """ + top_key = load_cached_top_key(cache_dir, uri) + if len(top_key[1]) != 1 or top_key[1][0][1] != KIND_MANIFEST: + raise Exception("Can't read manifest from archive.") + + verify_fully_cached(cache_dir, uri, top_key) + + # Clear previous block dir and copy cached blocks into it. + block_dir = setup_block_dir(cache_dir, uri, top_key, True) + + tmps = HandleTemps(os.path.join(cache_dir, TMP_DIR)) + archive = WORMBlockArchive(DeltaCoder(), BlockStorage(tmps)) + archive.load(block_dir, BLOCK_NAME) + + # IMPORTANT: Set tags so we can keep track of + # unchanged blocks. + for index in range(0, len(top_key[0])): + archive.blocks.tags[index] = str(index) + + return top_key, archive + +# Returns the files you need to insert and a provisional +# top key tuple with 'CHK@' for new files. +def provisional_top_key(archive, manifest, old_top_key, reinsert=False): + """ Create a new top key which has place holder 'CHK@' block CHKs + for new blocks. + + Return (file_list, top_key) + + where file_list is a list of files to insert the new CHKs + from and top_key is the provisional top key tuple. 
+ """ + files = [] + blocks = [] + for index, tag in enumerate(archive.blocks.tags): + if reinsert or tag == '' or tag == 'new': # REDFLAG: make constant? + full_path = archive.blocks.full_path(index) + length = os.path.getsize(full_path) + if length == 0: + continue # Skip empty blocks. + files.append(full_path) + if length < MAX_METADATA_HACK_LEN: + blocks.append((length, ['CHK@', 'CHK@'], archive.age)) + else: + # MUST get the number of required CHKs right or + # FixingUpTopkey will fail. + blocks.append((length, ['CHK@', ], archive.age)) + continue + blocks.append(old_top_key[0][int(tag)]) + + provisional = (tuple(blocks), ((manifest.stored_sha, KIND_MANIFEST),), + archive.age) + + return files, provisional + +# Archive MUST be fully locally cached. +def local_reinsert(cache_dir, uri): + """ Return the top_key, file list info needed to fully reinsert + the archive. """ + # Load cached topkey + top_key, archive = load_cached_archive(cache_dir, uri) + try: + manifest = FileManifest.from_archive(archive, top_key[1][0][0]) + return provisional_top_key(archive, manifest, top_key, True) + finally: + archive.close() + +# Only modifies local <cache_dir>/blocks directory. +# REQUIRES: cached topkey and blocks +def local_update(cache_dir, uri, from_dir): + """ Update the archive by inserting deltas against from_dir. """ + # Load cached topkey + top_key, archive = load_cached_archive(cache_dir, uri) + try: + # Load the old file manifest and use it to update. + manifest = FileManifest.from_archive(archive, top_key[1][0][0]) + try: + manifest.update(archive, + entries_from_dir(from_dir, True, + make_skip_regex(cache_dir))) + except UpToDateException: + # Hmmm don't want to force client code + # to import archive module + return (None, None) + + return provisional_top_key(archive, manifest, top_key) + finally: + archive.close() + +# A regex that skips the archive cache dir +def make_skip_regex(cache_dir): + """ INTERNAL: Regular expression to ignore the local cache directory. """ + first, second = os.path.split(cache_dir) + if second == '': + first, second = os.path.split(first) + assert not second == '' + #print "SKIPPING: ", second + return re.compile(".*%s$" % second.replace('.', '\.')) + +def local_create(cache_dir, uri, from_dir): + """ Create a new local archive. """ + + # Load cached topkey + archive = create_archive(cache_dir, uri) + try: + # Create an empty manifest and use it to update the archive. + manifest = FileManifest() + + manifest.update(archive, + entries_from_dir(from_dir, + True, + make_skip_regex(cache_dir))) + + return provisional_top_key(archive, manifest, ((), (), 0)) + finally: + archive.close() + +# Overwrites! +# LATER: verify=False +def local_synch(ui_, cache_dir, uri, to_dir): + """ Update to_dir from the archive in cache_dir. + + CAUTION: May delete files and directories. + """ + + top_key, archive = load_cached_archive(cache_dir, uri) + try: + # Load the old file manifest and use it to extract. 
+ manifest = FileManifest.from_archive(archive, top_key[1][0][0]) + + result = manifest_to_dir(archive, manifest, + to_dir, make_skip_regex(cache_dir)) + ui_.status(("Created: %i, Modified: %i, Removed: %i\n") % + (len(result[0]), len(result[1]), len(result[2]))) + + if len(result[3]) > 0: + ui_.status("Removed %i local %s.\n" % (len(result[3]), + choose_word(result[3] == 1, "subdirectory", "subdirectories"))) + + finally: + archive.close() diff --git a/infocalypse/config.py b/infocalypse/config.py --- a/infocalypse/config.py +++ b/infocalypse/config.py @@ -393,7 +393,6 @@ def read_freesite_cfg(ui_, repo, params, ui_.status('Using config file:\n%s\n' % cfg_file) if not os.path.exists(cfg_file): ui_.warn("Can't read: %s\n" % cfg_file) - # REDFLAG: DCI TEST raise util.Abort(no_cfg_err) parser = ConfigParser() @@ -405,7 +404,6 @@ def read_freesite_cfg(ui_, repo, params, # wiki specific if params['ISWIKI']: - # REDFLAG: DCI test error params['WIKI_ROOT'] = parser.get('default', 'wiki_root') else: params['SITE_DIR'] = parser.get('default', 'site_dir') diff --git a/infocalypse/devnotes.txt b/infocalypse/devnotes.txt --- a/infocalypse/devnotes.txt +++ b/infocalypse/devnotes.txt @@ -1,4 +1,21 @@ !!! experimental branch for testing wiki over hg idea !!! +See: +USK@kRM~jJVREwnN2qnA8R0Vt8HmpfRzBZ0j4rHC2cQ-0hw,2xcoQVdQLyqfTpF2DpkdUIbHFCeL4W~2X1phUYymnhM,AQACAAE/fniki_demo/3/ +!!! + +djk20091208 +Failed attempt to DRY out: + +THIS_IS_A_CONSTANT = 'THIS_IS_A_CONSTANT' + +constant defs. + +# pylint doesn't infer the names so it reports them as undefined +# def str_constant(name): +# assert not name is globals() +# globals()[name] = name +#str_constant('ARC_REQUESTING_URI') + djk20091111 Remember to test pathhacks under windows. If it works, use it to clean diff --git a/infocalypse/infcmds.py b/infocalypse/infcmds.py --- a/infocalypse/infcmds.py +++ b/infocalypse/infcmds.py @@ -45,7 +45,9 @@ from updatesm import UpdateStateMachine, REQUESTING_URI_4_INSERT, INSERTING_BUNDLES, INSERTING_GRAPH, \ INSERTING_URI, FAILING, REQUESTING_URI_4_COPY, CANCELING, \ REQUIRES_GRAPH_4_HEADS, REQUESTING_GRAPH_4_HEADS, \ - RUNNING_SINGLE_REQUEST, CleaningUp + RUNNING_SINGLE_REQUEST, CleaningUp, UpdateContext + +from archivesm import ArchiveStateMachine, ArchiveUpdateContext from statemachine import StatefulRequest @@ -287,6 +289,7 @@ class PatchedCleaningUp(CleaningUp): CleaningUp.enter(self, from_state) # REDFLAG: remove store_cfg +# DCI: retest! esp. infocalypse stuff def setup(ui_, repo, params, stored_cfg): """ INTERNAL: Setup to run an Infocalypse extension command. """ # REDFLAG: choose another name. Confusion w/ fcp param @@ -315,7 +318,8 @@ def setup(ui_, repo, params, stored_cfg) callbacks = UICallbacks(ui_) callbacks.verbosity = verbosity - cache = BundleCache(repo, ui_, params['TMP_DIR']) + if not repo is None: + cache = BundleCache(repo, ui_, params['TMP_DIR']) try: async_socket = PolledSocket(params['FCP_HOST'], params['FCP_PORT']) @@ -331,7 +335,20 @@ def setup(ui_, repo, params, stored_cfg) raise err runner = RequestRunner(connection, params['N_CONCURRENT']) - update_sm = UpdateStateMachine(runner, repo, ui_, cache) + + if repo is None: + # For incremental archives. 
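            # When setup() is called with repo == None (the fn-archive
            # code path), the BundleCache above is skipped and an
            # ArchiveStateMachine driving an ArchiveUpdateContext is
            # built below, instead of the Mercurial-backed
            # UpdateStateMachine.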
+ ctx = ArchiveUpdateContext() + update_sm = ArchiveStateMachine(runner, ctx) + else: + # For Infocalypse repositories + ctx = UpdateContext(None) + ctx.repo = repo + ctx.ui_ = ui_ + ctx.bundle_cache = cache + update_sm = UpdateStateMachine(runner, ctx) + + update_sm.params = params.copy() update_sm.transition_callback = callbacks.transition_callback update_sm.monitor_callback = callbacks.monitor_callback @@ -399,7 +416,9 @@ def cleanup(update_sm): if not update_sm.runner is None: update_sm.runner.connection.close() - update_sm.ctx.bundle_cache.remove_files() + # DCI: what will force cleanup of archive temp files? + if not update_sm.ctx.bundle_cache is None: + update_sm.ctx.bundle_cache.remove_files() # This function needs cleanup. # REDFLAG: better name. 0) inverts 1) updates indices from cached state. @@ -507,7 +526,6 @@ def handle_updating_config(repo, update_ #print "UPDATED STORED CONFIG(1)" Config.to_file(stored_cfg) - def is_redundant(uri): """ Return True if uri is a file USK and ends in '.R1', False otherwise. """ diff --git a/infocalypse/statemachine.py b/infocalypse/statemachine.py --- a/infocalypse/statemachine.py +++ b/infocalypse/statemachine.py @@ -146,10 +146,11 @@ class DecisionState(RequestQueueState): def enter(self, from_state): """ Immediately drive transition to decide_next_state(). """ target_state = self.decide_next_state(from_state) - assert target_state != self + assert target_state != self.name assert target_state != from_state self.parent.transition(target_state) + # State instance NOT name. def decide_next_state(self, dummy_from_state): """ Pure virtual. diff --git a/infocalypse/test_block_redundancy.py b/infocalypse/test_block_redundancy.py new file mode 100644 --- /dev/null +++ b/infocalypse/test_block_redundancy.py @@ -0,0 +1,448 @@ +""" Minimal UTs for InsertingRedundantBlocks, RequestingRedundantBlocks. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# REQUIRES: mercurial in PYTHONPATH! +# REDFLAG: LATER: Add test case for blocks too big to salt! (tested manually) + +# OK to be a little sloppy for test code. +# pylint: disable-msg=C0111 +# For setUp() and tearDown() +# pylint: disable-msg=C0103 +# Allow attribute creation in setUp() +# pylint: disable-msg=W0201 +# Allow test methods that don't reference self. +# pylint: disable-msg=R0201 +# Allow many test methods. +# pylint: disable-msg=R0904 +# Allow mocked ui class, FakeUI() with only 2 public methods. 
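# How to run (assumes a Freenet node reachable at the FCP_HOST/FCP_PORT
# constants below, 127.0.0.1:19481 by default), as handled by the
# __main__ block at the bottom of this file:
#
#   python test_block_redundancy.py                  # full run
#   python test_block_redundancy.py test_inserting   # single named test
#
# The test_requesting_* cases read the blocks that test_inserting records
# in SHARED_STATE, so they only pass in a run that also executes
# test_inserting first; a full run does, since unittest sorts the test
# method names.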
+# pylint: disable-msg=R0903 +import os +import random +import shutil +import sys +import unittest + +from infcmds import UICallbacks, run_until_quiescent +from fcpconnection import PolledSocket, FCPConnection +from fcpclient import FCPClient +from requestqueue import RequestRunner +from statemachine import State +from archivesm import ArchiveStateMachine, ArchiveUpdateContext, \ + create_dirs, start, InsertingRedundantBlocks, RequestingRedundantBlocks, \ + chk_file_name + +from updatesm import FAILING, FINISHING, QUIESCENT +from graph import FREENET_BLOCK_LEN + +TEST_BASE = '/tmp' +TEST_ROOT = '__block_test_run__' +TMP_DIR = "__TMP__" + +FCP_HOST = '127.0.0.1' +FCP_PORT = 19481 +N_CONCURRENT = 4 +POLL_SECS = 0.25 +CANCEL_TIME_SECS = 5 * 60 + +# Doesn't need to be fetchable, just for building cache subdir. +SOME_USK = ('USK@Q60BTelEyg6V2KK97k1WNHA7N77pkE-v3m5~hHbm3ew,' + + 'IgRKyz2LoDCv0a1ptc5ycWtYknNP6DL1E8o4VM0tZ6Q,AQACAAE/small/4') + + +BLOCK_DEF = \ +((44003, ('CHK@A6rpa~7jmUbZ55fugPuziwrZdLhmDUo6OorLVGB45f8,' + + '4P~momeirpQpvnCIqT3P5D5Z~a486IQXqI3s7R6FQjg,AAIC--8', + 'CHK@LzqlbkyyUAixGXD52kMu8uad1CxGgW0QGSHNP-WrP-4,' + + 'mc3P0kb17xpAHtjh2rG2EfDWujp8bN0~L5GuezNV50E,AAIC--8') + ),) + +FILE_BLOCKS_DEF = \ +(('', 44003, ('CHK@A6rpa~7jmUbZ55fugPuziwrZdLhmDUo6OorLVGB45f8,' + + '4P~momeirpQpvnCIqT3P5D5Z~a486IQXqI3s7R6FQjg,AAIC--8', + 'CHK@LzqlbkyyUAixGXD52kMu8uad1CxGgW0QGSHNP-WrP-4,' + + 'mc3P0kb17xpAHtjh2rG2EfDWujp8bN0~L5GuezNV50E,AAIC--8') + ),) + + +# State stored across tests. +SHARED_STATE = {} +#SHARED_STATE['FILE_BLOCKS'] = FILE_BLOCKS_DEF + + +def bytes(count, offset): + return "".join([chr((index + offset) % 256) for index in range(0, count)]) + +def bad_chk_itr(): + ordinal = ord('A') + + while ordinal <= ord('Z'): + yield (('CHK@badroutingkey0%s0JblbGup0yNSpoDJgVPnL8E5WXoc,' % + chr(ordinal)) + + 'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8') + ordinal += 1 + return + +BAD_CHKS = bad_chk_itr() + +def break_primary(chks): + chks = list(chks) + assert len(chks) > 0 + chks[0] = BAD_CHKS.next() + return chks + +def break_redundant(chks): + chks = list(chks) + assert len(chks) > 0 + chks[-1] = BAD_CHKS.next() + return chks + +# Not sure that this will work. +class FakeUI: + def __init__(self): + pass + + def status(self, text): + if text.endswith('\n'): + text = text[:-1] + print text + +class HoldingBlocks(State): + """ State to hold blockd for testing RequestingRedundantBlocks """ + def __init__(self, parent, name, next_state): + State.__init__(self, parent, name) + self.next_state = next_state + self.blocks = () + + def enter(self, dummy_from_state): + """ State implemenation. """ + + print self.blocks + self.parent.transition(self.next_state) + + def reset(self): + pass + + def get_blocks(self): + """ Return the cached blocks. 
""" + return self.blocks + +class RedundancyTests(unittest.TestCase): + def setup_test_dirs(self, base_dir, dir_name): + if not os.path.exists(base_dir): + raise IOError("Base test directory doesn't exist: %s" % base_dir) + + full_path = os.path.join(base_dir, dir_name) + if os.path.exists(full_path): + raise IOError("Test directory exists: %s" % full_path) + + os.makedirs(full_path) + self.test_root = full_path + self.tmp_dir = os.path.join(self.test_root, TMP_DIR) + os.makedirs(self.tmp_dir) + + def remove_test_dirs(self): + assert self.test_root.endswith(TEST_ROOT) + if os.path.exists(self.test_root): + shutil.rmtree(self.test_root) + + def setUp(self): + self.setup_test_dirs(TEST_BASE, TEST_ROOT) + self.connection = None + + def tearDown(self): + if not self.connection is None: + self.connection.close() + + self.remove_test_dirs() + + def make_state_machine(self): + if not self.connection is None: + self.connection.close() + + callbacks = UICallbacks(FakeUI()) + callbacks.verbosity = 5 + # Knows about reading and writing bytes. + async_socket = PolledSocket(FCP_HOST, FCP_PORT) + # Knows about running the FCP protocol over async_socket. + self.connection = FCPConnection(async_socket, True, + callbacks.connection_state) + # Knows about running requests from a request queue. + runner = RequestRunner(self.connection, N_CONCURRENT) + # Knows how to run series of requests to perform operations + # on an archive in Freenet. + sm = ArchiveStateMachine(runner, ArchiveUpdateContext()) + sm.transition_callback = callbacks.transition_callback + sm.monitor_callback = callbacks.monitor_callback + sm.params['CANCEL_TIME_SECS'] = CANCEL_TIME_SECS + + return sm + + + def checkCHK(self, chk, logical_len, length, data=None): + print "---" + print "Checking: ", chk + # Something is closing the connection? + resp = FCPClient.connect(FCP_HOST, FCP_PORT).get(chk) + self.assertTrue(resp[0] == 'AllData') + print "Length: ", len(resp[2]) + print "Mime_Type: ", resp[1]['Metadata.ContentType'] + if len(resp[2]) != length: + print "Expected len: %i, got: %i!" % (length, len(resp[2])) + self.assertTrue(False) + if not data is None and resp[2][:logical_len] != data: + print "Data doesn't match! (only showing first 16 bytes below)" + print "got: ", repr(resp[2][:logical_len][:16]) + print "expected: " , repr(data[:16]) + self.assertTrue(False) + + def _testCheckCHK(self): + self.make_state_machine() + self.checkCHK("CHK@Q~xLO5t0tVCkrJ8MAZUeFijK090CsJdJ1RGoRQPbUfY," + + "gWj4935igWd~LuhckS6bST~-qfJ5oW8E5YEa7Yy-tzk,AAIC--8", + 32767, + 32767 + 1) + + def test_inserting(self): + # Takes longer to insert existing blocks? 
+ offset = random.randrange(0, 256) + print "offset: ", offset + lengths = (FREENET_BLOCK_LEN - 1, + FREENET_BLOCK_LEN, + FREENET_BLOCK_LEN + 1, + 1, + FREENET_BLOCK_LEN + 11235, + ) + + insert_files = [] + for index, length in enumerate(lengths): + full_path = os.path.join(self.tmp_dir, + "%i.bin" % index) + out_file = open(full_path, 'wb') + out_file.write(bytes(length, offset)) + out_file.close() + self.assertTrue(os.path.getsize(full_path) == length) + insert_files.append(full_path) + + update_sm = self.make_state_machine() + self.assertTrue(not 'TEST_STATE' in update_sm.states) + update_sm.states['TEST_STATE'] = ( + InsertingRedundantBlocks(update_sm, + 'TEST_STATE', + FINISHING, + FAILING)) + + + ctx = ArchiveUpdateContext(update_sm, FakeUI()) + ctx.update({'ARCHIVE_CACHE_DIR':self.tmp_dir, + 'REQUEST_URI':SOME_USK, + 'ARCHIVE_BLOCK_FILES':insert_files, + 'START_STATE':'TEST_STATE'}) + + create_dirs(ctx.ui_, + ctx['ARCHIVE_CACHE_DIR'], + ctx['REQUEST_URI']) + + start(update_sm, ctx) + run_until_quiescent(update_sm, POLL_SECS) + self.assertTrue(update_sm.get_state(QUIESCENT). + arrived_from(((FINISHING,)))) + + blocks = update_sm.states['TEST_STATE'].files + for index, entry in enumerate(blocks): + print "block [%i]: len: %i" % (index, entry[1]) + for chk in entry[2]: + print " ", chk + + # FREENET_BLOCK_LEN - 1, first is unpadded + self.checkCHK(blocks[0][2][0], blocks[0][1], blocks[0][1], + bytes(blocks[0][1], offset)) + # FREENET_BLOCK_LEN - 1, second is padded + self.checkCHK(blocks[0][2][1], blocks[0][1], blocks[0][1] + 1, + bytes(blocks[0][1], offset)) + + # FREENET_BLOCK_LEN first is padded + self.checkCHK(blocks[1][2][0], blocks[1][1], blocks[1][1] + 1, + bytes(blocks[1][1], offset)) + # FREENET_BLOCK_LEN second is padded + self.checkCHK(blocks[1][2][1], blocks[1][1], blocks[1][1] + 1, + bytes(blocks[1][1], offset)) + + # FREENET_BLOCK_LEN + 1, first is unpadded + self.checkCHK(blocks[2][2][0], blocks[2][1], blocks[2][1], + bytes(blocks[2][1], offset)) + # FREENET_BLOCK_LEN + 1, second is unpadded + self.checkCHK(blocks[2][2][1], blocks[2][1], blocks[2][1], + bytes(blocks[2][1], offset)) + + # 1, first is unpadded + self.checkCHK(blocks[3][2][0], blocks[3][1], blocks[3][1], + bytes(blocks[3][1], offset)) + + # 1, second is padded + self.checkCHK(blocks[3][2][1], blocks[3][1], blocks[3][1] + 1, + bytes(blocks[3][1], offset)) + + + # FREENET_BLOCK_LEN + 11235, first is unpadded + self.checkCHK(blocks[4][2][0], blocks[4][1], blocks[4][1], + bytes(blocks[4][1], offset)) + + # FREENET_BLOCK_LEN + 11235, second is unpadded + self.checkCHK(blocks[4][2][1], blocks[4][1], blocks[4][1], + bytes(blocks[4][1], offset)) + + # Save info for use in request testing + SHARED_STATE['FILE_BLOCKS'] = blocks + SHARED_STATE['OFFSET'] = offset + + + def setup_request_sm(self): + """ Helper sets up a state machine instance containing a + RequestingRedundantBlocks instance. 
""" + update_sm = self.make_state_machine() + self.assertTrue(not 'TEST_HAS_BLOCKS' in update_sm.states) + + update_sm.states['TEST_HAS_BLOCKS'] = ( + HoldingBlocks(update_sm, 'TEST_HAS_BLOCKS', + 'TEST_REQUESTING')) + + + update_sm.states['TEST_REQUESTING'] = ( + RequestingRedundantBlocks(update_sm, + 'TEST_REQUESTING', + FINISHING, + FAILING)) + + ctx = ArchiveUpdateContext(update_sm, FakeUI()) + ctx.update({'ARCHIVE_CACHE_DIR':self.tmp_dir, + 'REQUEST_URI':SOME_USK, + 'START_STATE':'TEST_HAS_BLOCKS'}) + + create_dirs(ctx.ui_, + ctx['ARCHIVE_CACHE_DIR'], + ctx['REQUEST_URI']) + + return (ctx, update_sm, update_sm.states['TEST_HAS_BLOCKS']) + + + def verify_not_cached(self, ctx, blocks): + for block in blocks: + for chk in block[1]: + full_path = os.path.join(ctx.arch_cache_dir(), + chk_file_name(chk)) + if os.path.exists(full_path): + print "Already cached: ", chk + self.assertTrue(False) + + + def verify_cached(self, ctx, blocks): + for index, block in enumerate(blocks): + count = 0 + for ordinal, chk in enumerate(block[1]): + full_path = os.path.join(ctx.arch_cache_dir(), + chk_file_name(chk)) + if os.path.exists(full_path): + print "%s: CACHED" % str((index, ordinal)) + self.assertTrue(os.path.getsize(full_path) == + block[0]) + count += 1 + else: + print "%s: MISSING" % str((index, ordinal)) + self.assertTrue(count > 0) + + + # REQUIRES: test_inserting run first. + def test_requesting_all(self): + if not 'FILE_BLOCKS' in SHARED_STATE: + print "You must run test_inserting() before this test." + self.assertTrue(False) + + ctx, update_sm, start_state = self.setup_request_sm() + + blocks = [] + for entry in SHARED_STATE['FILE_BLOCKS']: + blocks.append((entry[1], tuple(entry[2]))) + + self.verify_not_cached(ctx, blocks) + start_state.blocks = tuple(blocks) + + start(update_sm, ctx) + run_until_quiescent(update_sm, POLL_SECS) + self.assertTrue(update_sm.get_state(QUIESCENT). + arrived_from(((FINISHING,)))) + + self.verify_cached(ctx, blocks) + + def test_requesting_primary(self): + if not 'FILE_BLOCKS' in SHARED_STATE: + print "You must run test_inserting() before this test." + self.assertTrue(False) + + ctx, update_sm, start_state = self.setup_request_sm() + + blocks = [] + for entry in SHARED_STATE['FILE_BLOCKS']: + blocks.append((entry[1], tuple(break_redundant(entry[2])))) + + self.verify_not_cached(ctx, blocks) + start_state.blocks = tuple(blocks) + + start(update_sm, ctx) + run_until_quiescent(update_sm, POLL_SECS) + self.assertTrue(update_sm.get_state(QUIESCENT). + arrived_from(((FINISHING,)))) + + self.verify_cached(ctx, blocks) + + def test_requesting_redundant(self): + if not 'FILE_BLOCKS' in SHARED_STATE: + print "You must run test_inserting() before this test." + self.assertTrue(False) + + ctx, update_sm, start_state = self.setup_request_sm() + + blocks = [] + for entry in SHARED_STATE['FILE_BLOCKS']: + blocks.append((entry[1], tuple(break_primary(entry[2])))) + + self.verify_not_cached(ctx, blocks) + start_state.blocks = tuple(blocks) + + start(update_sm, ctx) + run_until_quiescent(update_sm, POLL_SECS) + self.assertTrue(update_sm.get_state(QUIESCENT). + arrived_from(((FINISHING,)))) + + self.verify_cached(ctx, blocks) + + + +if __name__ == '__main__': + # use -v on command line to get verbose output. + # verbosity keyword arg not supported in 2.6? + if len(sys.argv) >= 2 and sys.argv[1] != '-v': + # Run a single test case + suite = unittest.TestSuite() + suite.addTest(RedundancyTests(sys.argv[1])) + unittest.TextTestRunner().run(suite) + else: + # Run everything. 
+ unittest.main() diff --git a/infocalypse/topkey.py b/infocalypse/topkey.py --- a/infocalypse/topkey.py +++ b/infocalypse/topkey.py @@ -217,7 +217,7 @@ def default_out(text): def dump_top_key_tuple(top_key_tuple, out_func=default_out): """ Debugging function to print a top_key_tuple. """ - out_func("---top key tuple---\n") + out_func("--- %s top key tuple---\n" % HDR_BYTES) for index, chk in enumerate(top_key_tuple[0]): out_func("graph_%s:%s\n" % (chr(ord('a') + index), chk)) for index, update in enumerate(top_key_tuple[1]): diff --git a/infocalypse/updatesm.py b/infocalypse/updatesm.py --- a/infocalypse/updatesm.py +++ b/infocalypse/updatesm.py @@ -40,8 +40,6 @@ from graph import INSERT_NORMAL, INSERT_ pull_bundle, hex_version from graphutil import minimal_graph, graph_to_string, parse_graph from choose import get_top_key_updates -from topkey import bytes_to_top_key_tuple, top_key_tuple_to_bytes, \ - dump_top_key_tuple from statemachine import StatefulRequest, RequestQueueState, StateMachine, \ Quiescent, Canceling, RetryingRequestList, CandidateRequest, \ @@ -50,6 +48,8 @@ from statemachine import StatefulRequest from insertingbundles import InsertingBundles from requestingbundles import RequestingBundles +import topkey + HG_MIME_TYPE = 'application/mercurial-bundle' HG_MIME_TYPE_FMT = HG_MIME_TYPE + ';%i' @@ -58,7 +58,7 @@ PAD_BYTE = '\xff' MAX_SSK_LEN = 1024 -class UpdateContext(dict): +class UpdateContextBase(dict): """ A class to hold inter-state data used while the state machine is running. """ @@ -69,24 +69,48 @@ class UpdateContext(dict): self.parent = parent # Merurial state + self.ui_ = None self.repo = None - self.ui_ = None self.bundle_cache = None # Orphaned request handling hmmm... self.orphaned = {} - # UpdateGraph instance. - self.graph = None - # If this is True states can use the results of index searches on the # public key to update the private key. self['IS_KEYPAIR'] = False - self['TARGET_VERSIONS'] = None self['INSERT_URI'] = 'CHK@' self['REQUEST_URI'] = None + def set_cancel_time(self, request): + """ Sets the timeout on a QueueableRequest. """ + request.cancel_time_secs = time.time() \ + + self.parent.params['CANCEL_TIME_SECS'] + + def orphan_requests(self, from_state): + """ Give away requests that should be allowed to keep running. """ + if not hasattr(from_state, 'pending') or len(from_state.pending) == 0: + return + + for tag in from_state.pending: + request = from_state.pending[tag] + request.tag = "orphaned_%s_%s" % (str(request.tag), from_state.name) + assert not request.tag in self.orphaned + self.orphaned[request.tag] = request + from_state.pending.clear() + + +class UpdateContext(UpdateContextBase): + """ A class to hold inter-state data used while the state machine is + running. """ + + def __init__(self, parent): + UpdateContextBase.__init__(self, parent) + + self.graph = None + self['TARGET_VERSIONS'] = None + def has_versions(self, versions): """ Returns True if all versions are already in the hg repository, False otherwise. """ @@ -109,10 +133,6 @@ class UpdateContext(dict): finally: self.ui_.popbuffer() - def set_cancel_time(self, request): - """ Sets the timeout on a QueueableRequest. """ - request.cancel_time_secs = time.time() \ - + self.parent.params['CANCEL_TIME_SECS'] # REDFLAG: get rid of tag arg? 
def make_splitfile_metadata_request(self, edge, tag): """ Makes a StatefulRequest for the Freenet metadata for the @@ -221,17 +241,6 @@ class UpdateContext(dict): return (tmp_file, mime_type) - def orphan_requests(self, from_state): - """ Give away requests that should be allowed to keep running. """ - if not hasattr(from_state, 'pending') or len(from_state.pending) == 0: - return - - for tag in from_state.pending: - request = from_state.pending[tag] - request.tag = "orphaned_%s_%s" % (str(request.tag), from_state.name) - assert not request.tag in self.orphaned - self.orphaned[request.tag] = request - from_state.pending.clear() class CleaningUp(Canceling): @@ -439,7 +448,7 @@ class InsertingGraph(StaticRequestList): # fit in an ssk. index = len(updates) - 1 zorch_base = True - while (len(top_key_tuple_to_bytes((chks, updates))) >= MAX_SSK_LEN + while (len(topkey.top_key_tuple_to_bytes((chks, updates))) >= MAX_SSK_LEN and index >= 0): victim = list(updates[index]) # djk20090628 -- There was a bad b_ug here until c47cb6a56d80 which @@ -456,7 +465,7 @@ class InsertingGraph(StaticRequestList): continue zorch_base = False - assert len(top_key_tuple_to_bytes((chks, updates))) < MAX_SSK_LEN + assert len(topkey.top_key_tuple_to_bytes((chks, updates))) < MAX_SSK_LEN return (chks, updates) @@ -473,6 +482,8 @@ class InsertingUri(StaticRequestList): def __init__(self, parent, name, success_state, failure_state): StaticRequestList.__init__(self, parent, name, success_state, failure_state) + self.topkey_funcs = topkey + self.cached_top_key_tuple = None def enter(self, from_state): """ Implementation of State virtual. @@ -483,6 +494,12 @@ class InsertingUri(StaticRequestList): if not hasattr(from_state, 'get_top_key_tuple'): raise Exception("Illegal Transition from: %s" % from_state.name) + + # DCI: Retest non-archive stuff! + # Cache *before* the possible transition below. + top_key_tuple = from_state.get_top_key_tuple() + self.cached_top_key_tuple = top_key_tuple # hmmmm... + if (self.parent.ctx['INSERT_URI'] is None and self.parent.ctx.get('REINSERT', 0) > 0): # Hmmmm... hackery to deal with reinsert w/o insert uri @@ -491,10 +508,9 @@ class InsertingUri(StaticRequestList): assert not self.parent.ctx['INSERT_URI'] is None - top_key_tuple = from_state.get_top_key_tuple() if self.parent.params.get('DUMP_TOP_KEY', False): - dump_top_key_tuple(top_key_tuple, - self.parent.ctx.ui_.status) + self.topkey_funcs.dump_top_key_tuple(top_key_tuple, + self.parent.ctx.ui_.status) salt = {0:0x00, 1:0xff} # grrr.... less code. insert_uris = make_frozen_uris(self.parent.ctx['INSERT_URI'], @@ -504,7 +520,8 @@ class InsertingUri(StaticRequestList): if self.parent.params.get('DUMP_URIS', False): self.parent.ctx.ui_.status("INSERT_URI: %s\n" % uri) self.queue([uri, 0, True, - top_key_tuple_to_bytes(top_key_tuple, salt[index]), + self.topkey_funcs.top_key_tuple_to_bytes(top_key_tuple, + salt[index]), None, None]) self.required_successes = len(insert_uris) @@ -534,6 +551,10 @@ class InsertingUri(StaticRequestList): ret.append(uri) return ret + def get_top_key_tuple(self): + """ Return the top key tuple that it inserted from. """ + return self.cached_top_key_tuple + class RequestingUri(StaticRequestList): """ A state to request the top level URI for an Infocalypse repository. """ @@ -542,6 +563,10 @@ class RequestingUri(StaticRequestList): failure_state) self.try_all = True # Hmmmm... + # hmmmm... Does C module as namespace idiom really belong in Python? + # Git'r done for now. 
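        # (Why the module-in-an-attribute trick below: the dump/parse
        #  calls go through self.topkey_funcs, so a subclass can point it
        #  at a different top-key serialization module -- presumably
        #  archivetop for the archive state machine -- without changing
        #  this class.)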
+        self.topkey_funcs = topkey
+
     def enter(self, dummy):
         """ Implementation of State virtual. """
         #require_state(from_state, QUIESCENT)
@@ -593,8 +618,8 @@ class RequestingUri(StaticRequestList):
             # Allow pending requests to run to completion.
             self.parent.ctx.orphan_requests(self)
         if self.parent.params.get('DUMP_TOP_KEY', False):
-            dump_top_key_tuple(self.get_top_key_tuple(),
-                               self.parent.ctx.ui_.status)
+            self.topkey_funcs.dump_top_key_tuple(self.get_top_key_tuple(),
+                                                 self.parent.ctx.ui_.status)

     def get_top_key_tuple(self):
         """ Get the python rep of the data in the URI. """
@@ -603,7 +628,7 @@ class RequestingUri(StaticRequestList):
             result = candidate[5]
             if result is None or result[0] != 'AllData':
                 continue
-            top_key_tuple = bytes_to_top_key_tuple(result[2])[0]
+            top_key_tuple = self.topkey_funcs.bytes_to_top_key_tuple(result[2])[0]
             break
         assert not top_key_tuple is None
         return top_key_tuple
@@ -821,10 +846,11 @@ class UpdateStateMachine(RequestQueue, S
     """ A StateMachine implementation to create, push to and pull from
         Infocalypse repositories. """

-    def __init__(self, runner, repo, ui_, bundle_cache):
+    def __init__(self, runner, ctx):
         RequestQueue.__init__(self, runner)
         StateMachine.__init__(self)
-
+        self.ctx = None
+        self.set_context(ctx) # Do early. States might depend on ctx.
         self.states = {
             QUIESCENT:Quiescent(self, QUIESCENT),
@@ -905,12 +931,12 @@ class UpdateStateMachine(RequestQueue, S
         # Must not change any state!
         self.monitor_callback = lambda parent, client, msg: None

-        self.ctx = UpdateContext(self)
-        self.ctx.repo = repo
-        self.ctx.ui_ = ui_
-        self.ctx.bundle_cache = bundle_cache
+        runner.add_queue(self)

-        runner.add_queue(self)
+    def set_context(self, new_ctx):
+        """ Set the context. """
+        self.ctx = new_ctx
+        self.ctx.parent = self

     def reset(self):
         """ StateMachine override. """
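Note on the UpdateStateMachine change above: the constructor now takes a
context instead of (repo, ui_, bundle_cache), with the shared plumbing in
UpdateContextBase and the Mercurial-specific pieces (graph, TARGET_VERSIONS,
and the repo/bundle cache wiring) set up by the caller on an UpdateContext.
The sketch below mirrors the two construction paths in infcmds.setup(); it is
illustrative only, the make_state_machine name is not part of the patch, and
the runner/repo/ui_/cache objects are assumed to already exist:

from updatesm import UpdateStateMachine, UpdateContext
from archivesm import ArchiveStateMachine, ArchiveUpdateContext

def make_state_machine(runner, repo, ui_, bundle_cache):
    """ Illustrative helper: build the right state machine for an
        Infocalypse repo, or for repo == None (the fn-archive path). """
    if repo is None:
        # Non-versioned archive: no Mercurial repo, no bundle cache.
        return ArchiveStateMachine(runner, ArchiveUpdateContext())

    ctx = UpdateContext(None)  # parent is wired up by set_context().
    ctx.repo = repo
    ctx.ui_ = ui_
    ctx.bundle_cache = bundle_cache
    return UpdateStateMachine(runner, ctx)  # __init__ calls set_context(ctx).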