""" |
module with base functionality for std.path package |
|
""" |
from __future__ import generators |
import os, sys |
import py |
|
def checktype(pathinstance, kw): |
names = ('local', 'svnwc', 'svnurl', 'py', ) |
for name,value in kw.items(): |
if name in names: |
cls = getattr(py.path, name) |
if bool(isinstance(pathinstance, cls)) ^ bool(value): |
return False |
del kw[name] |
return True |
|
class checker: |
""" deprecated: return checker callable checking for the given |
kwargs-specified specification. |
""" |
def __init__(self, **kwargs): |
py.std.warnings.warn("py.path.checker is deprecated, construct " |
"calls to pathobj.check() instead", |
DeprecationWarning, stacklevel=2) |
self.kwargs = kwargs |
def __call__(self, p): |
return p.check(**self.kwargs) |
|
class Checkers: |
_depend_on_existence = 'exists', 'link' |
|
def __init__(self, path): |
self.path = path |
|
def exists(self): |
raise NotImplementedError |
|
def basename(self, arg): |
return self.path.basename == arg |
|
def basestarts(self, arg): |
return self.path.basename.startswith(arg) |
|
def relto(self, arg): |
return self.path.relto(arg) |
|
def fnmatch(self, arg): |
return fnmatch(arg)(self.path) |
|
def endswith(self, arg): |
return str(self.path).endswith(arg) |
|
def _evaluate(self, kw): |
for name, value in kw.items(): |
invert = False |
meth = None |
try: |
meth = getattr(self, name) |
except AttributeError: |
if name[:3] == 'not': |
invert = True |
try: |
meth = getattr(self, name[3:]) |
except AttributeError: |
pass |
if meth is None: |
raise TypeError, "no %r checker available for %r" % (name, self.path) |
try: |
if meth.im_func.func_code.co_argcount > 1: |
if (not meth(value)) ^ invert: |
return False |
else: |
if bool(value) ^ bool(meth()) ^ invert: |
return False |
except (py.error.ENOENT, py.error.ENOTDIR): |
for name in self._depend_on_existence: |
if name in kw: |
if kw.get(name): |
return False |
name = 'not' + name |
if name in kw: |
if not kw.get(name): |
return False |
return True |
|
class _dummyclass: |
pass |
|
class PathBase(object): |
""" shared implementation for filesystem path objects.""" |
Checkers = Checkers |
|
def check(self, **kw): |
""" check a path for existence, or query its properties |
|
without arguments, this returns True if the path exists (on the |
filesystem), False if not |
|
with (keyword only) arguments, the object compares the value |
of the argument with the value of a property with the same name |
(if it has one, else it raises a TypeError) |
|
when for example the keyword argument 'ext' is '.py', this will |
return True if self.ext == '.py', False otherwise |
""" |
if kw: |
kw = kw.copy() |
if not checktype(self, kw): |
return False |
else: |
kw = {'exists' : 1} |
return self.Checkers(self)._evaluate(kw) |
|
def __iter__(self): |
for i in self.listdir(): |
yield i |
|
def __contains__(self, other): |
if isinstance(other, str): |
return self.join(other).check() |
else: |
if other.dirpath() != self: |
return False |
p = self.join(other.basename) |
return p.check() |
|
def basename(self): |
return self._getbyspec('basename')[0] |
basename = property(basename, None, None, 'basename part of path') |
|
def relto(self, relpath): |
""" return a string which is the relative part of the path |
to the given 'relpath'. |
""" |
if not isinstance(relpath, (str, PathBase)): |
raise TypeError("%r: not a string or path object" %(relpath,)) |
strrelpath = str(relpath) |
if strrelpath and strrelpath[-1] != self.sep: |
strrelpath += self.sep |
|
|
strself = str(self) |
if strself.startswith(strrelpath): |
return strself[len(strrelpath):] |
return "" |
|
def parts(self, reverse=False): |
""" return a root-first list of all ancestor directories |
plus the path itself. |
""" |
current = self |
l = [self] |
while 1: |
last = current |
current = current.dirpath() |
if last == current: |
break |
l.insert(0, current) |
if reverse: |
l.reverse() |
return l |
|
def common(self, other): |
""" return the common part shared with the other path |
or None if there is no common part. |
""" |
last = None |
for x, y in zip(self.parts(), other.parts()): |
if x != y: |
return last |
last = x |
return last |
|
def __add__(self, other): |
""" return new path object with 'other' added to the basename""" |
return self.new(basename=self.basename+str(other)) |
|
def __cmp__(self, other): |
""" return sort value (-1, 0, +1). """ |
try: |
return cmp(self.strpath, other.strpath) |
except AttributeError: |
return cmp(str(self), str(other)) |
|
def __repr__(self): |
""" return a string representation of this path. """ |
return repr(str(self)) |
|
def visit(self, fil=None, rec=None, ignore=_dummyclass): |
""" yields all paths below the current one |
|
fil is a filter (glob pattern or callable), if not matching the |
path will not be yielded, defaulting to None (everything is |
returned) |
|
rec is a filter (glob pattern or callable) that controls whether |
a node is descended, defaulting to None |
|
ignore is an Exception class that is ignoredwhen calling dirlist() |
on any of the paths (by default, all exceptions are reported) |
""" |
if isinstance(fil, str): |
fil = fnmatch(fil) |
if rec: |
if isinstance(rec, str): |
rec = fnmatch(fil) |
elif not callable(rec): |
rec = lambda x: True |
reclist = [self] |
while reclist: |
current = reclist.pop(0) |
try: |
dirlist = current.listdir() |
except ignore: |
return |
for p in dirlist: |
if fil is None or fil(p): |
yield p |
if p.check(dir=1) and (rec is None or rec(p)): |
reclist.append(p) |
|
def _callex(self, func, *args): |
""" call a function and raise errno-exception if applicable. """ |
__tracebackhide__ = True |
try: |
return func(*args) |
except py.error.Error: |
raise |
except EnvironmentError, e: |
if not hasattr(e, 'errno'): |
raise |
__tracebackhide__ = False |
cls, value, tb = sys.exc_info() |
errno = e.errno |
try: |
if not isinstance(e, WindowsError): |
raise NameError |
except NameError: |
|
cls = py.error._geterrnoclass(errno) |
else: |
try: |
cls = py.error._getwinerrnoclass(errno) |
except KeyError: |
raise cls, value, tb |
value = cls("%s%r" % (func.__name__, args)) |
__tracebackhide__ = True |
raise cls, value |
|
def _gethashinstance(self, hashtype): |
if hashtype == "md5": |
return py.std.md5.md5() |
elif hashtype == "sha": |
return py.std.sha.sha() |
else: |
raise ValueError("unknown hash type: %r" %(hashtype,)) |
|
|
class fnmatch: |
def __init__(self, pattern): |
self.pattern = pattern |
def __call__(self, path): |
"""return true if the basename/fullname matches the glob-'pattern'. |
|
* matches everything |
? matches any single character |
[seq] matches any character in seq |
[!seq] matches any char not in seq |
|
if the pattern contains a path-separator then the full path |
is used for pattern matching and a '*' is prepended to the |
pattern. |
|
if the pattern doesn't contain a path-separator the pattern |
is only matched against the basename. |
""" |
pattern = self.pattern |
if pattern.find(path.sep) == -1: |
name = path.basename |
else: |
name = str(path) |
pattern = '*' + path.sep + pattern |
from fnmatch import fnmatch |
return fnmatch(name, pattern) |
|
|
class FSCheckers(Checkers): |
_depend_on_existence = Checkers._depend_on_existence+('dir', 'file') |
|
def dir(self): |
raise NotImplementedError |
|
def file(self): |
raise NotImplementedError |
|
def dotfile(self): |
return self.path.basename.startswith('.') |
|
def ext(self, arg): |
if not arg.startswith('.'): |
arg = '.' + arg |
return self.path.ext == arg |
|
class FSPathBase(PathBase): |
""" shared implementation for filesystem path objects.""" |
Checkers = FSCheckers |
|
def __div__(self, other): |
return self.join(str(other)) |
|
def dirpath(self, *args, **kwargs): |
""" return the directory Path of the current Path joined |
with any given path arguments. |
""" |
return self.new(basename='').join(*args, **kwargs) |
|
def ext(self): |
""" extension of the path (including the '.').""" |
return self._getbyspec('ext')[0] |
ext = property(ext, None, None, 'extension part of path') |
|
def purebasename(self): |
""" pure base name of the path.""" |
return self._getbyspec('purebasename')[0] |
purebasename = property(purebasename, None, None, 'basename without extension') |
|
def read(self, mode='rb'): |
""" read and return a bytestring from reading the path. """ |
if py.std.sys.version_info < (2,3): |
for x in 'u', 'U': |
if x in mode: |
mode = mode.replace(x, '') |
f = self.open(mode) |
try: |
return f.read() |
finally: |
f.close() |
|
def readlines(self, cr=1): |
""" read and return a list of lines from the path. if cr is False, the |
newline will be removed from the end of each line. """ |
if not cr: |
content = self.read('rU') |
return content.split('\n') |
else: |
f = self.open('rU') |
try: |
return f.readlines() |
finally: |
f.close() |
|
def load(self): |
""" return object unpickled from self.read() """ |
f = self.open('rb') |
try: |
from cPickle import load |
return self._callex(load, f) |
finally: |
f.close() |
|
def move(self, target): |
""" move this path to target. """ |
if target.relto(self): |
raise py.error.EINVAL(target, "cannot move path into a subdirectory of itself") |
try: |
self.rename(target) |
except py.error.EXDEV: |
self.copy(target) |
self.remove() |
|
def _getpymodule(self): |
"""resolve this path to a module python object. """ |
modname = str(self) |
modname = modname.replace('.', self.sep) |
try: |
return sys.modules[modname] |
except KeyError: |
co = self._getpycodeobj() |
mod = py.std.new.module(modname) |
mod.__file__ = PathStr(self) |
if self.basename == '__init__.py': |
mod.__path__ = [str(self.dirpath())] |
sys.modules[modname] = mod |
try: |
exec co in mod.__dict__ |
except: |
del sys.modules[modname] |
raise |
return mod |
|
def _getpycodeobj(self): |
""" read the path and compile it to a py.code.Code object. """ |
s = self.read('rU') |
|
return py.code.compile(s) |
|
class PathStr(str): |
def __init__(self, path): |
global old_import_hook |
self.__path__ = path |
if old_import_hook is None: |
import __builtin__ |
old_import_hook = __builtin__.__import__ |
__builtin__.__import__ = custom_import_hook |
|
def relativeimport(p, name, parent=None): |
names = name.split('.') |
last_list = [False] * (len(names)-1) + [True] |
modules = [] |
for name, is_last in zip(names, last_list): |
if hasattr(parent, name): |
|
|
submodule = getattr(parent, name) |
else: |
if is_last and p.new(basename=name+'.py').check(): |
p = p.new(basename=name+'.py') |
else: |
p = p.new(basename=name).join('__init__.py') |
if not p.check(): |
return None |
submodule = p._getpymodule() |
if parent is not None: |
setattr(parent, name, submodule) |
modules.append(submodule) |
parent = submodule |
return modules |
|
|
old_import_hook = None |
|
def custom_import_hook(name, glob=None, loc=None, fromlist=None): |
__tracebackhide__ = False |
__file__ = glob and glob.get('__file__') |
if isinstance(__file__, PathStr): |
|
|
modules = None |
if hasattr(__file__.__path__, 'pkgdir'): |
modules = relativeimport(__file__.__path__.pkgdir, name) |
if not modules: |
modules = relativeimport(__file__.__path__, name) |
if modules: |
if fromlist: |
submodule = modules[-1] |
|
|
p = submodule.__file__.__path__ |
if p.check(basename='__init__.py'): |
for name in fromlist: |
relativeimport(p, name, parent=submodule) |
|
return submodule |
else: |
return modules[0] |
|
__tracebackhide__ = True |
return old_import_hook(name, glob, loc, fromlist) |
|
|