001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.security.access;
020
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcCallback;
023import com.google.protobuf.RpcController;
024import com.google.protobuf.Service;
025import java.io.IOException;
026import java.security.PrivilegedExceptionAction;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.Optional;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.TreeSet;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ArrayBackedTag;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellScanner;
043import org.apache.hadoop.hbase.CellUtil;
044import org.apache.hadoop.hbase.CompareOperator;
045import org.apache.hadoop.hbase.CompoundConfiguration;
046import org.apache.hadoop.hbase.CoprocessorEnvironment;
047import org.apache.hadoop.hbase.DoNotRetryIOException;
048import org.apache.hadoop.hbase.HBaseInterfaceAudience;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.KeyValue.Type;
052import org.apache.hadoop.hbase.NamespaceDescriptor;
053import org.apache.hadoop.hbase.PrivateCellUtil;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.Tag;
057import org.apache.hadoop.hbase.client.Admin;
058import org.apache.hadoop.hbase.client.Append;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.Delete;
062import org.apache.hadoop.hbase.client.Durability;
063import org.apache.hadoop.hbase.client.Get;
064import org.apache.hadoop.hbase.client.Increment;
065import org.apache.hadoop.hbase.client.MasterSwitchType;
066import org.apache.hadoop.hbase.client.Mutation;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.Query;
069import org.apache.hadoop.hbase.client.RegionInfo;
070import org.apache.hadoop.hbase.client.Result;
071import org.apache.hadoop.hbase.client.Scan;
072import org.apache.hadoop.hbase.client.SnapshotDescription;
073import org.apache.hadoop.hbase.client.Table;
074import org.apache.hadoop.hbase.client.TableDescriptor;
075import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
076import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
077import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
078import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
079import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
080import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
081import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
082import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
083import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
084import org.apache.hadoop.hbase.coprocessor.MasterObserver;
085import org.apache.hadoop.hbase.coprocessor.ObserverContext;
086import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
087import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
088import org.apache.hadoop.hbase.coprocessor.RegionObserver;
089import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
090import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
091import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
092import org.apache.hadoop.hbase.filter.ByteArrayComparable;
093import org.apache.hadoop.hbase.filter.Filter;
094import org.apache.hadoop.hbase.filter.FilterList;
095import org.apache.hadoop.hbase.io.hfile.HFile;
096import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
097import org.apache.hadoop.hbase.ipc.RpcServer;
098import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
099import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
100import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
101import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
102import org.apache.hadoop.hbase.regionserver.BloomType;
103import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
104import org.apache.hadoop.hbase.regionserver.InternalScanner;
105import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
106import org.apache.hadoop.hbase.regionserver.Region;
107import org.apache.hadoop.hbase.regionserver.RegionScanner;
108import org.apache.hadoop.hbase.regionserver.ScanType;
109import org.apache.hadoop.hbase.regionserver.ScannerContext;
110import org.apache.hadoop.hbase.regionserver.Store;
111import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
112import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
113import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
114import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
115import org.apache.hadoop.hbase.security.AccessDeniedException;
116import org.apache.hadoop.hbase.security.Superusers;
117import org.apache.hadoop.hbase.security.User;
118import org.apache.hadoop.hbase.security.UserProvider;
119import org.apache.hadoop.hbase.security.access.Permission.Action;
120import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
121import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
122import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
123import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
124import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
125import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
126import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
127import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
128import org.apache.hadoop.hbase.util.ByteRange;
129import org.apache.hadoop.hbase.util.Bytes;
130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
131import org.apache.hadoop.hbase.util.Pair;
132import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
133import org.apache.hadoop.hbase.wal.WALEdit;
134import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
135import org.apache.yetus.audience.InterfaceAudience;
136import org.slf4j.Logger;
137import org.slf4j.LoggerFactory;
138
139/**
140 * Provides basic authorization checks for data access and administrative
141 * operations.
142 *
143 * <p>
144 * {@code AccessController} performs authorization checks for HBase operations
145 * based on:
146 * </p>
147 * <ul>
148 *   <li>the identity of the user performing the operation</li>
149 *   <li>the scope over which the operation is performed, in increasing
150 *   specificity: global, table, column family, or qualifier</li>
151 *   <li>the type of action being performed (as mapped to
152 *   {@link Permission.Action} values)</li>
153 * </ul>
154 * <p>
155 * If the authorization check fails, an {@link AccessDeniedException}
156 * will be thrown for the operation.
157 * </p>
158 *
159 * <p>
160 * To perform authorization checks, {@code AccessController} relies on the
161 * RpcServerEngine being loaded to provide
162 * the user identities for remote requests.
163 * </p>
164 *
165 * <p>
166 * The access control lists used for authorization can be manipulated via the
167 * exposed {@link AccessControlService} Interface implementation, and the associated
168 * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
169 * commands.
170 * </p>
171 */
172@CoreCoprocessor
173@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
174public class AccessController implements MasterCoprocessor, RegionCoprocessor,
175    RegionServerCoprocessor, AccessControlService.Interface,
176    MasterObserver, RegionObserver, RegionServerObserver, EndpointObserver, BulkLoadObserver {
177  // TODO: encapsulate observer functions into separate class/sub-class.
178
179  private static final Logger LOG = LoggerFactory.getLogger(AccessController.class);
180
181  private static final Logger AUDITLOG =
182    LoggerFactory.getLogger("SecurityLogger."+AccessController.class.getName());
183  private static final String CHECK_COVERING_PERM = "check_covering_perm";
184  private static final String TAG_CHECK_PASSED = "tag_check_passed";
185  private static final byte[] TRUE = Bytes.toBytes(true);
186
187  private AccessChecker accessChecker;
188
189  /** flags if we are running on a region of the _acl_ table */
190  private boolean aclRegion = false;
191
192  /** defined only for Endpoint implementation, so it can have way to
193   access region services */
194  private RegionCoprocessorEnvironment regionEnv;
195
196  /** Mapping of scanner instances to the user who created them */
197  private Map<InternalScanner,String> scannerOwners =
198      new MapMaker().weakKeys().makeMap();
199
200  private Map<TableName, List<UserPermission>> tableAcls;
201
202  /** Provider for mapping principal names to Users */
203  private UserProvider userProvider;
204
205  /** if we are active, usually false, only true if "hbase.security.authorization"
206   has been set to true in site configuration */
207  private boolean authorizationEnabled;
208
209  /** if we are able to support cell ACLs */
210  private boolean cellFeaturesEnabled;
211
212  /** if we should check EXEC permissions */
213  private boolean shouldCheckExecPermission;
214
215  /** if we should terminate access checks early as soon as table or CF grants
216    allow access; pre-0.98 compatible behavior */
217  private boolean compatibleEarlyTermination;
218
219  /** if we have been successfully initialized */
220  private volatile boolean initialized = false;
221
222  /** if the ACL table is available, only relevant in the master */
223  private volatile boolean aclTabAvailable = false;
224
225  public static boolean isCellAuthorizationSupported(Configuration conf) {
226    return AccessChecker.isAuthorizationSupported(conf) &&
227        (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
228  }
229
230  public Region getRegion() {
231    return regionEnv != null ? regionEnv.getRegion() : null;
232  }
233
234  public TableAuthManager getAuthManager() {
235    return accessChecker.getAuthManager();
236  }
237
238  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
239    final Region region = e.getRegion();
240    Configuration conf = e.getConfiguration();
241    Map<byte[], ListMultimap<String,TablePermission>> tables =
242        AccessControlLists.loadAll(region);
243    // For each table, write out the table's permissions to the respective
244    // znode for that table.
245    for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
246      tables.entrySet()) {
247      byte[] entry = t.getKey();
248      ListMultimap<String,TablePermission> perms = t.getValue();
249      byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
250      getAuthManager().getZKPermissionWatcher().writeToZookeeper(entry, serialized);
251    }
252    initialized = true;
253  }
254
255  /**
256   * Writes all table ACLs for the tables in the given Map up into ZooKeeper
257   * znodes.  This is called to synchronize ACL changes following {@code _acl_}
258   * table updates.
259   */
260  private void updateACL(RegionCoprocessorEnvironment e,
261      final Map<byte[], List<Cell>> familyMap) {
262    Set<byte[]> entries = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
263    for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
264      List<Cell> cells = f.getValue();
265      for (Cell cell: cells) {
266        if (CellUtil.matchingFamily(cell, AccessControlLists.ACL_LIST_FAMILY)) {
267          entries.add(CellUtil.cloneRow(cell));
268        }
269      }
270    }
271    ZKPermissionWatcher zkw = getAuthManager().getZKPermissionWatcher();
272    Configuration conf = regionEnv.getConfiguration();
273    byte [] currentEntry = null;
274    // TODO: Here we are already on the ACL region. (And it is single
275    // region) We can even just get the region from the env and do get
276    // directly. The short circuit connection would avoid the RPC overhead
277    // so no socket communication, req write/read ..  But we have the PB
278    // to and fro conversion overhead. get req is converted to PB req
279    // and results are converted to PB results 1st and then to POJOs
280    // again. We could have avoided such at least in ACL table context..
281    try (Table t = e.getConnection().getTable(AccessControlLists.ACL_TABLE_NAME)) {
282      for (byte[] entry : entries) {
283        currentEntry = entry;
284        ListMultimap<String, TablePermission> perms =
285            AccessControlLists.getPermissions(conf, entry, t);
286        byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
287        zkw.writeToZookeeper(entry, serialized);
288      }
289    } catch(IOException ex) {
290          LOG.error("Failed updating permissions mirror for '" +
291                  (currentEntry == null? "null": Bytes.toString(currentEntry)) + "'", ex);
292    }
293  }
294
295  /**
296   * Check the current user for authorization to perform a specific action
297   * against the given set of row data.
298   *
299   * <p>Note: Ordering of the authorization checks
300   * has been carefully optimized to short-circuit the most common requests
301   * and minimize the amount of processing required.</p>
302   *
303   * @param permRequest the action being requested
304   * @param e the coprocessor environment
305   * @param families the map of column families to qualifiers present in
306   * the request
307   * @return an authorization result
308   */
309  private AuthResult permissionGranted(String request, User user, Action permRequest,
310      RegionCoprocessorEnvironment e,
311      Map<byte [], ? extends Collection<?>> families) {
312    RegionInfo hri = e.getRegion().getRegionInfo();
313    TableName tableName = hri.getTable();
314
315    // 1. All users need read access to hbase:meta table.
316    // this is a very common operation, so deal with it quickly.
317    if (hri.isMetaRegion()) {
318      if (permRequest == Action.READ) {
319        return AuthResult.allow(request, "All users allowed", user,
320          permRequest, tableName, families);
321      }
322    }
323
324    if (user == null) {
325      return AuthResult.deny(request, "No user associated with request!", null,
326        permRequest, tableName, families);
327    }
328
329    // 2. check for the table-level, if successful we can short-circuit
330    if (getAuthManager().authorize(user, tableName, (byte[])null, permRequest)) {
331      return AuthResult.allow(request, "Table permission granted", user,
332        permRequest, tableName, families);
333    }
334
335    // 3. check permissions against the requested families
336    if (families != null && families.size() > 0) {
337      // all families must pass
338      for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
339        // a) check for family level access
340        if (getAuthManager().authorize(user, tableName, family.getKey(),
341            permRequest)) {
342          continue;  // family-level permission overrides per-qualifier
343        }
344
345        // b) qualifier level access can still succeed
346        if ((family.getValue() != null) && (family.getValue().size() > 0)) {
347          if (family.getValue() instanceof Set) {
348            // for each qualifier of the family
349            Set<byte[]> familySet = (Set<byte[]>)family.getValue();
350            for (byte[] qualifier : familySet) {
351              if (!getAuthManager().authorize(user, tableName, family.getKey(),
352                                         qualifier, permRequest)) {
353                return AuthResult.deny(request, "Failed qualifier check", user,
354                    permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
355              }
356            }
357          } else if (family.getValue() instanceof List) { // List<Cell>
358            List<Cell> cellList = (List<Cell>)family.getValue();
359            for (Cell cell : cellList) {
360              if (!getAuthManager().authorize(user, tableName, family.getKey(),
361                CellUtil.cloneQualifier(cell), permRequest)) {
362                return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
363                  tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(cell)));
364              }
365            }
366          }
367        } else {
368          // no qualifiers and family-level check already failed
369          return AuthResult.deny(request, "Failed family check", user, permRequest,
370              tableName, makeFamilyMap(family.getKey(), null));
371        }
372      }
373
374      // all family checks passed
375      return AuthResult.allow(request, "All family checks passed", user, permRequest,
376          tableName, families);
377    }
378
379    // 4. no families to check and table level access failed
380    return AuthResult.deny(request, "No families to check and table permission failed",
381        user, permRequest, tableName, families);
382  }
383
384  /**
385   * Check the current user for authorization to perform a specific action
386   * against the given set of row data.
387   * @param opType the operation type
388   * @param user the user
389   * @param e the coprocessor environment
390   * @param families the map of column families to qualifiers present in
391   * the request
392   * @param actions the desired actions
393   * @return an authorization result
394   */
395  private AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
396      Map<byte [], ? extends Collection<?>> families, Action... actions) {
397    AuthResult result = null;
398    for (Action action: actions) {
399      result = permissionGranted(opType.toString(), user, action, e, families);
400      if (!result.isAllowed()) {
401        return result;
402      }
403    }
404    return result;
405  }
406
407  public void requireAccess(ObserverContext<?> ctx, String request, TableName tableName,
408      Action... permissions) throws IOException {
409    accessChecker.requireAccess(getActiveUser(ctx), request, tableName, permissions);
410  }
411
412  public void requirePermission(ObserverContext<?> ctx, String request,
413      Action perm) throws IOException {
414    accessChecker.requirePermission(getActiveUser(ctx), request, perm);
415  }
416
417  public void requireGlobalPermission(ObserverContext<?> ctx, String request,
418      Action perm, TableName tableName,
419      Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
420    accessChecker.requireGlobalPermission(getActiveUser(ctx),
421        request, perm,tableName, familyMap);
422  }
423
424  public void requireGlobalPermission(ObserverContext<?> ctx, String request,
425      Action perm, String namespace) throws IOException {
426    accessChecker.requireGlobalPermission(getActiveUser(ctx),
427        request, perm, namespace);
428  }
429
430  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
431      Action... permissions) throws IOException {
432    accessChecker.requireNamespacePermission(getActiveUser(ctx),
433        request, namespace, permissions);
434  }
435
436  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
437      TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
438      Action... permissions) throws IOException {
439    accessChecker.requireNamespacePermission(getActiveUser(ctx),
440        request, namespace, tableName, familyMap,
441        permissions);
442  }
443
444  public void requirePermission(ObserverContext<?> ctx, String request, TableName tableName,
445      byte[] family, byte[] qualifier, Action... permissions) throws IOException {
446    accessChecker.requirePermission(getActiveUser(ctx), request,
447        tableName, family, qualifier, permissions);
448  }
449
450  public void requireTablePermission(ObserverContext<?> ctx, String request,
451      TableName tableName,byte[] family, byte[] qualifier,
452      Action... permissions) throws IOException {
453    accessChecker.requireTablePermission(getActiveUser(ctx),
454        request, tableName, family, qualifier, permissions);
455  }
456
457  public void checkLockPermissions(ObserverContext<?> ctx, String namespace,
458      TableName tableName, RegionInfo[] regionInfos, String reason)
459      throws IOException {
460    accessChecker.checkLockPermissions(getActiveUser(ctx),
461        namespace, tableName, regionInfos, reason);
462  }
463
464  /**
465   * Returns <code>true</code> if the current user is allowed the given action
466   * over at least one of the column qualifiers in the given column families.
467   */
468  private boolean hasFamilyQualifierPermission(User user,
469      Action perm,
470      RegionCoprocessorEnvironment env,
471      Map<byte[], ? extends Collection<byte[]>> familyMap)
472    throws IOException {
473    RegionInfo hri = env.getRegion().getRegionInfo();
474    TableName tableName = hri.getTable();
475
476    if (user == null) {
477      return false;
478    }
479
480    if (familyMap != null && familyMap.size() > 0) {
481      // at least one family must be allowed
482      for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
483          familyMap.entrySet()) {
484        if (family.getValue() != null && !family.getValue().isEmpty()) {
485          for (byte[] qualifier : family.getValue()) {
486            if (getAuthManager().matchPermission(user, tableName,
487                family.getKey(), qualifier, perm)) {
488              return true;
489            }
490          }
491        } else {
492          if (getAuthManager().matchPermission(user, tableName, family.getKey(),
493              perm)) {
494            return true;
495          }
496        }
497      }
498    } else if (LOG.isDebugEnabled()) {
499      LOG.debug("Empty family map passed for permission check");
500    }
501
502    return false;
503  }
504
505  private enum OpType {
506    GET("get"),
507    EXISTS("exists"),
508    SCAN("scan"),
509    PUT("put"),
510    DELETE("delete"),
511    CHECK_AND_PUT("checkAndPut"),
512    CHECK_AND_DELETE("checkAndDelete"),
513    INCREMENT_COLUMN_VALUE("incrementColumnValue"),
514    APPEND("append"),
515    INCREMENT("increment");
516
517    private String type;
518
519    private OpType(String type) {
520      this.type = type;
521    }
522
523    @Override
524    public String toString() {
525      return type;
526    }
527  }
528
529  /**
530   * Determine if cell ACLs covered by the operation grant access. This is expensive.
531   * @return false if cell ACLs failed to grant access, true otherwise
532   * @throws IOException
533   */
534  private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
535      byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
536      throws IOException {
537    if (!cellFeaturesEnabled) {
538      return false;
539    }
540    long cellGrants = 0;
541    long latestCellTs = 0;
542    Get get = new Get(row);
543    // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
544    // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
545    // version. We have to get every cell version and check its TS against the TS asked for in
546    // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
547    // consider only one such passing cell. In case of Delete we have to consider all the cell
548    // versions under this passing version. When Delete Mutation contains columns which are a
549    // version delete just consider only one version for those column cells.
550    boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
551    if (considerCellTs) {
552      get.setMaxVersions();
553    } else {
554      get.setMaxVersions(1);
555    }
556    boolean diffCellTsFromOpTs = false;
557    for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
558      byte[] col = entry.getKey();
559      // TODO: HBASE-7114 could possibly unify the collection type in family
560      // maps so we would not need to do this
561      if (entry.getValue() instanceof Set) {
562        Set<byte[]> set = (Set<byte[]>)entry.getValue();
563        if (set == null || set.isEmpty()) {
564          get.addFamily(col);
565        } else {
566          for (byte[] qual: set) {
567            get.addColumn(col, qual);
568          }
569        }
570      } else if (entry.getValue() instanceof List) {
571        List<Cell> list = (List<Cell>)entry.getValue();
572        if (list == null || list.isEmpty()) {
573          get.addFamily(col);
574        } else {
575          // In case of family delete, a Cell will be added into the list with Qualifier as null.
576          for (Cell cell : list) {
577            if (cell.getQualifierLength() == 0
578                && (cell.getTypeByte() == Type.DeleteFamily.getCode()
579                || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
580              get.addFamily(col);
581            } else {
582              get.addColumn(col, CellUtil.cloneQualifier(cell));
583            }
584            if (considerCellTs) {
585              long cellTs = cell.getTimestamp();
586              latestCellTs = Math.max(latestCellTs, cellTs);
587              diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
588            }
589          }
590        }
591      } else if (entry.getValue() == null) {
592        get.addFamily(col);
593      } else {
594        throw new RuntimeException("Unhandled collection type " +
595          entry.getValue().getClass().getName());
596      }
597    }
598    // We want to avoid looking into the future. So, if the cells of the
599    // operation specify a timestamp, or the operation itself specifies a
600    // timestamp, then we use the maximum ts found. Otherwise, we bound
601    // the Get to the current server time. We add 1 to the timerange since
602    // the upper bound of a timerange is exclusive yet we need to examine
603    // any cells found there inclusively.
604    long latestTs = Math.max(opTs, latestCellTs);
605    if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
606      latestTs = EnvironmentEdgeManager.currentTime();
607    }
608    get.setTimeRange(0, latestTs + 1);
609    // In case of Put operation we set to read all versions. This was done to consider the case
610    // where columns are added with TS other than the Mutation TS. But normally this wont be the
611    // case with Put. There no need to get all versions but get latest version only.
612    if (!diffCellTsFromOpTs && request == OpType.PUT) {
613      get.setMaxVersions(1);
614    }
615    if (LOG.isTraceEnabled()) {
616      LOG.trace("Scanning for cells with " + get);
617    }
618    // This Map is identical to familyMap. The key is a BR rather than byte[].
619    // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
620    // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
621    Map<ByteRange, List<Cell>> familyMap1 = new HashMap<>();
622    for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
623      if (entry.getValue() instanceof List) {
624        familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
625      }
626    }
627    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
628    List<Cell> cells = Lists.newArrayList();
629    Cell prevCell = null;
630    ByteRange curFam = new SimpleMutableByteRange();
631    boolean curColAllVersions = (request == OpType.DELETE);
632    long curColCheckTs = opTs;
633    boolean foundColumn = false;
634    try {
635      boolean more = false;
636      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
637
638      do {
639        cells.clear();
640        // scan with limit as 1 to hold down memory use on wide rows
641        more = scanner.next(cells, scannerContext);
642        for (Cell cell: cells) {
643          if (LOG.isTraceEnabled()) {
644            LOG.trace("Found cell " + cell);
645          }
646          boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
647          if (colChange) foundColumn = false;
648          prevCell = cell;
649          if (!curColAllVersions && foundColumn) {
650            continue;
651          }
652          if (colChange && considerCellTs) {
653            curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
654            List<Cell> cols = familyMap1.get(curFam);
655            for (Cell col : cols) {
656              // null/empty qualifier is used to denote a Family delete. The TS and delete type
657              // associated with this is applicable for all columns within the family. That is
658              // why the below (col.getQualifierLength() == 0) check.
659              if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
660                  || CellUtil.matchingQualifier(cell, col)) {
661                byte type = col.getTypeByte();
662                if (considerCellTs) {
663                  curColCheckTs = col.getTimestamp();
664                }
665                // For a Delete op we pass allVersions as true. When a Delete Mutation contains
666                // a version delete for a column no need to check all the covering cells within
667                // that column. Check all versions when Type is DeleteColumn or DeleteFamily
668                // One version delete types are Delete/DeleteFamilyVersion
669                curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
670                    || (KeyValue.Type.DeleteFamily.getCode() == type);
671                break;
672              }
673            }
674          }
675          if (cell.getTimestamp() > curColCheckTs) {
676            // Just ignore this cell. This is not a covering cell.
677            continue;
678          }
679          foundColumn = true;
680          for (Action action: actions) {
681            // Are there permissions for this user for the cell?
682            if (!getAuthManager().authorize(user, getTableName(e), cell, action)) {
683              // We can stop if the cell ACL denies access
684              return false;
685            }
686          }
687          cellGrants++;
688        }
689      } while (more);
690    } catch (AccessDeniedException ex) {
691      throw ex;
692    } catch (IOException ex) {
693      LOG.error("Exception while getting cells to calculate covering permission", ex);
694    } finally {
695      scanner.close();
696    }
697    // We should not authorize unless we have found one or more cell ACLs that
698    // grant access. This code is used to check for additional permissions
699    // after no table or CF grants are found.
700    return cellGrants > 0;
701  }
702
703  private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
704    // Iterate over the entries in the familyMap, replacing the cells therein
705    // with new cells including the ACL data
706    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
707      List<Cell> newCells = Lists.newArrayList();
708      for (Cell cell: e.getValue()) {
709        // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
710        List<Tag> tags = new ArrayList<>();
711        tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
712        Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(cell);
713        while (tagIterator.hasNext()) {
714          tags.add(tagIterator.next());
715        }
716        newCells.add(PrivateCellUtil.createCell(cell, tags));
717      }
718      // This is supposed to be safe, won't CME
719      e.setValue(newCells);
720    }
721  }
722
723  // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
724  // type is reserved and should not be explicitly set by user.
725  private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
726    // No need to check if we're not going to throw
727    if (!authorizationEnabled) {
728      m.setAttribute(TAG_CHECK_PASSED, TRUE);
729      return;
730    }
731    // Superusers are allowed to store cells unconditionally.
732    if (Superusers.isSuperUser(user)) {
733      m.setAttribute(TAG_CHECK_PASSED, TRUE);
734      return;
735    }
736    // We already checked (prePut vs preBatchMutation)
737    if (m.getAttribute(TAG_CHECK_PASSED) != null) {
738      return;
739    }
740    for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
741      Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
742      while (tagsItr.hasNext()) {
743        if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
744          throw new AccessDeniedException("Mutation contains cell with reserved type tag");
745        }
746      }
747    }
748    m.setAttribute(TAG_CHECK_PASSED, TRUE);
749  }
750
751  /* ---- MasterObserver implementation ---- */
752  @Override
753  public void start(CoprocessorEnvironment env) throws IOException {
754    CompoundConfiguration conf = new CompoundConfiguration();
755    conf.add(env.getConfiguration());
756
757    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
758    if (!authorizationEnabled) {
759      LOG.warn("AccessController has been loaded with authorization checks DISABLED!");
760    }
761
762    shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
763      AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
764
765    cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
766    if (!cellFeaturesEnabled) {
767      LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
768          + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
769          + " accordingly.");
770    }
771
772    ZKWatcher zk = null;
773    if (env instanceof MasterCoprocessorEnvironment) {
774      // if running on HMaster
775      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment)env;
776      if (mEnv instanceof HasMasterServices) {
777        zk = ((HasMasterServices)mEnv).getMasterServices().getZooKeeper();
778      }
779    } else if (env instanceof RegionServerCoprocessorEnvironment) {
780      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment)env;
781      if (rsEnv instanceof HasRegionServerServices) {
782        zk = ((HasRegionServerServices)rsEnv).getRegionServerServices().getZooKeeper();
783      }
784    } else if (env instanceof RegionCoprocessorEnvironment) {
785      // if running at region
786      regionEnv = (RegionCoprocessorEnvironment) env;
787      conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
788      compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
789          AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
790      if (regionEnv instanceof HasRegionServerServices) {
791        zk = ((HasRegionServerServices)regionEnv).getRegionServerServices().getZooKeeper();
792      }
793    }
794
795    // set the user-provider.
796    this.userProvider = UserProvider.instantiate(env.getConfiguration());
797    // Throws RuntimeException if fails to load TableAuthManager so that coprocessor is unloaded.
798    accessChecker = new AccessChecker(env.getConfiguration(), zk);
799    tableAcls = new MapMaker().weakValues().makeMap();
800  }
801
802  @Override
803  public void stop(CoprocessorEnvironment env) {
804    accessChecker.stop();
805  }
806
807  /*********************************** Observer/Service Getters ***********************************/
808  @Override
809  public Optional<RegionObserver> getRegionObserver() {
810    return Optional.of(this);
811  }
812
813  @Override
814  public Optional<MasterObserver> getMasterObserver() {
815    return Optional.of(this);
816  }
817
818  @Override
819  public Optional<EndpointObserver> getEndpointObserver() {
820    return Optional.of(this);
821  }
822
823  @Override
824  public Optional<BulkLoadObserver> getBulkLoadObserver() {
825    return Optional.of(this);
826  }
827
828  @Override
829  public Optional<RegionServerObserver> getRegionServerObserver() {
830    return Optional.of(this);
831  }
832
833  @Override
834  public Iterable<Service> getServices() {
835    return Collections.singleton(
836        AccessControlProtos.AccessControlService.newReflectiveService(this));
837  }
838
839  /*********************************** Observer implementations ***********************************/
840
841  @Override
842  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
843      TableDescriptor desc, RegionInfo[] regions) throws IOException {
844    Set<byte[]> families = desc.getColumnFamilyNames();
845    Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
846    for (byte[] family: families) {
847      familyMap.put(family, null);
848    }
849    requireNamespacePermission(c, "createTable",
850        desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.CREATE);
851  }
852
853  @Override
854  public void postCompletedCreateTableAction(
855      final ObserverContext<MasterCoprocessorEnvironment> c,
856      final TableDescriptor desc,
857      final RegionInfo[] regions) throws IOException {
858    // When AC is used, it should be configured as the 1st CP.
859    // In Master, the table operations like create, are handled by a Thread pool but the max size
860    // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
861    // sequentially only.
862    // Related code in HMaster#startServiceThreads
863    // {code}
864    //   // We depend on there being only one instance of this executor running
865    //   // at a time. To do concurrency, would need fencing of enable/disable of
866    //   // tables.
867    //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
868    // {code}
869    // In future if we change this pool to have more threads, then there is a chance for thread,
870    // creating acl table, getting delayed and by that time another table creation got over and
871    // this hook is getting called. In such a case, we will need a wait logic here which will
872    // wait till the acl table is created.
873    if (AccessControlLists.isAclTable(desc)) {
874      this.aclTabAvailable = true;
875    } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
876      if (!aclTabAvailable) {
877        LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
878            + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
879            + getClass().getSimpleName() + " should be configured as the first Coprocessor");
880      } else {
881        String owner = desc.getOwnerString();
882        // default the table owner to current user, if not specified.
883        if (owner == null)
884          owner = getActiveUser(c).getShortName();
885        final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
886            desc.getTableName(), null, Action.values());
887        // switch to the real hbase master user for doing the RPC on the ACL table
888        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
889          @Override
890          public Void run() throws Exception {
891            try (Table table = c.getEnvironment().getConnection().
892                getTable(AccessControlLists.ACL_TABLE_NAME)) {
893              AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
894                  userperm, table);
895            }
896            return null;
897          }
898        });
899      }
900    }
901  }
902
903  @Override
904  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
905      throws IOException {
906    requirePermission(c, "deleteTable",
907        tableName, null, null, Action.ADMIN, Action.CREATE);
908  }
909
910  @Override
911  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
912      final TableName tableName) throws IOException {
913    final Configuration conf = c.getEnvironment().getConfiguration();
914    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
915      @Override
916      public Void run() throws Exception {
917        try (Table table = c.getEnvironment().getConnection().
918            getTable(AccessControlLists.ACL_TABLE_NAME)) {
919          AccessControlLists.removeTablePermissions(conf, tableName, table);
920        }
921        return null;
922      }
923    });
924    getAuthManager().getZKPermissionWatcher().deleteTableACLNode(tableName);
925  }
926
927  @Override
928  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
929      final TableName tableName) throws IOException {
930    requirePermission(c, "truncateTable",
931        tableName, null, null, Action.ADMIN, Action.CREATE);
932
933    final Configuration conf = c.getEnvironment().getConfiguration();
934    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
935      @Override
936      public Void run() throws Exception {
937        List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
938        if (acls != null) {
939          tableAcls.put(tableName, acls);
940        }
941        return null;
942      }
943    });
944  }
945
946  @Override
947  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
948      final TableName tableName) throws IOException {
949    final Configuration conf = ctx.getEnvironment().getConfiguration();
950    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
951      @Override
952      public Void run() throws Exception {
953        List<UserPermission> perms = tableAcls.get(tableName);
954        if (perms != null) {
955          for (UserPermission perm : perms) {
956            try (Table table = ctx.getEnvironment().getConnection().
957                getTable(AccessControlLists.ACL_TABLE_NAME)) {
958              AccessControlLists.addUserPermission(conf, perm, table);
959            }
960          }
961        }
962        tableAcls.remove(tableName);
963        return null;
964      }
965    });
966  }
967
968  @Override
969  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
970      TableDescriptor htd) throws IOException {
971    // TODO: potentially check if this is a add/modify/delete column operation
972    requirePermission(c, "modifyTable",
973        tableName, null, null, Action.ADMIN, Action.CREATE);
974  }
975
976  @Override
977  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
978      TableName tableName, final TableDescriptor htd) throws IOException {
979    final Configuration conf = c.getEnvironment().getConfiguration();
980    // default the table owner to current user, if not specified.
981    final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
982      getActiveUser(c).getShortName();
983    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
984      @Override
985      public Void run() throws Exception {
986        UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
987            htd.getTableName(), null, Action.values());
988        try (Table table = c.getEnvironment().getConnection().
989            getTable(AccessControlLists.ACL_TABLE_NAME)) {
990          AccessControlLists.addUserPermission(conf, userperm, table);
991        }
992        return null;
993      }
994    });
995  }
996
997  @Override
998  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
999      throws IOException {
1000    requirePermission(c, "enableTable",
1001        tableName, null, null, Action.ADMIN, Action.CREATE);
1002  }
1003
1004  @Override
1005  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1006      throws IOException {
1007    if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1008      // We have to unconditionally disallow disable of the ACL table when we are installed,
1009      // even if not enforcing authorizations. We are still allowing grants and revocations,
1010      // checking permissions and logging audit messages, etc. If the ACL table is not
1011      // available we will fail random actions all over the place.
1012      throw new AccessDeniedException("Not allowed to disable "
1013          + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1014    }
1015    requirePermission(c, "disableTable",
1016        tableName, null, null, Action.ADMIN, Action.CREATE);
1017  }
1018
1019  @Override
1020  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
1021      final long procId) throws IOException {
1022    requirePermission(ctx, "abortProcedure", Action.ADMIN);
1023  }
1024
1025  @Override
1026  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1027      throws IOException {
1028    // There is nothing to do at this time after the procedure abort request was sent.
1029  }
1030
1031  @Override
1032  public void preGetProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1033      throws IOException {
1034    requirePermission(ctx, "getProcedure", Action.ADMIN);
1035  }
1036
1037  @Override
1038  public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx)
1039      throws IOException {
1040    User user = getActiveUser(ctx);
1041    accessChecker.requirePermission(user, "getLocks", Action.ADMIN);
1042  }
1043
1044  @Override
1045  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo region,
1046      ServerName srcServer, ServerName destServer) throws IOException {
1047    requirePermission(c, "move",
1048        region.getTable(), null, null, Action.ADMIN);
1049  }
1050
1051  @Override
1052  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
1053      throws IOException {
1054    requirePermission(c, "assign",
1055        regionInfo.getTable(), null, null, Action.ADMIN);
1056  }
1057
1058  @Override
1059  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo,
1060      boolean force) throws IOException {
1061    requirePermission(c, "unassign",
1062        regionInfo.getTable(), null, null, Action.ADMIN);
1063  }
1064
1065  @Override
1066  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1067      RegionInfo regionInfo) throws IOException {
1068    requirePermission(c, "regionOffline",
1069        regionInfo.getTable(), null, null, Action.ADMIN);
1070  }
1071
1072  @Override
1073  public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1074      final boolean newValue, final MasterSwitchType switchType) throws IOException {
1075    requirePermission(ctx, "setSplitOrMergeEnabled",
1076        Action.ADMIN);
1077  }
1078
1079  @Override
1080  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1081      throws IOException {
1082    requirePermission(c, "balance", Action.ADMIN);
1083  }
1084
1085  @Override
1086  public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1087      boolean newValue) throws IOException {
1088    requirePermission(c, "balanceSwitch", Action.ADMIN);
1089  }
1090
1091  @Override
1092  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1093      throws IOException {
1094    requirePermission(c, "shutdown", Action.ADMIN);
1095  }
1096
1097  @Override
1098  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1099      throws IOException {
1100    requirePermission(c, "stopMaster", Action.ADMIN);
1101  }
1102
1103  @Override
1104  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1105      throws IOException {
1106    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
1107      if (!admin.tableExists(AccessControlLists.ACL_TABLE_NAME)) {
1108        createACLTable(admin);
1109      } else {
1110        this.aclTabAvailable = true;
1111      }
1112    }
1113  }
1114  /**
1115   * Create the ACL table
1116   * @throws IOException
1117   */
1118  private static void createACLTable(Admin admin) throws IOException {
1119    /** Table descriptor for ACL table */
1120    ColumnFamilyDescriptor cfd =
1121        ColumnFamilyDescriptorBuilder.newBuilder(AccessControlLists.ACL_LIST_FAMILY).
1122        setMaxVersions(1).
1123        setInMemory(true).
1124        setBlockCacheEnabled(true).
1125        setBlocksize(8 * 1024).
1126        setBloomFilterType(BloomType.NONE).
1127        setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
1128    TableDescriptor td =
1129        TableDescriptorBuilder.newBuilder(AccessControlLists.ACL_TABLE_NAME).
1130          setColumnFamily(cfd).build();
1131    admin.createTable(td);
1132  }
1133
1134  @Override
1135  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1136      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1137      throws IOException {
1138    // Move this ACL check to SnapshotManager#checkPermissions as part of AC deprecation.
1139    requirePermission(ctx, "snapshot " + snapshot.getName(),
1140        hTableDescriptor.getTableName(), null, null, Permission.Action.ADMIN);
1141  }
1142
1143  @Override
1144  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1145      final SnapshotDescription snapshot) throws IOException {
1146    User user = getActiveUser(ctx);
1147    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1148      // list it, if user is the owner of snapshot
1149      AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1150          "Snapshot owner check allowed", user, null, null, null);
1151      AccessChecker.logResult(result);
1152    } else {
1153      accessChecker.requirePermission(user, "listSnapshot " + snapshot.getName(), Action.ADMIN);
1154    }
1155  }
1156
1157  @Override
1158  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1159      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1160      throws IOException {
1161    User user = getActiveUser(ctx);
1162    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
1163        && hTableDescriptor.getTableName().getNameAsString().equals(snapshot.getTable())) {
1164      // Snapshot owner is allowed to create a table with the same name as the snapshot he took
1165      AuthResult result = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
1166        "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
1167      AccessChecker.logResult(result);
1168    } else {
1169      accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), Action.ADMIN);
1170    }
1171  }
1172
1173  @Override
1174  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1175      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1176      throws IOException {
1177    User user = getActiveUser(ctx);
1178    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1179      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(),
1180          hTableDescriptor.getTableName(), null, null, Permission.Action.ADMIN);
1181    } else {
1182      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(), Action.ADMIN);
1183    }
1184  }
1185
1186  @Override
1187  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1188      final SnapshotDescription snapshot) throws IOException {
1189    User user = getActiveUser(ctx);
1190    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1191      // Snapshot owner is allowed to delete the snapshot
1192      AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1193          "Snapshot owner check allowed", user, null, null, null);
1194      AccessChecker.logResult(result);
1195    } else {
1196      accessChecker.requirePermission(user, "deleteSnapshot " + snapshot.getName(), Action.ADMIN);
1197    }
1198  }
1199
1200  @Override
1201  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1202      NamespaceDescriptor ns) throws IOException {
1203    requireGlobalPermission(ctx, "createNamespace",
1204        Action.ADMIN, ns.getName());
1205  }
1206
1207  @Override
1208  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1209      throws IOException {
1210    requireGlobalPermission(ctx, "deleteNamespace",
1211        Action.ADMIN, namespace);
1212  }
1213
1214  @Override
1215  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1216      final String namespace) throws IOException {
1217    final Configuration conf = ctx.getEnvironment().getConfiguration();
1218    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1219      @Override
1220      public Void run() throws Exception {
1221        try (Table table = ctx.getEnvironment().getConnection().
1222            getTable(AccessControlLists.ACL_TABLE_NAME)) {
1223          AccessControlLists.removeNamespacePermissions(conf, namespace, table);
1224        }
1225        return null;
1226      }
1227    });
1228    getAuthManager().getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1229    LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1230  }
1231
1232  @Override
1233  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1234      NamespaceDescriptor ns) throws IOException {
1235    // We require only global permission so that
1236    // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1237    requireGlobalPermission(ctx, "modifyNamespace",
1238        Action.ADMIN, ns.getName());
1239  }
1240
1241  @Override
1242  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1243      throws IOException {
1244    requireNamespacePermission(ctx, "getNamespaceDescriptor",
1245        namespace, Action.ADMIN);
1246  }
1247
1248  @Override
1249  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1250      List<NamespaceDescriptor> descriptors) throws IOException {
1251    // Retains only those which passes authorization checks, as the checks weren't done as part
1252    // of preGetTableDescriptors.
1253    Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1254    User user = getActiveUser(ctx);
1255    while (itr.hasNext()) {
1256      NamespaceDescriptor desc = itr.next();
1257      try {
1258        accessChecker.requireNamespacePermission(user, "listNamespaces",
1259            desc.getName(), Action.ADMIN);
1260      } catch (AccessDeniedException e) {
1261        itr.remove();
1262      }
1263    }
1264  }
1265
1266  @Override
1267  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1268      final TableName tableName) throws IOException {
1269    // Move this ACL check to MasterFlushTableProcedureManager#checkPermissions as part of AC
1270    // deprecation.
1271    requirePermission(ctx, "flushTable", tableName,
1272        null, null, Action.ADMIN, Action.CREATE);
1273  }
1274
1275  @Override
1276  public void preSplitRegion(
1277      final ObserverContext<MasterCoprocessorEnvironment> ctx,
1278      final TableName tableName,
1279      final byte[] splitRow) throws IOException {
1280    requirePermission(ctx, "split", tableName,
1281        null, null, Action.ADMIN);
1282  }
1283
1284  @Override
1285  public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1286      throws IOException {
1287    requirePermission(ctx, "clearDeadServers", Action.ADMIN);
1288  }
1289
1290  @Override
1291  public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
1292      List<ServerName> servers, boolean offload) throws IOException {
1293    requirePermission(ctx, "decommissionRegionServers", Action.ADMIN);
1294  }
1295
1296  @Override
1297  public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1298      throws IOException {
1299    requirePermission(ctx, "listDecommissionedRegionServers",
1300        Action.ADMIN);
1301  }
1302
1303  @Override
1304  public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
1305      ServerName server, List<byte[]> encodedRegionNames) throws IOException {
1306    requirePermission(ctx, "recommissionRegionServers", Action.ADMIN);
1307  }
1308
1309  /* ---- RegionObserver implementation ---- */
1310
1311  @Override
1312  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
1313      throws IOException {
1314    RegionCoprocessorEnvironment env = c.getEnvironment();
1315    final Region region = env.getRegion();
1316    if (region == null) {
1317      LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1318    } else {
1319      RegionInfo regionInfo = region.getRegionInfo();
1320      if (regionInfo.getTable().isSystemTable()) {
1321        checkSystemOrSuperUser(getActiveUser(c));
1322      } else {
1323        requirePermission(c, "preOpen", Action.ADMIN);
1324      }
1325    }
1326  }
1327
1328  @Override
1329  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1330    RegionCoprocessorEnvironment env = c.getEnvironment();
1331    final Region region = env.getRegion();
1332    if (region == null) {
1333      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1334      return;
1335    }
1336    if (AccessControlLists.isAclRegion(region)) {
1337      aclRegion = true;
1338      try {
1339        initialize(env);
1340      } catch (IOException ex) {
1341        // if we can't obtain permissions, it's better to fail
1342        // than perform checks incorrectly
1343        throw new RuntimeException("Failed to initialize permissions cache", ex);
1344      }
1345    } else {
1346      initialized = true;
1347    }
1348  }
1349
1350  @Override
1351  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
1352      FlushLifeCycleTracker tracker) throws IOException {
1353    requirePermission(c, "flush", getTableName(c.getEnvironment()),
1354        null, null, Action.ADMIN, Action.CREATE);
1355  }
1356
1357  @Override
1358  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
1359      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
1360      CompactionRequest request) throws IOException {
1361    requirePermission(c, "compact", getTableName(c.getEnvironment()),
1362        null, null, Action.ADMIN, Action.CREATE);
1363    return scanner;
1364  }
1365
1366  private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1367      final Query query, OpType opType) throws IOException {
1368    Filter filter = query.getFilter();
1369    // Don't wrap an AccessControlFilter
1370    if (filter != null && filter instanceof AccessControlFilter) {
1371      return;
1372    }
1373    User user = getActiveUser(c);
1374    RegionCoprocessorEnvironment env = c.getEnvironment();
1375    Map<byte[],? extends Collection<byte[]>> families = null;
1376    switch (opType) {
1377    case GET:
1378    case EXISTS:
1379      families = ((Get)query).getFamilyMap();
1380      break;
1381    case SCAN:
1382      families = ((Scan)query).getFamilyMap();
1383      break;
1384    default:
1385      throw new RuntimeException("Unhandled operation " + opType);
1386    }
1387    AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1388    Region region = getRegion(env);
1389    TableName table = getTableName(region);
1390    Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1391    for (ColumnFamilyDescriptor hcd : region.getTableDescriptor().getColumnFamilies()) {
1392      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1393    }
1394    if (!authResult.isAllowed()) {
1395      if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1396        // Old behavior: Scan with only qualifier checks if we have partial
1397        // permission. Backwards compatible behavior is to throw an
1398        // AccessDeniedException immediately if there are no grants for table
1399        // or CF or CF+qual. Only proceed with an injected filter if there are
1400        // grants for qualifiers. Otherwise we will fall through below and log
1401        // the result and throw an ADE. We may end up checking qualifier
1402        // grants three times (permissionGranted above, here, and in the
1403        // filter) but that's the price of backwards compatibility.
1404        if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1405          authResult.setAllowed(true);
1406          authResult.setReason("Access allowed with filter");
1407          // Only wrap the filter if we are enforcing authorizations
1408          if (authorizationEnabled) {
1409            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1410              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1411              cfVsMaxVersions);
1412            // wrap any existing filter
1413            if (filter != null) {
1414              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1415                Lists.newArrayList(ourFilter, filter));
1416            }
1417            switch (opType) {
1418              case GET:
1419              case EXISTS:
1420                ((Get)query).setFilter(ourFilter);
1421                break;
1422              case SCAN:
1423                ((Scan)query).setFilter(ourFilter);
1424                break;
1425              default:
1426                throw new RuntimeException("Unhandled operation " + opType);
1427            }
1428          }
1429        }
1430      } else {
1431        // New behavior: Any access we might be granted is more fine-grained
1432        // than whole table or CF. Simply inject a filter and return what is
1433        // allowed. We will not throw an AccessDeniedException. This is a
1434        // behavioral change since 0.96.
1435        authResult.setAllowed(true);
1436        authResult.setReason("Access allowed with filter");
1437        // Only wrap the filter if we are enforcing authorizations
1438        if (authorizationEnabled) {
1439          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1440            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1441          // wrap any existing filter
1442          if (filter != null) {
1443            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1444              Lists.newArrayList(ourFilter, filter));
1445          }
1446          switch (opType) {
1447            case GET:
1448            case EXISTS:
1449              ((Get)query).setFilter(ourFilter);
1450              break;
1451            case SCAN:
1452              ((Scan)query).setFilter(ourFilter);
1453              break;
1454            default:
1455              throw new RuntimeException("Unhandled operation " + opType);
1456          }
1457        }
1458      }
1459    }
1460
1461    AccessChecker.logResult(authResult);
1462    if (authorizationEnabled && !authResult.isAllowed()) {
1463      throw new AccessDeniedException("Insufficient permissions for user '"
1464          + (user != null ? user.getShortName() : "null")
1465          + "' (table=" + table + ", action=READ)");
1466    }
1467  }
1468
1469  @Override
1470  public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1471      final Get get, final List<Cell> result) throws IOException {
1472    internalPreRead(c, get, OpType.GET);
1473  }
1474
1475  @Override
1476  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1477      final Get get, final boolean exists) throws IOException {
1478    internalPreRead(c, get, OpType.EXISTS);
1479    return exists;
1480  }
1481
1482  @Override
1483  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1484      final Put put, final WALEdit edit, final Durability durability)
1485      throws IOException {
1486    User user = getActiveUser(c);
1487    checkForReservedTagPresence(user, put);
1488
1489    // Require WRITE permission to the table, CF, or top visible value, if any.
1490    // NOTE: We don't need to check the permissions for any earlier Puts
1491    // because we treat the ACLs in each Put as timestamped like any other
1492    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1493    // change the ACL of any previous Put. This allows simple evolution of
1494    // security policy over time without requiring expensive updates.
1495    RegionCoprocessorEnvironment env = c.getEnvironment();
1496    Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1497    AuthResult authResult = permissionGranted(OpType.PUT,
1498        user, env, families, Action.WRITE);
1499    AccessChecker.logResult(authResult);
1500    if (!authResult.isAllowed()) {
1501      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1502        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1503      } else if (authorizationEnabled) {
1504        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1505      }
1506    }
1507
1508    // Add cell ACLs from the operation to the cells themselves
1509    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1510    if (bytes != null) {
1511      if (cellFeaturesEnabled) {
1512        addCellPermissions(bytes, put.getFamilyCellMap());
1513      } else {
1514        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1515      }
1516    }
1517  }
1518
1519  @Override
1520  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1521      final Put put, final WALEdit edit, final Durability durability) {
1522    if (aclRegion) {
1523      updateACL(c.getEnvironment(), put.getFamilyCellMap());
1524    }
1525  }
1526
1527  @Override
1528  public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1529      final Delete delete, final WALEdit edit, final Durability durability)
1530      throws IOException {
1531    // An ACL on a delete is useless, we shouldn't allow it
1532    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1533      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1534    }
1535    // Require WRITE permissions on all cells covered by the delete. Unlike
1536    // for Puts we need to check all visible prior versions, because a major
1537    // compaction could remove them. If the user doesn't have permission to
1538    // overwrite any of the visible versions ('visible' defined as not covered
1539    // by a tombstone already) then we have to disallow this operation.
1540    RegionCoprocessorEnvironment env = c.getEnvironment();
1541    Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1542    User user = getActiveUser(c);
1543    AuthResult authResult = permissionGranted(OpType.DELETE,
1544        user, env, families, Action.WRITE);
1545    AccessChecker.logResult(authResult);
1546    if (!authResult.isAllowed()) {
1547      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1548        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1549      } else if (authorizationEnabled) {
1550        throw new AccessDeniedException("Insufficient permissions " +
1551          authResult.toContextString());
1552      }
1553    }
1554  }
1555
1556  @Override
1557  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1558      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1559    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1560      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1561      User user = getActiveUser(c);
1562      for (int i = 0; i < miniBatchOp.size(); i++) {
1563        Mutation m = miniBatchOp.getOperation(i);
1564        if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1565          // We have a failure with table, cf and q perm checks and now giving a chance for cell
1566          // perm check
1567          OpType opType;
1568          if (m instanceof Put) {
1569            checkForReservedTagPresence(user, m);
1570            opType = OpType.PUT;
1571          } else {
1572            opType = OpType.DELETE;
1573          }
1574          AuthResult authResult = null;
1575          if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1576            m.getFamilyCellMap(), m.getTimestamp(), Action.WRITE)) {
1577            authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1578              user, Action.WRITE, table, m.getFamilyCellMap());
1579          } else {
1580            authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1581              user, Action.WRITE, table, m.getFamilyCellMap());
1582          }
1583          AccessChecker.logResult(authResult);
1584          if (authorizationEnabled && !authResult.isAllowed()) {
1585            throw new AccessDeniedException("Insufficient permissions "
1586              + authResult.toContextString());
1587          }
1588        }
1589      }
1590    }
1591  }
1592
1593  @Override
1594  public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1595      final Delete delete, final WALEdit edit, final Durability durability)
1596      throws IOException {
1597    if (aclRegion) {
1598      updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1599    }
1600  }
1601
1602  @Override
1603  public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1604      final byte [] row, final byte [] family, final byte [] qualifier,
1605      final CompareOperator op,
1606      final ByteArrayComparable comparator, final Put put,
1607      final boolean result) throws IOException {
1608    User user = getActiveUser(c);
1609    checkForReservedTagPresence(user, put);
1610
1611    // Require READ and WRITE permissions on the table, CF, and KV to update
1612    RegionCoprocessorEnvironment env = c.getEnvironment();
1613    Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1614    AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT,
1615        user, env, families, Action.READ, Action.WRITE);
1616    AccessChecker.logResult(authResult);
1617    if (!authResult.isAllowed()) {
1618      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1619        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1620      } else if (authorizationEnabled) {
1621        throw new AccessDeniedException("Insufficient permissions " +
1622          authResult.toContextString());
1623      }
1624    }
1625
1626    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1627    if (bytes != null) {
1628      if (cellFeaturesEnabled) {
1629        addCellPermissions(bytes, put.getFamilyCellMap());
1630      } else {
1631        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1632      }
1633    }
1634    return result;
1635  }
1636
1637  @Override
1638  public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1639      final byte[] row, final byte[] family, final byte[] qualifier,
1640      final CompareOperator opp, final ByteArrayComparable comparator, final Put put,
1641      final boolean result) throws IOException {
1642    if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1643      // We had failure with table, cf and q perm checks and now giving a chance for cell
1644      // perm check
1645      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1646      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1647      AuthResult authResult = null;
1648      User user = getActiveUser(c);
1649      if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1650          HConstants.LATEST_TIMESTAMP, Action.READ)) {
1651        authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(),
1652            "Covering cell set", user, Action.READ, table, families);
1653      } else {
1654        authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(),
1655            "Covering cell set", user, Action.READ, table, families);
1656      }
1657      AccessChecker.logResult(authResult);
1658      if (authorizationEnabled && !authResult.isAllowed()) {
1659        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1660      }
1661    }
1662    return result;
1663  }
1664
1665  @Override
1666  public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1667      final byte [] row, final byte [] family, final byte [] qualifier,
1668      final CompareOperator op,
1669      final ByteArrayComparable comparator, final Delete delete,
1670      final boolean result) throws IOException {
1671    // An ACL on a delete is useless, we shouldn't allow it
1672    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1673      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1674          delete.toString());
1675    }
1676    // Require READ and WRITE permissions on the table, CF, and the KV covered
1677    // by the delete
1678    RegionCoprocessorEnvironment env = c.getEnvironment();
1679    Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1680    User user = getActiveUser(c);
1681    AuthResult authResult = permissionGranted(
1682        OpType.CHECK_AND_DELETE, user, env, families, Action.READ, Action.WRITE);
1683    AccessChecker.logResult(authResult);
1684    if (!authResult.isAllowed()) {
1685      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1686        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1687      } else if (authorizationEnabled) {
1688        throw new AccessDeniedException("Insufficient permissions " +
1689          authResult.toContextString());
1690      }
1691    }
1692    return result;
1693  }
1694
1695  @Override
1696  public boolean preCheckAndDeleteAfterRowLock(
1697      final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row,
1698      final byte[] family, final byte[] qualifier, final CompareOperator op,
1699      final ByteArrayComparable comparator, final Delete delete, final boolean result)
1700      throws IOException {
1701    if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1702      // We had failure with table, cf and q perm checks and now giving a chance for cell
1703      // perm check
1704      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1705      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1706      AuthResult authResult = null;
1707      User user = getActiveUser(c);
1708      if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(),
1709          row, families, HConstants.LATEST_TIMESTAMP, Action.READ)) {
1710        authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(),
1711            "Covering cell set", user, Action.READ, table, families);
1712      } else {
1713        authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(),
1714            "Covering cell set", user, Action.READ, table, families);
1715      }
1716      AccessChecker.logResult(authResult);
1717      if (authorizationEnabled && !authResult.isAllowed()) {
1718        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1719      }
1720    }
1721    return result;
1722  }
1723
1724  @Override
1725  public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1726      throws IOException {
1727    User user = getActiveUser(c);
1728    checkForReservedTagPresence(user, append);
1729
1730    // Require WRITE permission to the table, CF, and the KV to be appended
1731    RegionCoprocessorEnvironment env = c.getEnvironment();
1732    Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1733    AuthResult authResult = permissionGranted(OpType.APPEND, user,
1734        env, families, Action.WRITE);
1735    AccessChecker.logResult(authResult);
1736    if (!authResult.isAllowed()) {
1737      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1738        append.setAttribute(CHECK_COVERING_PERM, TRUE);
1739      } else if (authorizationEnabled)  {
1740        throw new AccessDeniedException("Insufficient permissions " +
1741          authResult.toContextString());
1742      }
1743    }
1744
1745    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1746    if (bytes != null) {
1747      if (cellFeaturesEnabled) {
1748        addCellPermissions(bytes, append.getFamilyCellMap());
1749      } else {
1750        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1751      }
1752    }
1753
1754    return null;
1755  }
1756
1757  @Override
1758  public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1759      final Append append) throws IOException {
1760    if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1761      // We had failure with table, cf and q perm checks and now giving a chance for cell
1762      // perm check
1763      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1764      AuthResult authResult = null;
1765      User user = getActiveUser(c);
1766      if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
1767          append.getFamilyCellMap(), append.getTimeRange().getMax(), Action.WRITE)) {
1768        authResult = AuthResult.allow(OpType.APPEND.toString(),
1769            "Covering cell set", user, Action.WRITE, table, append.getFamilyCellMap());
1770      } else {
1771        authResult = AuthResult.deny(OpType.APPEND.toString(),
1772            "Covering cell set", user, Action.WRITE, table, append.getFamilyCellMap());
1773      }
1774      AccessChecker.logResult(authResult);
1775      if (authorizationEnabled && !authResult.isAllowed()) {
1776        throw new AccessDeniedException("Insufficient permissions " +
1777          authResult.toContextString());
1778      }
1779    }
1780    return null;
1781  }
1782
1783  @Override
1784  public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1785      final Increment increment)
1786      throws IOException {
1787    User user = getActiveUser(c);
1788    checkForReservedTagPresence(user, increment);
1789
1790    // Require WRITE permission to the table, CF, and the KV to be replaced by
1791    // the incremented value
1792    RegionCoprocessorEnvironment env = c.getEnvironment();
1793    Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1794    AuthResult authResult = permissionGranted(OpType.INCREMENT,
1795        user, env, families, Action.WRITE);
1796    AccessChecker.logResult(authResult);
1797    if (!authResult.isAllowed()) {
1798      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1799        increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1800      } else if (authorizationEnabled) {
1801        throw new AccessDeniedException("Insufficient permissions " +
1802          authResult.toContextString());
1803      }
1804    }
1805
1806    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1807    if (bytes != null) {
1808      if (cellFeaturesEnabled) {
1809        addCellPermissions(bytes, increment.getFamilyCellMap());
1810      } else {
1811        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1812      }
1813    }
1814
1815    return null;
1816  }
1817
1818  @Override
1819  public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1820      final Increment increment) throws IOException {
1821    if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1822      // We had failure with table, cf and q perm checks and now giving a chance for cell
1823      // perm check
1824      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1825      AuthResult authResult = null;
1826      User user = getActiveUser(c);
1827      if (checkCoveringPermission(user, OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1828          increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1829        authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1830            user, Action.WRITE, table, increment.getFamilyCellMap());
1831      } else {
1832        authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1833            user, Action.WRITE, table, increment.getFamilyCellMap());
1834      }
1835      AccessChecker.logResult(authResult);
1836      if (authorizationEnabled && !authResult.isAllowed()) {
1837        throw new AccessDeniedException("Insufficient permissions " +
1838          authResult.toContextString());
1839      }
1840    }
1841    return null;
1842  }
1843
1844  @Override
1845  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1846      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1847    // If the HFile version is insufficient to persist tags, we won't have any
1848    // work to do here
1849    if (!cellFeaturesEnabled) {
1850      return newCell;
1851    }
1852
1853    // Collect any ACLs from the old cell
1854    List<Tag> tags = Lists.newArrayList();
1855    List<Tag> aclTags = Lists.newArrayList();
1856    ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1857    if (oldCell != null) {
1858      Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(oldCell);
1859      while (tagIterator.hasNext()) {
1860        Tag tag = tagIterator.next();
1861        if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1862          // Not an ACL tag, just carry it through
1863          if (LOG.isTraceEnabled()) {
1864            LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
1865                + " length " + tag.getValueLength());
1866          }
1867          tags.add(tag);
1868        } else {
1869          aclTags.add(tag);
1870        }
1871      }
1872    }
1873
1874    // Do we have an ACL on the operation?
1875    byte[] aclBytes = mutation.getACL();
1876    if (aclBytes != null) {
1877      // Yes, use it
1878      tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1879    } else {
1880      // No, use what we carried forward
1881      if (perms != null) {
1882        // TODO: If we collected ACLs from more than one tag we may have a
1883        // List<Permission> of size > 1, this can be collapsed into a single
1884        // Permission
1885        if (LOG.isTraceEnabled()) {
1886          LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1887        }
1888        tags.addAll(aclTags);
1889      }
1890    }
1891
1892    // If we have no tags to add, just return
1893    if (tags.isEmpty()) {
1894      return newCell;
1895    }
1896
1897    Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags);
1898    return rewriteCell;
1899  }
1900
1901  @Override
1902  public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
1903      throws IOException {
1904    internalPreRead(c, scan, OpType.SCAN);
1905  }
1906
1907  @Override
1908  public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1909      final Scan scan, final RegionScanner s) throws IOException {
1910    User user = getActiveUser(c);
1911    if (user != null && user.getShortName() != null) {
1912      // store reference to scanner owner for later checks
1913      scannerOwners.put(s, user.getShortName());
1914    }
1915    return s;
1916  }
1917
1918  @Override
1919  public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1920      final InternalScanner s, final List<Result> result,
1921      final int limit, final boolean hasNext) throws IOException {
1922    requireScannerOwner(s);
1923    return hasNext;
1924  }
1925
1926  @Override
1927  public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1928      final InternalScanner s) throws IOException {
1929    requireScannerOwner(s);
1930  }
1931
1932  @Override
1933  public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1934      final InternalScanner s) throws IOException {
1935    // clean up any associated owner mapping
1936    scannerOwners.remove(s);
1937  }
1938
1939  @Override
1940  public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
1941      final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
1942    // 'default' in RegionObserver might do unnecessary copy for Off heap backed Cells.
1943    return hasMore;
1944  }
1945
1946  /**
1947   * Verify, when servicing an RPC, that the caller is the scanner owner.
1948   * If so, we assume that access control is correctly enforced based on
1949   * the checks performed in preScannerOpen()
1950   */
1951  private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
1952    if (!RpcServer.isInRpcCallContext()) {
1953      return;
1954    }
1955    String requestUserName = RpcServer.getRequestUserName().orElse(null);
1956    String owner = scannerOwners.get(s);
1957    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
1958      throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1959    }
1960  }
1961
1962  /**
1963   * Verifies user has CREATE privileges on
1964   * the Column Families involved in the bulkLoadHFile
1965   * request. Specific Column Write privileges are presently
1966   * ignored.
1967   */
1968  @Override
1969  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1970      List<Pair<byte[], String>> familyPaths) throws IOException {
1971    User user = getActiveUser(ctx);
1972    for(Pair<byte[],String> el : familyPaths) {
1973      accessChecker.requirePermission(user, "preBulkLoadHFile",
1974          ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(),
1975          el.getFirst(),
1976          null,
1977          Action.CREATE);
1978    }
1979  }
1980
1981  /**
1982   * Authorization check for
1983   * SecureBulkLoadProtocol.prepareBulkLoad()
1984   * @param ctx the context
1985   * @throws IOException
1986   */
1987  @Override
1988  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1989  throws IOException {
1990    requireAccess(ctx, "prePrepareBulkLoad",
1991        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
1992  }
1993
1994  /**
1995   * Authorization security check for
1996   * SecureBulkLoadProtocol.cleanupBulkLoad()
1997   * @param ctx the context
1998   * @throws IOException
1999   */
2000  @Override
2001  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
2002  throws IOException {
2003    requireAccess(ctx, "preCleanupBulkLoad",
2004        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
2005  }
2006
2007  /* ---- EndpointObserver implementation ---- */
2008
2009  @Override
2010  public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2011      Service service, String methodName, Message request) throws IOException {
2012    // Don't intercept calls to our own AccessControlService, we check for
2013    // appropriate permissions in the service handlers
2014    if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2015      requirePermission(ctx,
2016          "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
2017          getTableName(ctx.getEnvironment()), null, null,
2018          Action.EXEC);
2019    }
2020    return request;
2021  }
2022
2023  @Override
2024  public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2025      Service service, String methodName, Message request, Message.Builder responseBuilder)
2026      throws IOException { }
2027
2028  /* ---- Protobuf AccessControlService implementation ---- */
2029
2030  @Override
2031  public void grant(RpcController controller,
2032      AccessControlProtos.GrantRequest request,
2033      RpcCallback<AccessControlProtos.GrantResponse> done) {
2034    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
2035    AccessControlProtos.GrantResponse response = null;
2036    try {
2037      // verify it's only running at .acl.
2038      if (aclRegion) {
2039        if (!initialized) {
2040          throw new CoprocessorException("AccessController not yet initialized");
2041        }
2042        if (LOG.isDebugEnabled()) {
2043          LOG.debug("Received request to grant access permission " + perm.toString());
2044        }
2045        User caller = RpcServer.getRequestUser().orElse(null);
2046
2047        switch(request.getUserPermission().getPermission().getType()) {
2048          case Global :
2049          case Table :
2050            accessChecker.requirePermission(caller, "grant", perm.getTableName(),
2051                perm.getFamily(), perm.getQualifier(), Action.ADMIN);
2052            break;
2053          case Namespace :
2054            accessChecker.requireNamespacePermission(caller, "grant", perm.getNamespace(),
2055                Action.ADMIN);
2056           break;
2057        }
2058
2059        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2060          @Override
2061          public Void run() throws Exception {
2062            // regionEnv is set at #start. Hopefully not null at this point.
2063            try (Table table = regionEnv.getConnection().
2064                getTable(AccessControlLists.ACL_TABLE_NAME)) {
2065              AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm, table,
2066                  request.getMergeExistingPermissions());
2067            }
2068            return null;
2069          }
2070        });
2071
2072        if (AUDITLOG.isTraceEnabled()) {
2073          // audit log should store permission changes in addition to auth results
2074          AUDITLOG.trace("Granted permission " + perm.toString());
2075        }
2076      } else {
2077        throw new CoprocessorException(AccessController.class, "This method "
2078            + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2079      }
2080      response = AccessControlProtos.GrantResponse.getDefaultInstance();
2081    } catch (IOException ioe) {
2082      // pass exception back up
2083      CoprocessorRpcUtils.setControllerException(controller, ioe);
2084    }
2085    done.run(response);
2086  }
2087
2088  @Override
2089  public void revoke(RpcController controller,
2090      AccessControlProtos.RevokeRequest request,
2091      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2092    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
2093    AccessControlProtos.RevokeResponse response = null;
2094    try {
2095      // only allowed to be called on _acl_ region
2096      if (aclRegion) {
2097        if (!initialized) {
2098          throw new CoprocessorException("AccessController not yet initialized");
2099        }
2100        if (LOG.isDebugEnabled()) {
2101          LOG.debug("Received request to revoke access permission " + perm.toString());
2102        }
2103        User caller = RpcServer.getRequestUser().orElse(null);
2104
2105        switch(request.getUserPermission().getPermission().getType()) {
2106          case Global :
2107          case Table :
2108            accessChecker.requirePermission(caller, "revoke", perm.getTableName(), perm.getFamily(),
2109              perm.getQualifier(), Action.ADMIN);
2110            break;
2111          case Namespace :
2112            accessChecker.requireNamespacePermission(caller, "revoke", perm.getNamespace(),
2113                Action.ADMIN);
2114            break;
2115        }
2116
2117        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2118          @Override
2119          public Void run() throws Exception {
2120            // regionEnv is set at #start. Hopefully not null here.
2121            try (Table table = regionEnv.getConnection().
2122                getTable(AccessControlLists.ACL_TABLE_NAME)) {
2123              AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm, table);
2124            }
2125            return null;
2126          }
2127        });
2128
2129        if (AUDITLOG.isTraceEnabled()) {
2130          // audit log should record all permission changes
2131          AUDITLOG.trace("Revoked permission " + perm.toString());
2132        }
2133      } else {
2134        throw new CoprocessorException(AccessController.class, "This method "
2135            + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2136      }
2137      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2138    } catch (IOException ioe) {
2139      // pass exception back up
2140      CoprocessorRpcUtils.setControllerException(controller, ioe);
2141    }
2142    done.run(response);
2143  }
2144
2145  @Override
2146  public void getUserPermissions(RpcController controller,
2147      AccessControlProtos.GetUserPermissionsRequest request,
2148      RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2149    AccessControlProtos.GetUserPermissionsResponse response = null;
2150    try {
2151      // only allowed to be called on _acl_ region
2152      if (aclRegion) {
2153        if (!initialized) {
2154          throw new CoprocessorException("AccessController not yet initialized");
2155        }
2156        User caller = RpcServer.getRequestUser().orElse(null);
2157
2158        List<UserPermission> perms = null;
2159        if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2160          final TableName table = request.hasTableName() ?
2161            ProtobufUtil.toTableName(request.getTableName()) : null;
2162          accessChecker.requirePermission(caller, "userPermissions",
2163              table, null, null, Action.ADMIN);
2164          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2165            @Override
2166            public List<UserPermission> run() throws Exception {
2167              return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2168            }
2169          });
2170        } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2171          final String namespace = request.getNamespaceName().toStringUtf8();
2172          accessChecker.requireNamespacePermission(caller, "userPermissions",
2173              namespace, Action.ADMIN);
2174          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2175            @Override
2176            public List<UserPermission> run() throws Exception {
2177              return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2178                namespace);
2179            }
2180          });
2181        } else {
2182          accessChecker.requirePermission(caller, "userPermissions", Action.ADMIN);
2183          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2184            @Override
2185            public List<UserPermission> run() throws Exception {
2186              return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2187            }
2188          });
2189          // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2190          // Also using acl as table name to be inline  with the results of global admin and will
2191          // help in avoiding any leakage of information about being superusers.
2192          for (String user: Superusers.getSuperUsers()) {
2193            perms.add(new UserPermission(Bytes.toBytes(user), AccessControlLists.ACL_TABLE_NAME,
2194                null, Action.values()));
2195          }
2196        }
2197        response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
2198      } else {
2199        throw new CoprocessorException(AccessController.class, "This method "
2200            + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2201      }
2202    } catch (IOException ioe) {
2203      // pass exception back up
2204      CoprocessorRpcUtils.setControllerException(controller, ioe);
2205    }
2206    done.run(response);
2207  }
2208
2209  @Override
2210  public void checkPermissions(RpcController controller,
2211                               AccessControlProtos.CheckPermissionsRequest request,
2212                               RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2213    Permission[] permissions = new Permission[request.getPermissionCount()];
2214    for (int i=0; i < request.getPermissionCount(); i++) {
2215      permissions[i] = AccessControlUtil.toPermission(request.getPermission(i));
2216    }
2217    AccessControlProtos.CheckPermissionsResponse response = null;
2218    try {
2219      User user = RpcServer.getRequestUser().orElse(null);
2220      TableName tableName = regionEnv.getRegion().getTableDescriptor().getTableName();
2221      for (Permission permission : permissions) {
2222        if (permission instanceof TablePermission) {
2223          // Check table permissions
2224
2225          TablePermission tperm = (TablePermission) permission;
2226          for (Action action : permission.getActions()) {
2227            if (!tperm.getTableName().equals(tableName)) {
2228              throw new CoprocessorException(AccessController.class, String.format("This method "
2229                  + "can only execute at the table specified in TablePermission. " +
2230                  "Table of the region:%s , requested table:%s", tableName,
2231                  tperm.getTableName()));
2232            }
2233
2234            Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2235            if (tperm.getFamily() != null) {
2236              if (tperm.getQualifier() != null) {
2237                Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2238                qualifiers.add(tperm.getQualifier());
2239                familyMap.put(tperm.getFamily(), qualifiers);
2240              } else {
2241                familyMap.put(tperm.getFamily(), null);
2242              }
2243            }
2244
2245            AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2246              familyMap);
2247            AccessChecker.logResult(result);
2248            if (!result.isAllowed()) {
2249              // Even if passive we need to throw an exception here, we support checking
2250              // effective permissions, so throw unconditionally
2251              throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2252                (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2253                ", action=" + action.toString() + ")");
2254            }
2255          }
2256
2257        } else {
2258          // Check global permissions
2259
2260          for (Action action : permission.getActions()) {
2261            AuthResult result;
2262            if (getAuthManager().authorize(user, action)) {
2263              result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2264                action, null, null);
2265            } else {
2266              result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2267                null, null);
2268            }
2269            AccessChecker.logResult(result);
2270            if (!result.isAllowed()) {
2271              // Even if passive we need to throw an exception here, we support checking
2272              // effective permissions, so throw unconditionally
2273              throw new AccessDeniedException("Insufficient permissions (action=" +
2274                action.toString() + ")");
2275            }
2276          }
2277        }
2278      }
2279      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2280    } catch (IOException ioe) {
2281      CoprocessorRpcUtils.setControllerException(controller, ioe);
2282    }
2283    done.run(response);
2284  }
2285
2286  private Region getRegion(RegionCoprocessorEnvironment e) {
2287    return e.getRegion();
2288  }
2289
2290  private TableName getTableName(RegionCoprocessorEnvironment e) {
2291    Region region = e.getRegion();
2292    if (region != null) {
2293      return getTableName(region);
2294    }
2295    return null;
2296  }
2297
2298  private TableName getTableName(Region region) {
2299    RegionInfo regionInfo = region.getRegionInfo();
2300    if (regionInfo != null) {
2301      return regionInfo.getTable();
2302    }
2303    return null;
2304  }
2305
2306  @Override
2307  public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2308      throws IOException {
2309    requirePermission(c, "preClose", Action.ADMIN);
2310  }
2311
2312  private void checkSystemOrSuperUser(User activeUser) throws IOException {
2313    // No need to check if we're not going to throw
2314    if (!authorizationEnabled) {
2315      return;
2316    }
2317    if (!Superusers.isSuperUser(activeUser)) {
2318      throw new AccessDeniedException("User '" + (activeUser != null ?
2319        activeUser.getShortName() : "null") + "' is not system or super user.");
2320    }
2321  }
2322
2323  @Override
2324  public void preStopRegionServer(
2325      ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2326      throws IOException {
2327    requirePermission(ctx, "preStopRegionServer", Action.ADMIN);
2328  }
2329
2330  private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2331      byte[] qualifier) {
2332    if (family == null) {
2333      return null;
2334    }
2335
2336    Map<byte[], Collection<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2337    familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2338    return familyMap;
2339  }
2340
2341  @Override
2342  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2343       List<TableName> tableNamesList, List<TableDescriptor> descriptors,
2344       String regex) throws IOException {
2345    // We are delegating the authorization check to postGetTableDescriptors as we don't have
2346    // any concrete set of table names when a regex is present or the full list is requested.
2347    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2348      // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2349      // request can be granted.
2350      TableName [] sns = null;
2351      try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
2352        sns = admin.listTableNames();
2353        if (sns == null) return;
2354        for (TableName tableName: tableNamesList) {
2355          // Skip checks for a table that does not exist
2356          if (!admin.tableExists(tableName)) continue;
2357          requirePermission(ctx, "getTableDescriptors", tableName, null, null,
2358            Action.ADMIN, Action.CREATE);
2359        }
2360      }
2361    }
2362  }
2363
2364  @Override
2365  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2366      List<TableName> tableNamesList, List<TableDescriptor> descriptors,
2367      String regex) throws IOException {
2368    // Skipping as checks in this case are already done by preGetTableDescriptors.
2369    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2370      return;
2371    }
2372
2373    // Retains only those which passes authorization checks, as the checks weren't done as part
2374    // of preGetTableDescriptors.
2375    Iterator<TableDescriptor> itr = descriptors.iterator();
2376    while (itr.hasNext()) {
2377      TableDescriptor htd = itr.next();
2378      try {
2379        requirePermission(ctx, "getTableDescriptors", htd.getTableName(), null, null,
2380            Action.ADMIN, Action.CREATE);
2381      } catch (AccessDeniedException e) {
2382        itr.remove();
2383      }
2384    }
2385  }
2386
2387  @Override
2388  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2389      List<TableDescriptor> descriptors, String regex) throws IOException {
2390    // Retains only those which passes authorization checks.
2391    Iterator<TableDescriptor> itr = descriptors.iterator();
2392    while (itr.hasNext()) {
2393      TableDescriptor htd = itr.next();
2394      try {
2395        requireAccess(ctx, "getTableNames", htd.getTableName(), Action.values());
2396      } catch (AccessDeniedException e) {
2397        itr.remove();
2398      }
2399    }
2400  }
2401
2402  @Override
2403  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2404                              final RegionInfo[] regionsToMerge) throws IOException {
2405    requirePermission(ctx, "mergeRegions", regionsToMerge[0].getTable(), null, null,
2406      Action.ADMIN);
2407  }
2408
2409  @Override
2410  public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2411      throws IOException {
2412    requirePermission(ctx, "preRollLogWriterRequest", Permission.Action.ADMIN);
2413  }
2414
2415  @Override
2416  public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2417      throws IOException { }
2418
2419  @Override
2420  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2421      final String userName, final GlobalQuotaSettings quotas) throws IOException {
2422    requirePermission(ctx, "setUserQuota", Action.ADMIN);
2423  }
2424
2425  @Override
2426  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2427      final String userName, final TableName tableName, final GlobalQuotaSettings quotas)
2428          throws IOException {
2429    requirePermission(ctx, "setUserTableQuota", tableName, null, null, Action.ADMIN);
2430  }
2431
2432  @Override
2433  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2434      final String userName, final String namespace, final GlobalQuotaSettings quotas)
2435          throws IOException {
2436    requirePermission(ctx, "setUserNamespaceQuota", Action.ADMIN);
2437  }
2438
2439  @Override
2440  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2441      final TableName tableName, final GlobalQuotaSettings quotas) throws IOException {
2442    requirePermission(ctx, "setTableQuota", tableName, null, null, Action.ADMIN);
2443  }
2444
2445  @Override
2446  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2447      final String namespace, final GlobalQuotaSettings quotas) throws IOException {
2448    requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
2449  }
2450
2451  @Override
2452  public ReplicationEndpoint postCreateReplicationEndPoint(
2453      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2454    return endpoint;
2455  }
2456
2457  @Override
2458  public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2459      throws IOException {
2460    requirePermission(ctx, "replicateLogEntries", Action.WRITE);
2461  }
2462
2463  @Override
2464  public void  preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2465          throws IOException {
2466    requirePermission(ctx, "preClearCompactionQueues", Permission.Action.ADMIN);
2467  }
2468
2469  @Override
2470  public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2471      String peerId, ReplicationPeerConfig peerConfig) throws IOException {
2472    requirePermission(ctx, "addReplicationPeer", Action.ADMIN);
2473  }
2474
2475  @Override
2476  public void preRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2477      String peerId) throws IOException {
2478    requirePermission(ctx, "removeReplicationPeer", Action.ADMIN);
2479  }
2480
2481  @Override
2482  public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2483      String peerId) throws IOException {
2484    requirePermission(ctx, "enableReplicationPeer", Action.ADMIN);
2485  }
2486
2487  @Override
2488  public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2489      String peerId) throws IOException {
2490    requirePermission(ctx, "disableReplicationPeer", Action.ADMIN);
2491  }
2492
2493  @Override
2494  public void preGetReplicationPeerConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2495      String peerId) throws IOException {
2496    requirePermission(ctx, "getReplicationPeerConfig", Action.ADMIN);
2497  }
2498
2499  @Override
2500  public void preUpdateReplicationPeerConfig(
2501      final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
2502      ReplicationPeerConfig peerConfig) throws IOException {
2503    requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
2504  }
2505
2506  @Override
2507  public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2508      String regex) throws IOException {
2509    requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
2510  }
2511
2512  @Override
2513  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
2514      TableName tableName, RegionInfo[] regionInfos, String description)
2515  throws IOException {
2516    // There are operations in the CREATE and ADMIN domain which may require lock, READ
2517    // or WRITE. So for any lock request, we check for these two perms irrespective of lock type.
2518    String reason = String.format("Description=%s", description);
2519    checkLockPermissions(ctx, namespace, tableName, regionInfos, reason);
2520  }
2521
2522  @Override
2523  public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
2524      TableName tableName, String description) throws IOException {
2525    checkLockPermissions(ctx, null, tableName, null, description);
2526  }
2527
2528  /**
2529   * Returns the active user to which authorization checks should be applied.
2530   * If we are in the context of an RPC call, the remote user is used,
2531   * otherwise the currently logged in user is used.
2532   */
2533  public User getActiveUser(ObserverContext<?> ctx) throws IOException {
2534    // for non-rpc handling, fallback to system user
2535    Optional<User> optionalUser = ctx.getCaller();
2536    if (optionalUser.isPresent()) {
2537      return optionalUser.get();
2538    }
2539    return userProvider.getCurrent();
2540  }
2541}