001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Optional;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.TreeSet;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.ArrayBackedTag;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellScanner;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.CompoundConfiguration;
042import org.apache.hadoop.hbase.CoprocessorEnvironment;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HBaseInterfaceAudience;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.KeyValue.Type;
048import org.apache.hadoop.hbase.NamespaceDescriptor;
049import org.apache.hadoop.hbase.PrivateCellUtil;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Tag;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.Append;
055import org.apache.hadoop.hbase.client.BalanceRequest;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
058import org.apache.hadoop.hbase.client.Delete;
059import org.apache.hadoop.hbase.client.Durability;
060import org.apache.hadoop.hbase.client.Get;
061import org.apache.hadoop.hbase.client.Increment;
062import org.apache.hadoop.hbase.client.MasterSwitchType;
063import org.apache.hadoop.hbase.client.Mutation;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.Query;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.Result;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.SnapshotDescription;
070import org.apache.hadoop.hbase.client.Table;
071import org.apache.hadoop.hbase.client.TableDescriptor;
072import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
073import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
074import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
075import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
076import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
077import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
078import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
079import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
080import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
081import org.apache.hadoop.hbase.coprocessor.MasterObserver;
082import org.apache.hadoop.hbase.coprocessor.ObserverContext;
083import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
084import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
085import org.apache.hadoop.hbase.coprocessor.RegionObserver;
086import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
087import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
088import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
089import org.apache.hadoop.hbase.filter.ByteArrayComparable;
090import org.apache.hadoop.hbase.filter.Filter;
091import org.apache.hadoop.hbase.filter.FilterList;
092import org.apache.hadoop.hbase.io.hfile.HFile;
093import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
094import org.apache.hadoop.hbase.ipc.RpcServer;
095import org.apache.hadoop.hbase.master.MasterServices;
096import org.apache.hadoop.hbase.net.Address;
097import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
098import org.apache.hadoop.hbase.regionserver.BloomType;
099import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
100import org.apache.hadoop.hbase.regionserver.InternalScanner;
101import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
102import org.apache.hadoop.hbase.regionserver.Region;
103import org.apache.hadoop.hbase.regionserver.RegionScanner;
104import org.apache.hadoop.hbase.regionserver.RegionServerServices;
105import org.apache.hadoop.hbase.regionserver.ScanType;
106import org.apache.hadoop.hbase.regionserver.ScannerContext;
107import org.apache.hadoop.hbase.regionserver.Store;
108import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
109import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
110import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
111import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
112import org.apache.hadoop.hbase.replication.SyncReplicationState;
113import org.apache.hadoop.hbase.security.AccessDeniedException;
114import org.apache.hadoop.hbase.security.Superusers;
115import org.apache.hadoop.hbase.security.User;
116import org.apache.hadoop.hbase.security.UserProvider;
117import org.apache.hadoop.hbase.security.access.Permission.Action;
118import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
119import org.apache.hadoop.hbase.util.ByteRange;
120import org.apache.hadoop.hbase.util.Bytes;
121import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
122import org.apache.hadoop.hbase.util.Pair;
123import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
124import org.apache.hadoop.hbase.wal.WALEdit;
125import org.apache.yetus.audience.InterfaceAudience;
126import org.slf4j.Logger;
127import org.slf4j.LoggerFactory;
128
129import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
130import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
131import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
132import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
133import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
134import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
135import org.apache.hbase.thirdparty.com.google.protobuf.Message;
136import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
137import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
138import org.apache.hbase.thirdparty.com.google.protobuf.Service;
139
140import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
141import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.AccessControlService;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasPermissionRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasPermissionResponse;
146
147/**
148 * Provides basic authorization checks for data access and administrative operations.
149 * <p>
150 * {@code AccessController} performs authorization checks for HBase operations based on:
151 * </p>
152 * <ul>
153 * <li>the identity of the user performing the operation</li>
154 * <li>the scope over which the operation is performed, in increasing specificity: global, table,
155 * column family, or qualifier</li>
156 * <li>the type of action being performed (as mapped to {@link Permission.Action} values)</li>
157 * </ul>
158 * <p>
159 * If the authorization check fails, an {@link AccessDeniedException} will be thrown for the
160 * operation.
161 * </p>
162 * <p>
163 * To perform authorization checks, {@code AccessController} relies on the RpcServerEngine being
164 * loaded to provide the user identities for remote requests.
165 * </p>
166 * <p>
167 * The access control lists used for authorization can be manipulated via the exposed
168 * {@link AccessControlService} Interface implementation, and the associated {@code grant},
169 * {@code revoke}, and {@code user_permission} HBase shell commands.
170 * </p>
171 */
172@CoreCoprocessor
173@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
174public class AccessController implements MasterCoprocessor, RegionCoprocessor,
175  RegionServerCoprocessor, AccessControlService.Interface, MasterObserver, RegionObserver,
176  RegionServerObserver, EndpointObserver, BulkLoadObserver {
177  // TODO: encapsulate observer functions into separate class/sub-class.
178
  /** General-purpose logger for this coprocessor. */
  private static final Logger LOG = LoggerFactory.getLogger(AccessController.class);

  /** Logger whose name is "SecurityLogger." followed by this class's fully qualified name. */
  private static final Logger AUDITLOG =
    LoggerFactory.getLogger("SecurityLogger." + AccessController.class.getName());
  /**
   * Attribute key; presumably marks an operation that still needs a covering-permission check --
   * TODO confirm at its use sites.
   */
  private static final String CHECK_COVERING_PERM = "check_covering_perm";
  /**
   * Name of the Mutation attribute that {@code checkForReservedTagPresence} sets once a mutation
   * has passed (or been exempted from) the reserved ACL tag check.
   */
  private static final String TAG_CHECK_PASSED = "tag_check_passed";
  /** Serialized boolean {@code true}, used as the value of marker attributes. */
  private static final byte[] TRUE = Bytes.toBytes(true);

  /** Performs the permission checks; obtained from the hosting server's services in start(). */
  private AccessChecker accessChecker;
  /** Used to publish serialized permissions to ZooKeeper; obtained in start(). */
  private ZKPermissionWatcher zkPermissionWatcher;

  /** Flags whether we are running on a region of the _acl_ table. */
  private boolean aclRegion = false;

  /**
   * Defined only for the Endpoint implementation, so that it has a way to access region services.
   * Assigned in start() only when the environment is a RegionCoprocessorEnvironment.
   */
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them. Keys are weakly referenced. */
  private Map<InternalScanner, String> scannerOwners = new MapMaker().weakKeys().makeMap();

  /** Per-table user permissions; created in start() as a map with weakly referenced values. */
  private Map<TableName, List<UserPermission>> tableAcls;

  /** Provider for mapping principal names to Users. */
  private UserProvider userProvider;

  /**
   * Whether we are active. Usually false; only true if "hbase.security.authorization" has been set
   * to true in the site configuration.
   */
  private boolean authorizationEnabled;

  /** Whether we are able to support cell ACLs. */
  private boolean cellFeaturesEnabled;

  /** Whether we should check EXEC permissions. */
  private boolean shouldCheckExecPermission;

  /**
   * Whether we should terminate access checks early, as soon as table or CF grants allow access;
   * pre-0.98 compatible behavior.
   */
  private boolean compatibleEarlyTermination;

  /** Whether we have been successfully initialized. */
  private volatile boolean initialized = false;

  /** Whether the ACL table is available; only relevant in the master. */
  private volatile boolean aclTabAvailable = false;
229
230  public static boolean isCellAuthorizationSupported(Configuration conf) {
231    return AccessChecker.isAuthorizationSupported(conf)
232      && (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
233  }
234
235  public Region getRegion() {
236    return regionEnv != null ? regionEnv.getRegion() : null;
237  }
238
239  public AuthManager getAuthManager() {
240    return accessChecker.getAuthManager();
241  }
242
243  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
244    final Region region = e.getRegion();
245    Configuration conf = e.getConfiguration();
246    Map<byte[], ListMultimap<String, UserPermission>> tables = PermissionStorage.loadAll(region);
247    // For each table, write out the table's permissions to the respective
248    // znode for that table.
249    for (Map.Entry<byte[], ListMultimap<String, UserPermission>> t : tables.entrySet()) {
250      byte[] entry = t.getKey();
251      ListMultimap<String, UserPermission> perms = t.getValue();
252      byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
253      zkPermissionWatcher.writeToZookeeper(entry, serialized);
254    }
255    initialized = true;
256  }
257
258  /**
259   * Writes all table ACLs for the tables in the given Map up into ZooKeeper znodes. This is called
260   * to synchronize ACL changes following {@code _acl_} table updates.
261   */
262  private void updateACL(RegionCoprocessorEnvironment e, final Map<byte[], List<Cell>> familyMap) {
263    Set<byte[]> entries = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
264    for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
265      List<Cell> cells = f.getValue();
266      for (Cell cell : cells) {
267        if (CellUtil.matchingFamily(cell, PermissionStorage.ACL_LIST_FAMILY)) {
268          entries.add(CellUtil.cloneRow(cell));
269        }
270      }
271    }
272    Configuration conf = regionEnv.getConfiguration();
273    byte[] currentEntry = null;
274    // TODO: Here we are already on the ACL region. (And it is single
275    // region) We can even just get the region from the env and do get
276    // directly. The short circuit connection would avoid the RPC overhead
277    // so no socket communication, req write/read .. But we have the PB
278    // to and fro conversion overhead. get req is converted to PB req
279    // and results are converted to PB results 1st and then to POJOs
280    // again. We could have avoided such at least in ACL table context..
281    try (Table t = e.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
282      for (byte[] entry : entries) {
283        currentEntry = entry;
284        ListMultimap<String, UserPermission> perms =
285          PermissionStorage.getPermissions(conf, entry, t, null, null, null, false);
286        byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
287        zkPermissionWatcher.writeToZookeeper(entry, serialized);
288      }
289    } catch (IOException ex) {
290      LOG.error("Failed updating permissions mirror for '"
291        + (currentEntry == null ? "null" : Bytes.toString(currentEntry)) + "'", ex);
292    }
293  }
294
295  /**
296   * Check the current user for authorization to perform a specific action against the given set of
297   * row data.
298   * @param opType   the operation type
299   * @param user     the user
300   * @param e        the coprocessor environment
301   * @param families the map of column families to qualifiers present in the request
302   * @param actions  the desired actions
303   * @return an authorization result
304   */
305  private AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
306    Map<byte[], ? extends Collection<?>> families, Action... actions) {
307    AuthResult result = null;
308    for (Action action : actions) {
309      result = accessChecker.permissionGranted(opType.toString(), user, action,
310        e.getRegion().getRegionInfo().getTable(), families);
311      if (!result.isAllowed()) {
312        return result;
313      }
314    }
315    return result;
316  }
317
318  public void requireAccess(ObserverContext<?> ctx, String request, TableName tableName,
319    Action... permissions) throws IOException {
320    accessChecker.requireAccess(getActiveUser(ctx), request, tableName, permissions);
321  }
322
323  public void requirePermission(ObserverContext<?> ctx, String request, Action perm)
324    throws IOException {
325    accessChecker.requirePermission(getActiveUser(ctx), request, null, perm);
326  }
327
328  public void requireGlobalPermission(ObserverContext<?> ctx, String request, Action perm,
329    TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
330    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, tableName, familyMap,
331      null);
332  }
333
334  public void requireGlobalPermission(ObserverContext<?> ctx, String request, Action perm,
335    String namespace) throws IOException {
336    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, namespace);
337  }
338
339  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
340    Action... permissions) throws IOException {
341    accessChecker.requireNamespacePermission(getActiveUser(ctx), request, namespace, null,
342      permissions);
343  }
344
345  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
346    TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
347    throws IOException {
348    accessChecker.requireNamespacePermission(getActiveUser(ctx), request, namespace, tableName,
349      familyMap, permissions);
350  }
351
352  public void requirePermission(ObserverContext<?> ctx, String request, TableName tableName,
353    byte[] family, byte[] qualifier, Action... permissions) throws IOException {
354    accessChecker.requirePermission(getActiveUser(ctx), request, tableName, family, qualifier, null,
355      permissions);
356  }
357
358  public void requireTablePermission(ObserverContext<?> ctx, String request, TableName tableName,
359    byte[] family, byte[] qualifier, Action... permissions) throws IOException {
360    accessChecker.requireTablePermission(getActiveUser(ctx), request, tableName, family, qualifier,
361      permissions);
362  }
363
364  public void checkLockPermissions(ObserverContext<?> ctx, String namespace, TableName tableName,
365    RegionInfo[] regionInfos, String reason) throws IOException {
366    accessChecker.checkLockPermissions(getActiveUser(ctx), namespace, tableName, regionInfos,
367      reason);
368  }
369
370  /**
371   * Returns <code>true</code> if the current user is allowed the given action over at least one of
372   * the column qualifiers in the given column families.
373   */
374  private boolean hasFamilyQualifierPermission(User user, Action perm,
375    RegionCoprocessorEnvironment env, Map<byte[], ? extends Collection<byte[]>> familyMap)
376    throws IOException {
377    RegionInfo hri = env.getRegion().getRegionInfo();
378    TableName tableName = hri.getTable();
379
380    if (user == null) {
381      return false;
382    }
383
384    if (familyMap != null && familyMap.size() > 0) {
385      // at least one family must be allowed
386      for (Map.Entry<byte[], ? extends Collection<byte[]>> family : familyMap.entrySet()) {
387        if (family.getValue() != null && !family.getValue().isEmpty()) {
388          for (byte[] qualifier : family.getValue()) {
389            if (
390              getAuthManager().authorizeUserTable(user, tableName, family.getKey(), qualifier, perm)
391            ) {
392              return true;
393            }
394          }
395        } else {
396          if (getAuthManager().authorizeUserFamily(user, tableName, family.getKey(), perm)) {
397            return true;
398          }
399        }
400      }
401    } else if (LOG.isDebugEnabled()) {
402      LOG.debug("Empty family map passed for permission check");
403    }
404
405    return false;
406  }
407
408  private enum OpType {
409    GET("get"),
410    EXISTS("exists"),
411    SCAN("scan"),
412    PUT("put"),
413    DELETE("delete"),
414    CHECK_AND_PUT("checkAndPut"),
415    CHECK_AND_DELETE("checkAndDelete"),
416    APPEND("append"),
417    INCREMENT("increment");
418
419    private String type;
420
421    private OpType(String type) {
422      this.type = type;
423    }
424
425    @Override
426    public String toString() {
427      return type;
428    }
429  }
430
  /**
   * Determine if cell ACLs covered by the operation grant access. This is expensive.
   * <p>
   * Builds a {@link Get} for the row restricted to the families/columns in {@code familyMap},
   * scans the region with it, and asks the auth manager to authorize every covered cell for every
   * requested action. The first cell that is refused ends the check with {@code false}.
   * </p>
   * <p>
   * An {@link AccessDeniedException} raised while scanning is rethrown; any other
   * {@link IOException} raised while scanning is logged and the result is then based on the cells
   * authorized up to that point.
   * </p>
   * @param user      the user to authorize
   * @param request   the operation type; PUT and DELETE additionally take per-cell timestamps
   *                  into account
   * @param e         the region coprocessor environment
   * @param row       the row the operation targets
   * @param familyMap column families mapped to either a Set of qualifiers, a List of Cells, or
   *                  {@code null}
   * @param opTs      the timestamp of the operation
   * @param actions   the actions that each covered cell must grant
   * @return false if cell ACLs failed to grant access (including when cell features are disabled
   *         or no covered cell was found), true otherwise
   * @throws IOException declared for callers; note the scanner is opened outside the try block
   */
  private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
    byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
    throws IOException {
    // Without cell ACL support there is nothing that could grant access here.
    if (!cellFeaturesEnabled) {
      return false;
    }
    // Number of cells that passed authorization for all actions.
    long cellGrants = 0;
    // Largest timestamp seen on the operation's own cells (PUT/DELETE only).
    long latestCellTs = 0;
    Get get = new Get(row);
    // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
    // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
    // version. We have to get every cell version and check its TS against the TS asked for in
    // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
    // consider only one such passing cell. In case of Delete we have to consider all the cell
    // versions under this passing version. When Delete Mutation contains columns which are a
    // version delete just consider only one version for those column cells.
    boolean considerCellTs = (request == OpType.PUT || request == OpType.DELETE);
    if (considerCellTs) {
      get.readAllVersions();
    } else {
      get.readVersions(1);
    }
    // Becomes true once any cell of the operation carries a timestamp other than opTs.
    boolean diffCellTsFromOpTs = false;
    for (Map.Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
      byte[] col = entry.getKey();
      // TODO: HBASE-7114 could possibly unify the collection type in family
      // maps so we would not need to do this
      if (entry.getValue() instanceof Set) {
        // Set values hold qualifiers; an empty set selects the whole family.
        Set<byte[]> set = (Set<byte[]>) entry.getValue();
        if (set == null || set.isEmpty()) {
          get.addFamily(col);
        } else {
          for (byte[] qual : set) {
            get.addColumn(col, qual);
          }
        }
      } else if (entry.getValue() instanceof List) {
        // List values hold the operation's cells; an empty list selects the whole family.
        List<Cell> list = (List<Cell>) entry.getValue();
        if (list == null || list.isEmpty()) {
          get.addFamily(col);
        } else {
          // In case of family delete, a Cell will be added into the list with Qualifier as null.
          for (Cell cell : list) {
            if (
              cell.getQualifierLength() == 0 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
                || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())
            ) {
              get.addFamily(col);
            } else {
              get.addColumn(col, CellUtil.cloneQualifier(cell));
            }
            if (considerCellTs) {
              long cellTs = cell.getTimestamp();
              latestCellTs = Math.max(latestCellTs, cellTs);
              diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
            }
          }
        }
      } else if (entry.getValue() == null) {
        get.addFamily(col);
      } else {
        throw new RuntimeException(
          "Unhandled collection type " + entry.getValue().getClass().getName());
      }
    }
    // We want to avoid looking into the future. So, if the cells of the
    // operation specify a timestamp, or the operation itself specifies a
    // timestamp, then we use the maximum ts found. Otherwise, we bound
    // the Get to the current server time. We add 1 to the timerange since
    // the upper bound of a timerange is exclusive yet we need to examine
    // any cells found there inclusively.
    long latestTs = Math.max(opTs, latestCellTs);
    if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
      latestTs = EnvironmentEdgeManager.currentTime();
    }
    get.setTimeRange(0, latestTs + 1);
    // In case of Put operation we set to read all versions. This was done to consider the case
    // where columns are added with TS other than the Mutation TS. But normally this wont be the
    // case with Put. There no need to get all versions but get latest version only.
    if (!diffCellTsFromOpTs && request == OpType.PUT) {
      get.readVersions(1);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Scanning for cells with " + get);
    }
    // This Map is identical to familyMap. The key is a BR rather than byte[].
    // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
    // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
    // Only List-valued entries are copied; Set-valued and null entries are left out.
    Map<ByteRange, List<Cell>> familyMap1 = new HashMap<>();
    for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
      if (entry.getValue() instanceof List) {
        familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
      }
    }
    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
    List<Cell> cells = Lists.newArrayList();
    // Previously scanned cell, used to detect when the scan moves on to a new column.
    Cell prevCell = null;
    // Reusable lookup key into familyMap1 for the family of the current cell.
    ByteRange curFam = new SimpleMutableByteRange();
    // Whether every covered version of the current column must be checked, or only the first.
    boolean curColAllVersions = (request == OpType.DELETE);
    // Cells of the current column newer than this timestamp are not covered by the operation.
    long curColCheckTs = opTs;
    // Whether a covered cell has already been checked for the current column.
    boolean foundColumn = false;
    try {
      boolean more = false;
      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();

      do {
        cells.clear();
        // scan with limit as 1 to hold down memory use on wide rows
        more = scanner.next(cells, scannerContext);
        for (Cell cell : cells) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Found cell " + cell);
          }
          boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
          if (colChange) foundColumn = false;
          prevCell = cell;
          // One covered version per column is enough unless all versions are required.
          if (!curColAllVersions && foundColumn) {
            continue;
          }
          if (colChange && considerCellTs) {
            curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            // NOTE(review): get() yields null if this family had no List entry in familyMap;
            // the loop below would then fail -- confirm PUT/DELETE callers always pass Lists.
            List<Cell> cols = familyMap1.get(curFam);
            for (Cell col : cols) {
              // null/empty qualifier is used to denote a Family delete. The TS and delete type
              // associated with this is applicable for all columns within the family. That is
              // why the below (col.getQualifierLength() == 0) check.
              if (
                (col.getQualifierLength() == 0 && request == OpType.DELETE)
                  || CellUtil.matchingQualifier(cell, col)
              ) {
                byte type = col.getTypeByte();
                if (considerCellTs) {
                  curColCheckTs = col.getTimestamp();
                }
                // For a Delete op we pass allVersions as true. When a Delete Mutation contains
                // a version delete for a column no need to check all the covering cells within
                // that column. Check all versions when Type is DeleteColumn or DeleteFamily
                // One version delete types are Delete/DeleteFamilyVersion
                curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
                  || (KeyValue.Type.DeleteFamily.getCode() == type);
                break;
              }
            }
          }
          if (cell.getTimestamp() > curColCheckTs) {
            // Just ignore this cell. This is not a covering cell.
            continue;
          }
          foundColumn = true;
          for (Action action : actions) {
            // Are there permissions for this user for the cell?
            if (!getAuthManager().authorizeCell(user, getTableName(e), cell, action)) {
              // We can stop if the cell ACL denies access
              return false;
            }
          }
          cellGrants++;
        }
      } while (more);
    } catch (AccessDeniedException ex) {
      throw ex;
    } catch (IOException ex) {
      // Logged rather than propagated; the result below reflects the cells checked so far.
      LOG.error("Exception while getting cells to calculate covering permission", ex);
    } finally {
      scanner.close();
    }
    // We should not authorize unless we have found one or more cell ACLs that
    // grant access. This code is used to check for additional permissions
    // after no table or CF grants are found.
    return cellGrants > 0;
  }
