""" A RequestQueueState which requests the hg bundle CHKs
required to update a repository.
Copyright (C) 2009 Darrell Karbott
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public
License as published by the Free Software Foundation; either
version 2.0 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks
"""
# REDFLAG: reevaluate on failure?
import os
import random # Hmmm... good enough?
from fcpmessage import GET_DEF
from bundlecache import make_temp_file
from graph import latest_index, \
FREENET_BLOCK_LEN, chk_to_edge_triple_map, \
dump_paths, MAX_PATH_LEN, get_heads, canonical_path_itr
from graphutil import parse_graph
from choose import get_update_edges, dump_update_edges, SaltingState
from statemachine import RetryingRequestList, CandidateRequest
from chk import clear_control_bytes
# REDFLAG: Make sure that you are copying lists. eg. updates
# FUNCTIONAL REQUIREMENTS:
# 0) Update as fast as possible
# 1) Single block fetch alternate keys.
# 2) Choose between primary and alternate keys "fairly"
# 3) transition from no graph to graph case.
# 4) deal with padding hacks
# 5) Optionally disable alternate single block fetching?
# ?6) serialize? Easier to write from scratch?
# No graph case
# CHKs ARE EDGES! Don't need to store extra state.
# can bootstrap from a chk_list -> edges (slowly)
#
# Can unwind padding hack w/o graph
# len < 32K done
# len == 32K re-request DONE
# What this does:
# 0) Fetches graph(s)
# 1) Fetches early bundles in parallel with graphs
# 2) Fixes up pending requests to graph edges when the graph arrives
# 3) Handles metadata salting for bundle requests
# 4) Keeps track of which requests are required to update and makes them.
# a candidate is a list:
# [CHK, tries, single_block, edge_triple, update_data, msg, is_graph_request]
class RequestingBundles(RetryingRequestList):
    """ A RequestQueueState which requests the hg bundle CHKs
    required to update a repository. """
def __init__(self, parent, name, success_state, failure_state):
RetryingRequestList.__init__(self, parent, name)
self.success_state = success_state
self.failure_state = failure_state
self.top_key_tuple = None # FNA sskdata
self.freenet_heads = None
############################################################
# State implementation
############################################################
def enter(self, from_state):
""" Implementation of State virtual. """
if hasattr(from_state, 'get_top_key_tuple'):
self._initialize(from_state.get_top_key_tuple())
return
self._initialize()
#self.dump()
# REDFLAG: delete this?
def leave(self, to_state):
""" Implementation of State virtual. """
pass
def reset(self):
""" Implementation of State virtual. """
#print "reset -- pending: ", len(self.pending)
self.top_key_tuple = None
RetryingRequestList.reset(self)
############################################################
# Implementation of RetryingRequestList virtuals
############################################################
def candidate_done(self, client, msg, candidate):
""" Implementation of RetryingRequestList virtual. """
# Hmmmm... special case hack code to handle graph.
if not self._graph_request_done(client, msg, candidate):
if msg[0] == 'AllData':
self._handle_success(client, msg, candidate)
else:
self._handle_failure(client, msg, candidate)
# Catch state machine stalls.
if (self.parent.current_state == self and
self.is_stalled()):
self.parent.ctx.ui_.warn("Giving up because the state "
+ "machine stalled.\n")
self.parent.transition(self.failure_state)
# DONT add to pending. Base clase does that.
def make_request(self, candidate):
""" Implementation of RetryingRequestList virtual. """
#print "CANDIDATE: ", candidate
assert len(candidate) >= 7
candidate[1] += 1 # Keep track of the number of times it has been tried
# tag == edge, but what if we don't have an edge yet?
request = CandidateRequest(self.parent)
request.in_params.fcp_params = self.parent.params.copy()
uri = candidate[0]
if candidate[2]:
uri = clear_control_bytes(uri)
request.in_params.fcp_params['URI'] = uri
request.in_params.definition = GET_DEF
request.in_params.file_name = (
make_temp_file(self.parent.ctx.bundle_cache.base_dir))
self.parent.ctx.set_cancel_time(request)
# Set tag
if not candidate[3] is None:
request.tag = candidate[3] # Edge
else:
# REDFLAG: Do better!
# Some random digit string.
request.tag = request.in_params.file_name[-12:]
# Set candidate
request.candidate = candidate
#print "make_request --", request.tag, candidate[0]
# Tags must be unique or we will loose requests!
assert not request.tag in self.pending
#request.in_params.fcp_params['MaxSize'] = ???
return request
############################################################
# DEALING: With partial heads, partial bases?
# REDFLAG: deal with optional request serialization?
# REDFLAG: Move
    # ASSUMPTION: Keys are in descending order of latest_rev.
