001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Optional;
027import java.util.Set;
028import java.util.stream.Collectors;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.NamespaceDescriptor;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.TableNotFoundException;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.SnapshotDescription;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
054import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
055import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
056import org.apache.hadoop.hbase.coprocessor.MasterObserver;
057import org.apache.hadoop.hbase.coprocessor.ObserverContext;
058import org.apache.hadoop.hbase.master.MasterServices;
059import org.apache.hadoop.hbase.security.User;
060import org.apache.hadoop.hbase.security.UserProvider;
061import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
069import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
070
071/**
072 * Set HDFS ACLs to hFiles to make HBase granted users have permission to scan snapshot
073 * <p>
074 * To use this feature, please make sure HDFS config:
075 * <ul>
076 * <li>dfs.namenode.acls.enabled = true</li>
077 * <li>fs.permissions.umask-mode = 027 (or smaller umask than 027)</li>
078 * </ul>
079 * </p>
080 * <p>
081 * The implementation of this feature is as follows:
082 * <ul>
083 * <li>For common directories such as 'data' and 'archive', set other permission to '--x' to make
084 * everyone have the permission to access the directory.</li>
085 * <li>For namespace or table directories such as 'data/ns/table', 'archive/ns/table' and
086 * '.hbase-snapshot/snapshotName', set user 'r-x' access acl and 'r-x' default acl when following
087 * operations happen:
088 * <ul>
089 * <li>grant user with global, namespace or table permission;</li>
090 * <li>revoke user from global, namespace or table;</li>
091 * <li>snapshot table;</li>
092 * <li>truncate table;</li>
093 * </ul>
094 * </li>
095 * <li>Note: Because snapshots are at table level, so this feature just considers users with global,
096 * namespace or table permissions, ignores users with table CF or cell permissions.</li>
097 * </ul>
098 * </p>
099 */
100@CoreCoprocessor
101@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
102public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver {
103  private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class);
104
105  private SnapshotScannerHDFSAclHelper hdfsAclHelper = null;
106  private PathHelper pathHelper = null;
107  private MasterServices masterServices = null;
108  private volatile boolean initialized = false;
109  private volatile boolean aclTableInitialized = false;
110  /** Provider for mapping principal names to Users */
111  private UserProvider userProvider;
112
113  @Override
114  public Optional<MasterObserver> getMasterObserver() {
115    return Optional.of(this);
116  }
117
118  @Override
119  public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
120      throws IOException {
121    if (c.getEnvironment().getConfiguration()
122        .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)) {
123      MasterCoprocessorEnvironment mEnv = c.getEnvironment();
124      if (!(mEnv instanceof HasMasterServices)) {
125        throw new IOException("Does not implement HMasterServices");
126      }
127      masterServices = ((HasMasterServices) mEnv).getMasterServices();
128      hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
129          masterServices.getConnection());
130      pathHelper = hdfsAclHelper.getPathHelper();
131      hdfsAclHelper.setCommonDirectoryPermission();
132      initialized = true;
133      userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
134    } else {
135      LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
136          + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
137          + " is false.");
138    }
139  }
140
141  @Override
142  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
143    if (!initialized) {
144      return;
145    }
146    try (Admin admin = c.getEnvironment().getConnection().getAdmin()) {
147      if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
148        // Check if acl table has 'm' CF, if not, add 'm' CF
149        TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME);
150        boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch(
151          family -> Bytes.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY));
152        if (!containHdfsAclFamily) {
153          TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor)
154              .setColumnFamily(ColumnFamilyDescriptorBuilder
155                  .newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build());
156          admin.modifyTable(builder.build());
157        }
158        aclTableInitialized = true;
159      } else {
160        throw new TableNotFoundException("Table " + PermissionStorage.ACL_TABLE_NAME
161            + " is not created yet. Please check if " + getClass().getName()
162            + " is configured after " + AccessController.class.getName());
163      }
164    }
165  }
166
167  @Override
168  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) {
169    if (initialized) {
170      hdfsAclHelper.close();
171    }
172  }
173
174  @Override
175  public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
176      TableDescriptor desc, RegionInfo[] regions) throws IOException {
177    if (needHandleTableHdfsAcl(desc, "createTable " + desc.getTableName())) {
178      TableName tableName = desc.getTableName();
179      // 1. Create table directories to make HDFS acls can be inherited
180      hdfsAclHelper.createTableDirectories(tableName);
181      // 2. Add table owner HDFS acls
182      String owner =
183          desc.getOwnerString() == null ? getActiveUser(c).getShortName() : desc.getOwnerString();
184      hdfsAclHelper.addTableAcl(tableName, Sets.newHashSet(owner), "create");
185      // 3. Record table owner permission is synced to HDFS in acl table
186      SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(c.getEnvironment().getConnection(), owner,
187        tableName);
188    }
189  }
190
191  @Override
192  public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> c,
193      NamespaceDescriptor ns) throws IOException {
194    if (checkInitialized("createNamespace " + ns.getName())) {
195      // Create namespace directories to make HDFS acls can be inherited
196      List<Path> paths = hdfsAclHelper.getNamespaceRootPaths(ns.getName());
197      for (Path path : paths) {
198        hdfsAclHelper.createDirIfNotExist(path);
199      }
200    }
201  }
202
203  @Override
204  public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
205      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
206    if (needHandleTableHdfsAcl(tableDescriptor, "snapshot " + snapshot.getName())) {
207      // Add HDFS acls of users with table read permission to snapshot files
208      hdfsAclHelper.snapshotAcl(snapshot);
209    }
210  }
211
212  @Override
213  public void postCompletedTruncateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
214      TableName tableName) throws IOException {
215    if (needHandleTableHdfsAcl(tableName, "truncateTable " + tableName)) {
216      // 1. create tmp table directories
217      hdfsAclHelper.createTableDirectories(tableName);
218      // 2. Since the table directories is recreated, so add HDFS acls again
219      Set<String> users = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
220      hdfsAclHelper.addTableAcl(tableName, users, "truncate");
221    }
222  }
223
224  @Override
225  public void postCompletedDeleteTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
226      TableName tableName) throws IOException {
227    if (!tableName.isSystemTable() && checkInitialized("deleteTable " + tableName)) {
228      /*
229       * Remove table user access HDFS acl from namespace directory if the user has no permissions
230       * of global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission,
231       * when delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
232       * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
233       * remove Bob access acl.
234       */
235      try (Table aclTable =
236          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
237        Set<String> users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName);
238        if (users.size() > 0) {
239          // 1. Remove table archive directory default ACLs
240          hdfsAclHelper.removeTableDefaultAcl(tableName, users);
241          // 2. Delete table owner permission is synced to HDFS in acl table
242          SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName);
243          // 3. Remove namespace access acls
244          Set<String> removeUsers = filterUsersToRemoveNsAccessAcl(aclTable, tableName, users);
245          if (removeUsers.size() > 0) {
246            hdfsAclHelper.removeNamespaceAccessAcl(tableName, removeUsers, "delete");
247          }
248        }
249      }
250    }
251  }
252
253  @Override
254  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
255      TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
256      throws IOException {
257    try (Table aclTable =
258        ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
259      if (needHandleTableHdfsAcl(currentDescriptor, "modifyTable " + tableName)
260          && !hdfsAclHelper.isAclSyncToHdfsEnabled(oldDescriptor)) {
261        // 1. Create table directories used for acl inherited
262        hdfsAclHelper.createTableDirectories(tableName);
263        // 2. Add table users HDFS acls
264        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
265        Set<String> users =
266            hdfsAclHelper.getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
267        users.addAll(tableUsers);
268        hdfsAclHelper.addTableAcl(tableName, users, "modify");
269        // 3. Record table user acls are synced to HDFS in acl table
270        SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
271          tableUsers, tableName);
272      } else if (needHandleTableHdfsAcl(oldDescriptor, "modifyTable " + tableName)
273          && !hdfsAclHelper.isAclSyncToHdfsEnabled(currentDescriptor)) {
274        // 1. Remove empty table directories
275        List<Path> tableRootPaths = hdfsAclHelper.getTableRootPaths(tableName, false);
276        for (Path path : tableRootPaths) {
277          hdfsAclHelper.deleteEmptyDir(path);
278        }
279        // 2. Remove all table HDFS acls
280        Set<String> tableUsers = hdfsAclHelper.getUsersWithTableReadAction(tableName, false, false);
281        Set<String> users = hdfsAclHelper
282            .getUsersWithNamespaceReadAction(tableName.getNamespaceAsString(), true);
283        users.addAll(tableUsers);
284        hdfsAclHelper.removeTableAcl(tableName, users);
285        // 3. Remove namespace access HDFS acls for users who only own permission for this table
286        hdfsAclHelper.removeNamespaceAccessAcl(tableName,
287          filterUsersToRemoveNsAccessAcl(aclTable, tableName, tableUsers), "modify");
288        // 4. Record table user acl is not synced to HDFS
289        SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(ctx.getEnvironment().getConnection(),
290          tableUsers, tableName);
291      }
292    }
293  }
294
295  @Override
296  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
297      String namespace) throws IOException {
298    if (checkInitialized("deleteNamespace " + namespace)) {
299      try (Table aclTable =
300          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
301        // 1. Delete namespace archive dir default ACLs
302        Set<String> users = SnapshotScannerHDFSAclStorage.getEntryUsers(aclTable,
303          PermissionStorage.toNamespaceEntry(Bytes.toBytes(namespace)));
304        hdfsAclHelper.removeNamespaceDefaultAcl(namespace, users);
305        // 2. Record namespace user acl is not synced to HDFS
306        SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(ctx.getEnvironment().getConnection(),
307          namespace);
308        // 3. Delete tmp namespace directory
309        /**
310         * Delete namespace tmp directory because it's created by this coprocessor when namespace is
311         * created to make namespace default acl can be inherited by tables. The namespace data
312         * directory is deleted by DeleteNamespaceProcedure, the namespace archive directory is
313         * deleted by HFileCleaner.
314         */
315        hdfsAclHelper.deleteEmptyDir(pathHelper.getTmpNsDir(namespace));
316      }
317    }
318  }
319
320  @Override
321  public void postGrant(ObserverContext<MasterCoprocessorEnvironment> c,
322      UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
323    if (!checkInitialized(
324      "grant " + userPermission + ", merge existing permissions " + mergeExistingPermissions)) {
325      return;
326    }
327    try (Table aclTable =
328        c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
329      Configuration conf = c.getEnvironment().getConfiguration();
330      String userName = userPermission.getUser();
331      switch (userPermission.getAccessScope()) {
332        case GLOBAL:
333          UserPermission perm = getUserGlobalPermission(conf, userName);
334          if (perm != null && hdfsAclHelper.containReadAction(perm)) {
335            if (!isHdfsAclSet(aclTable, userName)) {
336              // 1. Get namespaces and tables which global user acls are already synced
337              Pair<Set<String>, Set<TableName>> skipNamespaceAndTables =
338                  SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
339              Set<String> skipNamespaces = skipNamespaceAndTables.getFirst();
340              Set<TableName> skipTables = skipNamespaceAndTables.getSecond().stream()
341                  .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
342                  .collect(Collectors.toSet());
343              // 2. Add HDFS acl(skip namespaces and tables directories whose acl is set)
344              hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables);
345              // 3. Record global acl is sync to HDFS
346              SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName);
347            }
348          } else {
349            // The merged user permission doesn't contain READ, so remove user global HDFS acls if
350            // it's set
351            removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
352          }
353          break;
354        case NAMESPACE:
355          String namespace = ((NamespacePermission) userPermission.getPermission()).getNamespace();
356          UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace);
357          if (nsPerm != null && hdfsAclHelper.containReadAction(nsPerm)) {
358            if (!isHdfsAclSet(aclTable, userName, namespace)) {
359              // 1. Get tables which namespace user acls are already synced
360              Set<TableName> skipTables = SnapshotScannerHDFSAclStorage
361                  .getUserNamespaceAndTable(aclTable, userName).getSecond();
362              // 2. Add HDFS acl(skip tables directories whose acl is set)
363              hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables);
364            }
365            // 3. Record namespace acl is synced to HDFS
366            SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace);
367          } else {
368            // The merged user permission doesn't contain READ, so remove user namespace HDFS acls
369            // if it's set
370            removeUserNamespaceHdfsAcl(aclTable, userName, namespace, userPermission);
371          }
372          break;
373        case TABLE:
374          TablePermission tablePerm = (TablePermission) userPermission.getPermission();
375          if (needHandleTableHdfsAcl(tablePerm)) {
376            TableName tableName = tablePerm.getTableName();
377            UserPermission tPerm = getUserTablePermission(conf, userName, tableName);
378            if (tPerm != null && hdfsAclHelper.containReadAction(tPerm)) {
379              if (!isHdfsAclSet(aclTable, userName, tableName)) {
380                // 1. create table dirs
381                hdfsAclHelper.createTableDirectories(tableName);
382                // 2. Add HDFS acl
383                hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
384              }
385              // 2. Record table acl is synced to HDFS
386              SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName);
387            } else {
388              // The merged user permission doesn't contain READ, so remove user table HDFS acls if
389              // it's set
390              removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
391            }
392          }
393          break;
394        default:
395          throw new IllegalArgumentException(
396              "Illegal user permission scope " + userPermission.getAccessScope());
397      }
398    }
399  }
400
401  @Override
402  public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
403      UserPermission userPermission) throws IOException {
404    if (checkInitialized("revoke " + userPermission)) {
405      try (Table aclTable =
406          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
407        String userName = userPermission.getUser();
408        Configuration conf = c.getEnvironment().getConfiguration();
409        switch (userPermission.getAccessScope()) {
410          case GLOBAL:
411            UserPermission userGlobalPerm = getUserGlobalPermission(conf, userName);
412            if (userGlobalPerm == null || !hdfsAclHelper.containReadAction(userGlobalPerm)) {
413              removeUserGlobalHdfsAcl(aclTable, userName, userPermission);
414            }
415            break;
416          case NAMESPACE:
417            NamespacePermission nsPerm = (NamespacePermission) userPermission.getPermission();
418            UserPermission userNsPerm =
419                getUserNamespacePermission(conf, userName, nsPerm.getNamespace());
420            if (userNsPerm == null || !hdfsAclHelper.containReadAction(userNsPerm)) {
421              removeUserNamespaceHdfsAcl(aclTable, userName, nsPerm.getNamespace(), userPermission);
422            }
423            break;
424          case TABLE:
425            TablePermission tPerm = (TablePermission) userPermission.getPermission();
426            if (needHandleTableHdfsAcl(tPerm)) {
427              TableName tableName = tPerm.getTableName();
428              UserPermission userTablePerm = getUserTablePermission(conf, userName, tableName);
429              if (userTablePerm == null || !hdfsAclHelper.containReadAction(userTablePerm)) {
430                removeUserTableHdfsAcl(aclTable, userName, tableName, userPermission);
431              }
432            }
433            break;
434          default:
435            throw new IllegalArgumentException(
436                "Illegal user permission scope " + userPermission.getAccessScope());
437        }
438      }
439    }
440  }
441
442  private void removeUserGlobalHdfsAcl(Table aclTable, String userName,
443      UserPermission userPermission) throws IOException {
444    if (SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
445      // 1. Get namespaces and tables which global user acls are already synced
446      Pair<Set<String>, Set<TableName>> namespaceAndTable =
447          SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName);
448      Set<String> skipNamespaces = namespaceAndTable.getFirst();
449      Set<TableName> skipTables = namespaceAndTable.getSecond().stream()
450          .filter(t -> !skipNamespaces.contains(t.getNamespaceAsString()))
451          .collect(Collectors.toSet());
452      // 2. Remove user HDFS acls(skip namespaces and tables directories
453      // whose acl must be reversed)
454      hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, skipTables);
455      // 3. Remove global user acl is synced to HDFS in acl table
456      SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName);
457    }
458  }
459
460  private void removeUserNamespaceHdfsAcl(Table aclTable, String userName, String namespace,
461      UserPermission userPermission) throws IOException {
462    if (SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace)) {
463      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) {
464        // 1. Get tables whose namespace user acls are already synced
465        Set<TableName> skipTables =
466            SnapshotScannerHDFSAclStorage.getUserNamespaceAndTable(aclTable, userName).getSecond();
467        // 2. Remove user HDFS acls(skip tables directories whose acl must be reversed)
468        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables);
469      }
470      // 3. Remove namespace user acl is synced to HDFS in acl table
471      SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace);
472    }
473  }
474
475  private void removeUserTableHdfsAcl(Table aclTable, String userName, TableName tableName,
476      UserPermission userPermission) throws IOException {
477    if (SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName)) {
478      if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)
479          && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
480            tableName.getNamespaceAsString())) {
481        // 1. Remove table acls
482        hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0));
483      }
484      // 2. Remove table user acl is synced to HDFS in acl table
485      SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName);
486    }
487  }
488
489  private UserPermission getUserGlobalPermission(Configuration conf, String userName)
490      throws IOException {
491    List<UserPermission> permissions = PermissionStorage.getUserPermissions(conf,
492      PermissionStorage.ACL_GLOBAL_NAME, null, null, userName, true);
493    return permissions.size() > 0 ? permissions.get(0) : null;
494  }
495
496  private UserPermission getUserNamespacePermission(Configuration conf, String userName,
497      String namespace) throws IOException {
498    List<UserPermission> permissions =
499        PermissionStorage.getUserNamespacePermissions(conf, namespace, userName, true);
500    return permissions.size() > 0 ? permissions.get(0) : null;
501  }
502
503  private UserPermission getUserTablePermission(Configuration conf, String userName,
504      TableName tableName) throws IOException {
505    List<UserPermission> permissions = PermissionStorage
506        .getUserTablePermissions(conf, tableName, null, null, userName, true).stream()
507        .filter(userPermission -> hdfsAclHelper
508            .isNotFamilyOrQualifierPermission((TablePermission) userPermission.getPermission()))
509        .collect(Collectors.toList());
510    return permissions.size() > 0 ? permissions.get(0) : null;
511  }
512
513  private boolean isHdfsAclSet(Table aclTable, String userName) throws IOException {
514    return isHdfsAclSet(aclTable, userName, null, null);
515  }
516
517  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace)
518      throws IOException {
519    return isHdfsAclSet(aclTable, userName, namespace, null);
520  }
521
522  private boolean isHdfsAclSet(Table aclTable, String userName, TableName tableName)
523      throws IOException {
524    return isHdfsAclSet(aclTable, userName, null, tableName);
525  }
526
527  /**
528   * Check if user global/namespace/table HDFS acls is already set
529   */
530  private boolean isHdfsAclSet(Table aclTable, String userName, String namespace,
531      TableName tableName) throws IOException {
532    boolean isSet = SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName);
533    if (namespace != null) {
534      isSet = isSet
535          || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, namespace);
536    }
537    if (tableName != null) {
538      isSet = isSet
539          || SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName,
540            tableName.getNamespaceAsString())
541          || SnapshotScannerHDFSAclStorage.hasUserTableHdfsAcl(aclTable, userName, tableName);
542    }
543    return isSet;
544  }
545
546  @VisibleForTesting
547  boolean checkInitialized(String operation) {
548    if (initialized) {
549      if (aclTableInitialized) {
550        return true;
551      } else {
552        LOG.warn("Skip set HDFS acls because acl table is not initialized when " + operation);
553      }
554    }
555    return false;
556  }
557
558  private boolean needHandleTableHdfsAcl(TablePermission tablePermission) throws IOException {
559    return needHandleTableHdfsAcl(tablePermission.getTableName(), "")
560        && hdfsAclHelper.isNotFamilyOrQualifierPermission(tablePermission);
561  }
562
563  private boolean needHandleTableHdfsAcl(TableName tableName, String operation) throws IOException {
564    return !tableName.isSystemTable() && checkInitialized(operation) && hdfsAclHelper
565        .isAclSyncToHdfsEnabled(masterServices.getTableDescriptors().get(tableName));
566  }
567
568  private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, String operation) {
569    TableName tableName = tableDescriptor.getTableName();
570    return !tableName.isSystemTable() && checkInitialized(operation)
571        && hdfsAclHelper.isAclSyncToHdfsEnabled(tableDescriptor);
572  }
573
574  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
575    // for non-rpc handling, fallback to system user
576    Optional<User> optionalUser = ctx.getCaller();
577    if (optionalUser.isPresent()) {
578      return optionalUser.get();
579    }
580    return userProvider.getCurrent();
581  }
582
583  /**
584   * Remove table user access HDFS acl from namespace directory if the user has no permissions of
585   * global, ns of the table or other tables of the ns, eg: Bob has 'ns1:t1' read permission, when
586   * delete 'ns1:t1', if Bob has global read permission, '@ns1' read permission or
587   * 'ns1:other_tables' read permission, then skip remove Bob access acl in ns1Dirs, otherwise,
588   * remove Bob access acl.
589   * @param aclTable acl table
590   * @param tableName the name of the table
591   * @param tablesUsers table users set
592   * @return users whose access acl will be removed from the namespace of the table
593   * @throws IOException if an error occurred
594   */
595  private Set<String> filterUsersToRemoveNsAccessAcl(Table aclTable, TableName tableName,
596      Set<String> tablesUsers) throws IOException {
597    Set<String> removeUsers = new HashSet<>();
598    byte[] namespace = tableName.getNamespace();
599    for (String user : tablesUsers) {
600      List<byte[]> userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user);
601      boolean remove = true;
602      for (byte[] entry : userEntries) {
603        if (PermissionStorage.isGlobalEntry(entry)
604            || (PermissionStorage.isNamespaceEntry(entry)
605                && Bytes.equals(PermissionStorage.fromNamespaceEntry(entry), namespace))
606            || (!Bytes.equals(tableName.getName(), entry)
607                && Bytes.equals(TableName.valueOf(entry).getNamespace(), namespace))) {
608          remove = false;
609          break;
610        }
611      }
612      if (remove) {
613        removeUsers.add(user);
614      }
615    }
616    return removeUsers;
617  }
618
619  static final class SnapshotScannerHDFSAclStorage {
620    /**
621     * Add a new CF in HBase acl table to record if the HBase read permission is synchronized to
622     * related hfiles. The record has two usages: 1. check if we need to remove HDFS acls for a
623     * grant without READ permission(eg: grant user table read permission and then grant user table
624     * write permission without merging the existing permissions, in this case, need to remove HDFS
625     * acls); 2. skip some HDFS acl sync because it may be already set(eg: grant user table read
626     * permission and then grant user ns read permission; grant user table read permission and then
627     * grant user table write permission with merging the existing permissions).
628     */
629    static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m");
630    // The value 'R' has no specific meaning, if cell value is not null, it means that the user HDFS
631    // acls is set to hfiles.
632    private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R");
633
634    static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
635      addUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
636    }
637
638    static void addUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
639        throws IOException {
640      addUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
641    }
642
643    static void addUserTableHdfsAcl(Connection connection, Set<String> users, TableName tableName)
644        throws IOException {
645      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
646        for (String user : users) {
647          addUserTableHdfsAcl(aclTable, user, tableName);
648        }
649      }
650    }
651
652    static void addUserTableHdfsAcl(Connection connection, String user, TableName tableName)
653        throws IOException {
654      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
655        addUserTableHdfsAcl(aclTable, user, tableName);
656      }
657    }
658
659    static void addUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
660        throws IOException {
661      addUserEntry(aclTable, user, tableName.getName());
662    }
663
664    private static void addUserEntry(Table t, String user, byte[] entry) throws IOException {
665      Put p = new Put(entry);
666      p.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(user), HDFS_ACL_VALUE);
667      t.put(p);
668    }
669
670    static void deleteUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
671      deleteUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
672    }
673
674    static void deleteUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
675        throws IOException {
676      deleteUserEntry(aclTable, user, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
677    }
678
679    static void deleteUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
680        throws IOException {
681      deleteUserEntry(aclTable, user, tableName.getName());
682    }
683
684    static void deleteUserTableHdfsAcl(Connection connection, Set<String> users,
685        TableName tableName) throws IOException {
686      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
687        for (String user : users) {
688          deleteUserTableHdfsAcl(aclTable, user, tableName);
689        }
690      }
691    }
692
693    private static void deleteUserEntry(Table aclTable, String user, byte[] entry)
694        throws IOException {
695      Delete delete = new Delete(entry);
696      delete.addColumns(HDFS_ACL_FAMILY, Bytes.toBytes(user));
697      aclTable.delete(delete);
698    }
699
700    static void deleteNamespaceHdfsAcl(Connection connection, String namespace) throws IOException {
701      try (Table aclTable = connection.getTable(PermissionStorage.ACL_TABLE_NAME)) {
702        deleteEntry(aclTable, Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
703      }
704    }
705
706    static void deleteTableHdfsAcl(Table aclTable, TableName tableName) throws IOException {
707      deleteEntry(aclTable, tableName.getName());
708    }
709
710    private static void deleteEntry(Table aclTable, byte[] entry) throws IOException {
711      Delete delete = new Delete(entry);
712      delete.addFamily(HDFS_ACL_FAMILY);
713      aclTable.delete(delete);
714    }
715
716    static Set<String> getTableUsers(Table aclTable, TableName tableName) throws IOException {
717      return getEntryUsers(aclTable, tableName.getName());
718    }
719
720    private static Set<String> getEntryUsers(Table aclTable, byte[] entry) throws IOException {
721      Set<String> users = new HashSet<>();
722      Get get = new Get(entry);
723      get.addFamily(HDFS_ACL_FAMILY);
724      Result result = aclTable.get(get);
725      List<Cell> cells = result.listCells();
726      if (cells != null) {
727        for (Cell cell : cells) {
728          if (cell != null) {
729            users.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
730          }
731        }
732      }
733      return users;
734    }
735
736    static Pair<Set<String>, Set<TableName>> getUserNamespaceAndTable(Table aclTable,
737        String userName) throws IOException {
738      Set<String> namespaces = new HashSet<>();
739      Set<TableName> tables = new HashSet<>();
740      List<byte[]> userEntries = getUserEntries(aclTable, userName);
741      for (byte[] entry : userEntries) {
742        if (PermissionStorage.isNamespaceEntry(entry)) {
743          namespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry)));
744        } else if (PermissionStorage.isTableEntry(entry)) {
745          tables.add(TableName.valueOf(entry));
746        }
747      }
748      return new Pair<>(namespaces, tables);
749    }
750
751    static List<byte[]> getUserEntries(Table aclTable, String userName) throws IOException {
752      Scan scan = new Scan();
753      scan.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
754      ResultScanner scanner = aclTable.getScanner(scan);
755      List<byte[]> entry = new ArrayList<>();
756      for (Result result : scanner) {
757        if (result != null && result.getRow() != null) {
758          entry.add(result.getRow());
759        }
760      }
761      return entry;
762    }
763
764    static boolean hasUserGlobalHdfsAcl(Table aclTable, String user) throws IOException {
765      return hasUserEntry(aclTable, user, PermissionStorage.ACL_GLOBAL_NAME);
766    }
767
768    static boolean hasUserNamespaceHdfsAcl(Table aclTable, String user, String namespace)
769        throws IOException {
770      return hasUserEntry(aclTable, user,
771        Bytes.toBytes(PermissionStorage.toNamespaceEntry(namespace)));
772    }
773
774    static boolean hasUserTableHdfsAcl(Table aclTable, String user, TableName tableName)
775        throws IOException {
776      return hasUserEntry(aclTable, user, tableName.getName());
777    }
778
779    private static boolean hasUserEntry(Table aclTable, String userName, byte[] entry)
780        throws IOException {
781      Get get = new Get(entry);
782      get.addColumn(HDFS_ACL_FAMILY, Bytes.toBytes(userName));
783      return aclTable.exists(get);
784    }
785  }
786}