Source code for nti.app.pyramid_zope.traversal

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Support for resource tree traversal.

"""

from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

from pyramid import traversal

from pyramid.compat import is_nonstr_iter
from pyramid.compat import decode_path_info

from pyramid.exceptions import URLDecodeError

from pyramid.httpexceptions import HTTPNotFound

from pyramid.interfaces import VH_ROOT_KEY

from pyramid.interfaces import ITraverser

from zope import interface

from zope.component import queryMultiAdapter
from zope.event import notify

from zope.location.interfaces import LocationError

from zope.traversing import api as ztraversing

from zope.traversing.interfaces import ITraversable
from zope.traversing.interfaces import BeforeTraverseEvent

from zope.publisher.interfaces.browser import IBrowserRequest
from zope.publisher.interfaces.browser import IDefaultBrowserLayer

from zope.traversing.namespace import resource as _zresource

lineage = traversal.lineage
find_interface = traversal.find_interface

empty = traversal.empty
split_path_info = traversal.split_path_info

logger = __import__('logging').getLogger(__name__)

__all__ = [
    'ZopeResourceTreeTraverser',
    'resource',
]

def _notify_before_traverse_event(ob, request):
    """
    Notifies a BeforeTraverseEvent, but safely: if the
    handlers themselves raise a location error, turn that into
    a HTTP 404 exception.

    Because handlers are deliberately doing this, we stop
    traversal and abort rather than try to return an information
    dictionary and find a view and context, etc. This is limiting, but
    safe.
    """
    try:
        notify(BeforeTraverseEvent(ob, request))
    except LocationError:
        # this is often a setup or programmer error
        logger.debug("LocationError from traverse subscribers", exc_info=True)
        raise HTTPNotFound("Traversal failed")


[docs]@interface.implementer(ITraverser) class ZopeResourceTreeTraverser(traversal.ResourceTreeTraverser): """ A :class:`pyramid.interfaces.ITraverser` based on pyramid's default traverser, but modified to use the :mod:`zope.traversing.api` machinery instead of (only) dictionary lookups. This provides is with the flexibility of the :obj:`zope.traversing.interfaces.ITraversable` adapter pattern, plus the support of namespace lookups (:func:`zope.traversing.namespace.nsParse` and :func:`zope.traversing.namespace.namespaceLookup`). As this object traverses, it fires :obj:`~.IBeforeTraverseEvent` events. If you either load the configuration from :mod:`zope.app.publication` or manually enable the :obj:`zope.site.site.threadSiteSubscriber <zope.site.site>` to subscribe to this event, then any Zope site managers found along the way will be made the current site. """
[docs] def __init__(self, root): traversal.ResourceTreeTraverser.__init__(self, root)
[docs] def __call__(self, request): # pylint:disable=too-many-locals,too-many-branches,too-many-statements """ See :meth:`pyramid.interfaces.ITraversar.__call__`. """ # JAM: Unfortunately, the superclass implementation is entirely monolithic # and we so we cannot reuse any part of it. Instead, # we copy-and-paste it. Unless otherwise noted, comments below are # original. # JAM: Note the abundance of no covers. These are for features we are # not currently using and the code is lifted directly from pyramid. environ = request.environ if request.matchdict is not None: matchdict = request.matchdict path = matchdict.get('traverse', '/') or '/' if is_nonstr_iter(path): # this is a *traverse stararg (not a {traverse}) # routing has already decoded these elements, so we just # need to join them path = '/'.join(path) or '/' subpath = matchdict.get('subpath', ()) if not is_nonstr_iter(subpath): # pragma: no cover # this is not a *subpath stararg (just a {subpath}) # routing has already decoded this string, so we just need # to split it subpath = split_path_info(subpath) else: # pragma: no cover # this request did not match a route subpath = () try: # empty if mounted under a path in mod_wsgi, for example path = decode_path_info(environ['PATH_INFO'] or '/') except KeyError: path = '/' except UnicodeDecodeError as e: raise URLDecodeError(e.encoding, e.object, e.start, e.end, e.reason) if VH_ROOT_KEY in environ: # pragma: no cover # HTTP_X_VHM_ROOT vroot_path = decode_path_info(environ[VH_ROOT_KEY]) vroot_tuple = split_path_info(vroot_path) # both will (must) be unicode or asciistr vpath = vroot_path + path vroot_idx = len(vroot_tuple) - 1 else: vroot_tuple = () vpath = path vroot_idx = -1 root = self.root ob = vroot = root if vpath == '/': # invariant: vpath must not be empty # prevent a call to traversal_path if we know it's going # to return the empty tuple vpath_tuple = () else: i = 0 view_selector = self.VIEW_SELECTOR # A list so that remaining_path can be modified vpath_tuple = list(split_path_info(vpath)) for segment in vpath_tuple: # JAM: Fire traversal events, mainly so sites get installed. See # zope.publisher.base. _notify_before_traverse_event(ob, request) # JAM: Notice that checking for '@@' is special cased, and # doesn't go through the normal namespace lookup as it would in # plain zope traversal. (XXX: Why not?) if segment.startswith(view_selector): # pragma: no cover return {'context': ob, 'view_name': segment[2:], 'subpath': vpath_tuple[i + 1:], 'traversed': vpath_tuple[:vroot_idx + i + 1], 'virtual_root': vroot, 'virtual_root_path': vroot_tuple, 'root': root} try: # JAM: This is where we differ. instead of using __getitem__, # we use the traversing machinery. # The zope app would use IPublishTraverser, which # would install security proxies along the way. We probably don't need to # do that? TODO: # NOTE: By passing the request here, we require all traversers # (including the namespace traversers) to be registered as multi-adapters. # None of the default namespaces are. See our # configure.zcml for what is. # JAM: Damn stupid implementation of traversePathElement ignores # the request argument to find a traversable /except/ when a namespace is found. # therefore, we explicitly query for the multi adapter ourself in the non-namespace case # (In the namespace case, we let traversing handle it, because it needs a named adapter # after parsing) traversable = None if segment and segment[0] not in '+@' \ and not ITraversable.providedBy(ob): try: # Use the installed component registry # instead of the request registry (which # is the global component registry if # pyramid was configured that way, or a # standalone registry) in case the act of # traversing has changed the site manager; # zope.site.site.threadSiteSubscriber will # do this for each BeforeTraverseEvent # that's fired (though that's not # registered by default). traversable = queryMultiAdapter((ob, request), ITraversable) except TypeError: # Some things are registered for "*" (DefaultTraversable) # which means they get called here. If they can't take # two arguments, then we bail. Sucks. pass remaining_path = vpath_tuple[i + 1:] next_ob = ztraversing.traversePathElement(ob, segment, remaining_path, traversable=traversable, request=request) if remaining_path != vpath_tuple[i + 1:]: # Is this if check necessary? It would be faster to # always assign vpath_tuple[i + 1:] = remaining_path except LocationError: # LocationError is a type of KeyError. The DefaultTraversable turns # plain KeyError and TypeErrors into LocationError. return {'context': ob, 'view_name': segment, 'subpath': vpath_tuple[i + 1:], 'traversed': vpath_tuple[:vroot_idx + i + 1], 'virtual_root': vroot, 'virtual_root_path': vroot_tuple, 'root': root} if i == vroot_idx: # pragma: no cover vroot = next_ob ob = next_ob i += 1 # JAM: Also fire before traversal for the actual context item, since we # won't actually traverse into it. Be sure not to fire multiple times # for this (E.g., the root). This logic is complicated by the # multi-returns above. _notify_before_traverse_event(ob, request) return {'context': ob, 'view_name': empty, 'subpath': subpath, 'traversed': vpath_tuple, 'virtual_root': vroot, 'virtual_root_path': vroot_tuple, 'root': root}
[docs]class resource(_zresource): """ Handles resource lookup in a way compatible with :mod:`zope.browserresource`. This package registers resources as named adapters from :class:`.IDefaultBrowserLayer` to Interface. We connect the two by making the pyramid request implement the right thing. """
[docs] def __init__(self, context, request): request = IBrowserRequest(request) if not IDefaultBrowserLayer.providedBy(request): interface.alsoProvides(request, IDefaultBrowserLayer) # We lie super(resource, self).__init__(context, request)