infocalypse

(djk)
2009-11-15: Changed fn-info to read repo head list from Freenet an print it in

Changed fn-info to read repo head list from Freenet an print it in addition to existing info. Added helper functions for fms patch bundle submission.

diff --git a/infocalypse/__init__.py b/infocalypse/__init__.py
--- a/infocalypse/__init__.py
+++ b/infocalypse/__init__.py
@@ -501,7 +501,7 @@ def infocalypse_info(ui_, repo, **opts):
             return
 
     params['REQUEST_URI'] = request_uri
-    execute_info(ui_, params, stored_cfg)
+    execute_info(ui_, repo, params, stored_cfg)
 
 def parse_trust_args(params, opts):
     """ INTERNAL: Helper function to parse  --hash and --fmsid. """
diff --git a/infocalypse/infcmds.py b/infocalypse/infcmds.py
--- a/infocalypse/infcmds.py
+++ b/infocalypse/infcmds.py
@@ -27,21 +27,29 @@ import os
 import socket
 import time
 
+from binascii import hexlify
+
 from mercurial import util
+from mercurial import commands
 
 from fcpclient import parse_progress, is_usk, is_ssk, get_version, \
      get_usk_for_usk_version, FCPClient, is_usk_file, is_negative_usk
-
 from fcpconnection import FCPConnection, PolledSocket, CONNECTION_STATES, \
      get_code, FCPError
+from fcpmessage import PUT_FILE_DEF
+
 from requestqueue import RequestRunner
 
-from graph import UpdateGraph
-from bundlecache import BundleCache, is_writable
+from graph import UpdateGraph, get_heads, has_version
+from bundlecache import BundleCache, is_writable, make_temp_file
 from updatesm import UpdateStateMachine, QUIESCENT, FINISHING, REQUESTING_URI, \
      REQUESTING_GRAPH, REQUESTING_BUNDLES, INVERTING_URI, \
      REQUESTING_URI_4_INSERT, INSERTING_BUNDLES, INSERTING_GRAPH, \
-     INSERTING_URI, FAILING, REQUESTING_URI_4_COPY, CANCELING, CleaningUp
+     INSERTING_URI, FAILING, REQUESTING_URI_4_COPY, CANCELING, \
+     REQUIRES_GRAPH_4_HEADS, REQUESTING_GRAPH_4_HEADS, \
+     RUNNING_SINGLE_REQUEST, CleaningUp
+
+from statemachine import StatefulRequest
 
 from config import Config, DEFAULT_CFG_PATH, FORMAT_VERSION, normalize
 
@@ -79,6 +87,8 @@ MSG_TABLE = {(QUIESCENT, REQUESTING_URI_
              :"Fetching URI...",
              (REQUESTING_URI, REQUESTING_BUNDLES)
              :"Fetching bundles...",
+             (REQUIRES_GRAPH_4_HEADS, REQUESTING_GRAPH_4_HEADS)
+             :"Head list not in top key, fetching graph...",
              }
 
 class UICallbacks:
@@ -700,6 +710,28 @@ def execute_pull(ui_, repo, params, stor
     finally:
         cleanup(update_sm)
 
+
+# Note: doesn't close the socket, but its ok because cleanup() does.
+def read_freenet_heads(params, update_sm, request_uri):
+    """ Helper function reads the know heads from Freenet. """
+    update_sm.start_requesting_heads(request_uri)
+    run_until_quiescent(update_sm, params['POLL_SECS'], False)
+    if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))):
+        if update_sm.ctx.graph is None:
+            # Heads are in the top key.
+            top_key_tuple = update_sm.get_state(REQUIRES_GRAPH_4_HEADS).\
+                            get_top_key_tuple()
+            assert top_key_tuple[1][0][5] # heads list complete
+            return top_key_tuple[1][0][2] # stored in first update
+
+        else:
+            # Have to pull the heads from the graph.
+            assert not update_sm.ctx.graph is None
+            return get_heads(update_sm.ctx.graph)
+
+    raise util.Abort("Couldn't read heads from Freenet.")
+
+
 NO_INFO_FMT = """There's no stored information about this USK.
 USK hash: %s
 """
@@ -714,9 +746,11 @@ Request URI:
 %s
 Insert URI:
 %s
