""" Unit tests. Copyright (C) 2009 Darrell Karbott This library is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks """ # OK to be a little sloppy for test code. # pylint: disable-msg=C0111 # For setUp() and tearDown() # pylint: disable-msg=C0103 # Allow attribute creation in setUp() # pylint: disable-msg=W0201 # Allow test methods that don't reference self. # pylint: disable-msg=R0201 # Allow many test methods. # pylint: disable-msg=R0904 import os import shutil import traceback import random import time import sys import unittest from shafunc import new_sha as sha1 from binaryrep import NULL_SHA, get_file_sha, str_sha from blocks import BlockStorage, ITempFileManager from linkmap import verify_link_map from filemanifest import FileManifest, entries_from_dir, entries_from_seq, \ manifest_to_dir, verify_manifest, validate_path from archive import WORMBlockArchive, is_ordered, is_contiguous, \ repartition, compress from deltacoder import DeltaCoder from hghelper import export_hg_repo # False causes test dir to be cleaned up automatically # after every run. LEAVE_TEST_DIR = False # Absolute path to some hg repository to use for # testing. # You MUST MODIFY this for test_hg_repo_torture_test() to work HG_REPO_DIR = "" # e.g.: #HG_REPO_DIR = os.path.expanduser("~/mess/hg_zoo/somedude") #----------------------------------------------------------# TEST_BASE = '/tmp/' TEST_ROOT = '__latest_test_run__' TMP_DIR = '__TMP__' TEST_DIR = 'test' class HandleTemps(ITempFileManager): """ Delegate to handle temp file creation and deletion. """ def __init__(self, base_dir): ITempFileManager.__init__(self) self.base_dir = base_dir self.callers = {} def make_temp_file(self): """ Return a new unique temp file name including full path. """ name = os.path.join(self.base_dir, "__TMP__%s" % str(random.random())[2:]) self.callers[name] = traceback.extract_stack() return name def remove_temp_file(self, full_path): """ Remove and existing temp file. """ if not os.path.split(full_path)[-1].startswith("__TMP__"): raise IOError("Didn't create: %s" % full_path) if not os.path.exists(full_path): return if full_path in self.callers.keys(): del self.callers[full_path] else: print "HandleTemps.remove_file() -- removing non-managed file???" print full_path os.remove(full_path) def check_for_leaks(self): for name in self.callers: if not os.path.exists(name): continue print "LEAKED: ", name print "FROM:" print self.callers[name] if len(os.listdir(self.base_dir)) > 0: file_count = 0 for name in os.listdir(self.base_dir): if os.path.isdir(os.path.join(self.base_dir, name)): # Allow directories. e.g. __hg_repo__, __unarchived__. 
print "HandleTemps.check_for_leaks -- ignored dir: ", name continue print name file_count += 1 if file_count > 0: raise IOError("Undeleted temp files!") def dump_blocks(blocks, msg=None, brief=False): if not msg is None: print msg values = [] for index in range(0, len(blocks.tags)): path = blocks.full_path(index) if os.path.exists(path): length = str(os.path.getsize(path)) else: length = "no_file" if brief: values.append(length) else: values.append("%s:[%s]" % (path, length)) if brief: print "blocks: " + " ".join(values) else: print "blocks\n" + "\n".join(values) def link_str(link): return "(%s, %i, %s, data: %s, %i, %s)" % (str_sha(link[0]), link[1], str_sha(link[2]), bool(link[3]), link[4], link[5]) def dump_links(links, msg=None): if not msg is None: print msg for link in links: print link_str(link) def dump_link_map(link_map, msg=None, brief=False): if not msg is None: print msg print "keys: ", len(link_map) if brief: return keys = link_map.keys() keys.sort() for key in keys: print str_sha(key) dump_links(link_map[key]) def dump_names_map(names_map, msg=None): if not msg is None: print msg keys = names_map.keys() keys.sort() for key in keys: hashes = names_map[key] print "%s->(%s, %s)" % (key, str_sha(hashes[0]), str_sha(hashes[1])) def dump_archive(archive, msg=None, brief=False): print "--- start archive dump ---" if not msg is None: print msg print "age: %i max_blocks: %i" % (archive.age, archive.max_blocks) dump_blocks(archive.blocks, "blocks:") dump_link_map(archive.blocks.link_map, "link_map:", brief) print "--- end ---" def words(): while True: yield sha1(str(random.random())).hexdigest()[:random.randrange(1, 9)] WORD_ITR = words() def lines(count): line = "" while count > 0: line += WORD_ITR.next() line += " " if len(line) > 60: ret = line line = "" count -= 1 yield ret.strip() return class ArchiveTestCase(unittest.TestCase): def setup_test_dirs(self, base_dir, dir_name): if not os.path.exists(base_dir): raise IOError("Base test directory doesn't exist: %s" % base_dir) full_path = os.path.join(base_dir, dir_name) if os.path.exists(full_path): raise IOError("Test directory exists: %s" % full_path) os.makedirs(full_path) self.test_root = full_path self.test_dir = os.path.join(self.test_root, TEST_DIR) self.tmp_dir = os.path.join(self.test_root, TMP_DIR) os.makedirs(self.test_dir) os.makedirs(self.tmp_dir) def remove_test_dirs(self): assert self.test_root.endswith(TEST_ROOT) try: self.tmps.check_for_leaks() finally: if not LEAVE_TEST_DIR: shutil.rmtree(self.test_root) # Caller must release temp file. def write_file(self, raw): file_name = self.tmps.make_temp_file() out_file = open(file_name, 'wb') raised = True try: out_file.write(raw) out_file.close() raised = False finally: out_file.close() if raised: self.tmps.remove_temp_file(file_name) return file_name def read_file(self, file_name, remove_tmp=True): in_file = open(file_name, 'rb') try: ret = in_file.read() finally: in_file.close() if remove_tmp: self.tmps.remove_temp_file(file_name) return ret def setUp(self): self.setup_test_dirs(TEST_BASE, TEST_ROOT) self.tmps = HandleTemps(self.tmp_dir) def tearDown(self): self.remove_test_dirs() class SmokeTests(ArchiveTestCase): def _testLeakATempFile(self): out_file = open(self.tmps.make_temp_file(), 'wb') out_file.write("OH NOES! 
        out_file.close()

    def make_empty_archive(self, block_name):
        archive = WORMBlockArchive(DeltaCoder(), BlockStorage(self.tmps))
        archive.create(self.test_dir, block_name)
        return archive

    def load_archive(self, block_name):
        archive = WORMBlockArchive(DeltaCoder(), BlockStorage(self.tmps))
        archive.load(self.test_dir, block_name)
        return archive

    def test_create_archive(self):
        print
        archive = self.make_empty_archive('A')
        dump_archive(archive)

    def test_load_archive(self):
        print
        self.make_empty_archive('A')
        b = self.load_archive('A')
        dump_archive(b)

    def test_archive_write_read(self):
        a = self.make_empty_archive('A')
        dump_archive(a, "empty")

        r0 = self.write_file("OH HAI!")
        r1 = self.write_file("OH HAI! AGAIN")
        r2 = self.write_file("STILL ME")
        t1 = self.tmps.make_temp_file()
        try:
            a.start_update()
            link0 = a.write_new_delta(NULL_SHA, r0)
            link1 = a.write_new_delta(NULL_SHA, r1)
            link2 = a.write_new_delta(NULL_SHA, r2)

            # Write
            a.commit_update()
            dump_archive(a, "updated")

            # Read
            print
            print str_sha(link0[0]), a.get_data(link0[0])
            print str_sha(link1[0]), a.get_data(link1[0])
            print str_sha(link2[0]), a.get_data(link2[0])

            a.close()

            b = self.load_archive('A')
            dump_archive(b, "[Reloaded from disk]")
            print
            # Mix up order.
            print str_sha(link1[0]), b.get_data(link1[0])
            print str_sha(link0[0]), b.get_data(link0[0])
            print str_sha(link2[0]), b.get_data(link2[0])
        finally:
            self.tmps.remove_temp_file(t1)
            self.tmps.remove_temp_file(r0)
            self.tmps.remove_temp_file(r1)
            self.tmps.remove_temp_file(r2)
            #a.abandon_update()

    def test_torture_a_single_chain(self):
        a = self.make_empty_archive('A')
        dump_archive(a, "empty")

        text = ""
        prev = NULL_SHA
        for iteration in range(0, 5000):
            # Write
            a.start_update()
            text += str(time.time()) + '\n'
            t2 = self.write_file(text)
            #print "Adding to: ", str_sha(prev)
            link = a.write_new_delta(prev, t2)
            new_sha = link[0]
            link = None
            #print "Added: ", str_sha(new_sha), str_sha(new_parent)
            a.commit_update()
            self.tmps.remove_temp_file(t2)

            #history = a.blocks.get_history(new_sha)
            #history_size = sum([value[6] for value in history])
            #print "History: ", len(history), history_size, len(text)
            #print
            #dump_archive(a, "updated", True)

            t3 = self.tmps.make_temp_file()
            a.get_file(new_sha, t3)
            self.assertTrue(text == self.read_file(t3))
            prev = new_sha
            if iteration > 0 and iteration % 100 == 0:
                print "iteration: ", iteration

    # grrr... giving up on temp files
    def test_single_update(self):
        a = self.make_empty_archive('A')
        m = FileManifest()

        data = ( \
            ('foo.txt', 'This is the foo file.\n'),
            ('empty.txt', ''),
            ('big.txt', '*' * (1024 * 128)),
            )

        entries = entries_from_seq(self.tmps, data)
        m.update(a, entries)
        dump_archive(a)

    def test_multiple_updates(self):
        a = self.make_empty_archive('A')
        m = FileManifest()

        data0 = ( \
            ('foo.txt', 'This is the foo file.\n'),
            ('empty.txt', ''),
            ('big.txt', '*' * (1 * 128)),
            )

        print "manifest sha: ", str_sha(m.stored_sha)
        m.update(a, entries_from_seq(self.tmps, data0))
        print "manifest sha: ", str_sha(m.stored_sha)
        dump_archive(a, "AFTER FIRST WRITE:")

        verify_manifest(a, m)

        data1 = ( \
            ('foo.txt', 'This is the foo file.\n'),
            ('empty.txt', ''),
            ('big.txt', 'hello' + ('*' * (1 * 128))),
            )

        m.update(a, entries_from_seq(self.tmps, data1))
        print "manifest sha: ", str_sha(m.stored_sha)
        dump_archive(a)
        verify_link_map(a.blocks.link_map)
        verify_manifest(a, m)

    def test_words(self):
        print WORD_ITR.next()

    def test_lines(self):
        for line in lines(10):
            print line

    def test_many_updates(self):
        a = self.make_empty_archive('A')
        m = FileManifest()
        files = ("A.txt", "B.txt", "C.txt")

        updates = 100
        for dummy in range(0, updates):
            names = list(files)
            random.shuffle(names)
            #names = names[:random.randrange(1, len(files))]
            data = []
            for name in names:
                text = ''
                if name in m.name_map:
                    tmp = self.tmps.make_temp_file()
                    a.get_file(m.name_map[name][1], tmp)
                    text = self.read_file(tmp)
                text += "\n".join([line for line in lines(20)])
                data.append((name, text))

            #print "updating:"
            #for value in data:
            #    print value[0], len(value[1])
            #print "manifest sha: ", str_sha(m.stored_sha)
            #dump_archive(a, "BEFORE UPDATE: %i" % count, True)
            m.update(a, entries_from_seq(self.tmps, data))
            #print "manifest sha: ", str_sha(m.stored_sha)
            #dump_archive(a, "AFTER UPDATE: %i" % count, True)
            verify_manifest(a, m, True)
            verify_link_map(a.blocks.link_map)
            dump_blocks(a.blocks, None, True)
        a.close()

    def test_validate_path(self):
        base_dir = "/tmp/test/foo"
        validate_path(base_dir, "/tmp/test/foo/bar")
        validate_path(base_dir, "/tmp/test/foo/baz")
        validate_path(base_dir, "/tmp/test/foo/barf/text.dat")
        try:
            validate_path(base_dir, "/tmp/test/foo/../../../etc/passwd")
            self.assertTrue(False)
        except IOError, e:
            print "Got expected exception: ", e

        try:
            validate_path(base_dir, "/tmp/test/foo/../forbidden")
            self.assertTrue(False)
        except IOError, e:
            print "Got expected exception: ", e

        try:
            validate_path(base_dir, u"/tmp/test/foo/f\xc3\xb6rbjuden.txt")
            self.assertTrue(False)
        except IOError, e:
            print "Got expected exception: ", e

        try:
            validate_path(base_dir, "/tmp/test/foo/f\xc3\xb6rbjuden.txt")
            self.assertTrue(False)
        except IOError, e:
            print "Got expected exception: ", e

    def test_is_contiguous(self):
        self.assertTrue(is_contiguous( () ))
        self.assertTrue(is_contiguous( ((0, 0, '?'), ) ))
        self.assertTrue(is_contiguous( ((0, 0, 2), (1, 1, '?')) ))
        self.assertTrue(is_contiguous( ((0, 1, 2), (2, 3, '?')) ))
        self.assertFalse(is_contiguous( ((0, 0, 2), (2, 2, '?')) ))
        self.assertFalse(is_contiguous( ((0, 1, 2), (3, 3, '?')) ))
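    # A note on the block fixtures above and below, inferred from the test
    # data only (not from archive.py, so treat it as an assumption): each
    # entry looks like a (start ordinal, end ordinal, length) triple.
    # is_contiguous() appears to read only the first two fields -- each
    # range must start where the previous one ended, plus one -- while
    # is_ordered() appears to read only the third. '?' marks fields the
    # predicate under test never looks at.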
    # Trailing Zeros are ignored.
    def test_is_ordered(self):
        self.assertTrue(is_ordered( () ))
        self.assertTrue(is_ordered( (('?', '?', 2), ) ))
        self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2)) ))
        self.assertFalse(is_ordered( (('?', '?', 2), ('?', '?', 1)) ))
        self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2)) ))
        self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2),
                                     ('?', '?', 2)) ))
        self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2),
                                     ('?', '?', 2)) ))
        self.assertFalse(is_ordered( (('?', '?', 1), ('?', '?', 0),
                                      ('?', '?', 2)) ))
        self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2),
                                     ('?', '?', 3)) ))
        self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 0)) ))
        self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2),
                                     ('?', '?', 0)) ))
        self.assertFalse(is_ordered( (('?', '?', 2), ('?', '?', 1),
                                      ('?', '?', 0)) ))
        self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2),
                                     ('?', '?', 0), ('?', '?', 0)) ))
        self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2),
                                     ('?', '?', 2), ('?', '?', 0)) ))
        self.assertTrue(is_ordered( (('?', '?', 2), ('?', '?', 2),
                                     ('?', '?', 2), ('?', '?', 0)) ))
        self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2),
                                     ('?', '?', 2), ('?', '?', 0)) ))
        self.assertFalse(is_ordered( (('?', '?', 1), ('?', '?', 0),
                                      ('?', '?', 2), ('?', '?', 0)) ))
        self.assertTrue(is_ordered( (('?', '?', 1), ('?', '?', 2),
                                     ('?', '?', 3), ('?', '?', 0)) ))
        self.assertFalse(is_ordered( (('?', '?', 3), ('?', '?', 2),
                                      ('?', '?', 1), ('?', '?', 0)) ))
        self.assertFalse(is_ordered( (('?', '?', 3), ('?', '?', 2),
                                      ('?', '?', 1)) ))

    def test_repartition(self):
        for dummy in range(0, 1000):
            length = random.randrange(1, 8)
            blocks = [(index, index, random.randrange(1, 10))
                      for index in range(0, length)]
            self.assertTrue(is_contiguous(blocks))
            original_blocks = blocks[:]
            #were_ordered = is_ordered(blocks)
            #print blocks
            repartioned = repartition(blocks)
            #print repartioned
            self.assertTrue(is_ordered(repartioned))
            self.assertTrue(blocks == original_blocks)
            # Can't assert this anymore.
            # Trips when in order partitions get merged because they
            # don't meet the multiple constraint.
            #
            #self.assertTrue((were_ordered and blocks == repartioned) or
            #                ((not were_ordered) and blocks != repartioned))
            self.assertTrue(is_contiguous(repartioned))

    def updateFunc(self, blocks, change_len, max_len):
        assert len(blocks) > 0
        blocks = blocks[:]
        if blocks[0][2] + change_len < 32 * 1024:
            blocks[0] = (blocks[0][0], blocks[0][1],
                         blocks[0][2] + change_len)
            return blocks

        # Add and compress
        blocks.insert(0, (-1, -1, change_len))
        return compress(blocks, max_len)

    def histogram(self, values, bin_width):
        table = {}
        for value in values:
            index = int(value/bin_width)
            table[index] = table.get(index, 0) + 1
        max_bin = max(table.keys())
        return tuple([(index, table.get(index, 0))
                      for index in range(0, max_bin + 1)])

    # Naive.
    # DOESN'T SIMULATE:
    # o Dropping unreferenced chains.
    #   o GOOD: reduces total archive size
    #   o BAD: effective length of older blocks declines with time
    #          as unreferenced chains drop out. -> churn ???
    # o variance in commit sizes
    #
    # HACKed this together fast, not sure it is correct.
    # Looks like I'm getting a power law dist.
    def test_simulate_updates(self):
        max_blocks = 4
        iterations = 10000
        change_size = 2*1024
        blocks = [(index, index, 0) for index in range(0, max_blocks)]
        changes = []
        for dummy in range(0, iterations):
            old_blocks = blocks[:]
            blocks = self.updateFunc(blocks, change_size, max_blocks)
            if not ((is_ordered(blocks) or
                     (is_ordered(blocks[1:]) and blocks[0][2] < 32 * 1024))):
                print blocks
            self.assertTrue(is_ordered(blocks) or
                            (is_ordered(blocks[1:]) and
                             blocks[0][2] < 32 * 1024))

            changed = set(old_blocks) - set(blocks)
            for value in changed:
                # i.e. the number of bytes we had to write
                changes.append(value[2])

            # Fix ordinals. Shouldn't matter.
            blocks = [(index, index, blocks[index][2])
                      for index in range(0, len(blocks))]

        #hist = self.histogram(changes, 32 * 1024)
        #for value in hist:
        #    print value[0], value[1]

        changes.sort()
        #max_insert = max(changes)
        for percent in (50, 75, 80, 85, 90, 95, 99, 100):
            point = changes[min(int((percent/100.0) * len(changes)),
                                len(changes) - 1)]
            print "%i %i %i" % (percent, point, point/(32*1024 + 1))

    def test_hg_repo_torture_test(self):
        if HG_REPO_DIR == '':
            print "Set HG_REPO_DIR!"
            self.assertTrue(False)

        writer = self.make_empty_archive('hgtst')
        manifest = FileManifest()

        rev = 0
        max_rev = 1 # Set below.
        while rev < max_rev:
            target_dir = os.path.join(self.tmp_dir, '__hg_repo__')
            if os.path.exists(target_dir):
                shutil.rmtree(target_dir) # DANGEROUS

            # export the repo
            # FIX: Wacky way to set max_rev.
            print "Exporting rev: ", rev
            max_rev = export_hg_repo(HG_REPO_DIR, target_dir, rev)
            if rev >= max_rev:
                break

            # put the export dir into the archive
            # print "Inserting into the archive..."
            entries = entries_from_dir(target_dir, True)
            manifest.update(writer, entries)

            # Will be written into Freenet top key
            # along with rest of archive info.
            s3kr1t = manifest.stored_sha

            dump_blocks(writer.blocks, None, True)

            # create a second archive instance from the same block files.
            # REDFLAG: Would this work on windoze?
            #          writer still has files open for reading.
            reader = self.load_archive('hgtst')
            read_manifest = FileManifest.from_archive(reader, s3kr1t)

            # REDFLAG: audit for other places where I could do
            #          direct dict compares?
            assert (read_manifest.name_map == manifest.name_map)

            # clean the archive output dir
            unarchived_dir = os.path.join(self.tmp_dir, '__unarchived__')
            if os.path.exists(unarchived_dir):
                shutil.rmtree(unarchived_dir) # DANGEROUS
            os.makedirs(unarchived_dir)

            # extract the archive to the cleaned files
            manifest_to_dir(reader, read_manifest, unarchived_dir)
            reader.close()

            # diff the directories
            # A poor man's diff.
            insert_map = {}
            for entry in entries_from_dir(target_dir, True):
                insert_map[entry.get_name()] = get_file_sha(entry.make_file())
                entry.release() # NOP

            unarchived_map = {}
            for entry in entries_from_dir(unarchived_dir, True):
                unarchived_map[entry.get_name()] = (
                    get_file_sha(entry.make_file()))
                entry.release() # NOP

            assert len(insert_map) > 0
            assert insert_map == unarchived_map
            print "%i files compared equal." % len(insert_map)

            rev += 1

if __name__ == '__main__':
    # use -v on command line to get verbose output.
    # verbosity keyword arg not supported in 2.6?
    if len(sys.argv) >= 2 and sys.argv[1] != '-v':
        # Run a single test case
        suite = unittest.TestSuite()
        suite.addTest(SmokeTests(sys.argv[1]))
        unittest.TextTestRunner().run(suite)
    else:
        # Run everything.
        unittest.main()
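
# Example invocations, based on the __main__ block above. (The file name
# "archive_test.py" is a stand-in for whatever this module is actually
# saved as -- the real name isn't given here.)
#
#   python archive_test.py                       # run the whole suite
#   python archive_test.py -v                    # run the whole suite, verbose
#   python archive_test.py test_create_archive   # run one SmokeTests method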