001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.security.access;
020
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcCallback;
023import com.google.protobuf.RpcController;
024import com.google.protobuf.Service;
025import java.io.IOException;
026import java.net.InetAddress;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.Optional;
037import java.util.Set;
038import java.util.TreeMap;
039import java.util.TreeSet;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.ArrayBackedTag;
042import org.apache.hadoop.hbase.Cell;
043import org.apache.hadoop.hbase.CellScanner;
044import org.apache.hadoop.hbase.CellUtil;
045import org.apache.hadoop.hbase.CompareOperator;
046import org.apache.hadoop.hbase.CompoundConfiguration;
047import org.apache.hadoop.hbase.CoprocessorEnvironment;
048import org.apache.hadoop.hbase.DoNotRetryIOException;
049import org.apache.hadoop.hbase.HBaseInterfaceAudience;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.KeyValue.Type;
053import org.apache.hadoop.hbase.NamespaceDescriptor;
054import org.apache.hadoop.hbase.PrivateCellUtil;
055import org.apache.hadoop.hbase.ServerName;
056import org.apache.hadoop.hbase.TableName;
057import org.apache.hadoop.hbase.Tag;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.Append;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Increment;
066import org.apache.hadoop.hbase.client.MasterSwitchType;
067import org.apache.hadoop.hbase.client.Mutation;
068import org.apache.hadoop.hbase.client.Put;
069import org.apache.hadoop.hbase.client.Query;
070import org.apache.hadoop.hbase.client.RegionInfo;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.Scan;
073import org.apache.hadoop.hbase.client.SnapshotDescription;
074import org.apache.hadoop.hbase.client.Table;
075import org.apache.hadoop.hbase.client.TableDescriptor;
076import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
077import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
078import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
079import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
080import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
081import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
082import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
083import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
084import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
085import org.apache.hadoop.hbase.coprocessor.MasterObserver;
086import org.apache.hadoop.hbase.coprocessor.ObserverContext;
087import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
088import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
089import org.apache.hadoop.hbase.coprocessor.RegionObserver;
090import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
091import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
092import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
093import org.apache.hadoop.hbase.filter.ByteArrayComparable;
094import org.apache.hadoop.hbase.filter.Filter;
095import org.apache.hadoop.hbase.filter.FilterList;
096import org.apache.hadoop.hbase.io.hfile.HFile;
097import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
098import org.apache.hadoop.hbase.ipc.RpcServer;
099import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
101import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
102import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
103import org.apache.hadoop.hbase.regionserver.BloomType;
104import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
105import org.apache.hadoop.hbase.regionserver.InternalScanner;
106import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
107import org.apache.hadoop.hbase.regionserver.Region;
108import org.apache.hadoop.hbase.regionserver.RegionScanner;
109import org.apache.hadoop.hbase.regionserver.ScanType;
110import org.apache.hadoop.hbase.regionserver.ScannerContext;
111import org.apache.hadoop.hbase.regionserver.Store;
112import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
113import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
114import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
115import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
116import org.apache.hadoop.hbase.security.AccessDeniedException;
117import org.apache.hadoop.hbase.security.Superusers;
118import org.apache.hadoop.hbase.security.User;
119import org.apache.hadoop.hbase.security.UserProvider;
120import org.apache.hadoop.hbase.security.access.Permission.Action;
121import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
122import org.apache.hadoop.hbase.util.ByteRange;
123import org.apache.hadoop.hbase.util.Bytes;
124import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
125import org.apache.hadoop.hbase.util.Pair;
126import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
127import org.apache.hadoop.hbase.wal.WALEdit;
128import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
129import org.apache.yetus.audience.InterfaceAudience;
130import org.slf4j.Logger;
131import org.slf4j.LoggerFactory;
132
133import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
134import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
135import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
136import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
137import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
138import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
139import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
140
141/**
142 * Provides basic authorization checks for data access and administrative
143 * operations.
144 *
145 * <p>
146 * {@code AccessController} performs authorization checks for HBase operations
147 * based on:
148 * </p>
149 * <ul>
150 *   <li>the identity of the user performing the operation</li>
151 *   <li>the scope over which the operation is performed, in increasing
152 *   specificity: global, table, column family, or qualifier</li>
153 *   <li>the type of action being performed (as mapped to
154 *   {@link Permission.Action} values)</li>
155 * </ul>
156 * <p>
157 * If the authorization check fails, an {@link AccessDeniedException}
158 * will be thrown for the operation.
159 * </p>
160 *
161 * <p>
162 * To perform authorization checks, {@code AccessController} relies on the
163 * RpcServerEngine being loaded to provide
164 * the user identities for remote requests.
165 * </p>
166 *
167 * <p>
168 * The access control lists used for authorization can be manipulated via the
169 * exposed {@link AccessControlService} Interface implementation, and the associated
170 * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
171 * commands.
172 * </p>
173 */
174@CoreCoprocessor
175@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
176public class AccessController implements MasterCoprocessor, RegionCoprocessor,
177    RegionServerCoprocessor, AccessControlService.Interface,
178    MasterObserver, RegionObserver, RegionServerObserver, EndpointObserver, BulkLoadObserver {
179  // TODO: encapsulate observer functions into separate class/sub-class.
180
181  private static final Logger LOG = LoggerFactory.getLogger(AccessController.class);
182
183  private static final Logger AUDITLOG =
184    LoggerFactory.getLogger("SecurityLogger."+AccessController.class.getName());
185  private static final String CHECK_COVERING_PERM = "check_covering_perm";
186  private static final String TAG_CHECK_PASSED = "tag_check_passed";
187  private static final byte[] TRUE = Bytes.toBytes(true);
188
189  private AccessChecker accessChecker;
190
191  /** flags if we are running on a region of the _acl_ table */
192  private boolean aclRegion = false;
193
194  /** defined only for the Endpoint implementation, so that it has a way to
195   access region services */
196  private RegionCoprocessorEnvironment regionEnv;
197
198  /** Mapping of scanner instances to the user who created them */
199  private Map<InternalScanner,String> scannerOwners =
200      new MapMaker().weakKeys().makeMap();
201
202  private Map<TableName, List<UserPermission>> tableAcls;
203
204  /** Provider for mapping principal names to Users */
205  private UserProvider userProvider;
206
207  /** if we are active, usually false, only true if "hbase.security.authorization"
208   has been set to true in site configuration */
209  private boolean authorizationEnabled;
210
211  /** if we are able to support cell ACLs */
212  private boolean cellFeaturesEnabled;
213
214  /** if we should check EXEC permissions */
215  private boolean shouldCheckExecPermission;
216
217  /** if we should terminate access checks early as soon as table or CF grants
218    allow access; pre-0.98 compatible behavior */
219  private boolean compatibleEarlyTermination;
220
221  /** if we have been successfully initialized */
222  private volatile boolean initialized = false;
223
224  /** if the ACL table is available, only relevant in the master */
225  private volatile boolean aclTabAvailable = false;
226
227  public static boolean isCellAuthorizationSupported(Configuration conf) {
228    return AccessChecker.isAuthorizationSupported(conf) &&
229        (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
230  }
231
232  public Region getRegion() {
233    return regionEnv != null ? regionEnv.getRegion() : null;
234  }
235
236  public TableAuthManager getAuthManager() {
237    return accessChecker.getAuthManager();
238  }
239
240  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
241    final Region region = e.getRegion();
242    Configuration conf = e.getConfiguration();
243    Map<byte[], ListMultimap<String,TablePermission>> tables =
244        AccessControlLists.loadAll(region);
245    // For each table, write out the table's permissions to the respective
246    // znode for that table.
247    for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
248      tables.entrySet()) {
249      byte[] entry = t.getKey();
250      ListMultimap<String,TablePermission> perms = t.getValue();
251      byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
252      getAuthManager().getZKPermissionWatcher().writeToZookeeper(entry, serialized);
253    }
254    initialized = true;
255  }
256
257  /**
258   * Writes all table ACLs for the tables in the given Map up into ZooKeeper
259   * znodes.  This is called to synchronize ACL changes following {@code _acl_}
260   * table updates.
261   */
262  private void updateACL(RegionCoprocessorEnvironment e,
263      final Map<byte[], List<Cell>> familyMap) {
264    Set<byte[]> entries = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
265    for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
266      List<Cell> cells = f.getValue();
267      for (Cell cell: cells) {
268        if (CellUtil.matchingFamily(cell, AccessControlLists.ACL_LIST_FAMILY)) {
269          entries.add(CellUtil.cloneRow(cell));
270        }
271      }
272    }
273    ZKPermissionWatcher zkw = getAuthManager().getZKPermissionWatcher();
274    Configuration conf = regionEnv.getConfiguration();
275    byte [] currentEntry = null;
276    // TODO: Here we are already on the ACL region. (And it is single
277    // region) We can even just get the region from the env and do get
278    // directly. The short circuit connection would avoid the RPC overhead
279    // so no socket communication, req write/read ..  But we have the PB
280    // to and fro conversion overhead. get req is converted to PB req
281    // and results are converted to PB results 1st and then to POJOs
282    // again. We could have avoided such at least in ACL table context..
283    try (Table t = e.getConnection().getTable(AccessControlLists.ACL_TABLE_NAME)) {
284      for (byte[] entry : entries) {
285        currentEntry = entry;
286        ListMultimap<String, TablePermission> perms =
287            AccessControlLists.getPermissions(conf, entry, t);
288        byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
289        zkw.writeToZookeeper(entry, serialized);
290      }
291    } catch(IOException ex) {
292          LOG.error("Failed updating permissions mirror for '" +
293                  (currentEntry == null? "null": Bytes.toString(currentEntry)) + "'", ex);
294    }
295  }
296
297  /**
298   * Check the current user for authorization to perform a specific action
299   * against the given set of row data.
300   *
301   * <p>Note: Ordering of the authorization checks
302   * has been carefully optimized to short-circuit the most common requests
303   * and minimize the amount of processing required.</p>
304   *
305   * @param permRequest the action being requested
306   * @param e the coprocessor environment
307   * @param families the map of column families to qualifiers present in
308   * the request
309   * @return an authorization result
310   */
311  private AuthResult permissionGranted(String request, User user, Action permRequest,
312      RegionCoprocessorEnvironment e,
313      Map<byte [], ? extends Collection<?>> families) {
314    RegionInfo hri = e.getRegion().getRegionInfo();
315    TableName tableName = hri.getTable();
316
317    // 1. All users need read access to hbase:meta table.
318    // this is a very common operation, so deal with it quickly.
319    if (hri.isMetaRegion()) {
320      if (permRequest == Action.READ) {
321        return AuthResult.allow(request, "All users allowed", user,
322          permRequest, tableName, families);
323      }
324    }
325
326    if (user == null) {
327      return AuthResult.deny(request, "No user associated with request!", null,
328        permRequest, tableName, families);
329    }
330
331    // 2. check for the table-level, if successful we can short-circuit
332    if (getAuthManager().authorize(user, tableName, (byte[])null, permRequest)) {
333      return AuthResult.allow(request, "Table permission granted", user,
334        permRequest, tableName, families);
335    }
336
337    // 3. check permissions against the requested families
338    if (families != null && families.size() > 0) {
339      // all families must pass
340      for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
341        // a) check for family level access
342        if (getAuthManager().authorize(user, tableName, family.getKey(),
343            permRequest)) {
344          continue;  // family-level permission overrides per-qualifier
345        }
346
347        // b) qualifier level access can still succeed
348        if ((family.getValue() != null) && (family.getValue().size() > 0)) {
349          if (family.getValue() instanceof Set) {
350            // for each qualifier of the family
351            Set<byte[]> familySet = (Set<byte[]>)family.getValue();
352            for (byte[] qualifier : familySet) {
353              if (!getAuthManager().authorize(user, tableName, family.getKey(),
354                                         qualifier, permRequest)) {
355                return AuthResult.deny(request, "Failed qualifier check", user,
356                    permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
357              }
358            }
359          } else if (family.getValue() instanceof List) { // List<Cell>
360            List<Cell> cellList = (List<Cell>)family.getValue();
361            for (Cell cell : cellList) {
362              if (!getAuthManager().authorize(user, tableName, family.getKey(),
363                CellUtil.cloneQualifier(cell), permRequest)) {
364                return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
365                  tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(cell)));
366              }
367            }
368          }
369        } else {
370          // no qualifiers and family-level check already failed
371          return AuthResult.deny(request, "Failed family check", user, permRequest,
372              tableName, makeFamilyMap(family.getKey(), null));
373        }
374      }
375
376      // all family checks passed
377      return AuthResult.allow(request, "All family checks passed", user, permRequest,
378          tableName, families);
379    }
380
381    // 4. no families to check and table level access failed
382    return AuthResult.deny(request, "No families to check and table permission failed",
383        user, permRequest, tableName, families);
384  }
385
386  /**
387   * Check the current user for authorization to perform a specific action
388   * against the given set of row data.
389   * @param opType the operation type
390   * @param user the user
391   * @param e the coprocessor environment
392   * @param families the map of column families to qualifiers present in
393   * the request
394   * @param actions the desired actions
395   * @return an authorization result
396   */
397  private AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
398      Map<byte [], ? extends Collection<?>> families, Action... actions) {
399    AuthResult result = null;
400    for (Action action: actions) {
401      result = permissionGranted(opType.toString(), user, action, e, families);
402      if (!result.isAllowed()) {
403        return result;
404      }
405    }
406    return result;
407  }
408
409  public void requireAccess(ObserverContext<?> ctx, String request, TableName tableName,
410      Action... permissions) throws IOException {
411    accessChecker.requireAccess(getActiveUser(ctx), request, tableName, permissions);
412  }
413
414  public void requirePermission(ObserverContext<?> ctx, String request,
415      Action perm) throws IOException {
416    accessChecker.requirePermission(getActiveUser(ctx), request, perm);
417  }
418
419  public void requireGlobalPermission(ObserverContext<?> ctx, String request,
420      Action perm, TableName tableName,
421      Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
422    accessChecker.requireGlobalPermission(getActiveUser(ctx),
423        request, perm,tableName, familyMap);
424  }
425
426  public void requireGlobalPermission(ObserverContext<?> ctx, String request,
427      Action perm, String namespace) throws IOException {
428    accessChecker.requireGlobalPermission(getActiveUser(ctx),
429        request, perm, namespace);
430  }
431
432  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
433      Action... permissions) throws IOException {
434    accessChecker.requireNamespacePermission(getActiveUser(ctx),
435        request, namespace, permissions);
436  }
437
438  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
439      TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
440      Action... permissions) throws IOException {
441    accessChecker.requireNamespacePermission(getActiveUser(ctx),
442        request, namespace, tableName, familyMap,
443        permissions);
444  }
445
446  public void requirePermission(ObserverContext<?> ctx, String request, TableName tableName,
447      byte[] family, byte[] qualifier, Action... permissions) throws IOException {
448    accessChecker.requirePermission(getActiveUser(ctx), request,
449        tableName, family, qualifier, permissions);
450  }
451
452  public void requireTablePermission(ObserverContext<?> ctx, String request,
453      TableName tableName,byte[] family, byte[] qualifier,
454      Action... permissions) throws IOException {
455    accessChecker.requireTablePermission(getActiveUser(ctx),
456        request, tableName, family, qualifier, permissions);
457  }
458
459  public void checkLockPermissions(ObserverContext<?> ctx, String namespace,
460      TableName tableName, RegionInfo[] regionInfos, String reason)
461      throws IOException {
462    accessChecker.checkLockPermissions(getActiveUser(ctx),
463        namespace, tableName, regionInfos, reason);
464  }
465
466  /**
467   * Returns <code>true</code> if the current user is allowed the given action
468   * over at least one of the column qualifiers in the given column families.
469   */
470  private boolean hasFamilyQualifierPermission(User user,
471      Action perm,
472      RegionCoprocessorEnvironment env,
473      Map<byte[], ? extends Collection<byte[]>> familyMap)
474    throws IOException {
475    RegionInfo hri = env.getRegion().getRegionInfo();
476    TableName tableName = hri.getTable();
477
478    if (user == null) {
479      return false;
480    }
481
482    if (familyMap != null && familyMap.size() > 0) {
483      // at least one family must be allowed
484      for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
485          familyMap.entrySet()) {
486        if (family.getValue() != null && !family.getValue().isEmpty()) {
487          for (byte[] qualifier : family.getValue()) {
488            if (getAuthManager().matchPermission(user, tableName,
489                family.getKey(), qualifier, perm)) {
490              return true;
491            }
492          }
493        } else {
494          if (getAuthManager().matchPermission(user, tableName, family.getKey(),
495              perm)) {
496            return true;
497          }
498        }
499      }
500    } else if (LOG.isDebugEnabled()) {
501      LOG.debug("Empty family map passed for permission check");
502    }
503
504    return false;
505  }
506
507  private enum OpType {
508    GET("get"),
509    EXISTS("exists"),
510    SCAN("scan"),
511    PUT("put"),
512    DELETE("delete"),
513    CHECK_AND_PUT("checkAndPut"),
514    CHECK_AND_DELETE("checkAndDelete"),
515    INCREMENT_COLUMN_VALUE("incrementColumnValue"),
516    APPEND("append"),
517    INCREMENT("increment");
518
519    private String type;
520
521    private OpType(String type) {
522      this.type = type;
523    }
524
525    @Override
526    public String toString() {
527      return type;
528    }
529  }
530
531  /**
532   * Determine if cell ACLs covered by the operation grant access. This is expensive.
533   * @return false if cell ACLs failed to grant access, true otherwise
534   * @throws IOException
535   */
536  private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
537      byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
538      throws IOException {
539    if (!cellFeaturesEnabled) {
540      return false;
541    }
542    long cellGrants = 0;
543    long latestCellTs = 0;
544    Get get = new Get(row);
545    // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
546    // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
547    // version. We have to get every cell version and check its TS against the TS asked for in
548    // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
549    // consider only one such passing cell. In case of Delete we have to consider all the cell
550    // versions under this passing version. When Delete Mutation contains columns which are a
551    // version delete just consider only one version for those column cells.
552    boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
553    if (considerCellTs) {
554      get.setMaxVersions();
555    } else {
556      get.setMaxVersions(1);
557    }
558    boolean diffCellTsFromOpTs = false;
559    for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
560      byte[] col = entry.getKey();
561      // TODO: HBASE-7114 could possibly unify the collection type in family
562      // maps so we would not need to do this
563      if (entry.getValue() instanceof Set) {
564        Set<byte[]> set = (Set<byte[]>)entry.getValue();
565        if (set == null || set.isEmpty()) {
566          get.addFamily(col);
567        } else {
568          for (byte[] qual: set) {
569            get.addColumn(col, qual);
570          }
571        }
572      } else if (entry.getValue() instanceof List) {
573        List<Cell> list = (List<Cell>)entry.getValue();
574        if (list == null || list.isEmpty()) {
575          get.addFamily(col);
576        } else {
577          // In case of family delete, a Cell will be added into the list with Qualifier as null.
578          for (Cell cell : list) {
579            if (cell.getQualifierLength() == 0
580                && (cell.getTypeByte() == Type.DeleteFamily.getCode()
581                || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
582              get.addFamily(col);
583            } else {
584              get.addColumn(col, CellUtil.cloneQualifier(cell));
585            }
586            if (considerCellTs) {
587              long cellTs = cell.getTimestamp();
588              latestCellTs = Math.max(latestCellTs, cellTs);
589              diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
590            }
591          }
592        }
593      } else if (entry.getValue() == null) {
594        get.addFamily(col);
595      } else {
596        throw new RuntimeException("Unhandled collection type " +
597          entry.getValue().getClass().getName());
598      }
599    }
600    // We want to avoid looking into the future. So, if the cells of the
601    // operation specify a timestamp, or the operation itself specifies a
602    // timestamp, then we use the maximum ts found. Otherwise, we bound
603    // the Get to the current server time. We add 1 to the timerange since
604    // the upper bound of a timerange is exclusive yet we need to examine
605    // any cells found there inclusively.
606    long latestTs = Math.max(opTs, latestCellTs);
607    if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
608      latestTs = EnvironmentEdgeManager.currentTime();
609    }
610    get.setTimeRange(0, latestTs + 1);
611    // In case of Put operation we set to read all versions. This was done to consider the case
612    // where columns are added with TS other than the Mutation TS. But normally this wont be the
613    // case with Put. There no need to get all versions but get latest version only.
614    if (!diffCellTsFromOpTs && request == OpType.PUT) {
615      get.setMaxVersions(1);
616    }
617    if (LOG.isTraceEnabled()) {
618      LOG.trace("Scanning for cells with " + get);
619    }
620    // This Map is identical to familyMap. The key is a BR rather than byte[].
621    // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
622    // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
623    Map<ByteRange, List<Cell>> familyMap1 = new HashMap<>();
624    for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
625      if (entry.getValue() instanceof List) {
626        familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
627      }
628    }
629    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
630    List<Cell> cells = Lists.newArrayList();
631    Cell prevCell = null;
632    ByteRange curFam = new SimpleMutableByteRange();
633    boolean curColAllVersions = (request == OpType.DELETE);
634    long curColCheckTs = opTs;
635    boolean foundColumn = false;
636    try {
637      boolean more = false;
638      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
639
640      do {
641        cells.clear();
642        // scan with limit as 1 to hold down memory use on wide rows
643        more = scanner.next(cells, scannerContext);
644        for (Cell cell: cells) {
645          if (LOG.isTraceEnabled()) {
646            LOG.trace("Found cell " + cell);
647          }
648          boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
649          if (colChange) foundColumn = false;
650          prevCell = cell;
651          if (!curColAllVersions && foundColumn) {
652            continue;
653          }
654          if (colChange && considerCellTs) {
655            curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
656            List<Cell> cols = familyMap1.get(curFam);
657            for (Cell col : cols) {
658              // null/empty qualifier is used to denote a Family delete. The TS and delete type
659              // associated with this is applicable for all columns within the family. That is
660              // why the below (col.getQualifierLength() == 0) check.
661              if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
662                  || CellUtil.matchingQualifier(cell, col)) {
663                byte type = col.getTypeByte();
664                if (considerCellTs) {
665                  curColCheckTs = col.getTimestamp();
666                }
667                // For a Delete op we pass allVersions as true. When a Delete Mutation contains
668                // a version delete for a column no need to check all the covering cells within
669                // that column. Check all versions when Type is DeleteColumn or DeleteFamily
670                // One version delete types are Delete/DeleteFamilyVersion
671                curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
672                    || (KeyValue.Type.DeleteFamily.getCode() == type);
673                break;
674              }
675            }
676          }
677          if (cell.getTimestamp() > curColCheckTs) {
678            // Just ignore this cell. This is not a covering cell.
679            continue;
680          }
681          foundColumn = true;
682          for (Action action: actions) {
683            // Are there permissions for this user for the cell?
684            if (!getAuthManager().authorize(user, getTableName(e), cell, action)) {
685              // We can stop if the cell ACL denies access
686              return false;
687            }
688          }
689          cellGrants++;
690        }
691      } while (more);
692    } catch (AccessDeniedException ex) {
693      throw ex;
694    } catch (IOException ex) {
695      LOG.error("Exception while getting cells to calculate covering permission", ex);
696    } finally {
697      scanner.close();
698    }
699    // We should not authorize unless we have found one or more cell ACLs that
700    // grant access. This code is used to check for additional permissions
701    // after no table or CF grants are found.
702    return cellGrants > 0;
703  }
704
705  private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
706    // Iterate over the entries in the familyMap, replacing the cells therein
707    // with new cells including the ACL data
708    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
709      List<Cell> newCells = Lists.newArrayList();
710      for (Cell cell: e.getValue()) {
711        // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
712        List<Tag> tags = new ArrayList<>();
713        tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
714        Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(cell);
715        while (tagIterator.hasNext()) {
716          tags.add(tagIterator.next());
717        }
718        newCells.add(PrivateCellUtil.createCell(cell, tags));
719      }
720      // This is supposed to be safe, won't CME
721      e.setValue(newCells);
722    }
723  }
724
725  // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
726  // type is reserved and should not be explicitly set by user.
727  private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
728    // No need to check if we're not going to throw
729    if (!authorizationEnabled) {
730      m.setAttribute(TAG_CHECK_PASSED, TRUE);
731      return;
732    }
733    // Superusers are allowed to store cells unconditionally.
734    if (Superusers.isSuperUser(user)) {
735      m.setAttribute(TAG_CHECK_PASSED, TRUE);
736      return;
737    }
738    // We already checked (prePut vs preBatchMutation)
739    if (m.getAttribute(TAG_CHECK_PASSED) != null) {
740      return;
741    }
742    for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
743      Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
744      while (tagsItr.hasNext()) {
745        if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
746          throw new AccessDeniedException("Mutation contains cell with reserved type tag");
747        }
748      }
749    }
750    m.setAttribute(TAG_CHECK_PASSED, TRUE);
751  }
752
753  /* ---- MasterObserver implementation ---- */
754  @Override
755  public void start(CoprocessorEnvironment env) throws IOException {
756    CompoundConfiguration conf = new CompoundConfiguration();
757    conf.add(env.getConfiguration());
758
759    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
760    if (!authorizationEnabled) {
761      LOG.warn("AccessController has been loaded with authorization checks DISABLED!");
762    }
763
764    shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
765      AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
766
767    cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
768    if (!cellFeaturesEnabled) {
769      LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
770          + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
771          + " accordingly.");
772    }
773
774    ZKWatcher zk = null;
775    if (env instanceof MasterCoprocessorEnvironment) {
776      // if running on HMaster
777      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment)env;
778      if (mEnv instanceof HasMasterServices) {
779        zk = ((HasMasterServices)mEnv).getMasterServices().getZooKeeper();
780      }
781    } else if (env instanceof RegionServerCoprocessorEnvironment) {
782      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment)env;
783      if (rsEnv instanceof HasRegionServerServices) {
784        zk = ((HasRegionServerServices)rsEnv).getRegionServerServices().getZooKeeper();
785      }
786    } else if (env instanceof RegionCoprocessorEnvironment) {
787      // if running at region
788      regionEnv = (RegionCoprocessorEnvironment) env;
789      conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
790      compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
791          AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
792      if (regionEnv instanceof HasRegionServerServices) {
793        zk = ((HasRegionServerServices)regionEnv).getRegionServerServices().getZooKeeper();
794      }
795    }
796
797    // set the user-provider.
798    this.userProvider = UserProvider.instantiate(env.getConfiguration());
799    // Throws RuntimeException if fails to load TableAuthManager so that coprocessor is unloaded.
800    accessChecker = new AccessChecker(env.getConfiguration(), zk);
801    tableAcls = new MapMaker().weakValues().makeMap();
802  }
803
804  @Override
805  public void stop(CoprocessorEnvironment env) {
806    accessChecker.stop();
807  }
808
809  /*********************************** Observer/Service Getters ***********************************/
810  @Override
811  public Optional<RegionObserver> getRegionObserver() {
812    return Optional.of(this);
813  }
814
815  @Override
816  public Optional<MasterObserver> getMasterObserver() {
817    return Optional.of(this);
818  }
819
820  @Override
821  public Optional<EndpointObserver> getEndpointObserver() {
822    return Optional.of(this);
823  }
824
825  @Override
826  public Optional<BulkLoadObserver> getBulkLoadObserver() {
827    return Optional.of(this);
828  }
829
830  @Override
831  public Optional<RegionServerObserver> getRegionServerObserver() {
832    return Optional.of(this);
833  }
834
835  @Override
836  public Iterable<Service> getServices() {
837    return Collections.singleton(
838        AccessControlProtos.AccessControlService.newReflectiveService(this));
839  }
840
841  /*********************************** Observer implementations ***********************************/
842
843  @Override
844  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
845      TableDescriptor desc, RegionInfo[] regions) throws IOException {
846    Set<byte[]> families = desc.getColumnFamilyNames();
847    Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
848    for (byte[] family: families) {
849      familyMap.put(family, null);
850    }
851    requireNamespacePermission(c, "createTable",
852        desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.CREATE);
853  }
854
855  @Override
856  public void postCompletedCreateTableAction(
857      final ObserverContext<MasterCoprocessorEnvironment> c,
858      final TableDescriptor desc,
859      final RegionInfo[] regions) throws IOException {
860    // When AC is used, it should be configured as the 1st CP.
861    // In Master, the table operations like create, are handled by a Thread pool but the max size
862    // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
863    // sequentially only.
864    // Related code in HMaster#startServiceThreads
865    // {code}
866    //   // We depend on there being only one instance of this executor running
867    //   // at a time. To do concurrency, would need fencing of enable/disable of
868    //   // tables.
869    //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
870    // {code}
871    // In future if we change this pool to have more threads, then there is a chance for thread,
872    // creating acl table, getting delayed and by that time another table creation got over and
873    // this hook is getting called. In such a case, we will need a wait logic here which will
874    // wait till the acl table is created.
875    if (AccessControlLists.isAclTable(desc)) {
876      this.aclTabAvailable = true;
877    } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
878      if (!aclTabAvailable) {
879        LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
880            + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
881            + getClass().getSimpleName() + " should be configured as the first Coprocessor");
882      } else {
883        String owner = desc.getOwnerString();
884        // default the table owner to current user, if not specified.
885        if (owner == null)
886          owner = getActiveUser(c).getShortName();
887        final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
888            desc.getTableName(), null, Action.values());
889        // switch to the real hbase master user for doing the RPC on the ACL table
890        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
891          @Override
892          public Void run() throws Exception {
893            try (Table table = c.getEnvironment().getConnection().
894                getTable(AccessControlLists.ACL_TABLE_NAME)) {
895              AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
896                  userperm, table);
897            }
898            return null;
899          }
900        });
901      }
902    }
903  }
904
905  @Override
906  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
907      throws IOException {
908    requirePermission(c, "deleteTable",
909        tableName, null, null, Action.ADMIN, Action.CREATE);
910  }
911
912  @Override
913  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
914      final TableName tableName) throws IOException {
915    final Configuration conf = c.getEnvironment().getConfiguration();
916    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
917      @Override
918      public Void run() throws Exception {
919        try (Table table = c.getEnvironment().getConnection().
920            getTable(AccessControlLists.ACL_TABLE_NAME)) {
921          AccessControlLists.removeTablePermissions(conf, tableName, table);
922        }
923        return null;
924      }
925    });
926    getAuthManager().getZKPermissionWatcher().deleteTableACLNode(tableName);
927  }
928
929  @Override
930  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
931      final TableName tableName) throws IOException {
932    requirePermission(c, "truncateTable",
933        tableName, null, null, Action.ADMIN, Action.CREATE);
934
935    final Configuration conf = c.getEnvironment().getConfiguration();
936    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
937      @Override
938      public Void run() throws Exception {
939        List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
940        if (acls != null) {
941          tableAcls.put(tableName, acls);
942        }
943        return null;
944      }
945    });
946  }
947
948  @Override
949  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
950      final TableName tableName) throws IOException {
951    final Configuration conf = ctx.getEnvironment().getConfiguration();
952    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
953      @Override
954      public Void run() throws Exception {
955        List<UserPermission> perms = tableAcls.get(tableName);
956        if (perms != null) {
957          for (UserPermission perm : perms) {
958            try (Table table = ctx.getEnvironment().getConnection().
959                getTable(AccessControlLists.ACL_TABLE_NAME)) {
960              AccessControlLists.addUserPermission(conf, perm, table);
961            }
962          }
963        }
964        tableAcls.remove(tableName);
965        return null;
966      }
967    });
968  }
969
970  @Override
971  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
972      TableDescriptor htd) throws IOException {
973    // TODO: potentially check if this is a add/modify/delete column operation
974    requirePermission(c, "modifyTable",
975        tableName, null, null, Action.ADMIN, Action.CREATE);
976  }
977
978  @Override
979  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
980      TableName tableName, final TableDescriptor htd) throws IOException {
981    final Configuration conf = c.getEnvironment().getConfiguration();
982    // default the table owner to current user, if not specified.
983    final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
984      getActiveUser(c).getShortName();
985    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
986      @Override
987      public Void run() throws Exception {
988        UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
989            htd.getTableName(), null, Action.values());
990        try (Table table = c.getEnvironment().getConnection().
991            getTable(AccessControlLists.ACL_TABLE_NAME)) {
992          AccessControlLists.addUserPermission(conf, userperm, table);
993        }
994        return null;
995      }
996    });
997  }
998
999  @Override
1000  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1001      throws IOException {
1002    requirePermission(c, "enableTable",
1003        tableName, null, null, Action.ADMIN, Action.CREATE);
1004  }
1005
1006  @Override
1007  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1008      throws IOException {
1009    if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1010      // We have to unconditionally disallow disable of the ACL table when we are installed,
1011      // even if not enforcing authorizations. We are still allowing grants and revocations,
1012      // checking permissions and logging audit messages, etc. If the ACL table is not
1013      // available we will fail random actions all over the place.
1014      throw new AccessDeniedException("Not allowed to disable "
1015          + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1016    }
1017    requirePermission(c, "disableTable",
1018        tableName, null, null, Action.ADMIN, Action.CREATE);
1019  }
1020
1021  @Override
1022  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
1023      final long procId) throws IOException {
1024    requirePermission(ctx, "abortProcedure", Action.ADMIN);
1025  }
1026
1027  @Override
1028  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1029      throws IOException {
1030    // There is nothing to do at this time after the procedure abort request was sent.
1031  }
1032
1033  @Override
1034  public void preGetProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1035      throws IOException {
1036    requirePermission(ctx, "getProcedure", Action.ADMIN);
1037  }
1038
1039  @Override
1040  public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx)
1041      throws IOException {
1042    User user = getActiveUser(ctx);
1043    accessChecker.requirePermission(user, "getLocks", Action.ADMIN);
1044  }
1045
1046  @Override
1047  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo region,
1048      ServerName srcServer, ServerName destServer) throws IOException {
1049    requirePermission(c, "move",
1050        region.getTable(), null, null, Action.ADMIN);
1051  }
1052
1053  @Override
1054  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
1055      throws IOException {
1056    requirePermission(c, "assign",
1057        regionInfo.getTable(), null, null, Action.ADMIN);
1058  }
1059
1060  @Override
1061  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo,
1062      boolean force) throws IOException {
1063    requirePermission(c, "unassign",
1064        regionInfo.getTable(), null, null, Action.ADMIN);
1065  }
1066
1067  @Override
1068  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1069      RegionInfo regionInfo) throws IOException {
1070    requirePermission(c, "regionOffline",
1071        regionInfo.getTable(), null, null, Action.ADMIN);
1072  }
1073
1074  @Override
1075  public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1076      final boolean newValue, final MasterSwitchType switchType) throws IOException {
1077    requirePermission(ctx, "setSplitOrMergeEnabled",
1078        Action.ADMIN);
1079  }
1080
1081  @Override
1082  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1083      throws IOException {
1084    requirePermission(c, "balance", Action.ADMIN);
1085  }
1086
1087  @Override
1088  public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1089      boolean newValue) throws IOException {
1090    requirePermission(c, "balanceSwitch", Action.ADMIN);
1091  }
1092
1093  @Override
1094  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1095      throws IOException {
1096    requirePermission(c, "shutdown", Action.ADMIN);
1097  }
1098
1099  @Override
1100  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1101      throws IOException {
1102    requirePermission(c, "stopMaster", Action.ADMIN);
1103  }
1104
1105  @Override
1106  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1107      throws IOException {
1108    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
1109      if (!admin.tableExists(AccessControlLists.ACL_TABLE_NAME)) {
1110        createACLTable(admin);
1111      } else {
1112        this.aclTabAvailable = true;
1113      }
1114    }
1115  }
1116  /**
1117   * Create the ACL table
1118   * @throws IOException
1119   */
1120  private static void createACLTable(Admin admin) throws IOException {
1121    /** Table descriptor for ACL table */
1122    ColumnFamilyDescriptor cfd =
1123        ColumnFamilyDescriptorBuilder.newBuilder(AccessControlLists.ACL_LIST_FAMILY).
1124        setMaxVersions(1).
1125        setInMemory(true).
1126        setBlockCacheEnabled(true).
1127        setBlocksize(8 * 1024).
1128        setBloomFilterType(BloomType.NONE).
1129        setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
1130    TableDescriptor td =
1131        TableDescriptorBuilder.newBuilder(AccessControlLists.ACL_TABLE_NAME).
1132          setColumnFamily(cfd).build();
1133    admin.createTable(td);
1134  }
1135
1136  @Override
1137  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1138      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1139      throws IOException {
1140    // Move this ACL check to SnapshotManager#checkPermissions as part of AC deprecation.
1141    requirePermission(ctx, "snapshot " + snapshot.getName(),
1142        hTableDescriptor.getTableName(), null, null, Permission.Action.ADMIN);
1143  }
1144
1145  @Override
1146  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1147      final SnapshotDescription snapshot) throws IOException {
1148    User user = getActiveUser(ctx);
1149    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1150      // list it, if user is the owner of snapshot
1151      AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1152          "Snapshot owner check allowed", user, null, null, null);
1153      AccessChecker.logResult(result);
1154    } else {
1155      accessChecker.requirePermission(user, "listSnapshot " + snapshot.getName(), Action.ADMIN);
1156    }
1157  }
1158
1159  @Override
1160  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1161      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1162      throws IOException {
1163    User user = getActiveUser(ctx);
1164    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
1165        && hTableDescriptor.getTableName().getNameAsString().equals(snapshot.getTable())) {
1166      // Snapshot owner is allowed to create a table with the same name as the snapshot he took
1167      AuthResult result = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
1168        "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
1169      AccessChecker.logResult(result);
1170    } else {
1171      accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), Action.ADMIN);
1172    }
1173  }
1174
1175  @Override
1176  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1177      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1178      throws IOException {
1179    User user = getActiveUser(ctx);
1180    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1181      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(),
1182          hTableDescriptor.getTableName(), null, null, Permission.Action.ADMIN);
1183    } else {
1184      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(), Action.ADMIN);
1185    }
1186  }
1187
1188  @Override
1189  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1190      final SnapshotDescription snapshot) throws IOException {
1191    User user = getActiveUser(ctx);
1192    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1193      // Snapshot owner is allowed to delete the snapshot
1194      AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1195          "Snapshot owner check allowed", user, null, null, null);
1196      AccessChecker.logResult(result);
1197    } else {
1198      accessChecker.requirePermission(user, "deleteSnapshot " + snapshot.getName(), Action.ADMIN);
1199    }
1200  }
1201
1202  @Override
1203  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1204      NamespaceDescriptor ns) throws IOException {
1205    requireGlobalPermission(ctx, "createNamespace",
1206        Action.ADMIN, ns.getName());
1207  }
1208
1209  @Override
1210  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1211      throws IOException {
1212    requireGlobalPermission(ctx, "deleteNamespace",
1213        Action.ADMIN, namespace);
1214  }
1215
1216  @Override
1217  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1218      final String namespace) throws IOException {
1219    final Configuration conf = ctx.getEnvironment().getConfiguration();
1220    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1221      @Override
1222      public Void run() throws Exception {
1223        try (Table table = ctx.getEnvironment().getConnection().
1224            getTable(AccessControlLists.ACL_TABLE_NAME)) {
1225          AccessControlLists.removeNamespacePermissions(conf, namespace, table);
1226        }
1227        return null;
1228      }
1229    });
1230    getAuthManager().getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1231    LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1232  }
1233
1234  @Override
1235  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1236      NamespaceDescriptor ns) throws IOException {
1237    // We require only global permission so that
1238    // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1239    requireGlobalPermission(ctx, "modifyNamespace",
1240        Action.ADMIN, ns.getName());
1241  }
1242
1243  @Override
1244  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1245      throws IOException {
1246    requireNamespacePermission(ctx, "getNamespaceDescriptor",
1247        namespace, Action.ADMIN);
1248  }
1249
1250  @Override
1251  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1252      List<NamespaceDescriptor> descriptors) throws IOException {
1253    // Retains only those which passes authorization checks, as the checks weren't done as part
1254    // of preGetTableDescriptors.
1255    Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1256    User user = getActiveUser(ctx);
1257    while (itr.hasNext()) {
1258      NamespaceDescriptor desc = itr.next();
1259      try {
1260        accessChecker.requireNamespacePermission(user, "listNamespaces",
1261            desc.getName(), Action.ADMIN);
1262      } catch (AccessDeniedException e) {
1263        itr.remove();
1264      }
1265    }
1266  }
1267
1268  @Override
1269  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1270      final TableName tableName) throws IOException {
1271    // Move this ACL check to MasterFlushTableProcedureManager#checkPermissions as part of AC
1272    // deprecation.
1273    requirePermission(ctx, "flushTable", tableName,
1274        null, null, Action.ADMIN, Action.CREATE);
1275  }
1276
1277  @Override
1278  public void preSplitRegion(
1279      final ObserverContext<MasterCoprocessorEnvironment> ctx,
1280      final TableName tableName,
1281      final byte[] splitRow) throws IOException {
1282    requirePermission(ctx, "split", tableName,
1283        null, null, Action.ADMIN);
1284  }
1285
1286  @Override
1287  public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1288      throws IOException {
1289    requirePermission(ctx, "clearDeadServers", Action.ADMIN);
1290  }
1291
1292  @Override
1293  public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
1294      List<ServerName> servers, boolean offload) throws IOException {
1295    requirePermission(ctx, "decommissionRegionServers", Action.ADMIN);
1296  }
1297
1298  @Override
1299  public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1300      throws IOException {
1301    requirePermission(ctx, "listDecommissionedRegionServers",
1302        Action.ADMIN);
1303  }
1304
1305  @Override
1306  public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
1307      ServerName server, List<byte[]> encodedRegionNames) throws IOException {
1308    requirePermission(ctx, "recommissionRegionServers", Action.ADMIN);
1309  }
1310
1311  /* ---- RegionObserver implementation ---- */
1312
1313  @Override
1314  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
1315      throws IOException {
1316    RegionCoprocessorEnvironment env = c.getEnvironment();
1317    final Region region = env.getRegion();
1318    if (region == null) {
1319      LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1320    } else {
1321      RegionInfo regionInfo = region.getRegionInfo();
1322      if (regionInfo.getTable().isSystemTable()) {
1323        checkSystemOrSuperUser(getActiveUser(c));
1324      } else {
1325        requirePermission(c, "preOpen", Action.ADMIN);
1326      }
1327    }
1328  }
1329
1330  @Override
1331  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1332    RegionCoprocessorEnvironment env = c.getEnvironment();
1333    final Region region = env.getRegion();
1334    if (region == null) {
1335      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1336      return;
1337    }
1338    if (AccessControlLists.isAclRegion(region)) {
1339      aclRegion = true;
1340      try {
1341        initialize(env);
1342      } catch (IOException ex) {
1343        // if we can't obtain permissions, it's better to fail
1344        // than perform checks incorrectly
1345        throw new RuntimeException("Failed to initialize permissions cache", ex);
1346      }
1347    } else {
1348      initialized = true;
1349    }
1350  }
1351
1352  @Override
1353  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
1354      FlushLifeCycleTracker tracker) throws IOException {
1355    requirePermission(c, "flush", getTableName(c.getEnvironment()),
1356        null, null, Action.ADMIN, Action.CREATE);
1357  }
1358
1359  @Override
1360  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
1361      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
1362      CompactionRequest request) throws IOException {
1363    requirePermission(c, "compact", getTableName(c.getEnvironment()),
1364        null, null, Action.ADMIN, Action.CREATE);
1365    return scanner;
1366  }
1367
1368  private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1369      final Query query, OpType opType) throws IOException {
1370    Filter filter = query.getFilter();
1371    // Don't wrap an AccessControlFilter
1372    if (filter != null && filter instanceof AccessControlFilter) {
1373      return;
1374    }
1375    User user = getActiveUser(c);
1376    RegionCoprocessorEnvironment env = c.getEnvironment();
1377    Map<byte[],? extends Collection<byte[]>> families = null;
1378    switch (opType) {
1379    case GET:
1380    case EXISTS:
1381      families = ((Get)query).getFamilyMap();
1382      break;
1383    case SCAN:
1384      families = ((Scan)query).getFamilyMap();
1385      break;
1386    default:
1387      throw new RuntimeException("Unhandled operation " + opType);
1388    }
1389    AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1390    Region region = getRegion(env);
1391    TableName table = getTableName(region);
1392    Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1393    for (ColumnFamilyDescriptor hcd : region.getTableDescriptor().getColumnFamilies()) {
1394      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1395    }
1396    if (!authResult.isAllowed()) {
1397      if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1398        // Old behavior: Scan with only qualifier checks if we have partial
1399        // permission. Backwards compatible behavior is to throw an
1400        // AccessDeniedException immediately if there are no grants for table
1401        // or CF or CF+qual. Only proceed with an injected filter if there are
1402        // grants for qualifiers. Otherwise we will fall through below and log
1403        // the result and throw an ADE. We may end up checking qualifier
1404        // grants three times (permissionGranted above, here, and in the
1405        // filter) but that's the price of backwards compatibility.
1406        if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1407          authResult.setAllowed(true);
1408          authResult.setReason("Access allowed with filter");
1409          // Only wrap the filter if we are enforcing authorizations
1410          if (authorizationEnabled) {
1411            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1412              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1413              cfVsMaxVersions);
1414            // wrap any existing filter
1415            if (filter != null) {
1416              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1417                Lists.newArrayList(ourFilter, filter));
1418            }
1419            switch (opType) {
1420              case GET:
1421              case EXISTS:
1422                ((Get)query).setFilter(ourFilter);
1423                break;
1424              case SCAN:
1425                ((Scan)query).setFilter(ourFilter);
1426                break;
1427              default:
1428                throw new RuntimeException("Unhandled operation " + opType);
1429            }
1430          }
1431        }
1432      } else {
1433        // New behavior: Any access we might be granted is more fine-grained
1434        // than whole table or CF. Simply inject a filter and return what is
1435        // allowed. We will not throw an AccessDeniedException. This is a
1436        // behavioral change since 0.96.
1437        authResult.setAllowed(true);
1438        authResult.setReason("Access allowed with filter");
1439        // Only wrap the filter if we are enforcing authorizations
1440        if (authorizationEnabled) {
1441          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1442            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1443          // wrap any existing filter
1444          if (filter != null) {
1445            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1446              Lists.newArrayList(ourFilter, filter));
1447          }
1448          switch (opType) {
1449            case GET:
1450            case EXISTS:
1451              ((Get)query).setFilter(ourFilter);
1452              break;
1453            case SCAN:
1454              ((Scan)query).setFilter(ourFilter);
1455              break;
1456            default:
1457              throw new RuntimeException("Unhandled operation " + opType);
1458          }
1459        }
1460      }
1461    }
1462
1463    AccessChecker.logResult(authResult);
1464    if (authorizationEnabled && !authResult.isAllowed()) {
1465      throw new AccessDeniedException("Insufficient permissions for user '"
1466          + (user != null ? user.getShortName() : "null")
1467          + "' (table=" + table + ", action=READ)");
1468    }
1469  }
1470
1471  @Override
1472  public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1473      final Get get, final List<Cell> result) throws IOException {
1474    internalPreRead(c, get, OpType.GET);
1475  }
1476
1477  @Override
1478  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1479      final Get get, final boolean exists) throws IOException {
1480    internalPreRead(c, get, OpType.EXISTS);
1481    return exists;
1482  }
1483
1484  @Override
1485  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1486      final Put put, final WALEdit edit, final Durability durability)
1487      throws IOException {
1488    User user = getActiveUser(c);
1489    checkForReservedTagPresence(user, put);
1490
1491    // Require WRITE permission to the table, CF, or top visible value, if any.
1492    // NOTE: We don't need to check the permissions for any earlier Puts
1493    // because we treat the ACLs in each Put as timestamped like any other
1494    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1495    // change the ACL of any previous Put. This allows simple evolution of
1496    // security policy over time without requiring expensive updates.
1497    RegionCoprocessorEnvironment env = c.getEnvironment();
1498    Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1499    AuthResult authResult = permissionGranted(OpType.PUT,
1500        user, env, families, Action.WRITE);
1501    AccessChecker.logResult(authResult);
1502    if (!authResult.isAllowed()) {
1503      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1504        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1505      } else if (authorizationEnabled) {
1506        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1507      }
1508    }
1509
1510    // Add cell ACLs from the operation to the cells themselves
1511    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1512    if (bytes != null) {
1513      if (cellFeaturesEnabled) {
1514        addCellPermissions(bytes, put.getFamilyCellMap());
1515      } else {
1516        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1517      }
1518    }
1519  }
1520
1521  @Override
1522  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1523      final Put put, final WALEdit edit, final Durability durability) {
1524    if (aclRegion) {
1525      updateACL(c.getEnvironment(), put.getFamilyCellMap());
1526    }
1527  }
1528
1529  @Override
1530  public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1531      final Delete delete, final WALEdit edit, final Durability durability)
1532      throws IOException {
1533    // An ACL on a delete is useless, we shouldn't allow it
1534    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1535      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1536    }
1537    // Require WRITE permissions on all cells covered by the delete. Unlike
1538    // for Puts we need to check all visible prior versions, because a major
1539    // compaction could remove them. If the user doesn't have permission to
1540    // overwrite any of the visible versions ('visible' defined as not covered
1541    // by a tombstone already) then we have to disallow this operation.
1542    RegionCoprocessorEnvironment env = c.getEnvironment();
1543    Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1544    User user = getActiveUser(c);
1545    AuthResult authResult = permissionGranted(OpType.DELETE,
1546        user, env, families, Action.WRITE);
1547    AccessChecker.logResult(authResult);
1548    if (!authResult.isAllowed()) {
1549      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1550        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1551      } else if (authorizationEnabled) {
1552        throw new AccessDeniedException("Insufficient permissions " +
1553          authResult.toContextString());
1554      }
1555    }
1556  }
1557
1558  @Override
1559  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1560      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1561    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1562      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1563      User user = getActiveUser(c);
1564      for (int i = 0; i < miniBatchOp.size(); i++) {
1565        Mutation m = miniBatchOp.getOperation(i);
1566        if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1567          // We have a failure with table, cf and q perm checks and now giving a chance for cell
1568          // perm check
1569          OpType opType;
1570          if (m instanceof Put) {
1571            checkForReservedTagPresence(user, m);
1572            opType = OpType.PUT;
1573          } else {
1574            opType = OpType.DELETE;
1575          }
1576          AuthResult authResult = null;
1577          if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1578            m.getFamilyCellMap(), m.getTimestamp(), Action.WRITE)) {
1579            authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1580              user, Action.WRITE, table, m.getFamilyCellMap());
1581          } else {
1582            authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1583              user, Action.WRITE, table, m.getFamilyCellMap());
1584          }
1585          AccessChecker.logResult(authResult);
1586          if (authorizationEnabled && !authResult.isAllowed()) {
1587            throw new AccessDeniedException("Insufficient permissions "
1588              + authResult.toContextString());
1589          }
1590        }
1591      }
1592    }
1593  }
1594
1595  @Override
1596  public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1597      final Delete delete, final WALEdit edit, final Durability durability)
1598      throws IOException {
1599    if (aclRegion) {
1600      updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1601    }
1602  }
1603
1604  @Override
1605  public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1606      final byte [] row, final byte [] family, final byte [] qualifier,
1607      final CompareOperator op,
1608      final ByteArrayComparable comparator, final Put put,
1609      final boolean result) throws IOException {
1610    User user = getActiveUser(c);
1611    checkForReservedTagPresence(user, put);
1612
1613    // Require READ and WRITE permissions on the table, CF, and KV to update
1614    RegionCoprocessorEnvironment env = c.getEnvironment();
1615    Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1616    AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT,
1617        user, env, families, Action.READ, Action.WRITE);
1618    AccessChecker.logResult(authResult);
1619    if (!authResult.isAllowed()) {
1620      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1621        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1622      } else if (authorizationEnabled) {
1623        throw new AccessDeniedException("Insufficient permissions " +
1624          authResult.toContextString());
1625      }
1626    }
1627
1628    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1629    if (bytes != null) {
1630      if (cellFeaturesEnabled) {
1631        addCellPermissions(bytes, put.getFamilyCellMap());
1632      } else {
1633        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1634      }
1635    }
1636    return result;
1637  }
1638
1639  @Override
1640  public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1641      final byte[] row, final byte[] family, final byte[] qualifier,
1642      final CompareOperator opp, final ByteArrayComparable comparator, final Put put,
1643      final boolean result) throws IOException {
1644    if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1645      // We had failure with table, cf and q perm checks and now giving a chance for cell
1646      // perm check
1647      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1648      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1649      AuthResult authResult = null;
1650      User user = getActiveUser(c);
1651      if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1652          HConstants.LATEST_TIMESTAMP, Action.READ)) {
1653        authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(),
1654            "Covering cell set", user, Action.READ, table, families);
1655      } else {
1656        authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(),
1657            "Covering cell set", user, Action.READ, table, families);
1658      }
1659      AccessChecker.logResult(authResult);
1660      if (authorizationEnabled && !authResult.isAllowed()) {
1661        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1662      }
1663    }
1664    return result;
1665  }
1666
1667  @Override
1668  public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1669      final byte [] row, final byte [] family, final byte [] qualifier,
1670      final CompareOperator op,
1671      final ByteArrayComparable comparator, final Delete delete,
1672      final boolean result) throws IOException {
1673    // An ACL on a delete is useless, we shouldn't allow it
1674    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1675      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1676          delete.toString());
1677    }
1678    // Require READ and WRITE permissions on the table, CF, and the KV covered
1679    // by the delete
1680    RegionCoprocessorEnvironment env = c.getEnvironment();
1681    Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1682    User user = getActiveUser(c);
1683    AuthResult authResult = permissionGranted(
1684        OpType.CHECK_AND_DELETE, user, env, families, Action.READ, Action.WRITE);
1685    AccessChecker.logResult(authResult);
1686    if (!authResult.isAllowed()) {
1687      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1688        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1689      } else if (authorizationEnabled) {
1690        throw new AccessDeniedException("Insufficient permissions " +
1691          authResult.toContextString());
1692      }
1693    }
1694    return result;
1695  }
1696
1697  @Override
1698  public boolean preCheckAndDeleteAfterRowLock(
1699      final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row,
1700      final byte[] family, final byte[] qualifier, final CompareOperator op,
1701      final ByteArrayComparable comparator, final Delete delete, final boolean result)
1702      throws IOException {
1703    if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1704      // We had failure with table, cf and q perm checks and now giving a chance for cell
1705      // perm check
1706      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1707      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1708      AuthResult authResult = null;
1709      User user = getActiveUser(c);
1710      if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(),
1711          row, families, HConstants.LATEST_TIMESTAMP, Action.READ)) {
1712        authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(),
1713            "Covering cell set", user, Action.READ, table, families);
1714      } else {
1715        authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(),
1716            "Covering cell set", user, Action.READ, table, families);
1717      }
1718      AccessChecker.logResult(authResult);
1719      if (authorizationEnabled && !authResult.isAllowed()) {
1720        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1721      }
1722    }
1723    return result;
1724  }
1725
1726  @Override
1727  public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1728      throws IOException {
1729    User user = getActiveUser(c);
1730    checkForReservedTagPresence(user, append);
1731
1732    // Require WRITE permission to the table, CF, and the KV to be appended
1733    RegionCoprocessorEnvironment env = c.getEnvironment();
1734    Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1735    AuthResult authResult = permissionGranted(OpType.APPEND, user,
1736        env, families, Action.WRITE);
1737    AccessChecker.logResult(authResult);
1738    if (!authResult.isAllowed()) {
1739      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1740        append.setAttribute(CHECK_COVERING_PERM, TRUE);
1741      } else if (authorizationEnabled)  {
1742        throw new AccessDeniedException("Insufficient permissions " +
1743          authResult.toContextString());
1744      }
1745    }
1746
1747    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1748    if (bytes != null) {
1749      if (cellFeaturesEnabled) {
1750        addCellPermissions(bytes, append.getFamilyCellMap());
1751      } else {
1752        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1753      }
1754    }
1755
1756    return null;
1757  }
1758
1759  @Override
1760  public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1761      final Append append) throws IOException {
1762    if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1763      // We had failure with table, cf and q perm checks and now giving a chance for cell
1764      // perm check
1765      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1766      AuthResult authResult = null;
1767      User user = getActiveUser(c);
1768      if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
1769          append.getFamilyCellMap(), append.getTimeRange().getMax(), Action.WRITE)) {
1770        authResult = AuthResult.allow(OpType.APPEND.toString(),
1771            "Covering cell set", user, Action.WRITE, table, append.getFamilyCellMap());
1772      } else {
1773        authResult = AuthResult.deny(OpType.APPEND.toString(),
1774            "Covering cell set", user, Action.WRITE, table, append.getFamilyCellMap());
1775      }
1776      AccessChecker.logResult(authResult);
1777      if (authorizationEnabled && !authResult.isAllowed()) {
1778        throw new AccessDeniedException("Insufficient permissions " +
1779          authResult.toContextString());
1780      }
1781    }
1782    return null;
1783  }
1784
1785  @Override
1786  public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1787      final Increment increment)
1788      throws IOException {
1789    User user = getActiveUser(c);
1790    checkForReservedTagPresence(user, increment);
1791
1792    // Require WRITE permission to the table, CF, and the KV to be replaced by
1793    // the incremented value
1794    RegionCoprocessorEnvironment env = c.getEnvironment();
1795    Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1796    AuthResult authResult = permissionGranted(OpType.INCREMENT,
1797        user, env, families, Action.WRITE);
1798    AccessChecker.logResult(authResult);
1799    if (!authResult.isAllowed()) {
1800      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1801        increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1802      } else if (authorizationEnabled) {
1803        throw new AccessDeniedException("Insufficient permissions " +
1804          authResult.toContextString());
1805      }
1806    }
1807
1808    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1809    if (bytes != null) {
1810      if (cellFeaturesEnabled) {
1811        addCellPermissions(bytes, increment.getFamilyCellMap());
1812      } else {
1813        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1814      }
1815    }
1816
1817    return null;
1818  }
1819
1820  @Override
1821  public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1822      final Increment increment) throws IOException {
1823    if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1824      // We had failure with table, cf and q perm checks and now giving a chance for cell
1825      // perm check
1826      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1827      AuthResult authResult = null;
1828      User user = getActiveUser(c);
1829      if (checkCoveringPermission(user, OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1830          increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1831        authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1832            user, Action.WRITE, table, increment.getFamilyCellMap());
1833      } else {
1834        authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1835            user, Action.WRITE, table, increment.getFamilyCellMap());
1836      }
1837      AccessChecker.logResult(authResult);
1838      if (authorizationEnabled && !authResult.isAllowed()) {
1839        throw new AccessDeniedException("Insufficient permissions " +
1840          authResult.toContextString());
1841      }
1842    }
1843    return null;
1844  }
1845
1846  @Override
1847  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1848      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1849    // If the HFile version is insufficient to persist tags, we won't have any
1850    // work to do here
1851    if (!cellFeaturesEnabled) {
1852      return newCell;
1853    }
1854
1855    // Collect any ACLs from the old cell
1856    List<Tag> tags = Lists.newArrayList();
1857    List<Tag> aclTags = Lists.newArrayList();
1858    ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1859    if (oldCell != null) {
1860      Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(oldCell);
1861      while (tagIterator.hasNext()) {
1862        Tag tag = tagIterator.next();
1863        if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1864          // Not an ACL tag, just carry it through
1865          if (LOG.isTraceEnabled()) {
1866            LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
1867                + " length " + tag.getValueLength());
1868          }
1869          tags.add(tag);
1870        } else {
1871          aclTags.add(tag);
1872        }
1873      }
1874    }
1875
1876    // Do we have an ACL on the operation?
1877    byte[] aclBytes = mutation.getACL();
1878    if (aclBytes != null) {
1879      // Yes, use it
1880      tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1881    } else {
1882      // No, use what we carried forward
1883      if (perms != null) {
1884        // TODO: If we collected ACLs from more than one tag we may have a
1885        // List<Permission> of size > 1, this can be collapsed into a single
1886        // Permission
1887        if (LOG.isTraceEnabled()) {
1888          LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1889        }
1890        tags.addAll(aclTags);
1891      }
1892    }
1893
1894    // If we have no tags to add, just return
1895    if (tags.isEmpty()) {
1896      return newCell;
1897    }
1898
1899    Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags);
1900    return rewriteCell;
1901  }
1902
1903  @Override
1904  public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
1905      throws IOException {
1906    internalPreRead(c, scan, OpType.SCAN);
1907  }
1908
1909  @Override
1910  public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1911      final Scan scan, final RegionScanner s) throws IOException {
1912    User user = getActiveUser(c);
1913    if (user != null && user.getShortName() != null) {
1914      // store reference to scanner owner for later checks
1915      scannerOwners.put(s, user.getShortName());
1916    }
1917    return s;
1918  }
1919
1920  @Override
1921  public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1922      final InternalScanner s, final List<Result> result,
1923      final int limit, final boolean hasNext) throws IOException {
1924    requireScannerOwner(s);
1925    return hasNext;
1926  }
1927
1928  @Override
1929  public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1930      final InternalScanner s) throws IOException {
1931    requireScannerOwner(s);
1932  }
1933
1934  @Override
1935  public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1936      final InternalScanner s) throws IOException {
1937    // clean up any associated owner mapping
1938    scannerOwners.remove(s);
1939  }
1940
1941  @Override
1942  public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
1943      final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
1944    // 'default' in RegionObserver might do unnecessary copy for Off heap backed Cells.
1945    return hasMore;
1946  }
1947
1948  /**
1949   * Verify, when servicing an RPC, that the caller is the scanner owner.
1950   * If so, we assume that access control is correctly enforced based on
1951   * the checks performed in preScannerOpen()
1952   */
1953  private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
1954    if (!RpcServer.isInRpcCallContext()) {
1955      return;
1956    }
1957    String requestUserName = RpcServer.getRequestUserName().orElse(null);
1958    String owner = scannerOwners.get(s);
1959    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
1960      throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1961    }
1962  }
1963
1964  /**
1965   * Verifies user has CREATE privileges on
1966   * the Column Families involved in the bulkLoadHFile
1967   * request. Specific Column Write privileges are presently
1968   * ignored.
1969   */
1970  @Override
1971  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1972      List<Pair<byte[], String>> familyPaths) throws IOException {
1973    User user = getActiveUser(ctx);
1974    for(Pair<byte[],String> el : familyPaths) {
1975      accessChecker.requirePermission(user, "preBulkLoadHFile",
1976          ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(),
1977          el.getFirst(),
1978          null,
1979          Action.CREATE);
1980    }
1981  }
1982
1983  /**
1984   * Authorization check for
1985   * SecureBulkLoadProtocol.prepareBulkLoad()
1986   * @param ctx the context
1987   * @throws IOException
1988   */
1989  @Override
1990  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1991  throws IOException {
1992    requireAccess(ctx, "prePrepareBulkLoad",
1993        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
1994  }
1995
1996  /**
1997   * Authorization security check for
1998   * SecureBulkLoadProtocol.cleanupBulkLoad()
1999   * @param ctx the context
2000   * @throws IOException
2001   */
2002  @Override
2003  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
2004  throws IOException {
2005    requireAccess(ctx, "preCleanupBulkLoad",
2006        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.CREATE);
2007  }
2008
2009  /* ---- EndpointObserver implementation ---- */
2010
2011  @Override
2012  public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2013      Service service, String methodName, Message request) throws IOException {
2014    // Don't intercept calls to our own AccessControlService, we check for
2015    // appropriate permissions in the service handlers
2016    if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2017      requirePermission(ctx,
2018          "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
2019          getTableName(ctx.getEnvironment()), null, null,
2020          Action.EXEC);
2021    }
2022    return request;
2023  }
2024
2025  @Override
2026  public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2027      Service service, String methodName, Message request, Message.Builder responseBuilder)
2028      throws IOException { }
2029
2030  /* ---- Protobuf AccessControlService implementation ---- */
2031
2032  @Override
2033  public void grant(RpcController controller,
2034      AccessControlProtos.GrantRequest request,
2035      RpcCallback<AccessControlProtos.GrantResponse> done) {
2036    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
2037    AccessControlProtos.GrantResponse response = null;
2038    try {
2039      // verify it's only running at .acl.
2040      if (aclRegion) {
2041        if (!initialized) {
2042          throw new CoprocessorException("AccessController not yet initialized");
2043        }
2044        if (LOG.isDebugEnabled()) {
2045          LOG.debug("Received request to grant access permission " + perm.toString());
2046        }
2047        User caller = RpcServer.getRequestUser().orElse(null);
2048
2049        switch(request.getUserPermission().getPermission().getType()) {
2050          case Global :
2051          case Table :
2052            accessChecker.requirePermission(caller, "grant", perm.getTableName(),
2053                perm.getFamily(), perm.getQualifier(), Action.ADMIN);
2054            break;
2055          case Namespace :
2056            accessChecker.requireNamespacePermission(caller, "grant", perm.getNamespace(),
2057                Action.ADMIN);
2058           break;
2059        }
2060
2061        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2062          @Override
2063          public Void run() throws Exception {
2064            // regionEnv is set at #start. Hopefully not null at this point.
2065            try (Table table = regionEnv.getConnection().
2066                getTable(AccessControlLists.ACL_TABLE_NAME)) {
2067              AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm, table,
2068                  request.getMergeExistingPermissions());
2069            }
2070            return null;
2071          }
2072        });
2073
2074        if (AUDITLOG.isTraceEnabled()) {
2075          // audit log should store permission changes in addition to auth results
2076          String remoteAddress = RpcServer.getRemoteAddress().map(InetAddress::toString).orElse("");
2077          AUDITLOG.trace("User {} (remote address: {}) granted permission {}", caller, remoteAddress, perm);
2078        }
2079      } else {
2080        throw new CoprocessorException(AccessController.class, "This method "
2081            + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2082      }
2083      response = AccessControlProtos.GrantResponse.getDefaultInstance();
2084    } catch (IOException ioe) {
2085      // pass exception back up
2086      CoprocessorRpcUtils.setControllerException(controller, ioe);
2087    }
2088    done.run(response);
2089  }
2090
2091  @Override
2092  public void revoke(RpcController controller,
2093      AccessControlProtos.RevokeRequest request,
2094      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2095    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
2096    AccessControlProtos.RevokeResponse response = null;
2097    try {
2098      // only allowed to be called on _acl_ region
2099      if (aclRegion) {
2100        if (!initialized) {
2101          throw new CoprocessorException("AccessController not yet initialized");
2102        }
2103        if (LOG.isDebugEnabled()) {
2104          LOG.debug("Received request to revoke access permission " + perm.toString());
2105        }
2106        User caller = RpcServer.getRequestUser().orElse(null);
2107
2108        switch(request.getUserPermission().getPermission().getType()) {
2109          case Global :
2110          case Table :
2111            accessChecker.requirePermission(caller, "revoke", perm.getTableName(), perm.getFamily(),
2112              perm.getQualifier(), Action.ADMIN);
2113            break;
2114          case Namespace :
2115            accessChecker.requireNamespacePermission(caller, "revoke", perm.getNamespace(),
2116                Action.ADMIN);
2117            break;
2118        }
2119
2120        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2121          @Override
2122          public Void run() throws Exception {
2123            // regionEnv is set at #start. Hopefully not null here.
2124            try (Table table = regionEnv.getConnection().
2125                getTable(AccessControlLists.ACL_TABLE_NAME)) {
2126              AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm, table);
2127            }
2128            return null;
2129          }
2130        });
2131
2132        if (AUDITLOG.isTraceEnabled()) {
2133          // audit log should record all permission changes
2134          String remoteAddress = RpcServer.getRemoteAddress().map(InetAddress::toString).orElse("");
2135          AUDITLOG.trace("User {} (remote address: {}) revoked permission {}", caller, remoteAddress, perm);
2136        }
2137      } else {
2138        throw new CoprocessorException(AccessController.class, "This method "
2139            + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2140      }
2141      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2142    } catch (IOException ioe) {
2143      // pass exception back up
2144      CoprocessorRpcUtils.setControllerException(controller, ioe);
2145    }
2146    done.run(response);
2147  }
2148
2149  @Override
2150  public void getUserPermissions(RpcController controller,
2151      AccessControlProtos.GetUserPermissionsRequest request,
2152      RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2153    AccessControlProtos.GetUserPermissionsResponse response = null;
2154    try {
2155      // only allowed to be called on _acl_ region
2156      if (aclRegion) {
2157        if (!initialized) {
2158          throw new CoprocessorException("AccessController not yet initialized");
2159        }
2160        User caller = RpcServer.getRequestUser().orElse(null);
2161
2162        List<UserPermission> perms = null;
2163        if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2164          final TableName table = request.hasTableName() ?
2165            ProtobufUtil.toTableName(request.getTableName()) : null;
2166          accessChecker.requirePermission(caller, "userPermissions",
2167              table, null, null, Action.ADMIN);
2168          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2169            @Override
2170            public List<UserPermission> run() throws Exception {
2171              return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2172            }
2173          });
2174        } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2175          final String namespace = request.getNamespaceName().toStringUtf8();
2176          accessChecker.requireNamespacePermission(caller, "userPermissions",
2177              namespace, Action.ADMIN);
2178          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2179            @Override
2180            public List<UserPermission> run() throws Exception {
2181              return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2182                namespace);
2183            }
2184          });
2185        } else {
2186          accessChecker.requirePermission(caller, "userPermissions", Action.ADMIN);
2187          perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2188            @Override
2189            public List<UserPermission> run() throws Exception {
2190              return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2191            }
2192          });
2193          // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2194          // Also using acl as table name to be inline  with the results of global admin and will
2195          // help in avoiding any leakage of information about being superusers.
2196          for (String user: Superusers.getSuperUsers()) {
2197            perms.add(new UserPermission(Bytes.toBytes(user), AccessControlLists.ACL_TABLE_NAME,
2198                null, Action.values()));
2199          }
2200        }
2201        response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
2202      } else {
2203        throw new CoprocessorException(AccessController.class, "This method "
2204            + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2205      }
2206    } catch (IOException ioe) {
2207      // pass exception back up
2208      CoprocessorRpcUtils.setControllerException(controller, ioe);
2209    }
2210    done.run(response);
2211  }
2212
2213  @Override
2214  public void checkPermissions(RpcController controller,
2215                               AccessControlProtos.CheckPermissionsRequest request,
2216                               RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2217    Permission[] permissions = new Permission[request.getPermissionCount()];
2218    for (int i=0; i < request.getPermissionCount(); i++) {
2219      permissions[i] = AccessControlUtil.toPermission(request.getPermission(i));
2220    }
2221    AccessControlProtos.CheckPermissionsResponse response = null;
2222    try {
2223      User user = RpcServer.getRequestUser().orElse(null);
2224      TableName tableName = regionEnv.getRegion().getTableDescriptor().getTableName();
2225      for (Permission permission : permissions) {
2226        if (permission instanceof TablePermission) {
2227          // Check table permissions
2228
2229          TablePermission tperm = (TablePermission) permission;
2230          for (Action action : permission.getActions()) {
2231            if (!tperm.getTableName().equals(tableName)) {
2232              throw new CoprocessorException(AccessController.class, String.format("This method "
2233                  + "can only execute at the table specified in TablePermission. " +
2234                  "Table of the region:%s , requested table:%s", tableName,
2235                  tperm.getTableName()));
2236            }
2237
2238            Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2239            if (tperm.getFamily() != null) {
2240              if (tperm.getQualifier() != null) {
2241                Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2242                qualifiers.add(tperm.getQualifier());
2243                familyMap.put(tperm.getFamily(), qualifiers);
2244              } else {
2245                familyMap.put(tperm.getFamily(), null);
2246              }
2247            }
2248
2249            AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2250              familyMap);
2251            AccessChecker.logResult(result);
2252            if (!result.isAllowed()) {
2253              // Even if passive we need to throw an exception here, we support checking
2254              // effective permissions, so throw unconditionally
2255              throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2256                (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2257                ", action=" + action.toString() + ")");
2258            }
2259          }
2260
2261        } else {
2262          // Check global permissions
2263
2264          for (Action action : permission.getActions()) {
2265            AuthResult result;
2266            if (getAuthManager().authorize(user, action)) {
2267              result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2268                action, null, null);
2269            } else {
2270              result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2271                null, null);
2272            }
2273            AccessChecker.logResult(result);
2274            if (!result.isAllowed()) {
2275              // Even if passive we need to throw an exception here, we support checking
2276              // effective permissions, so throw unconditionally
2277              throw new AccessDeniedException("Insufficient permissions (action=" +
2278                action.toString() + ")");
2279            }
2280          }
2281        }
2282      }
2283      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2284    } catch (IOException ioe) {
2285      CoprocessorRpcUtils.setControllerException(controller, ioe);
2286    }
2287    done.run(response);
2288  }
2289
2290  private Region getRegion(RegionCoprocessorEnvironment e) {
2291    return e.getRegion();
2292  }
2293
2294  private TableName getTableName(RegionCoprocessorEnvironment e) {
2295    Region region = e.getRegion();
2296    if (region != null) {
2297      return getTableName(region);
2298    }
2299    return null;
2300  }
2301
2302  private TableName getTableName(Region region) {
2303    RegionInfo regionInfo = region.getRegionInfo();
2304    if (regionInfo != null) {
2305      return regionInfo.getTable();
2306    }
2307    return null;
2308  }
2309
2310  @Override
2311  public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2312      throws IOException {
2313    requirePermission(c, "preClose", Action.ADMIN);
2314  }
2315
2316  private void checkSystemOrSuperUser(User activeUser) throws IOException {
2317    // No need to check if we're not going to throw
2318    if (!authorizationEnabled) {
2319      return;
2320    }
2321    if (!Superusers.isSuperUser(activeUser)) {
2322      throw new AccessDeniedException("User '" + (activeUser != null ?
2323        activeUser.getShortName() : "null") + "' is not system or super user.");
2324    }
2325  }
2326
2327  @Override
2328  public void preStopRegionServer(
2329      ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2330      throws IOException {
2331    requirePermission(ctx, "preStopRegionServer", Action.ADMIN);
2332  }
2333
2334  private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2335      byte[] qualifier) {
2336    if (family == null) {
2337      return null;
2338    }
2339
2340    Map<byte[], Collection<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2341    familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2342    return familyMap;
2343  }
2344
2345  @Override
2346  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2347       List<TableName> tableNamesList, List<TableDescriptor> descriptors,
2348       String regex) throws IOException {
2349    // We are delegating the authorization check to postGetTableDescriptors as we don't have
2350    // any concrete set of table names when a regex is present or the full list is requested.
2351    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2352      // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2353      // request can be granted.
2354      TableName [] sns = null;
2355      try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
2356        sns = admin.listTableNames();
2357        if (sns == null) return;
2358        for (TableName tableName: tableNamesList) {
2359          // Skip checks for a table that does not exist
2360          if (!admin.tableExists(tableName)) continue;
2361          requirePermission(ctx, "getTableDescriptors", tableName, null, null,
2362            Action.ADMIN, Action.CREATE);
2363        }
2364      }
2365    }
2366  }
2367
2368  @Override
2369  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2370      List<TableName> tableNamesList, List<TableDescriptor> descriptors,
2371      String regex) throws IOException {
2372    // Skipping as checks in this case are already done by preGetTableDescriptors.
2373    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2374      return;
2375    }
2376
2377    // Retains only those which passes authorization checks, as the checks weren't done as part
2378    // of preGetTableDescriptors.
2379    Iterator<TableDescriptor> itr = descriptors.iterator();
2380    while (itr.hasNext()) {
2381      TableDescriptor htd = itr.next();
2382      try {
2383        requirePermission(ctx, "getTableDescriptors", htd.getTableName(), null, null,
2384            Action.ADMIN, Action.CREATE);
2385      } catch (AccessDeniedException e) {
2386        itr.remove();
2387      }
2388    }
2389  }
2390
2391  @Override
2392  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2393      List<TableDescriptor> descriptors, String regex) throws IOException {
2394    // Retains only those which passes authorization checks.
2395    Iterator<TableDescriptor> itr = descriptors.iterator();
2396    while (itr.hasNext()) {
2397      TableDescriptor htd = itr.next();
2398      try {
2399        requireAccess(ctx, "getTableNames", htd.getTableName(), Action.values());
2400      } catch (AccessDeniedException e) {
2401        itr.remove();
2402      }
2403    }
2404  }
2405
2406  @Override
2407  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2408                              final RegionInfo[] regionsToMerge) throws IOException {
2409    requirePermission(ctx, "mergeRegions", regionsToMerge[0].getTable(), null, null,
2410      Action.ADMIN);
2411  }
2412
2413  @Override
2414  public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2415      throws IOException {
2416    requirePermission(ctx, "preRollLogWriterRequest", Permission.Action.ADMIN);
2417  }
2418
2419  @Override
2420  public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2421      throws IOException { }
2422
2423  @Override
2424  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2425      final String userName, final GlobalQuotaSettings quotas) throws IOException {
2426    requirePermission(ctx, "setUserQuota", Action.ADMIN);
2427  }
2428
2429  @Override
2430  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2431      final String userName, final TableName tableName, final GlobalQuotaSettings quotas)
2432          throws IOException {
2433    requirePermission(ctx, "setUserTableQuota", tableName, null, null, Action.ADMIN);
2434  }
2435
2436  @Override
2437  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2438      final String userName, final String namespace, final GlobalQuotaSettings quotas)
2439          throws IOException {
2440    requirePermission(ctx, "setUserNamespaceQuota", Action.ADMIN);
2441  }
2442
2443  @Override
2444  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2445      final TableName tableName, final GlobalQuotaSettings quotas) throws IOException {
2446    requirePermission(ctx, "setTableQuota", tableName, null, null, Action.ADMIN);
2447  }
2448
2449  @Override
2450  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2451      final String namespace, final GlobalQuotaSettings quotas) throws IOException {
2452    requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
2453  }
2454
2455  @Override
2456  public ReplicationEndpoint postCreateReplicationEndPoint(
2457      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2458    return endpoint;
2459  }
2460
2461  @Override
2462  public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2463      throws IOException {
2464    requirePermission(ctx, "replicateLogEntries", Action.WRITE);
2465  }
2466
2467  @Override
2468  public void  preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2469          throws IOException {
2470    requirePermission(ctx, "preClearCompactionQueues", Permission.Action.ADMIN);
2471  }
2472
2473  @Override
2474  public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2475      String peerId, ReplicationPeerConfig peerConfig) throws IOException {
2476    requirePermission(ctx, "addReplicationPeer", Action.ADMIN);
2477  }
2478
2479  @Override
2480  public void preRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2481      String peerId) throws IOException {
2482    requirePermission(ctx, "removeReplicationPeer", Action.ADMIN);
2483  }
2484
2485  @Override
2486  public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2487      String peerId) throws IOException {
2488    requirePermission(ctx, "enableReplicationPeer", Action.ADMIN);
2489  }
2490
2491  @Override
2492  public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2493      String peerId) throws IOException {
2494    requirePermission(ctx, "disableReplicationPeer", Action.ADMIN);
2495  }
2496
2497  @Override
2498  public void preGetReplicationPeerConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2499      String peerId) throws IOException {
2500    requirePermission(ctx, "getReplicationPeerConfig", Action.ADMIN);
2501  }
2502
2503  @Override
2504  public void preUpdateReplicationPeerConfig(
2505      final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
2506      ReplicationPeerConfig peerConfig) throws IOException {
2507    requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
2508  }
2509
2510  @Override
2511  public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2512      String regex) throws IOException {
2513    requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
2514  }
2515
2516  @Override
2517  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
2518      TableName tableName, RegionInfo[] regionInfos, String description) throws IOException {
2519    // There are operations in the CREATE and ADMIN domain which may require lock, READ
2520    // or WRITE. So for any lock request, we check for these two perms irrespective of lock type.
2521    String reason = String.format("Description=%s", description);
2522    checkLockPermissions(ctx, namespace, tableName, regionInfos, reason);
2523  }
2524
2525  @Override
2526  public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
2527      TableName tableName, String description) throws IOException {
2528    checkLockPermissions(ctx, null, tableName, null, description);
2529  }
2530
2531  @Override
2532  public void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2533      throws IOException {
2534    checkSystemOrSuperUser(getActiveUser(ctx));
2535  }
2536
2537  /**
2538   * Returns the active user to which authorization checks should be applied.
2539   * If we are in the context of an RPC call, the remote user is used,
2540   * otherwise the currently logged in user is used.
2541   */
2542  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
2543    // for non-rpc handling, fallback to system user
2544    Optional<User> optionalUser = ctx.getCaller();
2545    if (optionalUser.isPresent()) {
2546      return optionalUser.get();
2547    }
2548    return userProvider.getCurrent();
2549  }
2550}