606
607  private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
608    // Iterate over the entries in the familyMap, replacing the cells therein
609    // with new cells including the ACL data
610    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
611      List<Cell> newCells = Lists.newArrayList();
612      for (Cell cell : e.getValue()) {
613        // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
614        List<Tag> tags = new ArrayList<>();
615        tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, perms));
616        Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(cell);
617        while (tagIterator.hasNext()) {
618          tags.add(tagIterator.next());
619        }
620        newCells.add(PrivateCellUtil.createCell(cell, tags));
621      }
622      // This is supposed to be safe, won't CME
623      e.setValue(newCells);
624    }
625  }
626
627  // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
628  // type is reserved and should not be explicitly set by user.
629  private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
630    // No need to check if we're not going to throw
631    if (!authorizationEnabled) {
632      m.setAttribute(TAG_CHECK_PASSED, TRUE);
633      return;
634    }
635    // Superusers are allowed to store cells unconditionally.
636    if (Superusers.isSuperUser(user)) {
637      m.setAttribute(TAG_CHECK_PASSED, TRUE);
638      return;
639    }
640    // We already checked (prePut vs preBatchMutation)
641    if (m.getAttribute(TAG_CHECK_PASSED) != null) {
642      return;
643    }
644    for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
645      Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
646      while (tagsItr.hasNext()) {
647        if (tagsItr.next().getType() == PermissionStorage.ACL_TAG_TYPE) {
648          throw new AccessDeniedException("Mutation contains cell with reserved type tag");
649        }
650      }
651    }
652    m.setAttribute(TAG_CHECK_PASSED, TRUE);
653  }
654
655  /* ---- MasterObserver implementation ---- */
656  @Override
657  public void start(CoprocessorEnvironment env) throws IOException {
658    CompoundConfiguration conf = new CompoundConfiguration();
659    conf.add(env.getConfiguration());
660
661    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
662    if (!authorizationEnabled) {
663      LOG.warn("AccessController has been loaded with authorization checks DISABLED!");
664    }
665
666    shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
667      AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
668
669    cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
670    if (!cellFeaturesEnabled) {
671      LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
672        + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
673        + " accordingly.");
674    }
675
676    if (env instanceof MasterCoprocessorEnvironment) {
677      // if running on HMaster
678      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
679      if (mEnv instanceof HasMasterServices) {
680        MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
681        zkPermissionWatcher = masterServices.getZKPermissionWatcher();
682        accessChecker = masterServices.getAccessChecker();
683      }
684    } else if (env instanceof RegionServerCoprocessorEnvironment) {
685      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
686      if (rsEnv instanceof HasRegionServerServices) {
687        RegionServerServices rsServices =
688          ((HasRegionServerServices) rsEnv).getRegionServerServices();
689        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
690        accessChecker = rsServices.getAccessChecker();
691      }
692    } else if (env instanceof RegionCoprocessorEnvironment) {
693      // if running at region
694      regionEnv = (RegionCoprocessorEnvironment) env;
695      conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
696      compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
697        AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
698      if (regionEnv instanceof HasRegionServerServices) {
699        RegionServerServices rsServices =
700          ((HasRegionServerServices) regionEnv).getRegionServerServices();
701        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
702        accessChecker = rsServices.getAccessChecker();
703      }
704    }
705
706    Preconditions.checkState(zkPermissionWatcher != null, "ZKPermissionWatcher is null");
707    Preconditions.checkState(accessChecker != null, "AccessChecker is null");
708
709    // set the user-provider.
710    this.userProvider = UserProvider.instantiate(env.getConfiguration());
711    tableAcls = new MapMaker().weakValues().makeMap();
712  }
713
  /**
   * Coprocessor stop hook. Intentionally a no-op: this method releases nothing.
   * @param env the environment this coprocessor was loaded into (unused)
   */
  @Override
  public void stop(CoprocessorEnvironment env) {
  }
