Exported from old hg_infocalypse repo at ad77115c2149.
diff --git a/infocalypse/__init__.py b/infocalypse/__init__.py
new file mode 100644
--- /dev/null
+++ b/infocalypse/__init__.py
@@ -0,0 +1,242 @@
+"""Redundant incrementally updateable repositories in Freenet.
+
+Copyright (C) 2009 Darrell Karbott
+License: GPL 2 (or later)
+
+This extension provides commands to create and maintain
+incrementally updateable mercurial repositories in
+Freenet.
+
+REQUIREMENTS:
+You MUST be able to connect to a running Freenet node
+in order to use this extension.
+
+For more information on Freenet see:
+http://freenetproject.org/
+
+ADDING THE EXTENSION:
+Add the following to your .hgrc/mercurial.ini file.
+
+# .hgrc snippet
+[extensions]
+infocalypse = /path/to/infocalypse_dir
+
+Where infocalypse_dir is the directory containing
+this file.
+
+SETUP:
+Run hg fn-setup once to create the extension's config file.
+
+By default, it will write the configuration to:
+~/.infocalypse on *nix and ~/infocalypse.ini on
+Windows.
+
+If you want to put the config file in a different
+location set the cfg_file option in the
+[infocalypse] section of your
+.hgrc/mercurial.ini file *before* running setup.
+
+Example .hgrc entry:
+
+# Snip, from .hgrc
+[infocalypse]
+cfg_file = /mnt/usbkey/s3kr1t/infocalypse.cfg
+
+The default temp file directory is set to:
+~/infocalypse_tmp. It will be created if
+it doesn't exist.
+
+Set the --tmpdir option to use a different
+value.
+
+The config file contains your default
+private key and cached information about
+what repositories you insert/retrieve.
+
+It's a good idea to keep it on
+a removable drive for maximum security.
+
+EXAMPLES:
+
+hg fn-create --uri USK@/test.R1/0
+
+Inserts the local hg repository into a new
+USK in Freenet, using the private key in your
+config file. You can use a full insert URI
+value if you want.
+
+hg fn-push --uri USK@/test.R1/0
+
+Pushes incremental changes from the local
+directory into the existing repository.
+
+You can omit the --uri argument when
+you run from the same directory the fn-create
+was run in because the insert key -> dir
+mapping is saved in the config file.
+
+If you go to a different directory
+do an hg init and type:
+
+hg fn-pull --uri <request uri from steps above>
+
+to pull from the repository in Freenet.
+
+The request uri -> dir mapping is saved after
+the first pull, so you can omit the --uri
+argument for subsequent fn-pull invocations.
+
+HINTS:
+The -q, -v and --debug verbosity options are
+supported.
+
+Top level URIs ending in '.R1' are inserted redundantly.
+
+CONTACT:
+djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks
+Post to freenet group on FMS.
+
+"""
+
+# Copyright (C) 2009 Darrell Karbott
+#
+# Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks

+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2.0 of the License, or (at your option) any later version.

+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+ +# You should have received a copy of the GNU General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import os + +from mercurial import commands, util + +from infcmds import get_config_info, execute_create, execute_pull, \ + execute_push, execute_setup + +def set_target_version(ui_, repo, opts, params, msg_fmt): + """ INTERNAL: Update TARGET_VERSION in params. """ + + revs = opts.get('rev') or None + if not revs is None: + if len(revs) > 1: + raise util.Abort("Only a single -r version is supported. ") + rev = revs[0] + ctx = repo[rev] # Fail if we don't have the rev. + params['TO_VERSION'] = rev + if ctx != repo['tip']: + ui_.status(msg_fmt % rev) + +def infocalypse_create(ui_, repo, **opts): + """ Create a new Infocalypse repository in Freenet. """ + params, stored_cfg = get_config_info(ui_, opts) + + insert_uri = opts['uri'] + if insert_uri == '': + # REDFLAG: fix parameter definition so that it is required? + ui_.warn("Please set the insert URI with --uri.\n") + return + + set_target_version(ui_, repo, opts, params, + "Only inserting to version: %s\n") + params['INSERT_URI'] = insert_uri + execute_create(ui_, repo, params, stored_cfg) + +def infocalypse_pull(ui_, repo, **opts): + """ Pull from an Infocalypse repository in Freenet. + """ + params, stored_cfg = get_config_info(ui_, opts) + request_uri = opts['uri'] + if request_uri == '': + request_uri = stored_cfg.get_request_uri(repo.root) + if not request_uri: + ui_.warn("There is no stored request URI for this repo.\n" + "Please set one with the --uri option.\n") + return + + params['REQUEST_URI'] = request_uri + # Hmmmm... can't really implement rev. + execute_pull(ui_, repo, params, stored_cfg) + +def infocalypse_push(ui_, repo, **opts): + """ Push to an Infocalypse repository in Freenet. """ + params, stored_cfg = get_config_info(ui_, opts) + insert_uri = opts['uri'] + if insert_uri == '': + insert_uri = stored_cfg.get_dir_insert_uri(repo.root) + if not insert_uri: + ui_.warn("There is no stored insert URI for this repo.\n" + "Please set one with the --uri option.\n") + return + + set_target_version(ui_, repo, opts, params, + "Only pushing to version: %s\n") + params['INSERT_URI'] = insert_uri + #if opts['requesturi'] != '': + # # DOESN'T search the insert uri index. + # ui_.status(("Copying from:\n%s\nTo:\n%s\n\nThis is an " + # + "advanced feature. " + # + "I hope you know what you're doing.\n") % + # (opts['requesturi'], insert_uri)) + # params['REQUEST_URI'] = opts['requesturi'] + + execute_push(ui_, repo, params, stored_cfg) + +def infocalypse_setup(ui_, **opts): + """ Setup the extension for use for the first time. """ + + execute_setup(ui_, + opts['fcphost'], + opts['fcpport'], + opts['tmpdir']) + +# Can't use None as a default? Means "takes no argument'? +FCP_OPTS = [('', 'fcphost', '', 'fcp host'), + ('', 'fcpport', 0, 'fcp port'), +] + +AGGRESSIVE_OPT = [('', 'aggressive', None, 'aggressively search for the ' + + 'latest USK index'),] + +# Allow mercurial naming convention for command table. +# pylint: disable-msg=C0103 +cmdtable = { + "fn-pull": (infocalypse_pull, + [('', 'uri', '', 'request URI to pull from'),] + + FCP_OPTS + + AGGRESSIVE_OPT, + "[options]"), + + "fn-push": (infocalypse_push, + [('', 'uri', '', 'insert URI to push to'), + # Buggy. Not well thought out. 
+ #('', 'requesturi', '', 'optional request URI to copy'), + ('r', 'rev', [],'maximum rev to push'),] + + FCP_OPTS + + AGGRESSIVE_OPT, + "[options]"), + + "fn-create": (infocalypse_create, + [('', 'uri', '', 'insert URI to create on'), + ('r', 'rev', [],'maximum rev to push'),] + + FCP_OPTS, + "[options]"), + + "fn-setup": (infocalypse_setup, + [('', 'tmpdir', '~/infocalypse_tmp', 'temp directory'),] + + FCP_OPTS, + "[options]"), + } + + +commands.norepo += ' fn-setup' + diff --git a/infocalypse/bundlecache.py b/infocalypse/bundlecache.py new file mode 100644 --- /dev/null +++ b/infocalypse/bundlecache.py @@ -0,0 +1,212 @@ +""" Helper class used by InsertingBundles to create hg bundle files + and cache information about their sizes. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import os +import shutil +import random + +from mercurial import commands + +from graph import FIRST_INDEX, FREENET_BLOCK_LEN, MAX_REDUNDANT_LENGTH + +def make_temp_file(temp_dir): + """ Make a temporary file name. """ + return os.path.join(temp_dir, '_tmp_' + ('%0.16f' % random.random())[2:14]) + +def is_writable(dir_name): + """ Check whether the directory exists and is writable. """ + tmp_file = os.path.join(dir_name, '_tmp_test_write') + out_file = None + try: + try: + out_file = open(tmp_file, 'wb') + out_file.write('Can I write here?\n') + return True + except IOError: + return False + return True + finally: + if not out_file is None: + out_file.close() + if os.path.exists(tmp_file): + os.remove(tmp_file) + + +class BundleCache: + """ Class to create hg bundle files and cache information about + their sizes. """ + + def __init__(self, repo, ui_, base_dir): + self.graph = None + self.repo = repo + self.ui_ = ui_ + self.redundant_table = {} + self.base_dir = os.path.abspath(base_dir) + assert is_writable(self.base_dir) + self.enabled = True + + def get_bundle_path(self, index_pair): + """ INTERNAL: Get the full path to a bundle file for the given edge. """ + start_info = self.graph.index_table[index_pair[0]] + end_info = self.graph.index_table[index_pair[1]] + return os.path.join(self.base_dir, "_tmp_%s_%s.hg" + % (start_info[1], end_info[1])) + + def get_cached_bundle(self, index_pair, out_file): + """ INTERNAL: Copy the cached bundle file for the edge to out_file. 
""" + full_path = self.get_bundle_path(index_pair) + if not os.path.exists(full_path): + return None + + if not out_file is None: + # can't do this for paths that don't exist + #assert not os.path.samefile(out_file, full_path) + if os.path.exists(out_file): + os.remove(out_file) + + raised = True + try: + shutil.copyfile(full_path, out_file) + raised = False + finally: + if raised and os.path.exists(out_file): + os.remove(out_file) + + return (os.path.getsize(full_path), out_file, index_pair) + + def update_cache(self, index_pair, out_file): + """ INTERNAL: Store a file in the cache. """ + assert out_file != self.get_bundle_path(index_pair) + + raised = True + try: + shutil.copyfile(out_file, self.get_bundle_path(index_pair)) + raised = False + finally: + if raised and os.path.exists(out_file): + os.remove(out_file) + + def make_bundle(self, graph, index_pair, out_file=None): + """ Create an hg bundle file corresponding to the edge in graph. """ + #print "INDEX_PAIR:", index_pair + assert not index_pair is None + self.graph = graph + + cached = self.get_cached_bundle(index_pair, out_file) + if not cached is None: + #print "make_bundle -- cache hit: ", index_pair + return cached + + delete_out_file = out_file is None + if out_file is None: + out_file = make_temp_file(self.base_dir) + try: + start_info = self.graph.index_table[index_pair[0]] + end_info = self.graph.index_table[index_pair[1]] + + # Hmmm... ok to suppress mercurial noise here. + self.ui_.pushbuffer() + try: + commands.bundle(self.ui_, self.repo, out_file, + None, base=[start_info[1]], rev=[end_info[1]]) + finally: + self.ui_.popbuffer() + + if self.enabled: + self.update_cache(index_pair, out_file) + file_field = None + if not delete_out_file: + file_field = out_file + return (os.path.getsize(out_file), file_field, index_pair) + finally: + if delete_out_file and os.path.exists(out_file): + os.remove(out_file) + + # INTENT: Freenet stores data in 32K blocks. If we can stuff + # extra changes into the bundle file under the block boundry + # we get extra redundancy for free. + def make_redundant_bundle(self, graph, last_index, out_file=None): + """ Make an hg bundle file including the changes in the edge and + other earlier changes if it is possible to fit them under + the 32K block size boundry. """ + self.graph = graph + #print "make_redundant_bundle -- called for index: ", last_index + + if out_file is None and last_index in self.redundant_table: + #print "make_redundant_bundle -- cache hit: ", last_index + return self.redundant_table[last_index] + + size_boundry = None + prev_length = None + earliest_index = last_index - 1 + while earliest_index >= FIRST_INDEX: + pair = (earliest_index, last_index) + #print "PAIR:", pair + bundle = self.make_bundle(graph, + pair, + out_file) + + #print "make_redundant_bundle -- looping: ", earliest_index, \ + # last_index, bundle[0] + assert bundle[0] > 0 # hmmmm + + if size_boundry is None: + size_boundry = ((bundle[0] / FREENET_BLOCK_LEN) + * FREENET_BLOCK_LEN) + prev_length = bundle[0] + if (bundle[0] % FREENET_BLOCK_LEN) == 0: + # REDFLAG: test this code path + self.redundant_table[bundle[2]] = bundle + return bundle # Falls exactly on a 32k boundry + else: + size_boundry += FREENET_BLOCK_LEN + + # Purely to bound the effort spent creating bundles. 
+ if bundle[0] > MAX_REDUNDANT_LENGTH: + #print "make_redundant_bundle -- to big for redundancy" + self.redundant_table[bundle[2]] = bundle + return bundle + + if bundle[0] > size_boundry: + earliest_index += 1 # Can only happen after first pass??? + #print "make_redundant_bundle -- breaking" + break + + earliest_index -= 1 + prev_length = bundle[0] + + bundle = (prev_length, out_file, + (max(FIRST_INDEX, earliest_index), last_index)) + # ^--- possible to fix loop so this is not required? + + #print "make_redundant_bundle -- return: ", bundle + self.redundant_table[bundle[2]] = bundle + return bundle + + def remove_files(self): + """ Remove cached files. """ + for name in os.listdir(self.base_dir): + # Only remove files that we created in case cache_dir + # is set to something like ~/. + if name.startswith("_tmp_"): + os.remove(os.path.join(self.base_dir, name)) + diff --git a/infocalypse/chk.py b/infocalypse/chk.py new file mode 100644 --- /dev/null +++ b/infocalypse/chk.py @@ -0,0 +1,101 @@ +""" Freenet CHK key helper functions. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import base64 + +# Length of the binary rep of a CHK. +CHK_SIZE = 69 +# Length of a human readable CHK w/o '/' or filename. +ENCODED_CHK_SIZE = 99 + +# REDFLAG: Is this correct? +def freenet_base64_encode(data): + """ INTERNAL: Base64 encode data using Freenet's base64 algo. """ + encoded = base64.b64encode(data, ['~', '-']) + length = len(encoded) + while encoded[length - 1] == '=': + length -= 1 + return encoded[:length] + +# REDFLAG: Is this correct? +def freenet_base64_decode(data): + """ INTERNAL: Base64 decode data using Freenet's base64 algo. """ + while len(data) % 4 != 0: + data += '=' + return base64.b64decode(data, ['~', '-']) + +def bytes_to_chk(bytes): + """ Reads the binary representation of a Freenet CHK and returns + the human readable equivalent. """ + assert len(bytes) == CHK_SIZE + + return 'CHK@' + freenet_base64_encode(bytes[5:37]) + ',' \ + + freenet_base64_encode(bytes[37:69]) + ',' \ + + freenet_base64_encode(bytes[:5]) + +def chk_to_bytes(chk): + """ Returns the binary representation of a Freenet CHK.""" + + assert chk.startswith('CHK@') + # NO / or filename allowed. + assert len(chk) == ENCODED_CHK_SIZE + fields = chk[4:].split(',') + assert len(fields) == 3 + + # [0, 4] -- control bytes + # [5, 36] -- routing key + # [37, 68] -- crypto key + ret = (freenet_base64_decode(fields[2]) + + freenet_base64_decode(fields[0]) + + freenet_base64_decode(fields[1])) + assert len(ret) == CHK_SIZE + + return ret + +# ATTRIBUTION: +# Based on code from SomeDude's ffp-src-1.1.0.zip +# sha1: b765d05ac320d4c89051740bd575040108db9791 ffp-src-1.1.0.zip +def clear_control_bytes(key): + """ Returns a CHK with the control bytes cleared. 
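# A small round-trip sketch for the CHK helpers above, assuming the
# byte layout documented in chk_to_bytes().  The 69 dummy bytes are
# made up purely to exercise the encoding; they are not a real key.
DUMMY_BINARY_CHK = ('\x01\x02\x03\x04\x05'   # control bytes  [0, 4]
                    + '\xaa' * 32            # routing key    [5, 36]
                    + '\xbb' * 32)           # crypto key     [37, 68]
assert len(DUMMY_BINARY_CHK) == CHK_SIZE
DUMMY_CHK = bytes_to_chk(DUMMY_BINARY_CHK)
assert len(DUMMY_CHK) == ENCODED_CHK_SIZE and DUMMY_CHK.startswith('CHK@')
assert chk_to_bytes(DUMMY_CHK) == DUMMY_BINARY_CHK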
+ + This is used to fetch raw Freenet metadata. + + REQUIRES: key is a CHK key. + """ + + if not key.startswith('CHK@'): + raise ValueError("Only works for CHK keys.") + fields = key.split('/') + key_fields = fields[0].split(',') + + bytes = freenet_base64_decode(key_fields[2]) + + # Hmmm... ok since it is very short + bytes = bytes[:2] + '\x00' + bytes[3:] + ret = key_fields[0] + ',' + key_fields[1] + ',' \ + + freenet_base64_encode(bytes) + + # REDFLAG: different. was there really a bug in somedudes code??? + if len(fields) > 1: + for index in range(1, len(fields)): + ret += '/' + fields[index] + return ret + diff --git a/infocalypse/choose.py b/infocalypse/choose.py new file mode 100644 --- /dev/null +++ b/infocalypse/choose.py @@ -0,0 +1,279 @@ +""" Functions to choose which bundle to fetch next. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import random + +from graph import MAX_PATH_LEN, block_cost, print_list + +# This is the maximum allowed ratio of allowed path block cost +# to minimum full update block cost. +# It is used in low_block_cost_edges() to determine when a +# path is too expensive to include. +MAX_COST_RATIO = 2.0 + +def step_contains(step, index): + """ Returns True if step contains index. """ + return index > step[0] and index <= step[1] + +# REDFLAG: dog slow for long lists (worse than O(n^2)) +def shuffle_edge_redundancy(graph, first, second, known): + """ INTERNAL: Shuffle the redundancy for redundant edges in + the values returned by get_update_edges. """ + # Kick it Pascal style. + def shuffle_one(graph, current, other, known): + """ INTERNAL: shuffle redundancy for a single edge list. """ + for index, edge in enumerate(current): + if not graph.is_redundant(edge): + continue # Not redundant + + new_ordinal = random.choice((0, 1)) + if new_ordinal == edge[2]: + # NOP is a valid choice. 
+ continue + + alternate_edge = (edge[0], edge[1], new_ordinal) + if alternate_edge in known: # fast + # Already queued + continue + + try: + pos = other.index(alternate_edge) # slow + except ValueError: + pos = -1 + + if pos != -1: + # If already in other, swap + #print "shuffle_one -- swapped with other list %s -> %s" % \ + # (str(current[index]), str(other[pos])) + tmp = other[pos] + other[pos] = current[index] + current[index] = tmp + + continue + try: + pos = current.index(alternate_edge) # slow + except ValueError: + pos = -1 + + if pos != -1: + # If already in current + #print "shuffle_one -- swapped in same list %s -> %s" % \ + # (str(current[index]), str(current[pos])) + current[pos] = current[index] + #else: + # print "shuffle_one -- flipped %s -> %s" % \ + # (str(current[index]), str(alternate_edge)) + + current[index] = alternate_edge + + assert len(set(first).intersection(known)) == 0 + assert len(set(second).intersection(known)) == 0 + + # #ifdef _DEBUG_? only used to check invariants. + first_len = len(first) + second_len = len(second) + + shuffle_one(graph, first, second, known) + shuffle_one(graph, second, first, known) + + assert len(set(first).intersection(known)) == 0 + assert len(set(second).intersection(known)) == 0 + assert len(first) == first_len + assert len(second) == second_len + + +# Returns the number of edges which contain the index. +def contained_edge_count(edges, index, max_count=None): + """ INTERNAL: Helper function returns the number of edges + which contain index. """ + count = 0 + for step in edges: + assert not step is None + if step_contains(step, index): + count += 1 + if not max_count is None and count >= max_count: + return count + return count + +# ORDER: Best candidates first. +# INTENT: +# Inserter does 2 33K updates. The last one causes a "rollup" +# of the entire multi-megabyte repo into one CHK. +# This code is intended to make sure that we try fetching the +# first 33k update before the rollup CHK even though it means +# fetching more keys. +def low_block_cost_edges(graph, known_edges, from_index, allowed): + """ INTERNAL: Returns the best update edges that aren't too big. """ + # MAX_PATH_LEN - 1. If it takes more steps you should be using + # a canonical path. + paths = graph.enumerate_update_paths(from_index + 1, + graph.latest_index, MAX_PATH_LEN - 1) + if len(paths) == 0: + return + + first = [] + + with_cost = [] + for path in paths: + total = 0 + for step in path: + total += block_cost(graph.insert_length(step)) + with_cost.append((total, path)) + with_cost.sort() + #for item in with_cost: + # print "COST: ", item[0], item + + # Ignore paths which are too much bigger than the shortest path. + allowed_cost = int(with_cost[0][0] * MAX_COST_RATIO) + #print "get_update_edges -- min block cost: ", \ + # with_cost[0][0], allowed_cost + # First steps of the paths with a cost <= allowed_cost + first_steps = [[value[1][0], ] for value in with_cost if value[0] + <= allowed_cost] + #print "FIRST_STEPS: ", first_steps + first_steps.sort(graph.cmp_recency) + for path in first_steps: + assert len(path) == 1 + step = path[0] + if step in known_edges: + continue + first.append(step) + known_edges.add(step) + allowed -= 1 + if allowed <= 0: + break + return first + +# ORDER: Most canonical first. +# Best candidates at the end of the list +def canonical_path_edges(graph, known_edges, from_index, allowed): + """ INTERNAL: Returns edges containing from_index from canonical paths. 
""" + # Steps from canonical paths + paths = graph.canonical_paths(graph.latest_index, MAX_PATH_LEN) + second = [] + #print "get_update_edges -- after" + for path in paths: + # We need the tmp gook because the edges can overlap + # and we want the most recent ones. + tmp = [] + for step in path: + assert not step is None + if (step_contains(step, from_index + 1) and + not step in known_edges): + tmp.append([step, ]) + + tmp.sort(graph.cmp_recency) + for tmp_path in tmp: + assert len(tmp_path) == 1 + assert not tmp_path[0] is None + second.append(tmp_path[0]) + known_edges.add(tmp_path[0]) + allowed -= 1 + if allowed <= 0: + return second + + return second + + +# STEP BACK: +# This function answers two questions: +# 0) What should I request to update as quickly as possible? +# A: Steps from paths <= MAX_COST_RATIO * (minimal block cost) +# if there are any. +# 1) What should I request if that doesn't work. +# A: Most canonical. +# Then backfill by recency. + +# Will I always be able to fully enumerate search paths? or too slow + +# REDFLAG: g'ter done code. Simplify and re-examine efficiency. +# REDFLAG: rename redundancy. redundancy == 2 -> 2 paths, NOT 3 + +# Returns (first_choice_steps, second_choice_steps) +def get_update_edges(graph, from_index, redundancy, shuffle_redundancy=False, + known_edges=None): + """ Gets edges not already in known edges which could be used to + update (pull). """ + + if known_edges is None: + known_edges = set([]) + + assert not None in known_edges + + allowed = redundancy - contained_edge_count(known_edges, from_index + 1, + redundancy) + if allowed <= 0: + # Bail out if we already have enough edges. + return ([], []) + + original_known = known_edges + known_edges = known_edges.copy() + + # 0) First get some low block cost paths. + # Hmmm... make allowed cheap edges a parameter + first = low_block_cost_edges(graph, known_edges, from_index, + min(2, allowed)) + + allowed -= len(first) + second = [] + if allowed > 0: + # 1) Then get edges from canonical path + second = canonical_path_edges(graph, known_edges, + from_index, allowed) + + # Resort by recency. + second_paths = [[edge, ] for edge in second] + second_paths.sort(graph.cmp_recency) + second = [path[0] for path in second_paths] + + allowed -= len(second) + + if allowed > 0: + # 2) Finally backfill with most recent other edges which + # advance us at least one step. + containing_paths = [[edge, ] for edge in + graph.contain(from_index + 1) + if edge not in known_edges] + + containing_paths.sort(graph.cmp_recency) + + for path in containing_paths: + second.insert(0, path[0]) + known_edges.add(path[0]) + allowed -= 1 + if allowed <= 0: + break + + # Hmmmm... previously I was always sorting second by recency. + if shuffle_redundancy: + shuffle_edge_redundancy(graph, first, second, original_known) + + #print "get_update_edges -- exiting", len(first), len(second) + return (first, list(second)) + +def dump_update_edges(first, second, all_edges): + """ Debugging function to print update edges. """ + print "--- update edges --- " + print_list("known edges :", all_edges) + print_list("first choice :", first) + print_list("second choice:", second) + print "---" + diff --git a/infocalypse/config.py b/infocalypse/config.py new file mode 100644 --- /dev/null +++ b/infocalypse/config.py @@ -0,0 +1,206 @@ +""" Helper class used to persist stored state for Infocalypse + mercurial extension. 
+ + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + + +import os +import sys + +from fcpclient import get_usk_hash, is_usk_file, get_version, \ + get_usk_for_usk_version +from mercurial import util +from ConfigParser import ConfigParser + +if sys.platform == 'win32': + CFG_NAME = 'infocalypse.ini' +else: + CFG_NAME = '.infocalypse' + +DEFAULT_CFG_PATH = '~/%s' % CFG_NAME + +def normalize(usk_or_id): + """ Returns a USK hash. """ + if usk_or_id.startswith('USK'): + usk_or_id = get_usk_hash(usk_or_id) + return usk_or_id + +# Eventually set state from fms feed. i.e. latest repo updates. +class Config: + """ Persisted state used by the Infocalypse mercurial extension. """ + def __init__(self): + # repo_id -> version map + self.version_table = {} + # repo_dir -> request usk map + self.request_usks = {} + # repo_id -> insert uri map + self.insert_usks = {} + self.file_name = None + + # Use a dict instead of members to avoid pylint R0902. + self.defaults = {} + self.defaults['HOST'] = '127.0.0.1' + self.defaults['PORT'] = 9481 + self.defaults['TMP_DIR'] = None + self.defaults['DEFAULT_PRIVATE_KEY'] = None + + def get_index(self, usk_or_id): + """ Returns the highest known USK version for a USK or None. """ + return self.version_table.get(normalize(usk_or_id)) + + def update_index(self, usk_or_id, index): + """ Update the stored index value for a USK. """ + usk_or_id = normalize(usk_or_id) + prev = self.get_index(usk_or_id) + index = abs(index) + if not prev is None and index < prev: + print "update_index -- exiting, new value is lower %i %i %s" % \ + (prev, index, usk_or_id) + return + self.version_table[usk_or_id] = index + + def update_dir(self, repo_dir, usk): + """ Updated the repo USK used pull changes into repo_dir. """ + assert is_usk_file(usk) + repo_dir = os.path.abspath(repo_dir) + self.request_usks[repo_dir] = usk + + def get_request_uri(self, for_dir): + """ Get the repo USK used to pull changes into for_dir or None. """ + uri = self.request_usks.get(os.path.abspath(for_dir)) + if uri is None: + return None + version = self.get_index(uri) + if not version is None: + if version > get_version(uri): + uri = get_usk_for_usk_version(uri, version) + return uri + + def get_insert_uri(self, for_usk_or_id): + """ Get the insert USK for the request USK or None. """ + uri = self.insert_usks.get(normalize(for_usk_or_id)) + if uri is None: + return None + version = self.get_index(for_usk_or_id) + if not version is None: + if version > get_version(uri): + uri = get_usk_for_usk_version(uri, version) + + return uri + + def set_insert_uri(self, for_usk_or_id, insert_usk): + """ Set the insert USK associated with the request USK. """ + self.insert_usks[normalize(for_usk_or_id)] = insert_usk + + # Hmmm... really nescessary? 
+ def get_dir_insert_uri(self, repo_dir): + """ Return the insert USK for repo_dir or None. """ + request_uri = self.request_usks.get(os.path.abspath(repo_dir)) + if request_uri is None: + return None + return self.get_insert_uri(request_uri) + + # MY_KEY/foobar -- i.e. to insert + # MIRROR/freenet -- i.e. to request + #def get_key_alias(self, alias, is_public): + # pass + + @classmethod + def from_file(cls, file_name): + """ Make a Config from a file. """ + file_name = os.path.expanduser(file_name) + parser = ConfigParser() + parser.read(file_name) + cfg = Config() + if parser.has_section('index_values'): + for repo_id in parser.options('index_values'): + cfg.version_table[repo_id] = ( + parser.getint('index_values', repo_id)) + if parser.has_section('request_usks'): + for repo_dir in parser.options('request_usks'): + cfg.request_usks[repo_dir] = parser.get('request_usks', + repo_dir) + if parser.has_section('insert_usks'): + for repo_id in parser.options('insert_usks'): + cfg.insert_usks[repo_id] = parser.get('insert_usks', repo_id) + if parser.has_section('default'): + if parser.has_option('default','host'): + cfg.defaults['HOST'] = parser.get('default','host') + if parser.has_option('default','port'): + cfg.defaults['PORT'] = parser.getint('default','port') + if parser.has_option('default','tmp_dir'): + cfg.defaults['TMP_DIR'] = parser.get('default', 'tmp_dir') + if parser.has_option('default','default_private_key'): + cfg.defaults['DEFAULT_PRIVATE_KEY'] = ( + parser.get('default','default_private_key')) + + cfg.file_name = file_name + return cfg + + @classmethod + def from_ui(cls, ui_): + """ Make a Config from a ui. + + This checks the [infocalypse] section of the user's + .hgrc for a cfg_file entry, and creates a Config from + that file. + + If there's no [infocalypse] section, a Config is + created from the default file.""" + + file_name = ui_.config('infocalypse', 'cfg_file', None) + if file_name is None: + file_name = os.path.expanduser(DEFAULT_CFG_PATH) + if not os.path.exists(file_name): + ui_.warn("Couldn't read config file: %s\n" % file_name) + raise util.Abort("Run fn-setup.\n") + return Config.from_file(file_name) + + @classmethod + def to_file(cls, cfg, file_name=None): + """ Writes a Config to a file. """ + if file_name is None: + if cfg.file_name is None: + file_name = os.path.expanduser(DEFAULT_CFG_PATH) + else: + file_name = cfg.file_name + file_name = os.path.expanduser(file_name) + parser = ConfigParser() + parser.add_section('default') + parser.set('default', 'host', cfg.defaults['HOST']) + parser.set('default', 'port', cfg.defaults['PORT']) + parser.set('default', 'tmp_dir', cfg.defaults['TMP_DIR']) + parser.set('default', 'default_private_key', + cfg.defaults['DEFAULT_PRIVATE_KEY']) + parser.add_section('index_values') + for repo_id in cfg.version_table: + parser.set('index_values', repo_id, cfg.version_table[repo_id]) + parser.add_section('request_usks') + for repo_dir in cfg.request_usks: + parser.set('request_usks', repo_dir, cfg.request_usks[repo_dir]) + parser.add_section('insert_usks') + for repo_id in cfg.insert_usks: + parser.set('insert_usks', repo_id, cfg.insert_usks[repo_id]) + + out_file = open(file_name, 'wb') + try: + parser.write(out_file) + finally: + out_file.close() diff --git a/infocalypse/fcpclient.py b/infocalypse/fcpclient.py new file mode 100644 --- /dev/null +++ b/infocalypse/fcpclient.py @@ -0,0 +1,796 @@ +""" Simplified client interface for common FCP request. 
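# What the state persisted by Config above typically looks like on
# disk, plus a minimal load / modify / save round trip.  Every key,
# hash and path below is a made-up placeholder.
#
# ~/.infocalypse (or ~/infocalypse.ini on Windows):
#
#   [default]
#   host = 127.0.0.1
#   port = 9481
#   tmp_dir = /home/me/infocalypse_tmp
#   default_private_key = <private insert key>
#
#   [index_values]
#   0123456789ab = 17
#
#   [request_usks]
#   /home/me/repos/myrepo = USK@<public key>/myrepo.R1/17
#
#   [insert_usks]
#   0123456789ab = USK@<private key>/myrepo.R1/17
#
def example_config_round_trip():
    """ Load the default config, bump a USK index and save it back. """
    cfg = Config.from_file(DEFAULT_CFG_PATH)
    cfg.update_index('USK@<public key>/myrepo.R1/17', 18)
    Config.to_file(cfg)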
+ + Copyright (C) 2008 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import mimetypes, os, re + +from fcpconnection import FCPConnection, IDataSource, READ_BLOCK, \ + MinimalClient, PolledSocket, FCPError, sha1_hexdigest + +from fcpmessage import GETNODE_DEF, GENERATE_SSK_DEF, \ + GET_REQUEST_URI_DEF, GET_DEF, \ + PUT_FILE_DEF, PUT_REDIRECT_DEF, PUT_COMPLEX_DIR_DEF + +# Defaults for commonly used FCP parameters. +FCP_PARAM_DEFAULTS = { + 'ReturnType':'direct', + 'IgnoreDS':False, + 'MaxRetries':3, + 'DontCompress':True, # Hmmmm... + 'Verbosity':1023 # MUST set this to get progress messages. +} + +#-----------------------------------------------------------# +# file_info helper functions +#-----------------------------------------------------------# + +def get_file_infos(directory, forced_mime_type=None, accept_regex = None): + """ Traverse a directory and return a list of file information + tuples which is suitable for use by + FCPClient.put_complex_dir(). + + forced_mime_type determines the value of + the mime_type field in the returned tuples. + + If acceptRegex is not None only files which match + it are added. + + TUPLE FORMAT: + (name, length, mime_type, full_path) + """ + + def walk_visitor(file_info, dirname, names): + """ Function scope visitor implementation passed to os.path.walk. + """ + + for name in names: + full_name = os.path.join(dirname, name) + if os.path.isfile(full_name): + base = file_info[0] + local_path = full_name.replace(base, '') + if file_info[2] and not file_info[2].match(local_path): + # Skip files rejected by the regex + continue + + file_info[1].append((local_path, + os.path.getsize(full_name), + forced_mime_type, + full_name)) + if directory[-1] != os.path.sep: + # Force trailing path separator. + directory += os.path.sep + file_info = (directory, [], accept_regex) + os.path.walk(directory, walk_visitor, file_info) + return file_info[1] + +def total_length(file_infos): + """ Returns the sum of the file lengths in file_info list. """ + + total = 0 + for info in file_infos: + total += info[1] + return total + +def set_index_file(file_infos, file_name): + """ Move the tuple with the name file_name to the front of + file_infos so that it will be used as the index. + """ + index = None + for info in file_infos: # hmmm... faster search? + if info[0] == file_name: + index = info + break + + if index is None: + raise ValueError("No file named: %s" % file_name) + + file_infos.remove(index) + file_infos.insert(0, index) + +class FileInfoDataSource(IDataSource): + """ IDataSource which concatenates files in a list of + file infos into a contiguous data stream. + + Useful for direct ClientPutComplexDir requests. + """ + + MSG_LENGTH_MISMATCH = "Upload bytes doesn't match sum of " \ + + "lengths in file_infos. 
Did the files " \ + + "change during uploading?" + + def __init__(self, file_infos): + IDataSource.__init__(self) + assert file_infos + self.infos = file_infos + self.total_length = total_length(file_infos) + self.running_total = 0 + self.chunks = None + self.input_file = None + + def data_generator(self, infos): + """ INTERNAL: Returns a generator which yields the concatenated + data from all the file infos. + """ + + for info in infos: + #print "FileInfoDataSource.GEN -- opening", info[3] + self.input_file = open(info[3], 'rb') + while True: + raised = True + try: + data = self.input_file.read(READ_BLOCK) + #print "FileInfoDataSource.GEN -- read:", len(data) + raised = False + finally: + # Note: Wacky control flow because you can't yield + # from a finally block + if raised or data is None: + #print "FileInfoDataSource.GEN -- closing", info[3] + self.input_file.close() + self.input_file = None + if not data: + break + self.running_total += len(data) + if self.running_total > self.total_length: + raise IOError(self.MSG_LENGTH_MISMATCH) + #print "FileInfoDataSource.GEN -- yeilding", len(data) + yield data + + if self.running_total != self.total_length: + raise IOError(self.MSG_LENGTH_MISMATCH) + + yield None + return + + def initialize(self): + """ IDataSource implementation. """ + #print "FileInfoDataSource.initialize -- called" + assert self.chunks is None + self.chunks = self.data_generator(self.infos) + + def data_length(self): + """ IDataSource implementation. """ + #print "FileInfoDataSource.data_length -- ", self.total_length + return self.total_length + + def release(self): + """ IDataSource implementation. """ + #print "FileInfoDataSource.release -- called" + if not self.chunks is None: + self.chunks = None + if not self.input_file: + self.input_file.close() + self.input_file = None + + def read(self): + """ IDataSource implementation. """ + #print "FileInfoDataSource.read -- called" + assert not self.chunks is None + if self.chunks: + ret = self.chunks.next() + if ret is None: + self.chunks = None + #print "FileInfoDataSource.read -- returned None" + return None + #print "FileInfoDataSource.read -- returned:", len(ret) + return ret + #print "FileInfoDataSource.read(1) -- returned None, \ + # SHOULD NEVER HAPPEN" + return None + + + + +#-----------------------------------------------------------# +# Key classification and manipulation helper functions +#-----------------------------------------------------------# + +# REDFLAG: Use a common regex? Not sure that would cut loc... +USK_FILE_REGEX = re.compile('(freenet:)?(USK).*/((\\-)?[0-9]+[0-9]*)$') +def is_usk_file(uri): + """ Returns True if uri points to a single file, False otherwise. """ + return bool(USK_FILE_REGEX.match(uri)) + +USK_CONTAINER_REGEX = re.compile('(freenet:)?(USK).*/((\\-)?[0-9]+[0-9]*)/$') +def is_usk_container(uri): + """ Return True if uri is USK uri which points to a Freenet + Container, False otherwise. + """ + return bool(USK_CONTAINER_REGEX.match(uri)) + +KEY_TYPE_REGEX = re.compile('(freenet:)?(?P<key_type>CHK|KSK|SSK|USK)@') +def key_type(uri): + """ Returns the key type. """ + + match = KEY_TYPE_REGEX.match(uri) + if not match: + raise Exception("Doesn't look like a Freenet URI: %s" % uri) + return match.groupdict()['key_type'] + +def is_chk(uri): + """ Returns True if the URI is a CHK key, False otherwise. """ + return key_type(uri) == 'CHK' + +def is_ksk(uri): + """ Returns True if the URI is a KSK key, False otherwise. 
""" + return key_type(uri) == 'KSK' + +def is_ssk(uri): + """ Returns True if the URI is a SSK key, False otherwise. """ + return key_type(uri) == 'SSK' + +def is_usk(uri): + """ Returns True if the URI is a USK key, False otherwise. """ + return key_type(uri) == 'USK' + +# LATER: fix regex to work for SSKs too. +VERSION_REGEX = re.compile('(?P<usk>USK)@(.*)/(?P<version>' + + '(\\-)?[0-9]+[0-9]*)(/.*)?') +def get_version(uri): + """ Return the version index of USK. + + Raises ValueError if no version could be extracted. + """ + + try: + version = int(VERSION_REGEX.match(uri). + groupdict()['version']) + except: + raise ValueError("Couldn't parse a USK or SSK version from: %s" % uri) + return version + +def get_ssk_for_usk_version(usk_uri, version): + """ Return an SSK for a specific version of a USK. + + NOTE: + The version in usk_uri is ignored. + """ + match = VERSION_REGEX.match(usk_uri) + if not match: + raise Exception("Couldn't parse version from USK: %s" % usk_uri) + + return 'SSK' + usk_uri[match.end('usk') : match.start('version') - 1] \ + + '-' + str(version) + usk_uri[match.end('version'):] + +def get_usk_for_usk_version(usk_uri, version, negative = False): + """ Return an USK for a specific version of a USK. + + NOTE: + The version in usk_uri is ignored. + Works for both containers and files. + """ + match = VERSION_REGEX.match(usk_uri) + if not match: + raise Exception("Couldn't parse version from USK: %s" % usk_uri) + if negative and version > 0: + version = -1 * version + version_str = str(version) + if version == 0 and negative: + version_str = '-0' + # BITCH: + # They should have picked some other symbol ('*'?) which doesn't + # encourage implementers to jam the version into an integer. + # i.e. because you can't represent the version with an integer + # because -0 == 0. + assert not negative or version_str.find('-') > -1 + + return usk_uri[0 : match.start('version')] \ + + version_str + usk_uri[match.end('version'):] + +def is_negative_usk(usk_uri): + """ Returns True if usk_uri has a negative version index, + False otherwise. + + REQUIRES: usk_uri is a USK key. + """ + match = VERSION_REGEX.match(usk_uri) + if not match: + raise Exception("Couldn't parse version from USK: %s" % usk_uri) + return match.groupdict()['version'].find('-') > -1 + +def get_negative_usk(usk_uri): + """ Return an USK with a negative version index. + + NOTE: + Using a negative index causes the FCP server to search + harder for later versions in ClientGet requests. + + NOTE: + This is a NOP if usk_uri is already negative. + """ + version = get_version(usk_uri) + if is_negative_usk(usk_uri): + return usk_uri + + return get_usk_for_usk_version(usk_uri, version, True) + +def prefetch_usk(client, usk_uri, allowed_redirects = 3, + message_callback = None): + """ Force the FCP server to explicitly search for updates + to the USK. + + Returns the latest version as an integer or None if + no version could be determined. + + This works by sending a negative index value for the USK. + + Note that this can return a version LESS THAN the version + in usk_uri. 
+ """ + + if client.in_params.async: + raise ValueError("This function only works synchronously.") + + usk_uri = get_negative_usk(usk_uri) + client.reset() + callback = client.message_callback + return_type = client.in_params.default_fcp_params.get('ReturnType') + version = None + try: + if message_callback: + # Install a custom message callback + client.message_callback = message_callback + client.in_params.default_fcp_params['ReturnType'] = 'none' + try: + version = get_version(client.get(usk_uri, + allowed_redirects)[1]['URI']) + except FCPError: + version = None + finally: + client.message_callback = callback + if return_type: + client.in_params.default_fcp_params['ReturnType'] = return_type + + return version + +def latest_usk_index(client, usk_uri, allowed_redirects = 1, + message_callback = None): + """ Determines the version index of a USK key. + + Returns a (version, data_found) tuple where version + is the integer version and data_found is the data_found + message for the latest index. + + + NOTE: + This fetches the key and discards the data. + It may take a very long time if you call it for + a key which points to a large block of data. + """ + + if client.in_params.async: + raise ValueError("This function only works synchronously.") + + client.reset() + callback = client.message_callback + #print "PARAMS:", client.in_params.default_fcp_params + return_type = client.in_params.default_fcp_params.get('ReturnType') + try: + if message_callback: + # Install a custom message callback + client.message_callback = message_callback + client.in_params.default_fcp_params['ReturnType'] = 'none' + prev = None + while True: + # Hmmmm... Make sure that the USK has 'settled' + next = client.get(usk_uri, allowed_redirects) + if prev and next[1]['URI'] == prev[1]['URI']: + break + prev = next + finally: + client.message_callback = callback + if return_type: + client.in_params.default_fcp_params['ReturnType'] = return_type + + return (get_version(prev[1]['URI']), prev) + +def get_insert_chk_filename(uri): + """ Returns the file name part of CHK@/file_part.ext style + CHK insert uris. """ + assert uri.startswith('CHK@') + if not uri.startswith('CHK@/'): + if uri != 'CHK@': + raise ValueError("Unexpected data after '@'. Maybe you forgot the " + + "'/' before the filename part?") + return None + return uri[5:] + +def set_insert_uri(params, uri): + """ INTERNAL: Set the 'URI' and 'TargetFilename' in params, + correctly handling CHK@/filename.ext style insert URIs. """ + + if is_chk(uri): + params['URI'] = 'CHK@' + filename = get_insert_chk_filename(uri) + if not filename is None: + params['TargetFilename'] = filename + else: + params['URI'] = uri + +def get_usk_hash(usk): + """ Returns a 12 hex digit hash for a USK which is independant + of verison. """ + return sha1_hexdigest(get_usk_for_usk_version(usk, 0))[:12] + +def check_usk_hash(usk, hash_value): + """ Returns True if the hash matches, False otherwise. """ + return (sha1_hexdigest(get_usk_for_usk_version(usk, 0))[:12] + == hash_value) + +def show_progress(dummy, msg): + """ Default message callback implementation. """ + + if msg[0] == 'SimpleProgress': + print "Progress: (%s/%s/%s)" % (msg[1]['Succeeded'], + msg[1]['Required'], + msg[1]['Total']) + else: + print "Progress: %s" % msg[0] + +def parse_progress(msg): + """ Parse a SimpleProgress message into a tuple. 
""" + assert msg[0] == 'SimpleProgress' + + return (int(msg[1]['Succeeded']), + int(msg[1]['Required']), + int(msg[1]['Total']), + int(msg[1]['Failed']), + int(msg[1]['FatallyFailed']), + bool(msg[1]['FinalizedTotal'].lower() == 'true')) + +class FCPClient(MinimalClient): + """ A class to execute common FCP requests. + + This class provides a simplified interface for common FCP commands. + Calls are blocking by default. Set FCPClient.in_params.async = True + to run asynchronously. + + You can set FCP parameters using the + FCPClient.in_params.default_fcp_params dictionary. + + GOTCHA: + Don't set FCPClient.in_params.fcp_params directly. It is reset + before most calls so changes to it probably won't have any effect. + """ + def __init__(self, conn): + MinimalClient.__init__(self) + self.conn = conn + self.message_callback = show_progress + self.in_params.default_fcp_params = FCP_PARAM_DEFAULTS.copy() + + @classmethod + def connect(cls, host, port, socket_class = PolledSocket, + state_callback = None): + """ Create an FCPClient which owns a new FCPConnection. + + NOTE: If you need multiple FCPClient instances it is + better to explictly create an FCPConnection and + use the FCPClient.__init__() method so that all + instances are multiplexed over the same connection. + """ + sock = None + conn = None + raised = True + try: + sock = socket_class(host, port) + conn = FCPConnection(sock, True, state_callback) + raised = False + finally: + if raised: + if conn: + conn.close() + if sock: + sock.close() + + return FCPClient(conn) + + + def wait_until_finished(self): + """ Wait for the current request to finish. """ + assert self.conn + self.conn.wait_for_terminal(self) + + def close(self): + """ Close the underlying FCPConnection. """ + if self.conn: + self.conn.close() + + def get_node(self, opennet = False, private = False, volatile = True): + """ Query node information by sending an FCP GetNode message. """ + + # Hmmmm... I added an 'Identifier' value to request message + # even though there's None in the doc. See GETNODE_DEF. + # It seems to work. + self.reset() + self.in_params.definition = GETNODE_DEF + self.in_params.fcp_params = {'GiveOpennetRef': opennet, + 'WithPrivate': private, + 'WithVolatile': volatile } + + return self.conn.start_request(self) + + def generate_ssk(self): + """ Generate an SSK key pair. + + Returns the SSKKeyPair message. + """ + self.reset() + self.in_params.definition = GENERATE_SSK_DEF + return self.conn.start_request(self) + + def get_request_uri(self, insert_uri): + """ Return the request URI corresponding to the insert URI. + + REQUIRES: insert_uri is a private SSK or USK. + """ + + if self.in_params.async: + raise ValueError("This function only works synchronously.") + + assert is_usk(insert_uri) or is_ssk(insert_uri) + + if is_usk(insert_uri): + target = get_ssk_for_usk_version(insert_uri, 0) + else: + target = insert_uri + + self.reset() + self.in_params.definition = GET_REQUEST_URI_DEF + self.in_params.fcp_params = {'URI': target, + 'MaxRetries': 1, + 'PriorityClass':1, + 'UploadFrom':'direct', + 'DataLength':9, + 'GetCHKOnly':True} + self.in_params.send_data = '012345678' # 9 bytes of data + inverted = self.conn.start_request(self)[1]['URI'] + public = inverted[inverted.find('@') + 1: inverted.find('/')] + return insert_uri[:insert_uri.find('@') + 1] + public \ + + insert_uri[insert_uri.find('/'):] + + def get(self, uri, allowed_redirects = 0, output_file = None): + """ Requests the data corresponding to the URI from the + FCP server. 
+ + Returns an AllData or DataFound (when + self.default_fcp_params['ReturnType'] == 'none') message + on success. + + If output_file or self.output_file is not None, write the + raw data to file instead of returning it as a string. + + Raises an FCPError on failure. + + An extra 'URI' entry is added to the returned message + containing the final URI the data was requested + from after redirecting. + + An extra 'Metadata.ContentType' entry is added to the + returned AllData message containing the mime type + information extracted from the last DataFound. + """ + self.reset() + self.in_params.definition = GET_DEF + self.in_params.fcp_params = {'URI':uri } + self.in_params.allowed_redirects = allowed_redirects + self.in_params.file_name = output_file + # REDFLAG: fix + self.in_params.send_data = False + return self.conn.start_request(self) + + + def put(self, uri, bytes, mime_type=None): + """ Insert a string into Freenet. + + Returns a PutSuccessful message on success. + Raises an FCPError on failure. + """ + self.reset() + self.in_params.definition = PUT_FILE_DEF + set_insert_uri(self.in_params.fcp_params, uri) + if mime_type: + self.in_params.fcp_params['Metadata.ContentType'] = mime_type + + self.in_params.send_data = bytes + return self.conn.start_request(self) + + def put_file(self, uri, path, mime_type=None): + """ Insert a single file into Freenet. + + Returns a PutSuccessful message on success. + Raises an FCPError on failure. + + REQUIRES: The size of the file can't change during this + call. + """ + + self.reset() + self.in_params.definition = PUT_FILE_DEF + set_insert_uri(self.in_params.fcp_params, uri) + + if mime_type: + self.in_params.fcp_params['Metadata.ContentType'] = mime_type + + # REDFLAG: test. not sure this ever worked in previous version + #if 'UploadFrom' in params and params['UploadFrom'] == 'disk': + # # REDFLAG: test this code path! + # params['FileName'] = path + # path = None + + self.in_params.file_name = path + # REDFLAG: fix + self.in_params.send_data = True + return self.conn.start_request(self) + + def put_redirect(self, uri, target_uri, mime_type=None): + """ Insert a redirect into freenet. + + Returns a PutSuccessful message on success. + Raises an FCPError on failure. + """ + self.reset() + self.in_params.definition = PUT_REDIRECT_DEF + self.in_params.fcp_params = {'URI':uri, + 'TargetURI':target_uri, + 'UploadFrom':'redirect'} + if mime_type: + self.in_params.fcp_params['Metadata.ContentType'] = mime_type + return self.conn.start_request(self) + + def put_complex_dir(self, uri, file_infos, + default_mime_type = 'text/plain'): + """ Insert a collection of files into a Freenet Container. + + file_infos must be a list of + (name, length, mime_type, full_path) tuples. + + file_infos[0] is inserted as the default document. + + mime types: + If the mime_type value in the file_infos tuple for the + file is not None, it is used. Otherwise the mime type + is guessed from the file extension. Finally, if guessing + fails, default_mime_type is used. 
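# A hedged sketch of a blocking session built from the FCPClient calls
# above.  The host, port and inserted text are placeholders, and a
# reachable Freenet node is assumed.
def example_put_then_get():
    """ Insert a small text blob and fetch it back by its CHK. """
    client = FCPClient.connect('127.0.0.1', 9481)
    try:
        put_msg = client.put('CHK@', 'hello freenet\n', 'text/plain')
        chk = put_msg[1]['URI']
        print "Inserted:", chk
        get_msg = client.get(chk)
        print "Fetched:", get_msg[0], get_msg[1].get('Metadata.ContentType')
    finally:
        client.close()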
+ """ + + assert default_mime_type + assert file_infos + + self.reset() + self.in_params.definition = PUT_COMPLEX_DIR_DEF + self.in_params.fcp_params = {'URI': uri} + + for field in self.in_params.default_fcp_params: + if field.startswith("Files"): + raise ValueError("You can't set file entries via " + + " default_fcp_params.") + if 'DefaultName' in self.in_params.default_fcp_params: + raise ValueError("You can't set 'DefaultName' via " + + "default_fcp_params.") + + files = {} + index = 0 + for info in file_infos: + mime_type = info[2] + if not mime_type: + # First try to guess from the extension. + type_tuple = mimetypes.guess_type(info[0]) + if type_tuple: + mime_type = type_tuple[0] + if not mime_type: + # Fall back to the default. + mime_type = default_mime_type + + files['Files.%i.Name' % index] = info[0] + files['Files.%i.UploadFrom' % index] = 'direct' + files['Files.%i.DataLength' % index] = info[1] + files['Files.%i.Metadata.ContentType' % index] = mime_type + + index += 1 + + self.in_params.fcp_params['Files'] = files + self.in_params.fcp_params['DefaultName'] = file_infos[0][0] + + #REDFLAG: Fix + self.in_params.send_data = True + + # IMPORTANT: Don't set the data length. + return self.conn.start_request(self, + FileInfoDataSource(file_infos), False) + +############################################################ +# Helper function for hg changeset bundle handling. +############################################################ + +# Saw here: +# http://sage.math.washington.edu/home/robertwb/trac-bundle/test \ +# /sage_trac/log/trac.log +HG_MIME_TYPE = 'application/mercurial-bundle' + +def package_metadata(metadata): + """ Package the bundle contents metadata into a string which + can be inserted into to the Metadata.ContentType field + of the Freenet key. + + All args must be full 40 digit hex keys. + """ + return "%s;%s,%s,%s" % (HG_MIME_TYPE, metadata[0], metadata[1], metadata[2]) + +CHANGESET_REGEX = re.compile('.*;\s*([0-9a-fA-F]{40,40})\s*,' + + '\s*([0-9a-fA-F]{40,40})\s*,' + + '\s*([0-9a-fA-F]{40,40})') +def parse_metadata(msg): + """ INTERNAL: Parse the (base_rev, first_rev, tip) info out of the + Metadata.ContentType field of msg. + + FCP2.0 doesn't have support for user defined metadata, so we + jam the metadata we need into the mime type field. + """ + match = CHANGESET_REGEX.match(msg[1]['Metadata.ContentType']) + if not match or len(match.groups()) != 3: + # This happens for bundles inserted with older versions + # of hg2fn.py + raise ValueError("Couldn't parse changeset info from [%s]." \ + % msg[1]['Metadata.ContentType']) + return match.groups() + +def make_rollup_filename(rollup_info, request_uri): + """ Return a filename containing info for a rollup bundle. """ + if not is_usk_file(request_uri): + raise ValueError("request_uri is not a USK file uri.") + + # Hmmmm.... get rid of symbolic names? + tip = rollup_info[0][0] + parent = rollup_info[0][1] + start_index = rollup_info[0][2] + end_index = rollup_info[0][3] + assert len(tip) == 40 # LATER: is_changset_id_str() func? + assert len(parent) == 40 + assert start_index >= 0 + assert end_index >= 0 + assert end_index >= start_index + + human_readable = request_uri.split('/')[1] + # hmmmm... 
always supress .hg + if human_readable.lower().endswith('.hg'): + human_readable = human_readable[:-3] + # <human_name>_<end_index>_<start_index>_<tip>_<parent>_ID<repoid> + return "%s_%i_%i_%s_%s_ID%s" % (human_readable, end_index, start_index, + tip[:12], parent[:12], + get_usk_hash(request_uri)) + +def parse_rollup_filename(filename): + """ Parse a filename created with make_rollup_filename + into a tuple.""" + fields = filename.split('_') + repo_id = fields[-1] + if not repo_id.startswith("ID") or len(repo_id) != 14: + raise ValueError("Couldn't parse repo usk hash.") + repo_id = repo_id[2:] + parent = fields[-2] + if len(parent) != 12: + raise ValueError("Couldn't parse parent.") + tip = fields[-3] + if len(tip) != 12: + raise ValueError("Couldn't parse tip.") + start_index = int(fields[-4]) + end_index = int(fields[-5]) + human_readable = '_'.join(fields[:-6]) # REDFLAG: dci obo? + return (human_readable, start_index, end_index, tip, parent, repo_id) diff --git a/infocalypse/fcpconnection.py b/infocalypse/fcpconnection.py new file mode 100644 --- /dev/null +++ b/infocalypse/fcpconnection.py @@ -0,0 +1,1000 @@ +""" Classes to create a multiplexed asynchronous connection to an + FCP server. + + Copyright (C) 2008 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks + + OVERVIEW: + IAsyncSocket is an abstract interface to an asynchronous + socket. The intent is that client code can plug in a + framework appropriate implementation. i.e. for Twisted, + asyncore, Tkinter, pyQt, pyGtk, etc. A platform agnostic + implementation, PolledSocket is supplied. + + FCPConnection uses an IAsyncSocket delegate to run the + FCP 2.0 protocol over a single socket connection to an FCP server. + + FCPConnection fully(*) supports multiplexing multiple requests. + Client code runs requests by passing an instance of MinimalClient + into FCPConnection.start_request(). The FCPClient MinimalClient + subclass provides convenience wrapper functions for common requests. + + Both blocking and non-blocking client requests are supported. + If MinimalClient.in_params.async == True, FCPConnection.start_connection() + returns a request id string immediately. This is the same request + id which appears in the 'Identifier' field of subsequent incoming. + FCP messages. The MinimalClient.message_callback(client, msg) + callback function is called for every incoming client message for + the request. Async client code can detect the request has finished + by checking client.is_finished() from this callback. + + (*) GOTCHA: If you start a request which writes trailing data + to the FCP server, the FCPConnection will transition into the + UPLOADING state and you won't be able to start new requests until + uploading finishes and it transitions back to the CONNECTED state. 
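# A sketch of the rollup filename round trip implemented by
# make_rollup_filename() / parse_rollup_filename() above.  The 40 digit
# changeset ids and the USK are placeholders.
EXAMPLE_TIP = 'a' * 40
EXAMPLE_PARENT = 'b' * 40
EXAMPLE_ROLLUP_NAME = make_rollup_filename(
    [(EXAMPLE_TIP, EXAMPLE_PARENT, 0, 12)],
    'USK@fakeroutingkey,fakecryptokey,AQACAAE/myrepo.R1/12')
# -> 'myrepo.R1_12_0_<tip[:12]>_<parent[:12]>_ID<12 digit repo hash>'
assert parse_rollup_filename(EXAMPLE_ROLLUP_NAME)[1:5] == \
       (0, 12, 'a' * 12, 'b' * 12)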
+ It is recommended that you use a dedicated FCPConnection instance + for file uploads. You don't have to worry about this if you use + blocking requests exclusively. +""" +# REDFLAG: get pylint to acknowledge inherited doc strings from ABCs? + +import os, os.path, random, select, socket, time + +try: + from hashlib import sha1 + def sha1_hexdigest(bytes): + """ Return the SHA1 hexdigest of bytes using the hashlib module. """ + return sha1(bytes).hexdigest() +except ImportError: + # Fall back so that code still runs on pre 2.6 systems. + import sha + def sha1_hexdigest(bytes): + """ Return the SHA1 hexdigest of bytes using the sha module. """ + return sha.new(bytes).hexdigest() + +from fcpmessage import make_request, FCPParser, HELLO_DEF, REMOVE_REQUEST_DEF + +FCP_VERSION = '2.0' # Expected version value sent in ClientHello + +RECV_BLOCK = 4096 # socket recv +SEND_BLOCK = 4096 # socket send +READ_BLOCK = 16 * 1024 # disk read + +MAX_SOCKET_READ = 33 * 1024 # approx. max bytes read during IAsyncSocket.poll() + +POLL_TIME_SECS = 0.25 # hmmmm... + +# FCPConnection states. +CONNECTING = 1 +CONNECTED = 2 +CLOSED = 3 +UPLOADING = 4 + +CONNECTION_STATES = {CONNECTING:'CONNECTING', + CONNECTED:'CONNECTED', + CLOSED:'CLOSED', + UPLOADING:'UPLOADING'} + +def example_state_callback(dummy, state): + """ Example FCPConnection.state_callback function. """ + + value = CONNECTION_STATES.get(state) + if not value: + value = "UNKNOWN" + print "FCPConnection State -> [%s]" % value + +def make_id(): + """ INTERNAL: Make a unique id string. """ + return sha1_hexdigest(str(random.random()) + str(time.time())) + +#-----------------------------------------------------------# +# Byte level socket handling +#-----------------------------------------------------------# + +class IAsyncSocket: + """ Abstract interface for an asynchronous socket. """ + def __init__(self): + # lambda's prevent pylint E1102 warning + + # Data arrived on socket + self.recv_callback = lambda x:None + # Socket closed + self.closed_callback = lambda :None + # Socket wants data to write. This can be None. + self.writable_callback = None + + def write_bytes(self, bytes): + """ Write bytes to the socket. """ + pass + + def close(self): + """ Release all resources associated with the socket. """ + pass + + # HACK to implement waiting on messages. + def poll(self): + """ Do whatever is required to check for new activity + on the socket. + + e.g. run gui framework message pump, explictly poll, etc. + MUST call recv_callback, writable_callback + """ + pass + +class NonBlockingSocket(IAsyncSocket): + """ Base class used for IAsyncSocket implementations based on + non-blocking BSD style sockets. + """ + def __init__(self, connected_socket): + """ REQUIRES: connected_socket is non-blocking and fully connected. """ + IAsyncSocket.__init__(self) + self.buffer = "" + self.socket = connected_socket + + def write_bytes(self, bytes): + """ IAsyncSocket implementation. """ + assert bytes + self.buffer += bytes + #print "write_bytes: ", self.buffer + + def close(self): + """ IAsyncSocket implementation. """ + if self.socket: + self.socket.close() # sync? + self.closed_callback() + self.socket = None + + def do_write(self): + """ INTERNAL: Write to the socket. + + Returns True if data was written, false otherwise. + + REQUIRES: buffer has data or the writable_callback is set. + """ + + assert self.buffer or self.writable_callback + if not self.buffer: + # pylint doesn't infer that this must be set. 
+ # pylint: disable-msg=E1102 + self.writable_callback() + if self.buffer: + chunk = self.buffer[:SEND_BLOCK] + sent = self.socket.send(chunk) + #print "WRITING:", self.buffer[:sent] + assert sent >= 0 + #print "TO_WIRE:" + #print repr(self.buffer[:sent]) + self.buffer = self.buffer[sent:] + return True + assert not self.writable_callback # Hmmmm... This is a client error. + return False + + def do_read(self): + """ INTERNAL: Read from the socket. + + Returns the data read from the socket or None + on EOF. + + Closes on EOF as a side effect. + """ + data = self.socket.recv(RECV_BLOCK) + if not data: + self.close() + #ret = False + #break + return None + + #print "FROM_WIRE:" + #print repr(data) + return data + + +class PolledSocket(NonBlockingSocket): + """ Sucky polled IAsyncSocket implementation which should + work everywhere. i.e. *nix, Windows, OSX. """ + + def __init__(self, host, port): + connected_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # REDFLAG: Can block here. + connected_socket.connect((host, port)) + connected_socket.setblocking(0) + NonBlockingSocket.__init__(self, connected_socket) + + def poll(self): + """ IAsyncSocket implementation. """ + #print "PolledSocket.poll -- called" + if not self.socket: + #print "PolledSocket.poll -- CLOSED" + raise IOError("The socket is closed") + # Why? Because we don't want to call the recv_callback while + # reading... wacky re-entrance issues.... + read = '' + ret = True + while len(read) < MAX_SOCKET_READ: # bound read length + check_writable = [] + if self.buffer or self.writable_callback: + check_writable = [self.socket] + readable, writable, errs = \ + select.select([self.socket], check_writable, + [self.socket], 0) + + #print "result:", readable, writable, errs + + stop = True + if errs: + #print "GOT AN ERROR" + # Hack. Force an IO exception. + self.socket.sendall(RECV_BLOCK) + # Err... should never get here. + raise IOError("Unknown socket error") + + if readable: + data = self.do_read() + if not data: + ret = False + break + + read += data + stop = False + + if writable: + if self.do_write(): + stop = False + + if stop: + break + + if read: + self.recv_callback(read) + #print "PolledSocket.poll -- exited" + return ret + +#-----------------------------------------------------------# +# Message level FCP protocol handling. +#-----------------------------------------------------------# + +# NOTE: +# 'DataFound' is sometimes terminal. See msg_is_terminal(). +# +# NOTE: +# This list is not complete. It only lists +# messages generated by supported FCP commands. +# Messages which always indicate that an FCP request ended in success. +SUCCESS_MSGS = frozenset([ \ + 'NodeHello', 'SSKKeypair', 'AllData', 'PutSuccessful', 'NodeData', + ]) + +# Messages which always indicate that an FCP request ended in failure. +FAILURE_MSGS = frozenset([ \ + 'CloseConnectionDuplicateClientName', 'PutFailed', 'GetFailed', + 'ProtocolError', 'IdentifierCollision', 'UnknownNodeIdentifier', + 'UnknownPeerNoteType' + ]) + +# Messages which always indicate that an FCP request ended. +TERMINAL_MSGS = SUCCESS_MSGS.union(FAILURE_MSGS) + +def msg_is_terminal(msg, params): + """ INTERNAL: Return True if the message ends an FCP request, + False otherwise. 
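+
+        For example, 'PutSuccessful' and 'GetFailed' always end a request,
+        while 'DataFound' ends it only when the request was started with
+        ReturnType == 'none'.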
+ """ + + if msg[0] in TERMINAL_MSGS: + return True + + # Special cases + if msg[0] == 'DataFound' and 'ReturnType' in params and \ + params['ReturnType'] == 'none': + return True + + #print "msg_is_terminal: False" + #print "MSG:", msg + #print "PARAMS:", params + + return False + +def get_code(msg): + """ Returns integer error code if msg has a 'Code' field + None otherwise. + """ + + # Hmmmm... does 'Code' ever appear in non-error messages? + #if not msg[0] in FAILURE_MSGS: + # # Message is not an error. + # return None + + if not 'Code' in msg[1]: + if msg[0] in FAILURE_MSGS: + print "WARNING: get_code(msg, code) couldn't read 'Code'." + return None + + return int(msg[1]['Code']) + +def is_code(msg, error_code): + """ Returns True if msg has a 'Code' field and it is + equal to error_code, False, otherwise. + """ + + code = get_code(msg) + if code is None: + return False + return code == error_code + +def is_fatal_error(msg): + """ Returns True if msg has a 'Fatal' field and it + indicates a non-recoverable error, False otherwise. + """ + + value = msg[1].get('Fatal') + if value is None: + return False # hmmm... + return bool(value.lower() == 'true') + +class FCPError(Exception): + """ An Exception raised when an FCP command fails. """ + + def __init__(self, msg): + Exception.__init__(self, msg[0]) + self.fcp_msg = msg + self.last_uri = None + + def __str__(self): + text = "FCPError: " + self.fcp_msg[0] + if self.fcp_msg[1].has_key('CodeDescription'): + text += " -- " + self.fcp_msg[1]['CodeDescription'] + return text + + def is_code(self, error_code): + """ Returns True if the 'Code' field in the FCP error message + is equal to error_code, False, otherwise. + """ + + if not self.fcp_msg or not 'Code' in self.fcp_msg[1]: + # YES. This does happen. + # Hmmmm... just assert? Can this really happen. + print "WARNING: FCPError.is_code() couldn't read 'Code'." + return False + + return is_code(self.fcp_msg, error_code) + +def raise_on_error(msg): + """ INTERNAL: raise an FCPError if msg indicates an error. """ + + assert msg + if msg[0] in FAILURE_MSGS: + raise FCPError(msg) + +class IDataSource: + """ Abstract interface which provides data written up to + the FCP Server as part of an FCP request. """ + def __init__(self): + pass + + def initialize(self): + """ Initialize. """ + raise NotImplementedError() + + def data_length(self): + """ Returns the total length of the data which will be + returned by read(). """ + raise NotImplementedError() + + def release(self): + """ Release all resources associated with the IDataSource + implementation. """ + raise NotImplementedError() + + def read(self): + """ Returns a raw byte block or None if no more data + is available. """ + raise NotImplementedError() + +class FileDataSource(IDataSource): + """ IDataSource implementation which get's its data from a single + file. + """ + def __init__(self, file_name): + IDataSource.__init__(self) + self.file_name = file_name + self.file = None + + def initialize(self): + """ IDataSource implementation. """ + self.file = open(self.file_name, 'rb') + + def data_length(self): + """ IDataSource implementation. """ + return os.path.getsize(self.file_name) + + def release(self): + """ IDataSource implementation. """ + if self.file: + self.file.close() + self.file = None + + def read(self): + """ IDataSource implementation. """ + assert self.file + return self.file.read(READ_BLOCK) + + + +# MESSAGE LEVEL + +class FCPConnection: + """Class for a single persistent socket connection + to an FCP server. 
+ + Socket level IO is handled by the IAsyncSocket delegate. + + The connection is multiplexed (i.e. it can handle multiple + concurrent client requests). + + """ + + def __init__(self, socket_, wait_for_connect = False, + state_callback = None): + """ Create an FCPConnection from an open IAsyncSocket instance. + + REQUIRES: socket_ ready for writing. + """ + self.running_clients = {} + # Delegate handles parsing FCP protocol off the wire. + self.parser = FCPParser() + self.parser.msg_callback = self.msg_handler + self.parser.context_callback = self.get_context + + self.socket = socket_ + if state_callback: + self.state_callback = state_callback + else: + self.state_callback = lambda x, y: None + self.socket.recv_callback = self.parser.parse_bytes + self.socket.closed_callback = self.closed_handler + + self.node_hello = None + + # Only used for uploads. + self.data_source = None + + # Tell the client code that we are trying to connect. + self.state_callback(self, CONNECTING) + + # Send a ClientHello + params = {'Name':'FCPConnection[%s]' % make_id(), + 'ExpectedVersion': FCP_VERSION} + self.socket.write_bytes(make_request(HELLO_DEF, params)) + if wait_for_connect: + # Wait for the reply + while not self.is_connected(): + if not self.socket.poll(): + raise IOError("Socket closed") + time.sleep(POLL_TIME_SECS) + + def is_connected(self): + """ Returns True if the instance is fully connected to the + FCP Server and ready to process requests, False otherwise. + """ + return not self.node_hello is None + + def is_uploading(self): + """ Returns True if the instance is uploading data, False + otherwise. + """ + return (self.data_source or + self.socket.writable_callback) + + def close(self): + """ Close the connection and the underlying IAsyncSocket + delegate. + """ + if self.socket: + self.socket.close() + + # set_data_length only applies if data_source is set + def start_request(self, client, data_source = None, set_data_length = True): + """ Start an FCP request. + + If in_params.async is True this returns immediately, otherwise + it blocks until the request finishes. + + If client.in_params.send_data is set, trailing data is sent + after the request message. If data_source is not None, then + the data in it is sent. Otherwise if client.in_params.file is + not None, the data in the file is sent. Finally if neither of + the other sources are not None the contents of + client.in_params.send_data are sent. + + If set_data_length is True the 'DataLength' field is set in the + requests FCP message. + + If in_params.async it True, this method returns the identifier + for the request, otherwise, returns the FCP message which + terminated the request. + """ + assert not self.is_uploading() + assert not client.context + assert not client.response + assert not 'Identifier' in client.in_params.fcp_params + identifier = make_id() + client.in_params.fcp_params['Identifier'] = identifier + write_string = False + if client.in_params.send_data: + assert not self.data_source + if data_source: + data_source.initialize() + if set_data_length: + client.in_params.fcp_params['DataLength'] = (data_source. + data_length()) + self.data_source = data_source + self.socket.writable_callback = self.writable_handler + elif client.in_params.file_name: + self.data_source = FileDataSource(client.in_params.file_name) + self.data_source.initialize() + client.in_params.fcp_params['DataLength'] = (self. + data_source. 
+ data_length()) + self.socket.writable_callback = self.writable_handler + else: + client.in_params.fcp_params['DataLength'] = len(client. + in_params. + send_data) + write_string = True + + self.socket.write_bytes(make_request(client.in_params.definition, + client.in_params.fcp_params, + client.in_params. + default_fcp_params)) + + if write_string: + self.socket.write_bytes(client.in_params.send_data) + + assert not client.context + client.context = RequestContext(client.in_params.allowed_redirects, + identifier, + client.in_params.fcp_params.get('URI')) + if not client.in_params.send_data: + client.context.file_name = client.in_params.file_name + + #print "MAPPED [%s]->[%s]" % (identifier, str(client)) + self.running_clients[identifier] = client + + if self.data_source: + self.state_callback(self, UPLOADING) + + if client.in_params.async: + return identifier + + resp = self.wait_for_terminal(client) + raise_on_error(resp) + return client.response + + def remove_request(self, identifier, is_global = False): + """ Cancel a running request. + NOT ALLOWED WHILE UPLOADING DATA. + """ + if self.is_uploading(): + raise Exception("Can't remove while uploading. Sorry :-(") + + if not identifier in self.running_clients: + print "FCPConnection.remove_request -- unknown identifier: ", \ + identifier + params = {'Identifier': identifier, + 'Global': is_global} + self.socket.write_bytes(make_request(REMOVE_REQUEST_DEF, params)) + + def wait_for_terminal(self, client): + """ Wait until the request running on client finishes. """ + while not client.is_finished(): + if not self.socket.poll(): + break + time.sleep(POLL_TIME_SECS) + + # Doh saw this trip 20080124. Regression from + # NonBlockingSocket changes? + # assert client.response + if not client.response: + raise IOError("No response. Maybe the socket dropped?") + + return client.response + + def handled_redirect(self, msg, client): + """ INTERNAL: Handle code 27 redirects. """ + + # BITCH: This is a design flaw in the FCP 2.0 protocol. + # They should have used unique numbers for all error + # codes so that client coders don't need to keep track + # of the initiating request in order to interpret the + # error code. + if client.in_params.definition[0] == 'ClientGet' and is_code(msg, 27): + #print "Checking for allowed redirect" + if client.context.allowed_redirects: + #print "Handling redirect" + client.context.allowed_redirects -= 1 + assert client.context.initiating_id + assert client.context.initiating_id in self.running_clients + assert client.context.running_id + if client.context.running_id != client.context.initiating_id: + # Remove the context for the intermediate redirect. + #print "DELETED: ", client.context.running_id + del self.running_clients[client.context.running_id] + + client.context.running_id = make_id() + client.context.last_uri = msg[1]['RedirectURI'] + + # Copy, don't modify params. + params = {} + params.update(client.in_params.fcp_params) + params['URI'] = client.context.last_uri + params['Identifier'] = client.context.running_id + + # Send new request. + self.socket.write_bytes(make_request(client.in_params. + definition, params)) + + #print "MAPPED(1) [%s]->[%s]" % (client.context.running_id, + # str(client)) + self.running_clients[client.context.running_id] = client + + # REDFLAG: change callback to include identifier? + # Hmmm...fixup identifier in msg? 
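+                # The code-27 redirect message is still handed to the
+                # client's callback below (carrying the identifier it
+                # arrived with), so async callers can observe that the
+                # request was re-issued under the new running_id.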
+ if client.message_callback: + client.message_callback(client, msg) + return True + + return False + + + def handle_unexpected_msgs(self, msg): + """ INTERNAL: Process unexpected messages. """ + + if not self.node_hello: + if msg[0] == 'NodeHello': + self.node_hello = msg + self.state_callback(self, CONNECTED) + return True + + raise Exception("Unexpected message before NodeHello: %s" + % msg[0]) + + if not 'Identifier' in msg[1]: + print "Saw message without 'Identifier': %s" % msg[0] + print msg + return True + + if not msg[1]['Identifier'] in self.running_clients: + print "No client for identifier: %s" % msg[1]['Identifier'] + # BITCH: You get a PersistentRequestRemoved msg even for non + # peristent requests AND you get it after the GetFailed. + #print msg[0] + return True + + return False + + def get_context(self, request_id): + """ INTERNAL: Lookup RequestContexts for the FCPParser delegate. + """ + + client = self.running_clients.get(request_id) + if not client: + raise Exception("No client for identifier: %s" % request_id) + assert client.context + return client.context + + def msg_handler(self, msg): + """INTERNAL: Process incoming FCP messages from the FCPParser delegate. + """ + + if self.handle_unexpected_msgs(msg): + return + + client = self.running_clients[msg[1]['Identifier']] + assert client.is_running() + + if msg_is_terminal(msg, client.in_params.fcp_params): + if self.handled_redirect(msg, client): + return + + # Remove running context entries + assert msg[1]['Identifier'] == client.context.running_id + #print "DELETED: ", client.context.running_id + del self.running_clients[client.context.running_id] + if client.context.running_id != client.context.initiating_id: + #print "DELETED: ", client.context.initiating_id + del self.running_clients[client.context.initiating_id] + + if msg[0] == 'DataFound' or msg[0] == 'AllData': + # REDFLAG: Always do this? and fix FCPError.last_uri? + # Copy URI into final message. i.e. so client + # sees the final redirect not the inital URI. + msg[1]['URI'] = client.context.last_uri + if msg[0] == 'AllData': + # Copy metadata into final message + msg[1]['Metadata.ContentType'] = client.context.metadata + + # Add a third entry to the msg tuple containing + # the raw data, or a comment saying where it was + # written. + assert len(msg) == 2 + msg = list(msg) + if client.context.data_sink.file_name: + msg.append("Wrote raw data to: %s" \ + % client.context.file_name) + else: + msg.append(client.context.data_sink.raw_data) + msg = tuple(msg) + + + # So that MinimalClient.request_id() returns the + # initiating id correctly even after following + # redirects. + msg[1]['Identifier'] = client.context.initiating_id + + # Reset the context + client.context.release() + client.context = None + + client.response = msg + assert not client.is_running() + else: + if 'Metadata.ContentType' in msg[1]: + # Keep track of metadata as we follow redirects + client.context.metadata = msg[1]['Metadata.ContentType'] + + # Notify client. + if client.message_callback: + client.message_callback(client, msg) + + def closed_handler(self): + """ INTERNAL: Callback called by the IAsyncSocket delegate when the + socket closes. """ + + self.node_hello = None + + # Hmmmm... other info, ok to share this? + fake_msg = ('ProtocolError', {'CodeDescription':'Socket closed'}) + #print "NOTIFIED: CLOSED" + + # Hmmmm... iterate over values instead of keys? 
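+        # Hand the synthetic 'ProtocolError' built above to the client
+        # callbacks so they learn that their requests stopped (requests
+        # that are mid-redirect are skipped), then forget all running
+        # requests.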
+ for identifier in self.running_clients: + client = self.running_clients[identifier] + # Remove client from list of running clients. + #print "CLIENT:", client + #print "CLIENT.CONTEXT:", client.context + assert client.context + assert client.context.running_id + # Notify client that it has stopped. + if (client.context.initiating_id == client.context.running_id + and client.message_callback): + client.message_callback(client, fake_msg) + + self.running_clients.clear() + self.state_callback(self, CLOSED) + + def writable_handler(self): + """ INTERNAL: Callback called by the IAsyncSocket delegate when + it needs more data to write. + """ + + if not self.data_source: + return + data = self.data_source.read() + if not data: + self.data_source.release() + self.data_source = None + self.socket.writable_callback = None + if self.is_connected(): + self.state_callback(self, CONNECTED) + return + self.socket.write_bytes(data) + +# Writes to file if file_name is set, raw_data otherwise +class DataSink: + """ INTERNAL: Helper class used to save trailing data for FCP + messages. + """ + + def __init__(self): + self.file_name = None + self.file = None + self.raw_data = '' + self.data_bytes = 0 + + def initialize(self, data_length, file_name): + """ Initialize the instance. + If file_name is not None the data is written into + the file, otherwise, it is saved in the raw_data member. + """ + # This should only be called once. You can't reuse the datasink. + assert not self.file and not self.raw_data and not self.data_bytes + self.data_bytes = data_length + self.file_name = file_name + + def write_bytes(self, bytes): + """ Write bytes into the instance. + + Multiple calls can be made. The final amount of + data written into the instance MUST be equal to + the data_length value passed into the initialize() + call. + """ + + #print "WRITE_BYTES called." + if self.file_name and not self.file: + self.file = open(self.file_name, 'wb') + + if self.file: + #print "WRITE_BYTES writing to file" + if self.file.closed: + print "FileOrStringDataSink -- refusing to write" \ + + " to closed file!" + return + self.file.write(bytes) + self.data_bytes -= len(bytes) + assert self.data_bytes >= 0 + if self.data_bytes == 0: + self.file.close() + return + + self.raw_data += bytes + self.data_bytes -= len(bytes) + assert self.data_bytes >= 0 + + def release(self): + """ Release all resources associated with the instance. """ + + if self.data_bytes != 0: + print "DataSink.release -- DIDN'T FINISH PREVIOUS READ!", \ + self.data_bytes + if self.file: + self.file.close() + self.file_name = None + self.file = None + self.raw_data = '' + self.data_bytes = 0 + +class RequestContext: + """ INTERNAL: 'Live' context information which an FCPConnection needs + to keep about a single FCP request. + """ + def __init__(self, allowed_redirects, identifier, uri): + self.initiating_id = identifier + self.running_id = identifier + + # Redirect handling + self.allowed_redirects = allowed_redirects + self.last_uri = uri + self.metadata = "" # Hmmm... + + # Incoming data handling + self.data_sink = DataSink() + + def writable(self): + """ Returns the number of additional bytes which can be written + into the data_sink member. + """ + + return self.data_sink.data_bytes + + def release(self): + """ Release all resources associated with the instance. """ + + self.data_sink.release() + + +#-----------------------------------------------------------# +# Client code +#-----------------------------------------------------------# + +# Hmmmm... 
created separate class because pylint was complaining +# about too many attributes in MinimalClient and FCPClient +class ClientParams: + """ A helper class to aggregate request parameters. """ + + def __init__(self): + self.definition = None + # These are default values which can be modified by the client code. + # THE IMPLEMENTATION CODE i.e. fcp(connection/client/message) + # MUST NOT MODIFY THEM. + self.default_fcp_params = {} + # These are per request values. They can be modified / reset. + self.fcp_params = {} + self.async = False + self.file_name = None + self.send_data = None + self.allowed_redirects = 0 + + def reset(self): + """ Reset all members EXCEPT async, allowed_redirects and + default_fcp_params to their default values. + """ + + self.definition = None + self.fcp_params = {} + self.file_name = None + self.send_data = None + + # HACK: Not really required, but supresses pylint R0903 + def pretty(self): + """Returns a human readable rep of the params. """ + + return "%s: %s %s %s %s %s %s" % \ + ( self.definition[0], + str(self.send_data), + str(self.async), + self.file_name, + self.allowed_redirects, + self.fcp_params, + self.default_fcp_params ) + +class MinimalClient: + """ A single FCP request which can be executed via the + FCPConnection.start_request() method. + + If in_params.async is True the request runs asynchronously, + otherwise it causes FCPConnection.start_request() to block. + + The message_callback notifier function is called for + each incoming FCP message during the request. The first + argument is the client instance. Its is_finished() + method will return True for the final message. message_callback + implementations MUST NOT modify the state of the client + instance while is_finished() is False. + """ + + def __init__(self): + # IN parameters. + self.in_params = ClientParams() + + # OUT parameter + self.response = None + + # Variables used while client request is running. + self.context = None + + # Notification + self.message_callback = lambda client, msg:None + + def reset(self, reset_params = True): + """ Reset all members EXCEPT self.in_params.allowed_redirects, + self.in_params.default_fcp_params and + self.in_params.async to their default values. + """ + assert not self.is_running() + if reset_params: + self.in_params.reset() + self.response = None + self.context = None + + def is_running(self): + """ Returns True if a request is running, False otherwise. """ + + return self.context + + def is_finished(self): + """ Returns True if the request is finished, False otherwise. """ + + return not self.response is None + + def request_id(self): + """ Returns the request id. """ + if self.response and not self.context: + return self.response[1]['Identifier'] + elif self.context: + return self.context.initiating_id + return None diff --git a/infocalypse/fcpmessage.py b/infocalypse/fcpmessage.py new file mode 100644 --- /dev/null +++ b/infocalypse/fcpmessage.py @@ -0,0 +1,333 @@ +""" Classes and functions for creating and parsing FCP messages. + + Copyright (C) 2008 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks + + An FCP message is represented as a + (msg_name, msg_values_dict) tuple. + + Some message e.g. AllData may have a third entry + which contains the raw data string for the FCP + message's trailing data. +""" + +#-----------------------------------------------------------# +# FCP mesage creation helper functions +#-----------------------------------------------------------# + +def merge_params(params, allowed, defaults = None): + """ Return a new dictionary instance containing only the values + which have keys in the allowed field list. + + Values are taken from defaults only if they are not + set in params. + """ + + ret = {} + for param in allowed: + if param in params: + ret[param] = params[param] + elif defaults and param in defaults: + ret[param] = defaults[param] + return ret + +def format_params(params, allowed, required): + """ INTERNAL: Format params into an FCP message body string. """ + + ret = '' + for field in params: + if not field in allowed: + raise ValueError("Illegal field [%s]." % field) + + for field in allowed: + if field in params: + if field == 'Files': + # Special case Files dictionary. + assert params['Files'] + for subfield in params['Files']: + ret += "%s=%s\n" % (subfield, params['Files'][subfield]) + continue + value = str(params[field]) + if not value: + raise ValueError("Illegal value for field [%s]." % field) + if value.lower() == 'true' or value.lower() == 'false': + value = value.lower() + ret += "%s=%s\n" % (field, value) + elif field in required: + #print "FIELD:", field, required + raise ValueError("A required field [%s] was not set." % field) + return ret + +# REDFLAG: remove trailing_data? +def make_request(definition, params, defaults = None, trailing_data = None): + """ Make a request message string from a definition tuple + and params parameters dictionary. + + Values for allowed parameters not specified in params are + taken from defaults if they are present and params IS + UPDATED to include these values. + + A definition tuple has the following entries: + (msg_name, allowed_fields, required_fields, contraint_func) + + msg_name is the FCP message name. + allowed_fields is a sequence of field names which are allowed + in params. + required_fields is a sequence of field names which are required + in params. If this is None all the allowed fields are + assumed to be required. + constraint_func is a function which takes definitions, params + arguments and can raise if contraints on the params values + are not met. This can be None. + """ + + #if 'Identifier' in params: + # print "MAKE_REQUEST: ", definition[0], params['Identifier'] + #else: + # print "MAKE_REQUEST: ", definition[0], "NO_IDENTIFIER" + + #print "DEFINITION:" + #print definition + #print "PARAMS:" + #print params + name, allowed, required, constraint_func = definition + assert name + + real_params = merge_params(params, allowed, defaults) + + # Don't force repetition if required is the same. + if required is None: + required = allowed + + ret = name + '\n' + format_params(real_params, allowed, required) \ + + 'EndMessage\n' + + # Run extra checks on parameter values + # Order is important. Format_params can raise on missing fields. 
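+    # Note: the constraint check below runs against real_params, the
+    # merged copy; the caller's params dict is cleared and refilled from
+    # real_params just before returning.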
+ if constraint_func: + constraint_func(definition, real_params) + + if trailing_data: + ret += trailing_data + + params.clear() + params.update(real_params) + + return ret + +#-----------------------------------------------------------# +# FCP request definitions for make_request() +#-----------------------------------------------------------# + +def get_constraint(dummy, params): + """ INTERNAL: Check get params. """ + if 'ReturnType' in params and params['ReturnType'] != 'disk': + if 'Filename' in params or 'TempFilename' in params: + raise ValueError("'Filename' and 'TempFileName' only allowed" \ + + " when 'ReturnType' is disk.") + +def put_file_constraint(dummy, params): + """ INTERNAL: Check put_file params. """ + # Hmmmm... this only checks for required arguments, it + # doesn't report values that have no effect. + upload_from = 'direct' + if 'UploadFrom' in params: + upload_from = params['UploadFrom'] + if upload_from == 'direct': + if not 'DataLength' in params: + raise ValueError("'DataLength' MUST be set, 'UploadFrom ==" + + " 'direct'.") + elif upload_from == 'disk': + if not 'Filename' in params: + raise ValueError("'Filename' MUST be set, 'UploadFrom ==" + + " 'disk'.") + elif upload_from == 'redirect': + if not 'TargetURI' in params: + raise ValueError("'TargetURI' MUST be set, 'UploadFrom ==" + + " 'redirect'.") + else: + raise ValueError("Unknown value, 'UploadFrom' == %s" % upload_from) + + +HELLO_DEF = ('ClientHello', ('Name', 'ExpectedVersion'), None, None) + +# Identifier not included in doc? +GETNODE_DEF = ('GetNode', ('Identifier', 'GiveOpennetRef', 'WithPrivate', + 'WithVolatile'), + None, None) + +#IMPORTANT: One entry tuple MUST have trailing comma or it will evaluate +# to a string instead of a tuple. +GENERATE_SSK_DEF = ('GenerateSSK', ('Identifier',), None, None) +GET_REQUEST_URI_DEF = ('ClientPut', + ('URI', 'Identifier', 'MaxRetries', 'PriorityClass', + 'UploadFrom', 'DataLength', 'GetCHKOnly'), + None, None) +GET_DEF = ('ClientGet', + ('IgnoreDS', 'DSOnly', 'URI', 'Identifier', 'Verbosity', + 'MaxSize', 'MaxTempSize', 'MaxRetries', 'PriorityClass', + 'Persistence', 'ClientToken', 'Global', 'ReturnType', + 'BinaryBlob', 'AllowedMimeTypes', 'FileName', 'TmpFileName'), + ('URI', 'Identifier'), + get_constraint) +PUT_FILE_DEF = ('ClientPut', + ('URI', 'Metadata.ContentType', 'Identifier', 'Verbosity', + 'MaxRetries', 'PriorityClass', 'GetCHKOnly', 'Global', + 'DontCompress','ClientToken', 'Persistence', + 'TargetFilename', 'EarlyEncode', 'UploadFrom', 'DataLength', + 'Filename', 'TargetURI', 'FileHash', 'BinaryBlob'), + ('URI', 'Identifier'), + put_file_constraint) +PUT_REDIRECT_DEF = ('ClientPut', + ('URI', 'Metadata.ContentType', 'Identifier', 'Verbosity', + 'MaxRetries', 'PriorityClass', 'GetCHKOnly', 'Global', + 'ClientToken', 'Persistence', 'UploadFrom', + 'TargetURI'), + ('URI', 'Identifier', 'TargetURI'), + None) +PUT_COMPLEX_DIR_DEF = ('ClientPutComplexDir', + ('URI', 'Identifier', 'Verbosity', + 'MaxRetries', 'PriorityClass', 'GetCHKOnly', 'Global', + 'DontCompress', 'ClientToken', 'Persistence', + 'TargetFileName', 'EarlyEncode', 'DefaultName', + 'Files'), #<- one off code in format_params() for this + ('URI', 'Identifier'), + None) + +REMOVE_REQUEST_DEF = ('RemoveRequest', ('Identifier', 'Global'), None, None) + +# REDFLAG: Shouldn't assert on bad data! raise instead. +# Hmmmm... I hacked this together by unwinding a "pull" parser +# to make a "push" parser. Feels like there's too much code here. 
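+#
+# Illustrative sketch (not from the original source): typical wiring of the
+# parser below. handle_msg and lookup_context are hypothetical
+# caller-supplied callables; lookup_context must map an 'Identifier' value
+# to a context object with a data_sink, like RequestContext in
+# fcpconnection.py.
+#
+#     parser = FCPParser()
+#     parser.msg_callback = handle_msg          # gets each (name, fields) msg
+#     parser.context_callback = lookup_context  # Identifier -> context
+#     parser.parse_bytes(raw_bytes)             # push bytes read off the socket
+#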
+class FCPParser: + """Parse a raw byte stream into FCP messages and trailing data blobs. + + Push bytes into the parser by calling FCPParser.parse_bytes(). + Set FCPParser.msg_callback to get the resulting FCP messages. + Set FCPParser.context_callback to control how trailing data is written. + See RequestContext in the fcpconnection module for an example of how + contexts are supposed to work. + + NOTE: This only handles byte level presentation. It DOES NOT validate + that the incoming messages are correct w.r.t. the FCP 2.0 spec. + """ + def __init__(self): + self.msg = None + self.prev_chunk = "" + self.data_context = None + + # lambda's prevent pylint E1102 warning + # Called for each parsed message. + self.msg_callback = lambda msg:None + + # MUST set this callback. + # Return the RequestContext for the request_id + self.context_callback = None #lambda request_id:RequestContext() + + def handle_line(self, line): + """ INTERNAL: Process a single line of an FCP message. """ + if not line: + return False + + if not self.msg: + # Start of a new message + self.msg = [line, {}] + return False + + pos = line.find('=') + if pos != -1: + # name=value pair + fields = (line[:pos], line[pos + 1:]) + # CANNOT just split + # fields = line.split('=') + # e.g. + # ExtraDescription=Invalid precompressed size: 81588 maxlength=10 + assert len(fields) == 2 + self.msg[1][fields[0].strip()] = fields[1].strip() + else: + # end of message line + if line == 'Data': + # Handle trailing data + assert self.msg + # REDFLAG: runtime protocol error (should never happen) + assert 'Identifier' in self.msg[1] + assert not self.data_context + self.data_context = self.context_callback(self.msg[1] + ['Identifier']) + self.data_context.data_sink.initialize(int(self.msg[1] + ['DataLength']), + self.data_context. + file_name) + return True + + assert line == 'End' or line == 'EndMessage' + msg = self.msg + self.msg = None + assert not self.data_context or self.data_context.writable() == 0 + self.msg_callback(msg) + + return False + + def handle_data(self, data): + """ INTERNAL: Handle trailing data following an FCP message. """ + #print "RECVD: ", len(data), "bytes of data." + assert self.data_context + self.data_context.data_sink.write_bytes(data) + if self.data_context.writable() == 0: + assert self.msg + msg = self.msg + self.msg = None + self.data_context = None + self.msg_callback(msg) + + def parse_bytes(self, bytes): + """ This method drives an FCP Message parser and eventually causes + calls into msg_callback(). + """ + #print "FCPParser.parse_bytes -- called" + if self.data_context and self.data_context.writable(): + # Expecting raw data. + assert not self.prev_chunk + data = bytes[:self.data_context.writable()] + self.handle_data(data) # MUST handle msg notification! + bytes = bytes[len(data):] + if bytes: + # Hmmm... recursion depth + self.parse_bytes(bytes) + else: + # Expecting a \n terminated line. + bytes = self.prev_chunk + bytes + self.prev_chunk = "" + last_eol = -1 + pos = bytes.find('\n') + while pos != -1: + if last_eol <= 0: + last_eol = 0 + + line = bytes[last_eol:pos].strip() + last_eol = pos + if self.handle_line(line): + # Reading trailing data + # Hmmm... 
recursion depth + self.parse_bytes(bytes[last_eol + 1:]) + return + pos = bytes.find('\n', last_eol + 1) + + assert not self.data_context or not self.data_context.writable() + self.prev_chunk = bytes[last_eol + 1:] + diff --git a/infocalypse/graph.py b/infocalypse/graph.py new file mode 100644 --- /dev/null +++ b/infocalypse/graph.py @@ -0,0 +1,821 @@ +""" Infocalypse Freenet hg repo update graph. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# REDFLAG: Document how dealing with missing indices works... +# REDFLAG: Document how pruning works. +# REDFLAG: Remove unused crap from this file +# REDFLAG: push MAX_PATH_LEN into graph class -> max_cannonical_len + +import copy +import mercurial +import os +import random + +from binascii import hexlify +from mercurial import commands + +# Index for an empty repo. +FIRST_INDEX = -1 +NULL_REV = '0000000000000000000000000000000000000000' +PENDING_INSERT = 'pending' +PENDING_INSERT1 = 'pending1' +MAX_PATH_LEN = 4 + +INSERT_NORMAL = 1 # Don't transform inserted data. +INSERT_PADDED = 2 # Add one trailing byte. +INSERT_SALTED_METADATA = 3 # Salt Freenet splitfile metadata. + +# The size of Freenet data blocks. +FREENET_BLOCK_LEN = 32 * 1024 + +# Hmmm... arbitrary. +MAX_REDUNDANT_LENGTH = 128 * 1024 + +# HACK: Approximate multi-level splitfile boundry +MAX_METADATA_HACK_LEN = 7 * 1024 * 1024 + +############################################################ +# Mercurial helper functions. +def hex_version(repo, version = 'tip', offset=0): + """ Returns the 40 digit hex changeset id for an changeset in repo. """ + #print "hex_version -- ", version + ctx = repo.changectx(version) + assert not ctx is None + if offset != 0: + ctx = repo.changectx(ctx.rev() + offset) + assert not ctx is None + return hexlify(ctx.node()) + +def has_version(repo, version): + """ Returns True if repo already contains the changeset version, + False otherwise. """ + try: + # Is there a faster way? + repo.changectx(version) + except mercurial.repo.RepoError: + return False + return True + +def pull_bundle(repo, ui_, bundle_file): + """ Pull an hg bundle file. + + bundle_file must be an absolute path. + """ + + # REDFLAG: djk20090319, is this still an issue? + # IMPORTANT: + # You must be in the repository root directory in order to pull + # from the bundle. This is not obvious from the Hg doc. 
+ # + # See: http://marc.info/?l=mercurial&m=118362491521186&w=2 + # + # MUST use --cwd + # MUST use an absolute path for the bundle field + prev_cwd = os.getcwd() + os.chdir(repo.root) + try: + commands.pull(ui_, repo, bundle_file, rev=[], + force=None, update=None) + finally: + os.chdir(prev_cwd) + +############################################################ + +def cmp_age_weight(path_a, path_b): + """ Comparison function used to sort paths in ascending order + of 'canonicalness'. """ + # Only works for equivalent paths! + assert path_a[0][0] == path_b[0][0] + assert path_b[-1][1] == path_b[-1][1] + + # NO! path step tuples contain a third entry which keeps this + # from working. + # if path_a == path_b: + # return 0 + + index = 0 + while index < len(path_a) and index < len(path_b): + if path_a[index][1] == path_b[index][1]: + if path_a[index][2] == path_b[index][2]: + index += 1 + continue + # If the edges are the same age prefer the one + # the the lower (i.e. older) CHK ordinal. + return path_b[index][2] - path_a[index][2] + return path_a[index][1] - path_b[index][1] + + #print "CMP == ", path_a, path_b + return 0 + +def block_cost(length): + """ Return the number of Freenet blocks required to store + data of length, length. """ + blocks = length/FREENET_BLOCK_LEN + if (length % FREENET_BLOCK_LEN) != 0: + blocks += 1 + return blocks + +############################################################ +# Doesn't dump FIRST_INDEX entry. +def graph_to_string(graph): + """ Returns a human readable representation of the graph. """ + lines = [] + # Indices + indices = graph.index_table.keys() + indices.sort() + for index in indices: + if index == FIRST_INDEX: + continue + + entry = graph.index_table[index] + #print entry + lines.append("I:%i:%s:%s" % (index, entry[0], entry[1])) + + # Edges + index_pairs = graph.edge_table.keys() + for index_pair in index_pairs: + edge_info = graph.edge_table[index_pair] + as_str = ':'.join(edge_info[1:]) + if as_str != '': + as_str = ':' + as_str + lines.append("E:%i:%i:%i%s" % (index_pair[0], index_pair[1], + edge_info[0], + as_str)) + + return '\n'.join(lines) + '\n' + +def parse_graph(text): + """ Returns a graph parsed from text. + text must be in the format used by graph_to_string(). + Lines starting with '#' are ignored. + """ + + graph = UpdateGraph() + lines = text.split('\n') + for line in lines: + fields = line.split(':') + if fields[0] == 'I': + if len(fields) != 4: + raise ValueError("Exception parsing index values.") + index = int(fields[1]) + if index in graph.index_table: + print "OVERWRITING INDEX: " , index + if len(tuple(fields[2:])) != 2: + raise ValueError("Error parsing index value: %i" % index) + graph.index_table[index] = tuple(fields[2:]) + elif fields[0] == 'E': + #print fields + if len(fields) < 5: + raise ValueError("Exception parsing edge values.") + index_pair = (int(fields[1]), int(fields[2])) + length = int(fields[3]) + chk_list = [] + for chk in fields[4:]: + chk_list.append(chk) + graph.edge_table[index_pair] = tuple([length, ] + chk_list) + #else: + # print "SKIPPED LINE:" + # print line + indices = graph.index_table.keys() + if len(indices) == 0: + raise ValueError("No indices?") + indices.sort() + graph.latest_index = indices[-1] + + graph.rep_invariant() + + return graph + +############################################################ + +class UpdateGraphException(Exception): + """ Base class for UpdateGraph exceptions. 
""" + def __init__(self, msg): + Exception.__init__(self, msg) + +class UpToDate(UpdateGraphException): + """ Exception thrown to indicate that an update failed because + the graph already contains the specified local changes. """ + def __init__(self, msg): + UpdateGraphException.__init__(self, msg) + +class UpdateGraph: + """ A digraph representing an Infocalypse Freenet + hg repository. """ + + def __init__(self): + # Vertices in the update digraph. + # index_ordinal -> (start_rev, end_rev) + self.index_table = {FIRST_INDEX:(NULL_REV, NULL_REV)} + + # These are edges in the update digraph. + # There can be multiple redundant edges. + # + # This is what is actually stored in Freenet. + # Edges contain changesets for the indices from + # start_index + 1 to end_index, but not for start_index. + # (start_index, end_index) -> (length, chk@, chk@, ...) + self.edge_table = {} + + # Bound path search length. + self.max_search_path = 10 + + self.latest_index = -1 + + def clone(self): + """ Return a deep copy of the graph. """ + return copy.deepcopy(self) # REDFLAG: correct + + # Contains the end_index changesets but not the start index changesets. + def add_edge(self, index_pair, length_chk_pair): + """ Add a new edge to the graph. """ + assert len(index_pair) == 2 + assert len(length_chk_pair) == 2 + assert index_pair[0] <= index_pair[1] + + edge_info = self.edge_table.get(index_pair) + if edge_info is None: + edge_info = tuple(length_chk_pair) + else: + if length_chk_pair[0] != edge_info[0]: + raise ValueError("Redundant edge doesn't have same length.") + edge_info = list(edge_info) + edge_info.append(length_chk_pair[1]) + edge_info = tuple(edge_info) + + self.edge_table[index_pair] = edge_info + return (index_pair[0], index_pair[1], + len(self.edge_table[index_pair]) - 2) + + def subgraph(self, containing_paths): + """ Return a subgraph which contains the vertices and + edges in containing_paths. """ + self.rep_invariant() + graph = UpdateGraph() + max_index = -1 + + for path in containing_paths: + for step in path: + pair = step[:2] + # REDFLAG: copies ALL redundant paths + graph.edge_table[pair] = self.edge_table[pair][:] + for index in pair: + if index not in graph.index_table: + graph.index_table[index] = self.index_table[index][:] + max_index = max(max_index, index) + + graph.latest_index = max_index + graph.rep_invariant() + return graph + + ############################################################ + # Helper functions used when inserting / requesting + # the edge CHKs. + ############################################################ + def has_chk(self, edge_triple): + """ Return True if the graph has a CHK for the edge, + false otherwise. """ + chk = self.edge_table.get(edge_triple[:2])[1:][edge_triple[2]] + return chk.startswith('CHK@') # Hmmm... False for pending??? + + def get_chk(self, edge_triple): + """ Return the CHK for an edge. """ + return self.edge_table[edge_triple[:2]][1:][edge_triple[2]] + + def get_length(self, edge_triple): + """ Return the length of the hg bundle file for an edge. """ + return self.edge_table.get(edge_triple[:2])[0] + + def is_redundant(self, edge_triple): + """ Return True if there is more than one CHK for the + edge, False otherwise. """ + return len(self.edge_table[edge_triple[:2]]) > 2 + + # REDFLAG: fix signature to take an edge triplet? + # Hmmm... too much paranoia. just assert? + def set_chk(self, index_pair, ordinal, length, chk): + """ Set the CHK for an edge. 
""" + edge_info = self.edge_table.get(index_pair) + if edge_info is None: + raise UpdateGraphException("No such edge: %s" % str(index_pair)) + edge_list = list(edge_info) + if len(edge_list) < ordinal + 2: + raise UpdateGraphException("No such chk ordinal: [%s]:%i" + % (str(index_pair), ordinal)) + if edge_list[0] != length: + raise UpdateGraphException("Length mismatch: [%s]:%i" + % (str(index_pair), ordinal)) + if not edge_list[ordinal + 1].startswith(PENDING_INSERT): + print "set_chk -- replacing a non pending chk (%i, %i, %i)?" % \ + (index_pair[0], index_pair[1], ordinal) + edge_list[ordinal + 1] = chk + self.edge_table[index_pair] = tuple(edge_list) + + def insert_type(self, edge_triple): + """ Return the kind of insert required to insert the CHK + for the edge. + + INSERT_NORMAL -> No modification to the bundle file. + INSERT_PADDED -> Add one trailing pad byte. + INSERT_SALTED_METADATA -> Copy and salt the Freenet + split file metadata for the normal insert. """ + edge_info = self.edge_table[edge_triple[:2]] + #print "insert_type -- ", edge_triple, entry + if edge_info[edge_triple[2] + 1] == PENDING_INSERT: + return INSERT_NORMAL + if edge_info[edge_triple[2] + 1] != PENDING_INSERT1: + raise ValueError("CHK already set?") + if edge_info[0] <= FREENET_BLOCK_LEN: + return INSERT_PADDED + return INSERT_SALTED_METADATA + + def insert_length(self, step): + """ Returns the actual length of the data inserted into + Freenet for the edge. """ + length = self.edge_table.get(step[:2])[0] + if step[2] == 0: + # No hacks on primary insert. + return length + if length < FREENET_BLOCK_LEN: + # Made redundant path by padding. + return length + 1 + + # Salted the metadata. Data length unaffected. + return length + + ############################################################ + + # REDFLAG: really no need for ui? if so, remove arg + # Index and edges to insert + # Returns index triples with new edges that need to be inserted. + def update(self, repo, dummy, version, cache): + """ Update the graph to include versions up to version + in repo. + + This may add multiple edges for redundancy. + + Returns the new edges. + + The client code is responsible for setting their CHKs!""" + + if self.latest_index > FIRST_INDEX: + if (repo.changectx(version).rev() <= + repo.changectx(self.index_table[self.latest_index][1]).rev()): + raise UpToDate("Version: %s is already in the repo." % + hex_version(repo, version)[:12]) + + new_edges = [] + + # Add changes to graph. + prev_changes = self.index_table[self.latest_index] + parent_rev = prev_changes[1] + # REDFLAG: Think. What are the implicit assumptions here? + first_rev = hex_version(repo, prev_changes[1], 1) + latest_rev = hex_version(repo, version) + + index = self._add_changes(parent_rev, first_rev, latest_rev) + #print "ADDED INDEX: ", index + #print self.index_table + # Insert index w/ rollup if possible. 
+ first_bundle = cache.make_redundant_bundle(self, index) + + new_edges.append(self.add_edge(first_bundle[2], + (first_bundle[0], PENDING_INSERT))) + #print "ADDED EDGE: ", new_edges[-1] + + canonical_path = self.canonical_path(index, MAX_PATH_LEN + 1) + assert len(canonical_path) <= MAX_PATH_LEN + 1 + + bundle = None + if len(canonical_path) > MAX_PATH_LEN: + print "CANNONICAL LEN: ", len(canonical_path) + short_cut = self._compress_canonical_path(index, MAX_PATH_LEN + 1) + bundle = cache.make_bundle(self, short_cut) + new_edges.append(self.add_edge(bundle[2], + (bundle[0], PENDING_INSERT))) + canonical_path = self.canonical_path(index, MAX_PATH_LEN + 1) + assert len(canonical_path) <= MAX_PATH_LEN + + if bundle == None: + if (first_bundle[0] <= FREENET_BLOCK_LEN and + first_bundle[2][0] < index - 1): + # This gives us redundancy at the cost of one 32K block. + bundle = cache.make_bundle(self, + (first_bundle[2][0] + 1, + index)) + new_edges.append(self.add_edge(bundle[2], + (bundle[0], + PENDING_INSERT))) + elif first_bundle[0] <= MAX_METADATA_HACK_LEN: + # Request insert of a redundant copy of exactly the same + # bundle. + bundle = first_bundle[:] + new_edges.append(self.add_edge(bundle[2], (bundle[0], + PENDING_INSERT1))) + else: + print "update -- Bundle too big to add redundant CHK: %i" \ + % first_bundle[0] + + new_edges = new_edges + self._add_canonical_path_redundancy() + + return new_edges + + def get_top_key_edges(self): + """ Returns the ordered list of edges that should be + included in the top key. """ + self.rep_invariant() + + edges = [] + + #print "LATEST_INDEX: ", self.latest_index + + paths = self.enumerate_update_paths(self.latest_index, + self.latest_index, 1) + #print paths + + paths.sort(self._cmp_block_cost) + #dump_paths(self, paths, "Paths sorted by block cost") + + if len(paths) > 0: + # Path with the most changes in the least blocks. + edges.append(paths[0][0]) + del paths[0] + + if len(paths) > 0: + # REDFLAG: == 32k case for padding crosses block boundry... + if (block_cost(self.path_cost([edges[0], ])) == + block_cost(self.path_cost(paths[0]))): + # One more at the same cost if there is one. + edges.append(paths[0][0]) + del paths[0] + + # The canonical path + path = list(self.canonical_path(self.latest_index, MAX_PATH_LEN)) + + path.reverse() # most recent first. + for step in path: + if not step in edges: + edges.append(step) + + # 2 possibly redundant immediate update keys, and MAX_PATH_LEN + # canonical path keys. Actually one of the canonical keys + # should already be in the immediate updates. + assert len(edges) < 4 + MAX_PATH_LEN + return edges + + def enumerate_update_paths(self, containing_start, to_end, max_len, + partial_path=()): + + """ INTERNAL: Returns a list of paths from the start index to the end + index. """ + + if max_len <= 0: + return [] + ret = [] + + candidates = self.contain(containing_start) + #print "CANDIDATES: ", candidates + for candidate in candidates: + if candidate[1] >= to_end: + ret.append(partial_path + (candidate,)) + else: + ret += self.enumerate_update_paths(candidate[1] + 1, to_end, + max_len - 1, + partial_path + + (candidate,)) + return ret + + # REQUIRES: Using the same index mappings! + def copy_path(self, from_graph, path): + """ Copy a path from one graph to another. 
""" + copied = False + for step in path: + pair = step[:2] + if not pair in self.edge_table: + copied = True + self.edge_table[pair] = ( + from_graph.edge_table[pair][:]) # Deep copy + for index in pair: + if index not in self.index_table: + self.index_table[index] = ( + from_graph.index_table[index][:]) # Deep copy + return copied + + def canonical_path(self, to_index, max_search_len): + """ Returns shortest preferred path from no updates + to latest_index. + + This is what you would use to bootstrap from hg rev -1. """ + + return self.canonical_paths(to_index, max_search_len)[-1] + + def canonical_paths(self, to_index, max_search_len): + """ Returns a list of paths from no updates to to_index in + ascending order of 'canonicalness'. i.e. so you + can pop() the candidates off the list. """ + + paths = self.enumerate_update_paths(0, to_index, max_search_len) + if len(paths) == 0: + raise UpdateGraphException("No such path: %s" + % str((0, to_index))) + + paths.sort(cmp_age_weight) + return paths + + def path_cost(self, path): + """ The sum of the lengths of the hg bundles required to update + using the path. """ + + value = 0 + for step in path: + value += self.edge_table[step[:2]][0] + return value + + # Returns ((start_index, end_index, chk_list_ordinal), ...) + def contain(self, contains_index): + """ Returns a list of edge triples which contain contains_index. """ + ret = [] + for pair in self.edge_table: + if pair[0] >= contains_index: + continue + if pair[1] < contains_index: + continue + for index in range(0, len(self.edge_table[pair]) - 1): + ret.append(pair + (index,)) + return ret + + def cmp_recency(self, path_a, path_b): + """ INTERNAL: A comparison function for sorting single edge paths + by recency. """ + # Only for steps + assert len(path_a) == 1 + assert len(path_b) == 1 + + # Only steps in the paths. + step_a = path_a[0] + step_b = path_b[0] + + if step_a[1] == step_b[1]: + if step_a[2] == step_b[2]: + # Ascending Length. TRICKY: because of padding hacks. + return (self.insert_length(step_a) + - self.insert_length(step_b)) + # Ascending redundancy. i.e. "most canonical" first + return step_a[2] - step_b[2] + + # descending initial update. i.e. Most recent first. + return step_b[1] - step_a[1] + + # REDFLAG: add_index instead ??? + # REDFLAG: rethink parent_rev + def _add_changes(self, parent_rev, first_rev, last_rev): + """ Add changes to the graph. """ + assert parent_rev == self.index_table[self.latest_index][1] + self.latest_index += 1 + self.index_table[self.latest_index] = (first_rev, last_rev) + return self.latest_index + + def _cmp_block_cost(self, path_a, path_b): + """ INTERNAL: A comparison function for sorting single edge paths + in order of ascending order of block count. """ + assert len(path_a) == 1 + assert len(path_b) == 1 + + cost_a = self.insert_length(path_a[0]) + cost_b = self.insert_length(path_b[0]) + + # Actually block cost - 1, but that's ok. + block_cost_a = cost_a / FREENET_BLOCK_LEN + block_cost_b = cost_b / FREENET_BLOCK_LEN + + if block_cost_a == block_cost_b: + mod_a = cost_a % FREENET_BLOCK_LEN + mod_b = cost_b % FREENET_BLOCK_LEN + if mod_a == mod_b: + # Ascending order of redundancy ordinal. + return int(path_a[0][2] - path_b[0][2]) + + # Descending order of length (for same block size) + return int(mod_b - mod_a) + + # Ascending order of length in blocks + return int(block_cost_a - block_cost_b) + + # REDFLAG: Can the edge already exists? + # Only makes sense for latest index. get rid of latest_index argument? 
+ + # enforce constraint that sum of costs head must be less + # than the cost for the prev step. power law??? + + # REQURIES: len(canonical_path(latest_index)) > 1 + def _compress_canonical_path(self, to_index, max_search_len=10): + """ Return an index tuple for a new shortcut path that would + reduces the canonical path length by at least one, favoring + accumulation of hg bundle size at the start of the path. """ + + + shortest_known = self.canonical_path(to_index, max_search_len) + #print "SHORTEST_KNOWN: ", shortest_known + assert len(shortest_known) > 1 + + if len(shortest_known) == 2: + # We only have one move. + return (shortest_known[0][0], shortest_known[-1][1]) + + # REDFLAG: Shouldn't this be using block cost? + for index in range(1, len(shortest_known)): + prev_cost = self.path_cost((shortest_known[index - 1],)) + if self.path_cost(shortest_known[index:]) > prev_cost: + return (shortest_known[index - 1][0], shortest_known[-1][1]) + return (shortest_known[-2][0], shortest_known[-1][1]) + + def _add_canonical_path_redundancy(self): + """ Adds redundant edges for steps on the canonical path. + + Returns the new edges. + """ + ret = [] + path = self.canonical_path(self.latest_index, MAX_PATH_LEN) + for index, step in enumerate(path): + if index == MAX_PATH_LEN - 1: + # Don't try to add redundancy to the last (latest) step + break + entries = self.edge_table[step[:2]] + if len(entries) > 2: + # Already redundant + continue + assert step[2] == 0 + assert entries[1] != PENDING_INSERT1 + if entries[0] <= FREENET_BLOCK_LEN: + #print "_add_canonical_path_redundancy -- too small: ", \ + # str(step) + continue + if entries[0] > MAX_METADATA_HACK_LEN: + #print "_add_canonical_path_redundancy -- too big: ", str(step) + continue + edge = self.add_edge(step[:2], (entries[0][0], PENDING_INSERT1)) + #print "_add_canonical_path_redundancy -- added edge: ", str(edge) + ret.append(edge) + return ret + + def rep_invariant(self): + """ Debugging function to check invariants. """ + max_index = -1 + for index in self.index_table.keys(): + max_index = max(index, max_index) + + assert self.latest_index == max_index + + for edge in self.edge_table.keys(): + assert edge[0] in self.index_table + assert edge[1] in self.index_table + +# REDFLAG: O(n), has_index(). +def latest_index(graph, repo): + """ Returns the index of the latest hg version in the graph + that exists in repo. """ + graph.rep_invariant() + for index in range(graph.latest_index, FIRST_INDEX - 1, -1): + if not index in graph.index_table: + continue + if has_version(repo, graph.index_table[index][1]): + return index + return FIRST_INDEX + +# REDFLAG: fix this so that it always includes pending edges. +def minimal_update_graph(graph, max_size=32*1024, + formatter_func=graph_to_string): + """ Returns a subgraph that can be formatted to <= max_size + bytes with formatter_func. """ + + index = graph.latest_index + assert index > FIRST_INDEX + + # All the edges that would be included in the top key. + # This includes the canonical bootstrap path and the + # two cheapest updates from the previous index. + paths = [[edge, ] for edge in graph.get_top_key_edges()] + + minimal = graph.subgraph(paths) + if len(formatter_func(minimal)) > max_size: + raise UpdateGraphException("Too big with only required paths.") + + # REDFLAG: read up on clone() + prev_minimal = minimal.clone() + + # Then add all other full bootstrap paths. 
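+    # Descriptive note: grow the subgraph one canonical path at a time,
+    # re-measuring the serialized size whenever a path actually adds edges,
+    # and keep a clone of the last version that still fit so there is
+    # something to fall back to as soon as max_size is exceeded.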
+ canonical_paths = graph.canonical_paths(index, MAX_PATH_LEN) + + while len(canonical_paths): + if minimal.copy_path(graph, canonical_paths.pop()): + size = len(formatter_func(minimal)) + #print "minimal_update_graph -- size: %i " % size + if size > max_size: + return prev_minimal + else: + prev_minimal = minimal.clone() + + if index == 0: + return prev_minimal + + # Favors older edges + # Then add bootstrap paths back to previous indices + for upper_index in range(index - 1, FIRST_INDEX, - 1): + canonical_paths = graph.canonical_paths(upper_index, MAX_PATH_LEN) + while len(canonical_paths): + if minimal.copy_path(graph, canonical_paths.pop()): + size = len(formatter_func(minimal)) + #print "minimal_update_graph -- size(1): %i" % size + if size > max_size: + return prev_minimal + else: + prev_minimal = minimal.clone() + + return prev_minimal + + +def chk_to_edge_triple_map(graph): + """ Returns a CHK -> edge triple map. """ + ret = {} + for edge in graph.edge_table: + #print "EDGE: ", edge + chks = graph.edge_table[edge][1:] + #print "ENTRIES: ", entries + for index, chk in enumerate(chks): + assert ret.get(chk) is None + ret[chk] = (edge[0], edge[1], index) + return ret + +def break_edges(graph, kill_probability, skip_chks): + """ Testing function breaks edges by replacing the CHKs with a known + bad one. """ + bad_chk = ('CHK@badroutingkeyB55JblbGup0yNSpoDJgVPnL8E5WXoc,' + +'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8') + for edge in graph.edge_table: + edge_info = graph.edge_table[edge[:2]] + length = edge_info[0] + chks = edge_info[1:] + for index in range(0, len(chks)): + if graph.get_chk((edge[0], edge[1], index)) in skip_chks: + # Hack to skip pending requests. + print "break_edges -- skipped: ", (edge[0], edge[1], index) + continue + if random.random() < kill_probability: + graph.set_chk(edge, index, length, bad_chk) + +def pretty_index(index): + """ Format an index value for output. """ + if index == FIRST_INDEX: + return "." + else: + return str(index) + +def dump_path(graph, path): + """ Debugging function to print a path. """ + if len(path) == 0: + print "EMPTY PATH!" + return + + print "(%s)-->[%s] cost=%0.2f" % (pretty_index(path[0][0]), + pretty_index(path[-1][1]), + graph.path_cost(path)) + for step in path: + cost = graph.get_length(step) + print " (%s) -- (%0.2f, %i) --> [%s]" % (pretty_index(step[0]), + cost, + step[2], + pretty_index(step[1])) +def dump_paths(graph, paths, msg): + """ Debugging function to dump a list of paths. """ + print "--- %s ---" % msg + for path in paths: + dump_path(graph, path) + print "---" + +def print_list(msg, values): + """ INTERNAL: Helper function. """ + if msg: + print msg + for value in values: + print " ", value + if len(values) == 0: + print + diff --git a/infocalypse/infcmds.py b/infocalypse/infcmds.py new file mode 100644 --- /dev/null +++ b/infocalypse/infcmds.py @@ -0,0 +1,631 @@ +""" Implementation of commands for Infocalypse mercurial extension. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. 
+ + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + + +# infcmds.py has actual implementation +# REDFLAG: cleanup exception handling +# by converting socket.error to IOError in fcpconnection? +# REDFLAG: returning vs aborting. set system exit code. +import os +import socket +import time + +from mercurial import util + +from fcpclient import parse_progress, is_usk, is_ssk, get_version, \ + get_usk_for_usk_version, FCPClient, is_usk_file, is_negative_usk +from fcpconnection import FCPConnection, PolledSocket, CONNECTION_STATES, \ + get_code, FCPError +from requestqueue import RequestRunner + +from graph import UpdateGraph, hex_version +from bundlecache import BundleCache, is_writable +from updatesm import UpdateStateMachine, QUIESCENT, FINISHING, REQUESTING_URI, \ + REQUESTING_GRAPH, REQUESTING_BUNDLES, INVERTING_URI, \ + REQUESTING_URI_4_INSERT, INSERTING_BUNDLES, INSERTING_GRAPH, \ + INSERTING_URI, FAILING + +from config import Config, DEFAULT_CFG_PATH + +DEFAULT_PARAMS = { + # FCP params + 'MaxRetries':1, + 'PriorityClass':1, + 'DontCompress':True, # hg bundles are already compressed. + 'Verbosity':1023, # MUST set this to get progress messages. + + # Non-FCP stuff + 'N_CONCURRENT':4, # Maximum number of concurrent FCP requests. + 'CANCEL_TIME_SECS': 15 * 60, # Bound request time. + 'POLL_SECS':0.25, # Time to sleep in the polling loop. + } + + +MSG_TABLE = {(QUIESCENT, REQUESTING_URI_4_INSERT) + :"Requesting previous URI...", + (REQUESTING_URI_4_INSERT,REQUESTING_GRAPH) + :"Requesting previous graph...", + (INSERTING_BUNDLES,INSERTING_GRAPH) + :"Inserting updated graph...", + (INSERTING_GRAPH, INSERTING_URI) + :"Inserting URI...", + (QUIESCENT, REQUESTING_URI) + :"Fetching URI...", + (REQUESTING_URI, REQUESTING_BUNDLES) + :"Fetching bundles...", + } + +class UICallbacks: + """ Display callback output with a ui instance. """ + def __init__(self, ui_): + self.ui_ = ui_ + self.verbosity = 0 + + def connection_state(self, dummy, state): + """ FCPConnection.state_callback function which writes to a ui. """ + + if self.verbosity < 2: + return + + value = CONNECTION_STATES.get(state) + if not value: + value = "UNKNOWN" + + self.ui_.status("FCP connection [%s]\n" % value) + + def transition_callback(self, from_state, to_state): + """ StateMachine transition callback that writes to a ui.""" + if self.verbosity < 1: + return + if self.verbosity > 2: + self.ui_.status("[%s]->[%s]\n" % (from_state.name, to_state.name)) + return + if to_state.name == FAILING: + self.ui_.status("Cleaning up after failure...\n") + return + if to_state.name == FINISHING: + self.ui_.status("Cleaning up...\n") + return + msg = MSG_TABLE.get((from_state.name, to_state.name)) + if not msg is None: + self.ui_.status("%s\n" % msg) + + def monitor_callback(self, dummy, client, msg): + """ FCP message status callback which writes to a ui. 
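+
+        msg is an FCP message sequence: msg[0] is the message name
+        (e.g. 'SimpleProgress', 'ProtocolError', 'AllData') and msg[1]
+        is a dict of its fields.  Only a one line summary is written.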
""" + if self.verbosity < 2: + return + + #prefix = update_sm.current_state.name + prefix = '' + if self.verbosity > 2: + prefix = client.request_id()[:10] + ':' + #if hasattr(update_sm.current_state, 'pending'): + # prefix = str(len(update_sm.current_state.pending)) + ":" + prefix + #else: + # prefix = "?:" + prefix + + if msg[0] == 'SimpleProgress': + text = str(parse_progress(msg)) + elif msg[0] == 'URIGenerated': + return # shows up twice + #elif msg[0] == 'PutSuccessful': + # text = 'PutSuccessful:' + msg[1]['URI'] + elif msg[0] == 'ProtocolError': + text = 'ProtocolError:' + str(msg) + elif msg[0] == 'AllData': + # Don't try to print raw data. + text = 'AllData: length=%s' % msg[1].get('DataLength', '???') + elif msg[0].find('Failed') != -1: + code = get_code(msg) or -1 + redirect = '' + if (code == 27 and 'RedirectURI' in msg[1] + and is_usk(msg[1]['RedirectURI'])): + redirect = ", redirected to version: %i" % \ + get_version(msg[1]['RedirectURI']) + + text = "%s: code=%i%s" % (msg[0], code, redirect) + else: + text = msg[0] + + self.ui_.status("%s%s:%s\n" % (prefix, str(client.tag), text)) + # REDFLAG: re-add full dumping of FCP errors at debug level? + #if msg[0].find('Failed') != -1 or msg[0].find('Error') != -1: + # print client.in_params.pretty() + # print msg + # print "FINISHED:" , client.is_finished(), + #bool(client.is_finished()) + + +# Paranoia? Just stat? I'm afraid of problems/differences w/ Windoze. +# Hmmmm... SUSPECT. Abuse of mercurial ui design intent. +# ISSUE: I don't just want to suppress/include output. +# I use this value to keep from running code which isn't +# required. +def get_verbosity(ui_): + """ INTERNAL: Get the verbosity levl from the state of a ui. """ + if ui_.debugflag: + return 5 # Graph, candidates, canonical paths + elif ui_.verbose: + return 2 # FCP message status + elif ui_.quiet: + # Hmmm... still not 0 output + return 0 + else: + return 1 # No FCP message status + +def get_config_info(ui_, opts): + """ INTERNAL: Read configuration info out of the config file and + or command line options. """ + + cfg = Config.from_ui(ui_) + + if opts.get('fcphost') != '': + cfg.defaults['HOST'] = opts['fcphost'] + if opts.get('fcpport') != 0: + cfg.defaults['PORT'] = opts['fcpport'] + + params = DEFAULT_PARAMS.copy() + params['FCP_HOST'] = cfg.defaults['HOST'] + params['FCP_PORT'] = cfg.defaults['PORT'] + params['TMP_DIR'] = cfg.defaults['TMP_DIR'] + params['VERBOSITY'] = get_verbosity(ui_) + params['AGGRESSIVE_SEARCH'] = bool(opts.get('aggressive')) + return (params, cfg) + +# Hmmmm USK@/style_keys/0 +def check_uri(ui_, uri): + """ INTERNAL: Abort if uri is not supported. """ + if uri is None: + return + + if is_usk(uri): + if not is_usk_file(uri): + ui_.status("Only file USKs are allowed." + + "\nMake sure the URI ends with '/<number>' " + + "with no trailing '/'.\n") + raise util.Abort("Non-file USK %s\n" % uri) + # Just fix it instead of doing B&H? + if is_negative_usk(uri): + ui_.status("Negative USK index values are not allowed." + + "\nUse --aggressive instead. \n") + raise util.Abort("Negative USK %s\n" % uri) + +# REDFLAG: remove store_cfg +def setup(ui_, repo, params, stored_cfg): + """ INTERNAL: Setup to run an Infocalypse extension command. """ + # REDFLAG: choose another name. Confusion w/ fcp param + # REDFLAG: add an hg param and get rid of this line. 
+ #params['VERBOSITY'] = 1 + + check_uri(ui_, params.get('INSERT_URI')) + check_uri(ui_, params.get('REQUEST_URI')) + + if not is_writable(os.path.expanduser(stored_cfg.defaults['TMP_DIR'])): + raise util.Abort("Can't write to temp dir: %s\n" + % stored_cfg.defaults['TMP_DIR']) + + verbosity = params.get('VERBOSITY', 1) + if verbosity > 2 and params.get('DUMP_GRAPH', None) is None: + params['DUMP_GRAPH'] = True + if verbosity > 3 and params.get('DUMP_UPDATE_EDGES', None) is None: + params['DUMP_UPDATE_EDGES'] = True + if verbosity > 4 and params.get('DUMP_CANONICAL_PATHS', None) is None: + params['DUMP_CANONICAL_PATHS'] = True + params['DUMP_URIS'] = True + callbacks = UICallbacks(ui_) + callbacks.verbosity = verbosity + + cache = BundleCache(repo, ui_, params['TMP_DIR']) + + try: + async_socket = PolledSocket(params['FCP_HOST'], params['FCP_PORT']) + connection = FCPConnection(async_socket, True, + callbacks.connection_state) + except socket.error, err: # Not an IOError until 2.6. + ui_.warn("Connection to FCP server [%s:%i] failed.\n" + % (params['FCP_HOST'], params['FCP_PORT'])) + raise err + except IOError, err: + ui_.warn("Connection to FCP server [%s:%i] failed.\n" + % (params['FCP_HOST'], params['FCP_PORT'])) + raise err + + runner = RequestRunner(connection, params['N_CONCURRENT']) + + update_sm = UpdateStateMachine(runner, repo, ui_, cache) + update_sm.params = params.copy() + update_sm.transition_callback = callbacks.transition_callback + update_sm.monitor_callback = callbacks.monitor_callback + + return update_sm + +def run_until_quiescent(update_sm, poll_secs, close_socket=True): + """ Run the state machine until it reaches the QUIESCENT state. """ + runner = update_sm.runner + assert not runner is None + connection = runner.connection + assert not connection is None + raised = True + try: + while update_sm.current_state.name != QUIESCENT: + # Poll the FCP Connection. + try: + connection.socket.poll() + # Indirectly nudge the state machine. + update_sm.runner.kick() + except socket.error: # Not an IOError until 2.6. + update_sm.ui_.warn("Exiting because of an error on " + + "the FCP socket.\n") + raise + except IOError: + # REDLAG: better message. + update_sm.ui_.warn("Exiting because of an IO error.\n") + raise + # Rest :-) + time.sleep(poll_secs) + raised = False + finally: + if raised or close_socket: + update_sm.runner.connection.close() + +def cleanup(update_sm): + """ INTERNAL: Cleanup after running an Infocalypse command. """ + if update_sm is None: + return + + if not update_sm.runner is None: + update_sm.runner.connection.close() + + update_sm.ctx.bundle_cache.remove_files() + +# REDFLAG: better name. 0) inverts 1) updates indices from cached state. +# 2) key substitutions. 
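+# Typical call sequence for a command (sketch only; execute_create,
+# execute_push and execute_pull below are the real implementations):
+#
+#   update_sm = setup(ui_, repo, params, stored_cfg)
+#   try:
+#       update_sm.start_pulling(params['REQUEST_URI'])
+#       run_until_quiescent(update_sm, params['POLL_SECS'])
+#       if update_sm.get_state(QUIESCENT).arrived_from((FINISHING,)):
+#           pass # success
+#       handle_updating_config(repo, update_sm, params, stored_cfg, True)
+#   finally:
+#       cleanup(update_sm)
+#
+# The insert commands also call handle_key_inversion below first, to fill
+# in and invert the private key before the state machine is started.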
+def handle_key_inversion(ui_, update_sm, params, stored_cfg): + """ INTERNAL: Handle inverting/updating keys before running a command.""" + insert_uri = params.get('INSERT_URI') + if not insert_uri is None and insert_uri.startswith('USK@/'): + insert_uri = ('USK' + + stored_cfg.defaults['DEFAULT_PRIVATE_KEY'][3:] + + insert_uri[5:]) + ui_.status("Filled in the insert URI using the default private key.\n") + + if insert_uri is None or not (is_usk(insert_uri) or is_ssk(insert_uri)): + return (params.get('REQUEST_URI'), False) + + update_sm.start_inverting(insert_uri) + run_until_quiescent(update_sm, params['POLL_SECS'], False) + if update_sm.get_state(QUIESCENT).prev_state != INVERTING_URI: + raise util.Abort("Couldn't invert private key:\n%s" % insert_uri) + + inverted_uri = update_sm.get_state(INVERTING_URI).get_request_uri() + params['INVERTED_INSERT_URI'] = inverted_uri + + if is_usk(insert_uri): + version = get_version(insert_uri) + # Latest previously known version of the insert_uri's request_uri. + # Can be None. + max_index = max(stored_cfg.get_index(inverted_uri), version) + request_uri = params.get('REQUEST_URI') + if not request_uri is None and is_usk(request_uri): + max_index = max(get_version(request_uri), max_index) + # Update Request URI to the latest known version. + params['REQUEST_URI'] = get_usk_for_usk_version(request_uri, + max_index) + # Update the Insert URI to the latest known version. + params['INSERT_URI'] = get_usk_for_usk_version(insert_uri, + max_index) + + # Update the inverted Insert URI to the latest known version. + params['INVERTED_INSERT_URI'] = get_usk_for_usk_version( + inverted_uri, + max_index) + + # Skip key inversion if we already inverted the insert_uri. + request_uri = params.get('REQUEST_URI') + is_keypair = False + if (request_uri is None and + not params.get('INVERTED_INSERT_URI') is None): + request_uri = params['INVERTED_INSERT_URI'] + is_keypair = True + + return (request_uri, is_keypair) + +def handle_updating_config(repo, update_sm, params, stored_cfg, + is_pulling=False): + """ INTERNAL: Write updates into the config file IFF the previous + command succeeded. """ + if not is_pulling: + if not update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + return + + if not is_usk_file(params['INSERT_URI']): + return + + inverted_uri = params['INVERTED_INSERT_URI'] + + # Cache the request_uri - insert_uri mapping. + stored_cfg.set_insert_uri(inverted_uri, update_sm.ctx['INSERT_URI']) + + # Cache the updated index for the insert. + version = get_version(update_sm.ctx['INSERT_URI']) + stored_cfg.update_index(inverted_uri, version) + stored_cfg.update_dir(repo.root, inverted_uri) + + # Hmmm... if we wanted to be clever we could update the request + # uri too when it doesn't match the insert uri. Ok for now. + # Only for usks and only on success. + print "UPDATED STORED CONFIG(0)" + Config.to_file(stored_cfg) + + else: + # Only finishing required. same. REDFLAG: look at this again + if not update_sm.get_state(QUIESCENT).arrived_from(( + REQUESTING_BUNDLES, FINISHING)): + return + + if not is_usk(params['REQUEST_URI']): + return + + state = update_sm.get_state(REQUESTING_URI) + updated_uri = state.get_latest_uri() + version = get_version(updated_uri) + stored_cfg.update_index(updated_uri, version) + stored_cfg.update_dir(repo.root, updated_uri) + print "UPDATED STORED CONFIG(1)" + Config.to_file(stored_cfg) + + +def is_redundant(uri): + """ Return True if uri is a file USK and ends in '.R1', + False otherwise. 
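+
+    In practice the return value is the string 'Redundant ' or '' so it
+    can be spliced directly into the status messages below.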
""" + if not is_usk_file(uri): + return '' + fields = uri.split('/') + if not fields[-2].endswith('.R1'): + return '' + return 'Redundant ' + +############################################################ +# User feedback? success, failure? +def execute_create(ui_, repo, params, stored_cfg): + """ Run the create command. """ + update_sm = None + try: + + update_sm = setup(ui_, repo, params, stored_cfg) + # REDFLAG: Do better. + # This call is not necessary, but I do it to set + # 'INVERTED_INSERT_URI'. Write code to fish that + # out of INSERTING_URI instead. + handle_key_inversion(ui_, update_sm, params, stored_cfg) + + ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), + params['INSERT_URI'])) + #ui_.status("Current tip: %s\n" % hex_version(repo)[:12]) + + update_sm.start_inserting(UpdateGraph(), + params.get('TO_VERSION', 'tip'), + params['INSERT_URI']) + + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + ui_.status("Inserted to:\n%s\n" % + '\n'.join(update_sm.get_state(INSERTING_URI). + get_request_uris())) + else: + ui_.status("Create failed.\n") + + handle_updating_config(repo, update_sm, params, stored_cfg) + finally: + cleanup(update_sm) + +# REDFLAG: move into fcpclient? +#def usks_equal(usk_a, usk_b): +# assert is_usk(usk_a) and and is_usk(usk_b) +# return (get_usk_for_usk_version(usk_a, 0) == +# get_usk_for_usk_version(usk_b, 0)) + +# REDFLAG: reading from on uri and inserting to another isn't +# fully working yet +def execute_push(ui_, repo, params, stored_cfg): + """ Run the push command. """ + update_sm = None + try: + update_sm = setup(ui_, repo, params, stored_cfg) + request_uri, is_keypair = handle_key_inversion(ui_, update_sm, params, + stored_cfg) + + ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), + params['INSERT_URI'])) + #ui_.status("Current tip: %s\n" % hex_version(repo)[:12]) + + update_sm.start_pushing(params['INSERT_URI'], + params.get('TO_VERSION', 'tip'), + request_uri, # None is allowed + is_keypair) + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + ui_.status("Inserted to:\n%s\n" % + '\n'.join(update_sm.get_state(INSERTING_URI). + get_request_uris())) + else: + ui_.status("Push failed.\n") + + handle_updating_config(repo, update_sm, params, stored_cfg) + finally: + cleanup(update_sm) + +def execute_pull(ui_, repo, params, stored_cfg): + """ Run the pull command. """ + update_sm = None + try: + update_sm = setup(ui_, repo, params, stored_cfg) + ui_.status("%sRequest URI:\n%s\n" % (is_redundant(params[ + 'REQUEST_URI']), + params['REQUEST_URI'])) + #ui_.status("Current tip: %s\n" % hex_version(repo)[:12]) + + update_sm.start_pulling(params['REQUEST_URI']) + run_until_quiescent(update_sm, params['POLL_SECS']) + + if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): + ui_.status("Pulled from:\n%s\n" % + update_sm.get_state('REQUESTING_URI'). + get_latest_uri()) + ui_.status("New tip: %s\n" % hex_version(repo)[:12]) + else: + ui_.status("Pull failed.\n") + + handle_updating_config(repo, update_sm, params, stored_cfg, True) + finally: + cleanup(update_sm) + +def setup_tmp_dir(ui_, tmp): + """ INTERNAL: Setup the temp directory. """ + tmp = os.path.expanduser(tmp) + + # Create the tmp dir if nescessary. + if not os.path.exists(tmp): + try: + os.makedirs(tmp) + except os.error, err: + # Will exit below. 
+ ui_.warn(err) + return tmp + +MSG_HGRC_SET = \ +"""Read the config file name from the: + +[infocalypse] +cfg_file = <filename> + +section of your .hgrc (or mercurial.ini) file. + +cfg_file: %s + +""" + +MSG_CFG_EXISTS = \ +"""%s already exists! +Move it out of the way if you really +want to re-run setup. + +Consider before deleting it. It may contain +the *only copy* of your private key. + +""" + +def execute_setup(ui_, host, port, tmp, cfg_file = None): + """ Run the setup command. """ + def connection_failure(msg): + """ INTERNAL: Display a warning string. """ + ui_.warn(msg) + ui_.warn("It looks like your FCP host or port might be wrong.\n") + ui_.warn("Set them with --fcphost and/or --fcpport and try again.\n") + + # Fix defaults. + if host == '': + host = '127.0.0.1' + if port == 0: + port = 9481 + + if cfg_file is None: + cfg_file = os.path.expanduser(DEFAULT_CFG_PATH) + + existing_name = ui_.config('infocalypse', 'cfg_file', None) + if not existing_name is None: + existing_name = os.path.expanduser(existing_name) + ui_.status(MSG_HGRC_SET % existing_name) + cfg_file = existing_name + + if os.path.exists(cfg_file): + ui_.status(MSG_CFG_EXISTS % cfg_file) + raise util.Abort("Refusing to modify existing configuration.") + + tmp = setup_tmp_dir(ui_, tmp) + + if not is_writable(tmp): + ui_.warn("Can't write to temp dir: %s\n" % tmp) + return + + # Test FCP connection. + timeout_secs = 20 + connection = None + default_private_key = None + try: + ui_.status("Testing FCP connection [%s:%i]...\n" % (host, port)) + + connection = FCPConnection(PolledSocket(host, port)) + + started = time.time() + while (not connection.is_connected() and + time.time() - started < timeout_secs): + connection.socket.poll() + time.sleep(.25) + + if not connection.is_connected(): + connection_failure(("\nGave up after waiting %i secs for an " + + "FCP NodeHello.\n") % timeout_secs) + return + + ui_.status("Looks good.\nGenerating a default private key...\n") + + # Hmmm... this waits on a socket. Will an ioerror cause an abort? + # Lazy, but I've never seen this call fail except for IO reasons. + client = FCPClient(connection) + client.message_callback = lambda x, y:None # Disable chatty default. + default_private_key = client.generate_ssk()[1]['InsertURI'] + + except FCPError: + # Protocol error. + connection_failure("\nMaybe that's not an FCP server?\n") + return + + except socket.error: # Not an IOError until 2.6. + # Horked. + connection_failure("\nSocket level error.\n") + return + + except IOError: + # Horked. + connection_failure("\nSocket level error.\n") + return + + cfg = Config() + cfg.defaults['HOST'] = host + cfg.defaults['PORT'] = port + cfg.defaults['TMP_DIR'] = tmp + cfg.defaults['default_private_key'] = default_private_key + Config.to_file(cfg, cfg_file) + + ui_.status("""\nFinished setting configuration. +FCP host: %s +FCP port: %i +Temp dir: %s +cfg file: %s + +Default private key: +%s + +""" % (host, port, tmp, cfg_file, default_private_key)) + diff --git a/infocalypse/insertingbundles.py b/infocalypse/insertingbundles.py new file mode 100644 --- /dev/null +++ b/infocalypse/insertingbundles.py @@ -0,0 +1,198 @@ +""" A RequestQueueState which inserts hg bundles corresponding to + edges in the Infocalypse update graph into Freenet. 
+ + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +from graph import graph_to_string, UpToDate, INSERT_SALTED_METADATA, \ + FREENET_BLOCK_LEN + +from statemachine import RequestQueueState + +# REDFLAG: duplicated to get around circular deps. +INSERTING_GRAPH = 'INSERTING_GRAPH' +FAILING = 'FAILING' +CANCELING = 'CANCELING' +QUIESCENT = 'QUIESCENT' + +# Hmmmm... hard coded exit states. +class InsertingBundles(RequestQueueState): + """ A state to insert hg bundles corresponding to the edges in an + Infocalypse update graph into Freenet. """ + def __init__(self, parent, name): + RequestQueueState.__init__(self, parent, name) + + # edge -> StatefulRequest + self.pending = {} + self.new_edges = [] + self.required_edges = [] + # HACK: + # edge -> (x,y, 0) Freenet metadata bytes + self.salting_cache = {} + + def enter(self, dummy): + """ Implementation of State virtual. + + This checks the graph against the local repository and + adds edges required to update it to the TARGET_VERSION + specified in the context object. Then it starts inserting + CHKS for the new edges into Freenet, doing padding / + metadata salting as required. + + """ + #require_state(from_state, QUIESCENT) + assert not self.parent.ctx['INSERT_URI'] is None + assert not self.parent.ctx.graph is None + + graph = self.parent.ctx.graph.clone() + if self.parent.params.get('DUMP_GRAPH', False): + self.parent.ctx.ui_.status("--- Initial Graph ---\n") + self.parent.ctx.ui_.status(graph_to_string(graph) +'\n') + + # Update graph. + try: + self.new_edges = graph.update(self.parent.ctx.repo, + self.parent.ctx.ui_, + self.parent.ctx['TARGET_VERSION'], + self.parent.ctx.bundle_cache) + except UpToDate, err: + # REDFLAG: Later, add FORCE_INSERT parameter? + self.parent.ctx.ui_.warn(str(err) + '\n') # Hmmm + self.parent.transition(FAILING) # Hmmm... hard coded state name + return + + text = '' + for edge in self.new_edges: + text += "%i:%s\n" % (graph.get_length(edge), str(edge)) + if len(text) > 0: + self.parent.ctx.ui_.status('Adding new bundles:\n' + text) + + #print "--- Updated Graph ---" + #print graph_to_string(graph) + #print "--- Minimal Graph ---" + #print graph_to_string(minimal_update_graph(graph,31 * 1024, + # graph_to_string)) + #print "---" + #dump_top_key_tuple((('CHK@', 'CHK@'), + # get_top_key_updates(graph))) + + if len(self.new_edges) == 0: + raise Exception("Up to date") + + self.parent.ctx.graph = graph + + # Edge CHKs required to do metadata salting. + # Most of these probably won't exist yet. + self.required_edges = [] + for edge in self.new_edges: + assert edge[2] <= 1 + if graph.insert_type(edge) == INSERT_SALTED_METADATA: + # Have to wait for the primary insert to finish. 
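+                # The salted copy (ordinal 1) re-uses splitfile metadata
+                # fetched back from the primary (ordinal 0) insert, so that
+                # insert must finish first; see next_runnable() and
+                # request_done() below.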
+ self.required_edges.append((edge[0], edge[1], 0)) + + for edge in self.required_edges: + # Will be re-added when the required metadata arrives. + self.new_edges.remove((edge[0], edge[1], 1)) + + # REDFLAG: no longer needed? + def leave(self, dummy): + """ Implementation of State virtual. """ + # Hmmm... + for request in self.pending.values(): + self.parent.runner.cancel_request(request) + + def reset(self): + """ Implementation of State virtual. """ + self.new_edges = [] + self.required_edges = [] + self.salting_cache = {} + RequestQueueState.reset(self) + + def next_runnable(self): + """ Implementation of RequestQueueState virtual. """ + for edge in self.required_edges: + if edge in self.pending: + # Already running. + continue + if not self.parent.ctx.graph.has_chk(edge): + # Depends on an edge which hasn't been inserted yet. + continue + + assert not edge in self.pending + request = self.parent.ctx.make_splitfile_metadata_request(edge, + edge) + self.pending[edge] = request + return request + + if len(self.new_edges) == 0: + return None + + edge = self.new_edges.pop() + request = self.parent.ctx.make_edge_insert_request(edge, edge, + self.salting_cache) + self.pending[edge] = request + return request + + def request_done(self, client, msg): + """ Implementation of RequestQueueState virtual. """ + #print "TAG: ", client.tag + assert client.tag in self.pending + edge = client.tag + del self.pending[edge] + if msg[0] == 'AllData': + self.salting_cache[client.tag] = msg[2] + + # Queue insert request now that the required data is cached. + if edge in self.required_edges: + assert edge[2] == 0 + self.required_edges.remove(edge) + self.parent.ctx.ui_.status("Re-adding put request for salted " + + "metadata: %s\n" + % str((edge[0], edge[1], 1))) + self.new_edges.append((edge[0], edge[1], 1)) + elif msg[0] == 'PutSuccessful': + chk1 = msg[1]['URI'] + graph = self.parent.ctx.graph + if edge[2] == 1 and graph.insert_length(edge) > FREENET_BLOCK_LEN: + # HACK HACK HACK + # TRICKY: + # Scrape the control bytes from the full request + # to enable metadata handling. + # REDFLAG: Do better? + chk0 = graph.get_chk((edge[0], edge[1], 0)) + chk0_fields = chk0.split(',') + chk1_fields = chk1.split(',') + #print "FIELDS: ", chk0_fields, chk1_fields + # Hmmm... also no file names. + assert len(chk0_fields) == len(chk1_fields) + chk1 = ','.join(chk1_fields[:-1] + chk0_fields[-1:]) + graph.set_chk(edge[:2], edge[2], + graph.get_length(edge), + chk1) + else: + # REDFLAG: retrying? + # REDFLAG: More failure information, FAILING state? + self.parent.transition(FAILING) + return + + if (len(self.pending) == 0 and + len(self.new_edges) == 0 and + len(self.required_edges) == 0): + self.parent.transition(INSERTING_GRAPH) + diff --git a/infocalypse/repos.py b/infocalypse/repos.py new file mode 100644 --- /dev/null +++ b/infocalypse/repos.py @@ -0,0 +1,50 @@ +# REDFLAG: remove this file. 
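+# Returns a hard coded list of 40 digit hex changeset ids, reversed so
+# that the last entry listed below comes first.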
+def get_hgsvn_freenet_versions(): + ret = ['bd3a01e5371613ca6c9ed2821fe1371a0efd5ff9', + 'e78bb38e51eef45cff9dfda6a8da2fdf391ae81c', + '2be10feac043901dd8c589cbd08f71acafd3c0b6', + '2663315175761de43872661a73708f5823cc7a7b', + '05d094d05a44fb5df9d639f9aaaf1af394a813dd', + '99ee4cab38c1e8b3a199d8b9a81aee9f1f9ec4dc', + '53b5a48c24d5e3a809cb1730b8e5be1819108d40', + '11832fa81ff4753ebfae15e35f1ae3742ff0a363', + '2ff5cb79fe2f02ae48debefec82cc7a6b4aff5c1', + '458e0e493d7c3e1ee91b0f19830c48fa5f45adbd', + '5bf836b7ad576220a6abc9c6193ec61ecc55dbca', + 'a7fc31fff2e7229be9652467e26de61638894a72', + '21d5d2bfebe975a673b50e9b1d5b498b07cedf6b', + '2d3ae0e196d35649ea66ac722d7a4c1b6e7bf79e', + 'ddba7cd416bbe5c2d14d21fb4e59671b1c5c914b', + '3f35793ef58b6307c5e9648cfc066c03b7102b94', + '9ce9c854c84f4068795f1032f39d79930d3d9ae3', + '4e44dce081636a3687012062d25dd6e789117490', + 'b51b681c202dbf71de5a91aea364e3a82068d0cd', + '0ab120233104e5361c42ce55707a4ca68dee9c3a', + '85d046dc19bfba1c348d63f7b505dd7526e378a0', + '03421f6f6db235af815a680eb0e65926cfb166a4', + '790c66ca6088050e8fb99999a1f021106dc0f9ef', + '32b6241381b8cc0124eafa465abdd852baff59c6', + 'aeec38e399b05aaa65289122e0bc0729aaae479d', + '4687535f446c679b00f30bd5cc638d1d29338b2b', + '2f9cfac30468ca9c997c3b7b4b70f622d0af63a4', + '672cad78757137e05ae7a72fa242a5e5ca1e0e9e', + '639150e10e857a4c4a3ead9a5f4180b5a7f54b27', + 'd36888219ca2c6ec37d88ba735ba3a09030145e8', + 'd146bd503fdf4486771d9fc6ffbd5217fa659a72', + 'c2b63e0a906ce201eee54393482cd6695d4f4966', + '41bbdec90ce72234ff4099a774ea923631f9f160', + '7c05a1891230a4ffb05266a5db6b92cac6d89ead', + '55d7c4cbeea1f862f95123177a87192b738829c5', + '1528f4975390958aaaa6433222d9dd931d5180ff', + 'dae69276ab02fcb450d1ee38adb7bfc725f3f518', + 'cbc3458336dce27cab86b89393dcc6c9338f1873', + 'd73b4ca8d1bbdd8a4d690e0d673826faf8e298ac', + 'f97669b0b12381a2c2059d10dd4e27249ca63d47', + '3419451e7d80c65ad2ad4d4ce79e927005f62c70', + 'df27a449247d68b998080a64d69d26631b2d8f77', + 'd801c8d5c10a23737bf10a87c7d21b595f6f188b', + '63f1cdcb5b554954c8cf790a99609f4c95a5ecbc', + 'f261dd930bbb6287fe4237751b41ec670eea049e', + 'd3868724c2d811d561eeab7fa8cbf91864ad3d05'] + ret.reverse() + return ret diff --git a/infocalypse/requestingbundles.py b/infocalypse/requestingbundles.py new file mode 100644 --- /dev/null +++ b/infocalypse/requestingbundles.py @@ -0,0 +1,897 @@ +""" A RequestQueueState for which requests the hg bundle CHKS + required to update a repository. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# REDFLAG: reevaluate on failure? + +import os +import random # Hmmm... good enough? 
+ +from fcpmessage import GET_DEF + +from bundlecache import make_temp_file +from graph import parse_graph, latest_index, \ + FREENET_BLOCK_LEN, chk_to_edge_triple_map, \ + dump_paths, MAX_PATH_LEN + +from choose import get_update_edges, dump_update_edges + +from statemachine import RetryingRequestList, CandidateRequest + +#from topkey import dump_top_key_tuple +from chk import clear_control_bytes + +# REDFLAG: Make sure that you are copying lists. eg. updates +# FUNCTIONAL REQUIREMENTS: +# 0) Update as fast as possible +# 1) Single block fetch alternate keys. +# 2) Choose between primary and alternate keys "fairly" +# 3) transition from no graph to graph case. +# 4) deal with padding hack +# 5) Optionally disable alternate single block fetching? +# ?6) serialize? Easier to write from scratch? + +# No graph case +# CHKs ARE EDGES! dont need to store extra state +# can bootstrap from a chk_list -> edges (slowly) +# +# Can unwind padding hack w/o graph +# len < 32K done +# len == 32K re-request DONE +# +# REDFLAG: get rid of pending_candidates by subclassing StatefulRequest +# to include a .candidate attribute. ??? + +def build_salting_table(target): + """ INTERNAL: Build table used to keep track of metadata salting. """ + def traverse_candidates(candidate_list, table): + """ INTERNAL: Helper function to traverse a single candidate list. """ + for candidate in candidate_list: + if candidate[6]: + continue + edge = candidate[3] + value = table.get(edge, []) + value.append(candidate[2]) + table[edge] = value + ret = {} + traverse_candidates(target.pending_candidates(), ret) + traverse_candidates(target.current_candidates, ret) + traverse_candidates(target.next_candidates, ret) + return ret + +# REDFLAG: get rid of unused methods. +# Hmmm... feels like coding my way out of a design problem. +class SaltingState: + """ INTERNAL: Helper class to keep track of metadata salting state. + """ + def __init__(self, target): + self.table = build_salting_table(target) + + def full_request(self, edge): + """ Return True if a full request is scheduled for the edge. """ + if not edge in self.table: + return False + + for value in self.table[edge]: + if not value: + return True + return False + + def add(self, edge, is_partial): + """ Add an entry to the table. """ + value = self.table.get(edge, []) + value.append(is_partial) + self.table[edge] = value + + def needs_full_request(self, graph, edge): + """ Returns True if a full request is required. """ + assert len(edge) == 3 + if not graph.is_redundant(edge): + return False + return not (self.full_request(edge) or + self.full_request((edge[0], edge[1], int(not edge[2])))) +# What this does: +# 0) Fetches graph(s) +# 1) Fetches early bundles in parallel with graphs +# 2) Fixes up pending requests to graph edges when the graph arrives +# 3) Handles metadata salting for bundle requests +# 4) Keeps track of what requests are required to update and request them. + +# a candidate is a list: +# [CHK, tries, single_block, edge_triple, update_data, msg, is_graph_request] +class RequestingBundles(RetryingRequestList): + """ A RequestQueueState for which requests the hg bundle CHKS + required to update a repository. 
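+
+    Requests are tracked as 'candidate' lists (see the comment above):
+    [chk, tries, single_block, edge_triple, update_data, msg,
+     is_graph_request].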
""" + + def __init__(self, parent, name, success_state, failure_state): + RetryingRequestList.__init__(self, parent, name) + self.success_state = success_state + self.failure_state = failure_state + self.top_key_tuple = None # FNA sskdata + + ############################################################ + # State implementation + ############################################################ + def enter(self, from_state): + """ Implementation of State virtual. """ + if hasattr(from_state, 'get_top_key_tuple'): + self._initialize(from_state.get_top_key_tuple()) + return + + self._initialize() + #self.dump() + + # REDFLAG: delete this? + def leave(self, to_state): + """ Implementation of State virtual. """ + pass + + def reset(self): + """ Implementation of State virtual. """ + #print "reset -- pending: ", len(self.pending) + self.top_key_tuple = None + RetryingRequestList.reset(self) + + ############################################################ + # Implementation of RetryingRequestList virtuals + ############################################################ + def candidate_done(self, client, msg, candidate): + """ Implementation of RetryingRequestList virtual. """ + # Hmmmm... special case hack code to handle graph. + if not self._graph_request_done(client, msg, candidate): + if msg[0] == 'AllData': + self._handle_success(client, msg, candidate) + else: + self._handle_failure(client, msg, candidate) + + # DONT add to pending. Base clase does that. + def make_request(self, candidate): + """ Implementation of RetryingRequestList virtual. """ + #print "CANDIDATE: ", candidate + assert len(candidate) >= 7 + candidate[1] += 1 # Keep track of the number of times it has been tried + # tag == edge, but what if we don't have an edge yet? + request = CandidateRequest(self.parent) + request.in_params.fcp_params = self.parent.params.copy() + + uri = candidate[0] + if candidate[2]: + uri = clear_control_bytes(uri) + request.in_params.fcp_params['URI'] = uri + + request.in_params.definition = GET_DEF + request.in_params.file_name = ( + make_temp_file(self.parent.ctx.bundle_cache.base_dir)) + self.parent.ctx.set_cancel_time(request) + + # Set tag + if not candidate[3] is None: + request.tag = candidate[3] # Edge + else: + # REDFLAG: Do better! + # Some random digit string. + request.tag = request.in_params.file_name[-12:] + + # Set candidate + request.candidate = candidate + + #print "make_request --", request.tag, candidate[0] + # Tags must be unique or we will loose requests! + assert not request.tag in self.pending + + #request.in_params.fcp_params['MaxSize'] = ??? + + return request + + ############################################################ + + # REDFLAG: deal with optional request serialization? + # REDFLAG: Move + # ASSUMPTION: Keys are in descenting order of latest_rev. + # ASSUMPTION: Keys are in order of descending parent rev. + # + # Returns index of last update queued. + # Does gobbledygook to make single block requesting work. + # + # REDFLAG: candymachine? look at start_index / last_queued + def _queue_from_updates(self, candidate_list, + start_index, one_full, only_latest=False): + """ INTERNAL: Queues an hg bundle CHK request from the + top key data. """ + updates = self.top_key_tuple[1] + + last_queued = -1 + for index, update in enumerate(updates): + if index < start_index: + # REDFLAG: do better? + continue + + if only_latest and update[0] > 5 * FREENET_BLOCK_LEN: + # Short circuit (-1, top_index) rollup case. 
+ break + + if only_latest and update[2] != updates[0][2]: + # Only full updates. + break + + if not self.parent.ctx.has_version(update[1]): + # Only updates we can pull. + if only_latest: + # Don't want big bundles from the canonical path. + break + else: + continue + + if self.parent.ctx.has_version(update[2]): + # Only updates we need. + continue + + chks = list(update[3][:]) + full_chk = random.choice(chks) + chks.remove(full_chk) + candidate = [full_chk, 0, not one_full, None, None, None, False] + one_full = True + candidate_list.append(candidate) + + for chk in chks: + candidate = [chk, 0, True, None, None, None, False] + candidate_list.append(candidate) + last_queued = index + if index > 1: + break + + return last_queued + + # Hack special case code to add the graph. + def _initialize(self, top_key_tuple=None): + """ INTERNAL: Initialize. + + If the graph isn't available yet kick off + requests for it and also a request for a full + update if there's one available in the top key data. + + If the graph is available, use it to determine which + keys to request next. + """ + self.top_key_tuple = top_key_tuple + + ############################################################ + # Hack used to test graph request failure. + #bad_chk = ('CHK@badroutingkeyA55JblbGup0yNSpoDJgVPnL8E5WXoc,' + # +'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8') + #bad_update = list(top_key_tuple[1][0]) + #bad_update[3] = (bad_chk, ) + #print "old:", top_key_tuple + + #self.top_key_tuple = ((bad_chk,), (bad_update, )) + #print "new:", self.top_key_tuple + ############################################################ + + # If we don't have the graph, request it, and update + # from the data in the top key. + if self.parent.ctx.graph is None: + if self.top_key_tuple is None: + raise Exception("No top key data.") + + #dump_top_key_tuple(top_key_tuple) + + updates = self.top_key_tuple[1] + + if self.parent.ctx.has_version(updates[0][2]): + self.parent.ctx.ui_.warn(("Version: %s is already in the " + + "local repo.\n") + % updates[0][2][:12]) + self.parent.transition(self.success_state) + return + + # INTENT: Improve throughput for most common update case. + # If it is possible to update fully in one fetch, queue the + # (possibly redundant) key(s) BEFORE graph requests. + latest_queued = self._queue_from_updates(self.current_candidates, + -1, False, True) + + if latest_queued != -1: + self.parent.ctx.ui_.status("Full update is possible in a " + + "single FCP fetch. :-)\n") + + # Kick off the fetch(es) for the full update graph. + # REDFLAG: make a parameter + parallel_graph_fetch = True + chks = list(self.top_key_tuple[0][:]) + random.shuffle(chks) + for chk in self.top_key_tuple[0]: + candidate = [chk, 0, False, None, None, None, True] + self.current_candidates.append(candidate) + if not parallel_graph_fetch: + break + + # Queue remaining fetchable keys in the NEXT pass. + # INTENT: + # The graph might have a better update path. So we don't try these + # until we have tried to get the graph. + self._queue_from_updates(self.next_candidates, latest_queued + 1, + latest_queued == -1, False) + return + + # Otherwise, use the graph to figure out what keys we need. + self._reevaluate() + + # REDFLAG: Move + # Set the graph and fixup all candidates with real edges. + def _set_graph(self, graph): + """ INTERNAL: Set the graph and fixup any pending CHK edge + requests with their edges. """ + + def fixup(edges, candidate_list): + """ INTERNAL : Helper fixes up CHK->edges. 
""" + for candidate in candidate_list: + edge = edges[candidate[0]] + candidate[3] = edge + candidate[4] = None + + edges = chk_to_edge_triple_map(graph) + + skip_chks = set([]) # REDFLAG: remove! + for request in self.pending.values(): + candidate = request.candidate + if candidate[6]: + continue + edge = edges[candidate[0]] + candidate[3] = edge + candidate[4] = None + #print "_set_graph -- fixed up: ", request.tag, edge + # REDFLAG: why am I keeping state in two places? + old_tag = request.tag + request.tag = edge + del self.pending[old_tag] + self.pending[request.tag] = request + skip_chks.add(candidate[0]) + + #print "pending.keys(): ", self.pending.keys() + + fixup(edges, self.current_candidates) + fixup(edges, self.next_candidates) + fixup(edges, self.finished_candidates) + + self.parent.ctx.graph = graph + + self.rep_invariant() + + # REDFLAG: remove testing code + #kill_prob = 0.00 + #print "BREAKING EDGES: Pkill==", kill_prob + #print skip_chks + #break_edges(graph, kill_prob, skip_chks) + + # "fix" (i.e. break) pending good chks. + for candidate in self.current_candidates + self.next_candidates: + if candidate[6]: + continue + edge = candidate[3] + assert not edge is None + if graph.get_chk(edge).find("badrouting") != -1: + candidate[0] = graph.get_chk(edge) + + #self.dump() + self.rep_invariant() + + self.parent.ctx.ui_.status("Got graph. Latest graph index: %i\n" % + graph.latest_index) + + def _handle_graph_failure(self, candidate): + """ INTERNAL: Handle failed FCP requests for the graph. """ + max_retries = self.parent.params.get('MAX_RETRIES', 1) + if candidate[1] < max_retries + 1: + #print "_should_retry -- returned False" + #return False + self.current_candidates.append(candidate) + return + + self.finished_candidates.append(candidate) + if self.is_stalled(): + self.parent.ctx.ui_.warn("Couldn't read graph from Freenet!\n") + self.parent.transition(self.failure_state) + + def _graph_request_done(self, client, msg, candidate): + """ INTERNAL: Handle requests for the graph. """ + #print "CANDIDATE:", candidate + #print "_graph_request_done -- ", candidate[6] + if not candidate[6]: + return False + + if not self.parent.ctx.graph is None: + self.finished_candidates.append(candidate) + return True + + if msg[0] == 'AllData': + in_file = open(client.in_params.file_name, 'rb') + try: + data = in_file.read() + # REDFLAG: log graph? + if self.parent.params.get('DUMP_GRAPH', False): + self.parent.ctx.ui_.status("--- Raw Graph Data ---\n") + self.parent.ctx.ui_.status(data) + self.parent.ctx.ui_.status("\n---\n") + graph = parse_graph(data) + if self.parent.params.get('DUMP_CANONICAL_PATHS', False): + dump_paths(graph, + graph.canonical_paths(graph.latest_index, + MAX_PATH_LEN), + "Canonical paths") + self._set_graph(graph) + self._reevaluate() + finally: + in_file.close() + self.finished_candidates.append(candidate) + else: + if not self.top_key_tuple is None: + pending, current, next, finished = self._known_chks() + all_chks = pending.union(current).union(next).union(finished) + + for chk in self.top_key_tuple[0]: + if not chk in all_chks and chk != candidate[0]: + # REDFLAG: Test this code path. + # Queue the other graph chk. + candidate = [chk, 0, False, None, None, None, True] + # Run next! + #print "QUEUEING OTHER GRAPH CHK" + self.current_candidates.append(candidate) + break + + + # Careful, can drive state transition. + self._handle_graph_failure(candidate) + return True + + # REDFLAG: move + # REDFLAG: Too slow? 
+ def _force_single_block(self, edge): + """ INTERNAL: Make sure there is only one non-single-block request + running for a redundant edge. """ + for candidate in self.current_candidates: + if candidate[3] == edge and not candidate[2]: + candidate[2] = True + # break. paranoia? + + for candidate in self.next_candidates: + if candidate[3] == edge and not candidate[2]: + candidate[2] = True + # break. paranoia? + + def _handle_success(self, client, msg, candidate): + """ INTERNAL: Handle successful FCP requests. """ + #print "_handle_success -- ", candidate + if not self._needs_bundle(candidate): + #print "_handle_success -- doesn't need bundle." + candidate[5] = msg + self.finished_candidates.append(candidate) + return + if (candidate[2] and + self._multiple_block(candidate)): + #print "_handle_success -- multiple block..." + # Cases: + # 0) No redundant edge exists, -> requeue + # 1) Redundant edge request running, single block -> requeue + # 2) Redundant edge request running, full -> finish + # 3) Redundant edge request queued, full -> flip to single_block + # 4) Redundant edge request queued, single_block -> nop + edge = candidate[3] + redundant_edge = (edge[0], edge[1], int(not edge[2])) + if (not self.parent.ctx.graph is None and + self.parent.ctx.graph.is_redundant(edge)): + for value in self.pending_candidates(): + if (value[3] == redundant_edge and + not value[2]): + # Bail out because there's already a request for that + # data running. + candidate[5] = msg + # Make sure the candidate will re-run if the running + # request fails. + candidate[1] = 0 + self.next_candidates.append(candidate) + #print "_handle_success -- already another running." + self.parent.ctx.ui_.status(("Other salted key is " + + "running. Didn't " + + "requeue: %s\n") + % str(candidate[3])) + return + self.parent.ctx.ui_.status("Requeuing full download for: %s\n" + % str(candidate[3])) + # Reset the CHK because the control bytes were zorched. + candidate[0] = self.parent.ctx.graph.get_chk(candidate[3]) + candidate[1] += 1 + candidate[2] = False + candidate[5] = None # Reset! + self.rep_invariant() + self.current_candidates.insert(0, candidate) + self._force_single_block(redundant_edge) + self.rep_invariant() + return + + #print "_handle_success -- bottom" + candidate[5] = msg + self.finished_candidates.append(candidate) + #print "_handle_success -- pulling!" + self._pull_bundle(client, msg, candidate) + #print "_handle_success -- pulled bundle ", candidate[3] + self.parent.ctx.ui_.status("Pulled bundle: %s\n" % str(candidate[3])) + graph = self.parent.ctx.graph + if graph is None: + latest_version = self.top_key_tuple[1][2] + else: + latest_version = graph.index_table[graph.latest_index][1] + + if self.parent.ctx.has_version(latest_version): + # Done and done! + self.parent.transition(self.success_state) + return + + #print "_reevaluate -- called" + self._reevaluate() + #print "_reevaluate -- exited" + + # REDFLAG: move + def _should_retry(self, candidate): + """ Return True if the FCP request for the candidate should + be retried, False otherwise. """ + max_retries = self.parent.params.get('MAX_RETRIES', 0) + if candidate[1] - 1 >= max_retries: + #print "_should_retry -- returned False" + return False + if not self._needs_bundle(candidate) and not candidate[6]: + return False + return True + + def _queued_redundant_edge(self, candidate): + """ INTERNAL: Return True if a redundant request was queued for + the candidate. 
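+
+        The redundant request is for the alternate ordinal
+        (edge[0], edge[1], not edge[2]) and is only queued when no
+        request for that edge has been made yet.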
""" + edge = candidate[3] + if edge is None or candidate[6]: + return False + + if not self.parent.ctx.graph.is_redundant(edge): + return False + + pending, current, next, finished = self._known_edges() + # Must include finished! REDFLAG: re-examine other cases. + all_edges = pending.union(current).union(next).union(finished) + alternate_edge = (edge[0], edge[1], int(not edge[2])) + if alternate_edge in all_edges: + return False + + self.parent.ctx.ui_.status("Queueing redundant edge: %s\n" + % str(alternate_edge)) + + # Order is important because this changes SaltingState. + self.next_candidates.append(candidate) + self._queue_candidate(self.next_candidates, alternate_edge, + not SaltingState(self).needs_full_request( + self.parent.ctx.graph, alternate_edge)) + return True + + def _handle_failure(self, dummy, msg, candidate): + """ INTERNAL: Handle FCP request failure for a candidate. """ + if not self._needs_bundle(candidate): + #print "_handle_failure -- doesn't need bundle." + candidate[5] = msg + self.finished_candidates.append(candidate) + return + #print "_handle_failure -- ", candidate + if self._should_retry(candidate): + #print "_handle_failure -- retrying..." + # Order important. Allow should_retry to see previous msg. + candidate[5] = msg + if not self._queued_redundant_edge(candidate): + self.next_candidates.append(candidate) + else: + #print "_handle_failure -- abandoning..." + candidate[5] = msg + # Thought about adding code to queue redundant salted request here, + # but it doesn't make sense. + self.finished_candidates.append(candidate) + + if self.is_stalled(): + self.parent.ctx.ui_.warn("Too many failures. Gave up :-(\n") + self.parent.transition(self.failure_state) + + def _multiple_block(self, candidate): + """ INTERNAL: Return True if the candidate's FCP request is + more than one block. """ + graph = self.parent.ctx.graph + if not graph is None: + step = candidate[3] + # Should have been fixed up when we got the graph. + assert not step is None + return graph.insert_length(step) > FREENET_BLOCK_LEN + + # BUG: returns True for length == 32k w/o padding hack. + # Ugly but benign. Just causes an unnesc. re-fetch. Happens rarely. + return candidate[4][0] >= FREENET_BLOCK_LEN + + # REDFLAG: Returns false for bundles you can't pull. CANT PREFETCH? + # False if parent rev not available. + def _needs_bundle(self, candidate): + """ INTERNAL: Returns True if the hg bundle for the candidate's edge + could be pulled and contains changes that we don't already have. """ + versions = self._get_versions(candidate) + #print "_needs_bundle -- ", versions + if not self.parent.ctx.has_version(versions[0]): + #print "Doesn't have parent ", versions + return False # Doesn't have parent. + + return not self.parent.ctx.has_version(versions[1]) + + # REDFLAGE: remove msg arg? + def _pull_bundle(self, client, dummy_msg, candidate): + """ INTERNAL: Pull the candidates bundle from the file in + the client param. """ + length = os.path.getsize(client.in_params.file_name) + if not candidate[3] is None: + expected_length = self.parent.ctx.graph.get_length(candidate[3]) + else: + expected_length = candidate[4][0] + + #print "expected_length: ", expected_length + #print "length : ", length + # Unwind padding hack. grrrr... ugly. 
+ assert length >= expected_length + if length != expected_length: + out_file = open(client.in_params.file_name, 'ab') + try: + out_file.truncate(expected_length) + finally: + out_file.close() + assert (os.path.getsize(client.in_params.file_name) + == expected_length) + + self.parent.ctx.pull(client.in_params.file_name) + + def _reevaluate_without_graph(self): + """ Decide which additional edges to request using the top key data + only. """ + # Use chks since we don't have access to edges. + pending, current, next, finished = self._known_chks() + all_chks = pending + current + next + finished + + for update in self.top_key_tuple[1]: + # REDFLAG: bug? Hmmm.. can't remember why I marked this bug. + if not self.parent.ctx.needs_bundle(update[1], update[2]): + continue + new_chks = [] + for chk in update[3]: + if not chk in all_chks: + new_chks.append(chk) + + if len(new_chks) == 0: + continue + + full_request_chk = random.choice(new_chks) + new_chks.remove(full_request_chk) + candidate = [full_request_chk, 0, False, None, + update, None, False] + self.current_candidates.insert(0, candidate) + for chk in new_chks: + candidate = [chk, 0, True, None, update, None, False] + self.current_candidates.insert(0, candidate) + + # NOT CHEAP! + def _reevaluate(self): + """ Queue addition edge requests if necessary. """ + #print "_reevaluate -- called." + self._remove_old_candidates() + graph = self.parent.ctx.graph + + if graph is None: + self._reevaluate_without_graph() + return + + # REDFLAG: make parameters + redundancy = 4 + + # Query graph for current index. + index = latest_index(graph, self.parent.ctx.repo) + + # REDFLAG: remove debugging code + #latest = min(index + 1, graph.latest_index) + #dump_paths(graph, graph.enumerate_update_paths(index + 1, + # latest, + # MAX_PATH_LEN * 2), + # "All paths %i -> %i" % (index + 1, latest)) + + # Find all extant edges. + pending, current, next, finished = self._known_edges() + all_edges = pending.union(current).union(next).union(finished) + #print "sets:", pending, current, next, finished + #print "finished_candidates: ", self.finished_candidates + if None in all_edges: + all_edges.remove(None) + + assert not None in all_edges + + # Find the edges we need to update. + first, second = get_update_edges(graph, index, redundancy, True, + all_edges) + + if self.parent.params.get('DUMP_UPDATE_EDGES', False): + dump_update_edges(first, second, all_edges) + + assert not None in first + assert not None in second + assert len(set(first)) == len(first) + assert len(set(second)) == len(second) + assert len(set(first).intersection(all_edges)) == 0 + assert len(set(second).intersection(all_edges)) == 0 + + self.rep_invariant() + #self.dump() + # first.reverse() ? + + salting = SaltingState(self) + + #print "FIRST: ", first + for edge in first: + assert not edge is None + #print "EDGE:", edge + full = salting.needs_full_request(graph, edge) + self._queue_candidate(self.current_candidates, edge, not full) + salting.add(edge, not full) + self.rep_invariant() + + # second.reverse() ? + #print "SECOND: ", second + for edge in second: + full = salting.needs_full_request(graph, edge) + self._queue_candidate(self.next_candidates, edge, not full) + salting.add(edge, not full) + + self.rep_invariant() + + def _queue_candidate(self, candidate_list, edge, single_block=False): + """ INTERNAL: Queue a request for a single candidate. 
""" + #print "queue_candidate -- called ", edge, single_block + assert not edge is None + + chk = self.parent.ctx.graph.get_chk(edge) + candidate = [chk, + 0, single_block, edge, None, None, False] + + candidate_list.insert(0, candidate) + + def _remove_old_candidates(self): + """ INTERNAL: Remove requests for candidates which are no longer + required. """ + #print "_remove_old_candidates -- called" + # Cancel pending requests which are no longer required. + for client in self.pending.values(): + candidate = client.candidate + if candidate[6]: + continue # Skip graph requests. + versions = self._get_versions(candidate) + if self.parent.ctx.has_version(versions[1]): + self.parent.runner.cancel_request(client) + + # "finish" requests which are no longer required. + victims = [] + for candidate in self.current_candidates: + versions = self._get_versions(candidate) + if self.parent.ctx.has_version(versions[1]): + victims.append(candidate) + for victim in victims: + self.current_candidates.remove(victim) + self.finished_candidates += victims + + # REDFLAG: C&P + victims = [] + for candidate in self.next_candidates: + versions = self._get_versions(candidate) + if self.parent.ctx.has_version(versions[1]): + victims.append(candidate) + for victim in victims: + self.next_candidates.remove(victim) + + self.finished_candidates += victims + + def _get_versions(self, candidate): + """ Return the mercurial 40 digit hex version strings for the + parent version and latest version of the candidate's edge. """ + assert not candidate[6] # graph request! + graph = self.parent.ctx.graph + if graph is None: + update_data = candidate[4] + assert not update_data is None + #print "_get_versions -- (no edge) ", update_data[1], update_data[2] + return(update_data[1], update_data[2]) + + # Should have been fixed up when we got the graph. + step = candidate[3] + #print "CANDIDATE: ", candidate + assert not step is None + + #print "_get_versions -- ", step, graph.index_table[step[0]][1], \ + # graph.index_table[step[1]][2] + return (graph.index_table[step[0]][1], + graph.index_table[step[1]][1]) + + def _known_chks(self): + """ INTERNAL: Returns a tuple of sets off all CHKs which are + pending, currently scheduled, scheduled next or already + finished. """ + return (set([candidate[0] for candidate in + self.pending_candidates()]), + set([candidate[0] for candidate in self.current_candidates]), + set([candidate[0] for candidate in self.next_candidates]), + set([candidate[0] for candidate in self.finished_candidates])) + + # REDFLAG: need to fix these to skip graph special case candidates + # otherwise you get a None in the sets. + def _known_edges(self): + """ INTERNAL: Returns a tuple of sets off all edges which are + pending, currently scheduled, scheduled next or already + finished. """ + + return (set([candidate[3] for candidate in + self.pending_candidates()]), + set([candidate[3] for candidate in self.current_candidates]), + set([candidate[3] for candidate in self.next_candidates]), + set([candidate[3] for candidate in self.finished_candidates])) + + + ############################################################ + # Public helper functions for debugging + ############################################################ + + # Expensive, only for debugging. + def rep_invariant(self): + """ Debugging function to check the instance's invariants. """ + def count_edges(table, bad, candidate_list): + """ INTERNAL: Helper function to count edges. 
""" + for candidate in candidate_list: + if candidate[3] is None: + continue + count = table.get(candidate[3], 0) + edge_counts[candidate[3]] = count + 1 + if edge_counts[candidate[3]] > 1: + bad.add(candidate[3]) + + bad_counts = set([]) + edge_counts = {} + count_edges(edge_counts, bad_counts, self.current_candidates) + count_edges(edge_counts, bad_counts, self.next_candidates) + count_edges(edge_counts, bad_counts, self.pending_candidates()) + + if len(bad_counts) > 0: + print "MULTIPLE EDGES: ", bad_counts + self.dump() + assert False + + def dump(self): + """ Debugging function to dump the instance. """ + def print_list(msg, values): + """ INTERNAL: print a list of values. """ + self.parent.ctx.ui_.status(msg + '\n') + for value in values: + self.parent.ctx.ui_.status(" " + str(value) + '\n') + + self.parent.ctx.ui_.status("--- dumping state: " + self.name + '\n') + print_list("pending_candidates", self.pending_candidates()) + print_list("current_candidates", self.current_candidates) + print_list("next_candidates", self.next_candidates) + diff --git a/infocalypse/requestqueue.py b/infocalypse/requestqueue.py new file mode 100644 --- /dev/null +++ b/infocalypse/requestqueue.py @@ -0,0 +1,148 @@ +# REDFLAG: modified need to push changes back into main repo +""" This module contains classes for scheduling and running large numbers + of FCP requests. + + Copyright (C) 2008 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import time + +from fcpconnection import MinimalClient + +class QueueableRequest(MinimalClient): + """ A request which can be queued in a RequestQueue and run + by a RequestRunner. + """ + def __init__(self, queue): + MinimalClient.__init__(self) + self.queue = queue + self.message_callback = None # set by RequestRunner + # The time after which this request should be canceled. + self.cancel_time_secs = None # RequestQueue.next_request() MUST set this + +class RequestRunner: + """ Class to run requests scheduled on one or more RequestQueues. """ + def __init__(self, connection, concurrent): + self.connection = connection + self.concurrent = concurrent + # request id -> client + self.running = {} + self.request_queues = [] + self.index = 0 + + def add_queue(self, request_queue): + """ Add a queue to the scheduler. """ + if not request_queue in self.request_queues: + self.request_queues.append(request_queue) + + def remove_queue(self, request_queue): + """ Remove a queue from the scheduler. """ + if request_queue in self.request_queues: + self.request_queues.remove(request_queue) + + def cancel_request(self, client): + """ Cancel a request. + + This is asynchronous. 
+ """ + #print "CLIENT: ", client, type(client) + if type(client) == type(1): + raise Exception("Hack added to find bug: REDFLAG") + + self.connection.remove_request(client.request_id()) + # REDFLAG: BUG: fix to set cancel time in the past. + # fix kick to check cancel time before starting? + def kick(self): + """ Run the scheduler state machine. + + You MUST call this frequently. + """ + + if self.connection.is_uploading(): + # REDFLAG: Test this code path! + #print "kick -- bailed out, still UPLOADING..." + # Wait for upload to finish. + return + + # Cancel running requests which have timed out. + now = time.time() + for client in self.running.values(): + assert client.cancel_time_secs + if client.cancel_time_secs < now: + self.connection.remove_request(client.request_id()) + + # REDFLAG: test this code with multiple queues!!! + # Round robin schedule requests from queues + idle_queues = 0 + # Catch before uninsightful /0 error on the next line. + assert len(self.request_queues) > 0 + self.index = self.index % len(self.request_queues) # Paranoid + start_index = self.index + while (len(self.running) < self.concurrent + and idle_queues < len(self.request_queues) + and not self.connection.is_uploading()): + #print "IDLE_QUEUES:", idle_queues + if self.index == start_index: + idle_queues = 0 + client = self.request_queues[self.index].next_runnable() + #print "CLIENT:", client + if client: + #print "client.queue: ", client.queue + #print "running: ", client + #if 'URI' in client.in_params.fcp_params: + # print " ", client.in_params.fcp_params['URI'] + assert client.queue == self.request_queues[self.index] + client.in_params.async = True + client.message_callback = self.msg_callback + self.running[self.connection.start_request(client)] \ + = client + else: + idle_queues += 1 + self.index = (self.index + 1) % len(self.request_queues) + + def msg_callback(self, client, msg): + """ Route incoming FCP messages to the appropriate queues. """ + if client.is_finished(): + client.queue.request_done(client, msg) + #print "RUNNING:" + #print self.running + del self.running[client.request_id()] + self.kick() # haha + else: + client.queue.request_progress(client, msg) + + +class RequestQueue: + """ Abstract base class for request queues. """ + def __init__(self, runner): + self.runner = runner + + def next_runnable(self): + """ Return a MinimalClient instance for the next request to + be run or None if none is available. """ + pass + + def request_progress(self, client, msg): + """ Handle non-terminal FCP messages for running requests. """ + pass + + def request_done(self, client, msg): + """ Handle terminal FCP messages for running requests. """ + pass + diff --git a/infocalypse/statemachine.py b/infocalypse/statemachine.py new file mode 100644 --- /dev/null +++ b/infocalypse/statemachine.py @@ -0,0 +1,293 @@ +""" Classes for making complicated / interdependent sequences of + FCP requests using state machine logic. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. 
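To make the contract above concrete: a RequestQueue subclass only has to hand back requests from next_runnable(), setting cancel_time_secs and leaving .queue pointing back at itself so the runner's bookkeeping holds, and the caller drives everything by polling kick(). A rough usage sketch; the FCP connection is assumed to be an already-connected fcpconnection instance and the request a QueueableRequest built elsewhere, and real code would also service that connection inside the loop:

import time
from requestqueue import RequestRunner, RequestQueue

class OneShotQueue(RequestQueue):
    """ Toy queue which runs a single prebuilt QueueableRequest once. """
    def __init__(self, runner, request):
        RequestQueue.__init__(self, runner)
        request.queue = self          # the runner asserts this back-pointer
        self.request = request
        self.done_msg = None

    def next_runnable(self):
        request, self.request = self.request, None
        if request:
            # Required: the runner cancels anything past this deadline.
            request.cancel_time_secs = time.time() + 15 * 60
        return request

    def request_done(self, client, msg):
        self.done_msg = msg

def run_one(connection, request):
    """ Block until the single request finishes, then return its last msg. """
    runner = RequestRunner(connection, concurrent=4)
    queue = OneShotQueue(runner, request)
    runner.add_queue(queue)
    while queue.done_msg is None:
        runner.kick()        # schedules new requests, reaps finished ones
        time.sleep(0.25)     # placeholder; real code services the connection
    return queue.done_msg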
+ + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + + +# REDFLAG: move this into requestqueue? + +import os + +from requestqueue import QueueableRequest + +# Move this to fcpconnection? +def delete_client_file(client): + """ Delete the file in client.inf_params.file_name. """ + if client.in_params is None: + return + if client.in_params.file_name is None: + return + if not os.path.exists(client.in_params.file_name): + return + assert client.is_finished() + os.remove(client.in_params.file_name) +# allow tuples, lists? +def require_state(state, state_name): + """ Raise if state.name != state_name. """ + if state is None or state.name != state_name: + raise Exception("Illegal State") + +# Halting when connection drops? +class StateMachine: + """ CS101 state machine treatment. """ + def __init__(self): + # name -> State + self.states = {} + self.current_state = None # Subclass should set. + self.transition_callback = lambda old_state, new_state: None + + def get_state(self, state_name): + """ Get a state object by name. """ + return self.states[state_name] + + def require_state(self, state_name): + """ Assert that the current state has the name state_name """ + require_state(self.current_state, state_name) + + def transition(self, to_name): + """ Transition to the state to_name. """ + new_state = self.states[to_name] + assert new_state.name == to_name + old_state = self.current_state + old_state.leave(new_state) # Shouldn't change state. + assert self.current_state == old_state + self.current_state = new_state # Hmmm... order + self.transition_callback(old_state, new_state) # Shouldn't change state + assert self.current_state == new_state + new_state.enter(old_state) # Can change state. + + def reset(self): + """ Reset all State instances owned by the StateMachine. """ + for state in self.states.values(): + state.reset() + +class State: + """ A class to represent a state in the StateMachine. """ + def __init__(self, parent, name): + self.parent = parent + self.name = name + + def enter(self, from_state): + """ Virtual called when the state is entered. + + It is legal to transition to another state here. """ + pass + + def leave(self, to_state): + """ Virtual called when the state is exited. """ + # Handle canceling here. + pass + + def reset(self): + """ Pure virtual to reset the state. """ + print self.name + raise NotImplementedError() + +class StatefulRequest(QueueableRequest): + """ A QueueableRequest which can be processed by a RequestQueueState. """ + def __init__(self, queue): + QueueableRequest.__init__(self, queue) + self.tag = None + +# Is a delegate which can handle RequestQueue protocol but doesn't +# implement it. +class RequestQueueState(State): + """ A State subclass which implements the RequestQueue method + call protocol without subclassing it. """ + def __init__(self, parent, name): + State.__init__(self, parent, name) + # ? -> StatefulRequest, key type is implementation dependant + self.pending = {} + + def reset(self): + """ Implementation of State virtual. """ + if len(self.pending) > 0: + print ("BUG?: Reseting state: %s with %i pending requests!" % + (self.name, len(self.pending))) + + def next_runnable(self): + """ Return a MinimalClient instance for the next request to + be run or None if none is available. 
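The transition() ordering above (leave on the old state, then the callback, then enter on the new one, and only enter may itself transition) is easiest to see with a toy machine. A minimal sketch, assuming the classes above are importable from statemachine:

from statemachine import StateMachine, State

class Probe(State):
    """ A state that immediately bounces the machine back to DONE. """
    def enter(self, from_state):
        self.parent.transition('DONE')   # enter() is allowed to transition
    def reset(self):
        pass

class Done(State):
    def reset(self):
        pass

machine = StateMachine()
machine.states = {'PROBE': Probe(machine, 'PROBE'),
                  'DONE': Done(machine, 'DONE')}
machine.current_state = machine.get_state('DONE')
machine.transition('PROBE')     # DONE -> PROBE -> (from enter()) DONE
machine.require_state('DONE')   # raises unless we ended up back in DONE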
""" + pass + #return None # Trips pylint r201 + + def request_progress(self, client, msg): + """ Handle non-terminal FCP messages for running requests. """ + pass + + def request_done(self, client, msg): + """ Handle terminal FCP messages for running requests. """ + pass + +class Quiescent(RequestQueueState): + """ The quiescent state for the state machine. """ + def __init__(self, parent, name): + RequestQueueState.__init__(self, parent, name) + self.prev_state = 'UNKNOWN' + + def enter(self, from_state): + """ Implementation of State virtual. """ + self.prev_state = from_state.name + + def reset(self): + """ Implementation of State virtual. """ + self.prev_state = 'UNKNOWN' + RequestQueueState.reset(self) + + def arrived_from(self, allowed_states): + """ Returns True IFF the state machine transitioned to this state + from one of the states in allowed_states, False otherwise. """ + return self.prev_state in allowed_states + +class Canceling(RequestQueueState): + """ State which cancels FCP requests from the previous state and + waits for them to finish. """ + + def __init__(self, parent, name, finished_state): + RequestQueueState.__init__(self, parent, name) + self.finished_state = finished_state + + def enter(self, from_state): + """ Implementation of State virtual. """ + if not hasattr(from_state, 'pending') or len(from_state.pending) == 0: + self.parent.transition(self.finished_state) + return + + self.pending = from_state.pending.copy() + for request in self.pending.values(): + self.parent.runner.cancel_request(request) + + def request_done(self, client, dummy): + """ Implementation of RequestQueueState virtual. """ + tag = client.tag + del self.pending[tag] + + if len(self.pending) == 0: + self.parent.transition(self.finished_state) + return + +class CandidateRequest(StatefulRequest): + """ A StatefulRequest subclass that was made from + some kind of candidate. """ + def __init__(self, queue): + StatefulRequest.__init__(self, queue) + self.candidate = None + +# This is not as well thought out as the other stuff in this file. +# REDFLAG: better name? +class RetryingRequestList(RequestQueueState): + """ A RequestQueueState subclass which maintains a collection + of 'candidate' objects which it uses to make request from. + + NOTE: + The definition of what a candidate is is left to the subclass. + """ + def __init__(self, parent, name): + RequestQueueState.__init__(self, parent, name) + self.current_candidates = [] + self.next_candidates = [] + self.finished_candidates = [] + + def reset(self): + """ Implementation of State virtual. """ + self.current_candidates = [] + self.next_candidates = [] + self.finished_candidates = [] + RequestQueueState.reset(self) + + def next_runnable(self): + """ Implementation of RequestQueueState virtual. """ + candidate = self.get_candidate() + if candidate is None: + return None + + request = self.make_request(candidate) + self.pending[request.tag] = request + return request + + def request_done(self, client, msg): + """ Implementation of RequestQueueState virtual. """ + candidate = client.candidate + assert not candidate is None + del self.pending[client.tag] + # REDFLAG: fix signature? to get rid of candidate + self.candidate_done(client, msg, candidate) + + ############################################################ + def is_stalled(self): + """ Returns True if there are no more candidates to run, + False otherwise. 
""" + return (len(self.pending) + len(self.current_candidates) + + len(self.next_candidates) == 0) + + def pending_candidates(self): + """ Returns the candiates that are currently being run + by the RequestQueue. """ + return [request.candidate for request in self.pending.values()] + + # ORDER: + # 0) Candidates are popped of the lists. + # 1) All candidates are popped off of current before any are popped + # off of next. + # 2) When current is empty AND all pending requests have finished + # next and current are swapped. + def get_candidate(self): + """ INTERNAL: Gets the next candidate to run, or None if none + is available. """ + if len(self.current_candidates) == 0: + if len(self.pending) != 0 or len(self.next_candidates) == 0: + # i.e. Don't run requests from the next_candidates + # until requests for current candidates have finished. + # REDFLAG: Add a parameter to control this behavior? + return None + + self.current_candidates = self.next_candidates + self.next_candidates = [] + return self.get_candidate() + + #print "get_candidate -- ", len(self.pending) + # len(self.current_candidates), \ + # len(self.next_candidates) + #print "CURRENT:" + #print self.current_candidates + #print "NEXT:" + #print self.next_candidates + + candidate = self.current_candidates.pop() + + return candidate + + ############################################################ + def candidate_done(self, client, msg, candidate): + """ Pure virtual. + + Add candidate to next_candidates here to retry. + Add candidate to finished_candidates here if done. """ + # Commented out to avoid pylint R0922 + #raise NotImplementedError() + pass + + def make_request(self, dummy_candidate): + """ Subclasses must return CandidateRequest or CandidateRequest + subclass for the candidate.""" + #raise NotImplementedError() + + return CandidateRequest(self.parent) + diff --git a/infocalypse/topkey.py b/infocalypse/topkey.py new file mode 100644 --- /dev/null +++ b/infocalypse/topkey.py @@ -0,0 +1,138 @@ +""" Helper functions to read and write the update stored in Hg + Infocalypse top level keys. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks + + + The Python rep of top key data is just a tuple: + ((graph_a_chk, graph_b_chk), (<update>,...)) + + Where: + <update> := (length, parent_rev, latest_rev, (CHK, ...)) + + top_key_data_to_bytes() converts from the tuple format to + a compact binary rep. + bytes_to_top_key_data() converts the binary rep back to a tuple. +""" + + +# Hmmm... this is essentially a bespoke solution for limitations in +# Freenet metadata processing. 
+import struct + +from binascii import hexlify, unhexlify + +from chk import CHK_SIZE, bytes_to_chk, chk_to_bytes + +MAJOR_VERSION = '1' +MINOR_VERSION = '00' + +HDR_VERSION = MAJOR_VERSION + MINOR_VERSION +HDR_BYTES = 'HGINF%s' % HDR_VERSION + +# Header length 'HGINF100' +HDR_SIZE = 8 +assert len(HDR_BYTES) == HDR_SIZE + +# Length of the binary rep of an hg version +HGVER_SIZE = 20 + +# <header bytes><salt byte><num graph chks> +BASE_FMT = "!%isBB" % HDR_SIZE +# bundle_len:parent_rev:latest_rev:CHK [:CHK] +BASE_UPDATE_FMT = "!q%is%isB" % (HGVER_SIZE, HGVER_SIZE) +# Binary rep of a single CHK +CHK_FMT = "!%is" % CHK_SIZE + +BASE_LEN = struct.calcsize(BASE_FMT) +BASE_UPDATE_LEN = struct.calcsize(BASE_UPDATE_FMT) + +def top_key_tuple_to_bytes(top_key_tuple, salt_byte=0): + """ Returns a binary representation of top_key_tuple. """ + + ret = struct.pack(BASE_FMT, HDR_BYTES, salt_byte, len(top_key_tuple[0])) + for graph_chk in top_key_tuple[0]: + ret += chk_to_bytes(graph_chk) + + for update in top_key_tuple[1]: + assert len(update[1]) == 40 + assert len(update[2]) == 40 + ret += struct.pack(BASE_UPDATE_FMT, update[0], + unhexlify(update[1]), + unhexlify(update[2]), + len(update[3])) + for chk in update[3]: + chk_bytes = struct.pack(CHK_FMT, chk_to_bytes(chk)) + assert len(chk_bytes) == CHK_SIZE + ret += chk_bytes + + return ret + +def bytes_to_top_key_tuple(bytes): + """ Parses the top key data from a byte block and returns a tuple. """ + + if len(bytes) < BASE_LEN: + raise ValueError("Not enough data to parse static fields.") + + # Hmmm... return the salt byte? + hdr, dummy, graph_chk_count = struct.unpack(BASE_FMT, bytes[:BASE_LEN]) + #print "bytes_to_top_key_data -- salt: ", dummy + bytes = bytes[BASE_LEN:] + if hdr != HDR_BYTES: + print "bytes_to_top_key_data -- header doesn't match! Expect problems." + if len(bytes) == 0: + print "bytes_to_top_key_data -- No updates?" + + graph_chks = [] + for dummy in range(0, graph_chk_count): + graph_chks.append(bytes_to_chk(struct.unpack(CHK_FMT, + bytes[:CHK_SIZE])[0])) + bytes = bytes[CHK_SIZE:] + + updates = [] + while len(bytes) > BASE_UPDATE_LEN: + length, raw_parent, raw_latest, chk_count = struct.unpack( + BASE_UPDATE_FMT, + bytes[:BASE_UPDATE_LEN]) + + bytes = bytes[BASE_UPDATE_LEN:] + chks = [] + for dummy in range(0, chk_count): + chks.append(bytes_to_chk(struct.unpack(CHK_FMT, + bytes[:CHK_SIZE])[0])) + bytes = bytes[CHK_SIZE:] + + updates.append((length, hexlify(raw_parent), hexlify(raw_latest), + tuple(chks))) + + return (tuple(graph_chks), tuple(updates)) + +def dump_top_key_tuple(top_key_tuple): + """ Debugging function to print a top_key_tuple. """ + print "---top key tuple---" + for index, chk in enumerate(top_key_tuple[0]): + print "graph_%s:%s" % (chr(ord('a') + index), chk) + for index, update in enumerate(top_key_tuple[1]): + print "update[%i]" % index + print " length : %i" % update[0] + print " parent_rev: %s" % update[1] + print " latest_rev: %s" % update[2] + for index, chk in enumerate(update[3]): + print " CHK[%i]:%s" % (index, chk) + print "---" diff --git a/infocalypse/updatesm.py b/infocalypse/updatesm.py new file mode 100644 --- /dev/null +++ b/infocalypse/updatesm.py @@ -0,0 +1,972 @@ +""" Classes to asynchronously create, push and pull Infocalypse + Freenet repositories. 
+ + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# Classes for inserting to or updating from freenet +# REDFLAG: better name. + +import os +import random +import time + +from fcpclient import get_ssk_for_usk_version, get_usk_for_usk_version, \ + is_usk, is_ssk, is_usk_file, get_version, get_negative_usk +from fcpconnection import SUCCESS_MSGS +from fcpmessage import GET_DEF, PUT_FILE_DEF, GET_REQUEST_URI_DEF + +from requestqueue import RequestQueue + +from chk import clear_control_bytes +from bundlecache import make_temp_file +from graph import INSERT_NORMAL, INSERT_PADDED, INSERT_SALTED_METADATA, \ + minimal_update_graph, graph_to_string, \ + FREENET_BLOCK_LEN, has_version, pull_bundle, parse_graph, hex_version + +from topkey import bytes_to_top_key_tuple, top_key_tuple_to_bytes + +from statemachine import StatefulRequest, RequestQueueState, StateMachine, \ + Quiescent, Canceling, RetryingRequestList, CandidateRequest, \ + require_state, delete_client_file + +from insertingbundles import InsertingBundles +from requestingbundles import RequestingBundles + +HG_MIME_TYPE = 'application/mercurial-bundle' +HG_MIME_TYPE_FMT = HG_MIME_TYPE + ';%i' + +METADATA_MARKER = HG_MIME_TYPE + ';' +PAD_BYTE = '\xff' + +# Hmmm... do better? +# IIF ends with .R1 second ssk ends with .R0. +# Makes it easy for paranoid people to disable redundant +# top key fetching. ie. just request *R0 instead of *R1. +# Also could intuitively be expanded to higher levels of +# redundancy. +def make_redundant_ssk(usk, version): + """ Returns a redundant ssk pair for the USK version IFF the file + part of usk ends with '.R1', otherwise a single + ssk for the usk specified version. """ + ssk = get_ssk_for_usk_version(usk, version) + fields = ssk.split('-') + if not fields[-2].endswith('.R1'): + return (ssk, ) + #print "make_redundant_ssk -- is redundant" + fields[-2] = fields[-2][:-2] + 'R0' + return (ssk, '-'.join(fields)) + +# For search +def make_search_uris(uri): + """ Returns a redundant USK pair if the file part of uri ends + with '.R1', a tuple containing only uri. """ + if not is_usk_file(uri): + return (uri,) + fields = uri.split('/') + if not fields[-2].endswith('.R1'): + return (uri, ) + #print "make_search_uris -- is redundant" + fields[-2] = fields[-2][:-2] + 'R0' + return (uri, '/'.join(fields)) + +# For insert +def make_insert_uris(uri): + """ Returns a possibly redundant insert uri tuple. + NOTE: This increments the version by 1 if uri is a USK. + """ + if uri == 'CHK@': + return (uri,) + assert is_usk_file(uri) + version = get_version(uri) + # REDFLAG: does index increment really belong here? + return make_redundant_ssk(uri, version + 1) + +def ssk_to_usk(ssk): + """ Convert an SSK for a file USK back into a file USK. 
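The '.R1' naming convention is what all of the key-level redundancy hangs off of. A worked example with a made-up key, assuming is_usk_file() accepts a USK of this shape:

from updatesm import make_search_uris

# Site name ends in '.R1', so the '.R0' twin is searched as well.
print make_search_uris('USK@fakeroutingkey,fakecryptokey,AQACAAE/repo.R1/3')
# expected: ('USK@fakeroutingkey,fakecryptokey,AQACAAE/repo.R1/3',
#            'USK@fakeroutingkey,fakecryptokey,AQACAAE/repo.R0/3')

# No '.R1' suffix, so there is nothing to pair it with.
print make_search_uris('USK@fakeroutingkey,fakecryptokey,AQACAAE/repo/3')
# expected: ('USK@fakeroutingkey,fakecryptokey,AQACAAE/repo/3',)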
""" + fields = ssk.split('-') + end = '/'.join(fields[-2:]) + fields = fields[:-2] + [end, ] + return 'USK' + '-'.join(fields)[3:] + +class UpdateContext(dict): + """ A class to hold inter-state data used while the state machine is + running. """ + + def __init__(self, parent): + dict.__init__(self) + + # Parent state machine. + self.parent = parent + + # Merurial state + self.repo = None + self.ui_ = None + self.bundle_cache = None + + # Orphaned request handling hmmm... + self.orphaned = {} + + # UpdateGraph instance. + self.graph = None + + # If this is True states can use the results of index searches on the + # public key to update the private key. + self['IS_KEYPAIR'] = False + + self['TARGET_VERSION'] = None + self['INSERT_URI'] = 'CHK@' + self['REQUEST_URI'] = None + + def has_version(self, version): + """ Returns True if version is already in the hg repository, + False otherwise. """ + return has_version(self.repo, version) + + def pull(self, file_name): + """ Pulls an hg bundle file into the local repository. """ + self.ui_.pushbuffer() # Hmmm.. add param to make this optional? + try: + pull_bundle(self.repo, self.ui_, file_name) + finally: + self.ui_.popbuffer() + + def set_cancel_time(self, request): + """ Sets the timeout on a QueueableRequest. """ + request.cancel_time_secs = time.time() \ + + self.parent.params['CANCEL_TIME_SECS'] + # REDFLAG: get rid of tag arg? + def make_splitfile_metadata_request(self, edge, tag): + """ Makes a StatefulRequest for the Freenet metadata for the + CHK corresponding to an edge in the update graph. + + Helper function used by InsertingBundles state. + """ + request = StatefulRequest(self.parent) + request.tag = tag + # TRICKY: Clear control bytes to get the raw CHK contents, + # disabling Freenet metadata handling. + uri = clear_control_bytes(self.parent.ctx.graph.get_chk(edge)) + request.in_params.definition = GET_DEF + request.in_params.fcp_params = self.parent.params.copy() + request.in_params.fcp_params['URI'] = uri + self.set_cancel_time(request) + return request + + # From file (via bundle_cache). + def make_edge_insert_request(self, edge, tag, salted_metadata_cache): + """ Makes a StatefuleRequest to insert the hg bundle + corresponding to an edge in the update graph. + + Helper function used by InsertingBundles state. 
+ """ + request = StatefulRequest(self.parent) + request.tag = tag + request.in_params.definition = PUT_FILE_DEF + request.in_params.fcp_params = self.parent.params.copy() + request.in_params.fcp_params['URI'] = 'CHK@' + kind = self.graph.insert_type(edge) + if kind == INSERT_SALTED_METADATA: + #print "make_edge_insert_request -- salted" + assert edge[2] == 1 + raw_bytes = salted_metadata_cache[(edge[0], edge[1], 0)] + pos = raw_bytes.find(METADATA_MARKER) + if pos == -1 or len(raw_bytes) < pos + len(METADATA_MARKER) + 1: + raise Exception("Couldn't read marker string.") + + salted_pos = pos + len(METADATA_MARKER) + old_salt = raw_bytes[salted_pos] + if old_salt != '0': + raise Exception("Unexpected salt byte: %s" % old_salt) + + twiddled_bytes = raw_bytes[:salted_pos] + '1' \ + + raw_bytes[salted_pos + 1:] + assert len(raw_bytes) == len(twiddled_bytes) + + request.in_params.send_data = twiddled_bytes + self.set_cancel_time(request) + return request + + assert kind == INSERT_NORMAL or kind == INSERT_PADDED + pad = (kind == INSERT_PADDED) + #print "make_edge_insert_request -- from disk: pad" + + tmp_file, mime_type = self._get_bundle(edge, pad) + request.in_params.file_name = tmp_file + request.in_params.send_data = True + if not mime_type is None: + request.in_params.fcp_params['Metadata.ContentType'] = mime_type + self.set_cancel_time(request) + return request + + def _get_bundle(self, edge, pad): + """ Returns a (temp_file, mime_type) tuple for the hg bundle + file corresponding to edge. """ + original_len = self.graph.get_length(edge) + expected_len = original_len + if pad: + expected_len += 1 + # Hmmmm... misuse of bundle cache dir? + tmp_file = make_temp_file(self.parent.ctx.bundle_cache.base_dir) + raised = False + try: + bundle = self.parent.ctx.bundle_cache.make_bundle(self.graph, + edge[:2], + tmp_file) + assert bundle[0] == original_len + if pad: + out_file = open(tmp_file, 'ab') + try: + out_file.seek(0, os.SEEK_END) + out_file.write(PAD_BYTE) + finally: + out_file.close() + + assert expected_len == os.path.getsize(tmp_file) + raised = False + finally: + if raised and os.path.exists(tmp_file): + os.remove(tmp_file) + + if expected_len <= FREENET_BLOCK_LEN: + mime_type = None + else: + assert edge[2] > -1 and edge[2] < 2 + mime_type = HG_MIME_TYPE_FMT % edge[2] + + return (tmp_file, mime_type) + + def orphan_requests(self, from_state): + """ Give away requests that should be allowed to keep running. """ + if not hasattr(from_state, 'pending') or len(from_state.pending) == 0: + return + + for tag in from_state.pending: + request = from_state.pending[tag] + request.tag = "orphaned_%s_%s" % (str(request.tag), from_state.name) + assert not request.tag in self.orphaned + self.orphaned[request.tag] = request + from_state.pending.clear() + + +class CleaningUp(Canceling): + """ Cancel all pending requests including orphaned ones and wait + for them to finish. """ + + def __init__(self, parent, name, finished_state): + Canceling.__init__(self, parent, name, finished_state) + + def enter(self, from_state): + """ Override Cancel implementation to grab all orphaned requests.""" + self.parent.ctx.orphan_requests(from_state) + self.pending.update(self.parent.ctx.orphaned) + self.parent.ctx.orphaned.clear() + # Hmmm... should be ok to recancel already canceled requests. 
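The padding is symmetric: _get_bundle() above appends a single PAD_BYTE so an otherwise identical bundle hashes to a different CHK, and the pulling side (_pull_bundle() earlier) truncates the download back to the length recorded in the graph before handing it to hg. The two halves of that trick in isolation, with a hypothetical file name argument:

import os

from updatesm import PAD_BYTE

def pad_bundle(file_name):
    """ Append one pad byte so the padded insert hashes to a new CHK. """
    out_file = open(file_name, 'ab')
    try:
        out_file.write(PAD_BYTE)
    finally:
        out_file.close()

def unpad_bundle(file_name, expected_len):
    """ Drop any trailing pad byte so hg sees the original bundle bytes. """
    if os.path.getsize(file_name) == expected_len:
        return  # wasn't padded
    out_file = open(file_name, 'ab')
    try:
        out_file.truncate(expected_len)
    finally:
        out_file.close()
    assert os.path.getsize(file_name) == expected_len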
+ for request in self.pending.values(): + self.parent.runner.cancel_request(request) + if len(self.pending) == 0: + self.parent.transition(self.finished_state) + +# Uses: +# Inserting Graph -- insert 2 chks +# Inserting URI -- insert up to 2 keys +# Requesting URI -- request up to 2 keys +# Requesting Graph -- request up to 2 + +# candidate is: +#[uri, tries, is_insert, raw_data, mime_type, last_msg] +class StaticRequestList(RetryingRequestList): + """ A base class for states which insert or fetch static lists + of keys/to from Freenet. + + Candidates are tuples of the form: + [uri, tries, is_insert, raw_data, mime_type, last_msg] + """ + def __init__(self, parent, name, success_state, failure_state): + RetryingRequestList.__init__(self, parent, name) + self.success_state = success_state + self.failure_state = failure_state + self.ordered = [] # i.e. so you can deref candidates in order queued + self.required_successes = 0 + # If this is True attemps all candidates before success. + self.try_all = False + + def reset(self): + """ Implementation of State virtual. """ + self.ordered = [] + self.required_successes = 0 + self.try_all = False + RetryingRequestList.reset(self) + + def queue(self, candidate): + """ Enqueue a new request. """ + #[uri, tries, is_insert, raw_data, mime_type, last_msg] + assert candidate[1] == 0 + assert candidate[2] == True or candidate[2] == False + assert candidate[5] == None + self.current_candidates.insert(0, candidate) + self.ordered.append(candidate) + + def should_retry(self, dummy_client, dummy_msg, candidate): + """ Returns True if the request candidate should be retried, + False otherwise. """ + # REDFLAG: rationalize parameter names + # ATL == Above the Line + max_retries = self.parent.params.get('MAX_ATL_RETRIES', 0) + return candidate[1] > max_retries + 1 + + # Override to provide better tags. + # tags MUST uniquely map to candidates. + # REDFLAG: O(n) + def get_tag(self, candidate): + """ Return a unique tag correspoinding the request candidate. """ + return self.ordered.index(candidate) + + def get_result(self, index): + """ Returns the final FCP message for a request candidate or + None if none is available. """ + return self.ordered[index][5] + + def candidate_done(self, client, msg, candidate): + """ Implementation of RetryingRequestList virtual. """ + # Add candidate to next_candidates here to retry. + # Add candidate to finished_candidates here if done. + # Commented out to avoid pylint R0922 + #raise NotImplementedError() + candidate[5] = msg + if msg[0] in SUCCESS_MSGS: + self.required_successes -= 1 + elif self.should_retry(client, msg, candidate): + self.next_candidates.insert(0, candidate) + return + + self.finished_candidates.append(candidate) + if self.required_successes <= 0: + if self.try_all: + for candidate in self.ordered: + if candidate[5] is None: + # Wait for all to be tried + return + self.parent.transition(self.success_state) + return + + if self.is_stalled(): + self.parent.transition(self.failure_state) + + # Override for bigger data. This: + # 0) Keeps all data in RAM + # 1) Limits requests to 32K + #[uri, tries, is_insert, raw_data, mime_type, last_msg] + def make_request(self, candidate): + """ Implementation of RetryingRequestList virtual. """ + request = CandidateRequest(self.parent) + request.tag = self.get_tag(candidate) + request.candidate = candidate + request.in_params.fcp_params = self.parent.params.copy() + request.in_params.fcp_params['URI'] = candidate[0] + if candidate[2]: + # Insert from raw data. 
+ request.in_params.definition = PUT_FILE_DEF + if not candidate[4] is None: + mime_type = candidate[4] + request.in_params.fcp_params['Metadata.ContentType'] = mime_type + request.in_params.send_data = candidate[3] + else: + # Request data + request.in_params.definition = GET_DEF + request.in_params.fcp_params['MaxSize'] = FREENET_BLOCK_LEN + request.in_params.allowed_redirects = ( + self.parent.params.get('ALLOWED_REDIRECTS', 5)) + # Hmmmm... + self.parent.ctx.set_cancel_time(request) + candidate[1] += 1 + return request + +class InsertingGraph(StaticRequestList): + """ A state to insert the Infocalypse update graph into Freenet. """ + def __init__(self, parent, name, success_state, failure_state): + StaticRequestList.__init__(self, parent, name, + success_state, failure_state) + self.working_graph = None + + def enter(self, from_state): + """ Implementation of State virtual. + + This computes the minimal graph that will fit in a 32k + block from the graph in the context and inserts it + into two different Freenet CHK's. Different headers + are added before the graph data to get different + CHKs. + """ + require_state(from_state, INSERTING_BUNDLES) + + if self.parent.params.get('DUMP_GRAPH', False): + self.parent.ctx.ui_.status("--- Updated Graph ---\n") + self.parent.ctx.ui_.status(graph_to_string(self.parent.ctx.graph) + + '\n') + + # Create minimal graph that will fit in a 32k block. + self.working_graph = minimal_update_graph(self.parent.ctx.graph, + 31 * 1024, graph_to_string) + + if self.parent.params.get('DUMP_GRAPH', False): + self.parent.ctx.ui_.status("--- Minimal Graph ---\n") + self.parent.ctx.ui_.status(graph_to_string(minimal_update_graph( + self.working_graph, + 31 * 1024, graph_to_string)) + '\n---\n') + + # Make sure the string rep is small enough! + graph_bytes = graph_to_string(self.working_graph) + assert len(graph_bytes) < 31 * 1024 + + # Insert the graph twice for redundancy + self.queue(['CHK@', 0, True, '#A\n' + graph_bytes, None, None]) + self.queue(['CHK@', 0, True, '#B\n' + graph_bytes, None, None]) + self.required_successes = 2 + + def leave(self, to_state): + """ Implementation of State virtual. + + This updates the graph in the context on success. """ + if to_state.name == self.success_state: + # Update the graph in the context on success. + self.parent.ctx.graph = self.working_graph + self.working_graph = None + + def reset(self): + """ Implementation of State virtual. 
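Worth noting why the two queue() calls in enter() differ only by a one-line header: prepending '#A\n' versus '#B\n' is the cheapest way to force two unrelated CHKs for the same graph text, so either insert alone is enough to recover the graph later. Schematically, with a stand-in for the real serialized graph:

graph_bytes = 'fake serialized graph'   # stand-in for graph_to_string(graph)

# [uri, tries, is_insert, raw_data, mime_type, last_msg]
copy_a = ['CHK@', 0, True, '#A\n' + graph_bytes, None, None]
copy_b = ['CHK@', 0, True, '#B\n' + graph_bytes, None, None]

# Different raw_data means Freenet derives two completely different CHKs.
assert copy_a[3] != copy_b[3]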
""" + StaticRequestList.reset(self) + self.working_graph = None + +def get_top_key_updates(graph): + """ Returns the update tuples needed to build the top key.""" + + graph.rep_invariant() + + edges = graph.get_top_key_edges() + + coalesced_edges = [] + ordinals = {} + for edge in edges: + assert edge[2] >= 0 and edge[2] < 2 + assert edge[2] == 0 or (edge[0], edge[1], 0) in edges + ordinal = ordinals.get(edge[:2]) + if ordinal is None: + ordinal = 0 + coalesced_edges.append(edge[:2]) + ordinals[edge[:2]] = max(ordinal, edge[2]) + + ret = [] + for edge in coalesced_edges: + parent_rev = graph.index_table[edge[0]][0] + latest_rev = graph.index_table[edge[1]][1] + length = graph.get_length(edge) + assert len(graph.edge_table[edge][1:]) > 0 + + #(length, parent_rev, latest_rev, (CHK, ...)) + update = (length, parent_rev, latest_rev, + graph.edge_table[edge][1:]) + ret.append(update) + + return ret + +class InsertingUri(StaticRequestList): + """ A state to insert the top level URI for an Infocalypse repository + into Freenet.""" + def __init__(self, parent, name, success_state, failure_state): + StaticRequestList.__init__(self, parent, name, success_state, + failure_state) + + def enter(self, from_state): + """ Implementation of State virtual. + + This creates the binary rep for the top level key + data and starts inserting it into Freenet. + """ + require_state(from_state, INSERTING_GRAPH) + + # Pull the graph CHKs out of the inserting + # graph state instance. REDFLAG: Hardcoded state name ok? + graph_insert = self.parent.get_state(INSERTING_GRAPH) + graph = self.parent.ctx.graph + + top_key_tuple = ((graph_insert.get_result(0)[1]['URI'], + graph_insert.get_result(1)[1]['URI']), + get_top_key_updates(graph)) + + salt = {0:0x00, 1:0xff} # grrr.... less code. + insert_uris = make_insert_uris(self.parent.ctx['INSERT_URI']) + assert len(insert_uris) < 3 + for index, uri in enumerate(insert_uris): + if self.parent.params.get('DUMP_URIS', False): + self.parent.ctx.ui_.status("INSERT_URI: %s\n" % uri) + self.queue([uri, 0, True, + top_key_tuple_to_bytes(top_key_tuple, salt[index]), + None, None]) + self.required_successes = len(insert_uris) + + def leave(self, to_state): + """ Implementation of State virtual. """ + if to_state.name == self.success_state: + # Hmmm... what about chks? + # Update the index in the insert_uri on success + if is_usk(self.parent.ctx['INSERT_URI']): + version = get_version(self.parent.ctx['INSERT_URI']) + 1 + self.parent.ctx['INSERT_URI'] = ( + get_usk_for_usk_version(self.parent.ctx['INSERT_URI'], + version)) + if self.parent.params.get('DUMP_URIS', False): + self.parent.ctx.ui_.status(("INSERT UPDATED INSERT " + + "URI:\n%s\n") + % self.parent.ctx['INSERT_URI']) + def get_request_uris(self): + """ Return the inserted request uri(s). """ + ret = [] + was_usk = is_usk_file(self.parent.ctx['INSERT_URI']) + for candidate in self.ordered: + uri = candidate[5][1]['URI'] + if is_ssk(uri) and was_usk: + uri = ssk_to_usk(uri) + ret.append(uri) + return ret + +class RequestingUri(StaticRequestList): + """ A state to request the top level URI for an Infocalypse + repository. """ + def __init__(self, parent, name, success_state, failure_state): + StaticRequestList.__init__(self, parent, name, success_state, + failure_state) + self.try_all = True # Hmmmm... + + def enter(self, dummy): + """ Implementation of State virtual. 
""" + #require_state(from_state, QUIESCENT) + + #print "REQUEST_URI:" + #print self.parent.ctx['REQUEST_URI'] + + request_uri = self.parent.ctx['REQUEST_URI'] + if (is_usk(request_uri) and + self.parent.params.get('AGGRESSIVE_SEARCH', False)): + request_uri = get_negative_usk(request_uri) + + request_uris = make_search_uris(request_uri) + for uri in request_uris: + #[uri, tries, is_insert, raw_data, mime_type, last_msg] + if self.parent.params.get('DUMP_URIS', False): + self.parent.ctx.ui_.status("REQUEST URI: %s\n" % uri) + self.queue([uri, 0, False, None, None, None]) + + self.required_successes = 1 #len(self.results) # Hmmm fix, but how + + # So we don't implictly favor one by requesting it first. + random.shuffle(self.current_candidates) + + def leave(self, to_state): + """ Implementation of State virtual. """ + if to_state.name == self.success_state: + self.parent.ctx['REQUEST_URI'] = self.get_latest_uri() + if is_usk(self.parent.ctx['REQUEST_URI']): + self.parent.ctx.ui_.status("Current USK version: %i\n" % + get_version(self.parent + .ctx['REQUEST_URI'])) + + if (self.parent.ctx['IS_KEYPAIR'] and + is_usk(self.parent.ctx['REQUEST_URI']) and # lose usk checks? + is_usk(self.parent.ctx['INSERT_URI'])): + version = get_version(self.parent.ctx['REQUEST_URI']) + self.parent.ctx['INSERT_URI'] = ( + get_usk_for_usk_version(self.parent.ctx['INSERT_URI'], + version)) + #print "SEARCH UPDATED INSERT URI: ", \ + # self.parent.ctx['INSERT_URI'] + + # Allow pending requests to run to completion. + self.parent.ctx.orphan_requests(self) + + def get_top_key_tuple(self): + """ Get the python rep of the data in the URI. """ + top_key_tuple = None + for candidate in self.ordered: + result = candidate[5] + if result is None or result[0] != 'AllData': + continue + top_key_tuple = bytes_to_top_key_tuple(result[2]) + break + assert not top_key_tuple is None + return top_key_tuple + + def get_latest_uri(self): + """ Returns the URI with the version part update if the URI is a USK.""" + max_version = None + for candidate in self.ordered: + result = candidate[5] + if result is None or result[0] != 'AllData': + continue + uri = result[1]['URI'] + if not is_usk_file(uri): + return uri + max_version = max(max_version, abs(get_version(uri))) + break + + assert not max_version is None + # The .R1 URI is queued first. + assert (len(self.ordered) < 2 or + self.ordered[0][0].find('.R1') != -1) + return get_usk_for_usk_version(self.ordered[0][0], + max_version) + +class InvertingUri(RequestQueueState): + """ A state to compute the request URI corresponding to a Freenet + insert URI. """ + def __init__(self, parent, name, success_state, failure_state): + RequestQueueState.__init__(self, parent, name) + self.insert_uri = None + self.queued = False + self.msg = None + self.success_state = success_state + self.failure_state = failure_state + def get_request_uri(self): + """ Returns the request URI.""" + if self.msg is None or self.msg[0] != 'PutSuccessful': + return None + inverted = self.msg[1]['URI'] + public = inverted[inverted.find('@') + 1: inverted.find('/')] + return self.insert_uri[:self.insert_uri.find('@') + 1] + public \ + + self.insert_uri[self.insert_uri.find('/'):] + + def enter(self, dummy): + """ Implementation of State virtual. """ + if self.insert_uri == None: + self.insert_uri = self.parent.ctx['INSERT_URI'] + assert not self.insert_uri is None + #print "INVERTING: ", self.insert_uri + + def leave(self, to_state): + """ Implementation of State virtual. 
+ + Sets the REQUEST_URI in the context on success. + """ + if to_state.name == self.success_state: + # Don't overwrite request_uri in the pushing from case. + if self.parent.ctx['REQUEST_URI'] is None: + self.parent.ctx['REQUEST_URI'] = self.get_request_uri() + + def reset(self): + """ Implementation of State virtual. """ + self.insert_uri = None + self.queued = False + self.msg = None + RequestQueueState.reset(self) + + def next_runnable(self): + """ Implementation of RequestQueueState virtual. """ + if self.queued: + return None + self.queued = True + + uri = self.insert_uri + if is_usk(uri): + # Hack to keep freenet from doing a USK search. + uri = get_ssk_for_usk_version(uri, 0) + + request = StatefulRequest(self.parent) + request.in_params.definition = GET_REQUEST_URI_DEF + request.in_params.fcp_params = {'URI': uri, + 'MaxRetries': 1, + 'PriorityClass':1, + 'UploadFrom':'direct', + 'GetCHKOnly':True} + request.in_params.send_data = '@' * 9 + request.in_params.fcp_params['DataLength'] = ( + len(request.in_params.send_data)) + request.tag = 'only_invert' # Hmmmm... + self.parent.ctx.set_cancel_time(request) + return request + + def request_done(self, dummy_client, msg): + """ Implementation of RequestQueueState virtual. """ + #print "INVERTING DONE:", msg + self.msg = msg + if msg[0] == 'PutSuccessful': + #print "REQUEST_URI: ", self.get_request_uri() + self.parent.transition(self.success_state) + return + self.parent.transition(self.failure_state) + +class RequestingGraph(StaticRequestList): + """ A state to request the update graph for an Infocalypse repository. """ + def __init__(self, parent, name, success_state, failure_state): + StaticRequestList.__init__(self, parent, name, success_state, + failure_state) + + def get_top_key_tuple(self): + """ Returns the Python rep of the data in the request uri. """ + results = [candidate[5] for candidate in + self.parent.get_state(REQUESTING_URI_4_INSERT).ordered] + top_key_tuple = None + for result in results: + if result is None or result[0] != 'AllData': + continue + top_key_tuple = bytes_to_top_key_tuple(result[2]) + break + assert not top_key_tuple is None + return top_key_tuple + + def enter(self, from_state): + """ Implementation of State virtual. """ + require_state(from_state, REQUESTING_URI_4_INSERT) + + top_key_tuple = self.get_top_key_tuple() + #print "TOP_KEY_TUPLE", top_key_tuple + #[uri, tries, is_insert, raw_data, mime_type, last_msg] + for uri in top_key_tuple[0]: + self.queue([uri, 0, False, None, None, None]) + random.shuffle(self.current_candidates) + self.required_successes = 1 + + def leave(self, to_state): + """ Implementation of State virtual. """ + if to_state.name == self.success_state: + # Set the graph from the result + graph = None + for candidate in self.ordered: + result = candidate[5] + if not result is None and result[0] == 'AllData': + graph = parse_graph(result[2]) + break + assert not graph is None + + self.parent.ctx.graph = graph + + # Allow pending requests to run to completion. + for tag in self.pending: + request = self.pending[tag] + request.tag = "orphaned_%s_%s" % (str(request.tag), self.name) + assert not request.tag in self.parent.ctx.orphaned + self.parent.ctx.orphaned[request.tag] = request + self.pending.clear() + +# Allow entry into starting +QUIESCENT = 'QUIESCENT' + +# Get the request_uri from the insert_uri +INVERTING_URI = 'INVERTING_URI' + +# Get the request_uri from the insert_uri, and start inserting +INVERTING_URI_4_INSERT = 'INVERTING_URI_4_INSERT' + +# Used to lookup graph. 
+REQUESTING_URI_4_INSERT = 'REQUESTING_URI_4_INSERT' + +# Read the graph out of freenet. +REQUESTING_GRAPH = 'REQUESTING_GRAPH' + +# Wait for bundles to insert, handle metadata salting. +INSERTING_BUNDLES = 'INSERTING_BUNDLES' +# Wait for graphs to insert. +INSERTING_GRAPH = 'INSERTING_GRAPH' +# Wait for ssks to insert +INSERTING_URI = 'INSERTING_URI' +# Wait for pending requests to finish +CANCELING = 'CANCELING' +FAILING = 'FAILING' +FINISHING = 'FINISHING' + +REQUESTING_URI = 'REQUESTING_URI' +REQUESTING_BUNDLES = 'REQUESTING_BUNDLES' + +class UpdateStateMachine(RequestQueue, StateMachine): + """ A StateMachine implementaion to create, push to and pull from + Infocalypse repositories. """ + + def __init__(self, runner, repo, ui_, bundle_cache): + RequestQueue.__init__(self, runner) + StateMachine.__init__(self) + + self.states = { + QUIESCENT:Quiescent(self, QUIESCENT), + + # Justing inverting + INVERTING_URI:InvertingUri(self, INVERTING_URI, + QUIESCENT, + FAILING), + + # Requesting previous graph in order to do insert. + INVERTING_URI_4_INSERT:InvertingUri(self, INVERTING_URI_4_INSERT, + REQUESTING_URI_4_INSERT, + FAILING), + + REQUESTING_URI_4_INSERT:RequestingUri(self, + REQUESTING_URI_4_INSERT, + REQUESTING_GRAPH, + FAILING), + REQUESTING_GRAPH:RequestingGraph(self, REQUESTING_GRAPH, + INSERTING_BUNDLES, + FAILING), + + + # Inserting + INSERTING_BUNDLES:InsertingBundles(self, + INSERTING_BUNDLES), + INSERTING_GRAPH:InsertingGraph(self, INSERTING_GRAPH, + INSERTING_URI, + FAILING), + INSERTING_URI:InsertingUri(self,INSERTING_URI, + FINISHING, + FAILING), + CANCELING:CleaningUp(self, CANCELING, QUIESCENT), + FAILING:CleaningUp(self, FAILING, QUIESCENT), + + # Requesting + REQUESTING_URI:RequestingUri(self, REQUESTING_URI, + REQUESTING_BUNDLES, + FAILING), + + REQUESTING_BUNDLES:RequestingBundles(self, REQUESTING_BUNDLES, + FINISHING, + FAILING), + + FINISHING:CleaningUp(self, FINISHING, QUIESCENT), + } + + self.current_state = self.get_state(QUIESCENT) + + self.params = {} + # Must not change any state! + self.monitor_callback = lambda parent, client, msg: None + + self.ctx = UpdateContext(self) + self.ctx.repo = repo + self.ctx.ui_ = ui_ + self.ctx.bundle_cache = bundle_cache + + runner.add_queue(self) + + def reset(self): + """ StateMachine override. """ + StateMachine.reset(self) + + ctx = UpdateContext(self) + ctx.repo = self.ctx.repo + ctx.ui_ = self.ctx.ui_ + ctx.bundle_cache = self.ctx.bundle_cache + if len(self.ctx.orphaned) > 0: + print "BUG?: Abandoning orphaned requests." + self.ctx.orphaned.clear() + + self.ctx = ctx + + def start_inserting(self, graph, to_version, insert_uri='CHK@'): + """ Start and insert of the graph and any required new edge CHKs + to the insert URI. """ + self.require_state(QUIESCENT) + self.reset() + self.ctx.graph = graph + self.ctx['TARGET_VERSION'] = to_version + self.ctx['INSERT_URI'] = insert_uri + self.transition(INSERTING_BUNDLES) + + # Update a repo USK. + # REDFLAG: later, keys_match=False arg + def start_pushing(self, insert_uri, to_version='tip', request_uri=None, + is_keypair=False): + + """ Start pushing local changes up to to_version to an existing + Infocalypse repository. """ + + self.require_state(QUIESCENT) + self.reset() + self.ctx.graph = None + self.ctx['INSERT_URI'] = insert_uri + self.ctx['REQUEST_URI'] = request_uri + # Hmmmm... better exception if to_version isn't in the repo? 
+ self.ctx['TARGET_VERSION'] = hex_version(self.ctx.repo, to_version) + if request_uri is None: + self.ctx['IS_KEYPAIR'] = True + self.transition(INVERTING_URI_4_INSERT) + else: + self.ctx['IS_KEYPAIR'] = is_keypair + self.transition(REQUESTING_URI_4_INSERT) + + # Pull from a repo USK. + def start_pulling(self, request_uri): + """ Start pulling changes from an Infocalypse repository URI + in Freenet into the local hg repository. """ + self.require_state(QUIESCENT) + self.reset() + self.ctx.graph = None + self.ctx['REQUEST_URI'] = request_uri + self.transition(REQUESTING_URI) + + # REDFLAG: SSK case untested + def start_inverting(self, insert_uri): + """ Start inverting a Freenet URI into it's analogous + request URI. """ + assert is_usk(insert_uri) or is_ssk(insert_uri) + self.require_state(QUIESCENT) + self.reset() + self.get_state(INVERTING_URI).insert_uri = insert_uri + self.transition(INVERTING_URI) + + # REDFLAG: UNTESTED + def cancel(self): + """ Start canceling the current operation. """ + + if (self.current_state.name != QUIESCENT and + self.current_state.name != FAILING): + self.transition(CANCELING) + + ############################################################ + def handled_orphan(self, client, dummy_msg): + """ Handle cleanup of requests that aren't owned by any state. """ + if not client.tag in self.ctx.orphaned: + return False + if client.is_finished(): + del self.ctx.orphaned[client.tag] + return True + + def next_runnable(self): + """ Implementation of RequestQueue virtual. """ + return self.current_state.next_runnable() + + def request_progress(self, client, msg): + """ Implementation of RequestQueue virtual. """ + self.monitor_callback(self, client, msg) + if self.handled_orphan(client, msg): + return + # Don't let client time out while it's making progress. + self.ctx.set_cancel_time(client) # Hmmmm + self.current_state.request_progress(client, msg) + + def request_done(self, client, msg): + """ Implementation of RequestQueue virtual. """ + try: + self.monitor_callback(self, client, msg) + if self.handled_orphan(client, msg): + return + self.current_state.request_done(client, msg) + finally: + # Clean up all upload and download files. + delete_client_file(client) + +# REDFLAG: fix orphan handling to use special state iff it is the current state. +# REDFLAG: rationalize. writing updated state into ctx vs. +# leaving it in state instances +# REDFLAG: audit. is_usk vs. is_usk_file
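Putting it together for a pull looks roughly like the sketch below. The FCP connection, the BundleCache and the polling loop are assumed to be set up elsewhere (the hg command plumbing in infcmds.py does the real version of this), and the success test leans on the FINISHING-versus-FAILING split in the state table above:

import time

from requestqueue import RequestRunner
from updatesm import UpdateStateMachine, QUIESCENT, FINISHING

def pull_once(connection, repo, ui_, bundle_cache, request_uri):
    """ Pull request_uri into repo, blocking until the machine goes quiet. """
    runner = RequestRunner(connection, concurrent=4)
    update_sm = UpdateStateMachine(runner, repo, ui_, bundle_cache)
    # Required by UpdateContext.set_cancel_time(); the value is a guess.
    update_sm.params['CANCEL_TIME_SECS'] = 15 * 60

    update_sm.start_pulling(request_uri)
    while update_sm.current_state.name != QUIESCENT:
        runner.kick()        # drives scheduling and state transitions
        time.sleep(0.25)     # placeholder; real code services the connection

    # We only finished cleanly if we reached QUIESCENT via FINISHING.
    return update_sm.get_state(QUIESCENT).arrived_from((FINISHING,))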