A module that maintains an updateable file archive in a bounded number of Write Once Read Many (WORM) blocks.
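A minimal usage sketch of the archive API added below: create or load a block set, start an update, write deltas, commit, then read files back by the SHA1 of their head history link. It assumes the wormarc modules are importable as flat modules (matching the imports they use themselves), that Mercurial is installed (deltacoder.py uses its mdiff module), and that the TmpFileManager helper, the directory names, and the file paths are illustrative stand-ins, not part of this patch:

    import os
    import tempfile

    from archive import WORMBlockArchive
    from binaryrep import NULL_SHA
    from blocks import BlockStorage, ITempFileManager
    from deltacoder import DeltaCoder

    class TmpFileManager(ITempFileManager):
        """ Illustrative ITempFileManager: temp files in one scratch dir. """
        def __init__(self, scratch_dir):
            ITempFileManager.__init__(self)
            self.scratch_dir = scratch_dir
        def make_temp_file(self):
            handle, name = tempfile.mkstemp(dir=self.scratch_dir)
            os.close(handle)
            return name
        def remove_temp_file(self, name):
            if os.path.exists(name):
                os.remove(name)

    # Both directories must exist before create() opens the block files.
    for directory in ('/tmp/wormarc_scratch', '/tmp/wormarc_blocks'):
        if not os.path.exists(directory):
            os.makedirs(directory)

    archive = WORMBlockArchive(
        DeltaCoder(),
        BlockStorage(TmpFileManager('/tmp/wormarc_scratch')))
    archive.create('/tmp/wormarc_blocks', 'example')  # or archive.load(...)
    try:
        archive.start_update()
        # NULL_SHA parent: a brand new file with no prior history.
        link = archive.write_new_delta(NULL_SHA, '/tmp/some_local_file')
        # Unreferenced history may be dropped on commit; real callers pass
        # the set of head SHA1s they want to keep (see referenced_shas()).
        archive.commit_update()
        # Read the file back by the SHA1 of its head history link.
        archive.get_file(link[0], '/tmp/roundtripped_copy')
    finally:
        archive.close()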
diff --git a/wormarc/archive.py b/wormarc/archive.py new file mode 100644 --- /dev/null +++ b/wormarc/archive.py @@ -0,0 +1,404 @@ +""" Classes to maintain an updateable file archive on top of + bounded number of WORM (Write Once Read Many) blocks. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import os +from binaryrep import NULL_SHA, write_raw_link, check_shas #, str_sha +from blocknames import BLOCK_SUFFIX, ReadWriteNames + +# Just happens to be Freenet block size ;-) +MIN_BLOCK_LEN = 32 * 1024 + +MAX_HISTORY_LEN = 16 + +# 1 effectively causes a full reinsert when history chains are shortened. +# Larger values favor smaller incremental deltas at the expense of +# a longer history chain and larger total history size. +COALESCE_FACTOR = 1.5 + +#----------------------------------------------------------# + +def is_ordered(partitions): + """ Return True if the partitions are in ascending order, + False otherwise. """ + + # Ignore trailing 0 length blocks. + lengths = [value[2] for value in partitions] + while len(lengths) > 0 and lengths[-1] == 0: + lengths = lengths[:-1] + + for index in range (0, len(lengths) - 1): + #if lengths[index] >= lengths[index + 1]: + if lengths[index] > lengths[index + 1]: + return False + return True + +def is_contiguous(partitions): + """ Return True if the block numbers in adjacent + partitions are contiguous, False othewise. """ + if len(partitions) == 0: + return True + + if partitions[-1][0] > partitions[-1][1]: + return False # Hmmmm... + + for index in range (0, len(partitions) - 1): + if partitions[index][0] > partitions[index][1]: + return False # Hmmmm... + span = partitions[index + 1][0] - partitions[index][1] + if span < 0 or span > 1: + return False + + return True + +# [(start_block, end_block, length), ...] +def repartition(partitions, multiple=2): + """ Merge newest to oldest until + len(partition[n-1]) <= multiple * len(partition[n]) + for all partitions. """ + + for index in range (0, len(partitions) - 1): + if partitions[index][2] * multiple >= partitions[index + 1][2]: + good = partitions[0:index] + rest = partitions[index:] + # Hmmm... if this is True, maybe you should simplify your rep.??? + assert rest[1][0] - rest[0][1] >= 0 and rest[1][0] - rest[0][1] < 2 + rest[1] = (rest[0][0], rest[1][1], rest[0][2] + rest[1][2]) + rest = rest[1:] + ret = good + repartition(rest) + assert is_ordered(ret) + # Removed this constraint so I can drop empty partions + # assert is_contiguous(ret) + return ret + + ret = partitions[:] # Hmmmm + assert is_ordered(ret) + assert is_contiguous(ret) + return ret + +def compress(partitions, max_len, multiple=2): + """ Reduce the length of the partitions to <= max_len. + + Drops zero length partitions. 
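    Illustrative example (hand-traced, max_len=4, newest partition first):

        compress([(0, 0, 2), (1, 1, 3), (2, 2, 10),
                  (3, 3, 40), (4, 4, 100)], 4)

    returns [(0, 2, 15), (3, 3, 40), (4, 4, 100)]: the three newest
    blocks are merged into one partition while the two oldest, largest
    blocks are left untouched, preserving the ascending-length invariant
    checked by is_ordered().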
""" + + partitions = partitions[:] + partitions = [partition for partition in partitions + if partition[2] > 0] + + if len(partitions) <= max_len: + return partitions + + assert max_len > 1 + while len(partitions) > max_len: + combined = (partitions[0][0], partitions[1][1], + partitions[0][2] + partitions[1][2]) + partitions[1] = combined + # Enforce the ordering constraint. + partitions = repartition(partitions[1:], multiple) + + assert is_ordered(partitions) + return partitions + +#----------------------------------------------------------# + +class WORMBlockArchive: + """ A file archive implemented on top of a bounded length sequence + of Write Once Read Many blocks. + + Updating the archive means replacing one or more of the + underlying blocks. + + The fundamental atom of storage is a 'history' link. A + history link contains an age, the sha1 hash of its parent + link, and a blob of delta encoded change data. Age is an + integer which is incremented with every update to the + archive. History links have at most one parent, but may have + many children. + + The archive has an index which maps history link sha1 hashes + to history links. + + Files are represented as chains of history links. They are + retrieved from the archive by running the delta decoding + algorithm over all the patch blobs in the chain. Files are + addressable by the sha1 hash of the head link in the history + chain. The FileManifest class allows files in the archive to + be accessed by human readable names. + + The start_update() method creates a temporary block for update + writes. write_new_delta() writes a new history link into the + temporary block. commit_update() permanently adds the updates + in the temporary block to the archive, re-writing blocks as + necessary in order to bound the total number of blocks in the + archive at max_blocks. + + There is no explict notion of deleting history links or files + but unreferenced history links may be dropped whenever new + blocks are created. + + The design for this module was influenced by looking at + revlog.py in Mercurial, and to a lesser extent by reading + about how git works. + + It was written to implement incrementally updateable file + archives on top of Freenet. + + """ + def __init__(self, delta_coder, blocks): + self.delta_coder = delta_coder + self.delta_coder.get_data_func = self.get_data + self.blocks = blocks + self.max_blocks = 4 + # Hmmm... + self.age = 0 + + def create(self, block_dir, base_name, overwrite=False ): + """ Create a new archive. """ + names = ReadWriteNames(block_dir, base_name, BLOCK_SUFFIX) + self.age = self.blocks.create(names, self.max_blocks, overwrite) + + # Updateable. + # LATER: read only??? + def load(self, block_dir, base_name, tags=None): + """ Load an existing archive. """ + names = ReadWriteNames(block_dir, base_name, BLOCK_SUFFIX) + self.age = self.blocks.load(names, self.max_blocks, tags) + + # MUST call this if you called load() or create() + def close(self): + """ Close the archive. """ + self.blocks.close() + + # Callback used by DeltaCoder. + def get_data(self, link_sha, return_stream=False): + """ INTERNAL: Helper function used by DeltaCoder to get raw + change data. """ + assert not return_stream + return self.blocks.link_map.get_link(link_sha, True)[3] + + # by head history link sha, NOT file sha + def get_file(self, history_sha, out_file): + """ Get a file by the sha1 hash of the head link in its + history link chain. """ + check_shas([history_sha, ]) # hmmmm... 
+ if history_sha == NULL_SHA: + tmp = open(out_file, 'wb') + tmp.close() + return + + self.delta_coder.apply_deltas(self.blocks.get_history(history_sha), + out_file) + + # Hmmmm... too pedantic. how much faster would this run + # if it were in BlockStorage? + # DESIGN INTENT: BlockStorage shouldn't need to know + # about DeltaCoder. + def write_new_delta(self, history_sha, new_file): + """ Writes a new history link to the update file. + + history_sha can be NULL_SHA. + + Can ignore history. i.e. not link to previous history. + + Returns the new link. + + REQUIRES: is updating. + REQUIRES: history_sha is present in the currently committed + version of the archive. + You CANNOT reference uncommited history links. + """ + check_shas([history_sha, ]) + + self.require_blocks() + if self.blocks.update_file is None: + raise Exception("Not updating.") + + history = self.blocks.get_history(history_sha) + tmp_file = self.blocks.tmps.make_temp_file() + old_file = self.blocks.tmps.make_temp_file() + oldest_delta = self.blocks.tmps.make_temp_file() + blob_file = None + try: + # REDFLAG: Think through. + # It would make more sense for the delta coder to decide when to + # truncate history, but I don't want to expose the full archive + # interface to the delta coder implementation. + if len(history) >= MAX_HISTORY_LEN: + # Delta to original file. + self.get_file(history[-1][0], old_file) + parent0 = self.delta_coder.make_delta(history[-1:], + old_file, + new_file, + oldest_delta) + # Full reinsert + parent1 = self.delta_coder.make_full_insert(new_file, + tmp_file) + + #print "full: %i old: %i delta: %i target: %i" % ( + # os.path.getsize(tmp_file), + # history[-1][6], + # os.path.getsize(oldest_delta), + # COALESCE_FACTOR * os.path.getsize(tmp_file)) + + # LATER: Back to this. + # This is bottom up history shortening driven by the most + # recent changes. We should also have some mechanism shortening + # history (to 1 link) for files which haven't changed in many + # updates, whenever blocks are merged. + # Hmmmm... hard (impossible?) to decouple from manifest because + # files are addressed by head history link sha + if (COALESCE_FACTOR * os.path.getsize(tmp_file) < + (os.path.getsize(oldest_delta) + history[-1][6])): + parent = parent1 + blob_file = tmp_file + #print "SHORTENED: FULL REINSERT" + else: + #print "history:" + #for link in history: + # print " ", str_sha(link[0]), str_sha(link[2]) + + parent = parent0 + #print + #print "parent: ", str_sha(parent) + + blob_file = oldest_delta + #print "SHORTENED: COMPRESSED DELTAS" + else: + self.get_file(history_sha, old_file) + parent = self.delta_coder.make_delta(history, old_file, + new_file, + tmp_file) + blob_file = tmp_file + + + self.blocks.update_links.append( + write_raw_link(self.blocks.update_stream, + self.age + 1, parent, + blob_file, 0)) + return self.blocks.update_links[-1] + finally: + self.blocks.tmps.remove_temp_file(old_file) + self.blocks.tmps.remove_temp_file(oldest_delta) + self.blocks.tmps.remove_temp_file(tmp_file) + + def require_blocks(self): + """ INTERNAL: Raises if the BlockStorage delegate isn't initialized.""" + if self.blocks is None: + raise Exception("Uninitialized. Run create() or load().") + + def start_update(self): + """ Create temporary storage required to update the archive. """ + self.require_blocks() + self.blocks.start_update() + + def abandon_update(self): + """ Abandon all changes made to the archive since + start_update() and free temporary storage. """ + if not self.blocks is None: # Hmmmm... 
+ self.blocks.abandon_update() + + # Allowed to drop history not in the referenced shas + # list. + # + # Returns an (blocks_added, blocks_removed) tuple. + def commit_update(self, referenced_shas=None): + """ Permanently write changes into the archive. """ + + self.require_blocks() + if referenced_shas is None: + referenced_shas = set([]) + self.age = self.blocks.commit_update(referenced_shas) + self.compress(referenced_shas) + + + # Restores length and ordering invariants. + def compress(self, referenced_shas): + """ Compresses the archive to fit in max_blocks blocks. + + REQUIRES: self.blocks.total_blocks() > max_blocks + + Merges blocks such that: + n <= max_blocks + and + block[0] < block[1] ... < block[n -1] + """ + + if referenced_shas is None: + referenced_shas = set([]) + + check_shas(referenced_shas) + + #count = self.blocks.nonzero_blocks() + + # Compute the "real" size of each block without unreferenced links + real_lens = [0 for dummy in range(0, len(self.blocks.tags))] + + for links in self.blocks.link_map.values(): + for link in links: + if not link[0] in referenced_shas: + continue + real_lens[link[5]] += link[6] + + uncompressed = [[index, index, real_lens[index]] + for index in range(0, len(self.blocks.tags))] + + compressed = compress(uncompressed, self.max_blocks) + # Can't put lists in a set. + compressed = [tuple(value) for value in compressed] + uncompressed = [tuple(value) for value in uncompressed] + + if compressed == uncompressed: + return False + + self.blocks.update_blocks(uncompressed, compressed, + referenced_shas, self.max_blocks) + return True + + def referenced_shas(self, head_sha_list, include_updates=True): + """ Return the SHA1 hashes of all history links referenced by the + links in the head_sha_list. """ + + check_shas(head_sha_list) + + ret = set([]) + for head_sha in head_sha_list: + for link in self.blocks.get_history(head_sha): + ret.add(link[0]) + if include_updates: + ret = ret.union(self.uncommited_shas()) + + # Hmmm... frozenset faster? + return ret + + def uncommited_shas(self): + """ Return a set of SHA1 hash digests for history links that have + been added since start_update(). + + Note that get_file() fails for these SHA1 because the aren't + commited yet. """ + return set([link[0] for link in + self.blocks.update_links]) + +class UpToDateException(Exception): + """ Raised to signal that no changes were required to the archive. """ + def __init__(self, msg): + Exception.__init__(self, msg) + diff --git a/wormarc/binaryrep.py b/wormarc/binaryrep.py new file mode 100644 --- /dev/null +++ b/wormarc/binaryrep.py @@ -0,0 +1,228 @@ +""" Functions to read and write binary representation of archive data. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# REDFLAG: Only tested on x86 32-bit Intel Linux. 
Alignment/endedness issues? +# REDFLAG: OK to read/write byte strings directly w/o (un)pack'ing, right? +# REDFLAG: REDUCE RAM: do chunked read/writes/hash digests where possible. + +import struct +from binascii import hexlify +from hashlib import sha1 + +NULL_SHA = '\x00' * 20 + +LINK_HEADER_FMT = '!LL20s' +LINK_HEADER_LEN = struct.calcsize(LINK_HEADER_FMT) + +COUNT_FMT = "!L" +COUNT_LEN = struct.calcsize(COUNT_FMT) + +# REDFLAG: doc <16k name length +MANIFEST_ENTRY_HDR_FMT = "!H20s20s" +MANIFEST_ENTRY_HDR_LEN = struct.calcsize(MANIFEST_ENTRY_HDR_FMT) +MANIFEST_ENTRY_FMT = MANIFEST_ENTRY_HDR_FMT + "%is" + +MSG_INCOMPLETE_READ = "Bad stream, EOF during read." + +READ_CHUNK_LEN = 1024 * 16 + +def str_sha(raw_sha): + """ Return a 12 digit hex string for a raw SHA1 hash. """ + return hexlify(raw_sha)[:12] + +# Used to catch pilot error which otherwise shows up as weird failures. +def check_shas(raw_sha_sequence): + """ INTERNAL: Raise a ValueError if the sequence values don't look like + raw SHA1 hashes. """ + if raw_sha_sequence is None: + raise ValueError("SHA1 has sequence is None?") + for value in raw_sha_sequence: + if value is None: + raise ValueError("None instead of binary SHA1 digest") + + if not len(value) == 20: + raise ValueError("Doesn't look like a binary SHA1 digest: %s" % + repr(value)) + +def checked_read(in_stream, length, allow_eof=False): + """ Read a fixed number of bytes from an open input stream. + + Raises IOError if EOF is encountered before all bytes are read. + """ + + bytes = in_stream.read(length) + if allow_eof and bytes == '': + return bytes + if len(bytes) != length: + raise IOError(MSG_INCOMPLETE_READ) + return bytes + +# Wire rep: +# <total length><age><parent><blob data> +# +# Python rep +# 0 1 2 3 4 5 6 +# (sha1, age, parent, data, stream_offset, stream_index, physical_len) +# +# sha1 is hash of parent + data +# physical_len is the number of bytes of storage used to persist +# the link. +def read_link(in_stream, keep_data=True, pos=None, stream_index=None): + """ Read a single history link from an open stream. """ + + bytes = checked_read(in_stream, LINK_HEADER_LEN, True) + if bytes == '': + return None # Clean EOF + + length, age, parent = struct.unpack(LINK_HEADER_FMT, bytes) + payload_len = length - LINK_HEADER_LEN # already read header + raw = checked_read(in_stream, payload_len) + + # READFLAG: incrementally read / hash + sha_value = sha1(str(age)) + sha_value.update(parent) + sha_value.update(raw) + + if not keep_data: + raw = None + + return (sha_value.digest(), age, parent, raw, + pos, stream_index, payload_len) + + +def copy_raw_links(in_stream, out_stream, allowed_shas, copied_shas): + """ Copy any links with SHA1 hashes in allowed_shas from in_instream to + out_stream. + """ + count = 0 + while True: + hdr = checked_read(in_stream, LINK_HEADER_LEN, True) + if hdr == '': + return count # Clean EOF + length, age, parent = struct.unpack(LINK_HEADER_FMT, hdr) + sha_value = sha1(str(age)) + sha_value.update(parent) + rest = checked_read(in_stream, length - LINK_HEADER_LEN) + sha_value.update(rest) + value = sha_value.digest() + if value in copied_shas: + continue # Only copy once. + + if allowed_shas is None or value in allowed_shas: + out_stream.write(hdr) + out_stream.write(rest) + count += 1 + copied_shas.add(value) + +# Sets pos, but caller must fix stream index +def write_raw_link(out_stream, age, parent, raw_file, stream_index): + """ Write a history link to an open stream. + + Returns a history link tuple for the link written. 
""" + + assert len(parent) == 20 # Raw, not hex string + + pos = out_stream.tell() + in_file = open(raw_file, 'rb') + try: + raw = in_file.read() + + out_stream.write(struct.pack(LINK_HEADER_FMT, + len(raw) + LINK_HEADER_LEN, + age, + parent)) + + sha_value = sha1(str(age)) + sha_value.update(parent) + + out_stream.write(raw) + # REDFLAG: read / hash incrementally + sha_value.update(raw) + finally: + in_file.close() + + return (sha_value.digest(), age, parent, None, + pos, stream_index, len(raw) + LINK_HEADER_LEN) + +def write_file_manifest(name_map, out_stream): + """ Write file manifest data to an open stream. """ + + out_stream.write(struct.pack(COUNT_FMT, len(name_map))) + # Sort to make it easier for diff algos to find contiguous + # changes. + names = name_map.keys() + names.sort() + for name in names: + length = MANIFEST_ENTRY_HDR_LEN + len(name) + file_sha, history_sha = name_map[name] + + out_stream.write(struct.pack(MANIFEST_ENTRY_FMT % len(name), + length, + file_sha, + history_sha, + name)) +def read_file_manifest(in_stream): + """ Read file manifest data from an open input stream. """ + count = struct.unpack(COUNT_FMT, checked_read(in_stream, COUNT_LEN))[0] + name_map = {} + for dummy in range(0, count): + length, file_sha, history_sha = \ + struct.unpack(MANIFEST_ENTRY_HDR_FMT, + checked_read(in_stream, + MANIFEST_ENTRY_HDR_LEN)) + + length -= MANIFEST_ENTRY_HDR_LEN + name = checked_read(in_stream, length) + + assert not name in name_map + name_map[name] = (file_sha, history_sha) + return name_map + +def manifest_to_file(file_name, name_map): + """ Write a single manifest to a file. """ + out_file = open(file_name, 'wb') + try: + write_file_manifest(name_map, out_file) + finally: + out_file.close() + +def manifest_from_file(file_name): + """ Read a single manifest from a file. """ + in_file = open(file_name, 'rb') + try: + return read_file_manifest(in_file) + finally: + in_file.close() + +def get_file_sha(full_path): + """ Return the 20 byte sha1 hash digest of a file. """ + in_file = open(full_path, 'rb') + try: + sha_value = sha1() + while True: + bytes = in_file.read(READ_CHUNK_LEN) + if bytes == "": + break + sha_value.update(bytes) + return sha_value.digest() + finally: + in_file.close() + diff --git a/wormarc/blocknames.py b/wormarc/blocknames.py new file mode 100644 --- /dev/null +++ b/wormarc/blocknames.py @@ -0,0 +1,75 @@ +""" Classes used by BlockStorage to map block ordinals to file names. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# Grrrr... separate file to avoid circular dependency. + + +import os + +BLOCK_SUFFIX = '.bin' + +class BlockNames: + """ ABC to map ordinals to file names. 
""" + def __init__(self, read_only): + self.read_only = read_only + + def read_path(self, ordinal): + """ Return a file name to read the block from. """ + raise NotImplementedError() + + def write_path(self, ordinal): + """ Return a file name to write the block to. + This can raise a ValueError if the blocks are read only. + """ + if self.read_only: + raise ValueError("Blocks are read only!") + return self.read_path(ordinal) + +class ReadWriteNames(BlockNames): + """ A naming policy for an updateable set of blocks. """ + def __init__(self, block_dir, block_name, suffix): + BlockNames.__init__(self, False) + self.block_dir = block_dir + self.block_name = block_name + self.suffix = suffix + + def read_path(self, ordinal): + """ Implement pure virtual. """ + return os.path.join(self.block_dir, "%s_%s%s" % + (self.block_name, + str(ordinal), + self.suffix)) + +# UNTESTED! +# DESIGN INTENT: Adapter that allows you to load a BlockStorage from +# a static cache of CHK blocks. +class ReadOnlyNames(BlockNames): + """ A naming policy for a read only set of blocks. """ + def __init__(self, read_only_file_names): + BlockNames.__init__(self, True) + self.file_names = read_only_file_names + + def read_path(self, ordinal): + """ Implement pure virtual. """ + if ordinal < 0 or ordinal >= len(self.file_names): + raise IndexError("No such file: %i" % ordinal) + return self.file_names[ordinal] + diff --git a/wormarc/blocks.py b/wormarc/blocks.py new file mode 100644 --- /dev/null +++ b/wormarc/blocks.py @@ -0,0 +1,461 @@ +""" Classes to store collections of archive history links in files. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# REDFLAG: CLEANUP ERROR HANDLING. Failures can lose or corrupt blocks! + +import os + +from archive import MIN_BLOCK_LEN, UpToDateException +from linkmap import LinkMap +from binaryrep import NULL_SHA, copy_raw_links + +# REDFLAG: rtfm python tempfile module. is this really needed? +class ITempFileManager: + """ Delegate to handle temp file creation and deletion. """ + def __init__(self): + pass + def make_temp_file(self): + """ Return a new unique temp file name including full path. """ + raise NotImplementedError() + def remove_temp_file(self, name): + """ Remove and existing temp file. """ + raise NotImplementedError() + +def has_internal_zero(sequence): + """ Return True if the sequence has a zero to non-zero transition, + False otherwise. """ + saw_zero = False + for value in sequence: + if value == 0: + saw_zero = True + else: + if saw_zero: + return True + return False + +# DESIGN INTENT: Push file system dependancies out of archive code. +class BlockStorage: + """ A class to store history links in a collection of files. 
""" + def __init__(self, tmps, name_policy=None): + self.tmps = tmps + self.names = name_policy + self.tags = ['', ] # Is also a proxy for length. + + self.link_map = None + + # Hmmmm... file and stream belong in storage + # but links belongs in the archive.... + self.update_file = None + self.update_stream = None + self.update_links = [] + + def is_updating(self): + """ Return True if updating, False otherwise. """ + # Hmmm... + return not self.update_stream is None + + def close(self): + """ Close the files. """ + self.abandon_update() + if self.link_map is None: + return + self.link_map.close() + self.link_map = None + + def full_path(self, ordinal, read=True): + """ Return the full path to an underlying block file. """ + if read: + return self.names.read_path(ordinal) + + return self.names.write_path(ordinal) + + def get_history(self, head_sha1): + """ Return the history link chain which has a head link with hash + head_sha1. """ + if head_sha1 == NULL_SHA: + return [] + ret = [] + head = head_sha1 + while True: + link = self.link_map.get_link(head) + ret.append(link[:]) # Copy + if link[2] == NULL_SHA: + return ret + head = link[2] + + def start_update(self): + """ Create temporary storage required to write an update. + + You MUST call commit_update() or abandon_update() after + calling this. """ + + if not self.update_file is None: + raise Exception("Commmit or abandon the previous update!") + self.update_file = self.tmps.make_temp_file() + self.update_links = [] + raised = True + try: + self.update_stream = open(self.update_file, "wb") + raised = False + finally: + if raised: + self.abandon_update() + + # UpToDateException is recoverable, all others fatal. i.e. zorch instance. + def commit_update(self, referenced_shas=None): + """ Permanently write changes into the archive. + + This creates a new block which may replace an + existing one. """ + + assert not referenced_shas is None + + if self.update_file is None or len(self.update_links) == None: + UpToDateException("No changes to commit.") + + age = 0 + # Automagically add history for self and parents + for link in self.update_links: + age = max(age, link[1]) + # New link + referenced_shas.add(link[0]) + # Previous history + # TRICKY: You can't call get_history on the new link itself + # because it isn't commited yet. + for child in self.get_history(link[2]): + referenced_shas.add(child[0]) + + try: + self.update_stream.close() + self.update_stream = None # see is_updating() + self.add_block(referenced_shas) + return age + finally: + # Always clean up, even on success. + # EXCEPTIONS ARE FATAL! + self.abandon_update() + + def abandon_update(self): + """ Free temporary storage associated with an update without + committing it. """ + + self.update_links = [] + if not self.update_stream is None: + self.update_stream.close() + self.update_stream = None + if not self.update_file is None: + self.tmps.remove_temp_file(self.update_file) + self.update_file = None + + # Returns 0 + def create(self, name_policy, num_blocks, overwrite=False ): + """ Initialize the instance by creating a new set of empty + block files. """ + + if name_policy.read_only: + raise ValueError("Names are read only! 
Use load() instead?") + self.names = name_policy + self.tags = ['', ] # Length == 1 + if not overwrite: + for ordinal in range(0, num_blocks): + if os.path.exists(self.full_path(ordinal, False)): + raise IOError("Already exists: %s" % + self.full_path(ordinal, False)) + + for ordinal in range(0, num_blocks): + out_file = open(self.full_path(ordinal, False), 'wb') + out_file.close() + + return self.load(name_policy, num_blocks) + + # hmmmm... want to use hash names for blocks + # blocks is [[file_name, desc, dirty], ...] + # returns maximum age + def load(self, name_policy, num_blocks, tags=None): + """ Initialize the instance by loading from an existing set of + block files. """ + + self.names = name_policy + if tags is None: + tags = ['' for dummy in range(0, num_blocks)] + + # DESIGN INTENT: Meant for keeping track of Freenet CHK's + assert len(tags) == num_blocks + self.tags = tags[:] + self.link_map = LinkMap() + age, counts = self.link_map.read([self.full_path(ordinal, False) + for ordinal in range(0, num_blocks)]) + assert not has_internal_zero(counts) + if max(counts) == 0: + self.tags = self.tags[:1] # Length == 1 + return age + + # Includes 0 length blocks + def total_blocks(self): + """ Return the total number of blocks including 0 length ones. """ + return len(self.tags) + + # Hmmmm... physical length. + def nonzero_blocks(self): + """ Return the number of non-zero length blocks. """ + for ordinal in range(0, len(self.tags)): + if os.path.getsize(self.full_path(ordinal)) == 0: + # Check for illegal internal zero length blocks. + for index in range(ordinal + 1, len(self.tags)): + if os.path.exists(self.full_path(index)): + assert os.path.getsize(self.full_path(index)) == 0 + return ordinal + + return len(self.tags) + + # This may delete self.update_file, but caller is + # still responsible for cleaning it up. + # + # Breaks length and ordering invariants. + def add_block(self, referenced_shas, tag=''): + """ INTERNAL: Add the temporary update file to the permanent + block files. """ + + assert not self.is_updating() + assert not self.names.read_only + update_len = os.path.getsize(self.update_file) + head_len = os.path.getsize(self.full_path(0, False)) + tmp = None + try: + # Doesn't change length + if update_len + head_len < MIN_BLOCK_LEN: + # Link map has open file descriptors. + # Must close or os.remove() below fails on Windows. + self.link_map.close() + + # We might merge with an empty block here, but it + # doesn't matter since the length is bounded. Do better? + + # Can just append to the first block. + # [N + O1] ... + tmp = self.merge_blocks((self.update_file, + self.full_path(0, False)), + referenced_shas) + if os.path.exists(self.full_path(0, False)): + # REDFLAG: What if this fails? + os.remove(self.full_path(0, False)) + + os.rename(tmp, self.full_path(0, False)) + self.tags[0] = tag + + # Deletes update file IFF we get here. + tmp = self.update_file + self.update_file = None + + # Fix the link map. + fixups = {} # Drop links in head block. + for index in range(1, self.total_blocks()): + fixups[index] = index + # Potentially SLOW. + self.link_map.update_blocks(fixups, + [self.full_path(index, False) + for index in + range(0, self.total_blocks())], + [0,]) # Read links from head block. + return + + + # Deletes update file always. + tmp = self.update_file + self.update_file = None + + # Close the link map before messing with files. + self.link_map.close() + + self.prepend_block(tmp) + self.tags.insert(0, tag) # Increments implicit length + + # Fix the link map. 
+ fixups = {} + for index in range(0, self.total_blocks() - 1): # We inserted! + fixups[index] = index + 1 + # Potentially SLOW. + self.link_map.update_blocks(fixups, + [self.full_path(index, False) + for index in + range(0, self.total_blocks())], + [0,]) # Read links from head block. + finally: + self.tmps.remove_temp_file(tmp) + + # Returns tmp file with merged blocks. + # Caller must delete tmp file. + def merge_blocks(self, block_file_list, referenced_shas): + """ INTERNAL: Merge blocks into a single file. """ + tmp = self.tmps.make_temp_file() + copied_shas = set([]) + raised = True + try: + out_file = open(tmp, 'wb') + try: + for name in block_file_list: + in_file = open(name, "rb") + try: + # Hmmm... do something with count? + #count = copy_raw_links(in_file, out_file, + # referenced_shas) + copy_raw_links(in_file, out_file, + referenced_shas, copied_shas) + finally: + in_file.close() + finally: + out_file.close() + raised = False + return tmp + finally: + if raised: + self.tmps.remove_temp_file(tmp) + + # Implementation helper function, caller deals with file cleanup. + # REQUIRES: new_block not an extant block file. + def prepend_block(self, new_block): + """ INTERNAL: Insert a new block at the head of the block list. """ + + assert not self.is_updating() + assert self.update_file is None + # Shift all extant blocks up by one index + for index in range(self.total_blocks() - 1, -1, -1): + if os.path.exists(self.full_path(index + 1, False)): + # REDFLAG: failure? + os.remove(self.full_path(index + 1, False)) + # REDFLAG: failure? + os.rename(self.full_path(index, False), + self.full_path(index + 1, False)) + # Now copy the update block into the 0 position. + os.rename(new_block, self.full_path(0, False)) + + + def _make_new_files(self, new_blocks, referenced_shas, tmp_files): + """ INTERNAL: Implementation helper for update_blocks(). """ + new_files = {} + for partition in new_blocks: + # Calling code should have already dropped empty blocks. + new_files[partition] = self.merge_blocks([self.full_path(index, + False) + for index in + range(partition[0], + partition[1] + + 1)], + referenced_shas) + tmp_files.append(new_files[partition]) + return new_files + + def _remove_old_files(self, dropped_blocks): + """ INTERNAL: Implementation helper for update_blocks(). """ + # Delete the files for dropped blocks + for partition in dropped_blocks: + assert partition[0] == partition[1] + if not os.path.exists(self.full_path(partition[0], False)): + continue + os.remove(self.full_path(partition[0], False)) + + def _copy_old_blocks(self, old_blocks, tmp_files): + """ INTERNAL: Implementation helper for update_blocks(). """ + renamed = {} + for partition in old_blocks: + assert partition[0] == partition[1] + + src = self.full_path(partition[0], False) + assert os.path.exists(src) + dest = self.tmps.make_temp_file() + tmp_files.append(dest) + os.rename(src, dest) + renamed[partition] = dest + return renamed + + def _update_block_files(self, compressed, uncompressed, + referenced_shas, tmp_files): + """ INTERNAL: Implementation helper for update_blocks(). """ + + # Hmmm... to appease pylint max local vars constraint. 
+ #new_blocks = set(compressed) - set(uncompressed) + old_blocks = set(compressed).intersection(set(uncompressed)) + #dropped_blocks = set(uncompressed) - old_blocks + + # Build new blocks in tmp files + new_files = self._make_new_files(set(compressed) - set(uncompressed), + referenced_shas, + tmp_files) + # Delete the files for dropped blocks + self._remove_old_files(set(uncompressed) - old_blocks) + # Move old blocks into tmp files + renamed = self._copy_old_blocks(old_blocks, tmp_files) + + new_tags = ['' for dummy in range(0, len(compressed))] + new_indices = [] + ordinal_fixups = {} + # Rename blocks onto new block ordinals + for index, block in enumerate(compressed): #hmmm not a set??? + dest = self.full_path(index, False) + assert not os.path.exists(dest) + if block in set(compressed) - set(uncompressed): + os.rename(new_files[block], dest) + new_tags[index] = 'new' # best we can do. + new_indices.append(index) + continue + + assert block in old_blocks + os.rename(renamed[block], dest) + # Copy the old tag value into the right position + new_tags[index] = self.tags[block[0]] + # Save info we need to fix the link_map + ordinal_fixups[block[0]] = index + self.tags = new_tags + return (new_tags, new_indices, ordinal_fixups) + + # REDFLAG: Failure. + def update_blocks(self, uncompressed, compressed, referenced_shas, + min_blocks): + """ Repartition the underlying block files into the partitions + described by compressed. """ + + assert not self.is_updating() + assert not self.names.read_only + + tmp_files = [] + try: + self.link_map.close() + self.tags, new_indices, ordinal_fixups = \ + self._update_block_files(compressed, uncompressed, + referenced_shas, tmp_files) + + # Drop links for unreferenced blocks and shift indices. + # Then read links from new block files. + self.link_map.update_blocks(ordinal_fixups, + [self.full_path(index, False) + for index in + range(0, self.total_blocks())], + new_indices) + + # Add trailing zero length blocks. + for index in range(self.nonzero_blocks(), min_blocks): + out_file = open(self.full_path(index, False), 'wb') + out_file.close() + finally: + for name in tmp_files: + self.tmps.remove_temp_file(name) + + diff --git a/wormarc/deltacoder.py b/wormarc/deltacoder.py new file mode 100644 --- /dev/null +++ b/wormarc/deltacoder.py @@ -0,0 +1,208 @@ +""" A delta encoder/decoder based on Mercurial's binary diff/patch code. + + ATTRIBUTION: Contains source fragements written by Matt Mackall. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +# For names in pillaged Mercurial code. 
+# pylint: disable-msg=C0103, W0141 + +import zlib +from mercurial import mdiff + + +from binaryrep import NULL_SHA +############################################################ +# ATTRIBUTION: Pillaged from Mercurial revlog.py by Matt Mackall +# Then hacked, so bugs are mine. +_compress = zlib.compress +_decompress = zlib.decompress + +def compress(text): + """ generate a possibly-compressed representation of text """ + if not text: + return ("", text) + l = len(text) + bin = None + if l < 44: # Is this Mercurial specific or a zlib overhead thing? + pass + elif l > 1000000: + # zlib makes an internal copy, thus doubling memory usage for + # large files, so lets do this in pieces + z = zlib.compressobj() + p = [] + pos = 0 + while pos < l: + pos2 = pos + 2**20 + p.append(z.compress(text[pos:pos2])) + pos = pos2 + p.append(z.flush()) + if sum(map(len, p)) < l: + bin = "".join(p) + else: + bin = _compress(text) + if bin is None or len(bin) > l: + if text[0] == '\0': + return ("", text) + return ('u', text) + return ("", bin) + +def decompress(bin): + """ decompress the given input """ + if not bin: + return bin + t = bin[0] + if t == '\0': + return bin + if t == 'x': + return _decompress(bin) + if t == 'u': + return bin[1:] + + raise Exception("unknown compression type %r" % t) + + # _ is a function defined in i18n.py to call i18n.gettext. + #raise RevlogError(_("unknown compression type %r") % t) + +############################################################ + +# REDFLAG: wants_stream ENOTIMPL, who closes stream? +# Returns raw patch data if if it's not set +# returns a readable stream if wants_stream is True, otherwise the raw data +# def example_get_data_func(history_link, wants_stream=False): +# pass + +class DeltaCoder: + """ Wrapper around the delta compression/decompression implementation + used by the Mercurial Revlog. + + See revlog.py, mdiff.py, mpatch.c, bdiff.c in Mercurial codebase. + """ + def __init__(self): + self.get_data_func = lambda x:None + self.tmp_file_mgr = None + + # Define an ABC? What would the runtime overhead be? + # Subclass might need tmp_file_mgr or get_data_func. + # pylint: disable-msg=R0201 + def make_full_insert(self, new_file, out_file_name, + disable_compression=False): + """ Make a blob readable by apply_deltas containing the entire file. """ + + in_file = open(new_file, 'rb') + raw_new = None + try: + raw_new = in_file.read() + finally: + in_file.close() + + if disable_compression: + values = ('u', raw_new) + else: + values = compress(raw_new) + + out_file = open(out_file_name, 'wb') + try: + if values[0]: + out_file.write(values[0]) + out_file.write(values[1]) + finally: + out_file.close() + + return NULL_SHA + + # Writes a new delta blob into out_files + # Returns parent sha1. + # Can truncate history by returning NULL_SHA + def make_delta(self, history_chain, old_file, new_file, out_file_name): + """ Make a new binary change blob and write it into out_file_name. 
+ + """ + if len(history_chain) == 0: + #print "DOING FULL INSERT" + return self.make_full_insert(new_file, out_file_name) + + #print "MAKING DELTA" + in_file = open(new_file, 'rb') + raw_new = None + try: + raw_new = in_file.read() + finally: + in_file.close() + + parent = NULL_SHA + in_old = open(old_file, 'rb') + try: + raw_old = in_old.read() + values = compress(mdiff.textdiff(raw_old, raw_new)) + parent = history_chain[0][0] + out_file = open(out_file_name, 'wb') + try: + if values[0]: + out_file.write(values[0]) + out_file.write(values[1]) + finally: + out_file.close() + finally: + in_old.close() + + return parent + + # All text and patches kept in RAM. + # Rebuilds the file by applying all the deltas in the history chain. + def apply_deltas(self, history_chain, out_file_name): + """ Rebuild a file from a series of patches and write it into + out_file_name. """ + assert len(history_chain) > 0 + + deltas = [] + text = None + index = 0 + while index < len(history_chain): + link = history_chain[index] + if link[2] == NULL_SHA: + text = link[3] + if text is None: + text = self.get_data_func(link[0]) + break + + delta = link[3] + if delta is None: + delta = self.get_data_func(link[0]) + assert not delta is None + deltas.append(delta) + index += 1 + + assert not text is None + text = decompress(text) + if len(deltas) == 0: + raw = text + else: + for index in range(0, len(deltas)): + deltas[index] = decompress(deltas[index]) + deltas.reverse() # iterate in reverse? + raw = mdiff.patches(text, deltas) + + text = None + out_file = open(out_file_name, "wb") + try: + out_file.write(raw) + finally: + out_file.close() diff --git a/wormarc/design.txt b/wormarc/design.txt new file mode 100644 --- /dev/null +++ b/wormarc/design.txt @@ -0,0 +1,74 @@ +djk20091206 -- personal notes, probably not very useful to anyone else. + +OVERVIEW: +Git/hg lite. file is a linear change of delta encoded patches. patches +referenced by sha(parent + data) Archive is a collection of patches, +has one "root object" patch chain head. FileManifest is an arbitrary +mapping of human readable names to patch chain heads + +patch + HASA *one* parent (but multiple patches can have the same parent) + HASA sha1 + HASA age + +patch chain + HASA ordered sequences of patch'es + +file + ISA patch chain + +Manifest + ISA file + HASA file sha -> patchchain map + + can determine every referenced patch by walking all the patch chains + can generate new patches for an update from a file list + +Archive + HASA unordered collection of patches + HASA "special" patch chain sha1 which points to the root object. + + + can partition patches across a bounded number of read only files + can map patch sha1s to patches (via Blocks, LinkMap) + tries to write as little as possible + tries to update the oldest files as little as possible + knows to drop unreferenced patch chains + could incrementally update a local directory based on locally cached archive state + ??? COOL, BUT REALLY REQUIRED?, requires age + i.e. rsync like behaviour +top key + HASA ordered sequence of bundle CHKS + HASA root object sha + + Is the in Freenet rep of an Archive + + +PROS: +sites load fully in a bounded number of freenet fetches +CHKs for older content used in newer updates. +Easy to implement top key redundancy, reinsertion + +CONS: +Slower in typical use case? + offset by local caching of old bundles + fast binary patching algo +Requires either lots of memory or fast random access to encrypted storage. +"Defrag" usability issue. 
Inserts are usually fast, but could take a long time. +Inserting is stateful. i.e. you must be able to fetch the previous version. +Complexity. Need to disable diffs for static files (e.g. images) + +USES: +Wiki +Freesites +PyFreenetHg +git? + +Important Classes: +WORMBlockArchive -- The archive +Blocks -- Delegate of WORMBBlockArchive to handle reading / + writing patch chains to files. +BlockNames -- Delegate of Blocks to abstract ordinal to file name mapping. +LinkMap -- Delegate of Blocks to keep a SHA1 addressable map of patch chain links +ManifestFile -- Map of human readable names to patch chain head SHA1s + diff --git a/wormarc/filemanifest.py b/wormarc/filemanifest.py new file mode 100644 --- /dev/null +++ b/wormarc/filemanifest.py @@ -0,0 +1,485 @@ +""" Classes to address files stored in a WORMBlockArchive by + human readable name. + + ATTRIBUTION: Contains source fragements written by Matt Mackall. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +import os +import shutil + +from binaryrep import NULL_SHA, manifest_from_file, \ + manifest_to_file, get_file_sha, check_shas, str_sha + +from archive import UpToDateException + +def is_printable_ascii(value): + """ Return True if all the characters in value are printable + ASCII, False otherwise. """ + value = [ord(c) for c in value] + # Hmmm... allow spaces + return max(value) <= 0x7e and min(value) >= 0x20 + +#----------------------------------------------------------# + +# Hmmmm... this feels horrifically overdesigned, but I need a way +# to decouple the data that you can insert into a manifest from +# the manifest implementation. +class IManifestEntry: + """ Abstract base class for things that can be referenced + from a FileManifest. """ + def __init__(self): + pass + + def get_name(self): + """ Returns the name to insert this entry under in the manifest. """ + #raise NotImplementedError() + pass + + def make_file(self): + """ Returns the full path to the data to insert. + May create a temp file which it can clean up in release(). + """ + #raise NotImplementedError() + pass + + # MUST call this. + def release(self): + """ Cleanup method called when the instance is no longer in use. """ + #raise NotImplementedError() + pass + +class FileManifest: + """ An index which maps human readable names to files in an archive. """ + def __init__(self, name_map=None, history_sha=NULL_SHA): + check_shas([history_sha, ]) + if name_map == None: + name_map = {} + + # name -> (file sha1, patch chain head sha1) + self.name_map = name_map + # Hmmmm... convenient, but it ties the manifest to an archive. + self.stored_sha = history_sha + + @classmethod + def from_archive(cls, archive, history_sha): + """ Create a FileManifest from a file in the archive. 
""" + check_shas([history_sha, ]) + tmp_name = archive.blocks.tmps.make_temp_file() + try: + archive.get_file(history_sha, tmp_name) + # Hmmmm... age... put back in manifest? + name_map = manifest_from_file(tmp_name) + return FileManifest(name_map, history_sha) + finally: + archive.blocks.tmps.remove_temp_file(tmp_name) + + # hmmmm... not to_archive, would expect that to be an instance member. + @classmethod + def write_manifest(cls, archive, name_map, history_sha): + """ Helper, writes updated manifest to archive. + Returns link. + """ + check_shas([history_sha, ]) + # Add manifest + tmp_file_name = archive.blocks.tmps.make_temp_file() + try: + manifest_to_file(tmp_file_name, name_map) + return archive.write_new_delta(history_sha, tmp_file_name) + finally: + archive.blocks.tmps.remove_temp_file(tmp_file_name) + + def make_file_sha_map(self): + """ INTERNAL: Make a file_sha -> (file_sha, patch_sha) map + from name_map. """ + file_sha_map = {} + for name in self.name_map: + pair = self.name_map[name] + file_sha_map[pair[0]] = pair + return file_sha_map + + # Doesn't change manifest or archive. + def write_changes(self, archive, entry_infos, prev_manifest_sha=NULL_SHA): + """ INTERNAL: Helper function for update(). + + Writes the changes required to add the IManifestEntries + in entries_infos to an archive. + + Raises UpToDateException if there are no changes. + + Return an (updated_name_map, manifest_sha) tuple. """ + + check_shas([prev_manifest_sha, ]) + + file_sha_map = self.make_file_sha_map() + new_name_map = {} + updated = False + + for info in entry_infos: + full_path = info.make_file() + try: + name = info.get_name() + if not is_printable_ascii(name): + raise IOError("Non-ASCII name: %s" % repr(name)) + hash_info = self.name_map.get(name, None) + file_sha = get_file_sha(full_path) + if hash_info is None: + updated = True + if file_sha in file_sha_map: + # Renamed + new_name_map[name] = file_sha_map[file_sha] + else: + # REDFLAG: We lose history for files which are renamed + # and modified. + # Created (or renamed and modified) + link = archive.write_new_delta(NULL_SHA, full_path) + new_name_map[name] = (file_sha, link[0]) + else: + if self.name_map[name][0] == file_sha: + # Exists in manifest and is unmodified. + new_name_map[name] = self.name_map[name] + continue + + # Modified + updated = True + link = archive.write_new_delta(self.name_map[name][1], + full_path) + new_name_map[name] = (file_sha, link[0]) + + # delete == ophaned history, NOP + finally: + info.release() + + if not updated: + if (frozenset(new_name_map.keys()) == + frozenset(self.name_map.keys())): + raise UpToDateException("The file manifest is up to date.") + + # Add updated manifest + link = FileManifest.write_manifest(archive, new_name_map, + prev_manifest_sha) + + return (new_name_map, link[0]) + + # Only works if fully committed! + def all_shas(self, archive): + """ Return the SHA1 hashes of all history links required to store + the files referenced by the manifest. """ + shas = [entry[1] for entry in self.name_map] + shas.add(self.stored_sha) + history_shas = set([]) + for value in shas: + history_shas.union(set([link[0] for link in + archive.blocks.get_history(value)])) + return shas.union(history_shas) + + # Changes both the manifest and the archive. + # other_head_shas is for other files in the archive not + # handled by this manifest. 
+ def update(self, archive, entry_infos, other_head_shas=None, + truncate_manifest_history=False): + """ Update the manifest with the changes in entry infos and + write the changes and the updated manifest into the archive. """ + if other_head_shas is None: + other_head_shas = set([]) + + check_shas(other_head_shas) + + archive.start_update() + raised = True + try: + prev_sha = self.stored_sha + if truncate_manifest_history: + prev_sha = NULL_SHA + + new_names, root_sha = self.write_changes(archive, + entry_infos, + prev_sha) + + # History for all files except recently modified ones. + old_shas = set([]) + + new_shas = archive.uncommited_shas() + + for value in new_names.values(): + if value[1] in new_shas: + # Adding history for new values is handled by + # commit_update(). + continue + + # We need to explictly add history for the files which + # still exist in the manifest but didn't change. + for link in (archive.blocks.get_history(value[1])): + old_shas.add(link[0]) + + all_shas = archive.referenced_shas(old_shas. + union(other_head_shas)) + + archive.commit_update(all_shas) + self.stored_sha = root_sha + self.name_map = new_names + raised = False + finally: + if raised: + archive.abandon_update() + + +def verify_manifest(archive, manifest, brief=False): + """ Debugging function to verify the integrity of a manifest. """ + failures = 0 + for name in manifest.name_map: + tmp = archive.blocks.tmps.make_temp_file() + file_sha, link_sha = manifest.name_map[name] + if not brief: + print "Verifying: %s %s => %s)" % (name, + str_sha(file_sha), + str_sha(link_sha)) + archive.get_file(link_sha, tmp) + history = archive.blocks.get_history(link_sha) + if not brief: + print "History: " + " ".join([str_sha(link[0]) + for link in history]) + + retrieved_sha = get_file_sha(tmp) + if retrieved_sha != file_sha: + print "Expected: %s, but got %s." % (str_sha(file_sha), + str_sha(retrieved_sha)) + failures += 1 + else: + if not brief: + print "Ok. Read %i bytes." % os.path.getsize(tmp) + + archive.blocks.tmps.remove_temp_file(tmp) + + if failures > 0: + print "%i entries failed to verify!" % failures + assert False + +def fix_backwards_slashes(name): + """ Helper to fix backwards slashes in windows file names. """ + if os.sep != '\\' or name.find('\\') == -1: + return name + + return '/'.join(name.split('\\')) + +class PathEntry(IManifestEntry): + """ IManifestEntry implementation for a path to a file on the + local filesystem. """ + def __init__(self, full_path, name): + IManifestEntry.__init__(self) + self.full_path = full_path + self.name = fix_backwards_slashes(name) + + def get_name(self): + """ IManifestEntry implementation. """ + return self.name + + def make_file(self): + """ IManifestEntry implementation. """ + return self.full_path + + + # make_file(), release() are NOPs + + +# skips empty directories +# LATER: updates w/o sending all data? +# only send files which have changes since +# a local sha1 list file has changed, just send sha1s of others. +# LATER: add accept_regex? +def entries_from_dir(start_dir, recurse, ignore_regex=None, include_dirs=False): + """ An iterator which yields FileManifestEntries for + files in a directory. 
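    Typical use (illustrative path, assuming an open WORMBlockArchive and
    a loaded FileManifest):

        manifest.update(archive,
                        entries_from_dir('/path/to/site', recurse=True))

    writes deltas for new or modified files plus an updated manifest into
    the archive in one start_update()/commit_update() cycle, and raises
    UpToDateException when nothing has changed.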
""" + stack = [start_dir] + while len(stack) > 0: + current_dir = stack.pop() + names = os.listdir(current_dir) + for name in names: + if not ignore_regex is None and ignore_regex.match(name): + continue + full_path = os.path.join(current_dir, name) + if os.path.isdir(full_path) and recurse: + if include_dirs: + # Hack so that I can delete unreferenced dirs + # in manifest_to_dir + yield PathEntry(full_path, '') + stack.append(full_path) + if os.path.isfile(full_path): + name = full_path[len(start_dir):] + while len(name) > 0 and name.startswith(os.sep): + name = name[1:] + if len(name) > 0: + yield PathEntry(full_path, name) + +def find_dirs(name_map, target_dir): + """ INTERNAL: Helper function used by manifest_to_dir(). """ + + dirs = set([]) + for file_name in name_map: + dir_name = os.path.dirname(os.path.join(target_dir, file_name)) + if not dir_name: + continue # Hmmm + if dir_name == os.sep: + continue # Hmmm + dirs.add(dir_name) + + return dirs + +def read_local_dir(manifest, target_dir, dirs, ignore_regex): + """ INTERNAL: Helper function used by manifest_to_dir(). """ + # Read local directory state. + overwrite = set([]) + remove = {} # name -> path + local_dirs = set([]) + extant = set([]) + for entry in entries_from_dir(target_dir, True, ignore_regex, True): + name = entry.get_name() + extant.add(name) + full_path = entry.make_file() + if name == '': + # Because we told entries_from_dir to return directories. + local_dirs.add(full_path) + continue + + local_dirs.add(os.path.dirname(full_path)) + if name in manifest.name_map: + overwrite.add(name) + else: # skip directory entries + remove[name] = entry.make_file() + entry.release() + + # O(N*M) hmmm.... + # Remove non-leaf subdirectories. + for stored_dir in dirs: + for local_dir in local_dirs.copy(): + if stored_dir.startswith(local_dir): + local_dirs.remove(local_dir) + + + return (overwrite, remove, local_dirs, extant) + +# Hmmm... wackamole code. +# REDFLAG: Other ways to make sleazy path references. +def validate_path(base_dir, full_path): + """ Catch references to direcories above base_dir. """ + base_dir = os.path.abspath(base_dir) + + if type(full_path) is unicode: + raise IOError("Unicode path name: %s" % repr(full_path)) + if not is_printable_ascii(full_path): + raise IOError("Non-ASCII path name: %s" % repr(full_path)) + + full_path = os.path.abspath(full_path) + + if not (len(full_path) > len(base_dir) and + full_path.startswith(base_dir)): + raise IOError("Hinky path in manifest: %s" % full_path) + +# No error handling or cleanup. +# Doubt this will work on Windows, must handle backwards path sep. +def manifest_to_dir(archive, manifest, target_dir, ignore_regex=None, + dry_run=False): + + """ Update files in a local directory by extracting files in a manifest. + + WARNING. NOT WELL TESTED. POTENTIALLY DANGEROUS. + PROBABLY BROKEN ON WINDOWS. 
""" + + dirs = find_dirs(manifest.name_map, target_dir) + + overwrite, remove, local_dirs, extant = \ + read_local_dir(manifest, target_dir, dirs, ignore_regex) + + remove_dirs = local_dirs - dirs + create = set(manifest.name_map.keys()) - extant + if dry_run: + return (create, overwrite, set(remove.keys()), remove_dirs) + + # Remove files + for victim in remove.values(): + if os.path.exists(victim): + validate_path(target_dir, victim) + os.remove(victim) + + # Remove directories + for victim in (remove_dirs): + if os.path.exists(victim): + # REDFLAG: I saw this fail silently once + validate_path(target_dir, victim) + shutil.rmtree(victim) + assert not os.path.exists(victim) + + # Make directories that exist in manifest, but not locally. + for dir_name in dirs: + if not os.path.exists(dir_name): + validate_path(target_dir, dir_name) + os.makedirs(dir_name) + + # Copy files out of the archive, onto the local file system. + for file_name in manifest.name_map: + validate_path(target_dir, os.path.join(target_dir, file_name)) + archive.get_file(manifest.name_map[file_name][1], + os.path.join(target_dir, file_name)) + + return (create, overwrite, set(remove.keys()), remove_dirs) + +class RawDataTupleEntry(IManifestEntry): + """ IManifestEntry implementation for a path to a file on the + local filesystem. """ + def __init__(self, tmps, raw_tuple): + IManifestEntry.__init__(self) + self.tmps = tmps + self.raw_tuple = raw_tuple + self.full_path = None + def get_name(self): + """ IManifestEntry implementation. """ + return self.raw_tuple[0] + + def make_file(self): + """ IManifestEntry implementation. """ + assert self.full_path is None + self.full_path = self.tmps.make_temp_file() + out_file = open(self.full_path, 'wb') + try: + out_file.write(self.raw_tuple[1]) + finally: + out_file.close() + + return self.full_path + + # MUST call this. + def release(self): + """ IManifestEntry implementation. """ + if not self.full_path is None: + self.tmps.remove_temp_file(self.full_path) + self.full_path = None + + # REDFLAG: Does this really help garbage collection or just CC? + self.raw_tuple = None + self.tmps = None + +def entries_from_seq(tmps, sequence): + """ An iterator which yields FileManifestEntries from a sequence of + (name, raw_data) tuples. + + REQUIRES: sequence not modified while iterating. + """ + for value in sequence: + yield RawDataTupleEntry(tmps, value) diff --git a/wormarc/hghelper.py b/wormarc/hghelper.py new file mode 100644 --- /dev/null +++ b/wormarc/hghelper.py @@ -0,0 +1,38 @@ +""" Testing helper functions for using hg repos. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. 
+ + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +from mercurial import hg, commands, ui + +#{'rev': '0', 'no_decode': None, 'prefix': '', 'exclude': [], +# 'include': [], 'type': ''} +def export_hg_repo(src_dir, dest_dir, target_rev): + """ Export the files in the hg repo in src_dir to dest_dir. """ + ui_ = ui.ui() + repo = hg.repository(ui_, src_dir) + commands.archive(ui_, + repo, + dest_dir, + rev=target_rev, + prefix='' # <- needs this to work. + ) + return repo['tip'].rev() + + diff --git a/wormarc/linkmap.py b/wormarc/linkmap.py new file mode 100644 --- /dev/null +++ b/wormarc/linkmap.py @@ -0,0 +1,202 @@ +""" A class to keep track of history links stored in a set of files. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +from binaryrep import read_link, str_sha + +class LinkMap(dict): + """ A history link hash addressable index of the history links in + a set of block files. """ + def __init__(self): + dict.__init__(self) + self.files = [] + + def read(self, file_list, keep_data=False): + """ Read the index from a collection of block files. """ + counts = [0 for dummy in range(0, len(file_list))] + age = 0 # Hmmmm + for index, name in enumerate(file_list): + in_stream = open(name, 'rb') + raised = True + try: + latest_age, count = self.read_from_stream(in_stream, + index, keep_data) + age = max(age, latest_age) + counts[index] = count + raised = False + finally: + if raised or keep_data: + in_stream.close() + else: + self.files.append(in_stream) + return age, tuple(counts) + + + def read_from_stream(self, in_stream, index, keep_data=False): + """ Read links from a stream. """ + age = 0 + count = 0 + while True: + link = read_link(in_stream, keep_data, in_stream.tell(), + index) + if link is None: + break + + age = max(age, link[1]) + prev = list(self.get(link[0], [])) + link = list(link) # REDFLAG: ??? tuple -> list -> tuple + prev.append(tuple(link)) + self[link[0]] = tuple(prev) + count += 1 + + return age, count + + # SLOW, get rid of list copy? + # fixups is a old_index -> new index map + # Omit from fixups == delete + def _update_block_ordinals(self, fixups): + """ INTERNAL: Implementation helper for update_blocks(). 
""" + for sha_hash in self.keys(): + prev = self.get(sha_hash) + updated = [] + for link in prev: + assert link[0] == sha_hash + if not link[5] in fixups: + continue # Dropped block + link = list(link) + link[5] = fixups[link[5]] + updated.append(tuple(link)) + if len(updated) > 0: + self[sha_hash] = tuple(updated) + else: + del self[sha_hash] + + # Fixes ordinals in referenced links + # Drops omited blocks + # Closes and re-opens all file streams. + # Loads links from the streams in new_indices. + def update_blocks(self, fixups, file_list, new_indices, keep_data=False): + """ Update the index to track addition, deletion and reordering of + the underlying block files. """ + + assert len(self.files) == 0 # must be closed. + self._update_block_ordinals(fixups) + self.files = [] + age = 0 + raised = True + try: + for index, name in enumerate(file_list): + self.files.append(open(name, 'rb')) + if not index in new_indices: + continue + + # Need to read links out of the new file. + latest_age, dummy = self.read_from_stream(self.files[index], + index, keep_data) + age = max(age, latest_age) + raised = False + return age + finally: + if raised: + self.close() + + def close(self): + """ Close the index. """ + for in_file in self.files: + in_file.close() + self.files = [] + + def get_link(self, link_sha, need_data=False): + """ Get a history link by its sha1 hash. """ + links = self.get(link_sha, None) + if links is None: + raise IOError("Unresolved link: " + str_sha(link_sha)) + + assert len(links) > 0 + # REDFLAG: Fully think through. + # The same link can exist in multiple files. + link = links[0] + + if (not need_data) or (not link[3] is None): + return link + + index = link[5] + self.files[index].seek(link[4]) + ret = read_link(self.files[index], True) + if ret is None: + raise IOError("Couldn't read blob from disk.") + + assert ret[0] == link[0] + assert ret[1] == link[1] + assert ret[2] == link[2] + assert not ret[3] is None + assert ret[0] == link_sha + return ret + +def raw_block_read(link_map, ordinal): + """ Read a single block file. """ + table = {} + in_stream = link_map.files[ordinal] + in_stream.seek(0) + while True: + start_pos = in_stream.tell() + link = read_link(in_stream, False, start_pos, ordinal) + # read_link() never returns None except for eof, right? + # Otherwise we'd only do a partial read... + if link is None: + break + entry = table.get(link[0], []) + entry.append(link) + table[link[0]] = entry + return table + +def links_by_block(link_map): + """ INTERNAL: Implementation helper function for + verify_link_map(). """ + tables = [{} for dummy in range(0, len(link_map.files))] + for links in link_map.values(): + assert len(links) > 0 + for link in links: + ordinal = link[5] + assert ordinal >= 0 and ordinal < len(link_map.files) + entry = tables[ordinal].get(link[0], []) + entry.append(link) + tables[ordinal][link[0]] = entry + return tables + +def verify_link_map(link_map): + """ Debugging function to verify the integrity of a LinkMap instance. """ + + assert len(link_map.files) > 0 + count = 0 + by_block = links_by_block(link_map) + + for ordinal in range(0, len(link_map.files)): + raw_shas = raw_block_read(link_map, ordinal) + # Hashes read from the raw file are the same as + # the ones that the LinkMap thinks should be in the file. + assert frozenset(raw_shas.keys()) == frozenset(by_block[ordinal].keys()) + + # Now check values. 
+ for link_sha in raw_shas: + assert (frozenset(raw_shas[link_sha]) == + frozenset(by_block[ordinal][link_sha])) + count += 1 + return count diff --git a/wormarc/shafunc.py b/wormarc/shafunc.py new file mode 100644 --- /dev/null +++ b/wormarc/shafunc.py @@ -0,0 +1,50 @@ +""" Deal with move of SHA1 hash lib from sha to hashlib module. + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + +try: + #raise ImportError("fake") # Tested under 2.6 using this. + from hashlib import sha1 as newshafunc + #print "LOADED NEW" + def new_sha(value=None): + """ Make new SHA1 instance using hashlib module. """ + if value == None: + return newshafunc() + return newshafunc(value) + +except ImportError: + # Fall back so that code still runs on pre 2.6 systems. + import sha as oldshamod + #print "LOADED OLD" + def new_sha(value=None): + """ Make new SHA1 instance using old sha module. """ + if value == None: + return oldshamod.new() + return oldshamod.new(value) + +# from shafunc import new_sha +# def main(): +# text = 'OH HAI' +# a = new_sha() +# a.update(text) +# b = new_sha(text) +# print a.hexdigest() +# print b.hexdigest() + +# main() diff --git a/wormarc/test_archive.py b/wormarc/test_archive.py new file mode 100644 --- /dev/null +++ b/wormarc/test_archive.py @@ -0,0 +1,730 @@ +""" Unit tests. + + Copyright (C) 2009 Darrell Karbott + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public + License as published by the Free Software Foundation; either + version 2.0 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks +""" + + +# OK to be a little sloppy for test code. +# pylint: disable-msg=C0111 +# For setUp() and tearDown() +# pylint: disable-msg=C0103 +# Allow attribute creation in setUp() +# pylint: disable-msg=W0201 +# Allow test methods that don't reference self. +# pylint: disable-msg=R0201 +# Allow many test methods. 
+# pylint: disable-msg=R0904 +import os +import shutil +import traceback +import random +import time +import sys +import unittest + +from shafunc import new_sha as sha1 + +from binaryrep import NULL_SHA, get_file_sha, str_sha +from blocks import BlockStorage, ITempFileManager +from linkmap import verify_link_map +from filemanifest import FileManifest, entries_from_dir, entries_from_seq, \ + manifest_to_dir, verify_manifest, validate_path + +from archive import WORMBlockArchive, is_ordered, is_contiguous, \ + repartition, compress + +from deltacoder import DeltaCoder + +from hghelper import export_hg_repo + +# False causes test dir to be cleaned up automatically +# after every run. +LEAVE_TEST_DIR = False + +# Absolute path to some hg repository to use for +# testing. +# You MUST MODIFY this for test_hg_repo_torture_test() to work +HG_REPO_DIR = "" +# e.g.: +#HG_REPO_DIR = os.path.expanduser("~/mess/hg_zoo/somedude") + +#----------------------------------------------------------# +TEST_BASE = '/tmp/' +TEST_ROOT = '__latest_test_run__' + +TMP_DIR = '__TMP__' +TEST_DIR = 'test' + + +class HandleTemps(ITempFileManager): + """ Delegate to handle temp file creation and deletion. """ + def __init__(self, base_dir): + ITempFileManager.__init__(self) + self.base_dir = base_dir + self.callers = {} + def make_temp_file(self): + """ Return a new unique temp file name including full path. """ + name = os.path.join(self.base_dir, "__TMP__%s" % + str(random.random())[2:]) + self.callers[name] = traceback.extract_stack() + return name + + def remove_temp_file(self, full_path): + """ Remove and existing temp file. """ + if not os.path.split(full_path)[-1].startswith("__TMP__"): + raise IOError("Didn't create: %s" % full_path) + + if not os.path.exists(full_path): + return + + if full_path in self.callers.keys(): + del self.callers[full_path] + else: + print "HandleTemps.remove_file() -- removing non-managed file???" + print full_path + + os.remove(full_path) + + def check_for_leaks(self): + for name in self.callers: + if not os.path.exists(name): + continue + + print "LEAKED: ", name + print "FROM:" + print self.callers[name] + + if len(os.listdir(self.base_dir)) > 0: + file_count = 0 + for name in os.listdir(self.base_dir): + if os.path.isdir(os.path.join(self.base_dir, name)): + # Allow directories. e.g. __hg_repo__, __unarchived__. 
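+ # These directories are created directly by the tests (hg export,
+ # unarchive target), not via make_temp_file(), so they are not
+ # counted as leaks.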
+ print "HandleTemps.check_for_leaks -- ignored dir: ", name + continue + print name + file_count += 1 + + if file_count > 0: + raise IOError("Undeleted temp files!") + +def dump_blocks(blocks, msg=None, brief=False): + if not msg is None: + print msg + values = [] + for index in range(0, len(blocks.tags)): + path = blocks.full_path(index) + if os.path.exists(path): + length = str(os.path.getsize(path)) + else: + length = "no_file" + if brief: + values.append(length) + else: + values.append("%s:[%s]" % (path, length)) + + if brief: + print "blocks: " + " ".join(values) + else: + print "blocks\n" + "\n".join(values) + +def link_str(link): + return "(%s, %i, %s, data: %s, %i, %s)" % (str_sha(link[0]), + link[1], + str_sha(link[2]), + bool(link[3]), + link[4], + link[5]) +def dump_links(links, msg=None): + if not msg is None: + print msg + for link in links: + print link_str(link) + +def dump_link_map(link_map, msg=None, brief=False): + if not msg is None: + print msg + print "keys: ", len(link_map) + if brief: + return + keys = link_map.keys() + keys.sort() + for key in keys: + print str_sha(key) + dump_links(link_map[key]) + +def dump_names_map(names_map, msg=None): + if not msg is None: + print msg + keys = names_map.keys() + keys.sort() + for key in keys: + hashes = names_map[key] + print "%s->(%s, %s)" % (key, str_sha(hashes[0]), str_sha(hashes[1])) + +def dump_archive(archive, msg=None, brief=False): + print "--- start archive dump ---" + if not msg is None: + print msg + print "age: %i max_blocks: %i" % (archive.age, archive.max_blocks) + dump_blocks(archive.blocks, "blocks:") + + dump_link_map(archive.blocks.link_map, "link_map:", brief) + print "--- end ---" + + +def words(): + while True: + yield sha1(str(random.random())).hexdigest()[:random.randrange(1, 9)] + +WORD_ITR = words() + +def lines(count): + line = "" + while count > 0: + line += WORD_ITR.next() + line += " " + if len(line) > 60: + ret = line + line = "" + count -= 1 + yield ret.strip() + return + +class ArchiveTestCase(unittest.TestCase): + def setup_test_dirs(self, base_dir, dir_name): + if not os.path.exists(base_dir): + raise IOError("Base test directory doesn't exist: %s" % base_dir) + + full_path = os.path.join(base_dir, dir_name) + if os.path.exists(full_path): + raise IOError("Test directory exists: %s" % full_path) + + os.makedirs(full_path) + self.test_root = full_path + self.test_dir = os.path.join(self.test_root, TEST_DIR) + self.tmp_dir = os.path.join(self.test_root, TMP_DIR) + os.makedirs(self.test_dir) + os.makedirs(self.tmp_dir) + + def remove_test_dirs(self): + assert self.test_root.endswith(TEST_ROOT) + try: + self.tmps.check_for_leaks() + finally: + if not LEAVE_TEST_DIR: + shutil.rmtree(self.test_root) + + # Caller must release temp file. 
+ def write_file(self, raw): + file_name = self.tmps.make_temp_file() + out_file = open(file_name, 'wb') + raised = True + try: + out_file.write(raw) + out_file.close() + raised = False + finally: + out_file.close() + if raised: + self.tmps.remove_temp_file(file_name) + + return file_name + + def read_file(self, file_name, remove_tmp=True): + in_file = open(file_name, 'rb') + try: + ret = in_file.read() + finally: + in_file.close() + if remove_tmp: + self.tmps.remove_temp_file(file_name) + return ret + + + def setUp(self): + self.setup_test_dirs(TEST_BASE, TEST_ROOT) + self.tmps = HandleTemps(self.tmp_dir) + + def tearDown(self): + self.remove_test_dirs() + +class SmokeTests(ArchiveTestCase): + def _testLeakATempFile(self): + out_file = open(self.tmps.make_temp_file(), 'wb') + out_file.write("OH NOES! FILZ IZ LIIKAN!!!") + out_file.close() + + def make_empty_archive(self, block_name): + archive = WORMBlockArchive(DeltaCoder(), BlockStorage(self.tmps)) + + archive.create(self.test_dir, block_name) + + return archive + + def load_archive(self, block_name): + archive = WORMBlockArchive(DeltaCoder(), BlockStorage(self.tmps)) + archive.load(self.test_dir, block_name) + + return archive + + def test_create_archive(self): + print + archive = self.make_empty_archive('A') + dump_archive(archive) + + def test_load_archive(self): + print + self.make_empty_archive('A') + b = self.load_archive('A') + dump_archive(b) + + def test_archive_write_read(self): + a = self.make_empty_archive('A') + dump_archive(a, "empty") + + r0 = self.write_file("OH HAI!") + r1 = self.write_file("OH HAI! AGAIN") + r2 = self.write_file("STILL ME") + + t1 = self.tmps.make_temp_file() + try: + a.start_update() + link0 = a.write_new_delta(NULL_SHA, r0) + link1 = a.write_new_delta(NULL_SHA, r1) + link2 = a.write_new_delta(NULL_SHA, r2) + + # Write + a.commit_update() + dump_archive(a, "updated") + + # Read + print + print str_sha(link0[0]), a.get_data(link0[0]) + print str_sha(link1[0]), a.get_data(link1[0]) + print str_sha(link2[0]), a.get_data(link2[0]) + + a.close() + + b = self.load_archive('A') + dump_archive(b, "[Reloaded from disk]") + print + # Mix up order. + print str_sha(link1[0]), b.get_data(link1[0]) + print str_sha(link0[0]), b.get_data(link0[0]) + print str_sha(link2[0]), b.get_data(link2[0]) + finally: + self.tmps.remove_temp_file(t1) + self.tmps.remove_temp_file(r0) + self.tmps.remove_temp_file(r1) + self.tmps.remove_temp_file(r2) + #a.abandon_update() + + def test_torture_a_single_chain(self): + a = self.make_empty_archive('A') + dump_archive(a, "empty") + + text = "" + prev = NULL_SHA + for iteration in range(0, 5000): + # Write + a.start_update() + text += str(time.time()) + '\n' + t2 = self.write_file(text) + #print "Adding to: ", str_sha(prev) + + link = a.write_new_delta(prev, t2) + new_sha = link[0] + link = None + #print "Added: ", str_sha(new_sha), str_sha(new_parent) + a.commit_update() + self.tmps.remove_temp_file(t2) + + #history = a.blocks.get_history(new_sha) + #history_size = sum([value[6] for value in history]) + #print "History: ", len(history), history_size, len(text) + #print + #dump_archive(a, "updated", True) + + t3 = self.tmps.make_temp_file() + a.get_file(new_sha, t3) + + self.assertTrue(text == self.read_file(t3)) + + prev = new_sha + if iteration > 0 and iteration % 100 == 0: + print "iteration: ", iteration + + # grrr... 
giving up on temp files + def test_single_update(self): + a = self.make_empty_archive('A') + m = FileManifest() + data = ( \ + ('foo.txt', 'This is the foo file.\n'), + ('empty.txt', ''), + ('big.txt', '*' * (1024 * 128)), + ) + entries = entries_from_seq(self.tmps, data) + m.update(a, entries) + dump_archive(a) + + def test_multiple_updates(self): + a = self.make_empty_archive('A') + m = FileManifest() + data0 = ( \ + ('foo.txt', 'This is the foo file.\n'), + ('empty.txt', ''), + ('big.txt', '*' * (1 * 128)), + ) + + print "manifest sha: ", str_sha(m.stored_sha) + m.update(a, entries_from_seq(self.tmps, data0)) + print "manifest sha: ", str_sha(m.stored_sha) + + dump_archive(a, "AFTER FIRST WRITE:") + verify_manifest(a, m) + + data1 = ( \ + ('foo.txt', 'This is the foo file.\n'), + ('empty.txt', ''), + ('big.txt', 'hello' + ('*' * (1 * 128))), + ) + + m.update(a, entries_from_seq(self.tmps, data1)) + print "manifest sha: ", str_sha(m.stored_sha) + dump_archive(a) + verify_link_map(a.blocks.link_map) + verify_manifest(a, m) + + def test_words(self): + print WORD_ITR.next() + + def test_lines(self): + for line in lines(10): + print line + + def test_many_updates(self): + + a = self.make_empty_archive('A') + m = FileManifest() + + files = ("A.txt", "B.txt", "C.txt") + + updates = 100 + for dummy in range(0, updates): + names = list(files) + random.shuffle(names) + #names = names[:random.randrange(1, len(files))] + data = [] + for name in names: + text = '' + if name in m.name_map: + tmp = self.tmps.make_temp_file() + a.get_file(m.name_map[name][1], tmp) + text = self.read_file(tmp) + text += "\n".join([line for line in lines(20)]) + + data.append((name, text)) + + #print "updating:" + #for value in data: + # print value[0], len(value[1]) + + #print "manifest sha: ", str_sha(m.stored_sha) + #dump_archive(a, "BEFORE UPDATE: %i" % count, True) + m.update(a, entries_from_seq(self.tmps, data)) + #print "manifest sha: ", str_sha(m.stored_sha) + + #dump_archive(a, "AFTER UPDATE: %i" % count, True) + verify_manifest(a, m, True) + verify_link_map(a.blocks.link_map) + dump_blocks(a.blocks, None, True) + + a.close() + + + def test_validate_path(self): + base_dir = "/tmp/test/foo" + validate_path(base_dir, "/tmp/test/foo/bar") + validate_path(base_dir, "/tmp/test/foo/baz") + validate_path(base_dir, "/tmp/test/foo/barf/text.dat") + + try: + validate_path(base_dir, "/tmp/test/foo/../../../etc/passwd") + self.assertTrue(False) + except IOError, e: + print "Got expected exception: ", e + + try: + validate_path(base_dir, "/tmp/test/foo/../forbidden") + self.assertTrue(False) + except IOError, e: + print "Got expected exception: ", e + + try: + validate_path(base_dir, + u"/tmp/test/foo/f\xc3\xb6rbjuden.txt") + self.assertTrue(False) + except IOError, e: + print "Got expected exception: ", e + + try: + validate_path(base_dir, + "/tmp/test/foo/f\xc3\xb6rbjuden.txt") + self.assertTrue(False) + except IOError, e: + print "Got expected exception: ", e + + def test_is_contiguous(self): + self.assertTrue(is_contiguous( () )) + self.assertTrue(is_contiguous( ((0, 0, '?'), ) )) + self.assertTrue(is_contiguous( ((0, 0, 2), (1, 1, '?')) )) + self.assertTrue(is_contiguous( ((0, 1, 2), (2, 3, '?')) )) + self.assertFalse(is_contiguous( ((0, 0, 2), (2, 2, '?')) )) + self.assertFalse(is_contiguous( ((0, 1, 2), (3, 3, '?')) )) + + # Trailing Zeros are ignored. 
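+ # i.e. is_ordered() drops zero-length partitions from the tail before
+ # checking that lengths are non-decreasing: (('?', '?', 2), ('?', '?', 0))
+ # passes, but (('?', '?', 2), ('?', '?', 1), ('?', '?', 0)) does not.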
+ def test_is_ordered(self): + self.assertTrue(is_ordered( () )) + self.assertTrue(is_ordered( (('?', '?', 2),) )) + self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2)) )) + self.assertFalse(is_ordered( (('?', '?', 2), ('?', '?', 1)) )) + self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2)) )) + self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2), + ('?', '?', 2)) )) + self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2), + ('?', '?', 2)) )) + self.assertFalse(is_ordered( (('?', '?', 1), ('?', '?', 0), + ('?', '?', 2)) )) + self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2), + ('?', '?', 3)) )) + + self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 0)) )) + self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2), + ('?', '?', 0)) )) + self.assertFalse(is_ordered( (('?', '?', 2), ('?', '?', 1), + ('?', '?', 0)) )) + self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2), + ('?', '?', 0), ('?', '?', 0)) )) + + + self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2), + ('?', '?', 2), + ('?', '?', 0)) )) + + + self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2), + ('?', '?', 2), + ('?', '?', 0)) )) + self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2), + ('?', '?', 2), + ('?', '?', 0)) )) + self.assertFalse(is_ordered( (('?', '?', 1), ('?', '?', 0), + ('?', '?', 2), + ('?', '?', 0)) )) + self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2), + ('?', '?', 3), + ('?', '?', 0)) )) + + self.assertFalse(is_ordered( (('?', '?', 3), ('?', '?', 2), + ('?', '?', 1), + ('?', '?', 0)) )) + + self.assertFalse(is_ordered( (('?', '?', 3), ('?', '?', 2), + ('?', '?', 1) )) ) + + + def test_repartition(self): + for dummy in range(0, 1000): + length = random.randrange(1, 8) + blocks = [(index, index, random.randrange(1, 10)) + for index in range(0, length)] + self.assertTrue(is_contiguous(blocks)) + original_blocks = blocks[:] + #were_ordered = is_ordered(blocks) + #print blocks + repartioned = repartition(blocks) + #print repartioned + self.assertTrue(is_ordered(repartioned)) + self.assertTrue(blocks == original_blocks) + + # Can't assert this anymore. + # Trips when in order partitions get merged because they + # don't meet the multiple constraint. + # #self.assertTrue((were_ordered and blocks == repartioned) or + # ((not were_ordered) and blocks != repartioned)) + + self.assertTrue(is_contiguous(repartioned)) + + + def updateFunc(self, blocks, change_len, max_len): + assert len(blocks) > 0 + blocks = blocks[:] + if blocks[0][2] + change_len < 32 * 1024: + blocks[0] = (blocks[0][0], blocks[0][1], blocks[0][2] + change_len) + return blocks + # Add and compress + blocks.insert(0, (-1, -1, change_len)) + return compress(blocks, max_len) + + def histogram(self, values, bin_width): + table = {} + for value in values: + index = int(value/bin_width) + table[index] = table.get(index, 0) + 1 + + max_bin = max(table.keys()) + return tuple([(index, table.get(index, 0)) + for index in range(0, max_bin + 1)]) + + + # Naive + # DOESN'T SIMULATE: + # o Dropping unreferenced chains. + # o GOOD: reduces total archive size + # o BAD: effective length of older blocks declines with time + # as unreferenced chains drop out. -> churn ??? + # o variance in commit sizes + + # HACKed this together fast, not sure it is correct. + # Looks like I'm getting a power law dist. 
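+ # The model: each iteration adds change_size bytes. updateFunc() grows
+ # block 0 in place until it would reach 32K, then prepends a fresh
+ # partition and lets compress() fold the tail back down to max_blocks;
+ # each partition that got replaced counts its full length as rewritten.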
+ def test_simulate_updates(self): + max_blocks = 4 + iterations = 10000 + change_size = 2*1024 + blocks = [(index, index, 0) for index in range(0, max_blocks)] + changes = [] + for dummy in range(0, iterations): + old_blocks = blocks[:] + blocks = self.updateFunc(blocks, change_size, max_blocks) + + if not ((is_ordered(blocks) or + (is_ordered(blocks[1:]) and blocks[0][2] < 32 * 1024))): + print blocks + + self.assertTrue(is_ordered(blocks) or + (is_ordered(blocks[1:]) and + blocks[0][2] < 32 * 1024)) + + changed = set(old_blocks) - set(blocks) + for value in changed: + # i.e. the number of bytes we had to write + changes.append(value[2]) + + # Fix ordinals. Shouldn't matter. + blocks = [(index, index, blocks[index][2]) for index + in range(0, len(blocks))] + + #hist = self.histogram(changes, 32 * 1024) + #for value in hist: + # print value[0], value[1] + + changes.sort() + #max_insert = max(changes) + for percent in (50, 75, 80, 85, 90, 95, 99, 100): + point = changes[min(int((percent/100.0) * len(changes)), + len(changes) - 1)] + print "%i %i %i" % (percent, point, point/(32*1024 + 1)) + + + def test_hg_repo_torture_test(self): + if HG_REPO_DIR == '': + print "Set HG_REPO_DIR!" + self.assertTrue(False) + + writer = self.make_empty_archive('hgtst') + manifest = FileManifest() + + rev = 0 + max_rev = 1 # Set below + while rev < max_rev: + target_dir = os.path.join(self.tmp_dir, '__hg_repo__') + if os.path.exists(target_dir): + shutil.rmtree(target_dir) # DANGEROUS + + # export the repo + # FIX: Wacky way to set max_rev. + print "Exporting rev: ", rev + max_rev = export_hg_repo(HG_REPO_DIR, target_dir, rev) + if rev >= max_rev: + break + + # put the export dir into the archive + # print "Inserting into the archive..." + + entries = entries_from_dir(target_dir, True) + manifest.update(writer, entries) + + # Will be written into Freenet top key + # along with rest of archive info. + s3kr1t = manifest.stored_sha + + dump_blocks(writer.blocks, None, True) + # create a second archive instance from the same block files. + # REDFLAG: Would this work on windoze? + # writer still has files open for reading. + reader = self.load_archive('hgtst') + read_manifest = FileManifest.from_archive(reader, s3kr1t) + # REDFLAG: audit for other places where I could do + # direct dict compares? + assert (read_manifest.name_map == manifest.name_map) + + # clean the archive output dir + unarchived_dir = os.path.join(self.tmp_dir, '__unarchived__') + if os.path.exists(unarchived_dir): + shutil.rmtree(unarchived_dir) # DANGEROUS + + os.makedirs(unarchived_dir) + + # extract the archive to the cleaned files + manifest_to_dir(reader, read_manifest, unarchived_dir) + reader.close() + + # diff the directories + + # A poor man's diff. + insert_map = {} + for entry in entries_from_dir(target_dir, True): + insert_map[entry.get_name()] = get_file_sha(entry.make_file()) + entry.release() # NOP + + unarchived_map = {} + for entry in entries_from_dir(unarchived_dir, True): + unarchived_map[entry.get_name()] = ( + get_file_sha(entry.make_file())) + entry.release() # NOP + + + assert len(insert_map) > 0 + assert insert_map == unarchived_map + print "%i files compared equal." % len(insert_map) + + rev += 1 + + +if __name__ == '__main__': + # use -v on command line to get verbose output. + # verbosity keyword arg not supported in 2.6? 
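+ # Usage:
+ #   python test_archive.py                       run every test
+ #   python test_archive.py test_single_update    run one SmokeTests case
+ # Set HG_REPO_DIR above before running test_hg_repo_torture_test.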
+ if len(sys.argv) >= 2 and sys.argv[1] != '-v': + # Run a single test case + suite = unittest.TestSuite() + suite.addTest(SmokeTests(sys.argv[1])) + unittest.TextTestRunner().run(suite) + else: + # Run everything. + unittest.main()
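To make the pieces above concrete, here is a minimal end-to-end sketch: create an archive, commit a FileManifest built from in-memory data with entries_from_seq(), then reload the block files and read one file back out. It is an untested illustration, not part of the patch: SimpleTemps is a hypothetical stand-in for the richer HandleTemps helper above, and the /tmp/wormarc_demo paths are placeholders for a fresh, empty directory.

import os

from archive import WORMBlockArchive
from blocks import BlockStorage, ITempFileManager
from deltacoder import DeltaCoder
from filemanifest import FileManifest, entries_from_seq

class SimpleTemps(ITempFileManager):
    """ Bare-bones temp file manager for this sketch only. """
    def __init__(self, base_dir):
        ITempFileManager.__init__(self)
        self.base_dir = base_dir
        self.counter = 0
    def make_temp_file(self):
        self.counter += 1
        return os.path.join(self.base_dir, '__TMP__%i' % self.counter)
    def remove_temp_file(self, full_path):
        if os.path.exists(full_path):
            os.remove(full_path)

BASE = '/tmp/wormarc_demo'   # illustrative; assumed fresh
for sub in ('tmp', 'blocks'):
    if not os.path.exists(os.path.join(BASE, sub)):
        os.makedirs(os.path.join(BASE, sub))
tmps = SimpleTemps(os.path.join(BASE, 'tmp'))

# Write: one update containing two small files.
archive = WORMBlockArchive(DeltaCoder(), BlockStorage(tmps))
archive.create(os.path.join(BASE, 'blocks'), 'demo')
manifest = FileManifest()
data = (('hello.txt', 'first version\n'),
        ('readme.txt', 'more text\n'))
manifest.update(archive, entries_from_seq(tmps, data))
root_sha = manifest.stored_sha   # persist this alongside the block files
archive.close()

# Read: reload the same block files and extract one file.
reader = WORMBlockArchive(DeltaCoder(), BlockStorage(tmps))
reader.load(os.path.join(BASE, 'blocks'), 'demo')
reloaded = FileManifest.from_archive(reader, root_sha)
out_path = tmps.make_temp_file()
reader.get_file(reloaded.name_map['hello.txt'][1], out_path)
print open(out_path, 'rb').read()
tmps.remove_temp_file(out_path)
reader.close()

The same write/reload/extract pattern is what test_hg_repo_torture_test() exercises at scale, with entries_from_dir() feeding real files into the manifest and manifest_to_dir() doing the extraction.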