001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
022import static org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController.SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.permission.FsPermission;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.SecurityTests;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Rule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.junit.rules.TestName;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Separated from {@link TestSnapshotScannerHDFSAclController}. Uses facility from that class.
050 * @see TestSnapshotScannerHDFSAclController
051 */
052@Category({ SecurityTests.class, LargeTests.class })
053public class TestSnapshotScannerHDFSAclController2 {
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestSnapshotScannerHDFSAclController2.class);
057  @Rule
058  public TestName name = new TestName();
059  private static final Logger LOG =
060      LoggerFactory.getLogger(TestSnapshotScannerHDFSAclController2.class);
061
062  private static final String UN_GRANT_USER = "un_grant_user";
063  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
064  private static Configuration conf = TEST_UTIL.getConfiguration();
065  private static Admin admin = null;
066  private static SnapshotScannerHDFSAclHelper helper;
067  private static Table aclTable;
068  private static FileSystem FS;
069
070  @BeforeClass
071  public static void setupBeforeClass() throws Exception {
072    // enable hdfs acl and set umask to 027
073    conf.setBoolean("dfs.namenode.acls.enabled", true);
074    conf.set("fs.permissions.umask-mode", "027");
075    // enable hbase hdfs acl feature
076    conf.setBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, true);
077    // enable secure
078    conf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
079    conf.set(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR,
080      SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
081    SecureTestUtil.enableSecurity(conf);
082    // add SnapshotScannerHDFSAclController coprocessor
083    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
084      conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + ","
085          + SnapshotScannerHDFSAclController.class.getName());
086
087    TEST_UTIL.startMiniCluster();
088    TEST_UTIL.waitTableAvailable(PermissionStorage.ACL_TABLE_NAME);
089    admin = TEST_UTIL.getAdmin();
090    Path rootDir = TEST_UTIL.getDefaultRootDirPath();
091    FS = rootDir.getFileSystem(conf);
092    User unGrantUser = User.createUserForTesting(conf, UN_GRANT_USER, new String[] {});
093    helper = new SnapshotScannerHDFSAclHelper(conf, admin.getConnection());
094
095    // set hbase directory permission
096    FsPermission commonDirectoryPermission =
097        new FsPermission(conf.get(SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION,
098          SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION_DEFAULT));
099    Path path = rootDir;
100    while (path != null) {
101      FS.setPermission(path, commonDirectoryPermission);
102      path = path.getParent();
103    }
104    // set restore directory permission
105    Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT);
106    if (!FS.exists(restoreDir)) {
107      FS.mkdirs(restoreDir);
108      FS.setPermission(restoreDir,
109        new FsPermission(
110            conf.get(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
111              SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
112    }
113    path = restoreDir.getParent();
114    while (path != null) {
115      FS.setPermission(path, commonDirectoryPermission);
116      path = path.getParent();
117    }
118
119    SnapshotScannerHDFSAclController coprocessor = TEST_UTIL.getHBaseCluster().getMaster()
120        .getMasterCoprocessorHost().findCoprocessor(SnapshotScannerHDFSAclController.class);
121    TEST_UTIL.waitFor(1200000, () -> coprocessor.checkInitialized("check initialized"));
122    aclTable = admin.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME);
123  }
124
125  @AfterClass
126  public static void tearDownAfterClass() throws Exception {
127    TEST_UTIL.shutdownMiniCluster();
128  }
129
130  @Test
131  public void testRestoreSnapshot() throws Exception {
132    final String grantUserName = name.getMethodName();
133    User grantUser = User.createUserForTesting(conf, grantUserName, new String[] {});
134    String namespace = name.getMethodName();
135    TableName table = TableName.valueOf(namespace, name.getMethodName());
136    String snapshot = namespace + "s1";
137    String snapshot2 = namespace + "s2";
138    String snapshot3 = namespace + "s3";
139
140    LOG.info("Create {}", table);
141    try (Table t = TestHDFSAclHelper.createTable(TEST_UTIL, table)) {
142      TestHDFSAclHelper.put(t);
143      // grant t1, snapshot
144      LOG.info("Grant {}", table);
145      TestHDFSAclHelper.grantOnTable(TEST_UTIL, grantUserName, table, READ);
146      admin.snapshot(snapshot, table);
147      // delete
148      admin.disableTable(table);
149      admin.deleteTable(table);
150      LOG.info("Before scan of shapshot! {}", table);
151      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
152
153      // restore snapshot and restore acl
154      admin.restoreSnapshot(snapshot, true, true);
155      TestHDFSAclHelper.put2(t);
156      // snapshot
157      admin.snapshot(snapshot2, table);
158      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, 6);
159      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, 10);
160      assertTrue(hasUserTableHdfsAcl(aclTable, grantUserName, table));
161      TestSnapshotScannerHDFSAclController.
162        checkUserAclEntry(FS, helper.getTableRootPaths(table, false),
163          grantUserName, true, true);
164
165      // delete
166      admin.disableTable(table);
167      admin.deleteTable(table);
168      // restore snapshot and skip restore acl
169      admin.restoreSnapshot(snapshot);
170      admin.snapshot(snapshot3, table);
171
172      LOG.info("CHECK");
173      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot, -1);
174      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot2, -1);
175      TestHDFSAclHelper.canUserScanSnapshot(TEST_UTIL, grantUser, snapshot3, -1);
176      assertFalse(hasUserTableHdfsAcl(aclTable, grantUserName, table));
177      TestSnapshotScannerHDFSAclController.
178        checkUserAclEntry(FS, helper.getPathHelper().getDataTableDir(table),
179          grantUserName, false, false);
180      TestSnapshotScannerHDFSAclController.
181        checkUserAclEntry(FS, helper.getPathHelper().getArchiveTableDir(table),
182          grantUserName, true, false);
183    }
184  }
185}