717
718  /*********************************** Observer/Service Getters ***********************************/
719  @Override
720  public Optional<RegionObserver> getRegionObserver() {
721    return Optional.of(this);
722  }
723
724  @Override
725  public Optional<MasterObserver> getMasterObserver() {
726    return Optional.of(this);
727  }
728
729  @Override
730  public Optional<EndpointObserver> getEndpointObserver() {
731    return Optional.of(this);
732  }
733
734  @Override
735  public Optional<BulkLoadObserver> getBulkLoadObserver() {
736    return Optional.of(this);
737  }
738
739  @Override
740  public Optional<RegionServerObserver> getRegionServerObserver() {
741    return Optional.of(this);
742  }
743
744  @Override
745  public Iterable<Service> getServices() {
746    return Collections
747      .singleton(AccessControlProtos.AccessControlService.newReflectiveService(this));
748  }
749
750  /*********************************** Observer implementations ***********************************/
751
752  @Override
753  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableDescriptor desc,
754    RegionInfo[] regions) throws IOException {
755    Set<byte[]> families = desc.getColumnFamilyNames();
756    Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
757    for (byte[] family : families) {
758      familyMap.put(family, null);
759    }
760    requireNamespacePermission(c, "createTable", desc.getTableName().getNamespaceAsString(),
761      desc.getTableName(), familyMap, Action.ADMIN, Action.CREATE);
762  }
763
764  @Override
765  public void postCompletedCreateTableAction(final ObserverContext<MasterCoprocessorEnvironment> c,
766    final TableDescriptor desc, final RegionInfo[] regions) throws IOException {
767    // When AC is used, it should be configured as the 1st CP.
768    // In Master, the table operations like create, are handled by a Thread pool but the max size
769    // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
770    // sequentially only.
771    // Related code in HMaster#startServiceThreads
772    // {code}
773    // // We depend on there being only one instance of this executor running
774    // // at a time. To do concurrency, would need fencing of enable/disable of
775    // // tables.
776    // this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
777    // {code}
778    // In future if we change this pool to have more threads, then there is a chance for thread,
779    // creating acl table, getting delayed and by that time another table creation got over and
780    // this hook is getting called. In such a case, we will need a wait logic here which will
781    // wait till the acl table is created.
782    if (PermissionStorage.isAclTable(desc)) {
783      this.aclTabAvailable = true;
784    } else {
785      if (!aclTabAvailable) {
786        LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
787          + PermissionStorage.ACL_TABLE_NAME + " is not yet created. " + getClass().getSimpleName()
788          + " should be configured as the first Coprocessor");
789      } else {
790        String owner = getActiveUser(c).getShortName();
791        final UserPermission userPermission = new UserPermission(owner,
792          Permission.newBuilder(desc.getTableName()).withActions(Action.values()).build());
793        // switch to the real hbase master user for doing the RPC on the ACL table
794        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
795          @Override
796          public Void run() throws Exception {
797            try (Table table =
798              c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
799              PermissionStorage.addUserPermission(c.getEnvironment().getConfiguration(),
800                userPermission, table);
801            }
802            return null;
803          }
804        });
805      }
806    }
807  }
808
809  @Override
810  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
811    throws IOException {
812    requirePermission(c, "deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
813  }
814
815  @Override
816  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
817    final TableName tableName) throws IOException {
818    final Configuration conf = c.getEnvironment().getConfiguration();
819    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
820      @Override
821      public Void run() throws Exception {
822        try (Table table =
823          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
824          PermissionStorage.removeTablePermissions(conf, tableName, table);
825        }
826        return null;
827      }
828    });
829    zkPermissionWatcher.deleteTableACLNode(tableName);
830  }
831
832  @Override
833  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
834    final TableName tableName) throws IOException {
835    requirePermission(c, "truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
836
837    final Configuration conf = c.getEnvironment().getConfiguration();
838    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
839      @Override
840      public Void run() throws Exception {
841        List<UserPermission> acls =
842          PermissionStorage.getUserTablePermissions(conf, tableName, null, null, null, false);
843        if (acls != null) {
844          tableAcls.put(tableName, acls);
845        }
846        return null;
847      }
848    });
849  }
850
851  @Override
852  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
853    final TableName tableName) throws IOException {
854    final Configuration conf = ctx.getEnvironment().getConfiguration();
855    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
856      @Override
857      public Void run() throws Exception {
858        List<UserPermission> perms = tableAcls.get(tableName);
859        if (perms != null) {
860          for (UserPermission perm : perms) {
861            try (Table table =
862              ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
863              PermissionStorage.addUserPermission(conf, perm, table);
864            }
865          }
866        }
867        tableAcls.remove(tableName);
868        return null;
869      }
870    });
871  }
872
873  @Override
874  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
875    TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc) throws IOException {
876    // TODO: potentially check if this is a add/modify/delete column operation
877    requirePermission(c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
878    return newDesc;
879  }
880
881  @Override
882  public String preModifyTableStoreFileTracker(ObserverContext<MasterCoprocessorEnvironment> c,
883    TableName tableName, String dstSFT) throws IOException {
884    requirePermission(c, "modifyTableStoreFileTracker", tableName, null, null, Action.ADMIN,
885      Action.CREATE);
886    return dstSFT;
887  }
888
889  @Override
890  public String preModifyColumnFamilyStoreFileTracker(
891    ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName, byte[] family,
892    String dstSFT) throws IOException {
893    requirePermission(c, "modifyColumnFamilyStoreFileTracker", tableName, family, null,
894      Action.ADMIN, Action.CREATE);
895    return dstSFT;
896  }
897
898  @Override
899  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
900    TableDescriptor oldDesc, TableDescriptor currentDesc) throws IOException {
901    final Configuration conf = c.getEnvironment().getConfiguration();
902    // default the table owner to current user, if not specified.
903    final String owner = getActiveUser(c).getShortName();
904    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
905      @Override
906      public Void run() throws Exception {
907        UserPermission userperm = new UserPermission(owner,
908          Permission.newBuilder(currentDesc.getTableName()).withActions(Action.values()).build());
909        try (Table table =
910          c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
911          PermissionStorage.addUserPermission(conf, userperm, table);
912        }
913        return null;
914      }
915    });
916  }
917
918  @Override
919  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
920    throws IOException {
921    requirePermission(c, "enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
922  }
923
924  @Override
925  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
926    throws IOException {
927    if (Bytes.equals(tableName.getName(), PermissionStorage.ACL_GLOBAL_NAME)) {
928      // We have to unconditionally disallow disable of the ACL table when we are installed,
929      // even if not enforcing authorizations. We are still allowing grants and revocations,
930      // checking permissions and logging audit messages, etc. If the ACL table is not
931      // available we will fail random actions all over the place.
932      throw new AccessDeniedException("Not allowed to disable " + PermissionStorage.ACL_TABLE_NAME
933        + " table with AccessController installed");
934    }
935    requirePermission(c, "disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
936  }
937
938  @Override
939  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
940    final long procId) throws IOException {
941    requirePermission(ctx, "abortProcedure", Action.ADMIN);
942  }
943
  /**
   * Hook invoked after a procedure abort request. Intentionally a no-op.
   * @param ctx the observer context of the request (unused)
   * @throws IOException never thrown by this implementation
   */
  @Override
  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
    throws IOException {
    // There is nothing to do at this time after the procedure abort request was sent.
  }
949
950  @Override
951  public void preGetProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
952    throws IOException {
953    requirePermission(ctx, "getProcedure", Action.ADMIN);
954  }
955
956  @Override
957  public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
958    User user = getActiveUser(ctx);
959    accessChecker.requirePermission(user, "getLocks", null, Action.ADMIN);
960  }
961
962  @Override
963  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo region,
964    ServerName srcServer, ServerName destServer) throws IOException {
965    requirePermission(c, "move", region.getTable(), null, null, Action.ADMIN);
966  }
967
968  @Override
969  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
970    throws IOException {
971    requirePermission(c, "assign", regionInfo.getTable(), null, null, Action.ADMIN);
972  }
973
974  @Override
975  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
976    throws IOException {
977    requirePermission(c, "unassign", regionInfo.getTable(), null, null, Action.ADMIN);
978  }
979
980  @Override
981  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
982    RegionInfo regionInfo) throws IOException {
983    requirePermission(c, "regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
984  }
985
986  @Override
987  public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
988    final boolean newValue, final MasterSwitchType switchType) throws IOException {
989    requirePermission(ctx, "setSplitOrMergeEnabled", Action.ADMIN);
990  }
991
992  @Override
993  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c, BalanceRequest request)
994    throws IOException {
995    requirePermission(c, "balance", Action.ADMIN);
996  }
997
998  @Override
999  public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c, boolean newValue)
1000    throws IOException {
1001    requirePermission(c, "balanceSwitch", Action.ADMIN);
1002  }
1003
1004  @Override
1005  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1006    requirePermission(c, "shutdown", Action.ADMIN);
1007  }
1008
1009  @Override
1010  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1011    requirePermission(c, "stopMaster", Action.ADMIN);
1012  }
1013
1014  @Override
1015  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1016    throws IOException {
1017    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
1018      if (!admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
1019        createACLTable(admin);
1020      } else {
1021        this.aclTabAvailable = true;
1022      }
1023    }
1024  }
1025
1026  /**
1027   * Create the ACL table
1028   */
1029  private static void createACLTable(Admin admin) throws IOException {
1030    /** Table descriptor for ACL table */
1031    ColumnFamilyDescriptor cfd =
1032      ColumnFamilyDescriptorBuilder.newBuilder(PermissionStorage.ACL_LIST_FAMILY).setMaxVersions(1)
1033        .setInMemory(true).setBlockCacheEnabled(true).setBlocksize(8 * 1024)
1034        .setBloomFilterType(BloomType.NONE).setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
1035    TableDescriptor td = TableDescriptorBuilder.newBuilder(PermissionStorage.ACL_TABLE_NAME)
1036      .setColumnFamily(cfd).build();
1037    admin.createTable(td);
1038  }
1039
1040  @Override
1041  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1042    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1043    // Move this ACL check to SnapshotManager#checkPermissions as part of AC deprecation.
1044    requirePermission(ctx, "snapshot " + snapshot.getName(), hTableDescriptor.getTableName(), null,
1045      null, Permission.Action.ADMIN);
1046  }
1047
1048  @Override
1049  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1050    final SnapshotDescription snapshot) throws IOException {
1051    User user = getActiveUser(ctx);
1052    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1053      // list it, if user is the owner of snapshot
1054      AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1055        "Snapshot owner check allowed", user, null, null, null);
1056      AccessChecker.logResult(result);
1057    } else {
1058      accessChecker.requirePermission(user, "listSnapshot " + snapshot.getName(), null,
1059        Action.ADMIN);
1060    }
1061  }
1062
1063  @Override
1064  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1065    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1066    User user = getActiveUser(ctx);
1067    if (
1068      SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
1069        && hTableDescriptor.getTableName().getNameAsString().equals(snapshot.getTableNameAsString())
1070    ) {
1071      // Snapshot owner is allowed to create a table with the same name as the snapshot he took
1072      AuthResult result = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
1073        "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
1074      AccessChecker.logResult(result);
1075    } else if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1076      requireNamespacePermission(ctx, "cloneSnapshot",
1077        hTableDescriptor.getTableName().getNamespaceAsString(), Action.ADMIN);
1078    } else {
1079      accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), null,
1080        Action.ADMIN);
1081    }
1082  }
1083
1084  @Override
1085  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1086    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor) throws IOException {
1087    User user = getActiveUser(ctx);
1088    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1089      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(),
1090        hTableDescriptor.getTableName(), null, null, null, Permission.Action.ADMIN);
1091    } else {
1092      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(), null,
1093        Action.ADMIN);
1094    }
1095  }
1096
1097  @Override
1098  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1099    final SnapshotDescription snapshot) throws IOException {
1100    User user = getActiveUser(ctx);
1101    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1102      // Snapshot owner is allowed to delete the snapshot
1103      AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1104        "Snapshot owner check allowed", user, null, null, null);
1105      AccessChecker.logResult(result);
1106    } else {
1107      accessChecker.requirePermission(user, "deleteSnapshot " + snapshot.getName(), null,
1108        Action.ADMIN);
1109    }
1110  }
1111
1112  @Override
1113  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1114    NamespaceDescriptor ns) throws IOException {
1115    requireGlobalPermission(ctx, "createNamespace", Action.ADMIN, ns.getName());
1116  }
1117
1118  @Override
1119  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1120    String namespace) throws IOException {
1121    requireGlobalPermission(ctx, "deleteNamespace", Action.ADMIN, namespace);
1122  }
1123
1124  @Override
1125  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1126    final String namespace) throws IOException {
1127    final Configuration conf = ctx.getEnvironment().getConfiguration();
1128    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1129      @Override
1130      public Void run() throws Exception {
1131        try (Table table =
1132          ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
1133          PermissionStorage.removeNamespacePermissions(conf, namespace, table);
1134        }
1135        return null;
1136      }
1137    });
1138    zkPermissionWatcher.deleteNamespaceACLNode(namespace);
1139    LOG.info(namespace + " entry deleted in " + PermissionStorage.ACL_TABLE_NAME + " table.");
1140  }
1141
1142  @Override
1143  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1144    NamespaceDescriptor currentNsDesc, NamespaceDescriptor newNsDesc) throws IOException {
1145    // We require only global permission so that
1146    // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1147    requireGlobalPermission(ctx, "modifyNamespace", Action.ADMIN, newNsDesc.getName());
1148  }
1149
1150  @Override
1151  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
1152    String namespace) throws IOException {
1153    requireNamespacePermission(ctx, "getNamespaceDescriptor", namespace, Action.ADMIN);
1154  }
1155
  /**
   * Hook invoked after namespaces are listed. Intentionally a no-op: the list is left untouched.
   * @param ctx        the observer context of the request (unused)
   * @param namespaces the namespace names being returned (not modified)
   * @throws IOException never thrown by this implementation
   */
  @Override
  public void postListNamespaces(ObserverContext<MasterCoprocessorEnvironment> ctx,
    List<String> namespaces) throws IOException {
    /* always allow namespace listing */
  }
