001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.stream.Collectors;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.TableNotFoundException;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.SnapshotDescription;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
051import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
052import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.MasterObserver;
055import org.apache.hadoop.hbase.coprocessor.ObserverContext;
056import org.apache.hadoop.hbase.master.MasterServices;
057import org.apache.hadoop.hbase.security.User;
058import org.apache.hadoop.hbase.security.UserProvider;
059import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
067
068/**
069 * Set HDFS ACLs to hFiles to make HBase granted users have permission to scan snapshot
070 * <p>
 * To use this feature, please make sure the HDFS config is set as follows:
072 * <ul>
073 * <li>dfs.namenode.acls.enabled = true</li>
074 * <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
075 * </ul>
076 * </p>
077 * <p>
 * The implementation of this feature is as follows:
079 * <ul>
080 * <li>For common directories such as 'data' and 'archive', set other permission to '--x' to make
081 * everyone have the permission to access the directory.</li>
082 * <li>For namespace or table directories such as 'data/ns/table', 'archive/ns/table' and
083 * '.hbase-snapshot/snapshotName', set user 'r-x' access acl and 'r-x' default acl when following
084 * operations happen:
085 * <ul>
086 * <li>grant user with global, namespace or table permission;</li>
087 * <li>revoke user from global, namespace or table;</li>
088 * <li>snapshot table;</li>
089 * <li>truncate table;</li>
090 * </ul>
091 * </li>
 * <li>Note: Because snapshots are at table level, this feature only considers users with global,
 * namespace or table permissions and ignores users with table CF or cell permissions.</li>
094 * </ul>
095 * </p>
096 */
097@CoreCoprocessor
098@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
099public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
100  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);
101
102  private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
103  private PathHelper pathHelper = null;
104  private MasterServices masterServices = null;
105  private volatile boolean initialized = false;
106  private volatile boolean aclTableInitialized = false;
107  /** Provider for mapping principal names to Users */
108  private UserProvider userProvider;
109
110  @Override
111  public Optional<MasterObserver> getMasterObserver() {
112    return Optional.of(this);
113  }
114
115  @Override
116  public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
117    throws IOException {
118    if (
119      c.getEnvironment().getConfiguration()
120        .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)
121    ) {
122      MasterCoprocessorEnvironment mEnv = c.getEnvironment();
123      if (!(mEnv instanceof HasMasterServices)) {
124        throw new IOException("Does not implement HMasterServices");
125      }
126      masterServices = ((HasMasterServices) mEnv).getMasterServices();
127      hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
128        masterServices.getConnection());
129      pathHelper = hdfsAclHelper.getPathHelper();
130      hdfsAclHelper.setCommonDirectoryPermission();
131      initialized = true;
132      userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
133    } else {
134      LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
135        + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
136        + " is false.");
137    }
138  }
139
140  @Override
141  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
142    if (!initialized) {
143      return;
144    }
145    try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
146      if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
147        // Check if acl table has 'm' CF, if not, add 'm' CF
148        TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
149        boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(
150          family -> Bytes.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
151        if (!containHdfsAclFamily) {
152          TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
153            .setColumnFamily(ColumnFamilyDescriptorBuilder
154              .newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
155          admin.modifyTable(builder.build());
156        }
157        aclTableInitialized = true;
158      } else {
159        throw new TableNotFoundException(
160          "Table " + PermissionStorage.ACL_TABLE_NAME + " is not created yet. Please check if "
161            + getClass().getName() + " is configured after " + AccessController.class.getName());
162      }
163    }
164  }
165
166  @Override
167  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
168    if (initialized) {
169      hdfsAclHelper.close();
170    }
171  }
172
173  @Override
174  public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
175    TableDescriptor desc, RegionInfo[] regions) throws IOException {
176    if (needHandleTableHdfsAcl(desc, "createTable " + desc.getTableName())) {
177      TableName tableName = desc.getTableName();
178      // 1. Create table directories to make HDFS acls can be inherited
179      hdfsAclHelper.createTableDirectories(tableName);
180      // 2. Add table owner HDFS acls
181      String owner = getActiveUser(c).getShortName();
182      hdfsAclHelper.addTableAcl(tableName, Sets.newHashSet(owner), "create");
183      // 3. Record table owner permission is synced to HDFS in acl table
184      SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(c.getEnvironment().getConnection(), owner,
185        tableName);
186    }
187  }
188
189  @Override
190  public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
191    NamespaceDescriptor ns) throws IOException {
192    if (checkInitialized("createNamespace " + ns.getName())) {
193      // Create namespace directories to make HDFS acls can be inherited
194      List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
195      for (Path path : paths) {
196        hdfsAclHelper.createDirIfNotExist(path);
197      }
198    }
199  }
200
201  @Override
202  public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
203    SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
204    if (needHandleTableHdfsAcl(tableDescriptor, "snapshot " + snapshot.getName())) {
205      // Add HDFS acls of users with table read permission to snapshot files
206      hdfsAclHelper.snapshotAcl(snapshot);
207    }
208  }
209
210  @Override
211  public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
212    TableName tableName) throws IOException {
213    if (needHandleTableHdfsAcl(tableName, "truncateTable " + tableName)) {
214      // 1. create tmp table directories
215      hdfsAclHelper.createTableDirectories(tableName);
216      // 2. Since the table directories is recreated, so add HDFS acls again
217      Set<String> users = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
218      hdfsAclHelper.addTableAcl(tableName, users, "truncate");
219    }
220  }
221
222  @Override
223  public void postCompletedDeleteTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
224    TableName tableName) throws IOException {
225    if (!tableName.isSystemTable() && checkInitialized("deleteTable " + tableName)) {
226      /*
227       * Remove table user access HDFS acl from namespace directory if the user has no permissions
228       * of global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission,
229       * when delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
230       * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
231       * remove Bob access acl.
232       */
233      try (Table aclTable =
234        ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
235        Set<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
236        if (users.size() > 0) {
237          // 1. Remove table archive directory default ACLs
238          hdfsAclHelper.removeTableDefaultAcl(tableName, users);
239          // 2. Delete table owner permission is synced to HDFS in acl table
240          SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
241          // 3. Remove namespace access acls
242          Set<String> removeUsers = filterUsersToRemoveNsAccessAcl(aclTable, tableName, users);
243          if (removeUsers.size() > 0) {
244            hdfsAclHelper.removeNamespaceAccessAcl(tableName, removeUsers, "delete");
245          }
246        }
247      }
248    }
249  }
250
251  @Override
252  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
253    TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
254    throws IOException {
255    try (Table aclTable =
256      ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
257      if (
258        needHandleTableHdfsAcl(currentDescriptor, "modifyTable " + tableName)
259          && !hdfsAclHelper.isAclSyncToHdfsEnabled(oldDescriptor)
260      ) {
261        // 1. Create table directories used for acl inherited
262        hdfsAclHelper.createTableDirectories(tableName);
263        // 2. Add table users HDFS acls
264        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
265        Set<String> users =
266          hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
267        users.addAll(tableUsers);
268        hdfsAclHelper.addTableAcl(tableName, users, "modify");
269        // 3. Record table user acls are synced to HDFS in acl table
270        SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
271          tableUsers, tableName);
272      } else if (
273        needHandleTableHdfsAcl(oldDescriptor, "modifyTable " + tableName)
274          && !hdfsAclHelper.isAclSyncToHdfsEnabled(currentDescriptor)
275      ) {
276        // 1. Remove empty table directories
277        List<Path> tableRootPaths = hdfsAclHelper.getTableRootPaths(tableName, false);
278        for (Path path : tableRootPaths) {
279          hdfsAclHelper.deleteEmptyDir(path);
280        }
281        // 2. Remove all table HDFS acls
282        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
283        Set<String> users =
284          hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
285        users.addAll(tableUsers);
286        hdfsAclHelper.removeTableAcl(tableName, users);
287        // 3. Remove namespace access HDFS acls for users who only own permission for this table
288        hdfsAclHelper.removeNamespaceAccessAcl(tableName,
289          filterUsersToRemoveNsAccessAcl(aclTable, tableName, tableUsers), "modify");
290        // 4. Record table user acl is not synced to HDFS
291        SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
292          tableUsers, tableName);
293      }
294    }
295  }
296
297  @Override
298  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
299    String namespace) throws IOException {
300    if (checkInitialized("deleteNamespace " + namespace)) {
301      try (Table aclTable =
302        ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
303        // 1. Delete namespace archive dir default ACLs
304        Set<String> users = SnapshotScannerHDFSAclStorage.getEntryUsers(aclTable,
305          PermissionStorage.toNamespaceEntry(Bytes.toBytes(namespace)));
306        hdfsAclHelper.removeNamespaceDefaultAcl(namespace, users);
307        // 2. Record namespace user acl is not synced to HDFS
308        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
309          namespace);
310        // 3. Delete tmp namespace directory
311        /**
312         * Delete namespace tmp directory because it's created by this coprocessor when namespace is
313         * created to make namespace default acl can be inherited by tables. The namespace data
314         * directory is deleted by DeleteNamespaceProcedure, the namespace archive directory is
315         * deleted by HFileCleaner.
316         */
317        hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
318      }
319    }
320  }
321
322  @Override
323  public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
324    UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
325    if (
326      !checkInitialized(
327        "grant " + userPermission + ", merge existing permissions " + mergeExistingPermissions)
328    ) {
329      return;
330    }
331    try (Table aclTable =
332      c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
333      Configuration conf = c.getEnvironment().getConfiguration();
334      String userName = userPermission.getUser();
335      switch (userPermission.getAccessScope()) {
336        case GLOBAL:
337          UserPermission perm = getUserGlobalPermission(conf, userName);
338          if (perm != null && hdfsAclHelper.containReadAction(perm)) {
339            if (!isHdfsAclSet(aclTable, userName)) {
340              // 1. Get namespaces and tables which global user acls are already synced
341              Pair<Set<String>, Set<TableName>> skipNamespaceAndTables =
342                SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
343              Set<String> skipNamespaces = skipNamespaceAndTables.getFirst();
344              Set<TableName> skipTables = skipNamespaceAndTables.getSecond().stream()
345                .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
346                .collect(Collectors.toSet());
347              // 2. Add HDFS acl(skip namespaces and tables directories whose acl is set)
348              hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
349              // 3. Record global acl is sync to HDFS
350              SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
351            }
352          } else {
353            // The merged user permission doesn't contain READ, so remove user global HDFS acls if
354            // it's set
355            removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
356          }
357          break;
358        case NAMESPACE:
359          String namespace = ((NamespacePermission) userPermission.getPermission()).getNamespace();
360          UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
361          if (nsPerm != null && hdfsAclHelper.containReadAction(nsPerm)) {
362            if (!isHdfsAclSet(aclTable, userName, namespace)) {
363              // 1. Get tables which namespace user acls are already synced
364              Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
365                .getUserNamespaceAndTable(aclTable, userName).getSecond();
366              // 2. Add HDFS acl(skip tables directories whose acl is set)
367              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
368            }
369            // 3. Record namespace acl is synced to HDFS
370            SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
371          } else {
372            // The merged user permission doesn't contain READ, so remove user namespace HDFS acls
373            // if it's set
374            removeUserNamespaceHdfsAcl(aclTable, userName, namespace, userPermission);
375          }
376          break;
377        case TABLE:
378          TablePermission tablePerm = (TablePermission) userPermission.getPermission();
379          if (needHandleTableHdfsAcl(tablePerm)) {
380            TableName tableName = tablePerm.getTableName();
381            UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
382            if (tPerm != null && hdfsAclHelper.containReadAction(tPerm)) {
383              if (!isHdfsAclSet(aclTable, userName, tableName)) {
384                // 1. create table dirs
385                hdfsAclHelper.createTableDirectories(tableName);
386                // 2. Add HDFS acl
387                hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
388              }
389              // 2. Record table acl is synced to HDFS
390              SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
391            } else {
392              // The merged user permission doesn't contain READ, so remove user table HDFS acls if
393              // it's set
394              removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
395            }
396          }
397          break;
398        default:
399          throw new IllegalArgumentException(
400            "Illegal user permission scope " + userPermission.getAccessScope());
401      }
402    }
403  }
404
405  @Override
406  public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
407    UserPermission userPermission) throws IOException {
408    if (checkInitialized("revoke " + userPermission)) {
409      try (Table aclTable =
410        c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
411        String userName = userPermission.getUser();
412        Configuration conf = c.getEnvironment().getConfiguration();
413        switch (userPermission.getAccessScope()) {
414          case GLOBAL:
415            UserPermission userGlobalPerm = getUserGlobalPermission(conf, userName);
416            if (userGlobalPerm == null || !hdfsAclHelper.containReadAction(userGlobalPerm)) {
417              removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
418            }
419            break;
420          case NAMESPACE:
421            NamespacePermission nsPerm = (NamespacePermission) userPermission.getPermission();
422            UserPermission userNsPerm =
423              getUserNamespacePermission(conf, userName, nsPerm.getNamespace());
424            if (userNsPerm == null || !hdfsAclHelper.containReadAction(userNsPerm)) {
425              removeUserNamespaceHdfsAcl(aclTable, userName, nsPerm.getNamespace(), userPermission);
426            }
427            break;
428          case TABLE:
429            TablePermission tPerm = (TablePermission) userPermission.getPermission();
430            if (needHandleTableHdfsAcl(tPerm)) {
431              TableName tableName = tPerm.getTableName();
432              UserPermission userTablePerm = getUserTablePermission(conf, userName, tableName);
433              if (userTablePerm == null || !hdfsAclHelper.containReadAction(userTablePerm)) {
434                removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
435              }
436            }
437            break;
438          default:
439            throw new IllegalArgumentException(
440              "Illegal user permission scope " + userPermission.getAccessScope());
441        }
442      }
443    }
444  }
445
446  private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
447    UserPermission userPermission) throws IOException {
448    if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
449      // 1. Get namespaces and tables which global user acls are already synced
450      Pair<Set<String>, Set<TableName>> namespaceAndTable =
451        SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
452      Set<String> skipNamespaces = namespaceAndTable.getFirst();
453      Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
454        .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
455        .collect(Collectors.toSet());
456      // 2. Remove user HDFS acls(skip namespaces and tables directories
457      // whose acl must be reversed)
458      hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
459      // 3. Remove global user acl is synced to HDFS in acl table
460      SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
461    }
462  }
463
464  private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, String namespace,
465    UserPermission userPermission) throws IOException {
466    if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
467      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
468        // 1. Get tables whose namespace user acls are already synced
469        Set<TableName> skipTables =
470          SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName).getSecond();
471        // 2. Remove user HDFS acls(skip tables directories whose acl must be reversed)
472        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
473      }
474      // 3. Remove namespace user acl is synced to HDFS in acl table
475      SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
476    }
477  }
478
479  private void removeUserTableHdfsAcl(Table aclTable, String userName, TableName tableName,
480    UserPermission userPermission) throws IOException {
481    if (SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName)) {
482      if (
483        !SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
484          && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
485            tableName.getNamespaceAsString())
486      ) {
487        // 1. Remove table acls
488        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
489      }
490      // 2. Remove table user acl is synced to HDFS in acl table
491      SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
492    }
493  }
494
495  private UserPermission getUserGlobalPermission(Configuration conf, String userName)
496    throws IOException {
497    List<UserPermission> permissions = PermissionStorage.getUserPermissions(conf,
498      PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
499    return permissions.size() > 0 ? permissions.get(0) : null;
500  }
501
502  private UserPermission getUserNamespacePermission(Configuration conf, String userName,
503    String namespace) throws IOException {
504    List<UserPermission> permissions =
505      PermissionStorage.getUserNamespacePermissions(conf, namespace, userName, true);
506    return permissions.size() > 0 ? permissions.get(0) : null;
507  }
508
509  private UserPermission getUserTablePermission(Configuration conf, String userName,
510    TableName tableName) throws IOException {
511    List<UserPermission> permissions = PermissionStorage
512      .getUserTablePermissions(conf, tableName, null, null, userName, true).stream()
513      .filter(userPermission -> hdfsAclHelper
514        .isNotFamilyOrQualifierPermission((TablePermission) userPermission.getPermission()))
515      .collect(Collectors.toList());
516    return permissions.size() > 0 ? permissions.get(0) : null;
517  }
518
519  private boolean isHdfsAclSet(Table aclTable, String userName) throws IOException {
520    return isHdfsAclSet(aclTable, userName, null, null);
521  }
522
523  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace)
524    throws IOException {
525    return isHdfsAclSet(aclTable, userName, namespace, null);
526  }
527
528  private boolean isHdfsAclSet(Table aclTable, String userName, TableName tableName)
529    throws IOException {
530    return isHdfsAclSet(aclTable, userName, null, tableName);
531  }
532
533  /**
534   * Check if user global/namespace/table HDFS acls is already set
535   */
536  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace,
537    TableName tableName) throws IOException {
538    boolean isSet = SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName);
539    if (namespace != null) {
540      isSet = isSet
541        || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace);
542    }
543    if (tableName != null) {
544      isSet = isSet
545        || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
546          tableName.getNamespaceAsString())
547        || SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName);
548    }
549    return isSet;
550  }
551
552  @InterfaceAudience.Private
553  boolean checkInitialized(String operation) {
554    if (initialized) {
555      if (aclTableInitialized) {
556        return true;
557      } else {
558        LOG.warn("Skip set HDFS acls because acl table is not initialized when {}", operation);
559      }
560    }
561    return false;
562  }
563
564  private boolean needHandleTableHdfsAcl(TablePermission tablePermission) throws IOException {
565    return needHandleTableHdfsAcl(tablePermission.getTableName(), "")
566      && hdfsAclHelper.isNotFamilyOrQualifierPermission(tablePermission);
567  }
568
569  private boolean needHandleTableHdfsAcl(TableName tableName, String operation) throws IOException {
570    return !tableName.isSystemTable() && checkInitialized(operation)
571      && hdfsAclHelper.isAclSyncToHdfsEnabled(masterServices.getTableDescriptors().get(tableName));
572  }
573
574  private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, String operation) {
575    TableName tableName = tableDescriptor.getTableName();
576    return !tableName.isSystemTable() && checkInitialized(operation)
577      && hdfsAclHelper.isAclSyncToHdfsEnabled(tableDescriptor);
578  }
579
580  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
581    // for non-rpc handling, fallback to system user
582    Optional<User> optionalUser = ctx.getCaller();
583    if (optionalUser.isPresent()) {
584      return optionalUser.get();
585    }
586    return userProvider.getCurrent();
587  }
588
589  /**
590   * Remove table user access HDFS acl from namespace directory if the user has no permissions of
591   * global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission, when
592   * delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
593   * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
594   * remove Bob access acl.
595   * @param aclTable    acl table
596   * @param tableName   the name of the table
597   * @param tablesUsers table users set
598   * @return users whose access acl will be removed from the namespace of the table
599   * @throws IOException if an error occurred
600   */
601  private Set<String> filterUsersToRemoveNsAccessAcl(Table aclTable, TableName tableName,
602    Set<String> tablesUsers) throws IOException {
603    Set<String> removeUsers = new HashSet<>();
604    byte[] namespace = tableName.getNamespace();
605    for (String user : tablesUsers) {
606      List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
607      boolean remove = true;
608      for (byte[] entry : userEntries) {
609        if (
610          PermissionStorage.isGlobalEntry(entry)
611            || (PermissionStorage.isNamespaceEntry(entry)
612              && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), namespace))
613            || (PermissionStorage.isTableEntry(entry) && !Bytes.equals(tableName.getName(), entry)
614              && Bytes.equals(TableName.valueOf(entry).getNamespace(), namespace))
615        ) {
616          remove = false;
617          break;
618        }
619      }
620      if (remove) {
621        removeUsers.add(user);
622      }
623    }
624    return removeUsers;
625  }
626
627  static final class SnapshotScannerHDFSAclStorage {
628    /**
629     * Add a new CF in HBase acl table to record if the HBase read permission is synchronized to
630     * related hfiles. The record has two usages: 1. check if we need to remove HDFS acls for a
631     * grant without READ permission(eg: grant user table read permission and then grant user table
632     * write permission without merging the existing permissions, in this case, need to remove HDFS
633     * acls); 2. skip some HDFS acl sync because it may be already set(eg: grant user table read
634     * permission and then grant user ns read permission; grant user table read permission and then
635     * grant user table write permission with merging the existing permissions).
636     */
637    static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
638    // The value 'R' has no specific meaning, if cell value is not null, it means that the user HDFS
639    // acls is set to hfiles.
640    private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");
641
642    static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
643      addUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
644    }
645
646    static void addUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
647      throws IOException {
648      addUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
649    }
650
651    static void addUserTableHdfsAcl(Connection connection, Set<String> users, TableName tableName)
652      throws IOException {
653      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
654        for (String user : users) {
655          addUserTableHdfsAcl(aclTable, user, tableName);
656        }
657      }
658    }
659
660    static void addUserTableHdfsAcl(Connection connection, String user, TableName tableName)
661      throws IOException {
662      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
663        addUserTableHdfsAcl(aclTable, user, tableName);
664      }
665    }
666
667    static void addUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
668      throws IOException {
669      addUserEntry(aclTable, user, tableName.getName());
670    }
671
672    private static void addUserEntry(Table t, String user, byte[] entry) throws IOException {
673      Put p = new Put(entry);
674      p.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(user), HDFS_ACL_VALUE);
675      t.put(p);
676    }
677
678    static void deleteUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
679      deleteUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
680    }
681
682    static void deleteUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
683      throws IOException {
684      deleteUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
685    }
686
687    static void deleteUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
688      throws IOException {
689      deleteUserEntry(aclTable, user, tableName.getName());
690    }
691
692    static void deleteUserTableHdfsAcl(Connection connection, Set<String> users,
693      TableName tableName) throws IOException {
694      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
695        for (String user : users) {
696          deleteUserTableHdfsAcl(aclTable, user, tableName);
697        }
698      }
699    }
700
701    private static void deleteUserEntry(Table aclTable, String user, byte[] entry)
702      throws IOException {
703      Delete delete = new Delete(entry);
704      delete.addColumns(HDFS_ACL_FAMILY, Bytes.toBytes(user));
705      aclTable.delete(delete);
706    }
707
708    static void deleteNamespaceHdfsAcl(Connection connection, String namespace) throws IOException {
709      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
710        deleteEntry(aclTable, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
711      }
712    }
713
714    static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws IOException {
715      deleteEntry(aclTable, tableName.getName());
716    }
717
718    private static void deleteEntry(Table aclTable, byte[] entry) throws IOException {
719      Delete delete = new Delete(entry);
720      delete.addFamily(HDFS_ACL_FAMILY);
721      aclTable.delete(delete);
722    }
723
724    static Set<String> getTableUsers(Table aclTable, TableName tableName) throws IOException {
725      return getEntryUsers(aclTable, tableName.getName());
726    }
727
728    private static Set<String> getEntryUsers(Table aclTable, byte[] entry) throws IOException {
729      Set<String> users = new HashSet<>();
730      Get get = new Get(entry);
731      get.addFamily(HDFS_ACL_FAMILY);
732      Result result = aclTable.get(get);
733      List<Cell> cells = result.listCells();
734      if (cells != null) {
735        for (Cell cell : cells) {
736          if (cell != null) {
737            users.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
738          }
739        }
740      }
741      return users;
742    }
743
744    static Pair<Set<String>, Set<TableName>> getUserNamespaceAndTable(Table aclTable,
745      String userName) throws IOException {
746      Set<String> namespaces = new HashSet<>();
747      Set<TableName> tables = new HashSet<>();
748      List<byte[]> userEntries = getUserEntries(aclTable, userName);
749      for (byte[] entry : userEntries) {
750        if (PermissionStorage.isNamespaceEntry(entry)) {
751          namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
752        } else if (PermissionStorage.isTableEntry(entry)) {
753          tables.add(TableName.valueOf(entry));
754        }
755      }
756      return new Pair<>(namespaces, tables);
757    }
758
759    static List<byte[]> getUserEntries(Table aclTable, String userName) throws IOException {
760      Scan scan = new Scan();
761      scan.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
762      ResultScanner scanner = aclTable.getScanner(scan);
763      List<byte[]> entry = new ArrayList<>();
764      for (Result result : scanner) {
765        if (result != null && result.getRow() != null) {
766          entry.add(result.getRow());
767        }
768      }
769      return entry;
770    }
771
772    static boolean hasUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
773      return hasUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
774    }
775
776    static boolean hasUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
777      throws IOException {
778      return hasUserEntry(aclTable, user,
779        Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
780    }
781
782    static boolean hasUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
783      throws IOException {
784      return hasUserEntry(aclTable, user, tableName.getName());
785    }
786
787    private static boolean hasUserEntry(Table aclTable, String userName, byte[] entry)
788      throws IOException {
789      Get get = new Get(entry);
790      get.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
791      return aclTable.exists(get);
792    }
793  }
794}