""" Implementation of commands for Infocalypse mercurial extension. Copyright (C) 2009 Darrell Karbott This library is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks """ # infcmds.py has actual implementation # REDFLAG: cleanup exception handling # by converting socket.error to IOError in fcpconnection? # REDFLAG: returning vs aborting. set system exit code. import os import socket import time from mercurial import util from fcpclient import parse_progress, is_usk, is_ssk, get_version, \ get_usk_for_usk_version, FCPClient, is_usk_file, is_negative_usk from fcpconnection import FCPConnection, PolledSocket, CONNECTION_STATES, \ get_code, FCPError from requestqueue import RequestRunner from graph import UpdateGraph, hex_version from bundlecache import BundleCache, is_writable from updatesm import UpdateStateMachine, QUIESCENT, FINISHING, REQUESTING_URI, \ REQUESTING_GRAPH, REQUESTING_BUNDLES, INVERTING_URI, \ REQUESTING_URI_4_INSERT, INSERTING_BUNDLES, INSERTING_GRAPH, \ INSERTING_URI, FAILING, REQUESTING_URI_4_COPY from config import Config, DEFAULT_CFG_PATH DEFAULT_PARAMS = { # FCP params 'MaxRetries':3, 'PriorityClass':1, 'DontCompress':True, # hg bundles are already compressed. 'Verbosity':1023, # MUST set this to get progress messages. # Non-FCP stuff 'N_CONCURRENT':4, # Maximum number of concurrent FCP requests. 'CANCEL_TIME_SECS': 15 * 60, # Bound request time. 'POLL_SECS':0.25, # Time to sleep in the polling loop. } MSG_TABLE = {(QUIESCENT, REQUESTING_URI_4_INSERT) :"Requesting previous URI...", (QUIESCENT, REQUESTING_URI_4_COPY) :"Requesting URI to copy...", (REQUESTING_URI_4_INSERT,REQUESTING_GRAPH) :"Requesting previous graph...", (INSERTING_BUNDLES,INSERTING_GRAPH) :"Inserting updated graph...", (INSERTING_GRAPH, INSERTING_URI) :"Inserting URI...", (REQUESTING_URI_4_COPY, INSERTING_URI) :"Inserting copied URI...", (QUIESCENT, REQUESTING_URI) :"Fetching URI...", (REQUESTING_URI, REQUESTING_BUNDLES) :"Fetching bundles...", } class UICallbacks: """ Display callback output with a ui instance. """ def __init__(self, ui_): self.ui_ = ui_ self.verbosity = 0 def connection_state(self, dummy, state): """ FCPConnection.state_callback function which writes to a ui. """ if self.verbosity < 2: return value = CONNECTION_STATES.get(state) if not value: value = "UNKNOWN" self.ui_.status("FCP connection [%s]\n" % value) def transition_callback(self, from_state, to_state): """ StateMachine transition callback that writes to a ui.""" if self.verbosity < 1: return if self.verbosity > 2: self.ui_.status("[%s]->[%s]\n" % (from_state.name, to_state.name)) return if to_state.name == FAILING: self.ui_.status("Cleaning up after failure...\n") return if to_state.name == FINISHING: self.ui_.status("Cleaning up...\n") return msg = MSG_TABLE.get((from_state.name, to_state.name)) if not msg is None: self.ui_.status("%s\n" % msg) def monitor_callback(self, update_sm, client, msg): """ FCP message status callback which writes to a ui. """ if self.verbosity < 2: return #prefix = update_sm.current_state.name prefix = '' if self.verbosity > 2: prefix = client.request_id()[:10] + ':' if hasattr(update_sm.current_state, 'pending') and self.verbosity > 1: prefix = ("{%i}:" % len(update_sm.runner.running)) + prefix if msg[0] == 'SimpleProgress': text = str(parse_progress(msg)) elif msg[0] == 'URIGenerated': return # shows up twice #elif msg[0] == 'PutSuccessful': # text = 'PutSuccessful:' + msg[1]['URI'] elif msg[0] == 'ProtocolError': text = 'ProtocolError:' + str(msg) elif msg[0] == 'AllData': # Don't try to print raw data. text = 'AllData: length=%s' % msg[1].get('DataLength', '???') elif msg[0].find('Failed') != -1: code = get_code(msg) or -1 redirect = '' if (code == 27 and 'RedirectURI' in msg[1] and is_usk(msg[1]['RedirectURI'])): redirect = ", redirected to version: %i" % \ get_version(msg[1]['RedirectURI']) text = "%s: code=%i%s" % (msg[0], code, redirect) else: text = msg[0] self.ui_.status("%s%s:%s\n" % (prefix, str(client.tag), text)) # REDFLAG: re-add full dumping of FCP errors at debug level? #if msg[0].find('Failed') != -1 or msg[0].find('Error') != -1: # print client.in_params.pretty() # print msg # print "FINISHED:" , client.is_finished(), #bool(client.is_finished()) # Paranoia? Just stat? I'm afraid of problems/differences w/ Windoze. # Hmmmm... SUSPECT. Abuse of mercurial ui design intent. # ISSUE: I don't just want to suppress/include output. # I use this value to keep from running code which isn't # required. def get_verbosity(ui_): """ INTERNAL: Get the verbosity levl from the state of a ui. """ if ui_.debugflag: return 5 # Graph, candidates, canonical paths elif ui_.verbose: return 2 # FCP message status elif ui_.quiet: # Hmmm... still not 0 output return 0 else: return 1 # No FCP message status def get_config_info(ui_, opts): """ INTERNAL: Read configuration info out of the config file and or command line options. """ cfg = Config.from_ui(ui_) if opts.get('fcphost') != '': cfg.defaults['HOST'] = opts['fcphost'] if opts.get('fcpport') != 0: cfg.defaults['PORT'] = opts['fcpport'] params = DEFAULT_PARAMS.copy() params['FCP_HOST'] = cfg.defaults['HOST'] params['FCP_PORT'] = cfg.defaults['PORT'] params['TMP_DIR'] = cfg.defaults['TMP_DIR'] params['VERBOSITY'] = get_verbosity(ui_) params['AGGRESSIVE_SEARCH'] = bool(opts.get('aggressive')) return (params, cfg) # Hmmmm USK@/style_keys/0 def check_uri(ui_, uri): """ INTERNAL: Abort if uri is not supported. """ if uri is None: return if is_usk(uri): if not is_usk_file(uri): ui_.status("Only file USKs are allowed." + "\nMake sure the URI ends with '/<number>' " + "with no trailing '/'.\n") raise util.Abort("Non-file USK %s\n" % uri) # Just fix it instead of doing B&H? if is_negative_usk(uri): ui_.status("Negative USK index values are not allowed." + "\nUse --aggressive instead. \n") raise util.Abort("Negative USK %s\n" % uri) # REDFLAG: remove store_cfg def setup(ui_, repo, params, stored_cfg): """ INTERNAL: Setup to run an Infocalypse extension command. """ # REDFLAG: choose another name. Confusion w/ fcp param # REDFLAG: add an hg param and get rid of this line. #params['VERBOSITY'] = 1 check_uri(ui_, params.get('INSERT_URI')) check_uri(ui_, params.get('REQUEST_URI')) if not is_writable(os.path.expanduser(stored_cfg.defaults['TMP_DIR'])): raise util.Abort("Can't write to temp dir: %s\n" % stored_cfg.defaults['TMP_DIR']) verbosity = params.get('VERBOSITY', 1) if verbosity > 2 and params.get('DUMP_GRAPH', None) is None: params['DUMP_GRAPH'] = True if verbosity > 3 and params.get('DUMP_UPDATE_EDGES', None) is None: params['DUMP_UPDATE_EDGES'] = True if verbosity > 4 and params.get('DUMP_CANONICAL_PATHS', None) is None: params['DUMP_CANONICAL_PATHS'] = True if verbosity > 4 and params.get('DUMP_URIS', None) is None: params['DUMP_URIS'] = True if verbosity > 4 and params.get('DUMP_TOP_KEY', None) is None: params['DUMP_TOP_KEY'] = True callbacks = UICallbacks(ui_) callbacks.verbosity = verbosity cache = BundleCache(repo, ui_, params['TMP_DIR']) try: async_socket = PolledSocket(params['FCP_HOST'], params['FCP_PORT']) connection = FCPConnection(async_socket, True, callbacks.connection_state) except socket.error, err: # Not an IOError until 2.6. ui_.warn("Connection to FCP server [%s:%i] failed.\n" % (params['FCP_HOST'], params['FCP_PORT'])) raise err except IOError, err: ui_.warn("Connection to FCP server [%s:%i] failed.\n" % (params['FCP_HOST'], params['FCP_PORT'])) raise err runner = RequestRunner(connection, params['N_CONCURRENT']) update_sm = UpdateStateMachine(runner, repo, ui_, cache) update_sm.params = params.copy() update_sm.transition_callback = callbacks.transition_callback update_sm.monitor_callback = callbacks.monitor_callback return update_sm def run_until_quiescent(update_sm, poll_secs, close_socket=True): """ Run the state machine until it reaches the QUIESCENT state. """ runner = update_sm.runner assert not runner is None connection = runner.connection assert not connection is None raised = True try: while update_sm.current_state.name != QUIESCENT: # Poll the FCP Connection. try: connection.socket.poll() # Indirectly nudge the state machine. update_sm.runner.kick() except socket.error: # Not an IOError until 2.6. update_sm.ui_.warn("Exiting because of an error on " + "the FCP socket.\n") raise except IOError: # REDLAG: better message. update_sm.ui_.warn("Exiting because of an IO error.\n") raise # Rest :-) time.sleep(poll_secs) raised = False finally: if raised or close_socket: update_sm.runner.connection.close() def cleanup(update_sm): """ INTERNAL: Cleanup after running an Infocalypse command. """ if update_sm is None: return if not update_sm.runner is None: update_sm.runner.connection.close() update_sm.ctx.bundle_cache.remove_files() # REDFLAG: better name. 0) inverts 1) updates indices from cached state. # 2) key substitutions. def handle_key_inversion(ui_, update_sm, params, stored_cfg): """ INTERNAL: Handle inverting/updating keys before running a command.""" insert_uri = params.get('INSERT_URI') if not insert_uri is None and insert_uri.startswith('USK@/'): insert_uri = ('USK' + stored_cfg.defaults['DEFAULT_PRIVATE_KEY'][3:] + insert_uri[5:]) ui_.status("Filled in the insert URI using the default private key.\n") if insert_uri is None or not (is_usk(insert_uri) or is_ssk(insert_uri)): return (params.get('REQUEST_URI'), False) update_sm.start_inverting(insert_uri) run_until_quiescent(update_sm, params['POLL_SECS'], False) if update_sm.get_state(QUIESCENT).prev_state != INVERTING_URI: raise util.Abort("Couldn't invert private key:\n%s" % insert_uri) inverted_uri = update_sm.get_state(INVERTING_URI).get_request_uri() params['INVERTED_INSERT_URI'] = inverted_uri if is_usk(insert_uri): # Determine the highest known index for the insert uri. max_index = max(stored_cfg.get_index(inverted_uri), get_version(insert_uri)) # Update the insert uri to the latest known version. params['INSERT_URI'] = get_usk_for_usk_version(insert_uri, max_index) # Update the inverted insert URI to the latest known version. params['INVERTED_INSERT_URI'] = get_usk_for_usk_version( inverted_uri, max_index) # NO COUPLING # Update the index of the request uri using the stored config. request_uri = params.get('REQUEST_URI') if not request_uri is None: max_index = max(stored_cfg.get_index(request_uri), get_version(request_uri)) request_uri = get_usk_for_usk_version(request_uri, max_index) # Skip key inversion if we already inverted the insert_uri. is_keypair = False if (request_uri is None and not params.get('INVERTED_INSERT_URI') is None): request_uri = params['INVERTED_INSERT_URI'] is_keypair = True return (request_uri, is_keypair) def handle_updating_config(repo, update_sm, params, stored_cfg, is_pulling=False): """ INTERNAL: Write updates into the config file IFF the previous command succeeded. """ if not is_pulling: if not update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): return if not is_usk_file(params['INSERT_URI']): return inverted_uri = params['INVERTED_INSERT_URI'] # Cache the request_uri - insert_uri mapping. stored_cfg.set_insert_uri(inverted_uri, update_sm.ctx['INSERT_URI']) # Cache the updated index for the insert. version = get_version(update_sm.ctx['INSERT_URI']) stored_cfg.update_index(inverted_uri, version) stored_cfg.update_dir(repo.root, inverted_uri) # Hmmm... if we wanted to be clever we could update the request # uri too when it doesn't match the insert uri. Ok for now. # Only for usks and only on success. #print "UPDATED STORED CONFIG(0)" Config.to_file(stored_cfg) else: # Only finishing required. same. REDFLAG: look at this again if not update_sm.get_state(QUIESCENT).arrived_from(( REQUESTING_BUNDLES, FINISHING)): return if not is_usk(params['REQUEST_URI']): return state = update_sm.get_state(REQUESTING_URI) updated_uri = state.get_latest_uri() version = get_version(updated_uri) stored_cfg.update_index(updated_uri, version) stored_cfg.update_dir(repo.root, updated_uri) #print "UPDATED STORED CONFIG(1)" Config.to_file(stored_cfg) def is_redundant(uri): """ Return True if uri is a file USK and ends in '.R1', False otherwise. """ if not is_usk_file(uri): return '' fields = uri.split('/') if not fields[-2].endswith('.R1'): return '' return 'Redundant ' ############################################################ # User feedback? success, failure? def execute_create(ui_, repo, params, stored_cfg): """ Run the create command. """ update_sm = None try: update_sm = setup(ui_, repo, params, stored_cfg) # REDFLAG: Do better. # This call is not necessary, but I do it to set # 'INVERTED_INSERT_URI'. Write code to fish that # out of INSERTING_URI instead. handle_key_inversion(ui_, update_sm, params, stored_cfg) ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), params['INSERT_URI'])) #ui_.status("Current tip: %s\n" % hex_version(repo)[:12]) update_sm.start_inserting(UpdateGraph(), params.get('TO_VERSION', 'tip'), params['INSERT_URI']) run_until_quiescent(update_sm, params['POLL_SECS']) if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): ui_.status("Inserted to:\n%s\n" % '\n'.join(update_sm.get_state(INSERTING_URI). get_request_uris())) else: ui_.status("Create failed.\n") handle_updating_config(repo, update_sm, params, stored_cfg) finally: cleanup(update_sm) # REDFLAG: LATER: make this work without a repo? def execute_copy(ui_, repo, params, stored_cfg): """ Run the copy command. """ update_sm = None try: update_sm = setup(ui_, repo, params, stored_cfg) handle_key_inversion(ui_, update_sm, params, stored_cfg) ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), params['INSERT_URI'])) update_sm.start_copying(params['REQUEST_URI'], params['INSERT_URI']) run_until_quiescent(update_sm, params['POLL_SECS']) if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): ui_.status("Copied to:\n%s\n" % '\n'.join(update_sm.get_state(INSERTING_URI). get_request_uris())) else: ui_.status("Copy failed.\n") handle_updating_config(repo, update_sm, params, stored_cfg) finally: cleanup(update_sm) # REDFLAG: move into fcpclient? #def usks_equal(usk_a, usk_b): # assert is_usk(usk_a) and and is_usk(usk_b) # return (get_usk_for_usk_version(usk_a, 0) == # get_usk_for_usk_version(usk_b, 0)) # REDFLAG: reading from on uri and inserting to another isn't # fully working yet def execute_push(ui_, repo, params, stored_cfg): """ Run the push command. """ update_sm = None try: update_sm = setup(ui_, repo, params, stored_cfg) request_uri, is_keypair = handle_key_inversion(ui_, update_sm, params, stored_cfg) ui_.status("%sInsert URI:\n%s\n" % (is_redundant(params['INSERT_URI']), params['INSERT_URI'])) #ui_.status("Current tip: %s\n" % hex_version(repo)[:12]) update_sm.start_pushing(params['INSERT_URI'], params.get('TO_VERSION', 'tip'), request_uri, # None is allowed is_keypair) run_until_quiescent(update_sm, params['POLL_SECS']) if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): ui_.status("Inserted to:\n%s\n" % '\n'.join(update_sm.get_state(INSERTING_URI). get_request_uris())) else: ui_.status("Push failed.\n") handle_updating_config(repo, update_sm, params, stored_cfg) finally: cleanup(update_sm) def execute_pull(ui_, repo, params, stored_cfg): """ Run the pull command. """ update_sm = None try: update_sm = setup(ui_, repo, params, stored_cfg) ui_.status("%sRequest URI:\n%s\n" % (is_redundant(params[ 'REQUEST_URI']), params['REQUEST_URI'])) #ui_.status("Current tip: %s\n" % hex_version(repo)[:12]) update_sm.start_pulling(params['REQUEST_URI']) run_until_quiescent(update_sm, params['POLL_SECS']) if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))): ui_.status("Pulled from:\n%s\n" % update_sm.get_state('REQUESTING_URI'). get_latest_uri()) ui_.status("New tip: %s\n" % hex_version(repo)[:12]) else: ui_.status("Pull failed.\n") handle_updating_config(repo, update_sm, params, stored_cfg, True) finally: cleanup(update_sm) def setup_tmp_dir(ui_, tmp): """ INTERNAL: Setup the temp directory. """ tmp = os.path.expanduser(tmp) # Create the tmp dir if nescessary. if not os.path.exists(tmp): try: os.makedirs(tmp) except os.error, err: # Will exit below. ui_.warn(err) return tmp MSG_HGRC_SET = \ """Read the config file name from the: [infocalypse] cfg_file = <filename> section of your .hgrc (or mercurial.ini) file. cfg_file: %s """ MSG_CFG_EXISTS = \ """%s already exists! Move it out of the way if you really want to re-run setup. Consider before deleting it. It may contain the *only copy* of your private key. """ def execute_setup(ui_, host, port, tmp, cfg_file = None): """ Run the setup command. """ def connection_failure(msg): """ INTERNAL: Display a warning string. """ ui_.warn(msg) ui_.warn("It looks like your FCP host or port might be wrong.\n") ui_.warn("Set them with --fcphost and/or --fcpport and try again.\n") # Fix defaults. if host == '': host = '127.0.0.1' if port == 0: port = 9481 if cfg_file is None: cfg_file = os.path.expanduser(DEFAULT_CFG_PATH) existing_name = ui_.config('infocalypse', 'cfg_file', None) if not existing_name is None: existing_name = os.path.expanduser(existing_name) ui_.status(MSG_HGRC_SET % existing_name) cfg_file = existing_name if os.path.exists(cfg_file): ui_.status(MSG_CFG_EXISTS % cfg_file) raise util.Abort("Refusing to modify existing configuration.") tmp = setup_tmp_dir(ui_, tmp) if not is_writable(tmp): ui_.warn("Can't write to temp dir: %s\n" % tmp) return # Test FCP connection. timeout_secs = 20 connection = None default_private_key = None try: ui_.status("Testing FCP connection [%s:%i]...\n" % (host, port)) connection = FCPConnection(PolledSocket(host, port)) started = time.time() while (not connection.is_connected() and time.time() - started < timeout_secs): connection.socket.poll() time.sleep(.25) if not connection.is_connected(): connection_failure(("\nGave up after waiting %i secs for an " + "FCP NodeHello.\n") % timeout_secs) return ui_.status("Looks good.\nGenerating a default private key...\n") # Hmmm... this waits on a socket. Will an ioerror cause an abort? # Lazy, but I've never seen this call fail except for IO reasons. client = FCPClient(connection) client.message_callback = lambda x, y:None # Disable chatty default. default_private_key = client.generate_ssk()[1]['InsertURI'] except FCPError: # Protocol error. connection_failure("\nMaybe that's not an FCP server?\n") return except socket.error: # Not an IOError until 2.6. # Horked. connection_failure("\nSocket level error.\n") return except IOError: # Horked. connection_failure("\nSocket level error.\n") return cfg = Config() cfg.defaults['HOST'] = host cfg.defaults['PORT'] = port cfg.defaults['TMP_DIR'] = tmp cfg.defaults['DEFAULT_PRIVATE_KEY'] = default_private_key Config.to_file(cfg, cfg_file) ui_.status("""\nFinished setting configuration. FCP host: %s FCP port: %i Temp dir: %s cfg file: %s Default private key: %s """ % (host, port, tmp, cfg_file, default_private_key))