""" An IFileFunctions subclass which reads files from a particular version of an hg repo. Copyright (C) 2009 Darrell Karbott This library is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks """ import os from mercurial import cmdutil from pathhacks import add_parallel_sys_path add_parallel_sys_path('fniki') from fileoverlay import OverlayedFiles, DirectFiles, WIKITEXT_ENCODING # ATTRIBUTION: Pillaged from commands.cat() in the hg source. def get_hg_file(repo, file_name, rev, tmp_file_name, dump_to_file = False): """ INTERNAL: read a file from the hg repo. If dump_to_file, the data is written into tmp_file_name. Otherwise, the data is returned and tmp_file_name is deleted. """ #print "get_hg_file -- ", file_name, rev file_name = os.path.join(repo.root, file_name) ctx = repo[rev] bytes = None err = True matches = cmdutil.match(repo, (file_name,)) for abs_ in ctx.walk(matches): assert err # Wacky. Why are we looping again? # REDFLAG: ripped out decode code. Will I need that on windows? file_ptr = None # Hmmmm starting to look like crappy Java code :-( in_file = None try: file_ptr = cmdutil.make_file(repo, tmp_file_name, ctx.node(), pathname=abs_) file_ptr.write(ctx[abs_].data()) file_ptr.close() file_ptr = None if not dump_to_file: in_file = open(tmp_file_name) bytes = in_file.read() finally: if file_ptr: file_ptr.close() if in_file: in_file.close() if not dump_to_file and os.path.exists(tmp_file_name): os.remove(tmp_file_name) err = False if err: raise KeyError("File: %s doesn't exist in version: %s" \ % (file_name, rev)) if dump_to_file: return "The data was written into: %s" % tmp_file_name return bytes class HgFileOverlay(OverlayedFiles): """ An IFileOverlay that reads files from a mercurial revision.""" def __init__(self, ui_, repo, base_dir, tmp_file): OverlayedFiles.__init__(self, os.path.join(repo.root, base_dir)) self.base_dir = base_dir # i.e. root wrt repo self.ui_ = ui_ self.repo = repo self.version = 'tip' self.tmp_file = tmp_file def repo_path(self, path): """ Return path w.r.t. the repository root. """ path = os.path.abspath(path) assert path.startswith(self.base_path) assert path.startswith(self.repo.root) rest = path[len(self.repo.root):] if rest.startswith(os.sep): rest = rest[len(os.sep):] return rest def repo_pages(self, path): """ INTERNAL: Enumerate files in a repo subdirectory. """ if not path.endswith('wikitext'): raise ValueError("Dunno how to enumerate wikitext pages from: %s" % path) wikitext_dir = self.repo_path(path) # Hmmmm... won't work for files in root. use -1? return tuple([os.path.split(name)[1] for name in self.repo.changectx(self.version). manifest().keys() if name.startswith(wikitext_dir)]) def exists_in_repo(self, path): """ INTERNAL: Return True if the file exists in the repo, False otherwise. """ return (self.repo_path(path) in self.repo.changectx(self.version).manifest()) def read(self, path, mode='rb', non_overlayed=False): """ Read a file. """ if non_overlayed: return unicode( get_hg_file(self.repo, self.repo_path(path), self.version, self.tmp_file), WIKITEXT_ENCODING) overlayed = self.overlay_path(path) if os.path.exists(overlayed): return DirectFiles.read(self, overlayed, mode) return unicode(get_hg_file(self.repo, self.repo_path(path), self.version, self.tmp_file), WIKITEXT_ENCODING) def exists(self, path, non_overlayed=False): """ Return True if the file exists, False otherwise. """ if non_overlayed: return self.exists_in_repo(path) overlay = self.overlay_path(path) if os.path.exists(overlay): if os.path.getsize(overlay) == 0: return False else: return True return self.exists_in_repo(path) def modtime(self, path, non_overlayed=False): """ Return the modtime for the file.""" if non_overlayed: # Hmmm commit time for changeset, not file. Good enough. return int(self.repo.changectx(self.version).date()[0]) overlay = self.overlay_path(path) if os.path.exists(overlay) and os.path.getsize(overlay) > 0: return DirectFiles.modtime(self, overlay) return int(self.repo.changectx(self.version).date()[0]) def list_pages(self, path, non_overlayed=False): """ IFileFunctions implementation. """ if non_overlayed: return self.repo_pages() overlay_pages = set([]) overlay = self.overlay_path(path) if os.path.exists(overlay): overlay_pages = set(DirectFiles.list_pages(self, overlay)) deleted = set([]) for name in overlay_pages: if os.path.getsize(os.path.join(overlay, name)) == 0: deleted.add(name) return list(overlay_pages.union(set(self.repo_pages(path)) - deleted))