############################################################################## # # Copyright (c) 2002-2003 Ingeniweb SARL # ############################################################################## """GroupUserFolder product""" # fakes a method from a DTML File from Globals import MessageDialog, DTMLFile from AccessControl import ClassSecurityInfo from AccessControl import Permissions from AccessControl import getSecurityManager from Globals import InitializeClass from Acquisition import Implicit, aq_inner, aq_parent, aq_base from Globals import Persistent from AccessControl.Role import RoleManager from OFS.SimpleItem import Item from OFS.PropertyManager import PropertyManager from OFS import ObjectManager, SimpleItem from DateTime import DateTime from App import ImageFile import AccessControl.Role, webdav.Collection import Products import os import string import shutil import random from global_symbols import * import AccessControl import GRUFFolder import GroupUserFolder from AccessControl.PermissionRole \ import _what_not_even_god_should_do, rolesForPermissionOn from ComputedAttribute import ComputedAttribute import os import traceback from interfaces.IUserFolder import IUser, IGroup _marker = ['INVALID_VALUE'] # NOTE : _what_not_even_god_should_do is a specific permission defined by ZOPE # that indicates that something has not to be done within Zope. # This value is given to the ACCESS_NONE directive of a SecurityPolicy. # It's rarely used within Zope BUT as it is documented (in AccessControl) # and may be used by third-party products, we have to process it. #GROUP_PREFIX is a constant class GRUFUserAtom(AccessControl.User.BasicUser, Implicit): """ Base class for all GRUF-catched User objects. There's, alas, many copy/paste from AccessControl.BasicUser... """ security = ClassSecurityInfo() security.declarePrivate('_setUnderlying') def _setUnderlying(self, user): """ _setUnderlying(self, user) => Set the GRUFUser properties to the underlying user's one. Be careful that any change to the underlying user won't be reported here. $$$ We don't know yet if User object are transaction-persistant or not... """ self._original_name = user.getUserName() self._original_password = user._getPassword() self._original_roles = user.getRoles() self._original_domains = user.getDomains() self._original_id = user.getId() self.__underlying__ = user # Used for authenticate() and __getattr__ # ---------------------------- # Public User object interface # ---------------------------- # Maybe allow access to unprotected attributes. Note that this is # temporary to avoid exposing information but without breaking # everyone's current code. In the future the security will be # clamped down and permission-protected here. Because there are a # fair number of user object types out there, this method denies # access to names that are private parts of the standard User # interface or implementation only. The other approach (only # allowing access to public names in the User interface) would # probably break a lot of other User implementations with extended # functionality that we cant anticipate from the base scaffolding. security.declarePrivate('__init__') def __init__(self, underlying_user, GRUF, isGroup, source_id, ): # When calling, set isGroup it to TRUE if this user represents a group self._setUnderlying(underlying_user) self._isGroup = isGroup self._GRUF = GRUF self._source_id = source_id self.id = self._original_id # Store the results of getRoles and getGroups. Initially set to None, # set to a list after the methods are first called. # If you are caching users you want to clear these. self.clearCachedGroupsAndRoles() security.declarePrivate('clearCachedGroupsAndRoles') def clearCachedGroupsAndRoles(self, underlying_user = None): self._groups = None self._user_roles = None self._group_roles = None self._all_roles = None if underlying_user: self._setUnderlying(underlying_user) self._original_user_roles = None security.declarePublic('isGroup') def isGroup(self,): """Return 1 if this user is a group abstraction""" return self._isGroup security.declarePublic('getUserSourceId') def getUserSourceId(self,): """ getUserSourceId(self,) => string Return the GRUF's GRUFUsers folder used to fetch this user. """ return self._source_id security.declarePrivate('getGroupNames') def getGroupNames(self,): """...""" ret = self._getGroups(no_recurse = 1) return map(lambda x: x[GROUP_PREFIX_LEN:], ret) security.declarePrivate('getGroupIds') def getGroupIds(self,): """...""" return list(self._getGroups(no_recurse = 1)) security.declarePrivate("getAllGroups") def getAllGroups(self,): """Same as getAllGroupNames()""" return self.getAllGroupIds() security.declarePrivate('getAllGroupNames') def getAllGroupNames(self,): """...""" ret = self._getGroups() return map(lambda x: x[GROUP_PREFIX_LEN:], ret) security.declarePrivate('getAllGroupIds') def getAllGroupIds(self,): """...""" return list(self._getGroups()) security.declarePrivate('getGroups') def getGroups(self, *args, **kw): """...""" ret = self._getGroups(*args, **kw) return list(ret) security.declarePrivate("getImmediateGroups") def getImmediateGroups(self,): """ Return NON-TRANSITIVE groups """ ret = self._getGroups(no_recurse = 1) return list(ret) def _getGroups(self, no_recurse = 0, already_done = [], prefix = GROUP_PREFIX): """ getGroups(self, no_recurse = 0, already_done = [], prefix = GROUP_PREFIX) => list of strings If this user is a user (uh, uh), get its groups. THIS METHODS NOW SUPPORTS NESTED GROUPS ! :-) The already_done parameter prevents infite recursions. Keep it as it is, never give it a value. If no_recurse is true, return only first level groups This method is private and should remain so. """ # List this user's roles. We consider that roles starting # with GROUP_PREFIX are in fact groups, and thus are # returned (prefixed). if self._groups is not None: return self._groups # Populate cache if necessary if self._original_user_roles is None: self._original_user_roles = self.__underlying__.getRoles() # Scan roles to find groups ret = [] for role in self._original_user_roles: # Inspect group-like roles if role.startswith(prefix): # Prevent infinite recursion if self._isGroup and role in already_done: continue # Get the underlying group grp = self.aq_parent.getUser(role) if not grp: continue # Invalid group # Do not add twice the current group if role in ret: continue # Append its nested groups (if recurse is asked) ret.append(role) if no_recurse: continue for extend in grp.getGroups(already_done = ret): if not extend in ret: ret.append(extend) # Return the groups self._groups = tuple(ret) return self._groups security.declarePrivate('getGroupsWithoutPrefix') def getGroupsWithoutPrefix(self, **kw): """ Same as getGroups but return them without a prefix. """ ret = [] for group in self.getGroups(**kw): if group.startswith(GROUP_PREFIX): ret.append(group[len(GROUP_PREFIX):]) return ret security.declarePublic('getUserNameWithoutGroupPrefix') def getUserNameWithoutGroupPrefix(self): """Return the username of a user without a group prefix""" if self.isGroup() and \ self._original_name[:len(GROUP_PREFIX)] == GROUP_PREFIX: return self._original_name[len(GROUP_PREFIX):] return self._original_name security.declarePublic('getUserId') def getUserId(self): """Return the user id of a user""" if self.isGroup() and \ not self._original_name[:len(GROUP_PREFIX)] == GROUP_PREFIX: return "%s%s" % (GROUP_PREFIX, self._original_name ) return self._original_name security.declarePublic("getName") def getName(self,): """Get user's or group's name. For a user, the name can be set by the underlying user folder but usually id == name. For a group, the ID is prefixed, but the NAME is NOT prefixed by 'group_'. """ return self.getUserNameWithoutGroupPrefix() security.declarePublic("getUserName") def getUserName(self,): """Alias for getName()""" return self.getUserNameWithoutGroupPrefix() security.declarePublic('getId') def getId(self, unprefixed = 0): """Get the ID of the user. The ID can be used, at least from Python, to get the user from the user's UserDatabase """ # Return the right id if self.isGroup() and not self._original_name.startswith(GROUP_PREFIX) and not unprefixed: return "%s%s" % (GROUP_PREFIX, self._original_name) return self._original_name security.declarePublic('getRoles') def getRoles(self): """ Return the list (tuple) of roles assigned to a user. THIS IS WHERE THE ATHENIANS REACHED ! """ if self._all_roles is not None: return self._all_roles # Return user and groups roles self._all_roles = GroupUserFolder.unique(self.getUserRoles() + self.getGroupRoles()) return self._all_roles security.declarePublic('getUserRoles') def getUserRoles(self): """ returns the roles defined for the user without the group roles """ if self._user_roles is not None: return self._user_roles prefix = GROUP_PREFIX if self._original_user_roles is None: self._original_user_roles = self.__underlying__.getRoles() self._user_roles = tuple([r for r in self._original_user_roles if not r.startswith(prefix)]) return self._user_roles security.declarePublic("getGroupRoles") def getGroupRoles(self,): """ Return the tuple of roles belonging to this user's group(s) """ if self._group_roles is not None: return self._group_roles ret = [] acl_users = self._GRUF.acl_users groups = acl_users.getGroupIds() # XXX We can have a cache here for group in self.getGroups(): if not group in groups: Log("Group", group, "is invalid. Ignoring.") # This may occur when groups are deleted # Ignored silently continue ret.extend(acl_users.getGroup(group).getUserRoles()) self._group_roles = GroupUserFolder.unique(ret) return self._group_roles security.declarePublic('getRolesInContext') def getRolesInContext(self, object, userid = None): """ Return the list of roles assigned to the user, including local roles assigned in context of the passed in object. """ if not userid: userid=self.getId() roles = {} for role in self.getRoles(): roles[role] = 1 user_groups = self.getGroups() inner_obj = getattr(object, 'aq_inner', object) while 1: # Usual local roles retreiving local_roles = getattr(inner_obj, '__ac_local_roles__', None) if local_roles: if callable(local_roles): local_roles = local_roles() dict = local_roles or {} for role in dict.get(userid, []): roles[role] = 1 # Get roles & local roles for groups # This handles nested groups as well for groupid in user_groups: for role in dict.get(groupid, []): roles[role] = 1 # LocalRole blocking obj = getattr(inner_obj, 'aq_base', inner_obj) if getattr(obj, '__ac_local_roles_block__', None): break # Loop management inner = getattr(inner_obj, 'aq_inner', inner_obj) parent = getattr(inner, 'aq_parent', None) if parent is not None: inner_obj = parent continue if hasattr(inner_obj, 'im_self'): inner_obj=inner_obj.im_self inner_obj=getattr(inner_obj, 'aq_inner', inner_obj) continue break return tuple(roles.keys()) security.declarePublic('getDomains') def getDomains(self): """Return the list of domain restrictions for a user""" return self._original_domains security.declarePrivate("getProperty") def getProperty(self, name, default = _marker): """getProperty(self, name) => return property value or raise AttributeError """ # Try to do an attribute lookup on the underlying user object v = getattr(self.__underlying__, name, _marker) if v is _marker: raise AttributeError, name return v security.declarePrivate("hasProperty") def hasProperty(self, name): """hasProperty""" return hasattr(self.__underlying__, name) security.declarePrivate("setProperty") def setProperty(self, name, value): """setProperty => Try to set the property... By now, it's available only for LDAPUserFolder """ # Get actual source src = self._GRUF.getUserSource(self.getUserSourceId()) if not src: raise RuntimeError, "Invalid or missing user source for '%s'." % (self.getId(),) # LDAPUserFolder => specific API. if hasattr(src, "manage_setUserProperty"): # Unmap pty name if necessary, get it in the schema ldapname = None for schema in src.getSchemaConfig().values(): if schema["ldap_name"] == name: ldapname = schema["ldap_name"] if schema["public_name"] == name: ldapname = schema["ldap_name"] break # If we didn't find it, we skip it if ldapname is None: raise KeyError, "Invalid LDAP attribute: '%s'." % (name, ) # Edit user user_dn = src._find_user_dn(self.getUserName()) src.manage_setUserProperty(user_dn, ldapname, value) # Expire the underlying user object self.__underlying__ = src.getUser(self.getId()) if not self.__underlying__: raise RuntimeError, "Error while setting property of '%s'." % (self.getId(),) # Now we check if the property has been changed if not self.hasProperty(name): raise NotImplementedError, "Property setting is not supported for '%s'." % (name,) v = self._GRUF.getUserById(self.getId()).getProperty(name) if not v == value: Log(LOG_DEBUG, "Property '%s' for user '%s' should be '%s' and not '%s'" % ( name, self.getId(), value, v, )) raise NotImplementedError, "Property setting is not supported for '%s'." % (name,) # ------------------------------ # Internal User object interface # ------------------------------ security.declarePrivate('authenticate') def authenticate(self, password, request): # We prevent groups from authenticating if self._isGroup: return None return self.__underlying__.authenticate(password, request) security.declarePublic('allowed') def allowed(self, object, object_roles=None): """Check whether the user has access to object. The user must have one of the roles in object_roles to allow access.""" if object_roles is _what_not_even_god_should_do: return 0 # Short-circuit the common case of anonymous access. if object_roles is None or 'Anonymous' in object_roles: return 1 # Provide short-cut access if object is protected by 'Authenticated' # role and user is not nobody if 'Authenticated' in object_roles and \ (self.getUserName() != 'Anonymous User'): return 1 # Check for ancient role data up front, convert if found. # This should almost never happen, and should probably be # deprecated at some point. if 'Shared' in object_roles: object_roles = self._shared_roles(object) if object_roles is None or 'Anonymous' in object_roles: return 1 # Trying to make some speed improvements, changes starts here. # Helge Tesdal, Plone Solutions AS, http://www.plonesolutions.com # We avoid using the getRoles() and getRolesInContext() methods to be able # to short circuit. # Dict for faster lookup and avoiding duplicates object_roles_dict = {} for role in object_roles: object_roles_dict[role] = 1 if [role for role in self.getUserRoles() if object_roles_dict.has_key(role)]: if self._check_context(object): return 1 return None # Try the top level group roles. if [role for role in self.getGroupRoles() if object_roles_dict.has_key(role)]: if self._check_context(object): return 1 return None user_groups = self.getGroups() # No luck on the top level, try local roles inner_obj = getattr(object, 'aq_inner', object) userid = self.getId() while 1: local_roles = getattr(inner_obj, '__ac_local_roles__', None) if local_roles: if callable(local_roles): local_roles = local_roles() dict = local_roles or {} if [role for role in dict.get(userid, []) if object_roles_dict.has_key(role)]: if self._check_context(object): return 1 return None # Get roles & local roles for groups # This handles nested groups as well for groupid in user_groups: if [role for role in dict.get(groupid, []) if object_roles_dict.has_key(role)]: if self._check_context(object): return 1 return None # LocalRole blocking obj = getattr(inner_obj, 'aq_base', inner_obj) if getattr(obj, '__ac_local_roles_block__', None): break # Loop control inner = getattr(inner_obj, 'aq_inner', inner_obj) parent = getattr(inner, 'aq_parent', None) if parent is not None: inner_obj = parent continue if hasattr(inner_obj, 'im_self'): inner_obj=inner_obj.im_self inner_obj=getattr(inner_obj, 'aq_inner', inner_obj) continue break return None security.declarePublic('hasRole') def hasRole(self, *args, **kw): """hasRole is an alias for 'allowed' and has been deprecated. Code still using this method should convert to either 'has_role' or 'allowed', depending on the intended behaviour. """ import warnings warnings.warn('BasicUser.hasRole is deprecated, please use ' 'BasicUser.allowed instead; hasRole was an alias for allowed, but ' 'you may have ment to use has_role.', DeprecationWarning) return self.allowed(*args, **kw) # # # Underlying user object support # # # def __getattr__(self, name): # This will call the underlying object's methods # if they are not found in this user object. # We will have to check Chris' http://www.plope.com/Members/chrism/plone_on_zope_head # to make it work with Zope HEAD. ret = getattr(self.__dict__['__underlying__'], name) return ret security.declarePublic('getUnwrappedUser') def getUnwrappedUser(self,): """ same as GRUF.getUnwrappedUser, but implicitly with this particular user """ return self.__dict__['__underlying__'] def __getitem__(self, name): # This will call the underlying object's methods # if they are not found in this user object. return self.__underlying__[name] # # # HTML link support # # # def asHTML(self, implicit=0): """ asHTML(self, implicit=0) => HTML string Used to generate homogeneous links for management screens """ acl_users = self.acl_users if self.isGroup(): color = acl_users.group_color kind = "Group" else: color = acl_users.user_color kind = "User" ret = '''%(name)s''' % { "color": color, "href": "%s/%s/manage_workspace" % (acl_users.absolute_url(), self.getId(), ), "name": self.getUserNameWithoutGroupPrefix(), "alt": "%s (%s)" % (self.getUserNameWithoutGroupPrefix(), kind, ), } if implicit: return "%s" % ret return ret security.declarePrivate("isInGroup") def isInGroup(self, groupid): """Return true if the user is member of the specified group id (including transitive groups)""" return groupid in self.getAllGroupIds() security.declarePublic("getRealId") def getRealId(self,): """Return id WITHOUT group prefix """ raise NotImplementedError, "Must be derived in subclasses" class GRUFUser(GRUFUserAtom): """ This is the class for actual user objects """ __implements__ = (IUser, ) security = ClassSecurityInfo() # # # User Mutation # # # security.declarePublic('changePassword') def changePassword(self, password): """Set the user's password. This method performs its own security checks""" # Check security user = getSecurityManager().getUser() if not user.has_permission(Permissions.manage_users, self._GRUF): # Is manager ? if user.__class__.__name__ != "GRUFUser": raise "Unauthorized", "You cannot change someone else's password." if not user.getId() == self.getId(): # Is myself ? raise "Unauthorized", "You cannot change someone else's password." # Just do it self.clearCachedGroupsAndRoles() return self._GRUF.userSetPassword(self.getId(), password) security.declarePrivate("setRoles") def setRoles(self, roles): """Change the roles of a user atom. """ self.clearCachedGroupsAndRoles() return self._GRUF.userSetRoles(self.getId(), roles) security.declarePrivate("addRole") def addRole(self, role): """Append a role for a user atom """ self.clearCachedGroupsAndRoles() return self._GRUF.userAddRole(self.getId(), role) security.declarePrivate("removeRole") def removeRole(self, role): """Remove the role of a user atom """ self.clearCachedGroupsAndRoles() return self._GRUF.userRemoveRole(self.getId(), role) security.declarePrivate("setPassword") def setPassword(self, newPassword): """Set the password of a user """ self.clearCachedGroupsAndRoles() return self._GRUF.userSetPassword(self.getId(), newPassword) security.declarePrivate("setDomains") def setDomains(self, domains): """Set domains for a user """ self.clearCachedGroupsAndRoles() self._GRUF.userSetDomains(self.getId(), domains) self._original_domains = self._GRUF.userGetDomains(self.getId()) security.declarePrivate("addDomain") def addDomain(self, domain): """Append a domain to a user """ self.clearCachedGroupsAndRoles() self._GRUF.userAddDomain(self.getId(), domain) self._original_domains = self._GRUF.userGetDomains(self.getId()) security.declarePrivate("removeDomain") def removeDomain(self, domain): """Remove a domain from a user """ self.clearCachedGroupsAndRoles() self._GRUF.userRemoveDomain(self.getId(), domain) self._original_domains = self._GRUF.userGetDomains(self.getId()) security.declarePrivate("setGroups") def setGroups(self, groupnames): """Set the groups of a user """ self.clearCachedGroupsAndRoles() return self._GRUF.userSetGroups(self.getId(), groupnames) security.declarePrivate("addGroup") def addGroup(self, groupname): """add a group to a user atom """ self.clearCachedGroupsAndRoles() return self._GRUF.userAddGroup(self.getId(), groupname) security.declarePrivate("removeGroup") def removeGroup(self, groupname): """remove a group from a user atom. """ self.clearCachedGroupsAndRoles() return self._GRUF.userRemoveGroup(self.getId(), groupname) security.declarePrivate('_getPassword') def _getPassword(self): """Return the password of the user.""" return self._original_password security.declarePublic("getRealId") def getRealId(self,): """Return id WITHOUT group prefix """ return self.getId() class GRUFGroup(GRUFUserAtom): """ This is the class for actual group objects """ __implements__ = (IGroup, ) security = ClassSecurityInfo() security.declarePublic("getRealId") def getRealId(self,): """Return group id WITHOUT group prefix """ return self.getId()[len(GROUP_PREFIX):] def _getMemberIds(self, users = 1, groups = 1, transitive = 1, ): """Return the member ids (users and groups) of the atoms of this group""" # Initial parameters gruf = self.aq_parent if transitive: method = "getAllGroupIds" else: method = "getGroupIds" if users and not groups: lst = gruf.getPureUserIds() elif groups and not users: lst = gruf.getGroupIds() else: lst = gruf.getUserIds() # Extraction groupid = self.getId() groups_mapping = {} for u in lst: usr = gruf.getUser(u) if not usr: groups_mapping[u] = [] Log(LOG_WARNING, "Invalid user retreiving:", u) else: groups_mapping[u] = getattr(usr, method)() return [u for u in lst if groupid in groups_mapping[u]] security.declarePrivate("getMemberIds") def getMemberIds(self, transitive = 1, ): "Return member ids of this group, including or not transitive groups." return self._getMemberIds(transitive = transitive) security.declarePrivate("getUserMemberIds") def getUserMemberIds(self, transitive = 1, ): """Return the member ids (users only) of the users of this group""" return self._getMemberIds(groups = 0, transitive = transitive) security.declarePrivate("getGroupMemberIds") def getGroupMemberIds(self, transitive = 1, ): """Return the members ids (groups only) of the groups of this group""" return self._getMemberIds(users = 0, transitive = transitive) security.declarePrivate("hasMember") def hasMember(self, id): """Return true if the specified atom id is in the group. This is the contrary of IUserAtom.isInGroup(groupid)""" gruf = self.aq_parent return id in gruf.getMemberIds(self.getId()) security.declarePrivate("setMembers") def setMembers(self, userids): """Set the members of the group """ for userid in userids: self.aq_parent.userFolderAddGroup(userid, groupid) security.declarePrivate("addMember") def addMember(self, userid): """Add a user the the current group""" gruf = self.aq_parent groupid = self.getId() usr = gruf.getUser(userid) if not usr: raise ValueError, "Invalid user: '%s'" % (userid, ) if not groupid in gruf.getGroupNames() + gruf.getGroupIds(): raise ValueError, "Invalid group: '%s'" % (groupid, ) groups = list(usr.getGroups()) groups.append(groupid) groups = GroupUserFolder.unique(groups) return gruf._updateUser(userid, groups = groups) security.declarePrivate("removeMember") def removeMember(self, userid): """Remove a user from the current group""" gruf = self.aq_parent groupid = self.getId() # Check the user usr = gruf.getUser(userid) if not usr: raise ValueError, "Invalid user: '%s'" % (userid, ) # Now, remove the group groups = list(usr.getGroups()) if groupid in groups: groups.remove(groupid) gruf._updateUser(userid, groups = groups) else: raise ValueError, "User '%s' doesn't belong to group '%s'" % (userid, groupid, ) InitializeClass(GRUFUser) InitializeClass(GRUFGroup)