+
+Reading repo state from Freenet...
 """
 
-def execute_info(ui_, params, stored_cfg):
+def execute_info(ui_, repo, params, stored_cfg):
     """ Run the info command. """
     request_uri = params['REQUEST_URI']
     if request_uri is None or not is_usk_file(request_uri):
@@ -743,6 +777,16 @@ def execute_info(ui_, params, stored_cfg
     ui_.status(INFO_FMT %
                (usk_hash, max_index or -1, trusted, request_uri, insert_uri))
 
+    update_sm = setup(ui_, repo, params, stored_cfg)
+    try:
+        ui_.status('Freenet head(s): %s\n' %
+                   ' '.join([ver[:12] for ver in
+                             read_freenet_heads(params, update_sm,
+                                                request_uri)]))
+    finally:
+        cleanup(update_sm)
+
+
 def setup_tmp_dir(ui_, tmp):
     """ INTERNAL: Setup the temp directory. """
     tmp = os.path.expanduser(tmp)
@@ -872,3 +916,90 @@ Default private key:
 
 """ % (host, port, tmp, cfg_file, default_private_key))
 
+
+def create_patch_bundle(ui_, repo, freenet_heads, out_file):
+    """ Creates an hg bundle file containing all the changesets
+        later than freenet_heads. """
+
+    freenet_heads = list(freenet_heads)
+    freenet_heads.sort()
+    # Make sure you have them all locally
+    for head in freenet_heads:
+        if not has_version(repo, head):
+            raise util.Abort("The local repository isn't up to date. " +
+                             "Run hg fn-pull.")
+
+    heads = [hexlify(head) for head in repo.heads()]
+    heads.sort()
+
+    if freenet_heads == heads:
+        raise util.Abort("All local changesets already in the repository " +
+                         "in Freenet.")
+
+    # Create a bundle using the freenet_heads as bases.
+    ui_.pushbuffer()
+    try:
+        #print 'PARENTS:', freenet_heads
+        #print 'HEADS:', heads
+        commands.bundle(ui_, repo, out_file,
+                        None, base=list(freenet_heads),
+                        rev=heads)
+    finally:
+        ui_.popbuffer()
+
+
+    # explicitly specify heads?
+
+    # insert it into freenet as a CHK
+
+# ':', '|' not in freenet base64
+def patch_msg(usk_hash, bases, heads, chk, kind='B'):
+    """ Return a machine readable patch notification suitable for posting
+        via FMS. """
+    return ':'.join((kind, usk_hash, ':'.join([base[:12] for base in bases]),
+                     '|', ':'.join([head[:12] for head in heads]), chk))
+def execute_insert_patch(ui_, repo, params, stored_cfg):
+    """ Create and hg bundle containing all changes not already in the
+        infocalypse repo in Freenet and insert it to a CHK. """
+    try:
+        update_sm = setup(ui_, repo, params, stored_cfg)
+        out_file = make_temp_file(update_sm.ctx.bundle_cache.base_dir)
+        freenet_heads = read_freenet_heads(params, update_sm,
+                                           params['REQUEST_URI'])
+
+        # This may eventually change to support other patch types.
+        create_patch_bundle(ui_, repo, freenet_heads, out_file)
+
+        # Make an FCP file insert request which will run on the
+        # on the state machine.
+        request = StatefulRequest(update_sm)
+        request.tag = 'patch_bundle_insert'
+        request.in_params.definition = PUT_FILE_DEF
+        request.in_params.fcp_params = update_sm.params.copy()
+        request.in_params.fcp_params['URI'] = 'CHK@'
+        request.in_params.file_name = out_file
+        request.in_params.send_data = True
+
+        update_sm.start_single_request(request)
+        run_until_quiescent(update_sm, params['POLL_SECS'])
+
+        freenet_heads = list(freenet_heads)
+        freenet_heads.sort()
+        heads = [hexlify(head) for head in repo.heads()]
+        heads.sort()
+
+        if update_sm.get_state(QUIESCENT).arrived_from(((FINISHING,))):
+            chk = update_sm.get_state(RUNNING_SINGLE_REQUEST).\
+                  final_msg[1]['URI']
+            ui_.status("Patch CHK:\n%s\n" %
+                       chk)
+
+            ui_.status("\nNotification:\n%s\n" %
+                       patch_msg(normalize(params['REQUEST_URI']),
+                                 freenet_heads, heads, chk) + '\n')
+
+        else:
+            ui_.status("Insert failed.\n")
+    finally:
+        # Cleans up out file.
+        cleanup(update_sm)
diff --git a/infocalypse/statemachine.py b/infocalypse/statemachine.py
--- a/infocalypse/statemachine.py
+++ b/infocalypse/statemachine.py
@@ -25,6 +25,7 @@
 
 import os
 
