001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Optional;
027import java.util.Set;
028import java.util.stream.Collectors;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.NamespaceDescriptor;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.TableNotFoundException;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.SnapshotDescription;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
054import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
055import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
056import org.apache.hadoop.hbase.coprocessor.MasterObserver;
057import org.apache.hadoop.hbase.coprocessor.ObserverContext;
058import org.apache.hadoop.hbase.master.MasterServices;
059import org.apache.hadoop.hbase.security.User;
060import org.apache.hadoop.hbase.security.UserProvider;
061import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
069import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
070
071/**
072 * Set HDFS ACLs to hFiles to make HBase granted users have permission to scan snapshot
073 * <p>
 * To use this feature, please make sure the following HDFS configs are set:
075 * <ul>
076 * <li>dfs.namenode.acls.enabled = true</li>
077 * <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
078 * </ul>
079 * </p>
080 * <p>
 * The implementation of this feature is as follows:
082 * <ul>
083 * <li>For common directories such as 'data' and 'archive', set other permission to '--x' to make
084 * everyone have the permission to access the directory.</li>
085 * <li>For namespace or table directories such as 'data/ns/table', 'archive/ns/table' and
086 * '.hbase-snapshot/snapshotName', set user 'r-x' access acl and 'r-x' default acl when following
087 * operations happen:
088 * <ul>
089 * <li>grant user with global, namespace or table permission;</li>
090 * <li>revoke user from global, namespace or table;</li>
091 * <li>snapshot table;</li>
092 * <li>truncate table;</li>
093 * </ul>
094 * </li>
095 * <li>Note: Because snapshots are at table level, so this feature just considers users with global,
096 * namespace or table permissions, ignores users with table CF or cell permissions.</li>
097 * </ul>
098 * </p>
099 */
100@CoreCoprocessor
101@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
102public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);

  // Helper performing the HDFS ACL operations; assigned in preMasterInitialization only when the
  // ACL-sync-to-HDFS config is enabled, otherwise it stays null.
  private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
  // Path helper obtained from hdfsAclHelper in preMasterInitialization.
  private PathHelper pathHelper = null;
  // Master services taken from the coprocessor environment in preMasterInitialization.
  private MasterServices masterServices = null;
  // Set to true by preMasterInitialization once the helper objects above have been created.
  private volatile boolean initialized = false;
  // Set to true by postStartMaster once the acl table is known to exist (and has the HDFS acl CF).
  private volatile boolean aclTableInitialized = false;
  /** Provider for mapping principal names to Users */
  private UserProvider userProvider;