# ASSUMPTION: Keys are in order of descending parent rev.
#
# Returns index of last update queued.
# Does gobbledygook to make single block requesting work.
#
# REDFLAG: candymachine? look at start_index / last_queued
    def _queue_from_updates(self, candidate_list,
                            start_index, one_full, only_latest=False):
        """ INTERNAL: Queues hg bundle CHK requests from the
        top key data.

        candidate_list -- list new candidates are inserted at the front of.
        start_index -- updates before this index in self.top_key_tuple[1]
                       are skipped.
        one_full -- if False, the randomly chosen CHK of the first update
                    queued is requested without the single block flag.
                    Every other CHK queued is a single block request.
        only_latest -- if True, stop at the first update that is longer
                       than 5 blocks, doesn't have the same heads as
                       updates[0], or whose parents aren't in the local
                       repo.

        Stops at the first update lacking full parent/head info.
        Returns the index of the last update queued, or -1 if none were.
        """
        updates = self.top_key_tuple[1]
        last_queued = -1
        for index, update in enumerate(updates):
            if index < start_index:
                # REDFLAG: do better?
                continue
            if not update[4] or not update[5]:
                # Don't attempt to queue updates if we don't know
                # full parent/head info.
                # REDFLAG: remove test code
                print "_queue_from_updates -- bailing out", update[4], update[5]
                break
            if only_latest and update[0] > 5 * FREENET_BLOCK_LEN:
                # Short circuit (-1, top_index) rollup case.
                # (update[0] is the bundle length.)
                break
            if only_latest and update[2] != updates[0][2]:
                # Only full updates, i.e. ones to the same heads
                # as updates[0].
                break
            if not self.parent.ctx.has_versions(update[1]):
                # Only updates we can pull.
                if only_latest:
                    # Don't want big bundles from the canonical path.
                    break
                else:
                    continue
            if self.parent.ctx.has_versions(update[2]):
                # Only updates we need.
                continue
            # One randomly chosen CHK may be requested in full (see
            # one_full), the remaining redundant CHKs as single blocks.
            chks = list(update[3][:])
            full_chk = random.choice(chks)
            chks.remove(full_chk)
            candidate = [full_chk, 0, not one_full, None, update, None, False]
            one_full = True
            candidate_list.insert(0, candidate)
            for chk in chks:
                candidate = [chk, 0, True, None, update, None, False]
                candidate_list.insert(0, candidate)
            last_queued = index
            if index > 1:
                # Stop once an update past index 1 has been queued.
                break
        return last_queued