+from fcpconnection import SUCCESS_MSGS
 from requestqueue import QueueableRequest
 
 # Move this to fcpconnection?
@@ -136,6 +137,89 @@ class RequestQueueState(State):
         """ Handle terminal FCP messages for running requests. """
         pass
 
+class DecisionState(RequestQueueState):
+    """ Synthetic State which drives a transition to another state
+        in enter()."""
+    def __init__(self, parent, name):
+        RequestQueueState.__init__(self, parent, name)
+
+    def enter(self, from_state):
+        """ Immediately drive transition to decide_next_state(). """
+        target_state =  self.decide_next_state(from_state)
+        assert target_state != self
+        assert target_state != from_state
+        self.parent.transition(target_state)
+
+    def decide_next_state(self, dummy_from_state):
+        """ Pure virtual.
+
+            Return the state to transition into. """
+        print "ENOTIMPL:" + self.name
+        return ""
+
+    # Doesn't handle FCP requests.
+    def next_runnable(self):
+        """ Illegal. """
+        assert False
+
+    def request_progress(self, dummy_client, dummy_msg):
+        """ Illegal. """
+        assert False
+
+    def request_done(self, dummy_client, dummy_msg):
+        """ Illegal. """
+        assert False
+
+class RunningSingleRequest(RequestQueueState):
+    """ RequestQueueState to run a single StatefulRequest.
+
+        Caller MUST set request field.
+    """
+    def __init__(self, parent, name, success_state, failure_state):
+        RequestQueueState.__init__(self, parent, name)
+        self.success_state = success_state
+        self.failure_state = failure_state
+        self.request = None
+        self.queued = False
+        self.final_msg = None
+
+    def enter(self, dummy_from_state):
+        """ Implementation of State virtual. """
+        assert not self.queued
+        assert len(self.pending) == 0
+        assert not self.request is None
+        assert not self.request.tag is None
+
+    def reset(self):
+        """ Implementation of State virtual. """
+        RequestQueueState.reset(self)
+        self.request = None
+        self.queued = False
+        self.final_msg = None
+
+    def next_runnable(self):
+        """ Send request for the file once."""
+        if self.queued:
+            return None
+
+        # REDFLAG: sucky code, weird coupling
+        self.parent.ctx.set_cancel_time(self.request)
+
+        self.queued = True
+        self.pending[self.request.tag] = self.request
+        return self.request
+
+    def request_done(self, client, msg):
+        """ Implement virtual. """
+        assert self.request == client
+        del self.pending[self.request.tag]
+        self.final_msg = msg
+        if msg[0] in SUCCESS_MSGS:
+            self.parent.transition(self.success_state)
+            return
+
+        self.parent.transition(self.failure_state)
+
 class Quiescent(RequestQueueState):
     """ The quiescent state for the state machine. """
     def __init__(self, parent, name):
diff --git a/infocalypse/updatesm.py b/infocalypse/updatesm.py
--- a/infocalypse/updatesm.py
+++ b/infocalypse/updatesm.py
@@ -45,7 +45,7 @@ from topkey import bytes_to_top_key_tupl
 
 from statemachine import StatefulRequest, RequestQueueState, StateMachine, \
      Quiescent, Canceling, RetryingRequestList, CandidateRequest, \
-     require_state, delete_client_file
+     DecisionState, RunningSingleRequest, require_state, delete_client_file
 
 from insertingbundles import InsertingBundles
 from requestingbundles import RequestingBundles
@@ -631,6 +631,34 @@ class RequestingUri(StaticRequestList):
                 self.ordered[0][0].find('.R1') != -1)
         return get_usk_for_usk_version(self.ordered[0][0],
                                        max_version)
