""" Classes to maintain an updateable file archive on top of bounded number of WORM (Write Once Read Many) blocks. Copyright (C) 2009 Darrell Karbott This library is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks """ import os from binaryrep import NULL_SHA, write_raw_link, check_shas #, str_sha from blocknames import BLOCK_SUFFIX, ReadWriteNames # Just happens to be Freenet block size ;-) MIN_BLOCK_LEN = 32 * 1024 MAX_HISTORY_LEN = 16 # 1 effectively causes a full reinsert when history chains are shortened. # Larger values favor smaller incremental deltas at the expense of # a longer history chain and larger total history size. COALESCE_FACTOR = 1.5 #----------------------------------------------------------# def is_ordered(partitions): """ Return True if the partitions are in ascending order, False otherwise. """ # Ignore trailing 0 length blocks. lengths = [value[2] for value in partitions] while len(lengths) > 0 and lengths[-1] == 0: lengths = lengths[:-1] for index in range (0, len(lengths) - 1): #if lengths[index] >= lengths[index + 1]: if lengths[index] > lengths[index + 1]: return False return True def is_contiguous(partitions): """ Return True if the block numbers in adjacent partitions are contiguous, False othewise. """ if len(partitions) == 0: return True if partitions[-1][0] > partitions[-1][1]: return False # Hmmmm... for index in range (0, len(partitions) - 1): if partitions[index][0] > partitions[index][1]: return False # Hmmmm... span = partitions[index + 1][0] - partitions[index][1] if span < 0 or span > 1: return False return True # [(start_block, end_block, length), ...] def repartition(partitions, multiple=2): """ Merge newest to oldest until len(partition[n-1]) <= multiple * len(partition[n]) for all partitions. """ for index in range (0, len(partitions) - 1): if partitions[index][2] * multiple >= partitions[index + 1][2]: good = partitions[0:index] rest = partitions[index:] # Hmmm... if this is True, maybe you should simplify your rep.??? assert rest[1][0] - rest[0][1] >= 0 and rest[1][0] - rest[0][1] < 2 rest[1] = (rest[0][0], rest[1][1], rest[0][2] + rest[1][2]) rest = rest[1:] ret = good + repartition(rest) assert is_ordered(ret) # Removed this constraint so I can drop empty partions # assert is_contiguous(ret) return ret ret = partitions[:] # Hmmmm assert is_ordered(ret) assert is_contiguous(ret) return ret def compress(partitions, max_len, multiple=2): """ Reduce the length of the partitions to <= max_len. Drops zero length partitions. """ partitions = partitions[:] partitions = [partition for partition in partitions if partition[2] > 0] if len(partitions) <= max_len: return partitions assert max_len > 1 while len(partitions) > max_len: combined = (partitions[0][0], partitions[1][1], partitions[0][2] + partitions[1][2]) partitions[1] = combined # Enforce the ordering constraint. 

#----------------------------------------------------------#

class WORMBlockArchive:
    """ A file archive implemented on top of a bounded length sequence
        of Write Once Read Many blocks.

        Updating the archive means replacing one or more of the
        underlying blocks.

        The fundamental atom of storage is a 'history' link.  A history
        link contains an age, the sha1 hash of its parent link, and a
        blob of delta encoded change data.  Age is an integer which is
        incremented with every update to the archive.  History links
        have at most one parent, but may have many children.

        The archive has an index which maps history link sha1 hashes
        to history links.

        Files are represented as chains of history links.  They are
        retrieved from the archive by running the delta decoding
        algorithm over all the patch blobs in the chain.  Files are
        addressable by the sha1 hash of the head link in the history
        chain.

        The FileManifest class allows files in the archive to be
        accessed by human readable names.

        The start_update() method creates a temporary block for update
        writes.  write_new_delta() writes a new history link into the
        temporary block.  commit_update() permanently adds the updates
        in the temporary block to the archive, re-writing blocks as
        necessary in order to bound the total number of blocks in the
        archive at max_blocks.

        There is no explicit notion of deleting history links or files,
        but unreferenced history links may be dropped whenever new
        blocks are created.

        The design for this module was influenced by looking at
        revlog.py in Mercurial, and to a lesser extent by reading about
        how git works.  It was written to implement incrementally
        updateable file archives on top of Freenet.
    """

    def __init__(self, delta_coder, blocks):
        self.delta_coder = delta_coder
        self.delta_coder.get_data_func = self.get_data
        self.blocks = blocks
        self.max_blocks = 4 # Hmmm...
        self.age = 0

    def create(self, block_dir, base_name, overwrite=False):
        """ Create a new archive. """
        names = ReadWriteNames(block_dir, base_name, BLOCK_SUFFIX)
        self.age = self.blocks.create(names, self.max_blocks, overwrite)

    # Updateable.
    # LATER: read only???
    def load(self, block_dir, base_name, tags=None):
        """ Load an existing archive. """
        names = ReadWriteNames(block_dir, base_name, BLOCK_SUFFIX)
        self.age = self.blocks.load(names, self.max_blocks, tags)

    # MUST call this if you called load() or create().
    def close(self):
        """ Close the archive. """
        self.blocks.close()

    # Callback used by DeltaCoder.
    def get_data(self, link_sha, return_stream=False):
        """ INTERNAL: Helper function used by DeltaCoder to get raw
            change data. """
        assert not return_stream
        return self.blocks.link_map.get_link(link_sha, True)[3]

    # By head history link sha, NOT file sha.
    def get_file(self, history_sha, out_file):
        """ Get a file by the sha1 hash of the head link in its
            history link chain. """
        check_shas([history_sha, ])

        # hmmmm...
        if history_sha == NULL_SHA:
            tmp = open(out_file, 'wb')
            tmp.close()
            return

        self.delta_coder.apply_deltas(self.blocks.get_history(history_sha),
                                      out_file)

    # Hmmmm... too pedantic. How much faster would this run
    # if it were in BlockStorage?
    # DESIGN INTENT: BlockStorage shouldn't need to know
    # about DeltaCoder.
    def write_new_delta(self, history_sha, new_file):
        """ Writes a new history link to the update file.

            history_sha can be NULL_SHA.
            Can ignore history, i.e. not link to previous history.

            Returns the new link.

            REQUIRES: is updating.
            REQUIRES: history_sha is present in the currently committed
                      version of the archive.  You CANNOT reference
                      uncommitted history links. """
        check_shas([history_sha, ])
        self.require_blocks()
        if self.blocks.update_file is None:
            raise Exception("Not updating.")

        history = self.blocks.get_history(history_sha)

        tmp_file = self.blocks.tmps.make_temp_file()
        old_file = self.blocks.tmps.make_temp_file()
        oldest_delta = self.blocks.tmps.make_temp_file()
        blob_file = None
        try:
            # REDFLAG: Think through.
            # It would make more sense for the delta coder to decide when to
            # truncate history, but I don't want to expose the full archive
            # interface to the delta coder implementation.
            if len(history) >= MAX_HISTORY_LEN:
                # Delta to original file.
                self.get_file(history[-1][0], old_file)
                parent0 = self.delta_coder.make_delta(history[-1:], old_file,
                                                      new_file, oldest_delta)
                # Full reinsert.
                parent1 = self.delta_coder.make_full_insert(new_file, tmp_file)

                #print "full: %i old: %i delta: %i target: %i" % (
                #    os.path.getsize(tmp_file),
                #    history[-1][6],
                #    os.path.getsize(oldest_delta),
                #    COALESCE_FACTOR * os.path.getsize(tmp_file))

                # LATER: Back to this.
                # This is bottom up history shortening driven by the most
                # recent changes. We should also have some mechanism for
                # shortening history (to 1 link) for files which haven't
                # changed in many updates, whenever blocks are merged.
                # Hmmmm... hard (impossible?) to decouple from the manifest
                # because files are addressed by head history link sha.
                if (COALESCE_FACTOR * os.path.getsize(tmp_file) <
                    (os.path.getsize(oldest_delta) + history[-1][6])):
                    parent = parent1
                    blob_file = tmp_file
                    #print "SHORTENED: FULL REINSERT"
                else:
                    #print "history:"
                    #for link in history:
                    #    print "   ", str_sha(link[0]), str_sha(link[2])
                    parent = parent0
                    #print
                    #print "parent: ", str_sha(parent)
                    blob_file = oldest_delta
                    #print "SHORTENED: COMPRESSED DELTAS"
            else:
                self.get_file(history_sha, old_file)
                parent = self.delta_coder.make_delta(history, old_file,
                                                     new_file, tmp_file)
                blob_file = tmp_file

            self.blocks.update_links.append(
                write_raw_link(self.blocks.update_stream, self.age + 1,
                               parent, blob_file, 0))
            return self.blocks.update_links[-1]
        finally:
            self.blocks.tmps.remove_temp_file(old_file)
            self.blocks.tmps.remove_temp_file(oldest_delta)
            self.blocks.tmps.remove_temp_file(tmp_file)
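
    # Worked example (made-up sizes) of the history-shortening choice in
    # write_new_delta() above: if a full reinsert of the new file would
    # be 10,000 bytes, the compressed delta against the oldest link is
    # 4,000 bytes, and the oldest link itself (history[-1][6]) is 9,000
    # bytes, then 1.5 * 10,000 = 15,000 is not < 4,000 + 9,000 = 13,000,
    # so the compressed delta (parent0 / oldest_delta) is kept.  If the
    # full reinsert were only 8,000 bytes, 12,000 < 13,000 and the full
    # reinsert (parent1 / tmp_file) would win instead.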

    def require_blocks(self):
        """ INTERNAL: Raises if the BlockStorage delegate isn't
            initialized. """
        if self.blocks is None:
            raise Exception("Uninitialized. Run create() or load().")

    def start_update(self):
        """ Create temporary storage required to update the archive. """
        self.require_blocks()
        self.blocks.start_update()

    def abandon_update(self):
        """ Abandon all changes made to the archive since start_update()
            and free temporary storage. """
        if self.blocks is not None: # Hmmmm...
            self.blocks.abandon_update()

    # Allowed to drop history not in the referenced shas list.
    #
    # Returns a (blocks_added, blocks_removed) tuple.
    def commit_update(self, referenced_shas=None):
        """ Permanently write changes into the archive. """
        self.require_blocks()
        if referenced_shas is None:
            referenced_shas = set([])

        self.age = self.blocks.commit_update(referenced_shas)
        self.compress(referenced_shas)

    # Restores length and ordering invariants.
    def compress(self, referenced_shas):
        """ Compresses the archive to fit in max_blocks blocks.

            REQUIRES: self.blocks.total_blocks() > max_blocks

            Merges blocks such that:
            n <= max_blocks and block[0] < block[1] ... < block[n - 1]
        """
        if referenced_shas is None:
            referenced_shas = set([])
        check_shas(referenced_shas)

        #count = self.blocks.nonzero_blocks()

        # Compute the "real" size of each block without unreferenced links.
        real_lens = [0 for dummy in range(0, len(self.blocks.tags))]
        for links in self.blocks.link_map.values():
            for link in links:
                if not link[0] in referenced_shas:
                    continue
                real_lens[link[5]] += link[6]

        uncompressed = [[index, index, real_lens[index]]
                        for index in range(0, len(self.blocks.tags))]
        compressed = compress(uncompressed, self.max_blocks)

        # Can't put lists in a set.
        compressed = [tuple(value) for value in compressed]
        uncompressed = [tuple(value) for value in uncompressed]

        if compressed == uncompressed:
            return False

        self.blocks.update_blocks(uncompressed, compressed,
                                  referenced_shas, self.max_blocks)
        return True

    def referenced_shas(self, head_sha_list, include_updates=True):
        """ Return the SHA1 hashes of all history links referenced
            by the links in head_sha_list. """
        check_shas(head_sha_list)
        ret = set([])
        for head_sha in head_sha_list:
            for link in self.blocks.get_history(head_sha):
                ret.add(link[0])
        if include_updates:
            ret = ret.union(self.uncommited_shas())
        # Hmmm... frozenset faster?
        return ret

    def uncommited_shas(self):
        """ Return a set of SHA1 hash digests for history links that
            have been added since start_update().

            Note that get_file() fails for these SHA1s because they
            aren't committed yet. """
        return set([link[0] for link in self.blocks.update_links])

class UpToDateException(Exception):
    """ Raised to signal that no changes were required to the archive. """
    def __init__(self, msg):
        Exception.__init__(self, msg)
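
#----------------------------------------------------------#
# Illustrative sketch, not part of the original module: the update
# cycle described in the WORMBlockArchive docstring above.  The
# delta_coder and blocks arguments are assumed to be instances of the
# companion DeltaCoder and BlockStorage classes, which are not defined
# in this file.
def _example_update_cycle(delta_coder, blocks, block_dir, base_name,
                          new_file):
    """ Sketch: create an archive, add one file with no prior history,
        commit it, and return the head history link sha. """
    archive = WORMBlockArchive(delta_coder, blocks)
    archive.create(block_dir, base_name, overwrite=True)
    try:
        archive.start_update()
        # NULL_SHA: the new file doesn't extend an existing history chain.
        link = archive.write_new_delta(NULL_SHA, new_file)
        # Keep every link written during this update referenced so it
        # survives block merging.
        archive.commit_update(archive.uncommited_shas())
        head_sha = link[0]
    except Exception:
        archive.abandon_update()
        raise
    finally:
        archive.close()
    # Later: archive.load(block_dir, base_name) followed by
    # archive.get_file(head_sha, out_path) recovers the file.
    return head_sha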