1161
1162  @Override
1163  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1164    List<NamespaceDescriptor> descriptors) throws IOException {
1165    // Retains only those which passes authorization checks, as the checks weren't done as part
1166    // of preGetTableDescriptors.
1167    Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1168    User user = getActiveUser(ctx);
1169    while (itr.hasNext()) {
1170      NamespaceDescriptor desc = itr.next();
1171      try {
1172        accessChecker.requireNamespacePermission(user, "listNamespaces", desc.getName(), null,
1173          Action.ADMIN);
1174      } catch (AccessDeniedException e) {
1175        itr.remove();
1176      }
1177    }
1178  }
1179
1180  @Override
1181  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1182    final TableName tableName) throws IOException {
1183    // Move this ACL check to MasterFlushTableProcedureManager#checkPermissions as part of AC
1184    // deprecation.
1185    requirePermission(ctx, "flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1186  }
1187
1188  @Override
1189  public void preSplitRegion(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1190    final TableName tableName, final byte[] splitRow) throws IOException {
1191    requirePermission(ctx, "split", tableName, null, null, Action.ADMIN);
1192  }
1193
1194  @Override
1195  public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1196    throws IOException {
1197    requirePermission(ctx, "clearDeadServers", Action.ADMIN);
1198  }
1199
1200  @Override
1201  public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
1202    List<ServerName> servers, boolean offload) throws IOException {
1203    requirePermission(ctx, "decommissionRegionServers", Action.ADMIN);
1204  }
1205
1206  @Override
1207  public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1208    throws IOException {
1209    requirePermission(ctx, "listDecommissionedRegionServers", Action.READ);
1210  }
1211
1212  @Override
1213  public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
1214    ServerName server, List<byte[]> encodedRegionNames) throws IOException {
1215    requirePermission(ctx, "recommissionRegionServers", Action.ADMIN);
1216  }
1217
1218  /* ---- RegionObserver implementation ---- */
1219
1220  @Override
1221  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1222    RegionCoprocessorEnvironment env = c.getEnvironment();
1223    final Region region = env.getRegion();
1224    if (region == null) {
1225      LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1226    } else {
1227      RegionInfo regionInfo = region.getRegionInfo();
1228      if (regionInfo.getTable().isSystemTable()) {
1229        checkSystemOrSuperUser(getActiveUser(c));
1230      } else {
1231        requirePermission(c, "preOpen", Action.ADMIN);
1232      }
1233    }
1234  }
1235
1236  @Override
1237  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1238    RegionCoprocessorEnvironment env = c.getEnvironment();
1239    final Region region = env.getRegion();
1240    if (region == null) {
1241      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1242      return;
1243    }
1244    if (PermissionStorage.isAclRegion(region)) {
1245      aclRegion = true;
1246      try {
1247        initialize(env);
1248      } catch (IOException ex) {
1249        // if we can't obtain permissions, it's better to fail
1250        // than perform checks incorrectly
1251        throw new RuntimeException("Failed to initialize permissions cache", ex);
1252      }
1253    } else {
1254      initialized = true;
1255    }
1256  }
1257
1258  @Override
1259  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
1260    FlushLifeCycleTracker tracker) throws IOException {
1261    requirePermission(c, "flush", getTableName(c.getEnvironment()), null, null, Action.ADMIN,
1262      Action.CREATE);
1263  }
1264
1265  @Override
1266  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
1267    InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
1268    CompactionRequest request) throws IOException {
1269    requirePermission(c, "compact", getTableName(c.getEnvironment()), null, null, Action.ADMIN,
1270      Action.CREATE);
1271    return scanner;
1272  }
1273
1274  private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1275    final Query query, OpType opType) throws IOException {
1276    Filter filter = query.getFilter();
1277    // Don't wrap an AccessControlFilter
1278    if (filter != null && filter instanceof AccessControlFilter) {
1279      return;
1280    }
1281    User user = getActiveUser(c);
1282    RegionCoprocessorEnvironment env = c.getEnvironment();
1283    Map<byte[], ? extends Collection<byte[]>> families = null;
1284    switch (opType) {
1285      case GET:
1286      case EXISTS:
1287        families = ((Get) query).getFamilyMap();
1288        break;
1289      case SCAN:
1290        families = ((Scan) query).getFamilyMap();
1291        break;
1292      default:
1293        throw new RuntimeException("Unhandled operation " + opType);
1294    }
1295    AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1296    Region region = getRegion(env);
1297    TableName table = getTableName(region);
1298    Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1299    for (ColumnFamilyDescriptor hcd : region.getTableDescriptor().getColumnFamilies()) {
1300      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1301    }
1302    if (!authResult.isAllowed()) {
1303      if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1304        // Old behavior: Scan with only qualifier checks if we have partial
1305        // permission. Backwards compatible behavior is to throw an
1306        // AccessDeniedException immediately if there are no grants for table
1307        // or CF or CF+qual. Only proceed with an injected filter if there are
1308        // grants for qualifiers. Otherwise we will fall through below and log
1309        // the result and throw an ADE. We may end up checking qualifier
1310        // grants three times (permissionGranted above, here, and in the
1311        // filter) but that's the price of backwards compatibility.
1312        if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1313          authResult.setAllowed(true);
1314          authResult.setReason("Access allowed with filter");
1315          // Only wrap the filter if we are enforcing authorizations
1316          if (authorizationEnabled) {
1317            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1318              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY, cfVsMaxVersions);
1319            // wrap any existing filter
1320            if (filter != null) {
1321              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1322                Lists.newArrayList(ourFilter, filter));
1323            }
1324            switch (opType) {
1325              case GET:
1326              case EXISTS:
1327                ((Get) query).setFilter(ourFilter);
1328                break;
1329              case SCAN:
1330                ((Scan) query).setFilter(ourFilter);
1331                break;
1332              default:
1333                throw new RuntimeException("Unhandled operation " + opType);
1334            }
1335          }
1336        }
1337      } else {
1338        // New behavior: Any access we might be granted is more fine-grained
1339        // than whole table or CF. Simply inject a filter and return what is
1340        // allowed. We will not throw an AccessDeniedException. This is a
1341        // behavioral change since 0.96.
1342        authResult.setAllowed(true);
1343        authResult.setReason("Access allowed with filter");
1344        // Only wrap the filter if we are enforcing authorizations
1345        if (authorizationEnabled) {
1346          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1347            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1348          // wrap any existing filter
1349          if (filter != null) {
1350            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1351              Lists.newArrayList(ourFilter, filter));
1352          }
1353          switch (opType) {
1354            case GET:
1355            case EXISTS:
1356              ((Get) query).setFilter(ourFilter);
1357              break;
1358            case SCAN:
1359              ((Scan) query).setFilter(ourFilter);
1360              break;
1361            default:
1362              throw new RuntimeException("Unhandled operation " + opType);
1363          }
1364        }
1365      }
1366    }
1367
1368    AccessChecker.logResult(authResult);
1369    if (authorizationEnabled && !authResult.isAllowed()) {
1370      throw new AccessDeniedException("Insufficient permissions for user '"
1371        + (user != null ? user.getShortName() : "null") + "' (table=" + table + ", action=READ)");
1372    }
1373  }
1374
1375  @Override
1376  public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
1377    final List<Cell> result) throws IOException {
1378    internalPreRead(c, get, OpType.GET);
1379  }
1380
1381  @Override
1382  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
1383    final boolean exists) throws IOException {
1384    internalPreRead(c, get, OpType.EXISTS);
1385    return exists;
1386  }
1387
1388  @Override
1389  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
1390    final WALEdit edit, final Durability durability) throws IOException {
1391    User user = getActiveUser(c);
1392    checkForReservedTagPresence(user, put);
1393
1394    // Require WRITE permission to the table, CF, or top visible value, if any.
1395    // NOTE: We don't need to check the permissions for any earlier Puts
1396    // because we treat the ACLs in each Put as timestamped like any other
1397    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1398    // change the ACL of any previous Put. This allows simple evolution of
1399    // security policy over time without requiring expensive updates.
1400    RegionCoprocessorEnvironment env = c.getEnvironment();
1401    Map<byte[], ? extends Collection<Cell>> families = put.getFamilyCellMap();
1402    AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1403    AccessChecker.logResult(authResult);
1404    if (!authResult.isAllowed()) {
1405      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1406        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1407      } else if (authorizationEnabled) {
1408        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1409      }
1410    }
1411
1412    // Add cell ACLs from the operation to the cells themselves
1413    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1414    if (bytes != null) {
1415      if (cellFeaturesEnabled) {
1416        addCellPermissions(bytes, put.getFamilyCellMap());
1417      } else {
1418        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1419      }
1420    }
1421  }
1422
1423  @Override
1424  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
1425    final WALEdit edit, final Durability durability) {
1426    if (aclRegion) {
1427      updateACL(c.getEnvironment(), put.getFamilyCellMap());
1428    }
1429  }
1430
1431  @Override
1432  public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c, final Delete delete,
1433    final WALEdit edit, final Durability durability) throws IOException {
1434    // An ACL on a delete is useless, we shouldn't allow it
1435    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1436      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1437    }
1438    // Require WRITE permissions on all cells covered by the delete. Unlike
1439    // for Puts we need to check all visible prior versions, because a major
1440    // compaction could remove them. If the user doesn't have permission to
1441    // overwrite any of the visible versions ('visible' defined as not covered
1442    // by a tombstone already) then we have to disallow this operation.
1443    RegionCoprocessorEnvironment env = c.getEnvironment();
1444    Map<byte[], ? extends Collection<Cell>> families = delete.getFamilyCellMap();
1445    User user = getActiveUser(c);
1446    AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1447    AccessChecker.logResult(authResult);
1448    if (!authResult.isAllowed()) {
1449      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1450        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1451      } else if (authorizationEnabled) {
1452        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1453      }
1454    }
1455  }
1456
1457  @Override
1458  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1459    MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1460    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1461      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1462      User user = getActiveUser(c);
1463      for (int i = 0; i < miniBatchOp.size(); i++) {
1464        Mutation m = miniBatchOp.getOperation(i);
1465        if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1466          // We have a failure with table, cf and q perm checks and now giving a chance for cell
1467          // perm check
1468          OpType opType;
1469          long timestamp;
1470          if (m instanceof Put) {
1471            checkForReservedTagPresence(user, m);
1472            opType = OpType.PUT;
1473            timestamp = m.getTimestamp();
1474          } else if (m instanceof Delete) {
1475            opType = OpType.DELETE;
1476            timestamp = m.getTimestamp();
1477          } else if (m instanceof Increment) {
1478            opType = OpType.INCREMENT;
1479            timestamp = ((Increment) m).getTimeRange().getMax();
1480          } else if (m instanceof Append) {
1481            opType = OpType.APPEND;
1482            timestamp = ((Append) m).getTimeRange().getMax();
1483          } else {
1484            // If the operation type is not Put/Delete/Increment/Append, do nothing
1485            continue;
1486          }
1487          AuthResult authResult = null;
1488          if (
1489            checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1490              m.getFamilyCellMap(), timestamp, Action.WRITE)
1491          ) {
1492            authResult = AuthResult.allow(opType.toString(), "Covering cell set", user,
1493              Action.WRITE, table, m.getFamilyCellMap());
1494          } else {
1495            authResult = AuthResult.deny(opType.toString(), "Covering cell set", user, Action.WRITE,
1496              table, m.getFamilyCellMap());
1497          }
1498          AccessChecker.logResult(authResult);
1499          if (authorizationEnabled && !authResult.isAllowed()) {
1500            throw new AccessDeniedException(
1501              "Insufficient permissions " + authResult.toContextString());
1502          }
1503        }
1504      }
1505    }
1506  }
1507
1508  @Override
1509  public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c, final Delete delete,
1510    final WALEdit edit, final Durability durability) throws IOException {
1511    if (aclRegion) {
1512      updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1513    }
1514  }
1515
1516  @Override
1517  public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1518    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1519    final ByteArrayComparable comparator, final Put put, final boolean result) throws IOException {
1520    User user = getActiveUser(c);
1521    checkForReservedTagPresence(user, put);
1522
1523    // Require READ and WRITE permissions on the table, CF, and KV to update
1524    RegionCoprocessorEnvironment env = c.getEnvironment();
1525    Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1526    AuthResult authResult =
1527      permissionGranted(OpType.CHECK_AND_PUT, user, env, families, Action.READ, Action.WRITE);
1528    AccessChecker.logResult(authResult);
1529    if (!authResult.isAllowed()) {
1530      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1531        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1532      } else if (authorizationEnabled) {
1533        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1534      }
1535    }
1536
1537    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1538    if (bytes != null) {
1539      if (cellFeaturesEnabled) {
1540        addCellPermissions(bytes, put.getFamilyCellMap());
1541      } else {
1542        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1543      }
1544    }
1545    return result;
1546  }
1547
1548  @Override
1549  public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1550    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator opp,
1551    final ByteArrayComparable comparator, final Put put, final boolean result) throws IOException {
1552    if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1553      // We had failure with table, cf and q perm checks and now giving a chance for cell
1554      // perm check
1555      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1556      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1557      AuthResult authResult = null;
1558      User user = getActiveUser(c);
1559      if (
1560        checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1561          HConstants.LATEST_TIMESTAMP, Action.READ)
1562      ) {
1563        authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set", user,
1564          Action.READ, table, families);
1565      } else {
1566        authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set", user,
1567          Action.READ, table, families);
1568      }
1569      AccessChecker.logResult(authResult);
1570      if (authorizationEnabled && !authResult.isAllowed()) {
1571        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1572      }
1573    }
1574    return result;
1575  }
1576
1577  @Override
1578  public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1579    final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1580    final ByteArrayComparable comparator, final Delete delete, final boolean result)
1581    throws IOException {
1582    // An ACL on a delete is useless, we shouldn't allow it
1583    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1584      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " + delete.toString());
1585    }
1586    // Require READ and WRITE permissions on the table, CF, and the KV covered
1587    // by the delete
1588    RegionCoprocessorEnvironment env = c.getEnvironment();
1589    Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1590    User user = getActiveUser(c);
1591    AuthResult authResult =
1592      permissionGranted(OpType.CHECK_AND_DELETE, user, env, families, Action.READ, Action.WRITE);
1593    AccessChecker.logResult(authResult);
1594    if (!authResult.isAllowed()) {
1595      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1596        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1597      } else if (authorizationEnabled) {
1598        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1599      }
1600    }
1601    return result;
1602  }
1603
1604  @Override
1605  public boolean preCheckAndDeleteAfterRowLock(
1606    final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1607    final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
1608    final Delete delete, final boolean result) throws IOException {
1609    if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1610      // We had failure with table, cf and q perm checks and now giving a chance for cell
1611      // perm check
1612      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1613      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1614      AuthResult authResult = null;
1615      User user = getActiveUser(c);
1616      if (
1617        checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1618          HConstants.LATEST_TIMESTAMP, Action.READ)
1619      ) {
1620        authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set", user,
1621          Action.READ, table, families);
1622      } else {
1623        authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set", user,
1624          Action.READ, table, families);
1625      }
1626      AccessChecker.logResult(authResult);
1627      if (authorizationEnabled && !authResult.isAllowed()) {
1628        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1629      }
1630    }
1631    return result;
1632  }
1633
1634  @Override
1635  public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1636    throws IOException {
1637    User user = getActiveUser(c);
1638    checkForReservedTagPresence(user, append);
1639
1640    // Require WRITE permission to the table, CF, and the KV to be appended
1641    RegionCoprocessorEnvironment env = c.getEnvironment();
1642    Map<byte[], ? extends Collection<Cell>> families = append.getFamilyCellMap();
1643    AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1644    AccessChecker.logResult(authResult);
1645    if (!authResult.isAllowed()) {
1646      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1647        append.setAttribute(CHECK_COVERING_PERM, TRUE);
1648      } else if (authorizationEnabled) {
1649        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1650      }
1651    }
1652
1653    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1654    if (bytes != null) {
1655      if (cellFeaturesEnabled) {
1656        addCellPermissions(bytes, append.getFamilyCellMap());
1657      } else {
1658        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1659      }
1660    }
1661
1662    return null;
1663  }
1664
1665  @Override
1666  public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1667    final Increment increment) throws IOException {
1668    User user = getActiveUser(c);
1669    checkForReservedTagPresence(user, increment);
1670
1671    // Require WRITE permission to the table, CF, and the KV to be replaced by
1672    // the incremented value
1673    RegionCoprocessorEnvironment env = c.getEnvironment();
1674    Map<byte[], ? extends Collection<Cell>> families = increment.getFamilyCellMap();
1675    AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families, Action.WRITE);
1676    AccessChecker.logResult(authResult);
1677    if (!authResult.isAllowed()) {
1678      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1679        increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1680      } else if (authorizationEnabled) {
1681        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1682      }
1683    }
1684
1685    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1686    if (bytes != null) {
1687      if (cellFeaturesEnabled) {
1688        addCellPermissions(bytes, increment.getFamilyCellMap());
1689      } else {
1690        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1691      }
1692    }
1693
1694    return null;
1695  }
1696
1697  @Override
1698  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
1699    ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
1700    List<Pair<Cell, Cell>> cellPairs) throws IOException {
1701    // If the HFile version is insufficient to persist tags, we won't have any
1702    // work to do here
1703    if (!cellFeaturesEnabled || mutation.getACL() == null) {
1704      return cellPairs;
1705    }
1706    return cellPairs.stream()
1707      .map(pair -> new Pair<>(pair.getFirst(),
1708        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1709      .collect(Collectors.toList());
1710  }
1711
1712  @Override
1713  public List<Pair<Cell, Cell>> postAppendBeforeWAL(
1714    ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
1715    List<Pair<Cell, Cell>> cellPairs) throws IOException {
1716    // If the HFile version is insufficient to persist tags, we won't have any
1717    // work to do here
1718    if (!cellFeaturesEnabled || mutation.getACL() == null) {
1719      return cellPairs;
1720    }
1721    return cellPairs.stream()
1722      .map(pair -> new Pair<>(pair.getFirst(),
1723        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1724      .collect(Collectors.toList());
1725  }
1726
1727  private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
1728    // As Increment and Append operations have already copied the tags of oldCell to the newCell,
1729    // there is no need to rewrite them again. Just extract non-acl tags of newCell if we need to
1730    // add a new acl tag for the cell. Actually, oldCell is useless here.
1731    List<Tag> tags = Lists.newArrayList();
1732    if (newCell != null) {
1733      Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(newCell);
1734      while (tagIterator.hasNext()) {
1735        Tag tag = tagIterator.next();
1736        if (tag.getType() != PermissionStorage.ACL_TAG_TYPE) {
1737          // Not an ACL tag, just carry it through
1738          if (LOG.isTraceEnabled()) {
1739            LOG.trace("Carrying forward tag from " + newCell + ": type " + tag.getType()
1740              + " length " + tag.getValueLength());
1741          }
1742          tags.add(tag);
1743        }
1744      }
1745    }
1746
1747    // We have checked the ACL tag of mutation is not null.
1748    // So that the tags could not be empty.
1749    tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, mutation.getACL()));
1750    return PrivateCellUtil.createCell(newCell, tags);
1751  }
1752
1753  @Override
1754  public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
1755    throws IOException {
1756    internalPreRead(c, scan, OpType.SCAN);
1757  }
1758
1759  @Override
1760  public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1761    final Scan scan, final RegionScanner s) throws IOException {
1762    User user = getActiveUser(c);
1763    if (user != null && user.getShortName() != null) {
1764      // store reference to scanner owner for later checks
1765      scannerOwners.put(s, user.getShortName());
1766    }
1767    return s;
1768  }
1769
  /**
   * Checks scanner ownership via {@code requireScannerOwner} before the next batch is fetched.
   * @return the incoming {@code hasNext} value, unchanged
   */
  @Override
  public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
    final InternalScanner s, final List<Result> result, final int limit, final boolean hasNext)
    throws IOException {
    requireScannerOwner(s);
    return hasNext;
  }