def _handle_testing_hacks(self):
""" INTERNAL: Helper function to implement TEST_DISABLE_UPDATES
and TEST_DISABLE_GRAPH testing params. """
if self.top_key_tuple is None:
return
if self.parent.params.get("TEST_DISABLE_UPDATES", False):
updates = list(self.top_key_tuple[1])
for index in range(0, len(updates)):
update = list(updates[index])
update[4] = False
update[5] = False
updates[index] = tuple(update)
top = list(self.top_key_tuple)
top[1] = tuple(updates)
self.top_key_tuple = tuple(top)
self.parent.ctx.ui_.warn("TEST_DISABLE_UPDATES == True\n"
+ "Disabled updating w/o graph\n")
if self.parent.params.get("TEST_DISABLE_GRAPH", False):
top = list(self.top_key_tuple)
# REDFLAG: Fix post 1208
# Using bad keys is a more realistic test but there's
# an FCP bug in 1208 that kills the connection on
# cancel. Go back to this when 1209 comes out.
top[0] = ('CHK@badroutingkeyA55JblbGup0yNSpoDJgVPnL8E5WXoc,'
+'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8',
'CHK@badroutingkeyB55JblbGup0yNSpoDJgVPnL8E5WXoc,'
+'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8',
)
top[0] = ()
self.top_key_tuple = tuple(top)
self.parent.ctx.ui_.warn("TEST_DISABLE_GRAPH == True\n"
+ "Disabled graph by removing graph "
+ "chks.\n")
# Hack special case code to add the graph.
    def _initialize(self, top_key_tuple=None):
        """ INTERNAL: Initialize.
        If the graph isn't available yet kick off
        requests for it and also a request for a full
        update if there's one available in the top key data.
        If the graph is available, use it to determine which
        keys to request next.

        top_key_tuple -- the top key data or None. Element [0] is the
                         tuple of graph CHKs, element [1] the tuple of
                         updates. Required when there's no graph yet.
        Raises Exception if there's neither a graph nor top key data.
        May transition to success_state or failure_state.
        """
        self.top_key_tuple = top_key_tuple
        # May rewrite self.top_key_tuple for testing.
        self._handle_testing_hacks()
        ############################################################
        # Hack used to test graph request failure.
        #bad_chk = ('CHK@badroutingkeyA55JblbGup0yNSpoDJgVPnL8E5WXoc,'
        #           +'KZ6azHOwEm4ga6dLy6UfbdSzVhJEz3OvIbSS4o5BMKU,AAIC--8')
        #bad_update = list(top_key_tuple[1][0])
        #bad_update[3] = (bad_chk, )
        #print "old:", top_key_tuple
        #self.top_key_tuple = ((bad_chk,), (bad_update, ))
        #print "new:", self.top_key_tuple
        ############################################################
        # If we don't have the graph, request it, and update
        # from the data in the top key.
        if self.parent.ctx.graph is None:
            if self.top_key_tuple is None:
                raise Exception("No top key data.")
            updates = self.top_key_tuple[1]
            # updates[0][5] is truthy when the top key has the full
            # head info for the latest update.
            if updates[0][5]:
                self.freenet_heads = updates[0][2]
                self.parent.ctx.ui_.status('Freenet heads: %s\n' %
                                           ' '.join([ver[:12] for ver in
                                                     updates[0][2]]))
                if self.parent.ctx.has_versions(updates[0][2]):
                    self.parent.ctx.ui_.warn("All remote heads are already "
                                             + "in the local repo.\n")
                    self.parent.transition(self.success_state)
                    return
                # INTENT: Improve throughput for most common update case.
                # If it is possible to update fully in one fetch, queue the
                # (possibly redundant) key(s) BEFORE graph requests.
                latest_queued = self._queue_from_updates(self.
                                                         current_candidates,
                                                         -1, False, True)
                if latest_queued != -1:
                    self.parent.ctx.ui_.status("Full update is possible in a "
                                               + "single FCP fetch. :-)\n")
            else:
                self.parent.ctx.ui_.warn("Couldn't read all Freenet heads from "
                                         + "top key.\n"
                                         + "Dunno if you're up to date :-(\n"
                                         + "Waiting for graph...\n")
            if len(self.top_key_tuple[0]) == 0:
                self.parent.ctx.ui_.warn("No graph CHKs in top key! "
                                         + "Giving up...\n")
                self.parent.transition(self.failure_state)
                return
            # Kick off the fetch(es) for the full update graph.
            # REDFLAG: make a parameter
            parallel_graph_fetch = True
            chks = list(self.top_key_tuple[0][:])
            random.shuffle(chks)
            #chks = [] # Hack to test bootstrapping w/o graph
            for chk in chks:
                candidate = [chk, 0, False, None, None, None, True]
                # insert not append, because this should run AFTER
                # initial single fetch update queued above.
                self.current_candidates.insert(0, candidate)
                if not parallel_graph_fetch:
                    break
            if self.freenet_heads is None:
                # Need to wait for the graph.
                return
            # Queue remaining fetchable keys in the NEXT pass.
            # INTENT:
            # The graph might have a better update path. So we don't try these
            # until we have tried to get the graph.
            # NOTE(review): latest_queued is only bound in the updates[0][5]
            # branch above; this relies on freenet_heads still being None
            # whenever that branch wasn't taken.
            self._queue_from_updates(self.next_candidates, latest_queued + 1,
                                     latest_queued == -1, False)
            return
        # Otherwise, use the graph to figure out what keys we need.
        self._reevaluate()
