""" A RequestQueueState which inserts hg bundles corresponding to edges in the Infocalypse update graph into Freenet. Copyright (C) 2009 Darrell Karbott This library is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks """ from graph import UpToDate, INSERT_SALTED_METADATA, \ FREENET_BLOCK_LEN, build_version_table, get_heads from graphutil import graph_to_string from bundlecache import BundleException from statemachine import RequestQueueState # REDFLAG: duplicated to get around circular deps. INSERTING_GRAPH = 'INSERTING_GRAPH' FAILING = 'FAILING' CANCELING = 'CANCELING' QUIESCENT = 'QUIESCENT' # Hmmmm... hard coded exit states. class InsertingBundles(RequestQueueState): """ A state to insert hg bundles corresponding to the edges in an Infocalypse update graph into Freenet. """ def __init__(self, parent, name): RequestQueueState.__init__(self, parent, name) # edge -> StatefulRequest self.pending = {} self.new_edges = [] self.required_edges = [] # HACK: # edge -> (x,y, 0) Freenet metadata bytes self.salting_cache = {} def enter(self, dummy): """ Implementation of State virtual. This checks the graph against the local repository and adds edges required to update it to the TARGET_VERSION specified in the context object. Then it starts inserting CHKS for the new edges into Freenet, doing padding / metadata salting as required. """ #require_state(from_state, QUIESCENT) assert (self.parent.ctx.get('REINSERT', 0) > 0 or (not self.parent.ctx['INSERT_URI'] is None)) assert not self.parent.ctx.graph is None graph = self.parent.ctx.graph.clone() if self.parent.params.get('DUMP_GRAPH', False): self.parent.ctx.ui_.status("--- Initial Graph ---\n") self.parent.ctx.ui_.status(graph_to_string(graph) +'\n') latest_revs = get_heads(graph) self.parent.ctx.ui_.status("Latest heads(s) in Freenet: %s\n" % ' '.join([ver[:12] for ver in latest_revs])) if not self.parent.ctx.has_versions(latest_revs): self.parent.ctx.ui_.warn("The local repository isn't up " + "to date.\n" + "Try doing an fn-pull.\n") self.parent.transition(FAILING) # Hmmm... hard coded state name return # Update graph. try: self.set_new_edges(graph) except UpToDate, err: # REDFLAG: Later, add FORCE_INSERT parameter? self.parent.ctx.ui_.warn(str(err) + '\n') # Hmmm self.parent.transition(FAILING) # Hmmm... hard coded state name return text = '' for edge in self.new_edges: text += "%i:%s\n" % (graph.get_length(edge), str(edge)) if len(text) > 0: self.parent.ctx.ui_.status('Inserting bundles:\n' + text) #print "--- Updated Graph ---" #print graph_to_string(graph) #print "--- Minimal Graph ---" #print graph_to_string(minimal_update_graph(graph,31 * 1024, # graph_to_string)) #print "---" #dump_top_key_tuple((('CHK@', 'CHK@'), # get_top_key_updates(graph))) if len(self.new_edges) == 0: raise Exception("Up to date") self.parent.ctx.graph = graph # Edge CHKs required to do metadata salting. # Most of these probably won't exist yet. self.required_edges = [] for edge in self.new_edges: assert edge[2] <= 1 if graph.insert_type(edge) == INSERT_SALTED_METADATA: # Have to wait for the primary insert to finish. self.required_edges.append((edge[0], edge[1], 0)) for edge in self.required_edges: # Will be re-added when the required metadata arrives. self.new_edges.remove((edge[0], edge[1], 1)) # REDFLAG: no longer needed? def leave(self, dummy): """ Implementation of State virtual. """ # Hmmm... for request in self.pending.values(): self.parent.runner.cancel_request(request) def reset(self): """ Implementation of State virtual. """ self.new_edges = [] self.required_edges = [] self.salting_cache = {} RequestQueueState.reset(self) def next_runnable(self): """ Implementation of RequestQueueState virtual. """ for edge in self.required_edges: if edge in self.pending: # Already running. continue # We can't count on the graph when reinserting. # Because the chks are already set. #if not self.parent.ctx.graph.has_chk(edge): # # Depends on an edge which hasn't been inserted yet. # continue if edge in self.new_edges: # Depends on an edge which hasn't been inserted yet. continue assert not edge in self.pending request = self.parent.ctx.make_splitfile_metadata_request(edge, edge) self.pending[edge] = request return request if len(self.new_edges) == 0: return None request = None try: edge = self.new_edges.pop() request = self.parent.ctx.make_edge_insert_request(edge, edge, self.salting_cache) self.pending[edge] = request except BundleException: if self.parent.ctx.get('REINSERT', 0) > 0: self.parent.ctx.ui_.warn("Couldn't create an identical " + "bundle to re-insert.\n" + "Possible causes:\n" + "0) Changes been locally commited " + "but not fn-push'd yet.\n" + "1) The repository was inserted " + "with a different version of hg.\n") self.parent.transition(FAILING) else: # Dunno what's going on. raise return request def request_done(self, client, msg): """ Implementation of RequestQueueState virtual. """ #print "TAG: ", client.tag assert client.tag in self.pending edge = client.tag del self.pending[edge] if msg[0] == 'AllData': self.salting_cache[client.tag] = msg[2] # Queue insert request now that the required data is cached. if edge in self.required_edges: assert edge[2] == 0 self.required_edges.remove(edge) self.parent.ctx.ui_.status("Re-adding put request for salted " + "metadata: %s\n" % str((edge[0], edge[1], 1))) self.new_edges.append((edge[0], edge[1], 1)) elif msg[0] == 'PutSuccessful': chk1 = msg[1]['URI'] graph = self.parent.ctx.graph if edge[2] == 1 and graph.insert_length(edge) > FREENET_BLOCK_LEN: # HACK HACK HACK # TRICKY: # Scrape the control bytes from the full request # to enable metadata handling. # REDFLAG: Do better? chk0 = graph.get_chk((edge[0], edge[1], 0)) chk0_fields = chk0.split(',') chk1_fields = chk1.split(',') #print "FIELDS: ", chk0_fields, chk1_fields # Hmmm... also no file names. assert len(chk0_fields) == len(chk1_fields) chk1 = ','.join(chk1_fields[:-1] + chk0_fields[-1:]) if self.parent.ctx.get('REINSERT', 0) < 1: graph.set_chk(edge[:2], edge[2], graph.get_length(edge), chk1) else: if chk1 != graph.get_chk(edge): self.parent.ctx.ui_.status("Bad CHK: %s %s\n" % (str(edge), chk1)) self.parent.ctx.ui_.warn("CHK for reinserted edge doesn't " + "match!\n") self.parent.transition(FAILING) else: # REDFLAG: retrying? # REDFLAG: More failure information, FAILING state? self.parent.transition(FAILING) return if (len(self.pending) == 0 and len(self.new_edges) == 0 and len(self.required_edges) == 0): self.parent.transition(INSERTING_GRAPH) def set_new_edges(self, graph): """ INTERNAL: Set the list of new edges to insert. """ # REDFLAG: Think this through. self.parent.ctx.version_table = build_version_table(graph, self.parent.ctx. repo) if self.parent.ctx.get('REINSERT', 0) == 0: self.new_edges = graph.update(self.parent.ctx.repo, self.parent.ctx.ui_, self.parent.ctx['TARGET_VERSIONS'], self.parent.ctx.bundle_cache) return # Hmmmm... later support different int values of REINSERT? self.new_edges = graph.get_top_key_edges() redundant = [] for edge in self.new_edges: if graph.is_redundant(edge): alternate_edge = (edge[0], edge[1], int(not edge[2])) if not alternate_edge in self.new_edges: redundant.append(alternate_edge) self.new_edges += redundant