1777
  /**
   * Checks scanner ownership via {@code requireScannerOwner} before the scanner is closed.
   */
  @Override
  public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
    final InternalScanner s) throws IOException {
    requireScannerOwner(s);
  }
1783
  /**
   * Drops the owner entry recorded for the scanner once it has been closed.
   */
  @Override
  public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
    final InternalScanner s) throws IOException {
    // clean up any associated owner mapping
    scannerOwners.remove(s);
  }
1790
1791  /**
1792   * Verify, when servicing an RPC, that the caller is the scanner owner. If so, we assume that
1793   * access control is correctly enforced based on the checks performed in preScannerOpen()
1794   */
1795  private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
1796    if (!RpcServer.isInRpcCallContext()) {
1797      return;
1798    }
1799    String requestUserName = RpcServer.getRequestUserName().orElse(null);
1800    String owner = scannerOwners.get(s);
1801    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
1802      throw new AccessDeniedException("User '" + requestUserName + "' is not the scanner owner!");
1803    }
1804  }
1805
1806  /**
1807   * Verifies user has CREATE or ADMIN privileges on the Column Families involved in the
1808   * bulkLoadHFile request. Specific Column Write privileges are presently ignored.
1809   */
1810  @Override
1811  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1812    List<Pair<byte[], String>> familyPaths) throws IOException {
1813    User user = getActiveUser(ctx);
1814    for (Pair<byte[], String> el : familyPaths) {
1815      accessChecker.requirePermission(user, "preBulkLoadHFile",
1816        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), el.getFirst(), null,
1817        null, Action.ADMIN, Action.CREATE);
1818    }
1819  }
1820
1821  /**
1822   * Authorization check for SecureBulkLoadProtocol.prepareBulkLoad()
1823   * @param ctx the context
1824   */
1825  @Override
1826  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1827    throws IOException {
1828    requireAccess(ctx, "prePrepareBulkLoad",
1829      ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1830      Action.CREATE);
1831  }
1832
1833  /**
1834   * Authorization security check for SecureBulkLoadProtocol.cleanupBulkLoad()
1835   * @param ctx the context
1836   */
1837  @Override
1838  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1839    throws IOException {
1840    requireAccess(ctx, "preCleanupBulkLoad",
1841      ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1842      Action.CREATE);
1843  }
1844
1845  /* ---- EndpointObserver implementation ---- */
1846
1847  @Override
1848  public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1849    Service service, String methodName, Message request) throws IOException {
1850    // Don't intercept calls to our own AccessControlService, we check for
1851    // appropriate permissions in the service handlers
1852    if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1853      requirePermission(ctx,
1854        "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
1855        getTableName(ctx.getEnvironment()), null, null, Action.EXEC);
1856    }
1857    return request;
1858  }
1859
  /**
   * Intentionally does nothing after an endpoint invocation.
   */
  @Override
  public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
    Service service, String methodName, Message request, Message.Builder responseBuilder)
    throws IOException {
    // no post-invocation work
  }
