# -*- coding: utf-8 -*- # Copyright 2013 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Integration tests for the acl command.""" from __future__ import absolute_import import re from gslib import aclhelpers from gslib.command import CreateGsutilLogger from gslib.cs_api_map import ApiSelector from gslib.project_id import PopulateProjectId from gslib.storage_url import StorageUrlFromString import gslib.tests.testcase as testcase from gslib.tests.testcase.integration_testcase import SkipForGS from gslib.tests.testcase.integration_testcase import SkipForS3 from gslib.tests.util import ObjectToURI as suri from gslib.translation_helper import AclTranslation from gslib.util import Retry PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"' class TestAclBase(testcase.GsUtilIntegrationTestCase): """Integration test case base class for acl command.""" _set_acl_prefix = ['acl', 'set'] _get_acl_prefix = ['acl', 'get'] _set_defacl_prefix = ['defacl', 'set'] _ch_acl_prefix = ['acl', 'ch'] _project_team = 'owners' _project_test_acl = '%s-%s' % (_project_team, PopulateProjectId()) @SkipForS3('Tests use GS ACL model.') class TestAcl(TestAclBase): """Integration tests for acl command.""" def setUp(self): super(TestAcl, self).setUp() self.sample_uri = self.CreateBucket() self.sample_url = StorageUrlFromString(str(self.sample_uri)) self.logger = CreateGsutilLogger('acl') def test_set_invalid_acl_object(self): """Ensures that invalid content returns a bad request error.""" obj_uri = suri(self.CreateObject(contents='foo')) inpath = self.CreateTempFile(contents='badAcl') stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], return_stderr=True, expected_status=1) self.assertIn('ArgumentException', stderr) def test_set_invalid_acl_bucket(self): """Ensures that invalid content returns a bad request error.""" bucket_uri = suri(self.CreateBucket()) inpath = self.CreateTempFile(contents='badAcl') stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], return_stderr=True, expected_status=1) self.assertIn('ArgumentException', stderr) def test_set_xml_acl_json_api_object(self): """Ensures XML content returns a bad request error and migration warning.""" obj_uri = suri(self.CreateObject(contents='foo')) inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], return_stderr=True, expected_status=1) self.assertIn('ArgumentException', stderr) self.assertIn('XML ACL data provided', stderr) def test_set_xml_acl_json_api_bucket(self): """Ensures XML content returns a bad request error and migration warning.""" bucket_uri = suri(self.CreateBucket()) inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], return_stderr=True, expected_status=1) self.assertIn('ArgumentException', stderr) self.assertIn('XML ACL data provided', stderr) def test_set_valid_acl_object(self): """Tests setting a valid ACL on an object.""" obj_uri = suri(self.CreateObject(contents='foo')) acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], return_stdout=True) inpath = self.CreateTempFile(contents=acl_string) self.RunGsUtil(self._set_acl_prefix + ['public-read', obj_uri]) acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], return_stdout=True) self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri]) acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], return_stdout=True) self.assertNotEqual(acl_string, acl_string2) self.assertEqual(acl_string, acl_string3) def test_set_valid_permission_whitespace_object(self): """Ensures that whitespace is allowed in role and entity elements.""" obj_uri = suri(self.CreateObject(contents='foo')) acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], return_stdout=True) acl_string = re.sub(r'"role"', r'"role" \n', acl_string) acl_string = re.sub(r'"entity"', r'\n "entity"', acl_string) inpath = self.CreateTempFile(contents=acl_string) self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri]) def test_set_valid_acl_bucket(self): """Ensures that valid canned and XML ACLs work with get/set.""" bucket_uri = suri(self.CreateBucket()) acl_string = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], return_stdout=True) inpath = self.CreateTempFile(contents=acl_string) self.RunGsUtil(self._set_acl_prefix + ['public-read', bucket_uri]) acl_string2 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], return_stdout=True) self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri]) acl_string3 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri], return_stdout=True) self.assertNotEqual(acl_string, acl_string2) self.assertEqual(acl_string, acl_string3) def test_invalid_canned_acl_object(self): """Ensures that an invalid canned ACL returns a CommandException.""" obj_uri = suri(self.CreateObject(contents='foo')) stderr = self.RunGsUtil( self._set_acl_prefix + ['not-a-canned-acl', obj_uri], return_stderr=True, expected_status=1) self.assertIn('CommandException', stderr) self.assertIn('Invalid canned ACL', stderr) def test_set_valid_def_acl_bucket(self): """Ensures that valid default canned and XML ACLs works with get/set.""" bucket_uri = self.CreateBucket() # Default ACL is project private. obj_uri1 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo')) acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri1], return_stdout=True) # Change it to authenticated-read. self.RunGsUtil( self._set_defacl_prefix + ['authenticated-read', suri(bucket_uri)]) obj_uri2 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo2')) acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri2], return_stdout=True) # Now change it back to the default via XML. inpath = self.CreateTempFile(contents=acl_string) self.RunGsUtil(self._set_defacl_prefix + [inpath, suri(bucket_uri)]) obj_uri3 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo3')) acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri3], return_stdout=True) self.assertNotEqual(acl_string, acl_string2) self.assertIn('allAuthenticatedUsers', acl_string2) self.assertEqual(acl_string, acl_string3) def test_acl_set_version_specific_uri(self): """Tests setting an ACL on a specific version of an object.""" bucket_uri = self.CreateVersionedBucket() # Create initial object version. uri = self.CreateObject(bucket_uri=bucket_uri, contents='data') # Create a second object version. inpath = self.CreateTempFile(contents='def') self.RunGsUtil(['cp', inpath, uri.uri]) # Find out the two object version IDs. lines = self.AssertNObjectsInBucket(bucket_uri, 2, versioned=True) v0_uri_str, v1_uri_str = lines[0], lines[1] # Check that neither version currently has public-read permission # (default ACL is project-private). orig_acls = [] for uri_str in (v0_uri_str, v1_uri_str): acl = self.RunGsUtil(self._get_acl_prefix + [uri_str], return_stdout=True) self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl)) orig_acls.append(acl) # Set the ACL for the older version of the object to public-read. self.RunGsUtil(self._set_acl_prefix + ['public-read', v0_uri_str]) # Check that the older version's ACL is public-read, but newer version # is not. acl = self.RunGsUtil(self._get_acl_prefix + [v0_uri_str], return_stdout=True) self.assertIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl)) acl = self.RunGsUtil(self._get_acl_prefix + [v1_uri_str], return_stdout=True) self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl)) # Check that reading the ACL with the version-less URI returns the # original ACL (since the version-less URI means the current version). acl = self.RunGsUtil(self._get_acl_prefix + [uri.uri], return_stdout=True) self.assertEqual(acl, orig_acls[0]) def _strip_json_whitespace(self, json_text): return re.sub(r'\s*', '', json_text) def testAclChangeWithUserId(self): change = aclhelpers.AclChange(self.USER_TEST_ID + ':r', scope_type=aclhelpers.ChangeType.USER) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'UserById', self.USER_TEST_ID) def testAclChangeWithGroupId(self): change = aclhelpers.AclChange(self.GROUP_TEST_ID + ':r', scope_type=aclhelpers.ChangeType.GROUP) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'GroupById', self.GROUP_TEST_ID) def testAclChangeWithUserEmail(self): change = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':r', scope_type=aclhelpers.ChangeType.USER) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS) def testAclChangeWithGroupEmail(self): change = aclhelpers.AclChange(self.GROUP_TEST_ADDRESS + ':fc', scope_type=aclhelpers.ChangeType.GROUP) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'OWNER', 'GroupByEmail', self.GROUP_TEST_ADDRESS) def testAclChangeWithDomain(self): change = aclhelpers.AclChange(self.DOMAIN_TEST + ':READ', scope_type=aclhelpers.ChangeType.GROUP) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'GroupByDomain', self.DOMAIN_TEST) def testAclChangeWithProjectOwners(self): change = aclhelpers.AclChange(self._project_test_acl + ':READ', scope_type=aclhelpers.ChangeType.PROJECT) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'Project', self._project_test_acl) def testAclChangeWithAllUsers(self): change = aclhelpers.AclChange('AllUsers:WRITE', scope_type=aclhelpers.ChangeType.GROUP) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'WRITER', 'AllUsers') def testAclChangeWithAllAuthUsers(self): change = aclhelpers.AclChange('AllAuthenticatedUsers:READ', scope_type=aclhelpers.ChangeType.GROUP) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) change.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'AllAuthenticatedUsers') remove = aclhelpers.AclDel('AllAuthenticatedUsers') remove.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHasNo(acl, 'READER', 'AllAuthenticatedUsers') def testAclDelWithUser(self): add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ', scope_type=aclhelpers.ChangeType.USER) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) add.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS) remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS) remove.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHasNo(acl, 'READ', 'UserByEmail', self.USER_TEST_ADDRESS) def testAclDelWithProjectOwners(self): add = aclhelpers.AclChange(self._project_test_acl + ':READ', scope_type=aclhelpers.ChangeType.PROJECT) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) add.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'Project', self._project_test_acl) remove = aclhelpers.AclDel(self._project_test_acl) remove.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHasNo(acl, 'READ', 'Project', self._project_test_acl) def testAclDelWithGroup(self): add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ', scope_type=aclhelpers.ChangeType.GROUP) acl = list(AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl())) add.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHas(acl, 'READER', 'GroupByEmail', self.USER_TEST_ADDRESS) remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS) remove.Execute(self.sample_url, acl, 'acl', self.logger) self._AssertHasNo(acl, 'READER', 'GroupByEmail', self.GROUP_TEST_ADDRESS) # # Here are a whole lot of verbose asserts # def _AssertHas(self, current_acl, perm, scope, value=None): matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope, value)) self.assertEqual(1, len(matches)) def _AssertHasNo(self, current_acl, perm, scope, value=None): matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope, value)) self.assertEqual(0, len(matches)) def _YieldMatchingEntriesJson(self, current_acl, perm, scope, value=None): """Generator that yields entries that match the change descriptor. Args: current_acl: A list of apitools_messages.BucketAccessControls or ObjectAccessControls which will be searched for matching entries. perm: Role (permission) to match. scope: Scope type to match. value: Value to match (against the scope type). Yields: An apitools_messages.BucketAccessControl or ObjectAccessControl. """ for entry in current_acl: if (scope in ['UserById', 'GroupById'] and entry.entityId and value == entry.entityId and entry.role == perm): yield entry elif (scope in ['UserByEmail', 'GroupByEmail'] and entry.email and value == entry.email and entry.role == perm): yield entry elif (scope == 'GroupByDomain' and entry.domain and value == entry.domain and entry.role == perm): yield entry elif (scope == 'Project' and entry.role == perm and value == entry.entityId): yield entry elif (scope in ['AllUsers', 'AllAuthenticatedUsers'] and entry.entity.lower() == scope.lower() and entry.role == perm): yield entry def _MakeScopeRegex(self, role, entity_type, email_address): template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' % (entity_type, email_address, role)) return re.compile(template_regex, flags=re.DOTALL) def _MakeProjectScopeRegex(self, role, project_team): template_regex = (r'\{.*"entity":\s*"project-%s-\d+",\s*"projectTeam":\s*' r'\{\s*"projectNumber":\s*"(\d+)",\s*"team":\s*"%s"\s*\},' r'\s*"role":\s*"%s".*\}') % (project_team, project_team, role) return re.compile(template_regex, flags=re.DOTALL) def testBucketAclChange(self): """Tests acl change on a bucket.""" test_regex = self._MakeScopeRegex( 'OWNER', 'user', self.USER_TEST_ADDRESS) json_text = self.RunGsUtil( self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) self.RunGsUtil(self._ch_acl_prefix + ['-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)]) json_text = self.RunGsUtil( self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) test_regex2 = self._MakeScopeRegex( 'WRITER', 'user', self.USER_TEST_ADDRESS) self.RunGsUtil(self._ch_acl_prefix + ['-u', self.USER_TEST_ADDRESS+':w', suri(self.sample_uri)]) json_text2 = self.RunGsUtil( self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) self.assertRegexpMatches(json_text2, test_regex2) self.RunGsUtil(self._ch_acl_prefix + ['-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)]) json_text3 = self.RunGsUtil( self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) self.assertNotRegexpMatches(json_text3, test_regex) def testProjectAclChangesOnBucket(self): """Tests project entity acl changes on a bucket.""" if self.test_api == ApiSelector.XML: stderr = self.RunGsUtil(self._ch_acl_prefix + ['-p', self._project_test_acl +':w', suri(self.sample_uri)], expected_status=1, return_stderr=True) self.assertIn(('CommandException: XML API does not support project' ' scopes, cannot translate ACL.'), stderr) else: test_regex = self._MakeProjectScopeRegex( 'WRITER', self._project_team) self.RunGsUtil(self._ch_acl_prefix + ['-p', self._project_test_acl +':w', suri(self.sample_uri)]) json_text = self.RunGsUtil( self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) # The api will accept string project ids, but stores the numeric project # ids internally, this extracts the numeric id from the returned acls. proj_num_id = test_regex.search(json_text).group(1) acl_to_remove = '%s-%s' % (self._project_team, proj_num_id) self.RunGsUtil(self._ch_acl_prefix + ['-d', acl_to_remove, suri(self.sample_uri)]) json_text2 = self.RunGsUtil( self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True) self.assertNotRegexpMatches(json_text2, test_regex) def testObjectAclChange(self): """Tests acl change on an object.""" obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') self.AssertNObjectsInBucket(self.sample_uri, 1) test_regex = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) self.RunGsUtil(self._ch_acl_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)]) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) test_regex2 = self._MakeScopeRegex( 'OWNER', 'group', self.GROUP_TEST_ADDRESS) self.RunGsUtil(self._ch_acl_prefix + ['-g', self.GROUP_TEST_ADDRESS+':OWNER', suri(obj)]) json_text2 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertRegexpMatches(json_text2, test_regex2) self.RunGsUtil(self._ch_acl_prefix + ['-d', self.GROUP_TEST_ADDRESS, suri(obj)]) json_text3 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertNotRegexpMatches(json_text3, test_regex2) all_auth_regex = re.compile( r'\{.*"entity":\s*"allAuthenticatedUsers".*"role":\s*"OWNER".*\}', flags=re.DOTALL) self.RunGsUtil(self._ch_acl_prefix + ['-g', 'AllAuth:O', suri(obj)]) json_text4 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertRegexpMatches(json_text4, all_auth_regex) def testObjectAclChangeAllUsers(self): """Tests acl ch AllUsers:R on an object.""" obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something') self.AssertNObjectsInBucket(self.sample_uri, 1) all_users_regex = re.compile( r'\{.*"entity":\s*"allUsers".*"role":\s*"READER".*\}', flags=re.DOTALL) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertNotRegexpMatches(json_text, all_users_regex) self.RunGsUtil(self._ch_acl_prefix + ['-g', 'AllUsers:R', suri(obj)]) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertRegexpMatches(json_text, all_users_regex) def testMultithreadedAclChange(self, count=10): """Tests multi-threaded acl changing on several objects.""" objects = [] for i in range(count): objects.append(self.CreateObject( bucket_uri=self.sample_uri, contents='something {0}'.format(i))) self.AssertNObjectsInBucket(self.sample_uri, count) test_regex = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) json_texts = [] for obj in objects: json_texts.append(self.RunGsUtil( self._get_acl_prefix + [suri(obj)], return_stdout=True)) for json_text in json_texts: self.assertNotRegexpMatches(json_text, test_regex) uris = [suri(obj) for obj in objects] self.RunGsUtil(['-m', '-DD'] + self._ch_acl_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ'] + uris) json_texts = [] for obj in objects: json_texts.append(self.RunGsUtil( self._get_acl_prefix + [suri(obj)], return_stdout=True)) for json_text in json_texts: self.assertRegexpMatches(json_text, test_regex) def testRecursiveChangeAcl(self): """Tests recursively changing ACLs on nested objects.""" obj = self.CreateObject(bucket_uri=self.sample_uri, object_name='foo/bar', contents='something') self.AssertNObjectsInBucket(self.sample_uri, 1) test_regex = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) @Retry(AssertionError, tries=5, timeout_secs=1) def _AddAcl(): self.RunGsUtil( self._ch_acl_prefix + ['-R', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)[:-3]]) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) _AddAcl() @Retry(AssertionError, tries=5, timeout_secs=1) def _DeleteAcl(): self.RunGsUtil(self._ch_acl_prefix + ['-d', self.GROUP_TEST_ADDRESS, suri(obj)]) json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) _DeleteAcl() def testMultiVersionSupport(self): """Tests changing ACLs on multiple object versions.""" bucket = self.CreateVersionedBucket() object_name = self.MakeTempName('obj') self.CreateObject( bucket_uri=bucket, object_name=object_name, contents='One thing') # Create another on the same URI, giving us a second version. self.CreateObject( bucket_uri=bucket, object_name=object_name, contents='Another thing') lines = self.AssertNObjectsInBucket(bucket, 2, versioned=True) obj_v1, obj_v2 = lines[0], lines[1] test_regex = self._MakeScopeRegex( 'READER', 'group', self.GROUP_TEST_ADDRESS) json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) self.RunGsUtil(self._ch_acl_prefix + ['-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1]) json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1], return_stdout=True) self.assertRegexpMatches(json_text, test_regex) json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v2], return_stdout=True) self.assertNotRegexpMatches(json_text, test_regex) def testBadRequestAclChange(self): stdout, stderr = self.RunGsUtil( self._ch_acl_prefix + ['-u', 'invalid_$$@hello.com:R', suri(self.sample_uri)], return_stdout=True, return_stderr=True, expected_status=1) self.assertIn('BadRequestException', stderr) self.assertNotIn('Retrying', stdout) self.assertNotIn('Retrying', stderr) def testAclGetWithoutFullControl(self): object_uri = self.CreateObject(contents='foo') with self.SetAnonymousBotoCreds(): stderr = self.RunGsUtil(self._get_acl_prefix + [suri(object_uri)], return_stderr=True, expected_status=1) self.assertIn('AccessDeniedException', stderr) def testTooFewArgumentsFails(self): """Tests calling ACL commands with insufficient number of arguments.""" # No arguments for get, but valid subcommand. stderr = self.RunGsUtil(self._get_acl_prefix, return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) # No arguments for set, but valid subcommand. stderr = self.RunGsUtil(self._set_acl_prefix, return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) # No arguments for ch, but valid subcommand. stderr = self.RunGsUtil(self._ch_acl_prefix, return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) # Neither arguments nor subcommand. stderr = self.RunGsUtil(['acl'], return_stderr=True, expected_status=1) self.assertIn('command requires at least', stderr) def testMinusF(self): """Tests -f option to continue after failure.""" bucket_uri = self.CreateBucket() obj_uri = suri(self.CreateObject(bucket_uri=bucket_uri, object_name='foo', contents='foo')) acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri], return_stdout=True) self.RunGsUtil(self._set_acl_prefix + ['-f', 'public-read', suri(bucket_uri) + 'foo2', obj_uri], expected_status=1) acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri], return_stdout=True) self.assertNotEqual(acl_string, acl_string2) class TestS3CompatibleAcl(TestAclBase): """ACL integration tests that work for s3 and gs URLs.""" def testAclObjectGetSet(self): bucket_uri = self.CreateBucket() obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') self.AssertNObjectsInBucket(bucket_uri, 1) stdout = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)], return_stdout=True) set_contents = self.CreateTempFile(contents=stdout) self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(obj_uri)]) def testAclBucketGetSet(self): bucket_uri = self.CreateBucket() stdout = self.RunGsUtil(self._get_acl_prefix + [suri(bucket_uri)], return_stdout=True) set_contents = self.CreateTempFile(contents=stdout) self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(bucket_uri)]) @SkipForGS('S3 ACLs accept XML and should not cause an XML warning.') class TestS3OnlyAcl(TestAclBase): """ACL integration tests that work only for s3 URLs.""" # TODO: Format all test case names consistently. def test_set_xml_acl(self): """Ensures XML content does not return an XML warning for S3.""" obj_uri = suri(self.CreateObject(contents='foo')) inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri], return_stderr=True, expected_status=1) self.assertIn('BadRequestException', stderr) self.assertNotIn('XML ACL data provided', stderr) def test_set_xml_acl_bucket(self): """Ensures XML content does not return an XML warning for S3.""" bucket_uri = suri(self.CreateBucket()) inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>') stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri], return_stderr=True, expected_status=1) self.assertIn('BadRequestException', stderr) self.assertNotIn('XML ACL data provided', stderr) class TestAclOldAlias(TestAcl): _set_acl_prefix = ['setacl'] _get_acl_prefix = ['getacl'] _set_defacl_prefix = ['setdefacl'] _ch_acl_prefix = ['chacl']