# -*- test-case-name: twisted.web2.test.test_httpauth -*- """ Wrapper Resources for rfc2617 HTTP Auth. """ from zope.interface import implements from twisted.cred import error from twisted.web2 import resource from twisted.web2 import responsecode from twisted.web2 import http from twisted.web2 import iweb class UnauthorizedResource(resource.LeafResource): """Returned by locateChild or render to generate an http Unauthorized response. """ def __init__(self, factories): """ @param factories: sequence of ICredentialFactory implementations for which to generate a WWW-Authenticate header. """ self.factories = factories def render(self, req): resp = http.Response(responsecode.UNAUTHORIZED) authHeaders = [] for factory in self.factories.itervalues(): authHeaders.append((factory.scheme, factory.getChallenge(req.remoteAddr))) resp.headers.setHeader('www-authenticate', authHeaders) return resp class HTTPAuthResource(object): """I wrap a resource to prevent it being accessed unless the authentication can be completed using the credential factory, portal, and interfaces specified. """ implements(iweb.IResource) def __init__(self, wrappedResource, credentialFactories, portal, interfaces): """ @param wrappedResource: A L{twisted.web2.iweb.IResource} to be returned from locateChild and render upon successful authentication. @param credentialFactories: A list of instances that implement L{ICredentialFactory}. @type credentialFactories: L{list} @param portal: Portal to handle logins for this resource. @type portal: L{twisted.cred.portal.Portal} @param interfaces: the interfaces that are allowed to log in via the given portal @type interfaces: L{tuple} """ self.wrappedResource = wrappedResource self.credentialFactories = dict([(factory.scheme, factory) for factory in credentialFactories]) self.portal = portal self.interfaces = interfaces def login(self, factory, response, req): def _loginSucceeded(res): return self.wrappedResource def _loginFailed(res): return UnauthorizedResource(self.credentialFactories) try: creds = factory.decode(response, req.method) except error.LoginFailed: return UnauthorizedResource(self.credentialFactories) return self.portal.login(creds, None, *self.interfaces ).addCallbacks(_loginSucceeded, _loginFailed) def authenticate(self, req): authHeader = req.headers.getHeader('authorization') if authHeader is None or authHeader[0] not in self.credentialFactories: return UnauthorizedResource(self.credentialFactories) else: return self.login(self.credentialFactories[authHeader[0]], authHeader[1], req) def locateChild(self, req, seg): return self.authenticate(req), seg[1:] def renderHTTP(self, req): return self.authenticate(req)