001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Coprocessor;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HColumnDescriptor;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.HTableDescriptor;
035import org.apache.hadoop.hbase.NamespaceDescriptor;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.TableNotFoundException;
038import org.apache.hadoop.hbase.TestTableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
048import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
049import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
050import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.security.access.Permission.Action;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.apache.hadoop.hbase.testclassification.SecurityTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.zookeeper.ZKUtil;
057import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
058import org.junit.After;
059import org.junit.AfterClass;
060import org.junit.Before;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069@Category({SecurityTests.class, LargeTests.class})
070public class TestAccessController2 extends SecureTestUtil {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestAccessController2.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestAccessController2.class);
077
078  private static final byte[] TEST_ROW = Bytes.toBytes("test");
079  private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
080  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q");
081  private static final byte[] TEST_VALUE = Bytes.toBytes("value");
082
083  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084  private static Configuration conf;
085
086  /** The systemUserConnection created here is tied to the system user. In case, you are planning
087   * to create AccessTestAction, DON'T use this systemUserConnection as the 'doAs' user
088   * gets  eclipsed by the system user. */
089  private static Connection systemUserConnection;
090
091  private final static byte[] Q1 = Bytes.toBytes("q1");
092  private final static byte[] value1 = Bytes.toBytes("value1");
093
094  private static byte[] TEST_FAMILY_2 = Bytes.toBytes("f2");
095  private static byte[] TEST_ROW_2 = Bytes.toBytes("r2");
096  private final static byte[] Q2 = Bytes.toBytes("q2");
097  private final static byte[] value2 = Bytes.toBytes("value2");
098
099  private static byte[] TEST_ROW_3 = Bytes.toBytes("r3");
100
101  private static final String TESTGROUP_1 = "testgroup_1";
102  private static final String TESTGROUP_2 = "testgroup_2";
103
104  private static User TESTGROUP1_USER1;
105  private static User TESTGROUP2_USER1;
106
107  @Rule
108  public TestTableName TEST_TABLE = new TestTableName();
109  private String namespace = "testNamespace";
110  private String tname = namespace + ":testtable1";
111  private TableName tableName = TableName.valueOf(tname);
112  private static String TESTGROUP_1_NAME;
113
114  @BeforeClass
115  public static void setupBeforeClass() throws Exception {
116    conf = TEST_UTIL.getConfiguration();
117    // Up the handlers; this test needs more than usual.
118    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
119    // Enable security
120    enableSecurity(conf);
121    // Verify enableSecurity sets up what we require
122    verifyConfiguration(conf);
123    TEST_UTIL.startMiniCluster();
124    // Wait for the ACL table to become available
125    TEST_UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
126
127    TESTGROUP_1_NAME = toGroupEntry(TESTGROUP_1);
128    TESTGROUP1_USER1 =
129        User.createUserForTesting(conf, "testgroup1_user1", new String[] { TESTGROUP_1 });
130    TESTGROUP2_USER1 =
131        User.createUserForTesting(conf, "testgroup2_user2", new String[] { TESTGROUP_2 });
132
133    systemUserConnection = ConnectionFactory.createConnection(conf);
134  }
135
136  @Before
137  public void setUp() throws Exception {
138    createNamespace(TEST_UTIL, NamespaceDescriptor.create(namespace).build());
139    try (Table table = createTable(TEST_UTIL, tableName,
140          new byte[][] { TEST_FAMILY, TEST_FAMILY_2 })) {
141      TEST_UTIL.waitTableEnabled(tableName);
142
143      // Ingesting test data.
144      table.put(Arrays.asList(new Put(TEST_ROW).addColumn(TEST_FAMILY, Q1, value1),
145          new Put(TEST_ROW_2).addColumn(TEST_FAMILY, Q2, value2),
146          new Put(TEST_ROW_3).addColumn(TEST_FAMILY_2, Q1, value1)));
147    }
148
149    assertEquals(1, AccessControlLists.getTablePermissions(conf, tableName).size());
150    try {
151      assertEquals(1, AccessControlClient.getUserPermissions(systemUserConnection,
152          tableName.toString()).size());
153    } catch (Throwable e) {
154      LOG.error("Error during call of AccessControlClient.getUserPermissions. ", e);
155    }
156
157  }
158
159  @AfterClass
160  public static void tearDownAfterClass() throws Exception {
161    systemUserConnection.close();
162    TEST_UTIL.shutdownMiniCluster();
163  }
164
165  @After
166  public void tearDown() throws Exception {
167    // Clean the _acl_ table
168    try {
169      deleteTable(TEST_UTIL, tableName);
170    } catch (TableNotFoundException ex) {
171      // Test deleted the table, no problem
172      LOG.info("Test deleted table " + tableName);
173    }
174    deleteNamespace(TEST_UTIL, namespace);
175    // Verify all table/namespace permissions are erased
176    assertEquals(0, AccessControlLists.getTablePermissions(conf, tableName).size());
177    assertEquals(0, AccessControlLists.getNamespacePermissions(conf, namespace).size());
178  }
179
180  @Test
181  public void testCreateWithCorrectOwner() throws Exception {
182    // Create a test user
183    final User testUser = User.createUserForTesting(TEST_UTIL.getConfiguration(), "TestUser",
184      new String[0]);
185    // Grant the test user the ability to create tables
186    SecureTestUtil.grantGlobal(TEST_UTIL, testUser.getShortName(), Action.CREATE);
187    verifyAllowed(new AccessTestAction() {
188      @Override
189      public Object run() throws Exception {
190        HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName());
191        desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
192        try (Connection connection =
193            ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), testUser)) {
194          try (Admin admin = connection.getAdmin()) {
195            createTable(TEST_UTIL, admin, desc);
196          }
197        }
198        return null;
199      }
200    }, testUser);
201    TEST_UTIL.waitTableAvailable(TEST_TABLE.getTableName());
202    // Verify that owner permissions have been granted to the test user on the
203    // table just created
204    List<TablePermission> perms =
205      AccessControlLists.getTablePermissions(conf, TEST_TABLE.getTableName())
206       .get(testUser.getShortName());
207    assertNotNull(perms);
208    assertFalse(perms.isEmpty());
209    // Should be RWXCA
210    assertTrue(perms.get(0).implies(Permission.Action.READ));
211    assertTrue(perms.get(0).implies(Permission.Action.WRITE));
212    assertTrue(perms.get(0).implies(Permission.Action.EXEC));
213    assertTrue(perms.get(0).implies(Permission.Action.CREATE));
214    assertTrue(perms.get(0).implies(Permission.Action.ADMIN));
215  }
216
217  @Test
218  public void testCreateTableWithGroupPermissions() throws Exception {
219    grantGlobal(TEST_UTIL, TESTGROUP_1_NAME, Action.CREATE);
220    try {
221      AccessTestAction createAction = new AccessTestAction() {
222        @Override
223        public Object run() throws Exception {
224          HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName());
225          desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
226          try (Connection connection =
227              ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
228            try (Admin admin = connection.getAdmin()) {
229              admin.createTable(desc);
230            }
231          }
232          return null;
233        }
234      };
235      verifyAllowed(createAction, TESTGROUP1_USER1);
236      verifyDenied(createAction, TESTGROUP2_USER1);
237    } finally {
238      revokeGlobal(TEST_UTIL, TESTGROUP_1_NAME, Action.CREATE);
239    }
240  }
241
242  @Test
243  public void testACLTableAccess() throws Exception {
244    final Configuration conf = TEST_UTIL.getConfiguration();
245
246    // Superuser
247    User superUser = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
248
249    // Global users
250    User globalRead = User.createUserForTesting(conf, "globalRead", new String[0]);
251    User globalWrite = User.createUserForTesting(conf, "globalWrite", new String[0]);
252    User globalCreate = User.createUserForTesting(conf, "globalCreate", new String[0]);
253    User globalAdmin = User.createUserForTesting(conf, "globalAdmin", new String[0]);
254    SecureTestUtil.grantGlobal(TEST_UTIL, globalRead.getShortName(), Action.READ);
255    SecureTestUtil.grantGlobal(TEST_UTIL, globalWrite.getShortName(), Action.WRITE);
256    SecureTestUtil.grantGlobal(TEST_UTIL, globalCreate.getShortName(), Action.CREATE);
257    SecureTestUtil.grantGlobal(TEST_UTIL, globalAdmin.getShortName(), Action.ADMIN);
258
259    // Namespace users
260    User nsRead = User.createUserForTesting(conf, "nsRead", new String[0]);
261    User nsWrite = User.createUserForTesting(conf, "nsWrite", new String[0]);
262    User nsCreate = User.createUserForTesting(conf, "nsCreate", new String[0]);
263    User nsAdmin = User.createUserForTesting(conf, "nsAdmin", new String[0]);
264    SecureTestUtil.grantOnNamespace(TEST_UTIL, nsRead.getShortName(),
265      TEST_TABLE.getTableName().getNamespaceAsString(), Action.READ);
266    SecureTestUtil.grantOnNamespace(TEST_UTIL, nsWrite.getShortName(),
267      TEST_TABLE.getTableName().getNamespaceAsString(), Action.WRITE);
268    SecureTestUtil.grantOnNamespace(TEST_UTIL, nsCreate.getShortName(),
269      TEST_TABLE.getTableName().getNamespaceAsString(), Action.CREATE);
270    SecureTestUtil.grantOnNamespace(TEST_UTIL, nsAdmin.getShortName(),
271      TEST_TABLE.getTableName().getNamespaceAsString(), Action.ADMIN);
272
273    // Table users
274    User tableRead = User.createUserForTesting(conf, "tableRead", new String[0]);
275    User tableWrite = User.createUserForTesting(conf, "tableWrite", new String[0]);
276    User tableCreate = User.createUserForTesting(conf, "tableCreate", new String[0]);
277    User tableAdmin = User.createUserForTesting(conf, "tableAdmin", new String[0]);
278    SecureTestUtil.grantOnTable(TEST_UTIL, tableRead.getShortName(),
279      TEST_TABLE.getTableName(), null, null, Action.READ);
280    SecureTestUtil.grantOnTable(TEST_UTIL, tableWrite.getShortName(),
281      TEST_TABLE.getTableName(), null, null, Action.WRITE);
282    SecureTestUtil.grantOnTable(TEST_UTIL, tableCreate.getShortName(),
283      TEST_TABLE.getTableName(), null, null, Action.CREATE);
284    SecureTestUtil.grantOnTable(TEST_UTIL, tableAdmin.getShortName(),
285      TEST_TABLE.getTableName(), null, null, Action.ADMIN);
286
287    grantGlobal(TEST_UTIL, TESTGROUP_1_NAME, Action.WRITE);
288    try {
289      // Write tests
290
291      AccessTestAction writeAction = new AccessTestAction() {
292        @Override
293        public Object run() throws Exception {
294
295          try (Connection conn = ConnectionFactory.createConnection(conf);
296              Table t = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
297            t.put(new Put(TEST_ROW).addColumn(AccessControlLists.ACL_LIST_FAMILY,
298                TEST_QUALIFIER, TEST_VALUE));
299            return null;
300          } finally {
301          }
302        }
303      };
304
305      // All writes to ACL table denied except for GLOBAL WRITE permission and superuser
306
307      verifyDenied(writeAction, globalAdmin, globalCreate, globalRead, TESTGROUP2_USER1);
308      verifyDenied(writeAction, nsAdmin, nsCreate, nsRead, nsWrite);
309      verifyDenied(writeAction, tableAdmin, tableCreate, tableRead, tableWrite);
310      verifyAllowed(writeAction, superUser, globalWrite, TESTGROUP1_USER1);
311    } finally {
312      revokeGlobal(TEST_UTIL, TESTGROUP_1_NAME, Action.WRITE);
313    }
314
315    grantGlobal(TEST_UTIL, TESTGROUP_1_NAME, Action.READ);
316    try {
317      // Read tests
318
319      AccessTestAction scanAction = new AccessTestAction() {
320        @Override
321        public Object run() throws Exception {
322          try (Connection conn = ConnectionFactory.createConnection(conf);
323              Table t = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
324            ResultScanner s = t.getScanner(new Scan());
325            try {
326              for (Result r = s.next(); r != null; r = s.next()) {
327                // do nothing
328              }
329            } finally {
330              s.close();
331            }
332            return null;
333          }
334        }
335      };
336
337      // All reads from ACL table denied except for GLOBAL READ and superuser
338
339      verifyDenied(scanAction, globalAdmin, globalCreate, globalWrite, TESTGROUP2_USER1);
340      verifyDenied(scanAction, nsCreate, nsAdmin, nsRead, nsWrite);
341      verifyDenied(scanAction, tableCreate, tableAdmin, tableRead, tableWrite);
342      verifyAllowed(scanAction, superUser, globalRead, TESTGROUP1_USER1);
343    } finally {
344      revokeGlobal(TEST_UTIL, TESTGROUP_1_NAME, Action.READ);
345    }
346  }
347
348  /*
349   * Test table scan operation at table, column family and column qualifier level.
350   */
351  @Test
352  public void testPostGrantAndRevokeScanAction() throws Exception {
353    AccessTestAction scanTableActionForGroupWithTableLevelAccess = new AccessTestAction() {
354      @Override
355      public Void run() throws Exception {
356        try (Connection connection = ConnectionFactory.createConnection(conf);
357            Table table = connection.getTable(tableName);) {
358          Scan s1 = new Scan();
359          try (ResultScanner scanner1 = table.getScanner(s1);) {
360            Result[] next1 = scanner1.next(5);
361            assertTrue("User having table level access should be able to scan all "
362                + "the data in the table.", next1.length == 3);
363          }
364        }
365        return null;
366      }
367    };
368
369    AccessTestAction scanTableActionForGroupWithFamilyLevelAccess = new AccessTestAction() {
370      @Override
371      public Void run() throws Exception {
372        try (Connection connection = ConnectionFactory.createConnection(conf);
373            Table table = connection.getTable(tableName);) {
374          Scan s1 = new Scan();
375          try (ResultScanner scanner1 = table.getScanner(s1);) {
376            Result[] next1 = scanner1.next(5);
377            assertTrue("User having column family level access should be able to scan all "
378                + "the data belonging to that family.", next1.length == 2);
379          }
380        }
381        return null;
382      }
383    };
384
385    AccessTestAction scanFamilyActionForGroupWithFamilyLevelAccess = new AccessTestAction() {
386      @Override
387      public Void run() throws Exception {
388        try (Connection connection = ConnectionFactory.createConnection(conf);
389            Table table = connection.getTable(tableName);) {
390          Scan s1 = new Scan();
391          s1.addFamily(TEST_FAMILY_2);
392          try (ResultScanner scanner1 = table.getScanner(s1);) {
393            scanner1.next();
394          }
395        }
396        return null;
397      }
398    };
399
400    AccessTestAction scanTableActionForGroupWithQualifierLevelAccess = new AccessTestAction() {
401      @Override
402      public Void run() throws Exception {
403        try (Connection connection = ConnectionFactory.createConnection(conf);
404            Table table = connection.getTable(tableName);) {
405          Scan s1 = new Scan();
406          try (ResultScanner scanner1 = table.getScanner(s1);) {
407            Result[] next1 = scanner1.next(5);
408            assertTrue("User having column qualifier level access should be able to scan "
409                + "that column family qualifier data.", next1.length == 1);
410          }
411        }
412        return null;
413      }
414    };
415
416    AccessTestAction scanFamilyActionForGroupWithQualifierLevelAccess = new AccessTestAction() {
417      @Override
418      public Void run() throws Exception {
419        try (Connection connection = ConnectionFactory.createConnection(conf);
420            Table table = connection.getTable(tableName);) {
421          Scan s1 = new Scan();
422          s1.addFamily(TEST_FAMILY_2);
423          try (ResultScanner scanner1 = table.getScanner(s1);) {
424            scanner1.next();
425          }
426        }
427        return null;
428      }
429    };
430
431    AccessTestAction scanQualifierActionForGroupWithQualifierLevelAccess = new AccessTestAction() {
432      @Override
433      public Void run() throws Exception {
434        try (Connection connection = ConnectionFactory.createConnection(conf);
435            Table table = connection.getTable(tableName);) {
436          Scan s1 = new Scan();
437          s1.addColumn(TEST_FAMILY, Q2);
438          try (ResultScanner scanner1 = table.getScanner(s1);) {
439            scanner1.next();
440          }
441        }
442        return null;
443      }
444    };
445
446    // Verify user from a group which has table level access can read all the data and group which
447    // has no access can't read any data.
448    grantOnTable(TEST_UTIL, TESTGROUP_1_NAME, tableName, null, null, Action.READ);
449    verifyAllowed(TESTGROUP1_USER1, scanTableActionForGroupWithTableLevelAccess);
450    verifyDenied(TESTGROUP2_USER1, scanTableActionForGroupWithTableLevelAccess);
451
452    // Verify user from a group whose table level access has been revoked can't read any data.
453    revokeFromTable(TEST_UTIL, TESTGROUP_1_NAME, tableName, null, null);
454    verifyDenied(TESTGROUP1_USER1, scanTableActionForGroupWithTableLevelAccess);
455
456    // Verify user from a group which has column family level access can read all the data
457    // belonging to that family and group which has no access can't read any data.
458    grantOnTable(TEST_UTIL, TESTGROUP_1_NAME, tableName, TEST_FAMILY, null,
459      Permission.Action.READ);
460    verifyAllowed(TESTGROUP1_USER1, scanTableActionForGroupWithFamilyLevelAccess);
461    verifyDenied(TESTGROUP1_USER1, scanFamilyActionForGroupWithFamilyLevelAccess);
462    verifyDenied(TESTGROUP2_USER1, scanTableActionForGroupWithFamilyLevelAccess);
463    verifyDenied(TESTGROUP2_USER1, scanFamilyActionForGroupWithFamilyLevelAccess);
464
465    // Verify user from a group whose column family level access has been revoked can't read any
466    // data from that family.
467    revokeFromTable(TEST_UTIL, TESTGROUP_1_NAME, tableName, TEST_FAMILY, null);
468    verifyDenied(TESTGROUP1_USER1, scanTableActionForGroupWithFamilyLevelAccess);
469
470    // Verify user from a group which has column qualifier level access can read data that has this
471    // family and qualifier, and group which has no access can't read any data.
472    grantOnTable(TEST_UTIL, TESTGROUP_1_NAME, tableName, TEST_FAMILY, Q1, Action.READ);
473    verifyAllowed(TESTGROUP1_USER1, scanTableActionForGroupWithQualifierLevelAccess);
474    verifyDenied(TESTGROUP1_USER1, scanFamilyActionForGroupWithQualifierLevelAccess);
475    verifyDenied(TESTGROUP1_USER1, scanQualifierActionForGroupWithQualifierLevelAccess);
476    verifyDenied(TESTGROUP2_USER1, scanTableActionForGroupWithQualifierLevelAccess);
477    verifyDenied(TESTGROUP2_USER1, scanFamilyActionForGroupWithQualifierLevelAccess);
478    verifyDenied(TESTGROUP2_USER1, scanQualifierActionForGroupWithQualifierLevelAccess);
479
480    // Verify user from a group whose column qualifier level access has been revoked can't read the
481    // data having this column family and qualifier.
482    revokeFromTable(TEST_UTIL, TESTGROUP_1_NAME, tableName, TEST_FAMILY, Q1);
483    verifyDenied(TESTGROUP1_USER1, scanTableActionForGroupWithQualifierLevelAccess);
484  }
485
486  public static class MyAccessController extends AccessController {
487  }
488
489  @Test
490  public void testCoprocessorLoading() throws Exception {
491    MasterCoprocessorHost cpHost =
492        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
493    cpHost.load(MyAccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
494    AccessController ACCESS_CONTROLLER = cpHost.findCoprocessor(MyAccessController.class);
495    MasterCoprocessorEnvironment CP_ENV = cpHost.createEnvironment(
496      ACCESS_CONTROLLER, Coprocessor.PRIORITY_HIGHEST, 1, conf);
497    RegionServerCoprocessorHost rsHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
498        .getRegionServerCoprocessorHost();
499    RegionServerCoprocessorEnvironment RSCP_ENV = rsHost.createEnvironment(
500      ACCESS_CONTROLLER, Coprocessor.PRIORITY_HIGHEST, 1, conf);
501  }
502
503  @Test
504  public void testACLZNodeDeletion() throws Exception {
505    String baseAclZNode = "/hbase/acl/";
506    String ns = "testACLZNodeDeletionNamespace";
507    NamespaceDescriptor desc = NamespaceDescriptor.create(ns).build();
508    createNamespace(TEST_UTIL, desc);
509
510    final TableName table = TableName.valueOf(ns, "testACLZNodeDeletionTable");
511    final byte[] family = Bytes.toBytes("f1");
512    HTableDescriptor htd = new HTableDescriptor(table);
513    htd.addFamily(new HColumnDescriptor(family));
514    createTable(TEST_UTIL, htd);
515
516    // Namespace needs this, as they follow the lazy creation of ACL znode.
517    grantOnNamespace(TEST_UTIL, TESTGROUP1_USER1.getShortName(), ns, Action.ADMIN);
518    ZKWatcher zkw = TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper();
519    assertTrue("The acl znode for table should exist",  ZKUtil.checkExists(zkw, baseAclZNode +
520        table.getNameAsString()) != -1);
521    assertTrue("The acl znode for namespace should exist", ZKUtil.checkExists(zkw, baseAclZNode +
522        convertToNamespace(ns)) != -1);
523
524    revokeFromNamespace(TEST_UTIL, TESTGROUP1_USER1.getShortName(), ns, Action.ADMIN);
525    deleteTable(TEST_UTIL, table);
526    deleteNamespace(TEST_UTIL, ns);
527
528    assertTrue("The acl znode for table should have been deleted",
529        ZKUtil.checkExists(zkw, baseAclZNode + table.getNameAsString()) == -1);
530    assertTrue( "The acl znode for namespace should have been deleted",
531        ZKUtil.checkExists(zkw, baseAclZNode + convertToNamespace(ns)) == -1);
532  }
533}