1865
1866  /* ---- Protobuf AccessControlService implementation ---- */
1867
1868  /**
1869   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1870   *             {@link Admin#grant(UserPermission, boolean)} instead.
1871   * @see Admin#grant(UserPermission, boolean)
1872   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1873   */
1874  @Deprecated
1875  @Override
1876  public void grant(RpcController controller, AccessControlProtos.GrantRequest request,
1877    RpcCallback<AccessControlProtos.GrantResponse> done) {
1878    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1879    AccessControlProtos.GrantResponse response = null;
1880    try {
1881      // verify it's only running at .acl.
1882      if (aclRegion) {
1883        if (!initialized) {
1884          throw new CoprocessorException("AccessController not yet initialized");
1885        }
1886        User caller = RpcServer.getRequestUser().orElse(null);
1887        if (LOG.isDebugEnabled()) {
1888          LOG.debug("Received request from {} to grant access permission {}", caller.getName(),
1889            perm.toString());
1890        }
1891        preGrantOrRevoke(caller, "grant", perm);
1892
1893        // regionEnv is set at #start. Hopefully not null at this point.
1894        regionEnv.getConnection().getAdmin().grant(
1895          new UserPermission(perm.getUser(), perm.getPermission()),
1896          request.getMergeExistingPermissions());
1897        if (AUDITLOG.isTraceEnabled()) {
1898          // audit log should store permission changes in addition to auth results
1899          AUDITLOG.trace("Granted permission " + perm.toString());
1900        }
1901      } else {
1902        throw new CoprocessorException(AccessController.class,
1903          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1904      }
1905      response = AccessControlProtos.GrantResponse.getDefaultInstance();
1906    } catch (IOException ioe) {
1907      // pass exception back up
1908      CoprocessorRpcUtils.setControllerException(controller, ioe);
1909    }
1910    done.run(response);
1911  }
1912
1913  /**
1914   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use {@link Admin#revoke(UserPermission)}
1915   *             instead.
1916   * @see Admin#revoke(UserPermission)
1917   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1918   */
1919  @Deprecated
1920  @Override
1921  public void revoke(RpcController controller, AccessControlProtos.RevokeRequest request,
1922    RpcCallback<AccessControlProtos.RevokeResponse> done) {
1923    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1924    AccessControlProtos.RevokeResponse response = null;
1925    try {
1926      // only allowed to be called on _acl_ region
1927      if (aclRegion) {
1928        if (!initialized) {
1929          throw new CoprocessorException("AccessController not yet initialized");
1930        }
1931        User caller = RpcServer.getRequestUser().orElse(null);
1932        if (LOG.isDebugEnabled()) {
1933          LOG.debug("Received request from {} to revoke access permission {}",
1934            caller.getShortName(), perm.toString());
1935        }
1936        preGrantOrRevoke(caller, "revoke", perm);
1937        // regionEnv is set at #start. Hopefully not null here.
1938        regionEnv.getConnection().getAdmin()
1939          .revoke(new UserPermission(perm.getUser(), perm.getPermission()));
1940        if (AUDITLOG.isTraceEnabled()) {
1941          // audit log should record all permission changes
1942          AUDITLOG.trace("Revoked permission " + perm.toString());
1943        }
1944      } else {
1945        throw new CoprocessorException(AccessController.class,
1946          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1947      }
1948      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
1949    } catch (IOException ioe) {
1950      // pass exception back up
1951      CoprocessorRpcUtils.setControllerException(controller, ioe);
1952    }
1953    done.run(response);
1954  }
1955
1956  /**
1957   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1958   *             {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
1959   * @see Admin#getUserPermissions(GetUserPermissionsRequest)
1960   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21911">HBASE-21911</a>
1961   */
1962  @Deprecated
1963  @Override
1964  public void getUserPermissions(RpcController controller,
1965    AccessControlProtos.GetUserPermissionsRequest request,
1966    RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
1967    AccessControlProtos.GetUserPermissionsResponse response = null;
1968    try {
1969      // only allowed to be called on _acl_ region
1970      if (aclRegion) {
1971        if (!initialized) {
1972          throw new CoprocessorException("AccessController not yet initialized");
1973        }
1974        User caller = RpcServer.getRequestUser().orElse(null);
1975        final String userName = request.hasUserName() ? request.getUserName().toStringUtf8() : null;
1976        final String namespace =
1977          request.hasNamespaceName() ? request.getNamespaceName().toStringUtf8() : null;
1978        final TableName table =
1979          request.hasTableName() ? ProtobufUtil.toTableName(request.getTableName()) : null;
1980        final byte[] cf =
1981          request.hasColumnFamily() ? request.getColumnFamily().toByteArray() : null;
1982        final byte[] cq =
1983          request.hasColumnQualifier() ? request.getColumnQualifier().toByteArray() : null;
1984        preGetUserPermissions(caller, userName, namespace, table, cf, cq);
1985        GetUserPermissionsRequest getUserPermissionsRequest = null;
1986        if (request.getType() == AccessControlProtos.Permission.Type.Table) {
1987          getUserPermissionsRequest = GetUserPermissionsRequest.newBuilder(table).withFamily(cf)
1988            .withQualifier(cq).withUserName(userName).build();
1989        } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
1990          getUserPermissionsRequest =
1991            GetUserPermissionsRequest.newBuilder(namespace).withUserName(userName).build();
1992        } else {
1993          getUserPermissionsRequest =
1994            GetUserPermissionsRequest.newBuilder().withUserName(userName).build();
1995        }
1996        List<UserPermission> perms =
1997          regionEnv.getConnection().getAdmin().getUserPermissions(getUserPermissionsRequest);
1998        response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
1999      } else {
2000        throw new CoprocessorException(AccessController.class,
2001          "This method " + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
2002      }
2003    } catch (IOException ioe) {
2004      // pass exception back up
2005      CoprocessorRpcUtils.setControllerException(controller, ioe);
2006    }
2007    done.run(response);
2008  }
2009
2010  /**
2011   * @deprecated since 2.2.0 and will be removed 4.0.0. Use {@link Admin#hasUserPermissions(List)}
2012   *             instead.
2013   * @see Admin#hasUserPermissions(List)
2014   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2015   */
2016  @Deprecated
2017  @Override
2018  public void checkPermissions(RpcController controller,
2019    AccessControlProtos.CheckPermissionsRequest request,
2020    RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2021    AccessControlProtos.CheckPermissionsResponse response = null;
2022    try {
2023      User user = RpcServer.getRequestUser().orElse(null);
2024      TableName tableName = regionEnv.getRegion().getTableDescriptor().getTableName();
2025      List<Permission> permissions = new ArrayList<>();
2026      for (int i = 0; i < request.getPermissionCount(); i++) {
2027        Permission permission = AccessControlUtil.toPermission(request.getPermission(i));
2028        permissions.add(permission);
2029        if (permission instanceof TablePermission) {
2030          TablePermission tperm = (TablePermission) permission;
2031          if (!tperm.getTableName().equals(tableName)) {
2032            throw new CoprocessorException(AccessController.class,
2033              String.format(
2034                "This method can only execute at the table specified in "
2035                  + "TablePermission. Table of the region:%s , requested table:%s",
2036                tableName, tperm.getTableName()));
2037          }
2038        }
2039      }
2040      for (Permission permission : permissions) {
2041        boolean hasPermission =
2042          accessChecker.hasUserPermission(user, "checkPermissions", permission);
2043        if (!hasPermission) {
2044          throw new AccessDeniedException("Insufficient permissions " + permission.toString());
2045        }
2046      }
2047      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2048    } catch (IOException ioe) {
2049      CoprocessorRpcUtils.setControllerException(controller, ioe);
2050    }
2051    done.run(response);
2052  }
2053
2054  private Region getRegion(RegionCoprocessorEnvironment e) {
2055    return e.getRegion();
2056  }
2057
2058  private TableName getTableName(RegionCoprocessorEnvironment e) {
2059    Region region = e.getRegion();
2060    if (region != null) {
2061      return getTableName(region);
2062    }
2063    return null;
2064  }
2065
2066  private TableName getTableName(Region region) {
2067    RegionInfo regionInfo = region.getRegionInfo();
2068    if (regionInfo != null) {
2069      return regionInfo.getTable();
2070    }
2071    return null;
2072  }
2073
  /**
   * Pre-close hook: delegates to {@code requirePermission}, requesting {@link Action#ADMIN} under
   * the request name {@code "preClose"}. {@code abortRequested} is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
    throws IOException {
    requirePermission(c, "preClose", Action.ADMIN);
  }
2079
2080  private void checkSystemOrSuperUser(User activeUser) throws IOException {
2081    // No need to check if we're not going to throw
2082    if (!authorizationEnabled) {
2083      return;
2084    }
2085    if (!Superusers.isSuperUser(activeUser)) {
2086      throw new AccessDeniedException(
2087        "User '" + (activeUser != null ? activeUser.getShortName() : "null")
2088          + "' is not system or super user.");
2089    }
2090  }
2091
  /**
   * Pre-stop hook for the region server: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "preStopRegionServer"}.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
    requirePermission(ctx, "preStopRegionServer", Action.ADMIN);
  }
2097
2098  private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family, byte[] qualifier) {
2099    if (family == null) {
2100      return null;
2101    }
2102
2103    Map<byte[], Collection<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2104    familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2105    return familyMap;
2106  }
2107
2108  @Override
2109  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2110    List<TableName> tableNamesList, List<TableDescriptor> descriptors, String regex)
2111    throws IOException {
2112    // We are delegating the authorization check to postGetTableDescriptors as we don't have
2113    // any concrete set of table names when a regex is present or the full list is requested.
2114    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2115      // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2116      // request can be granted.
2117      try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
2118        for (TableName tableName : tableNamesList) {
2119          // Skip checks for a table that does not exist
2120          if (!admin.tableExists(tableName)) {
2121            continue;
2122          }
2123          requirePermission(ctx, "getTableDescriptors", tableName, null, null, Action.ADMIN,
2124            Action.CREATE);
2125        }
2126      }
2127    }
2128  }
2129
2130  @Override
2131  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2132    List<TableName> tableNamesList, List<TableDescriptor> descriptors, String regex)
2133    throws IOException {
2134    // Skipping as checks in this case are already done by preGetTableDescriptors.
2135    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2136      return;
2137    }
2138
2139    // Retains only those which passes authorization checks, as the checks weren't done as part
2140    // of preGetTableDescriptors.
2141    Iterator<TableDescriptor> itr = descriptors.iterator();
2142    while (itr.hasNext()) {
2143      TableDescriptor htd = itr.next();
2144      try {
2145        requirePermission(ctx, "getTableDescriptors", htd.getTableName(), null, null, Action.ADMIN,
2146          Action.CREATE);
2147      } catch (AccessDeniedException e) {
2148        itr.remove();
2149      }
2150    }
2151  }
2152
2153  @Override
2154  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2155    List<TableDescriptor> descriptors, String regex) throws IOException {
2156    // Retains only those which passes authorization checks.
2157    Iterator<TableDescriptor> itr = descriptors.iterator();
2158    while (itr.hasNext()) {
2159      TableDescriptor htd = itr.next();
2160      try {
2161        requireAccess(ctx, "getTableNames", htd.getTableName(), Action.values());
2162      } catch (AccessDeniedException e) {
2163        itr.remove();
2164      }
2165    }
2166  }
2167
2168  @Override
2169  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2170    final RegionInfo[] regionsToMerge) throws IOException {
2171    requirePermission(ctx, "mergeRegions", regionsToMerge[0].getTable(), null, null, Action.ADMIN);
2172  }
2173
2174  @Override
2175  public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2176    throws IOException {
2177    requirePermission(ctx, "preRollLogWriterRequest", Permission.Action.ADMIN);
2178  }
2179
  /**
   * Post-hook for a WAL writer roll request. Intentionally a no-op: this implementation performs
   * no check and takes no action.
   */
  @Override
  public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  }
2184
  /**
   * Pre-hook for setting a user quota: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "setUserQuota"}. {@code userName} and
   * {@code quotas} are not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final String userName, final GlobalQuotaSettings quotas) throws IOException {
    requirePermission(ctx, "setUserQuota", Action.ADMIN);
  }
2190
  /**
   * Pre-hook for setting a user quota on a table: delegates to {@code requirePermission},
   * requesting {@link Action#ADMIN} on {@code tableName} (null family and qualifier) under the
   * request name {@code "setUserTableQuota"}. {@code userName} and {@code quotas} are not
   * consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final String userName, final TableName tableName, final GlobalQuotaSettings quotas)
    throws IOException {
    requirePermission(ctx, "setUserTableQuota", tableName, null, null, Action.ADMIN);
  }
2197
  /**
   * Pre-hook for setting a user quota on a namespace: delegates to {@code requirePermission},
   * requesting {@link Action#ADMIN} under the request name {@code "setUserNamespaceQuota"}.
   * Note that {@code namespace} is not passed to the check; {@code userName} and {@code quotas}
   * are not consulted either.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final String userName, final String namespace, final GlobalQuotaSettings quotas)
    throws IOException {
    requirePermission(ctx, "setUserNamespaceQuota", Action.ADMIN);
  }
2204
  /**
   * Pre-hook for setting a table quota: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} on {@code tableName} (null family and qualifier) under the request name
   * {@code "setTableQuota"}. {@code quotas} is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final TableName tableName, final GlobalQuotaSettings quotas) throws IOException {
    requirePermission(ctx, "setTableQuota", tableName, null, null, Action.ADMIN);
  }
2210
  /**
   * Pre-hook for setting a namespace quota: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "setNamespaceQuota"}. Note that
   * {@code namespace} is not passed to the check; {@code quotas} is not consulted either.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final String namespace, final GlobalQuotaSettings quotas) throws IOException {
    requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
  }
2216
  /**
   * Pre-hook for setting a region server quota: delegates to {@code requirePermission},
   * requesting {@link Action#ADMIN} under the request name {@code "setRegionServerQuota"}.
   * {@code regionServer} and {@code quotas} are not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSetRegionServerQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
    final String regionServer, GlobalQuotaSettings quotas) throws IOException {
    requirePermission(ctx, "setRegionServerQuota", Action.ADMIN);
  }
2222
  /**
   * Post-hook for replication endpoint creation. Performs no check and returns the given
   * endpoint unchanged.
   * @param endpoint the endpoint that was created
   * @return the same {@code endpoint} instance
   */
  @Override
  public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
    return endpoint;
  }