+class RequiresGraph(DecisionState):
+    """ State which decides whether the graph data is required. """
+    def __init__(self, parent, name, yes_state, no_state):
+        DecisionState.__init__(self, parent, name)
+        self.yes_state = yes_state
+        self.no_state = no_state
+        self.top_key_tuple = None
+
+    def reset(self):
+        """ Implementation of State virtual. """
+        self.top_key_tuple = None
+
+    def decide_next_state(self, from_state):
+        """ Returns yes_state if the graph is required, no_state otherwise. """
+        assert hasattr(from_state, 'get_top_key_tuple')
+        self.top_key_tuple = from_state.get_top_key_tuple()
+        if not self.top_key_tuple[1][0][5]:
+            # The top key data doesn't contain the full head list for
+            # the repository in Freenet, so we need to request the
+            # graph.
+            return self.yes_state
+        return self.no_state
+
+    def get_top_key_tuple(self):
+        """ Return the cached top key tuple. """
+        assert not self.top_key_tuple is None
+        return self.top_key_tuple
+
 
 class InvertingUri(RequestQueueState):
     """ A state to compute the request URI corresponding to a Freenet
@@ -715,9 +743,9 @@ class RequestingGraph(StaticRequestList)
 
     def enter(self, from_state):
         """ Implementation of State virtual. """
-        require_state(from_state, REQUESTING_URI_4_INSERT)
-        top_key_tuple = (self.parent.get_state(REQUESTING_URI_4_INSERT).
-                         get_top_key_tuple())
+
+        assert hasattr(from_state, "get_top_key_tuple")
+        top_key_tuple = from_state.get_top_key_tuple()
 
         #top_key_tuple = self.get_top_key_tuple() REDFLAG: remove
         #print "TOP_KEY_TUPLE", top_key_tuple
@@ -779,6 +807,16 @@ REQUESTING_URI = 'REQUESTING_URI'
 REQUESTING_BUNDLES = 'REQUESTING_BUNDLES'
 REQUESTING_URI_4_COPY = 'REQUESTING_URI_4_COPY'
 
+REQUESTING_URI_4_HEADS = 'REQUESTING_URI_4_HEADS'
+REQUIRES_GRAPH_4_HEADS  = 'REQUIRES_GRAPH'
+REQUESTING_GRAPH_4_HEADS = 'REQUESTING_GRAPH_4_HEADS'
+
+RUNNING_SINGLE_REQUEST = 'RUNNING_SINGLE_REQUEST'
+# REDFLAG: DRY out (after merging wiki stuff)
+# 1. write state_name(string) func to create state names by inserting them
+#    into globals.
+# 2. Helper func to add states to states member so you don't have to repeat
+#    the name
 class UpdateStateMachine(RequestQueue, StateMachine):
     """ A StateMachine implementaion to create, push to and pull from
         Infocalypse repositories. """
@@ -833,6 +871,26 @@ class UpdateStateMachine(RequestQueue, S
             FINISHING:CleaningUp(self, FINISHING, QUIESCENT),
 
 
+            # Requesting head info from freenet
+            REQUESTING_URI_4_HEADS:RequestingUri(self, REQUESTING_URI_4_HEADS,
+                                                 REQUIRES_GRAPH_4_HEADS,
+                                                 FAILING),
+
+            REQUIRES_GRAPH_4_HEADS:RequiresGraph(self, REQUIRES_GRAPH_4_HEADS,
+                                                 REQUESTING_GRAPH_4_HEADS,
+                                                 FINISHING),
+
+            REQUESTING_GRAPH_4_HEADS:RequestingGraph(self,
+                                                     REQUESTING_GRAPH_4_HEADS,
+                                                     FINISHING,
+                                                     FAILING),
+
+            # Run and arbitrary StatefulRequest.
+            RUNNING_SINGLE_REQUEST:RunningSingleRequest(self,
+                                                        RUNNING_SINGLE_REQUEST,
+                                                        FINISHING,
+                                                        FAILING),
+
             # Copying.
             # This doesn't verify that the graph chk(s) are fetchable.
             REQUESTING_URI_4_COPY:RequestingUri(self, REQUESTING_URI_4_COPY,
@@ -911,6 +969,26 @@ class UpdateStateMachine(RequestQueue, S
         self.ctx['REQUEST_URI'] = request_uri
         self.transition(REQUESTING_URI)
 
+    def start_requesting_heads(self, request_uri):
+        """ Start fetching the top key and graph if necessary to retrieve
+            the list of the latest heads in Freenet.
+        """
+        self.require_state(QUIESCENT)
+        self.reset()
+        self.ctx.graph = None
+        self.ctx['REQUEST_URI'] = request_uri
+        self.transition(REQUESTING_URI_4_HEADS)
+
+    def start_single_request(self, stateful_request):
+        """ Run a single StatefulRequest on the state machine.
+        """
+        assert not stateful_request is None
+        assert not stateful_request.in_params is None
+        assert not stateful_request.in_params.definition is None
+        self.require_state(QUIESCENT)
+        self.reset()
+        self.get_state(RUNNING_SINGLE_REQUEST).request = stateful_request
+        self.transition(RUNNING_SINGLE_REQUEST)
 
     def start_copying(self, from_uri, to_insert_uri):
         """ Start pulling changes from an Infocalypse repository URI