/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.security.access;

import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
import static org.apache.hadoop.fs.permission.AclEntryType.USER;
import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A helper to modify or remove the HDFS ACLs (default and access) granted to HBase users over
 * hFiles.
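 * <p>
 * A minimal usage sketch (illustrative only, assuming an existing {@code Configuration} and the
 * standard HBase client {@code ConnectionFactory}):
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   SnapshotScannerHDFSAclHelper helper = new SnapshotScannerHDFSAclHelper(conf, conn)) {
 *   helper.setCommonDirectoryPermission();
 * }
 * </pre>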
 */
@InterfaceAudience.Private
public class SnapshotScannerHDFSAclHelper implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class);

  public static final String ACL_SYNC_TO_HDFS_ENABLE = "hbase.acl.sync.to.hdfs.enable";
  public static final String ACL_SYNC_TO_HDFS_THREAD_NUMBER =
    "hbase.acl.sync.to.hdfs.thread.number";
  // The tmp directory used to restore snapshots; it cannot be a subdirectory of the HBase root dir
  public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir";
  public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT =
    "/hbase/.tmpdir-to-restore-snapshot";
  // The default permission of the common directories if the feature is enabled.
  public static final String COMMON_DIRECTORY_PERMISSION =
    "hbase.acl.sync.to.hdfs.common.directory.permission";
  // The secure HBase permission is 700. 751 means all others have execute access, and the mask is
  // set to read-execute so that the extended access ACL entries take effect. Be cautious when
  // changing this value.
  public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751";
  // The default permission of the snapshot restore directories if the feature is enabled.
  public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION =
    "hbase.acl.sync.to.hdfs.restore.directory.permission";
  // 753 means all others have write-execute access.
  public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "753";
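
  // An illustrative hbase-site.xml sketch for the tunables above (the values shown are the
  // defaults; adjust with care, as noted in the comments):
  //   <property>
  //     <name>hbase.acl.sync.to.hdfs.common.directory.permission</name>
  //     <value>751</value>
  //   </property>
  //   <property>
  //     <name>hbase.snapshot.restore.tmp.dir</name>
  //     <value>/hbase/.tmpdir-to-restore-snapshot</value>
  //   </property>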

  private Admin admin;
  private final Configuration conf;
  private FileSystem fs;
  private PathHelper pathHelper;
  private ExecutorService pool;

  public SnapshotScannerHDFSAclHelper(Configuration configuration, Connection connection)
    throws IOException {
    this.conf = configuration;
    this.pathHelper = new PathHelper(conf);
    this.fs = pathHelper.getFileSystem();
    this.pool = Executors.newFixedThreadPool(conf.getInt(ACL_SYNC_TO_HDFS_THREAD_NUMBER, 10),
      new ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build());
    this.admin = connection.getAdmin();
  }

  @Override
  public void close() {
    if (pool != null) {
      pool.shutdown();
    }
    admin.close();
  }

  public void setCommonDirectoryPermission() throws IOException {
    // Set the public directory permission to 751 so that all users have access permission.
    // We also need access permission on the parent of the HBase root directory, but it is not
    // set here because the owner of the HBase root directory may not have permission to change
    // its parent's permission to 751.
    // The {root/.tmp} and {root/.tmp/data} directories are created so that global user HDFS
    // ACLs can be inherited.
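    // (That parent permission has to be set out of band by an HDFS admin, e.g. with something
    // like "hdfs dfs -chmod 751 <parent-of-hbase-rootdir>"; the exact path is deployment
    // specific.)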
    List<Path> paths = Lists.newArrayList(pathHelper.getRootDir(), pathHelper.getMobDir(),
      pathHelper.getTmpDir(), pathHelper.getArchiveDir());
    paths.addAll(getGlobalRootPaths());
    for (Path path : paths) {
      createDirIfNotExist(path);
      fs.setPermission(path, new FsPermission(
        conf.get(COMMON_DIRECTORY_PERMISSION, COMMON_DIRECTORY_PERMISSION_DEFAULT)));
    }
    // create the snapshot restore directory
    Path restoreDir =
      new Path(conf.get(SNAPSHOT_RESTORE_TMP_DIR, SNAPSHOT_RESTORE_TMP_DIR_DEFAULT));
    createDirIfNotExist(restoreDir);
    fs.setPermission(restoreDir, new FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
      SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
  }

  /**
   * Set ACLs when a user permission is granted
   * @param userPermission the user and permission
   * @param skipNamespaces the namespaces to skip because their ACLs are already set
   * @param skipTables     the tables to skip because their ACLs are already set
   * @return false if an error occurred, otherwise true
   */
  public boolean grantAcl(UserPermission userPermission, Set<String> skipNamespaces,
    Set<TableName> skipTables) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      handleGrantOrRevokeAcl(userPermission, HDFSAclOperation.OperationType.MODIFY, skipNamespaces,
        skipTables);
      LOG.info("Set HDFS acl when grant {}, skipNamespaces: {}, skipTables: {}, cost {} ms",
        userPermission, skipNamespaces, skipTables, EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Set HDFS acl error when grant: {}, skipNamespaces: {}, skipTables: {}",
        userPermission, skipNamespaces, skipTables, e);
      return false;
    }
  }

  /**
   * Remove ACLs when a user permission is revoked
   * @param userPermission the user and permission
   * @param skipNamespaces the namespaces to skip when removing ACLs
   * @param skipTables     the tables to skip when removing ACLs
   * @return false if an error occurred, otherwise true
   */
  public boolean revokeAcl(UserPermission userPermission, Set<String> skipNamespaces,
    Set<TableName> skipTables) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      handleGrantOrRevokeAcl(userPermission, HDFSAclOperation.OperationType.REMOVE, skipNamespaces,
        skipTables);
      LOG.info("Remove HDFS acl when revoke {}, skipNamespaces: {}, skipTables: {}, cost {} ms",
        userPermission, skipNamespaces, skipTables, EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Remove HDFS acl error when revoke: {}, skipNamespaces: {}, skipTables: {}",
        userPermission, skipNamespaces, skipTables, e);
      return false;
    }
  }

  /**
   * Set ACLs when a snapshot is taken
   * @param snapshot the snapshot descriptor
   * @return false if an error occurred, otherwise true
   */
  public boolean snapshotAcl(SnapshotDescription snapshot) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      TableName tableName = snapshot.getTableName();
      // global user permissions can be inherited from the default ACLs automatically
      Set<String> userSet = getUsersWithTableReadAction(tableName, true, false);
      if (!userSet.isEmpty()) {
        Path path = pathHelper.getSnapshotDir(snapshot.getName());
        handleHDFSAcl(new HDFSAclOperation(fs, path, userSet, HDFSAclOperation.OperationType.MODIFY,
          true, HDFSAclOperation.AclType.DEFAULT_AND_ACCESS)).get();
      }
      LOG.info("Set HDFS acl when snapshot {}, cost {} ms", snapshot.getName(),
        EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Set HDFS acl error when snapshot {}", snapshot, e);
      return false;
    }
  }

  /**
   * Remove the table access ACL from the namespace dir when a table is deleted
   * @param tableName   the table
   * @param removeUsers the users whose access ACLs will be removed
   * @return false if an error occurred, otherwise true
   */
  public boolean removeNamespaceAccessAcl(TableName tableName, Set<String> removeUsers,
    String operation) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      if (!removeUsers.isEmpty()) {
        handleNamespaceAccessAcl(tableName.getNamespaceAsString(), removeUsers,
          HDFSAclOperation.OperationType.REMOVE);
      }
      LOG.info("Remove HDFS acl when {} table {}, cost {} ms", operation, tableName,
        EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Remove HDFS acl error when {} table {}", operation, tableName, e);
      return false;
    }
  }

  /**
   * Remove the default ACL from the namespace archive dir when a namespace is deleted
   * @param namespace   the namespace
   * @param removeUsers the users whose default ACLs will be removed
   * @return false if an error occurred, otherwise true
   */
  public boolean removeNamespaceDefaultAcl(String namespace, Set<String> removeUsers) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      Path archiveNsDir = pathHelper.getArchiveNsDir(namespace);
      HDFSAclOperation operation = new HDFSAclOperation(fs, archiveNsDir, removeUsers,
        HDFSAclOperation.OperationType.REMOVE, false, HDFSAclOperation.AclType.DEFAULT);
      operation.handleAcl();
      LOG.info("Remove HDFS acl when delete namespace {}, cost {} ms", namespace,
        EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Remove HDFS acl error when delete namespace {}", namespace, e);
      return false;
    }
  }

  /**
   * Remove the default ACL from the table archive dir when a table is deleted
   * @param tableName   the table name
   * @param removeUsers the users whose default ACLs will be removed
   * @return false if an error occurred, otherwise true
   */
  public boolean removeTableDefaultAcl(TableName tableName, Set<String> removeUsers) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      Path archiveTableDir = pathHelper.getArchiveTableDir(tableName);
      HDFSAclOperation operation = new HDFSAclOperation(fs, archiveTableDir, removeUsers,
        HDFSAclOperation.OperationType.REMOVE, false, HDFSAclOperation.AclType.DEFAULT);
      operation.handleAcl();
      LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
        EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Remove HDFS acl error when delete table {}", tableName, e);
      return false;
    }
  }

  /**
   * Add table user ACLs
   * @param tableName the table
   * @param users     the table users with READ permission
   * @return false if an error occurred, otherwise true
   */
  public boolean addTableAcl(TableName tableName, Set<String> users, String operation) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      if (!users.isEmpty()) {
        HDFSAclOperation.OperationType operationType = HDFSAclOperation.OperationType.MODIFY;
        handleNamespaceAccessAcl(tableName.getNamespaceAsString(), users, operationType);
        handleTableAcl(Sets.newHashSet(tableName), users, new HashSet<>(0), new HashSet<>(0),
          operationType);
      }
      LOG.info("Set HDFS acl when {} table {}, cost {} ms", operation, tableName,
        EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Set HDFS acl error when {} table {}", operation, tableName, e);
      return false;
    }
  }

  /**
   * Remove table ACLs when a table is modified
   * @param tableName the table
   * @param users     the table users with READ permission
   * @return false if an error occurred, otherwise true
   */
  public boolean removeTableAcl(TableName tableName, Set<String> users) {
    try {
      long start = EnvironmentEdgeManager.currentTime();
      if (!users.isEmpty()) {
        handleTableAcl(Sets.newHashSet(tableName), users, new HashSet<>(0), new HashSet<>(0),
          HDFSAclOperation.OperationType.REMOVE);
      }
      LOG.info("Remove HDFS acl when modify table {}, cost {} ms", tableName,
        EnvironmentEdgeManager.currentTime() - start);
      return true;
    } catch (Exception e) {
      LOG.error("Remove HDFS acl error when modify table {}", tableName, e);
      return false;
    }
  }

  private void handleGrantOrRevokeAcl(UserPermission userPermission,
    HDFSAclOperation.OperationType operationType, Set<String> skipNamespaces,
    Set<TableName> skipTables) throws ExecutionException, InterruptedException, IOException {
    Set<String> users = Sets.newHashSet(userPermission.getUser());
    switch (userPermission.getAccessScope()) {
      case GLOBAL:
        handleGlobalAcl(users, skipNamespaces, skipTables, operationType);
        break;
      case NAMESPACE:
        NamespacePermission namespacePermission =
          (NamespacePermission) userPermission.getPermission();
        handleNamespaceAcl(Sets.newHashSet(namespacePermission.getNamespace()), users,
          skipNamespaces, skipTables, operationType);
        break;
      case TABLE:
        TablePermission tablePermission = (TablePermission) userPermission.getPermission();
        handleNamespaceAccessAcl(tablePermission.getNamespace(), users, operationType);
        handleTableAcl(Sets.newHashSet(tablePermission.getTableName()), users, skipNamespaces,
          skipTables, operationType);
        break;
      default:
        throw new IllegalArgumentException(
          "Illegal user permission scope " + userPermission.getAccessScope());
    }
  }

  private void handleGlobalAcl(Set<String> users, Set<String> skipNamespaces,
    Set<TableName> skipTables, HDFSAclOperation.OperationType operationType)
    throws ExecutionException, InterruptedException, IOException {
    // handle the HDFS acls of the global root directories
    List<HDFSAclOperation> hdfsAclOperations =
      getGlobalRootPaths().stream().map(path -> new HDFSAclOperation(fs, path, users, operationType,
        false, HDFSAclOperation.AclType.DEFAULT_AND_ACCESS)).collect(Collectors.toList());
    handleHDFSAclParallel(hdfsAclOperations).get();
    // handle the namespace HDFS acls
    handleNamespaceAcl(Sets.newHashSet(admin.listNamespaces()), users, skipNamespaces, skipTables,
      operationType);
  }

  private void handleNamespaceAcl(Set<String> namespaces, Set<String> users,
    Set<String> skipNamespaces, Set<TableName> skipTables,
    HDFSAclOperation.OperationType operationType)
    throws ExecutionException, InterruptedException, IOException {
    namespaces.removeAll(skipNamespaces);
    namespaces.remove(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
    // handle the HDFS acls of the namespace root directories
    List<HDFSAclOperation> hdfsAclOperations = new ArrayList<>();
    Set<String> skipTableNamespaces =
      skipTables.stream().map(TableName::getNamespaceAsString).collect(Collectors.toSet());
    for (String ns : namespaces) {
      /**
       * When op is REMOVE, remove only the DEFAULT namespace ACL and keep the ACCESS ACL for the
       * namespaces of skipTables; otherwise remove both the DEFAULT and ACCESS ACLs. When op is
       * MODIFY, operate on both the DEFAULT and ACCESS ACLs.
       */
      HDFSAclOperation.OperationType op = operationType;
      HDFSAclOperation.AclType aclType = HDFSAclOperation.AclType.DEFAULT_AND_ACCESS;
      if (
        operationType == HDFSAclOperation.OperationType.REMOVE && skipTableNamespaces.contains(ns)
      ) {
        // remove only the default HDFS acls on the namespace directories of the skip tables
        op = HDFSAclOperation.OperationType.REMOVE;
        aclType = HDFSAclOperation.AclType.DEFAULT;
      }
      for (Path path : getNamespaceRootPaths(ns)) {
        hdfsAclOperations.add(new HDFSAclOperation(fs, path, users, op, false, aclType));
      }
    }
    handleHDFSAclParallel(hdfsAclOperations).get();
    // handle the HDFS acls of the table directories
    Set<TableName> tables = new HashSet<>();
    for (String namespace : namespaces) {
      tables.addAll(admin.listTableDescriptorsByNamespace(Bytes.toBytes(namespace)).stream()
        .filter(this::isAclSyncToHdfsEnabled).map(TableDescriptor::getTableName)
        .collect(Collectors.toSet()));
    }
    handleTableAcl(tables, users, skipNamespaces, skipTables, operationType);
  }

  private void handleTableAcl(Set<TableName> tableNames, Set<String> users,
    Set<String> skipNamespaces, Set<TableName> skipTables,
    HDFSAclOperation.OperationType operationType)
    throws ExecutionException, InterruptedException, IOException {
    Set<TableName> filterTableNames = new HashSet<>();
    for (TableName tableName : tableNames) {
      if (
        !skipTables.contains(tableName)
          && !skipNamespaces.contains(tableName.getNamespaceAsString())
      ) {
        filterTableNames.add(tableName);
      }
    }
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    // handle table HDFS acls
    for (TableName tableName : filterTableNames) {
      List<HDFSAclOperation> hdfsAclOperations = getTableRootPaths(tableName, true).stream()
        .map(path -> new HDFSAclOperation(fs, path, users, operationType, true,
          HDFSAclOperation.AclType.DEFAULT_AND_ACCESS))
        .collect(Collectors.toList());
      CompletableFuture<Void> future = handleHDFSAclSequential(hdfsAclOperations);
      futures.add(future);
    }
    CompletableFuture<Void> future =
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    future.get();
  }

  private void handleNamespaceAccessAcl(String namespace, Set<String> users,
    HDFSAclOperation.OperationType operationType) throws ExecutionException, InterruptedException {
    // handle namespace access HDFS acls
    List<HDFSAclOperation> hdfsAclOperations =
      getNamespaceRootPaths(namespace).stream().map(path -> new HDFSAclOperation(fs, path, users,
        operationType, false, HDFSAclOperation.AclType.ACCESS)).collect(Collectors.toList());
    CompletableFuture<Void> future = handleHDFSAclParallel(hdfsAclOperations);
    future.get();
  }

  void createTableDirectories(TableName tableName) throws IOException {
    List<Path> paths = getTableRootPaths(tableName, false);
    for (Path path : paths) {
      createDirIfNotExist(path);
    }
  }

  /**
   * Return the paths that a user with global permission will visit
   * @return the path list
   */
  List<Path> getGlobalRootPaths() {
    return Lists.newArrayList(pathHelper.getTmpDataDir(), pathHelper.getDataDir(),
      pathHelper.getMobDataDir(), pathHelper.getArchiveDataDir(), pathHelper.getSnapshotRootDir());
  }

  /**
   * Return the paths that a user with namespace permission will visit
   * @param namespace the namespace
   * @return the path list
   */
  List<Path> getNamespaceRootPaths(String namespace) {
    return Lists.newArrayList(pathHelper.getTmpNsDir(namespace), pathHelper.getDataNsDir(namespace),
      pathHelper.getMobDataNsDir(namespace), pathHelper.getArchiveNsDir(namespace));
  }

  /**
   * Return the paths that a user with table permission will visit
   * @param tableName           the table
   * @param includeSnapshotPath true if the table snapshot paths should be included
   * @return the path list
   * @throws IOException if an error occurred
   */
  List<Path> getTableRootPaths(TableName tableName, boolean includeSnapshotPath)
    throws IOException {
    List<Path> paths = Lists.newArrayList(pathHelper.getDataTableDir(tableName),
      pathHelper.getMobTableDir(tableName), pathHelper.getArchiveTableDir(tableName));
    if (includeSnapshotPath) {
      paths.addAll(getTableSnapshotPaths(tableName));
    }
    return paths;
  }

  private List<Path> getTableSnapshotPaths(TableName tableName) throws IOException {
    return admin.listSnapshots().stream()
      .filter(snapDesc -> snapDesc.getTableName().equals(tableName))
      .map(snapshotDescription -> pathHelper.getSnapshotDir(snapshotDescription.getName()))
      .collect(Collectors.toList());
  }

  /**
   * Return users with global read permission
   * @return users with global read permission
   * @throws IOException if an error occurred
   */
  private Set<String> getUsersWithGlobalReadAction() throws IOException {
    return getUsersWithReadAction(PermissionStorage.getGlobalPermissions(conf));
  }

  /**
   * Return users with namespace read permission
   * @param namespace     the namespace
   * @param includeGlobal true if users with global read action should be included
   * @return users with namespace read permission
   * @throws IOException if an error occurred
   */
  Set<String> getUsersWithNamespaceReadAction(String namespace, boolean includeGlobal)
    throws IOException {
    Set<String> users =
      getUsersWithReadAction(PermissionStorage.getNamespacePermissions(conf, namespace));
    if (includeGlobal) {
      users.addAll(getUsersWithGlobalReadAction());
    }
    return users;
  }

  /**
   * Return users with table read permission
   * @param tableName        the table
   * @param includeNamespace true if users with namespace read action should be included
   * @param includeGlobal    true if users with global read action should be included
   * @return users with table read permission
   * @throws IOException if an error occurred
   */
  Set<String> getUsersWithTableReadAction(TableName tableName, boolean includeNamespace,
    boolean includeGlobal) throws IOException {
    Set<String> users =
      getUsersWithReadAction(PermissionStorage.getTablePermissions(conf, tableName));
    if (includeNamespace) {
      users
        .addAll(getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), includeGlobal));
    }
    return users;
  }

  private Set<String>
    getUsersWithReadAction(ListMultimap<String, UserPermission> permissionMultimap) {
    return permissionMultimap.entries().stream()
      .filter(entry -> checkUserPermission(entry.getValue())).map(Map.Entry::getKey)
      .collect(Collectors.toSet());
  }

  private boolean checkUserPermission(UserPermission userPermission) {
    boolean result = containReadAction(userPermission);
    if (result && userPermission.getPermission() instanceof TablePermission) {
      result = isNotFamilyOrQualifierPermission((TablePermission) userPermission.getPermission());
    }
    return result;
  }

  boolean containReadAction(UserPermission userPermission) {
    return userPermission.getPermission().implies(Permission.Action.READ);
  }

  boolean isNotFamilyOrQualifierPermission(TablePermission tablePermission) {
    return !tablePermission.hasFamily() && !tablePermission.hasQualifier();
  }

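  // An illustrative configuration that satisfies the checks below; both coprocessor classes must
  // be loaded on the master and the feature flag must be enabled:
  //   <property>
  //     <name>hbase.coprocessor.master.classes</name>
  //     <value>org.apache.hadoop.hbase.security.access.AccessController,
  //       org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclController</value>
  //   </property>
  //   <property>
  //     <name>hbase.acl.sync.to.hdfs.enable</name>
  //     <value>true</value>
  //   </property>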
  public static boolean isAclSyncToHdfsEnabled(Configuration conf) {
    String[] masterCoprocessors = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
    Set<String> masterCoprocessorSet = new HashSet<>();
    if (masterCoprocessors != null) {
      Collections.addAll(masterCoprocessorSet, masterCoprocessors);
    }
    return conf.getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)
      && masterCoprocessorSet.contains(SnapshotScannerHDFSAclController.class.getName())
      && masterCoprocessorSet.contains(AccessController.class.getName());
  }

  boolean isAclSyncToHdfsEnabled(TableDescriptor tableDescriptor) {
    return tableDescriptor != null
      && Boolean.parseBoolean(tableDescriptor.getValue(ACL_SYNC_TO_HDFS_ENABLE));
  }

  PathHelper getPathHelper() {
    return pathHelper;
  }

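  // handleHDFSAcl applies the operation at acl.path and then fans out to the child operations in
  // parallel; the recursion bottoms out when getChildAclOperations() returns an empty list
  // (non-recursive operations, plain files, or missing paths).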
  private CompletableFuture<Void> handleHDFSAcl(HDFSAclOperation acl) {
    return CompletableFuture.supplyAsync(() -> {
      List<HDFSAclOperation> childAclOperations = new ArrayList<>();
      try {
        acl.handleAcl();
        childAclOperations = acl.getChildAclOperations();
      } catch (FileNotFoundException e) {
        // Skip handling the acl if the file is not found
      } catch (IOException e) {
        LOG.error("Set HDFS acl error for path {}", acl.path, e);
      }
      return childAclOperations;
    }, pool).thenComposeAsync(this::handleHDFSAclParallel, pool);
  }

  private CompletableFuture<Void> handleHDFSAclSequential(List<HDFSAclOperation> operations) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        for (HDFSAclOperation hdfsAclOperation : operations) {
          handleHDFSAcl(hdfsAclOperation).get();
        }
      } catch (InterruptedException | ExecutionException e) {
        LOG.error("Set HDFS acl error", e);
      }
      return null;
    }, pool);
  }

  private CompletableFuture<Void> handleHDFSAclParallel(List<HDFSAclOperation> operations) {
    List<CompletableFuture<Void>> futures =
      operations.stream().map(this::handleHDFSAcl).collect(Collectors.toList());
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
  }

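  // Builds a READ_EXECUTE ACL entry for the given scope; for example (illustrative),
  // aclEntry(ACCESS, "alice") corresponds to the ACL spec "user:alice:r-x", while a name that
  // AuthUtil.isGroupPrincipal recognizes as a group principal produces a GROUP-typed entry.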
  private static AclEntry aclEntry(AclEntryScope scope, String name) {
    return new AclEntry.Builder().setScope(scope)
      .setType(AuthUtil.isGroupPrincipal(name) ? GROUP : USER).setName(name)
      .setPermission(READ_EXECUTE).build();
  }

  void createDirIfNotExist(Path path) throws IOException {
    if (!fs.exists(path)) {
      fs.mkdirs(path);
    }
  }

  void deleteEmptyDir(Path path) throws IOException {
    if (fs.exists(path) && fs.listStatus(path).length == 0) {
      fs.delete(path, false);
    }
  }

  /**
   * Inner class describing whether to modify or remove ACL entries of a given type (ACCESS,
   * DEFAULT, or DEFAULT_AND_ACCESS) for a file or a directory (and its child files).
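   * For example (illustrative), a MODIFY operation with AclType DEFAULT_AND_ACCESS on a table
   * directory adds both an access entry (effective immediately on the directory) and a default
   * entry (inherited by newly created children) for each user.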
   */
  private static class HDFSAclOperation {
    enum OperationType {
      MODIFY,
      REMOVE
    }

    enum AclType {
      ACCESS,
      DEFAULT,
      DEFAULT_AND_ACCESS
    }

    private interface Operation {
      void apply(FileSystem fs, Path path, List<AclEntry> aclList) throws IOException;
    }

    private FileSystem fs;
    private Path path;
    private Operation operation;
    private boolean recursive;
    private AclType aclType;
    private List<AclEntry> defaultAndAccessAclEntries;
    private List<AclEntry> accessAclEntries;
    private List<AclEntry> defaultAclEntries;

    HDFSAclOperation(FileSystem fs, Path path, Set<String> users, OperationType operationType,
      boolean recursive, AclType aclType) {
      this.fs = fs;
      this.path = path;
      this.defaultAndAccessAclEntries = getAclEntries(AclType.DEFAULT_AND_ACCESS, users);
      this.accessAclEntries = getAclEntries(AclType.ACCESS, users);
      this.defaultAclEntries = getAclEntries(AclType.DEFAULT, users);
      if (operationType == OperationType.MODIFY) {
        operation = FileSystem::modifyAclEntries;
      } else if (operationType == OperationType.REMOVE) {
        operation = FileSystem::removeAclEntries;
      } else {
        throw new IllegalArgumentException("Illegal HDFS acl operation type: " + operationType);
      }
      this.recursive = recursive;
      this.aclType = aclType;
    }

    HDFSAclOperation(Path path, HDFSAclOperation parent) {
      this.fs = parent.fs;
      this.path = path;
      this.defaultAndAccessAclEntries = parent.defaultAndAccessAclEntries;
      this.accessAclEntries = parent.accessAclEntries;
      this.defaultAclEntries = parent.defaultAclEntries;
      this.operation = parent.operation;
      this.recursive = parent.recursive;
      this.aclType = parent.aclType;
    }

    List<HDFSAclOperation> getChildAclOperations() throws IOException {
      List<HDFSAclOperation> hdfsAclOperations = new ArrayList<>();
      if (recursive && fs.isDirectory(path)) {
        FileStatus[] fileStatuses = fs.listStatus(path);
        for (FileStatus fileStatus : fileStatuses) {
          hdfsAclOperations.add(new HDFSAclOperation(fileStatus.getPath(), this));
        }
      }
      return hdfsAclOperations;
    }

    void handleAcl() throws IOException {
      if (fs.exists(path)) {
        if (fs.isDirectory(path)) {
          switch (aclType) {
            case ACCESS:
              operation.apply(fs, path, accessAclEntries);
              break;
            case DEFAULT:
              operation.apply(fs, path, defaultAclEntries);
              break;
            case DEFAULT_AND_ACCESS:
              operation.apply(fs, path, defaultAndAccessAclEntries);
              break;
            default:
              throw new IllegalArgumentException("Illegal HDFS acl type: " + aclType);
          }
        } else {
          // plain files only carry access ACL entries; default entries apply to directories
          operation.apply(fs, path, accessAclEntries);
        }
      }
    }

    private List<AclEntry> getAclEntries(AclType aclType, Set<String> users) {
      List<AclEntry> aclEntries = new ArrayList<>();
      switch (aclType) {
        case ACCESS:
          for (String user : users) {
            aclEntries.add(aclEntry(ACCESS, user));
          }
          break;
        case DEFAULT:
          for (String user : users) {
            aclEntries.add(aclEntry(DEFAULT, user));
          }
          break;
        case DEFAULT_AND_ACCESS:
          for (String user : users) {
            aclEntries.add(aclEntry(ACCESS, user));
            aclEntries.add(aclEntry(DEFAULT, user));
          }
          break;
        default:
          throw new IllegalArgumentException("Illegal HDFS acl type: " + aclType);
      }
      return aclEntries;
    }
  }

  static final class PathHelper {
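    // With the default layout under an hbase.rootdir of /hbase, the resolved paths are
    // typically: rootDir=/hbase, dataDir=/hbase/data, tmpDataDir=/hbase/.tmp/data,
    // mobDataDir=/hbase/mobdir/data, archiveDataDir=/hbase/archive/data,
    // snapshotDir=/hbase/.hbase-snapshot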
    Configuration conf;
    Path rootDir;
    Path tmpDataDir;
    Path dataDir;
    Path mobDataDir;
    Path archiveDataDir;
    Path snapshotDir;

    PathHelper(Configuration conf) {
      this.conf = conf;
      rootDir = new Path(conf.get(HConstants.HBASE_DIR));
      tmpDataDir =
        new Path(new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY), HConstants.BASE_NAMESPACE_DIR);
      dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
      mobDataDir = new Path(MobUtils.getMobHome(rootDir), HConstants.BASE_NAMESPACE_DIR);
      archiveDataDir = new Path(new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY),
        HConstants.BASE_NAMESPACE_DIR);
      snapshotDir = new Path(rootDir, HConstants.SNAPSHOT_DIR_NAME);
    }

    Path getRootDir() {
      return rootDir;
    }

    Path getDataDir() {
      return dataDir;
    }

    Path getMobDir() {
      return mobDataDir.getParent();
    }

    Path getMobDataDir() {
      return mobDataDir;
    }

    Path getTmpDir() {
      return new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY);
    }

    Path getTmpDataDir() {
      return tmpDataDir;
    }

    Path getArchiveDir() {
      return new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
    }

    Path getArchiveDataDir() {
      return archiveDataDir;
    }

    Path getDataNsDir(String namespace) {
      return new Path(dataDir, namespace);
    }

    Path getMobDataNsDir(String namespace) {
      return new Path(mobDataDir, namespace);
    }

    Path getDataTableDir(TableName tableName) {
      return new Path(getDataNsDir(tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
    }

    Path getMobTableDir(TableName tableName) {
      return new Path(getMobDataNsDir(tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
    }

    Path getArchiveNsDir(String namespace) {
      return new Path(archiveDataDir, namespace);
    }

    Path getArchiveTableDir(TableName tableName) {
      return new Path(getArchiveNsDir(tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
    }

    Path getTmpNsDir(String namespace) {
      return new Path(tmpDataDir, namespace);
    }

    Path getTmpTableDir(TableName tableName) {
      return new Path(getTmpNsDir(tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
    }

    Path getSnapshotRootDir() {
      return snapshotDir;
    }

    Path getSnapshotDir(String snapshot) {
      return new Path(snapshotDir, snapshot);
    }

    FileSystem getFileSystem() throws IOException {
      return rootDir.getFileSystem(conf);
    }
  }
}