112
113  @Override
114  public Optional<MasterObserver> getMasterObserver() {
115    return Optional.of(this);
116  }
117
118  @Override
119  public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
120      throws IOException {
121    if (c.getEnvironment().getConfiguration()
122        .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)) {
123      MasterCoprocessorEnvironment mEnv = c.getEnvironment();
124      if (!(mEnv instanceof HasMasterServices)) {
125        throw new IOException("Does not implement HMasterServices");
126      }
127      masterServices = ((HasMasterServices) mEnv).getMasterServices();
128      hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
129          masterServices.getConnection());
130      pathHelper = hdfsAclHelper.getPathHelper();
131      hdfsAclHelper.setCommonDirectoryPermission();
132      initialized = true;
133      userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
134    } else {
135      LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
136          + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
137          + " is false.");
138    }
139  }
140
141  @Override
142  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
143    if (!initialized) {
144      return;
145    }
146    try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
147      if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
148        // Check if acl table has 'm' CF, if not, add 'm' CF
149        TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
150        boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(
151          family -> Bytes.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
152        if (!containHdfsAclFamily) {
153          TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
154              .setColumnFamily(ColumnFamilyDescriptorBuilder
155                  .newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
156          admin.modifyTable(builder.build());
157        }
158        aclTableInitialized = true;
159      } else {
160        throw new TableNotFoundException("Table " + PermissionStorage.ACL_TABLE_NAME
161            + " is not created yet. Please check if " + getClass().getName()
162            + " is configured after " + AccessController.class.getName());
163      }
164    }
165  }
166
167  @Override
168  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
169    if (initialized) {
170      hdfsAclHelper.close();
171    }
172  }
173
174  @Override
175  public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
176      TableDescriptor desc, RegionInfo[] regions) throws IOException {
177    if (needHandleTableHdfsAcl(desc, "createTable " + desc.getTableName())) {
178      TableName tableName = desc.getTableName();
179      // 1. Create table directories to make HDFS acls can be inherited
180      hdfsAclHelper.createTableDirectories(tableName);
181      // 2. Add table owner HDFS acls
182      String owner =
183          desc.getOwnerString() == null ? getActiveUser(c).getShortName() : desc.getOwnerString();
184      hdfsAclHelper.addTableAcl(tableName, Sets.newHashSet(owner), "create");
185      // 3. Record table owner permission is synced to HDFS in acl table
186      SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(c.getEnvironment().getConnection(), owner,
187        tableName);
188    }
189  }
190
191  @Override
192  public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
193      NamespaceDescriptor ns) throws IOException {
194    if (checkInitialized("createNamespace " + ns.getName())) {
195      // Create namespace directories to make HDFS acls can be inherited
196      List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
197      for (Path path : paths) {
198        hdfsAclHelper.createDirIfNotExist(path);
199      }
200    }
201  }
202
203  @Override
204  public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
205      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
206    if (needHandleTableHdfsAcl(tableDescriptor, "snapshot " + snapshot.getName())) {
207      // Add HDFS acls of users with table read permission to snapshot files
208      hdfsAclHelper.snapshotAcl(snapshot);
209    }
210  }
211
212  @Override
213  public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
214      TableName tableName) throws IOException {
215    if (needHandleTableHdfsAcl(tableName, "truncateTable " + tableName)) {
216      // 1. create tmp table directories
217      hdfsAclHelper.createTableDirectories(tableName);
218      // 2. Since the table directories is recreated, so add HDFS acls again
219      Set<String> users = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
220      hdfsAclHelper.addTableAcl(tableName, users, "truncate");
221    }
222  }
223
224  @Override
225  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
226      TableName tableName) throws IOException {
227    if (needHandleTableHdfsAcl(tableName, "deleteTable " + tableName)) {
228      /*
229       * Remove table user access HDFS acl from namespace directory if the user has no permissions
230       * of global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission,
231       * when delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
232       * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
233       * remove Bob access acl.
234       */
235      try (Table aclTable =
236          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
237        Set<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
238        // 1. Remove table archive directory default ACLs
239        hdfsAclHelper.removeTableDefaultAcl(tableName, users);
240        // 2. Delete table owner permission is synced to HDFS in acl table
241        SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
242        // 3. Remove namespace access acls
243        Set<String> removeUsers = filterUsersToRemoveNsAccessAcl(aclTable, tableName, users);
244        if (removeUsers.size() > 0) {
245          hdfsAclHelper.removeNamespaceAccessAcl(tableName, removeUsers, "delete");
246        }
247      }
248    }
249  }
250
251  @Override
252  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
253      TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
254      throws IOException {
255    try (Table aclTable =
256        ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
257      if (needHandleTableHdfsAcl(currentDescriptor, "modifyTable " + tableName)
258          && !hdfsAclHelper.isAclSyncToHdfsEnabled(oldDescriptor)) {
259        // 1. Create table directories used for acl inherited
260        hdfsAclHelper.createTableDirectories(tableName);
261        // 2. Add table users HDFS acls
262        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
263        Set<String> users =
264            hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
265        users.addAll(tableUsers);
266        hdfsAclHelper.addTableAcl(tableName, users, "modify");
267        // 3. Record table user acls are synced to HDFS in acl table
268        SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
269          tableUsers, tableName);
270      } else if (needHandleTableHdfsAcl(oldDescriptor, "modifyTable " + tableName)
271          && !hdfsAclHelper.isAclSyncToHdfsEnabled(currentDescriptor)) {
272        // 1. Remove empty table directories
273        List<Path> tableRootPaths = hdfsAclHelper.getTableRootPaths(tableName, false);
274        for (Path path : tableRootPaths) {
275          hdfsAclHelper.deleteEmptyDir(path);
276        }
277        // 2. Remove all table HDFS acls
278        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
279        Set<String> users = hdfsAclHelper
280            .getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
281        users.addAll(tableUsers);
282        hdfsAclHelper.removeTableAcl(tableName, users);
283        // 3. Remove namespace access HDFS acls for users who only own permission for this table
284        hdfsAclHelper.removeNamespaceAccessAcl(tableName,
285          filterUsersToRemoveNsAccessAcl(aclTable, tableName, tableUsers), "modify");
286        // 4. Record table user acl is not synced to HDFS
287        SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
288          tableUsers, tableName);
289      }
290    }
291  }
292
293  @Override
294  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
295      String namespace) throws IOException {
296    if (checkInitialized("deleteNamespace " + namespace)) {
297      try (Table aclTable =
298          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
299        // 1. Delete namespace archive dir default ACLs
300        Set<String> users = SnapshotScannerHDFSAclStorage.getEntryUsers(aclTable,
301          PermissionStorage.toNamespaceEntry(Bytes.toBytes(namespace)));
302        hdfsAclHelper.removeNamespaceDefaultAcl(namespace, users);
303        // 2. Record namespace user acl is not synced to HDFS
304        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
305          namespace);
306        // 3. Delete tmp namespace directory
307        /**
308         * Delete namespace tmp directory because it's created by this coprocessor when namespace is
309         * created to make namespace default acl can be inherited by tables. The namespace data
310         * directory is deleted by DeleteNamespaceProcedure, the namespace archive directory is
311         * deleted by HFileCleaner.
312         */
313        hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
314      }
315    }
316  }
317
  /**
   * After a grant, syncs the user's HDFS acls with the permission stored for the granted scope.
   * <p>
   * For each scope (global, namespace, table) the stored permission of the user is looked up. If
   * it contains the read action and no HDFS acl is recorded yet, the HDFS acls are added (skipping
   * namespaces/tables already recorded for the user) and the sync is recorded in the acl table.
   * If it does not contain the read action, any recorded HDFS acls of that scope are removed.
   * Table grants are only handled when {@link #needHandleTableHdfsAcl(TablePermission)} accepts
   * the permission.
   * @param c the coprocessor environment context
   * @param userPermission the granted user permission
   * @param mergeExistingPermissions whether the grant merged with existing permissions; only used
   *          in the log message here
   * @throws IOException if an HDFS or acl table operation fails
   * @throws IllegalArgumentException if the permission scope is not global, namespace or table
   */
  @Override
  public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
      UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
    if (!checkInitialized(
      "grant " + userPermission + ", merge existing permissions " + mergeExistingPermissions)) {
      return;
    }
    try (Table aclTable =
        c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
      Configuration conf = c.getEnvironment().getConfiguration();
      String userName = userPermission.getUser();
      switch (userPermission.getAccessScope()) {
        case GLOBAL:
          UserPermission perm = getUserGlobalPermission(conf, userName);
          if (perm != null && hdfsAclHelper.containReadAction(perm)) {
            if (!isHdfsAclSet(aclTable, userName)) {
              // 1. Get namespaces and tables which global user acls are already synced
              Pair<Set<String>, Set<TableName>> skipNamespaceAndTables =
                  SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
              Set<String> skipNamespaces = skipNamespaceAndTables.getFirst();
              // Tables inside an already-skipped namespace need no separate skip entry
              Set<TableName> skipTables = skipNamespaceAndTables.getSecond().stream()
                  .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
                  .collect(Collectors.toSet());
              // 2. Add HDFS acl(skip namespaces and tables directories whose acl is set)
              hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
              // 3. Record global acl is sync to HDFS
              SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
            }
          } else {
            // The merged user permission doesn't contain READ, so remove user global HDFS acls if
            // it's set
            removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
          }
          break;
        case NAMESPACE:
          String namespace = ((NamespacePermission) userPermission.getPermission()).getNamespace();
          UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
          if (nsPerm != null && hdfsAclHelper.containReadAction(nsPerm)) {
            if (!isHdfsAclSet(aclTable, userName, namespace)) {
              // 1. Get tables which namespace user acls are already synced
              Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
                  .getUserNamespaceAndTable(aclTable, userName).getSecond();
              // 2. Add HDFS acl(skip tables directories whose acl is set)
              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
            }
            // 3. Record namespace acl is synced to HDFS
            SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
          } else {
            // The merged user permission doesn't contain READ, so remove user namespace HDFS acls
            // if it's set
            removeUserNamespaceHdfsAcl(aclTable, userName, namespace, userPermission);
          }
          break;
        case TABLE:
          TablePermission tablePerm = (TablePermission) userPermission.getPermission();
          if (needHandleTableHdfsAcl(tablePerm)) {
            TableName tableName = tablePerm.getTableName();
            UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
            if (tPerm != null && hdfsAclHelper.containReadAction(tPerm)) {
              if (!isHdfsAclSet(aclTable, userName, tableName)) {
                // 1. create table dirs
                hdfsAclHelper.createTableDirectories(tableName);
                // 2. Add HDFS acl
                hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
              }
              // 3. Record table acl is synced to HDFS
              SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
            } else {
              // The merged user permission doesn't contain READ, so remove user table HDFS acls if
              // it's set
              removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
            }
          }
          break;
        default:
          throw new IllegalArgumentException(
              "Illegal user permission scope " + userPermission.getAccessScope());
      }
    }
  }