2228
  /**
   * Pre-hook for replicating log entries: delegates to {@code requirePermission}, requesting
   * {@link Action#WRITE} (not ADMIN) under the request name {@code "replicateLogEntries"}.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
    requirePermission(ctx, "replicateLogEntries", Action.WRITE);
  }
2234
2235  @Override
2236  public void preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2237    throws IOException {
2238    requirePermission(ctx, "preClearCompactionQueues", Permission.Action.ADMIN);
2239  }
2240
  /**
   * Pre-hook for adding a replication peer: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "addReplicationPeer"}. {@code peerId} and
   * {@code peerConfig} are not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    String peerId, ReplicationPeerConfig peerConfig) throws IOException {
    requirePermission(ctx, "addReplicationPeer", Action.ADMIN);
  }
2246
  /**
   * Pre-hook for removing a replication peer: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "removeReplicationPeer"}. {@code peerId}
   * is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    String peerId) throws IOException {
    requirePermission(ctx, "removeReplicationPeer", Action.ADMIN);
  }
2252
  /**
   * Pre-hook for enabling a replication peer: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "enableReplicationPeer"}. {@code peerId}
   * is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    String peerId) throws IOException {
    requirePermission(ctx, "enableReplicationPeer", Action.ADMIN);
  }
2258
  /**
   * Pre-hook for disabling a replication peer: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "disableReplicationPeer"}. {@code peerId}
   * is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    String peerId) throws IOException {
    requirePermission(ctx, "disableReplicationPeer", Action.ADMIN);
  }
2264
  /**
   * Pre-hook for reading a replication peer's configuration: delegates to
   * {@code requirePermission}, requesting {@link Action#ADMIN} under the request name
   * {@code "getReplicationPeerConfig"}. {@code peerId} is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preGetReplicationPeerConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    String peerId) throws IOException {
    requirePermission(ctx, "getReplicationPeerConfig", Action.ADMIN);
  }
2270
  /**
   * Pre-hook for updating a replication peer's configuration: delegates to
   * {@code requirePermission}, requesting {@link Action#ADMIN} under the request name
   * {@code "updateReplicationPeerConfig"}. {@code peerId} and {@code peerConfig} are not
   * consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preUpdateReplicationPeerConfig(
    final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
    ReplicationPeerConfig peerConfig) throws IOException {
    requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
  }
2277
  /**
   * Pre-hook for transitioning a peer's sync replication state: delegates to
   * {@code requirePermission}, requesting {@link Action#ADMIN} under the request name
   * {@code "transitSyncReplicationPeerState"}. {@code peerId} and {@code clusterState} are not
   * consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preTransitReplicationPeerSyncReplicationState(
    final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
    SyncReplicationState clusterState) throws IOException {
    requirePermission(ctx, "transitSyncReplicationPeerState", Action.ADMIN);
  }
2284
  /**
   * Pre-hook for listing replication peers: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "listReplicationPeers"}. {@code regex} is
   * not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    String regex) throws IOException {
    requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
  }
2290
2291  @Override
2292  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
2293    TableName tableName, RegionInfo[] regionInfos, String description) throws IOException {
2294    // There are operations in the CREATE and ADMIN domain which may require lock, READ
2295    // or WRITE. So for any lock request, we check for these two perms irrespective of lock type.
2296    String reason = String.format("Description=%s", description);
2297    checkLockPermissions(ctx, namespace, tableName, regionInfos, reason);
2298  }
2299
  /**
   * Pre-hook for a lock heartbeat: delegates to {@code checkLockPermissions} for
   * {@code tableName} only (namespace and regions are passed as {@code null}), using the raw
   * {@code description} as the reason.
   * @throws IOException propagated from {@code checkLockPermissions}
   */
  @Override
  public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableName tableName, String description) throws IOException {
    checkLockPermissions(ctx, null, tableName, null, description);
  }