# REDFLAG: Move
# Set the graph and fixup all candidates with real edges.
def _set_graph(self, graph):
""" INTERNAL: Set the graph and fixup any pending CHK edge
requests with their edges. """
def fixup(edges, candidate_list):
""" INTERNAL : Helper fixes up CHK->edges. """
for candidate in candidate_list:
edge = edges[candidate[0]]
candidate[3] = edge
candidate[4] = None
edges = chk_to_edge_triple_map(graph)
skip_chks = set([]) # REDFLAG: remove!
for request in self.pending.values():
candidate = request.candidate
if candidate[6]:
continue
edge = edges[candidate[0]]
candidate[3] = edge
candidate[4] = None
#print "_set_graph -- fixed up: ", request.tag, edge
# REDFLAG: why am I keeping state in two places?
old_tag = request.tag
request.tag = edge
del self.pending[old_tag]
self.pending[request.tag] = request
skip_chks.add(candidate[0])
#print "pending.keys(): ", self.pending.keys()
fixup(edges, self.current_candidates)
fixup(edges, self.next_candidates)
fixup(edges, self.finished_candidates)
assert not self.top_key_tuple is None
if self.parent.params.get('DUMP_TOP_KEY', False):
text = "Fixed up top key CHKs:\n"
for update in self.top_key_tuple[1]:
for chk in update[3]:
if chk in edges:
text += " " + str(edges[chk]) + ":" + chk + "\n"
else:
text += " BAD TOP KEY DATA!" + ":" + chk + "\n"
self.parent.ctx.ui_.status(text)
all_heads = get_heads(graph)
assert (self.freenet_heads is None or
self.freenet_heads == all_heads)
self.freenet_heads = all_heads
self.parent.ctx.graph = graph
self.rep_invariant()
# REDFLAG: remove testing code
#kill_prob = 0.00
#print "BREAKING EDGES: Pkill==", kill_prob
#print skip_chks
#break_edges(graph, kill_prob, skip_chks)
# "fix" (i.e. break) pending good chks.
# REDFLAG: comment this out too?
for candidate in self.current_candidates + self.next_candidates:
if candidate[6]:
continue
edge = candidate[3]
assert not edge is None
if graph.get_chk(edge).find("badrouting") != -1:
candidate[0] = graph.get_chk(edge)
#self.dump()
self.rep_invariant()
self.parent.ctx.ui_.status("Got graph. Latest graph index: %i\n" %
graph.latest_index)
def _handle_graph_failure(self, candidate):
""" INTERNAL: Handle failed FCP requests for the graph. """
max_retries = self.parent.params.get('MAX_RETRIES', 1)
if candidate[1] < max_retries + 1:
#print "_should_retry -- returned False"
#return False
# Append retries immediately. Hmmmm...
self.current_candidates.append(candidate)
return
self.finished_candidates.append(candidate)
if self.is_stalled():
# BUG: Kind of. We can update w/o the graph without ever reporting
# that we couldn't get the graph.
self.parent.ctx.ui_.warn("Couldn't read graph from Freenet!\n")
self.parent.transition(self.failure_state)
def _handle_dump_canonical_paths(self, graph):
""" INTERNAL: Dump the top 20 canonical paths. """
if not self.parent.params.get('DUMP_CANONICAL_PATHS', False):
return
paths = canonical_path_itr(graph, 0, graph.latest_index,
MAX_PATH_LEN)
first_paths = []
# REDFLAG: Magick number
while len(first_paths) < 20:
try:
first_paths.append(paths.next())
except StopIteration:
break
dump_paths(graph,
first_paths,
"Canonical paths")
def _graph_request_done(self, client, msg, candidate):
""" INTERNAL: Handle requests for the graph. """
#print "CANDIDATE:", candidate
#print "_graph_request_done -- ", candidate[6]
if not candidate[6]:
return False
if not self.parent.ctx.graph is None:
self.finished_candidates.append(candidate)
return True
if msg[0] == 'AllData':
in_file = open(client.in_params.file_name, 'rb')
try:
data = in_file.read()
# REDFLAG: log graph?
if self.parent.params.get('DUMP_GRAPH', False):
self.parent.ctx.ui_.status("--- Raw Graph Data ---\n")
self.parent.ctx.ui_.status(data)
self.parent.ctx.ui_.status("\n---\n")
graph = parse_graph(data)
self._handle_dump_canonical_paths(graph)
self._set_graph(graph)
self._reevaluate()
finally:
in_file.close()
self.finished_candidates.append(candidate)
else:
if not self.top_key_tuple is None:
pending, current, next, finished = self._known_chks()
all_chks = pending.union(current).union(next).union(finished)
for chk in self.top_key_tuple[0]:
if not chk in all_chks and chk != candidate[0]:
# REDFLAG: Test this code path.
# Queue the other graph chk.
candidate = [chk, 0, False, None, None, None, True]
# Run next!
#print "QUEUEING OTHER GRAPH CHK"
# append retries immediately. Hmmm...
self.current_candidates.append(candidate)
break
# Careful, can drive state transition.
self._handle_graph_failure(candidate)
return True
# REDFLAG: move
# REDFLAG: Too slow?
def _force_single_block(self, edge):
""" INTERNAL: Make sure there is only one non-single-block request
running for a redundant edge. """
for candidate in self.current_candidates:
if candidate[3] == edge and not candidate[2]:
candidate[2] = True
# break. paranoia?
for candidate in self.next_candidates:
if candidate[3] == edge and not candidate[2]:
candidate[2] = True
# break. paranoia?
# REDFLAG: for now, do parallel multiblock fetches.
def _handled_multiblock_case(self, candidate):
""" INTERNAL: Handle requeueing full fetches when we don't have
the graph yet. """
if (candidate[2] and self._multiple_block(candidate) and
self.parent.ctx.graph is None):
assert not candidate[4] is None
update = candidate[4]
# Compare without control bytes, which were cleared.
target = candidate[0].split(',')[:-1]
for chk in update[3]:
if chk.split(',')[:-1] == target:
# Reset the CHK because the control bytes were zorched.
candidate[0] = chk
#candidate[1] += 1
candidate[2] = False
candidate[5] = None # Reset!
self.current_candidates.insert(0, candidate)
return True
assert False
    def _handle_success(self, client, msg, candidate):
        """ INTERNAL: Handle successful FCP requests.

        - Bundles we no longer need are just finished.
        - Single block fetches of multi-block bundles are requeued as full
          requests (unless another full request for the redundant edge is
          already running).
        - Otherwise the bundle is pulled into the local repo. Transitions
          to success_state once all of self.freenet_heads are present,
          else reevaluates which requests are needed.
        """
        #print "_handle_success -- ", candidate
        if not self._needs_bundle(candidate):
            #print "_handle_success -- doesn't need bundle."
            candidate[5] = msg
            self.finished_candidates.append(candidate)
            return
        # No-graph case for single block fetches of multi-block bundles.
        if self._handled_multiblock_case(candidate):
            return
        if (candidate[2] and self._multiple_block(candidate)):
            #print "_handle_success -- multiple block..."
            # Cases:
            # 0) No redundant edge exists, -> requeue
            # 1) Redundant edge request running, single block -> requeue
            # 2) Redundant edge request running, full -> finish
            # 3) Redundant edge request queued, full -> flip to single_block
            # 4) Redundant edge request queued, single_block -> nop
            edge = candidate[3]
            # Same edge, other (salted) variant.
            redundant_edge = (edge[0], edge[1], int(not edge[2]))
            if (not self.parent.ctx.graph is None and
                self.parent.ctx.graph.is_redundant(edge)):
                for value in self.pending_candidates():
                    if (value[3] == redundant_edge and
                        not value[2]):
                        # Bail out because there's already a request for that
                        # data running.
                        candidate[5] = msg
                        # Make sure the candidate will re-run if the running
                        # request fails.
                        candidate[1] = 0
                        self.next_candidates.insert(0, candidate)
                        #print "_handle_success -- already another running."
                        self.parent.ctx.ui_.status(("Other salted key is "
                                                    + "running. Didn't "
                                                    + "requeue: %s\n")
                                                   % str(candidate[3]))
                        return
            self.parent.ctx.ui_.status("Requeuing full download for: %s\n"
                                       % str(candidate[3]))
            # Reset the CHK because the control bytes were zorched.
            candidate[0] = self.parent.ctx.graph.get_chk(candidate[3])
            #candidate[1] += 1
            candidate[2] = False
            candidate[5] = None # Reset!
            self.rep_invariant()
            self.current_candidates.insert(0, candidate)
            # Only one full request per redundant edge pair.
            self._force_single_block(redundant_edge)
            self.rep_invariant()
            return
        #print "_handle_success -- bottom"
        candidate[5] = msg
        self.finished_candidates.append(candidate)
        #print "_handle_success -- pulling!"
        # Human readable name: the edge, or parents:heads w/o a graph.
        name = str(candidate[3])
        if name == 'None':
            name = "%s:%s" % (','.join([ver[:12] for ver in candidate[4][1]]),
                              ','.join([ver[:12] for ver in candidate[4][2]]))
        #print "Trying to pull: ", name
        self._pull_bundle(client, msg, candidate)
        #print "_handle_success -- pulled bundle ", candidate[3]
        self.parent.ctx.ui_.status("Pulled bundle: %s\n" % name)
        if self.parent.ctx.has_versions(self.freenet_heads):
            # Done and done!
            #print "SUCCEEDED!"
            self.parent.transition(self.success_state)
            return
        #print "_reevaluate -- called"
        self._reevaluate()
        #print "_reevaluate -- exited"
