001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
021import static org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController.SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.permission.FsPermission;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.SecurityTests;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Rule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.junit.rules.TestName;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Separated from {@link TestSnapshotScannerHDFSAclController}. Uses facility from that class.
050 * @see TestSnapshotScannerHDFSAclController
051 */
052@Category({ SecurityTests.class, LargeTests.class })
053public class TestSnapshotScannerHDFSAclController2 {
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestSnapshotScannerHDFSAclController2.class);
057  @Rule
058  public TestName name = new TestName();
059  private static final Logger LOG =
060    LoggerFactory.getLogger(TestSnapshotScannerHDFSAclController2.class);
061
062  private static final String UN_GRANT_USER = "un_grant_user";
063  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
064  private static Configuration conf = TEST_UTIL.getConfiguration();
065  private static Admin admin = null;
066  private static SnapshotScannerHDFSAclHelper helper;
067  private static Table aclTable;
068  private static FileSystem FS;
069
070  @BeforeClass
071  public static void setupBeforeClass() throws Exception {
072    // enable hdfs acl and set umask to 027
073    conf.setBoolean("dfs.namenode.acls.enabled", true);
074    conf.set("fs.permissions.umask-mode", "027");
075    // enable hbase hdfs acl feature
076    conf.setBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, true);
077    // enable secure
078    conf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
079    conf.set(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR,
080      SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
081    SecureTestUtil.enableSecurity(conf);
082    // add SnapshotScannerHDFSAclController coprocessor
083    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
084      conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + ","
085        + SnapshotScannerHDFSAclController.class.getName());
086
087    TEST_UTIL.startMiniCluster();
088    SnapshotScannerHDFSAclController coprocessor = TEST_UTIL.getHBaseCluster().getMaster()
089      .getMasterCoprocessorHost().findCoprocessor(SnapshotScannerHDFSAclController.class);
090    TEST_UTIL.waitFor(30000, () -> coprocessor.checkInitialized("check initialized"));
091    TEST_UTIL.waitTableAvailable(PermissionStorage.ACL_TABLE_NAME);
092
093    admin = TEST_UTIL.getAdmin();
094    Path rootDir = TEST_UTIL.getDefaultRootDirPath();
095    FS = rootDir.getFileSystem(conf);
096    User unGrantUser = User.createUserForTesting(conf, UN_GRANT_USER, new String[] {});
097    helper = new SnapshotScannerHDFSAclHelper(conf, admin.getConnection());
098
099    // set hbase directory permission
100    FsPermission commonDirectoryPermission =
101      new FsPermission(conf.get(SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION,
102        SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION_DEFAULT));
103    Path path = rootDir;
104    while (path != null) {
105      FS.setPermission(path, commonDirectoryPermission);
106      path = path.getParent();
107    }
108    // set restore directory permission
109    Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
110    if (!FS.exists(restoreDir)) {
111      FS.mkdirs(restoreDir);
112      FS.setPermission(restoreDir,
113        new FsPermission(
114          conf.get(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
115            SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
116    }
117    path = restoreDir.getParent();
118    while (path != null) {
119      FS.setPermission(path, commonDirectoryPermission);
120      path = path.getParent();
121    }
122    aclTable = admin.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME);
123  }
124
125  @AfterClass
126  public static void tearDownAfterClass() throws Exception {
127    TEST_UTIL.shutdownMiniCluster();
128  }
129
130  @Test
131  public void testRestoreSnapshot() throws Exception {
132    final String grantUserName = name.getMethodName();
133    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
134    String namespace = name.getMethodName();
135    TableName table = TableName.valueOf(namespace, name.getMethodName());
136    String snapshot = namespace + "s1";
137    String snapshot2 = namespace + "s2";
138    String snapshot3 = namespace + "s3";
139
140    LOG.info("Create {}", table);
141    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
142      TestHDFSAclHelper.put(t);
143      // grant t1, snapshot
144      LOG.info("Grant {}", table);
145      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
146      admin.snapshot(snapshot, table);
147      // delete
148      admin.disableTable(table);
149      admin.deleteTable(table);
150      LOG.info("Before scan of shapshot! {}", table);
151      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
152
153      // restore snapshot and restore acl
154      admin.restoreSnapshot(snapshot, true, true);
155      TestHDFSAclHelper.put2(t);
156      // snapshot
157      admin.snapshot(snapshot2, table);
158      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
159      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
160      assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table));
161      TestSnapshotScannerHDFSAclController.checkUserAclEntry(FS,
162        helper.getTableRootPaths(table, false), grantUserName, true, true);
163
164      // delete
165      admin.disableTable(table);
166      admin.deleteTable(table);
167      // restore snapshot and skip restore acl
168      admin.restoreSnapshot(snapshot);
169      admin.snapshot(snapshot3, table);
170
171      LOG.info("CHECK");
172      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
173      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
174      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
175      assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table));
176      TestSnapshotScannerHDFSAclController.checkUserAclEntry(FS,
177        helper.getPathHelper().getDataTableDir(table), grantUserName, false, false);
178      TestSnapshotScannerHDFSAclController.checkUserAclEntry(FS,
179        helper.getPathHelper().getArchiveTableDir(table), grantUserName, true, false);
180    }
181  }
182}