2305
2306  @Override
2307  public void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2308    throws IOException {
2309    checkSystemOrSuperUser(getActiveUser(ctx));
2310  }
2311
  /**
   * Pre-hook for switching RPC throttling: delegates to {@code requirePermission}, requesting
   * {@link Action#ADMIN} under the request name {@code "switchRpcThrottle"}. {@code enable} is
   * not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSwitchRpcThrottle(ObserverContext<MasterCoprocessorEnvironment> ctx,
    boolean enable) throws IOException {
    requirePermission(ctx, "switchRpcThrottle", Action.ADMIN);
  }
2317
  /**
   * Pre-hook for querying whether RPC throttling is enabled: delegates to
   * {@code requirePermission}, requesting {@link Action#ADMIN} under the request name
   * {@code "isRpcThrottleEnabled"}.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preIsRpcThrottleEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx)
    throws IOException {
    requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
  }
2323
  /**
   * Pre-hook for switching the exceed-throttle-quota setting: delegates to
   * {@code requirePermission}, requesting {@link Action#ADMIN} under the request name
   * {@code "switchExceedThrottleQuota"}. {@code enable} is not consulted.
   * @throws IOException propagated from the permission check
   */
  @Override
  public void preSwitchExceedThrottleQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
    boolean enable) throws IOException {
    requirePermission(ctx, "switchExceedThrottleQuota", Action.ADMIN);
  }
2329
2330  /**
2331   * Returns the active user to which authorization checks should be applied. If we are in the
2332   * context of an RPC call, the remote user is used, otherwise the currently logged in user is
2333   * used.
2334   */
2335  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
2336    // for non-rpc handling, fallback to system user
2337    Optional<User> optionalUser = ctx.getCaller();
2338    if (optionalUser.isPresent()) {
2339      return optionalUser.get();
2340    }
2341    return userProvider.getCurrent();
2342  }
2343
2344  /**
2345   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
2346   *             {@link Admin#hasUserPermissions(String, List)} instead.
2347   * @see Admin#hasUserPermissions(String, List)
2348   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2349   */
2350  @Deprecated
2351  @Override
2352  public void hasPermission(RpcController controller, HasPermissionRequest request,
2353    RpcCallback<HasPermissionResponse> done) {
2354    // Converts proto to a TablePermission object.
2355    TablePermission tPerm = AccessControlUtil.toTablePermission(request.getTablePermission());
2356    // Check input user name
2357    if (!request.hasUserName()) {
2358      throw new IllegalStateException("Input username cannot be empty");
2359    }
2360    final String inputUserName = request.getUserName().toStringUtf8();
2361    AccessControlProtos.HasPermissionResponse response = null;
2362    try {
2363      User caller = RpcServer.getRequestUser().orElse(null);
2364      List<Permission> permissions = Lists.newArrayList(tPerm);
2365      preHasUserPermissions(caller, inputUserName, permissions);
2366      boolean hasPermission =
2367        regionEnv.getConnection().getAdmin().hasUserPermissions(inputUserName, permissions).get(0);
2368      response = ResponseConverter.buildHasPermissionResponse(hasPermission);
2369    } catch (IOException ioe) {
2370      ResponseConverter.setControllerException(controller, ioe);
2371    }
2372    done.run(response);
2373  }
2374
2375  @Override
2376  public void preGrant(ObserverContext<MasterCoprocessorEnvironment> ctx,
2377    UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
2378    preGrantOrRevoke(getActiveUser(ctx), "grant", userPermission);
2379  }
2380
2381  @Override
2382  public void preRevoke(ObserverContext<MasterCoprocessorEnvironment> ctx,
2383    UserPermission userPermission) throws IOException {
2384    preGrantOrRevoke(getActiveUser(ctx), "revoke", userPermission);
2385  }
2386
2387  private void preGrantOrRevoke(User caller, String request, UserPermission userPermission)
2388    throws IOException {
2389    switch (userPermission.getPermission().scope) {
2390      case GLOBAL:
2391        accessChecker.requireGlobalPermission(caller, request, Action.ADMIN, "");
2392        break;
2393      case NAMESPACE:
2394        NamespacePermission namespacePerm = (NamespacePermission) userPermission.getPermission();
2395        accessChecker.requireNamespacePermission(caller, request, namespacePerm.getNamespace(),
2396          null, Action.ADMIN);
2397        break;
2398      case TABLE:
2399        TablePermission tablePerm = (TablePermission) userPermission.getPermission();
2400        accessChecker.requirePermission(caller, request, tablePerm.getTableName(),
2401          tablePerm.getFamily(), tablePerm.getQualifier(), null, Action.ADMIN);
2402        break;
2403      default:
2404    }
2405    if (!Superusers.isSuperUser(caller)) {
2406      accessChecker.performOnSuperuser(request, caller, userPermission.getUser());
2407    }
2408  }
2409
2410  @Override
2411  public void preGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2412    String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
2413    throws IOException {
2414    preGetUserPermissions(getActiveUser(ctx), userName, namespace, tableName, family, qualifier);
2415  }
2416
2417  private void preGetUserPermissions(User caller, String userName, String namespace,
2418    TableName tableName, byte[] family, byte[] qualifier) throws IOException {
2419    if (tableName != null) {
2420      accessChecker.requirePermission(caller, "getUserPermissions", tableName, family, qualifier,
2421        userName, Action.ADMIN);
2422    } else if (namespace != null) {
2423      accessChecker.requireNamespacePermission(caller, "getUserPermissions", namespace, userName,
2424        Action.ADMIN);
2425    } else {
2426      accessChecker.requirePermission(caller, "getUserPermissions", userName, Action.ADMIN);
2427    }
2428  }
2429
2430  @Override
2431  public void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2432    String userName, List<Permission> permissions) throws IOException {
2433    preHasUserPermissions(getActiveUser(ctx), userName, permissions);
2434  }
2435
2436  private void preHasUserPermissions(User caller, String userName, List<Permission> permissions)
2437    throws IOException {
2438    String request = "hasUserPermissions";
2439    for (Permission permission : permissions) {
2440      if (!caller.getShortName().equals(userName)) {
2441        // User should have admin privilege if checking permission for other users
2442        if (permission instanceof TablePermission) {
2443          TablePermission tPerm = (TablePermission) permission;
2444          accessChecker.requirePermission(caller, request, tPerm.getTableName(), tPerm.getFamily(),
2445            tPerm.getQualifier(), userName, Action.ADMIN);
2446        } else if (permission instanceof NamespacePermission) {
2447          NamespacePermission nsPerm = (NamespacePermission) permission;
2448          accessChecker.requireNamespacePermission(caller, request, nsPerm.getNamespace(), userName,
2449            Action.ADMIN);
2450        } else {
2451          accessChecker.requirePermission(caller, request, userName, Action.ADMIN);
2452        }
2453      } else {
2454        // User don't need ADMIN privilege for self check.
2455        // Setting action as null in AuthResult to display empty action in audit log
2456        AuthResult result;
2457        if (permission instanceof TablePermission) {
2458          TablePermission tPerm = (TablePermission) permission;
2459          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2460            tPerm.getTableName(), tPerm.getFamily(), tPerm.getQualifier());
2461        } else if (permission instanceof NamespacePermission) {
2462          NamespacePermission nsPerm = (NamespacePermission) permission;
2463          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2464            nsPerm.getNamespace());
2465        } else {
2466          result = AuthResult.allow(request, "Self user validation allowed", caller, null, null,
2467            null, null);
2468        }
2469        AccessChecker.logResult(result);
2470      }
2471    }
2472  }
2473
2474  @Override
2475  public void preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2476    Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
2477    accessChecker.requirePermission(getActiveUser(ctx), "moveServersAndTables", null,
2478      Permission.Action.ADMIN);
2479  }
2480
2481  @Override
2482  public void preMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2483    Set<Address> servers, String targetGroup) throws IOException {
2484    accessChecker.requirePermission(getActiveUser(ctx), "moveServers", null,
2485      Permission.Action.ADMIN);
2486  }
2487
2488  @Override
2489  public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2490    Set<TableName> tables, String targetGroup) throws IOException {
2491    accessChecker.requirePermission(getActiveUser(ctx), "moveTables", null,
2492      Permission.Action.ADMIN);
2493  }
2494
2495  @Override
2496  public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
2497    throws IOException {
2498    accessChecker.requirePermission(getActiveUser(ctx), "addRSGroup", null,
2499      Permission.Action.ADMIN);
2500  }
2501
2502  @Override
2503  public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String name)
2504    throws IOException {
2505    accessChecker.requirePermission(getActiveUser(ctx), "removeRSGroup", null,
2506      Permission.Action.ADMIN);
2507  }
2508
2509  @Override
2510  public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName,
2511    BalanceRequest request) throws IOException {
2512    accessChecker.requirePermission(getActiveUser(ctx), "balanceRSGroup", null,
2513      Permission.Action.ADMIN);
2514  }
2515
2516  @Override
2517  public void preRemoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2518    Set<Address> servers) throws IOException {
2519    accessChecker.requirePermission(getActiveUser(ctx), "removeServers", null,
2520      Permission.Action.ADMIN);
2521  }
2522
2523  @Override
2524  public void preGetRSGroupInfo(ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName)
2525    throws IOException {
2526    accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfo", null,
2527      Permission.Action.ADMIN);
2528  }
2529
2530  @Override
2531  public void preGetRSGroupInfoOfTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
2532    TableName tableName) throws IOException {
2533    accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfoOfTable", null,
2534      Permission.Action.ADMIN);
2535    // todo: should add check for table existence
2536  }
2537
2538  @Override
2539  public void preListRSGroups(ObserverContext<MasterCoprocessorEnvironment> ctx)
2540    throws IOException {
2541    accessChecker.requirePermission(getActiveUser(ctx), "listRSGroups", null,
2542      Permission.Action.ADMIN);
2543  }
2544
2545  @Override
2546  public void preListTablesInRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2547    String groupName) throws IOException {
2548    accessChecker.requirePermission(getActiveUser(ctx), "listTablesInRSGroup", null,
2549      Permission.Action.ADMIN);
2550  }
2551
2552  @Override
2553  public void preGetConfiguredNamespacesAndTablesInRSGroup(
2554    ObserverContext<MasterCoprocessorEnvironment> ctx, String groupName) throws IOException {
2555    accessChecker.requirePermission(getActiveUser(ctx), "getConfiguredNamespacesAndTablesInRSGroup",
2556      null, Permission.Action.ADMIN);
2557  }
2558
2559  @Override
2560  public void preGetRSGroupInfoOfServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
2561    Address server) throws IOException {
2562    accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfoOfServer", null,
2563      Permission.Action.ADMIN);
2564  }
2565
2566  @Override
2567  public void preRenameRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx, String oldName,
2568    String newName) throws IOException {
2569    accessChecker.requirePermission(getActiveUser(ctx), "renameRSGroup", null,
2570      Permission.Action.ADMIN);
2571  }
2572
2573  @Override
2574  public void preUpdateRSGroupConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2575    final String groupName, final Map<String, String> configuration) throws IOException {
2576    accessChecker.requirePermission(getActiveUser(ctx), "updateRSGroupConfig", null,
2577      Permission.Action.ADMIN);
2578  }
2579
2580  @Override
2581  public void preClearRegionBlockCache(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2582    throws IOException {
2583    accessChecker.requirePermission(getActiveUser(ctx), "clearRegionBlockCache", null,
2584      Permission.Action.ADMIN);
2585  }
2586
2587  @Override
2588  public void preUpdateRegionServerConfiguration(
2589    ObserverContext<RegionServerCoprocessorEnvironment> ctx, Configuration preReloadConf)
2590    throws IOException {
2591    accessChecker.requirePermission(getActiveUser(ctx), "updateConfiguration", null,
2592      Permission.Action.ADMIN);
2593  }
2594
2595  @Override
2596  public void preUpdateMasterConfiguration(ObserverContext<MasterCoprocessorEnvironment> ctx,
2597    Configuration preReloadConf) throws IOException {
2598    accessChecker.requirePermission(getActiveUser(ctx), "updateConfiguration", null,
2599      Permission.Action.ADMIN);
2600  }
2601
2602}