# REDFLAG: move
def _should_retry(self, candidate):
""" Return True if the FCP request for the candidate should
be retried, False otherwise. """
max_retries = self.parent.params.get('MAX_RETRIES', 0)
if candidate[1] - 1 >= max_retries:
#print "_should_retry -- returned False"
return False
if not self._needs_bundle(candidate) and not candidate[6]:
return False
return True
def _queued_redundant_edge(self, candidate):
""" INTERNAL: Return True if a redundant request was queued for
the candidate. """
edge = candidate[3]
if edge is None or candidate[6]:
return False
if not self.parent.ctx.graph.is_redundant(edge):
return False
pending, current, next, finished = self._known_edges()
# Must include finished! REDFLAG: re-examine other cases.
all_edges = pending.union(current).union(next).union(finished)
alternate_edge = (edge[0], edge[1], int(not edge[2]))
if alternate_edge in all_edges:
return False
self.parent.ctx.ui_.status("Queueing redundant edge: %s\n"
% str(alternate_edge))
# Order is important because this changes SaltingState.
self.next_candidates.insert(0, candidate)
self._queue_candidate(self.next_candidates, alternate_edge,
not SaltingState(self).needs_full_request(
self.parent.ctx.graph, alternate_edge))
return True
def _handle_failure(self, dummy, msg, candidate):
""" INTERNAL: Handle FCP request failure for a candidate. """
if not self._needs_bundle(candidate):
#print "_handle_failure -- doesn't need bundle."
candidate[5] = msg
self.finished_candidates.append(candidate)
return
#print "_handle_failure -- ", candidate
if self._should_retry(candidate):
#print "_handle_failure -- retrying..."
# Order important. Allow should_retry to see previous msg.
candidate[5] = msg
if not self._queued_redundant_edge(candidate):
self.next_candidates.insert(0, candidate)
else:
#print "_handle_failure -- abandoning..."
candidate[5] = msg
# Thought about adding code to queue redundant salted request here,
# but it doesn't make sense.
self.finished_candidates.append(candidate)
if self.is_stalled():
self.parent.ctx.ui_.warn("Too many failures. Gave up :-(\n")
self.parent.transition(self.failure_state)
def _multiple_block(self, candidate):
""" INTERNAL: Return True if the candidate's FCP request is
more than one block. """
graph = self.parent.ctx.graph
if not graph is None:
step = candidate[3]
# Should have been fixed up when we got the graph.
assert not step is None
return graph.insert_length(step) > FREENET_BLOCK_LEN
# BUG: returns True for length == 32k w/o padding hack.
# Ugly but benign. Just causes an unnesc. re-fetch. Happens rarely.
return candidate[4][0] >= FREENET_BLOCK_LEN
# REDFLAG: Returns false for bundles you can't pull. CANT PREFETCH?
# False if parent rev not available.
def _needs_bundle(self, candidate):
""" INTERNAL: Returns True if the hg bundle for the candidate's edge
could be pulled and contains changes that we don't already have. """
versions = self._get_versions(candidate)
#print "_needs_bundle -- ", versions
if not self.parent.ctx.has_versions(versions[0]):
#print "Doesn't have parent ", versions
return False # Doesn't have parent.
return not self.parent.ctx.has_versions(versions[1])
# REDFLAGE: remove msg arg?
def _pull_bundle(self, client, dummy_msg, candidate):
""" INTERNAL: Pull the candidates bundle from the file in
the client param. """
length = os.path.getsize(client.in_params.file_name)
if not candidate[3] is None:
expected_length = self.parent.ctx.graph.get_length(candidate[3])
else:
expected_length = candidate[4][0]
#print "expected_length: ", expected_length
#print "length : ", length
# Unwind padding hack. grrrr... ugly.
assert length >= expected_length
if length != expected_length:
out_file = open(client.in_params.file_name, 'ab')
try:
out_file.truncate(expected_length)
finally:
out_file.close()
assert (os.path.getsize(client.in_params.file_name)
== expected_length)
self.parent.ctx.pull(client.in_params.file_name)
def _reevaluate_without_graph(self):
""" Decide which additional edges to request using the top key data
only. """
# Use chks since we don't have access to edges.
pending, current, next, finished = self._known_chks()
all_chks = pending.union(current).union(next).union(finished)
for update in self.top_key_tuple[1]:
if not self.parent.ctx.has_versions(update[1]):
# Still works with incomplete base.
continue # Don't have parent.
if self.parent.ctx.has_versions(update[2]):
# Not guaranteed to work with incomplete heads.
continue # Already have the update's changes.
new_chks = []
for chk in update[3]:
if not chk in all_chks:
new_chks.append(chk)
if len(new_chks) == 0:
continue
full_request_chk = random.choice(new_chks)
new_chks.remove(full_request_chk)
candidate = [full_request_chk, 0, False, None,
update, None, False]
self.current_candidates.insert(0, candidate)
for chk in new_chks:
candidate = [chk, 0, True, None, update, None, False]
self.current_candidates.insert(0, candidate)
# NOT CHEAP!
    def _reevaluate(self):
        """ Queue additional edge requests if necessary.

        Drops requests that are no longer needed, then either falls back
        to the top key data (no graph yet) or asks the graph for the
        update edges not already known. The first set of edges is queued
        on current_candidates, the second on next_candidates, with
        SaltingState deciding which edges get full vs. single block
        requests.
        """
        #print "_reevaluate -- called."
        self._remove_old_candidates()
        graph = self.parent.ctx.graph
        if graph is None:
            self._reevaluate_without_graph()
            return
        # REDFLAG: make parameters
        redundancy = 4
        # Query graph for current index.
        index = latest_index(graph, self.parent.ctx.repo)
        # REDFLAG: remove debugging code
        #latest = min(index + 1, graph.latest_index)
        #dump_paths(graph, graph.enumerate_update_paths(index + 1,
        #                                               latest,
        #                                               MAX_PATH_LEN * 2),
        #           "All paths %i -> %i" % (index + 1, latest))
        # Find all extant edges.
        # NOTE(review): 'next' shadows the builtin within this method.
        pending, current, next, finished = self._known_edges()
        all_edges = pending.union(current).union(next).union(finished)
        #print "sets:", pending, current, next, finished
        #print "finished_candidates: ", self.finished_candidates
        # Graph requests have no edge and contribute None.
        if None in all_edges:
            all_edges.remove(None)
        assert not None in all_edges
        # BUG: What if an existing edge would be better?
        # i.e. how do you know to queue the new edges after
        # the better existing ones?
        # Is this just a problem with edges queued before graph?
        # Find the edges we need to update.
        first, second = get_update_edges(graph, index, redundancy, True,
                                         all_edges)
        if self.parent.params.get('DUMP_UPDATE_EDGES', False):
            dump_update_edges(first, second, all_edges)
        # Sanity: no None, no duplicates, nothing already known.
        assert not None in first
        assert not None in second
        assert len(set(first)) == len(first)
        assert len(set(second)) == len(second)
        assert len(set(first).intersection(all_edges)) == 0
        assert len(set(second).intersection(all_edges)) == 0
        self.rep_invariant()
        #self.dump()
        # first.reverse() ?
        salting = SaltingState(self)
        #print "FIRST: ", first
        for edge in first:
            assert not edge is None
            #print "EDGE:", edge
            full = salting.needs_full_request(graph, edge)
            self._queue_candidate(self.current_candidates, edge, not full)
            salting.add(edge, not full)
        self.rep_invariant()
        # second.reverse() ?
        #print "SECOND: ", second
        for edge in second:
            full = salting.needs_full_request(graph, edge)
            self._queue_candidate(self.next_candidates, edge, not full)
            salting.add(edge, not full)
        self.rep_invariant()
