""" Classes to support overlayed file writing so that that piki can edit
without modifying the original copy.
Copyright (C) 2009 Darrell Karbott
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public
License as published by the Free Software Foundation; either
version 2.0 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks
"""
import codecs
import os
import stat
# NOTE: There are hard coded references to utf8 in piki.py, submission.py
# and hgoverlay.py. Look there before changing this value.
WIKITEXT_ENCODING = "utf8"
class IFileFunctions:
""" An ABC for file system operations. """
def __init__(self, base_path):
self.base_path = base_path
def overlay_path(self, path):
""" Return the path that writes should be written to. """
def write(self, path, bytes, mode='wb'):
""" Write a file. """
raise NotImplementedError()
def read(self, path, mode='rb', dummy_non_overlayed=False):
""" Read a file. """
raise NotImplementedError()
def exists(self, path, dummy_non_overlayed=False):
""" Return True if the file exists, False otherwise. """
raise NotImplementedError()
def modtime(self, path, dummy_non_overlayed=False):
""" Return the modtime for the file."""
raise NotImplementedError()
def list_pages(self, path, dummy_non_overlayed=False):
""" Return a list of all pages. """
raise NotImplementedError()
def has_overlay(self, path):
""" Return True if there's an overlay for the file, False otherwise. """
raise NotImplementedError()
def remove_overlay(self, path):
""" Remove the overlayed version of the file. """
raise NotImplementedError
def is_overlayed(self):
""" Return True if the instance supports overlaying, False
otherwise. """
raise NotImplementedError
class DirectFiles(IFileFunctions):
""" An IFileFunctions implementation which writes directly to
the file system. """
def __init__(self, base_path):
IFileFunctions.__init__(self, base_path)
def overlay_path(self, path):
""" IFileFunctions implementation. """
return path
def write(self, path, bytes, mode='wb'):
""" IFileFunctions implementation. """
# There were hacks in the original piki code
# to handle nt refusing to rename to an existing
# file name. Not sure if it is a problem on
# modern windows.
tmp_name = path + '.__%s__' % str(os.getpid())
try:
out_file = codecs.open(tmp_name, mode, WIKITEXT_ENCODING)
try:
if len(bytes) > 0: # Truncating is allowed.
out_file.write(bytes)
finally:
out_file.close()
if os.path.exists(path):
os.remove(path)
os.rename(tmp_name, path)
finally:
if os.path.exists(tmp_name):
os.remove(tmp_name)
def read(self, path, mode='rb', dummy_non_overlayed=False):
""" IFileFunctions implementation. """
in_file = codecs.open(path, mode, WIKITEXT_ENCODING)
try:
return in_file.read()
finally:
in_file.close()
def exists(self, path, dummy_non_overlayed=False):
""" IFileFunctions implementation. """
return os.path.exists(path)
def modtime(self, path, dummy_non_overlayed=False):
""" IFileFunctions implementation. """
return os.stat(path)[stat.ST_MTIME]
def list_pages(self, path, dummy_non_overlayed=False):
""" IFileFunctions implementation. """
return [name for name in os.listdir(path)
if (os.path.isfile(os.path.join(path, name)) and
not os.path.islink(os.path.join(path, name)))]
def has_overlay(self, dummy_path):
""" IFileFunctions implementation. """
return False
def remove_overlay(self, dummy_path):
""" IFileFunctions implementation. """
assert False
def is_overlayed(self):
""" IFileFunctions implementation. """
return False
OVERLAY_DIR = 'OVERLAY'
class OverlayedFiles(DirectFiles):
""" An IFileFunctions implementation which overlays writes into a separate
parallel OVERLAY directory.
e.g. if:
base_dir == /foo/bar/baz
then,
path == /foo/bar/baz/snafu.txt
maps to,
overlay == /foo/bar/OVERLAY/snafu.txt
"""
def __init__(self, base_path):
DirectFiles.__init__(self, base_path)
def overlay_path(self, path):
""" Return the path that overlayed writes should be written to. """
path = os.path.abspath(path)
assert path.startswith(self.base_path)
rest = path[len(self.base_path):]
if rest.startswith(os.sep):
rest = rest[len(os.sep):]
overlay_base = os.path.split(self.base_path)[0] # Hmmm... errors?
overlayed = os.path.join(os.path.join(overlay_base, OVERLAY_DIR),
rest)
return overlayed
# You cannot write to the non-overlayed files.
def write(self, path, bytes, mode='wb'):
""" IFileFunctions implementation. """
DirectFiles.write(self, self.overlay_path(path), bytes, mode)
def read(self, path, mode='rb', non_overlayed=False):
""" IFileFunctions implementation. """
if non_overlayed:
return DirectFiles.read(self, path, mode)
overlayed = self.overlay_path(path)
if os.path.exists(overlayed):
return DirectFiles.read(self, overlayed, mode)
return DirectFiles.read(self, path, mode)
# Zero length file means delete.
def exists(self, path, non_overlayed=False):
""" IFileFunctions implementation. """
if non_overlayed:
return DirectFiles.exists(self, path)
overlay = self.overlay_path(path)
if os.path.exists(overlay):
if os.path.getsize(overlay) == 0:
return False
else:
return True
return os.path.exists(path)
def modtime(self, path, non_overlayed=False):
""" IFileFunctions implementation. """
if non_overlayed:
return DirectFiles.modtime(self, path)
overlay = self.overlay_path(path)
if os.path.exists(overlay) and os.path.getsize(overlay) > 0:
return DirectFiles.modtime(self, overlay)
return DirectFiles.modtime(self, path)
def list_pages(self, path, non_overlayed=False):
""" IFileFunctions implementation. """
if non_overlayed:
return DirectFiles.list_pages(self, path)
overlay = self.overlay_path(path)
overlay_pages = set([])
if os.path.exists(overlay):
overlay_pages = set(DirectFiles.list_pages(self, overlay))
deleted = set([])
for name in overlay_pages:
if os.path.getsize(os.path.join(overlay, name)) == 0:
deleted.add(name)
return list(overlay_pages.union(
set(DirectFiles.list_pages(self, path)) - deleted))
# Hmmmm... Returns True for zero length file. i.e. "mark to delete"
def has_overlay(self, path):
""" IFileFunctions implementation. """
return os.path.exists(self.overlay_path(path))
def remove_overlay(self, path):
""" IFileFunctions implementation. """
overlay = self.overlay_path(path)
if os.path.exists(overlay):
os.remove(overlay)
def is_overlayed(self):
""" IFileFunctions implementation. """
return True
def get_file_funcs(base_path, is_overlayed=False):
""" Returns an overlayed IFileFunctions implementation if
is_overlayed is True, and a direct implementation otherwise. """
if not is_overlayed:
return DirectFiles(base_path)
return OverlayedFiles(base_path)
def remove_redundant_files(overlay, wikitext_dir, out_func=lambda msg:None):
""" Removes files which are identical in the overlayed and non-overlayed
directories.
Also removes empty deletion marker files for files which have
been deleted from the non-overlayed directory. """
assert overlay.is_overlayed()
for name in overlay.list_pages(wikitext_dir):
full_path = os.path.join(wikitext_dir, name)
if overlay.has_overlay(full_path):
if not overlay.exists(full_path, True):
if len(overlay.read(full_path, 'rb', False)) == 0:
overlay.remove_overlay(full_path)
out_func("Removed redundant overlayed file: %s" % name)
continue
if (overlay.read(full_path, 'rb', False) ==
overlay.read(full_path, 'rb', True)):
overlay.remove_overlay(full_path)
out_func("Removed redundant overlayed file: %s" % name)
continue