398
399  @Override
400  public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
401      UserPermission userPermission) throws IOException {
402    if (checkInitialized("revoke " + userPermission)) {
403      try (Table aclTable =
404          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
405        String userName = userPermission.getUser();
406        Configuration conf = c.getEnvironment().getConfiguration();
407        switch (userPermission.getAccessScope()) {
408          case GLOBAL:
409            UserPermission userGlobalPerm = getUserGlobalPermission(conf, userName);
410            if (userGlobalPerm == null || !hdfsAclHelper.containReadAction(userGlobalPerm)) {
411              removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
412            }
413            break;
414          case NAMESPACE:
415            NamespacePermission nsPerm = (NamespacePermission) userPermission.getPermission();
416            UserPermission userNsPerm =
417                getUserNamespacePermission(conf, userName, nsPerm.getNamespace());
418            if (userNsPerm == null || !hdfsAclHelper.containReadAction(userNsPerm)) {
419              removeUserNamespaceHdfsAcl(aclTable, userName, nsPerm.getNamespace(), userPermission);
420            }
421            break;
422          case TABLE:
423            TablePermission tPerm = (TablePermission) userPermission.getPermission();
424            if (needHandleTableHdfsAcl(tPerm)) {
425              TableName tableName = tPerm.getTableName();
426              UserPermission userTablePerm = getUserTablePermission(conf, userName, tableName);
427              if (userTablePerm == null || !hdfsAclHelper.containReadAction(userTablePerm)) {
428                removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
429              }
430            }
431            break;
432          default:
433            throw new IllegalArgumentException(
434                "Illegal user permission scope " + userPermission.getAccessScope());
435        }
436      }
437    }
438  }
439
440  private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
441      UserPermission userPermission) throws IOException {
442    if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
443      // 1. Get namespaces and tables which global user acls are already synced
444      Pair<Set<String>, Set<TableName>> namespaceAndTable =
445          SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
446      Set<String> skipNamespaces = namespaceAndTable.getFirst();
447      Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
448          .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
449          .collect(Collectors.toSet());
450      // 2. Remove user HDFS acls(skip namespaces and tables directories
451      // whose acl must be reversed)
452      hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
453      // 3. Remove global user acl is synced to HDFS in acl table
454      SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
455    }
456  }
457
458  private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, String namespace,
459      UserPermission userPermission) throws IOException {
460    if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
461      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
462        // 1. Get tables whose namespace user acls are already synced
463        Set<TableName> skipTables =
464            SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName).getSecond();
465        // 2. Remove user HDFS acls(skip tables directories whose acl must be reversed)
466        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
467      }
468      // 3. Remove namespace user acl is synced to HDFS in acl table
469      SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
470    }
471  }
472
473  private void removeUserTableHdfsAcl(Table aclTable, String userName, TableName tableName,
474      UserPermission userPermission) throws IOException {
475    if (SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName)) {
476      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
477          && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
478            tableName.getNamespaceAsString())) {
479        // 1. Remove table acls
480        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
481      }
482      // 2. Remove table user acl is synced to HDFS in acl table
483      SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
484    }
485  }
486
487  private UserPermission getUserGlobalPermission(Configuration conf, String userName)
488      throws IOException {
489    List<UserPermission> permissions = PermissionStorage.getUserPermissions(conf,
490      PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
491    return permissions.size() > 0 ? permissions.get(0) : null;
492  }
493
494  private UserPermission getUserNamespacePermission(Configuration conf, String userName,
495      String namespace) throws IOException {
496    List<UserPermission> permissions =
497        PermissionStorage.getUserNamespacePermissions(conf, namespace, userName, true);
498    return permissions.size() > 0 ? permissions.get(0) : null;
499  }
500
501  private UserPermission getUserTablePermission(Configuration conf, String userName,
502      TableName tableName) throws IOException {
503    List<UserPermission> permissions = PermissionStorage
504        .getUserTablePermissions(conf, tableName, null, null, userName, true).stream()
505        .filter(userPermission -> hdfsAclHelper
506            .isNotFamilyOrQualifierPermission((TablePermission) userPermission.getPermission()))
507        .collect(Collectors.toList());
508    return permissions.size() > 0 ? permissions.get(0) : null;
509  }
510
511  private boolean isHdfsAclSet(Table aclTable, String userName) throws IOException {
512    return isHdfsAclSet(aclTable, userName, null, null);
513  }
514
515  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace)
516      throws IOException {
517    return isHdfsAclSet(aclTable, userName, namespace, null);
518  }
519
520  private boolean isHdfsAclSet(Table aclTable, String userName, TableName tableName)
521      throws IOException {
522    return isHdfsAclSet(aclTable, userName, null, tableName);
523  }
524
525  /**
526   * Check if user global/namespace/table HDFS acls is already set
527   */
528  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace,
529      TableName tableName) throws IOException {
530    boolean isSet = SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName);
531    if (namespace != null) {
532      isSet = isSet
533          || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace);
534    }
535    if (tableName != null) {
536      isSet = isSet
537          || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
538            tableName.getNamespaceAsString())
539          || SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName);
540    }
541    return isSet;
542  }
543
544  @VisibleForTesting
545  boolean checkInitialized(String operation) {
546    if (initialized) {
547      if (aclTableInitialized) {
548        return true;
549      } else {
550        LOG.warn("Skip set HDFS acls because acl table is not initialized when " + operation);
551      }
552    }
553    return false;
554  }
555
556  private boolean needHandleTableHdfsAcl(TablePermission tablePermission) throws IOException {
557    return needHandleTableHdfsAcl(tablePermission.getTableName(), "")
558        && hdfsAclHelper.isNotFamilyOrQualifierPermission(tablePermission);
559  }
560
561  private boolean needHandleTableHdfsAcl(TableName tableName, String operation) throws IOException {
562    return !tableName.isSystemTable() && checkInitialized(operation) && hdfsAclHelper
563        .isAclSyncToHdfsEnabled(masterServices.getTableDescriptors().get(tableName));
564  }
565
566  private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, String operation) {
567    TableName tableName = tableDescriptor.getTableName();
568    return !tableName.isSystemTable() && checkInitialized(operation)
569        && hdfsAclHelper.isAclSyncToHdfsEnabled(tableDescriptor);
570  }
571
572  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
573    // for non-rpc handling, fallback to system user
574    Optional<User> optionalUser = ctx.getCaller();
575    if (optionalUser.isPresent()) {
576      return optionalUser.get();
577    }
578    return userProvider.getCurrent();
579  }
580
581  /**
582   * Remove table user access HDFS acl from namespace directory if the user has no permissions of
583   * global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission, when
584   * delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
585   * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
586   * remove Bob access acl.
587   * @param aclTable acl table
588   * @param tableName the name of the table
589   * @param tablesUsers table users set
590   * @return users whose access acl will be removed from the namespace of the table
591   * @throws IOException if an error occurred
592   */
593  private Set<String> filterUsersToRemoveNsAccessAcl(Table aclTable, TableName tableName,
594      Set<String> tablesUsers) throws IOException {
595    Set<String> removeUsers = new HashSet<>();
596    byte[] namespace = tableName.getNamespace();
597    for (String user : tablesUsers) {
598      List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
599      boolean remove = true;
600      for (byte[] entry : userEntries) {
601        if (PermissionStorage.isGlobalEntry(entry)
602            || (PermissionStorage.isNamespaceEntry(entry)
603                && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), namespace))
604            || (!Bytes.equals(tableName.getName(), entry)
605                && Bytes.equals(TableName.valueOf(entry).getNamespace(), namespace))) {
606          remove = false;
607          break;
608        }
609      }
610      if (remove) {
611        removeUsers.add(user);
612      }
613    }
614    return removeUsers;
615  }
616
617  static final class SnapshotScannerHDFSAclStorage {
    /**
     * Column family added to the HBase acl table to record whether an HBase read permission has
     * been synchronized to the related hfiles. The record has two usages: 1. check if we need to
     * remove HDFS acls for a grant without READ permission (eg: grant user table read permission
     * and then grant user table write permission without merging the existing permissions, in
     * this case, need to remove HDFS acls); 2. skip some HDFS acl sync because it may be already
     * set (eg: grant user table read permission and then grant user ns read permission; grant user
     * table read permission and then grant user table write permission with merging the existing
     * permissions).
     */
    static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
    // The value 'R' has no specific meaning, if cell value is not null, it means that the user HDFS
    // acls is set to hfiles.
    private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");
631
632    static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
633      addUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
634    }
635
636    static void addUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
637        throws IOException {
638      addUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
639    }
640
641    static void addUserTableHdfsAcl(Connection connection, Set<String> users, TableName tableName)
642        throws IOException {
643      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
644        for (String user : users) {
645          addUserTableHdfsAcl(aclTable, user, tableName);
646        }
647      }
648    }
649
650    static void addUserTableHdfsAcl(Connection connection, String user, TableName tableName)
651        throws IOException {
652      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
653        addUserTableHdfsAcl(aclTable, user, tableName);
654      }
655    }
656
657    static void addUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
658        throws IOException {
659      addUserEntry(aclTable, user, tableName.getName());
660    }
661
662    private static void addUserEntry(Table t, String user, byte[] entry) throws IOException {
663      Put p = new Put(entry);
664      p.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(user), HDFS_ACL_VALUE);
665      t.put(p);
666    }
667
668    static void deleteUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
669      deleteUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
670    }
671
672    static void deleteUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
673        throws IOException {
674      deleteUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
675    }
676
677    static void deleteUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
678        throws IOException {
679      deleteUserEntry(aclTable, user, tableName.getName());
680    }
681
682    static void deleteUserTableHdfsAcl(Connection connection, Set<String> users,
683        TableName tableName) throws IOException {
684      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
685        for (String user : users) {
686          deleteUserTableHdfsAcl(aclTable, user, tableName);
687        }
688      }
689    }
690
691    private static void deleteUserEntry(Table aclTable, String user, byte[] entry)
692        throws IOException {
693      Delete delete = new Delete(entry);
694      delete.addColumns(HDFS_ACL_FAMILY, Bytes.toBytes(user));
695      aclTable.delete(delete);
696    }
697
698    static void deleteNamespaceHdfsAcl(Connection connection, String namespace) throws IOException {
699      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
700        deleteEntry(aclTable, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
701      }
702    }
703
704    static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws IOException {
705      deleteEntry(aclTable, tableName.getName());
706    }
707
708    private static void deleteEntry(Table aclTable, byte[] entry) throws IOException {
709      Delete delete = new Delete(entry);
710      delete.addFamily(HDFS_ACL_FAMILY);
711      aclTable.delete(delete);
712    }
713
714    static Set<String> getTableUsers(Table aclTable, TableName tableName) throws IOException {
715      return getEntryUsers(aclTable, tableName.getName());
716    }
717
718    private static Set<String> getEntryUsers(Table aclTable, byte[] entry) throws IOException {
719      Set<String> users = new HashSet<>();
720      Get get = new Get(entry);
721      get.addFamily(HDFS_ACL_FAMILY);
722      Result result = aclTable.get(get);
723      List<Cell> cells = result.listCells();
724      if (cells != null) {
725        for (Cell cell : cells) {
726          if (cell != null) {
727            users.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
728          }
729        }
730      }
731      return users;
732    }
733
734    static Pair<Set<String>, Set<TableName>> getUserNamespaceAndTable(Table aclTable,
735        String userName) throws IOException {
736      Set<String> namespaces = new HashSet<>();
737      Set<TableName> tables = new HashSet<>();
738      List<byte[]> userEntries = getUserEntries(aclTable, userName);
739      for (byte[] entry : userEntries) {
740        if (PermissionStorage.isNamespaceEntry(entry)) {
741          namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
742        } else if (PermissionStorage.isTableEntry(entry)) {
743          tables.add(TableName.valueOf(entry));
744        }
745      }
746      return new Pair<>(namespaces, tables);
747    }
748
749    static List<byte[]> getUserEntries(Table aclTable, String userName) throws IOException {
750      Scan scan = new Scan();
751      scan.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
752      ResultScanner scanner = aclTable.getScanner(scan);
753      List<byte[]> entry = new ArrayList<>();
754      for (Result result : scanner) {
755        if (result != null && result.getRow() != null) {
756          entry.add(result.getRow());
757        }
758      }
759      return entry;
760    }
761
762    static boolean hasUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
763      return hasUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
764    }
765
766    static boolean hasUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
767        throws IOException {
768      return hasUserEntry(aclTable, user,
769        Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
770    }
771
772    static boolean hasUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
773        throws IOException {
774      return hasUserEntry(aclTable, user, tableName.getName());
775    }
776
777    private static boolean hasUserEntry(Table aclTable, String userName, byte[] entry)
778        throws IOException {
779      Get get = new Get(entry);
780      get.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
781      return aclTable.exists(get);
782    }
783  }
784}