def _queue_candidate(self, candidate_list, edge, single_block=False):
""" INTERNAL: Queue a request for a single candidate. """
#print "queue_candidate -- called ", edge, single_block
assert not edge is None
chk = self.parent.ctx.graph.get_chk(edge)
candidate = [chk,
0, single_block, edge, None, None, False]
candidate_list.insert(0, candidate)
def _remove_old_candidates(self):
""" INTERNAL: Remove requests for candidates which are no longer
required. """
#print "_remove_old_candidates -- called"
# Cancel pending requests which are no longer required.
for client in self.pending.values():
candidate = client.candidate
if candidate[6]:
continue # Skip graph requests.
versions = self._get_versions(candidate)
if self.parent.ctx.has_versions(versions[1]):
self.parent.runner.cancel_request(client)
# "finish" requests which are no longer required.
victims = []
for candidate in self.current_candidates:
versions = self._get_versions(candidate)
if self.parent.ctx.has_versions(versions[1]):
victims.append(candidate)
for victim in victims:
self.current_candidates.remove(victim)
self.finished_candidates += victims
# REDFLAG: C&P
victims = []
for candidate in self.next_candidates:
versions = self._get_versions(candidate)
if self.parent.ctx.has_versions(versions[1]):
victims.append(candidate)
for victim in victims:
self.next_candidates.remove(victim)
self.finished_candidates += victims
def _get_versions(self, candidate):
""" Return the mercurial 40 digit hex version strings for the
parent versions and latest versions of the candidate's edge. """
assert not candidate[6] # graph request!
graph = self.parent.ctx.graph
if graph is None:
update_data = candidate[4]
assert not update_data is None
#print "_get_versions -- (no edge) ", update_data[1], update_data[2]
return(update_data[1], update_data[2])
# Should have been fixed up when we got the graph.
step = candidate[3]
#print "CANDIDATE: ", candidate
assert not step is None
#print "_get_versions -- ", step, graph.index_table[step[0]][1], \
# graph.index_table[step[1]][2]
return (graph.index_table[step[0] + 1][0],
graph.index_table[step[1]][1])
def _known_chks(self):
""" INTERNAL: Returns a tuple of sets off all CHKs which are
pending, currently scheduled, scheduled next or already
finished. """
return (set([candidate[0] for candidate in
self.pending_candidates()]),
set([candidate[0] for candidate in self.current_candidates]),
set([candidate[0] for candidate in self.next_candidates]),
set([candidate[0] for candidate in self.finished_candidates]))
# REDFLAG: need to fix these to skip graph special case candidates
# otherwise you get a None in the sets.
def _known_edges(self):
""" INTERNAL: Returns a tuple of sets off all edges which are
pending, currently scheduled, scheduled next or already
finished. """
return (set([candidate[3] for candidate in
self.pending_candidates()]),
set([candidate[3] for candidate in self.current_candidates]),
set([candidate[3] for candidate in self.next_candidates]),
set([candidate[3] for candidate in self.finished_candidates]))
############################################################
# Public helper functions for debugging
############################################################
# Expensive, only for debugging.
def rep_invariant(self):
""" Debugging function to check the instance's invariants. """
def count_edges(table, bad, candidate_list):
""" INTERNAL: Helper function to count edges. """
for candidate in candidate_list:
if candidate[3] is None:
continue
count = table.get(candidate[3], 0)
edge_counts[candidate[3]] = count + 1
if edge_counts[candidate[3]] > 1:
bad.add(candidate[3])
bad_counts = set([])
edge_counts = {}
count_edges(edge_counts, bad_counts, self.current_candidates)
count_edges(edge_counts, bad_counts, self.next_candidates)
count_edges(edge_counts, bad_counts, self.pending_candidates())
if len(bad_counts) > 0:
print "MULTIPLE EDGES: ", bad_counts
self.dump()
assert False
def dump(self):
""" Debugging function to dump the instance. """
def print_list(msg, values):
""" INTERNAL: print a list of values. """
self.parent.ctx.ui_.status(msg + '\n')
for value in values:
self.parent.ctx.ui_.status(" " + str(value) + '\n')
self.parent.ctx.ui_.status("--- dumping state: " + self.name + '\n')
print_list("pending_candidates", self.pending_candidates())
print_list("current_candidates", self.current_candidates)
print_list("next_candidates", self.next_candidates)