001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.security.access;
020
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcCallback;
023import com.google.protobuf.RpcController;
024import com.google.protobuf.Service;
025import java.io.IOException;
026import java.security.PrivilegedExceptionAction;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.Optional;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.TreeSet;
039import java.util.stream.Collectors;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.ArrayBackedTag;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.CellScanner;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.CompareOperator;
047import org.apache.hadoop.hbase.CompoundConfiguration;
048import org.apache.hadoop.hbase.CoprocessorEnvironment;
049import org.apache.hadoop.hbase.DoNotRetryIOException;
050import org.apache.hadoop.hbase.HBaseInterfaceAudience;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.KeyValue.Type;
054import org.apache.hadoop.hbase.NamespaceDescriptor;
055import org.apache.hadoop.hbase.PrivateCellUtil;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.Tag;
059import org.apache.hadoop.hbase.client.Admin;
060import org.apache.hadoop.hbase.client.Append;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Durability;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Increment;
067import org.apache.hadoop.hbase.client.MasterSwitchType;
068import org.apache.hadoop.hbase.client.Mutation;
069import org.apache.hadoop.hbase.client.Put;
070import org.apache.hadoop.hbase.client.Query;
071import org.apache.hadoop.hbase.client.RegionInfo;
072import org.apache.hadoop.hbase.client.Result;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.SnapshotDescription;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
079import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
080import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
081import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
082import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
083import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
084import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
085import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
086import org.apache.hadoop.hbase.coprocessor.MasterObserver;
087import org.apache.hadoop.hbase.coprocessor.ObserverContext;
088import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
089import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
090import org.apache.hadoop.hbase.coprocessor.RegionObserver;
091import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
092import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
093import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
094import org.apache.hadoop.hbase.filter.ByteArrayComparable;
095import org.apache.hadoop.hbase.filter.Filter;
096import org.apache.hadoop.hbase.filter.FilterList;
097import org.apache.hadoop.hbase.io.hfile.HFile;
098import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
099import org.apache.hadoop.hbase.ipc.RpcServer;
100import org.apache.hadoop.hbase.master.MasterServices;
101import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
102import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
103import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
104import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionRequest;
105import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionResponse;
106import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
107import org.apache.hadoop.hbase.regionserver.BloomType;
108import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
109import org.apache.hadoop.hbase.regionserver.InternalScanner;
110import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
111import org.apache.hadoop.hbase.regionserver.Region;
112import org.apache.hadoop.hbase.regionserver.RegionScanner;
113import org.apache.hadoop.hbase.regionserver.RegionServerServices;
114import org.apache.hadoop.hbase.regionserver.ScanType;
115import org.apache.hadoop.hbase.regionserver.ScannerContext;
116import org.apache.hadoop.hbase.regionserver.Store;
117import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
119import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
120import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
121import org.apache.hadoop.hbase.security.AccessDeniedException;
122import org.apache.hadoop.hbase.security.Superusers;
123import org.apache.hadoop.hbase.security.User;
124import org.apache.hadoop.hbase.security.UserProvider;
125import org.apache.hadoop.hbase.security.access.Permission.Action;
126import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
127import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
128import org.apache.hadoop.hbase.util.ByteRange;
129import org.apache.hadoop.hbase.util.Bytes;
130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
131import org.apache.hadoop.hbase.util.Pair;
132import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
133import org.apache.hadoop.hbase.wal.WALEdit;
134import org.apache.yetus.audience.InterfaceAudience;
135import org.slf4j.Logger;
136import org.slf4j.LoggerFactory;
137
138import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
139import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
140import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
141import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
142import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
143import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
144
145/**
146 * Provides basic authorization checks for data access and administrative
147 * operations.
148 *
149 * <p>
150 * {@code AccessController} performs authorization checks for HBase operations
151 * based on:
152 * </p>
153 * <ul>
154 *   <li>the identity of the user performing the operation</li>
155 *   <li>the scope over which the operation is performed, in increasing
156 *   specificity: global, table, column family, or qualifier</li>
157 *   <li>the type of action being performed (as mapped to
158 *   {@link Permission.Action} values)</li>
159 * </ul>
160 * <p>
161 * If the authorization check fails, an {@link AccessDeniedException}
162 * will be thrown for the operation.
163 * </p>
164 *
165 * <p>
166 * To perform authorization checks, {@code AccessController} relies on the
167 * RpcServerEngine being loaded to provide
168 * the user identities for remote requests.
169 * </p>
170 *
171 * <p>
172 * The access control lists used for authorization can be manipulated via the
173 * exposed {@link AccessControlService} Interface implementation, and the associated
174 * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
175 * commands.
176 * </p>
177 */
178@CoreCoprocessor
179@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
180public class AccessController implements MasterCoprocessor, RegionCoprocessor,
181    RegionServerCoprocessor, AccessControlService.Interface,
182    MasterObserver, RegionObserver, RegionServerObserver, EndpointObserver, BulkLoadObserver {
183  // TODO: encapsulate observer functions into separate class/sub-class.
184
  /** General-purpose logger for this coprocessor. */
  private static final Logger LOG = LoggerFactory.getLogger(AccessController.class);

  /** Logger whose name is {@code "SecurityLogger."} followed by this class's name. */
  private static final Logger AUDITLOG =
    LoggerFactory.getLogger("SecurityLogger."+AccessController.class.getName());
  // NOTE(review): presumably an operation attribute key; its users are outside this section.
  private static final String CHECK_COVERING_PERM = "check_covering_perm";
  /** Mutation attribute key set by {@code checkForReservedTagPresence} once a mutation passes. */
  private static final String TAG_CHECK_PASSED = "tag_check_passed";
  /** Byte encoding of boolean {@code true}; stored as the value of {@link #TAG_CHECK_PASSED}. */
  private static final byte[] TRUE = Bytes.toBytes(true);

  /** Performs the actual permission checks; obtained from the hosting services in start(). */
  private AccessChecker accessChecker;
  /** Publishes serialized permissions to ZooKeeper; obtained from the hosting services in start(). */
  private ZKPermissionWatcher zkPermissionWatcher;

  /** Flags whether we are running on a region of the _acl_ table. */
  private boolean aclRegion = false;

  /** Defined only for the Endpoint implementation, so that it has a way to
   access region services. Set in start() when loaded in a region environment. */
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them (weak keys). */
  private Map<InternalScanner,String> scannerOwners =
      new MapMaker().weakKeys().makeMap();

  /** Per-table permission lists; created in start() as a map with weak values. */
  private Map<TableName, List<UserPermission>> tableAcls;

  /** Provider for mapping principal names to Users. */
  private UserProvider userProvider;

  /** Whether we are active; usually false, only true if "hbase.security.authorization"
   has been set to true in site configuration. */
  private boolean authorizationEnabled;

  /** Whether we are able to support cell ACLs (HFile format version supports tags). */
  private boolean cellFeaturesEnabled;

  /** Whether we should check EXEC permissions. */
  private boolean shouldCheckExecPermission;

  /** Whether we should terminate access checks early as soon as table or CF grants
    allow access; pre-0.98 compatible behavior. */
  private boolean compatibleEarlyTermination;

  /** Whether we have been successfully initialized. */
  private volatile boolean initialized = false;

  /** Whether the ACL table is available; only relevant in the master. */
  private volatile boolean aclTabAvailable = false;
231
232  public static boolean isCellAuthorizationSupported(Configuration conf) {
233    return AccessChecker.isAuthorizationSupported(conf) &&
234        (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
235  }
236
237  public Region getRegion() {
238    return regionEnv != null ? regionEnv.getRegion() : null;
239  }
240
  /**
   * Returns the {@link AuthManager} held by this controller's {@code AccessChecker}.
   * @return whatever {@code accessChecker.getAuthManager()} yields
   */
  public AuthManager getAuthManager() {
    return accessChecker.getAuthManager();
  }
244
245  private void initialize(RegionCoprocessorEnvironment e) throws IOException {
246    final Region region = e.getRegion();
247    Configuration conf = e.getConfiguration();
248    Map<byte[], ListMultimap<String, UserPermission>> tables = PermissionStorage.loadAll(region);
249    // For each table, write out the table's permissions to the respective
250    // znode for that table.
251    for (Map.Entry<byte[], ListMultimap<String, UserPermission>> t:
252      tables.entrySet()) {
253      byte[] entry = t.getKey();
254      ListMultimap<String, UserPermission> perms = t.getValue();
255      byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
256      zkPermissionWatcher.writeToZookeeper(entry, serialized);
257    }
258    initialized = true;
259  }
260
261  /**
262   * Writes all table ACLs for the tables in the given Map up into ZooKeeper
263   * znodes.  This is called to synchronize ACL changes following {@code _acl_}
264   * table updates.
265   */
266  private void updateACL(RegionCoprocessorEnvironment e,
267      final Map<byte[], List<Cell>> familyMap) {
268    Set<byte[]> entries = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
269    for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
270      List<Cell> cells = f.getValue();
271      for (Cell cell: cells) {
272        if (CellUtil.matchingFamily(cell, PermissionStorage.ACL_LIST_FAMILY)) {
273          entries.add(CellUtil.cloneRow(cell));
274        }
275      }
276    }
277    Configuration conf = regionEnv.getConfiguration();
278    byte [] currentEntry = null;
279    // TODO: Here we are already on the ACL region. (And it is single
280    // region) We can even just get the region from the env and do get
281    // directly. The short circuit connection would avoid the RPC overhead
282    // so no socket communication, req write/read ..  But we have the PB
283    // to and fro conversion overhead. get req is converted to PB req
284    // and results are converted to PB results 1st and then to POJOs
285    // again. We could have avoided such at least in ACL table context..
286    try (Table t = e.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
287      for (byte[] entry : entries) {
288        currentEntry = entry;
289        ListMultimap<String, UserPermission> perms =
290            PermissionStorage.getPermissions(conf, entry, t, null, null, null, false);
291        byte[] serialized = PermissionStorage.writePermissionsAsBytes(perms, conf);
292        zkPermissionWatcher.writeToZookeeper(entry, serialized);
293      }
294    } catch(IOException ex) {
295          LOG.error("Failed updating permissions mirror for '" +
296                  (currentEntry == null? "null": Bytes.toString(currentEntry)) + "'", ex);
297    }
298  }
299
300  /**
301   * Check the current user for authorization to perform a specific action
302   * against the given set of row data.
303   * @param opType the operation type
304   * @param user the user
305   * @param e the coprocessor environment
306   * @param families the map of column families to qualifiers present in
307   * the request
308   * @param actions the desired actions
309   * @return an authorization result
310   */
311  private AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
312      Map<byte [], ? extends Collection<?>> families, Action... actions) {
313    AuthResult result = null;
314    for (Action action: actions) {
315      result = accessChecker.permissionGranted(opType.toString(), user, action,
316        e.getRegion().getRegionInfo().getTable(), families);
317      if (!result.isAllowed()) {
318        return result;
319      }
320    }
321    return result;
322  }
323
324  public void requireAccess(ObserverContext<?> ctx, String request, TableName tableName,
325      Action... permissions) throws IOException {
326    accessChecker.requireAccess(getActiveUser(ctx), request, tableName, permissions);
327  }
328
329  public void requirePermission(ObserverContext<?> ctx, String request,
330      Action perm) throws IOException {
331    accessChecker.requirePermission(getActiveUser(ctx), request, null, perm);
332  }
333
334  public void requireGlobalPermission(ObserverContext<?> ctx, String request,
335      Action perm, TableName tableName,
336      Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
337    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, tableName, familyMap,
338      null);
339  }
340
341  public void requireGlobalPermission(ObserverContext<?> ctx, String request,
342      Action perm, String namespace) throws IOException {
343    accessChecker.requireGlobalPermission(getActiveUser(ctx),
344        request, perm, namespace);
345  }
346
347  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
348      Action... permissions) throws IOException {
349    accessChecker.requireNamespacePermission(getActiveUser(ctx),
350        request, namespace, null, permissions);
351  }
352
353  public void requireNamespacePermission(ObserverContext<?> ctx, String request, String namespace,
354      TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
355      Action... permissions) throws IOException {
356    accessChecker.requireNamespacePermission(getActiveUser(ctx),
357        request, namespace, tableName, familyMap,
358        permissions);
359  }
360
361  public void requirePermission(ObserverContext<?> ctx, String request, TableName tableName,
362      byte[] family, byte[] qualifier, Action... permissions) throws IOException {
363    accessChecker.requirePermission(getActiveUser(ctx), request,
364        tableName, family, qualifier, null, permissions);
365  }
366
367  public void requireTablePermission(ObserverContext<?> ctx, String request,
368      TableName tableName,byte[] family, byte[] qualifier,
369      Action... permissions) throws IOException {
370    accessChecker.requireTablePermission(getActiveUser(ctx),
371        request, tableName, family, qualifier, permissions);
372  }
373
374  public void checkLockPermissions(ObserverContext<?> ctx, String namespace,
375      TableName tableName, RegionInfo[] regionInfos, String reason)
376      throws IOException {
377    accessChecker.checkLockPermissions(getActiveUser(ctx),
378        namespace, tableName, regionInfos, reason);
379  }
380
381  /**
382   * Returns <code>true</code> if the current user is allowed the given action
383   * over at least one of the column qualifiers in the given column families.
384   */
385  private boolean hasFamilyQualifierPermission(User user,
386      Action perm,
387      RegionCoprocessorEnvironment env,
388      Map<byte[], ? extends Collection<byte[]>> familyMap)
389    throws IOException {
390    RegionInfo hri = env.getRegion().getRegionInfo();
391    TableName tableName = hri.getTable();
392
393    if (user == null) {
394      return false;
395    }
396
397    if (familyMap != null && familyMap.size() > 0) {
398      // at least one family must be allowed
399      for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
400          familyMap.entrySet()) {
401        if (family.getValue() != null && !family.getValue().isEmpty()) {
402          for (byte[] qualifier : family.getValue()) {
403            if (getAuthManager().authorizeUserTable(user, tableName,
404                  family.getKey(), qualifier, perm)) {
405              return true;
406            }
407          }
408        } else {
409          if (getAuthManager().authorizeUserFamily(user, tableName, family.getKey(), perm)) {
410            return true;
411          }
412        }
413      }
414    } else if (LOG.isDebugEnabled()) {
415      LOG.debug("Empty family map passed for permission check");
416    }
417
418    return false;
419  }
420
421  private enum OpType {
422    GET("get"),
423    EXISTS("exists"),
424    SCAN("scan"),
425    PUT("put"),
426    DELETE("delete"),
427    CHECK_AND_PUT("checkAndPut"),
428    CHECK_AND_DELETE("checkAndDelete"),
429    APPEND("append"),
430    INCREMENT("increment");
431
432    private String type;
433
434    private OpType(String type) {
435      this.type = type;
436    }
437
438    @Override
439    public String toString() {
440      return type;
441    }
442  }
443
  /**
   * Determine if cell ACLs covered by the operation grant access. This is expensive.
   * <p>
   * Builds a {@link Get} for the row restricted to the families/qualifiers in
   * {@code familyMap}, scans the region with it, and asks the auth manager to authorize every
   * covering cell for every requested action. The first cell that is not authorized ends the
   * check with {@code false}.
   * @param user the user whose cell permissions are checked
   * @param request the operation type; PUT and DELETE additionally take the timestamps of the
   *   individual cells in {@code familyMap} into account
   * @param e the region coprocessor environment
   * @param row the row the operation targets
   * @param familyMap families mapped to either a {@code Set} of qualifiers, a {@code List} of
   *   cells, or {@code null}; any other collection type is rejected with a RuntimeException
   * @param opTs the timestamp of the operation
   * @param actions the actions that every covering cell must authorize
   * @return {@code false} if cell features are disabled, if any covering cell denies one of
   *   the actions, or if no covering cell granted access; {@code true} otherwise
   * @throws IOException if the region scanner cannot be opened, or if an
   *   {@link AccessDeniedException} is raised while scanning; other IOExceptions raised inside
   *   the scan loop are logged and not rethrown
   */
  private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
      byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
      throws IOException {
    // Without tag support there are no cell ACLs to consult.
    if (!cellFeaturesEnabled) {
      return false;
    }
    long cellGrants = 0;
    long latestCellTs = 0;
    Get get = new Get(row);
    // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
    // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
    // version. We have to get every cell version and check its TS against the TS asked for in
    // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
    // consider only one such passing cell. In case of Delete we have to consider all the cell
    // versions under this passing version. When Delete Mutation contains columns which are a
    // version delete just consider only one version for those column cells.
    boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
    if (considerCellTs) {
      get.setMaxVersions();
    } else {
      get.setMaxVersions(1);
    }
    // Becomes true as soon as any cell in the request carries a timestamp other than opTs.
    boolean diffCellTsFromOpTs = false;
    for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
      byte[] col = entry.getKey();
      // TODO: HBASE-7114 could possibly unify the collection type in family
      // maps so we would not need to do this
      if (entry.getValue() instanceof Set) {
        // Set values hold qualifiers; an empty set selects the whole family.
        Set<byte[]> set = (Set<byte[]>)entry.getValue();
        if (set == null || set.isEmpty()) {
          get.addFamily(col);
        } else {
          for (byte[] qual: set) {
            get.addColumn(col, qual);
          }
        }
      } else if (entry.getValue() instanceof List) {
        // List values hold the cells of a mutation; an empty list selects the whole family.
        List<Cell> list = (List<Cell>)entry.getValue();
        if (list == null || list.isEmpty()) {
          get.addFamily(col);
        } else {
          // In case of family delete, a Cell will be added into the list with Qualifier as null.
          for (Cell cell : list) {
            if (cell.getQualifierLength() == 0
                && (cell.getTypeByte() == Type.DeleteFamily.getCode()
                || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
              get.addFamily(col);
            } else {
              get.addColumn(col, CellUtil.cloneQualifier(cell));
            }
            if (considerCellTs) {
              long cellTs = cell.getTimestamp();
              latestCellTs = Math.max(latestCellTs, cellTs);
              diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
            }
          }
        }
      } else if (entry.getValue() == null) {
        get.addFamily(col);
      } else {
        throw new RuntimeException("Unhandled collection type " +
          entry.getValue().getClass().getName());
      }
    }
    // We want to avoid looking into the future. So, if the cells of the
    // operation specify a timestamp, or the operation itself specifies a
    // timestamp, then we use the maximum ts found. Otherwise, we bound
    // the Get to the current server time. We add 1 to the timerange since
    // the upper bound of a timerange is exclusive yet we need to examine
    // any cells found there inclusively.
    long latestTs = Math.max(opTs, latestCellTs);
    if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
      latestTs = EnvironmentEdgeManager.currentTime();
    }
    get.setTimeRange(0, latestTs + 1);
    // For a Put we asked for all versions above, to cover the case where columns are added
    // with a TS other than the Mutation TS. When no cell TS differs from the operation TS
    // there is no need to read all versions; the latest version alone is enough.
    if (!diffCellTsFromOpTs && request == OpType.PUT) {
      get.setMaxVersions(1);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Scanning for cells with " + get);
    }
    // This Map is identical to familyMap. The key is a BR rather than byte[].
    // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
    // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
    // Only List-valued entries are copied; Set-valued and null entries are left out.
    Map<ByteRange, List<Cell>> familyMap1 = new HashMap<>();
    for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
      if (entry.getValue() instanceof List) {
        familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
      }
    }
    RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
    List<Cell> cells = Lists.newArrayList();
    // Per-column scan state: the previously seen cell, whether every version of the current
    // column must be checked, the timestamp bound for the current column, and whether a
    // covering cell has already been found for it.
    Cell prevCell = null;
    ByteRange curFam = new SimpleMutableByteRange();
    boolean curColAllVersions = (request == OpType.DELETE);
    long curColCheckTs = opTs;
    boolean foundColumn = false;
    try {
      boolean more = false;
      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();

      do {
        cells.clear();
        // scan with limit as 1 to hold down memory use on wide rows
        more = scanner.next(cells, scannerContext);
        for (Cell cell: cells) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Found cell " + cell);
          }
          boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
          if (colChange) foundColumn = false;
          prevCell = cell;
          // Unless all versions are required, one covering cell per column is enough.
          if (!curColAllVersions && foundColumn) {
            continue;
          }
          if (colChange && considerCellTs) {
            curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            // NOTE(review): cols looks like it can be null when familyMap holds a null (or
            // non-List) value for this family, which would make the loop below throw a
            // NullPointerException — confirm whether callers can pass such a map here.
            List<Cell> cols = familyMap1.get(curFam);
            for (Cell col : cols) {
              // null/empty qualifier is used to denote a Family delete. The TS and delete type
              // associated with this is applicable for all columns within the family. That is
              // why the below (col.getQualifierLength() == 0) check.
              if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
                  || CellUtil.matchingQualifier(cell, col)) {
                byte type = col.getTypeByte();
                if (considerCellTs) {
                  curColCheckTs = col.getTimestamp();
                }
                // For a Delete op we pass allVersions as true. When a Delete Mutation contains
                // a version delete for a column no need to check all the covering cells within
                // that column. Check all versions when Type is DeleteColumn or DeleteFamily
                // One version delete types are Delete/DeleteFamilyVersion
                curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
                    || (KeyValue.Type.DeleteFamily.getCode() == type);
                break;
              }
            }
          }
          if (cell.getTimestamp() > curColCheckTs) {
            // Just ignore this cell. This is not a covering cell.
            continue;
          }
          foundColumn = true;
          for (Action action: actions) {
            // Are there permissions for this user for the cell?
            if (!getAuthManager().authorizeCell(user, getTableName(e), cell, action)) {
              // We can stop if the cell ACL denies access
              return false;
            }
          }
          cellGrants++;
        }
      } while (more);
    } catch (AccessDeniedException ex) {
      throw ex;
    } catch (IOException ex) {
      // NOTE(review): the scan is abandoned here but the method still returns based on the
      // grants counted so far, so a failure part-way through can yield true without every
      // covering cell having been checked — confirm this is the intended behavior.
      LOG.error("Exception while getting cells to calculate covering permission", ex);
    } finally {
      scanner.close();
    }
    // We should not authorize unless we have found one or more cell ACLs that
    // grant access. This code is used to check for additional permissions
    // after no table or CF grants are found.
    return cellGrants > 0;
  }
617
618  private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
619    // Iterate over the entries in the familyMap, replacing the cells therein
620    // with new cells including the ACL data
621    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
622      List<Cell> newCells = Lists.newArrayList();
623      for (Cell cell: e.getValue()) {
624        // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
625        List<Tag> tags = new ArrayList<>();
626        tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, perms));
627        Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(cell);
628        while (tagIterator.hasNext()) {
629          tags.add(tagIterator.next());
630        }
631        newCells.add(PrivateCellUtil.createCell(cell, tags));
632      }
633      // This is supposed to be safe, won't CME
634      e.setValue(newCells);
635    }
636  }
637
638  // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
639  // type is reserved and should not be explicitly set by user.
640  private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
641    // No need to check if we're not going to throw
642    if (!authorizationEnabled) {
643      m.setAttribute(TAG_CHECK_PASSED, TRUE);
644      return;
645    }
646    // Superusers are allowed to store cells unconditionally.
647    if (Superusers.isSuperUser(user)) {
648      m.setAttribute(TAG_CHECK_PASSED, TRUE);
649      return;
650    }
651    // We already checked (prePut vs preBatchMutation)
652    if (m.getAttribute(TAG_CHECK_PASSED) != null) {
653      return;
654    }
655    for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
656      Iterator<Tag> tagsItr = PrivateCellUtil.tagsIterator(cellScanner.current());
657      while (tagsItr.hasNext()) {
658        if (tagsItr.next().getType() == PermissionStorage.ACL_TAG_TYPE) {
659          throw new AccessDeniedException("Mutation contains cell with reserved type tag");
660        }
661      }
662    }
663    m.setAttribute(TAG_CHECK_PASSED, TRUE);
664  }
665
666  /* ---- MasterObserver implementation ---- */
667  @Override
668  public void start(CoprocessorEnvironment env) throws IOException {
669    CompoundConfiguration conf = new CompoundConfiguration();
670    conf.add(env.getConfiguration());
671
672    authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
673    if (!authorizationEnabled) {
674      LOG.warn("AccessController has been loaded with authorization checks DISABLED!");
675    }
676
677    shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
678      AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
679
680    cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
681    if (!cellFeaturesEnabled) {
682      LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
683          + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
684          + " accordingly.");
685    }
686
687    if (env instanceof MasterCoprocessorEnvironment) {
688      // if running on HMaster
689      MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
690      if (mEnv instanceof HasMasterServices) {
691        MasterServices masterServices = ((HasMasterServices) mEnv).getMasterServices();
692        zkPermissionWatcher = masterServices.getZKPermissionWatcher();
693        accessChecker = masterServices.getAccessChecker();
694      }
695    } else if (env instanceof RegionServerCoprocessorEnvironment) {
696      RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
697      if (rsEnv instanceof HasRegionServerServices) {
698        RegionServerServices rsServices =
699            ((HasRegionServerServices) rsEnv).getRegionServerServices();
700        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
701        accessChecker = rsServices.getAccessChecker();
702      }
703    } else if (env instanceof RegionCoprocessorEnvironment) {
704      // if running at region
705      regionEnv = (RegionCoprocessorEnvironment) env;
706      conf.addBytesMap(regionEnv.getRegion().getTableDescriptor().getValues());
707      compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
708        AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
709      if (regionEnv instanceof HasRegionServerServices) {
710        RegionServerServices rsServices =
711            ((HasRegionServerServices) regionEnv).getRegionServerServices();
712        zkPermissionWatcher = rsServices.getZKPermissionWatcher();
713        accessChecker = rsServices.getAccessChecker();
714      }
715    }
716
717    if (zkPermissionWatcher == null) {
718      throw new NullPointerException("ZKPermissionWatcher is null");
719    } else if (accessChecker == null) {
720      throw new NullPointerException("AccessChecker is null");
721    }
722    // set the user-provider.
723    this.userProvider = UserProvider.instantiate(env.getConfiguration());
724    tableAcls = new MapMaker().weakValues().makeMap();
725  }
726
727  @Override
728  public void stop(CoprocessorEnvironment env) {
729  }
730
731  /*********************************** Observer/Service Getters ***********************************/
732  @Override
733  public Optional<RegionObserver> getRegionObserver() {
734    return Optional.of(this);
735  }
736
737  @Override
738  public Optional<MasterObserver> getMasterObserver() {
739    return Optional.of(this);
740  }
741
742  @Override
743  public Optional<EndpointObserver> getEndpointObserver() {
744    return Optional.of(this);
745  }
746
747  @Override
748  public Optional<BulkLoadObserver> getBulkLoadObserver() {
749    return Optional.of(this);
750  }
751
752  @Override
753  public Optional<RegionServerObserver> getRegionServerObserver() {
754    return Optional.of(this);
755  }
756
757  @Override
758  public Iterable<Service> getServices() {
759    return Collections.singleton(
760        AccessControlProtos.AccessControlService.newReflectiveService(this));
761  }
762
763  /*********************************** Observer implementations ***********************************/
764
765  @Override
766  public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
767      TableDescriptor desc, RegionInfo[] regions) throws IOException {
768    Set<byte[]> families = desc.getColumnFamilyNames();
769    Map<byte[], Set<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
770    for (byte[] family: families) {
771      familyMap.put(family, null);
772    }
773    requireNamespacePermission(c, "createTable",
774        desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.ADMIN,
775        Action.CREATE);
776  }
777
778  @Override
779  public void postCompletedCreateTableAction(
780      final ObserverContext<MasterCoprocessorEnvironment> c,
781      final TableDescriptor desc,
782      final RegionInfo[] regions) throws IOException {
783    // When AC is used, it should be configured as the 1st CP.
784    // In Master, the table operations like create, are handled by a Thread pool but the max size
785    // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
786    // sequentially only.
787    // Related code in HMaster#startServiceThreads
788    // {code}
789    //   // We depend on there being only one instance of this executor running
790    //   // at a time. To do concurrency, would need fencing of enable/disable of
791    //   // tables.
792    //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
793    // {code}
794    // In future if we change this pool to have more threads, then there is a chance for thread,
795    // creating acl table, getting delayed and by that time another table creation got over and
796    // this hook is getting called. In such a case, we will need a wait logic here which will
797    // wait till the acl table is created.
798    if (PermissionStorage.isAclTable(desc)) {
799      this.aclTabAvailable = true;
800    } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
801      if (!aclTabAvailable) {
802        LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
803            + PermissionStorage.ACL_TABLE_NAME + " is not yet created. "
804            + getClass().getSimpleName() + " should be configured as the first Coprocessor");
805      } else {
806        String owner = desc.getOwnerString();
807        // default the table owner to current user, if not specified.
808        if (owner == null)
809          owner = getActiveUser(c).getShortName();
810        final UserPermission userPermission = new UserPermission(owner,
811            Permission.newBuilder(desc.getTableName()).withActions(Action.values()).build());
812        // switch to the real hbase master user for doing the RPC on the ACL table
813        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
814          @Override
815          public Void run() throws Exception {
816            try (Table table =
817                c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
818              PermissionStorage.addUserPermission(c.getEnvironment().getConfiguration(),
819                userPermission, table);
820            }
821            return null;
822          }
823        });
824      }
825    }
826  }
827
828  @Override
829  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
830      throws IOException {
831    requirePermission(c, "deleteTable",
832        tableName, null, null, Action.ADMIN, Action.CREATE);
833  }
834
835  @Override
836  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
837      final TableName tableName) throws IOException {
838    final Configuration conf = c.getEnvironment().getConfiguration();
839    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
840      @Override
841      public Void run() throws Exception {
842        try (Table table =
843            c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
844          PermissionStorage.removeTablePermissions(conf, tableName, table);
845        }
846        return null;
847      }
848    });
849    zkPermissionWatcher.deleteTableACLNode(tableName);
850  }
851
852  @Override
853  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
854      final TableName tableName) throws IOException {
855    requirePermission(c, "truncateTable",
856        tableName, null, null, Action.ADMIN, Action.CREATE);
857
858    final Configuration conf = c.getEnvironment().getConfiguration();
859    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
860      @Override
861      public Void run() throws Exception {
862        List<UserPermission> acls =
863            PermissionStorage.getUserTablePermissions(conf, tableName, null, null, null, false);
864        if (acls != null) {
865          tableAcls.put(tableName, acls);
866        }
867        return null;
868      }
869    });
870  }
871
872  @Override
873  public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
874      final TableName tableName) throws IOException {
875    final Configuration conf = ctx.getEnvironment().getConfiguration();
876    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
877      @Override
878      public Void run() throws Exception {
879        List<UserPermission> perms = tableAcls.get(tableName);
880        if (perms != null) {
881          for (UserPermission perm : perms) {
882            try (Table table =
883                ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
884              PermissionStorage.addUserPermission(conf, perm, table);
885            }
886          }
887        }
888        tableAcls.remove(tableName);
889        return null;
890      }
891    });
892  }
893
894  @Override
895  public TableDescriptor preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
896      TableName tableName, TableDescriptor currentDesc, TableDescriptor newDesc)
897      throws IOException {
898    // TODO: potentially check if this is a add/modify/delete column operation
899    requirePermission(c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
900    return newDesc;
901  }
902
903  @Override
904  public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
905      TableName tableName, final TableDescriptor htd) throws IOException {
906    final Configuration conf = c.getEnvironment().getConfiguration();
907    // default the table owner to current user, if not specified.
908    final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
909      getActiveUser(c).getShortName();
910    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
911      @Override
912      public Void run() throws Exception {
913        UserPermission userperm = new UserPermission(owner,
914            Permission.newBuilder(htd.getTableName()).withActions(Action.values()).build());
915        try (Table table =
916            c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
917          PermissionStorage.addUserPermission(conf, userperm, table);
918        }
919        return null;
920      }
921    });
922  }
923
924  @Override
925  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
926      throws IOException {
927    requirePermission(c, "enableTable",
928        tableName, null, null, Action.ADMIN, Action.CREATE);
929  }
930
931  @Override
932  public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
933      throws IOException {
934    if (Bytes.equals(tableName.getName(), PermissionStorage.ACL_GLOBAL_NAME)) {
935      // We have to unconditionally disallow disable of the ACL table when we are installed,
936      // even if not enforcing authorizations. We are still allowing grants and revocations,
937      // checking permissions and logging audit messages, etc. If the ACL table is not
938      // available we will fail random actions all over the place.
939      throw new AccessDeniedException("Not allowed to disable " + PermissionStorage.ACL_TABLE_NAME
940          + " table with AccessController installed");
941    }
942    requirePermission(c, "disableTable",
943        tableName, null, null, Action.ADMIN, Action.CREATE);
944  }
945
946  @Override
947  public void preAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx,
948      final long procId) throws IOException {
949    requirePermission(ctx, "abortProcedure", Action.ADMIN);
950  }
951
952  @Override
953  public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
954      throws IOException {
955    // There is nothing to do at this time after the procedure abort request was sent.
956  }
957
958  @Override
959  public void preGetProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
960      throws IOException {
961    requirePermission(ctx, "getProcedure", Action.ADMIN);
962  }
963
964  @Override
965  public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx)
966      throws IOException {
967    User user = getActiveUser(ctx);
968    accessChecker.requirePermission(user, "getLocks", null, Action.ADMIN);
969  }
970
971  @Override
972  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo region,
973      ServerName srcServer, ServerName destServer) throws IOException {
974    requirePermission(c, "move",
975        region.getTable(), null, null, Action.ADMIN);
976  }
977
978  @Override
979  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
980      throws IOException {
981    requirePermission(c, "assign",
982        regionInfo.getTable(), null, null, Action.ADMIN);
983  }
984
985  @Override
986  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfo)
987      throws IOException {
988    requirePermission(c, "unassign",
989        regionInfo.getTable(), null, null, Action.ADMIN);
990  }
991
992  @Override
993  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
994      RegionInfo regionInfo) throws IOException {
995    requirePermission(c, "regionOffline",
996        regionInfo.getTable(), null, null, Action.ADMIN);
997  }
998
999  @Override
1000  public void preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1001      final boolean newValue, final MasterSwitchType switchType) throws IOException {
1002    requirePermission(ctx, "setSplitOrMergeEnabled",
1003        Action.ADMIN);
1004  }
1005
1006  @Override
1007  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1008      throws IOException {
1009    requirePermission(c, "balance", Action.ADMIN);
1010  }
1011
1012  @Override
1013  public void preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1014      boolean newValue) throws IOException {
1015    requirePermission(c, "balanceSwitch", Action.ADMIN);
1016  }
1017
1018  @Override
1019  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1020      throws IOException {
1021    requirePermission(c, "shutdown", Action.ADMIN);
1022  }
1023
1024  @Override
1025  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1026      throws IOException {
1027    requirePermission(c, "stopMaster", Action.ADMIN);
1028  }
1029
1030  @Override
1031  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1032      throws IOException {
1033    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
1034      if (!admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) {
1035        createACLTable(admin);
1036      } else {
1037        this.aclTabAvailable = true;
1038      }
1039    }
1040  }
1041  /**
1042   * Create the ACL table
1043   * @throws IOException
1044   */
1045  private static void createACLTable(Admin admin) throws IOException {
1046    /** Table descriptor for ACL table */
1047    ColumnFamilyDescriptor cfd =
1048        ColumnFamilyDescriptorBuilder.newBuilder(PermissionStorage.ACL_LIST_FAMILY).
1049        setMaxVersions(1).
1050        setInMemory(true).
1051        setBlockCacheEnabled(true).
1052        setBlocksize(8 * 1024).
1053        setBloomFilterType(BloomType.NONE).
1054        setScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
1055    TableDescriptor td =
1056        TableDescriptorBuilder.newBuilder(PermissionStorage.ACL_TABLE_NAME).
1057          setColumnFamily(cfd).build();
1058    admin.createTable(td);
1059  }
1060
1061  @Override
1062  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1063      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1064      throws IOException {
1065    // Move this ACL check to SnapshotManager#checkPermissions as part of AC deprecation.
1066    requirePermission(ctx, "snapshot " + snapshot.getName(),
1067        hTableDescriptor.getTableName(), null, null, Permission.Action.ADMIN);
1068  }
1069
1070  @Override
1071  public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1072      final SnapshotDescription snapshot) throws IOException {
1073    User user = getActiveUser(ctx);
1074    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1075      // list it, if user is the owner of snapshot
1076      AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1077          "Snapshot owner check allowed", user, null, null, null);
1078      AccessChecker.logResult(result);
1079    } else {
1080      accessChecker.requirePermission(user, "listSnapshot " + snapshot.getName(), null,
1081        Action.ADMIN);
1082    }
1083  }
1084
1085  @Override
1086  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1087      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1088      throws IOException {
1089    User user = getActiveUser(ctx);
1090    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
1091        && hTableDescriptor.getTableName().getNameAsString().equals(snapshot.getTable())) {
1092      // Snapshot owner is allowed to create a table with the same name as the snapshot he took
1093      AuthResult result = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
1094        "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
1095      AccessChecker.logResult(result);
1096    } else {
1097      accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), null,
1098        Action.ADMIN);
1099    }
1100  }
1101
1102  @Override
1103  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1104      final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
1105      throws IOException {
1106    User user = getActiveUser(ctx);
1107    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1108      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(),
1109        hTableDescriptor.getTableName(), null, null, null, Permission.Action.ADMIN);
1110    } else {
1111      accessChecker.requirePermission(user, "restoreSnapshot " + snapshot.getName(), null,
1112        Action.ADMIN);
1113    }
1114  }
1115
1116  @Override
1117  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1118      final SnapshotDescription snapshot) throws IOException {
1119    User user = getActiveUser(ctx);
1120    if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1121      // Snapshot owner is allowed to delete the snapshot
1122      AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1123          "Snapshot owner check allowed", user, null, null, null);
1124      AccessChecker.logResult(result);
1125    } else {
1126      accessChecker.requirePermission(user, "deleteSnapshot " + snapshot.getName(), null,
1127        Action.ADMIN);
1128    }
1129  }
1130
1131  @Override
1132  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1133      NamespaceDescriptor ns) throws IOException {
1134    requireGlobalPermission(ctx, "createNamespace",
1135        Action.ADMIN, ns.getName());
1136  }
1137
1138  @Override
1139  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1140      throws IOException {
1141    requireGlobalPermission(ctx, "deleteNamespace",
1142        Action.ADMIN, namespace);
1143  }
1144
1145  @Override
1146  public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1147      final String namespace) throws IOException {
1148    final Configuration conf = ctx.getEnvironment().getConfiguration();
1149    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1150      @Override
1151      public Void run() throws Exception {
1152        try (Table table =
1153            ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
1154          PermissionStorage.removeNamespacePermissions(conf, namespace, table);
1155        }
1156        return null;
1157      }
1158    });
1159    zkPermissionWatcher.deleteNamespaceACLNode(namespace);
1160    LOG.info(namespace + " entry deleted in " + PermissionStorage.ACL_TABLE_NAME + " table.");
1161  }
1162
1163  @Override
1164  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1165      NamespaceDescriptor ns) throws IOException {
1166    // We require only global permission so that
1167    // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1168    requireGlobalPermission(ctx, "modifyNamespace",
1169        Action.ADMIN, ns.getName());
1170  }
1171
1172  @Override
1173  public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1174      throws IOException {
1175    requireNamespacePermission(ctx, "getNamespaceDescriptor",
1176        namespace, Action.ADMIN);
1177  }
1178
1179  @Override
1180  public void postListNamespaces(ObserverContext<MasterCoprocessorEnvironment> ctx,
1181      List<String> namespaces) throws IOException {
1182    /* always allow namespace listing */
1183  }
1184
1185  @Override
1186  public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1187      List<NamespaceDescriptor> descriptors) throws IOException {
1188    // Retains only those which passes authorization checks, as the checks weren't done as part
1189    // of preGetTableDescriptors.
1190    Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1191    User user = getActiveUser(ctx);
1192    while (itr.hasNext()) {
1193      NamespaceDescriptor desc = itr.next();
1194      try {
1195        accessChecker.requireNamespacePermission(user, "listNamespaces", desc.getName(), null,
1196          Action.ADMIN);
1197      } catch (AccessDeniedException e) {
1198        itr.remove();
1199      }
1200    }
1201  }
1202
1203  @Override
1204  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1205      final TableName tableName) throws IOException {
1206    // Move this ACL check to MasterFlushTableProcedureManager#checkPermissions as part of AC
1207    // deprecation.
1208    requirePermission(ctx, "flushTable", tableName,
1209        null, null, Action.ADMIN, Action.CREATE);
1210  }
1211
1212  @Override
1213  public void preSplitRegion(
1214      final ObserverContext<MasterCoprocessorEnvironment> ctx,
1215      final TableName tableName,
1216      final byte[] splitRow) throws IOException {
1217    requirePermission(ctx, "split", tableName,
1218        null, null, Action.ADMIN);
1219  }
1220
1221  @Override
1222  public void preClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1223      throws IOException {
1224    requirePermission(ctx, "clearDeadServers", Action.ADMIN);
1225  }
1226
1227  @Override
1228  public void preDecommissionRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
1229      List<ServerName> servers, boolean offload) throws IOException {
1230    requirePermission(ctx, "decommissionRegionServers", Action.ADMIN);
1231  }
1232
1233  @Override
1234  public void preListDecommissionedRegionServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1235      throws IOException {
1236    requirePermission(ctx, "listDecommissionedRegionServers",
1237        Action.ADMIN);
1238  }
1239
1240  @Override
1241  public void preRecommissionRegionServer(ObserverContext<MasterCoprocessorEnvironment> ctx,
1242      ServerName server, List<byte[]> encodedRegionNames) throws IOException {
1243    requirePermission(ctx, "recommissionRegionServers", Action.ADMIN);
1244  }
1245
1246  /* ---- RegionObserver implementation ---- */
1247
1248  @Override
1249  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
1250      throws IOException {
1251    RegionCoprocessorEnvironment env = c.getEnvironment();
1252    final Region region = env.getRegion();
1253    if (region == null) {
1254      LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1255    } else {
1256      RegionInfo regionInfo = region.getRegionInfo();
1257      if (regionInfo.getTable().isSystemTable()) {
1258        checkSystemOrSuperUser(getActiveUser(c));
1259      } else {
1260        requirePermission(c, "preOpen", Action.ADMIN);
1261      }
1262    }
1263  }
1264
1265  @Override
1266  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1267    RegionCoprocessorEnvironment env = c.getEnvironment();
1268    final Region region = env.getRegion();
1269    if (region == null) {
1270      LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1271      return;
1272    }
1273    if (PermissionStorage.isAclRegion(region)) {
1274      aclRegion = true;
1275      try {
1276        initialize(env);
1277      } catch (IOException ex) {
1278        // if we can't obtain permissions, it's better to fail
1279        // than perform checks incorrectly
1280        throw new RuntimeException("Failed to initialize permissions cache", ex);
1281      }
1282    } else {
1283      initialized = true;
1284    }
1285  }
1286
1287  @Override
1288  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
1289      FlushLifeCycleTracker tracker) throws IOException {
1290    requirePermission(c, "flush", getTableName(c.getEnvironment()),
1291        null, null, Action.ADMIN, Action.CREATE);
1292  }
1293
1294  @Override
1295  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
1296      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
1297      CompactionRequest request) throws IOException {
1298    requirePermission(c, "compact", getTableName(c.getEnvironment()),
1299        null, null, Action.ADMIN, Action.CREATE);
1300    return scanner;
1301  }
1302
1303  private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1304      final Query query, OpType opType) throws IOException {
1305    Filter filter = query.getFilter();
1306    // Don't wrap an AccessControlFilter
1307    if (filter != null && filter instanceof AccessControlFilter) {
1308      return;
1309    }
1310    User user = getActiveUser(c);
1311    RegionCoprocessorEnvironment env = c.getEnvironment();
1312    Map<byte[],? extends Collection<byte[]>> families = null;
1313    switch (opType) {
1314    case GET:
1315    case EXISTS:
1316      families = ((Get)query).getFamilyMap();
1317      break;
1318    case SCAN:
1319      families = ((Scan)query).getFamilyMap();
1320      break;
1321    default:
1322      throw new RuntimeException("Unhandled operation " + opType);
1323    }
1324    AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1325    Region region = getRegion(env);
1326    TableName table = getTableName(region);
1327    Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1328    for (ColumnFamilyDescriptor hcd : region.getTableDescriptor().getColumnFamilies()) {
1329      cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1330    }
1331    if (!authResult.isAllowed()) {
1332      if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1333        // Old behavior: Scan with only qualifier checks if we have partial
1334        // permission. Backwards compatible behavior is to throw an
1335        // AccessDeniedException immediately if there are no grants for table
1336        // or CF or CF+qual. Only proceed with an injected filter if there are
1337        // grants for qualifiers. Otherwise we will fall through below and log
1338        // the result and throw an ADE. We may end up checking qualifier
1339        // grants three times (permissionGranted above, here, and in the
1340        // filter) but that's the price of backwards compatibility.
1341        if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1342          authResult.setAllowed(true);
1343          authResult.setReason("Access allowed with filter");
1344          // Only wrap the filter if we are enforcing authorizations
1345          if (authorizationEnabled) {
1346            Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1347              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1348              cfVsMaxVersions);
1349            // wrap any existing filter
1350            if (filter != null) {
1351              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1352                Lists.newArrayList(ourFilter, filter));
1353            }
1354            switch (opType) {
1355              case GET:
1356              case EXISTS:
1357                ((Get)query).setFilter(ourFilter);
1358                break;
1359              case SCAN:
1360                ((Scan)query).setFilter(ourFilter);
1361                break;
1362              default:
1363                throw new RuntimeException("Unhandled operation " + opType);
1364            }
1365          }
1366        }
1367      } else {
1368        // New behavior: Any access we might be granted is more fine-grained
1369        // than whole table or CF. Simply inject a filter and return what is
1370        // allowed. We will not throw an AccessDeniedException. This is a
1371        // behavioral change since 0.96.
1372        authResult.setAllowed(true);
1373        authResult.setReason("Access allowed with filter");
1374        // Only wrap the filter if we are enforcing authorizations
1375        if (authorizationEnabled) {
1376          Filter ourFilter = new AccessControlFilter(getAuthManager(), user, table,
1377            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1378          // wrap any existing filter
1379          if (filter != null) {
1380            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1381              Lists.newArrayList(ourFilter, filter));
1382          }
1383          switch (opType) {
1384            case GET:
1385            case EXISTS:
1386              ((Get)query).setFilter(ourFilter);
1387              break;
1388            case SCAN:
1389              ((Scan)query).setFilter(ourFilter);
1390              break;
1391            default:
1392              throw new RuntimeException("Unhandled operation " + opType);
1393          }
1394        }
1395      }
1396    }
1397
1398    AccessChecker.logResult(authResult);
1399    if (authorizationEnabled && !authResult.isAllowed()) {
1400      throw new AccessDeniedException("Insufficient permissions for user '"
1401          + (user != null ? user.getShortName() : "null")
1402          + "' (table=" + table + ", action=READ)");
1403    }
1404  }
1405
1406  @Override
1407  public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1408      final Get get, final List<Cell> result) throws IOException {
1409    internalPreRead(c, get, OpType.GET);
1410  }
1411
1412  @Override
1413  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1414      final Get get, final boolean exists) throws IOException {
1415    internalPreRead(c, get, OpType.EXISTS);
1416    return exists;
1417  }
1418
1419  @Override
1420  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1421      final Put put, final WALEdit edit, final Durability durability)
1422      throws IOException {
1423    User user = getActiveUser(c);
1424    checkForReservedTagPresence(user, put);
1425
1426    // Require WRITE permission to the table, CF, or top visible value, if any.
1427    // NOTE: We don't need to check the permissions for any earlier Puts
1428    // because we treat the ACLs in each Put as timestamped like any other
1429    // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1430    // change the ACL of any previous Put. This allows simple evolution of
1431    // security policy over time without requiring expensive updates.
1432    RegionCoprocessorEnvironment env = c.getEnvironment();
1433    Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1434    AuthResult authResult = permissionGranted(OpType.PUT,
1435        user, env, families, Action.WRITE);
1436    AccessChecker.logResult(authResult);
1437    if (!authResult.isAllowed()) {
1438      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1439        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1440      } else if (authorizationEnabled) {
1441        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1442      }
1443    }
1444
1445    // Add cell ACLs from the operation to the cells themselves
1446    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1447    if (bytes != null) {
1448      if (cellFeaturesEnabled) {
1449        addCellPermissions(bytes, put.getFamilyCellMap());
1450      } else {
1451        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1452      }
1453    }
1454  }
1455
1456  @Override
1457  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1458      final Put put, final WALEdit edit, final Durability durability) {
1459    if (aclRegion) {
1460      updateACL(c.getEnvironment(), put.getFamilyCellMap());
1461    }
1462  }
1463
1464  @Override
1465  public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1466      final Delete delete, final WALEdit edit, final Durability durability)
1467      throws IOException {
1468    // An ACL on a delete is useless, we shouldn't allow it
1469    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1470      throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1471    }
1472    // Require WRITE permissions on all cells covered by the delete. Unlike
1473    // for Puts we need to check all visible prior versions, because a major
1474    // compaction could remove them. If the user doesn't have permission to
1475    // overwrite any of the visible versions ('visible' defined as not covered
1476    // by a tombstone already) then we have to disallow this operation.
1477    RegionCoprocessorEnvironment env = c.getEnvironment();
1478    Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1479    User user = getActiveUser(c);
1480    AuthResult authResult = permissionGranted(OpType.DELETE,
1481        user, env, families, Action.WRITE);
1482    AccessChecker.logResult(authResult);
1483    if (!authResult.isAllowed()) {
1484      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1485        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1486      } else if (authorizationEnabled) {
1487        throw new AccessDeniedException("Insufficient permissions " +
1488          authResult.toContextString());
1489      }
1490    }
1491  }
1492
1493  @Override
1494  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1495      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1496    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1497      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1498      User user = getActiveUser(c);
1499      for (int i = 0; i < miniBatchOp.size(); i++) {
1500        Mutation m = miniBatchOp.getOperation(i);
1501        if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1502          // We have a failure with table, cf and q perm checks and now giving a chance for cell
1503          // perm check
1504          OpType opType;
1505          long timestamp;
1506          if (m instanceof Put) {
1507            checkForReservedTagPresence(user, m);
1508            opType = OpType.PUT;
1509            timestamp = m.getTimestamp();
1510          } else if (m instanceof Delete) {
1511            opType = OpType.DELETE;
1512            timestamp = m.getTimestamp();
1513          } else if (m instanceof Increment) {
1514            opType = OpType.INCREMENT;
1515            timestamp = ((Increment) m).getTimeRange().getMax();
1516          } else if (m instanceof Append) {
1517            opType = OpType.APPEND;
1518            timestamp = ((Append) m).getTimeRange().getMax();
1519          } else {
1520            // If the operation type is not Put/Delete/Increment/Append, do nothing
1521            continue;
1522          }
1523          AuthResult authResult = null;
1524          if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1525            m.getFamilyCellMap(), timestamp, Action.WRITE)) {
1526            authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1527              user, Action.WRITE, table, m.getFamilyCellMap());
1528          } else {
1529            authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1530              user, Action.WRITE, table, m.getFamilyCellMap());
1531          }
1532          AccessChecker.logResult(authResult);
1533          if (authorizationEnabled && !authResult.isAllowed()) {
1534            throw new AccessDeniedException("Insufficient permissions "
1535              + authResult.toContextString());
1536          }
1537        }
1538      }
1539    }
1540  }
1541
1542  @Override
1543  public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1544      final Delete delete, final WALEdit edit, final Durability durability)
1545      throws IOException {
1546    if (aclRegion) {
1547      updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1548    }
1549  }
1550
1551  @Override
1552  public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1553      final byte [] row, final byte [] family, final byte [] qualifier,
1554      final CompareOperator op,
1555      final ByteArrayComparable comparator, final Put put,
1556      final boolean result) throws IOException {
1557    User user = getActiveUser(c);
1558    checkForReservedTagPresence(user, put);
1559
1560    // Require READ and WRITE permissions on the table, CF, and KV to update
1561    RegionCoprocessorEnvironment env = c.getEnvironment();
1562    Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1563    AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT,
1564        user, env, families, Action.READ, Action.WRITE);
1565    AccessChecker.logResult(authResult);
1566    if (!authResult.isAllowed()) {
1567      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1568        put.setAttribute(CHECK_COVERING_PERM, TRUE);
1569      } else if (authorizationEnabled) {
1570        throw new AccessDeniedException("Insufficient permissions " +
1571          authResult.toContextString());
1572      }
1573    }
1574
1575    byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1576    if (bytes != null) {
1577      if (cellFeaturesEnabled) {
1578        addCellPermissions(bytes, put.getFamilyCellMap());
1579      } else {
1580        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1581      }
1582    }
1583    return result;
1584  }
1585
1586  @Override
1587  public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1588      final byte[] row, final byte[] family, final byte[] qualifier,
1589      final CompareOperator opp, final ByteArrayComparable comparator, final Put put,
1590      final boolean result) throws IOException {
1591    if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1592      // We had failure with table, cf and q perm checks and now giving a chance for cell
1593      // perm check
1594      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1595      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1596      AuthResult authResult = null;
1597      User user = getActiveUser(c);
1598      if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1599          HConstants.LATEST_TIMESTAMP, Action.READ)) {
1600        authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(),
1601            "Covering cell set", user, Action.READ, table, families);
1602      } else {
1603        authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(),
1604            "Covering cell set", user, Action.READ, table, families);
1605      }
1606      AccessChecker.logResult(authResult);
1607      if (authorizationEnabled && !authResult.isAllowed()) {
1608        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1609      }
1610    }
1611    return result;
1612  }
1613
1614  @Override
1615  public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1616      final byte [] row, final byte [] family, final byte [] qualifier,
1617      final CompareOperator op,
1618      final ByteArrayComparable comparator, final Delete delete,
1619      final boolean result) throws IOException {
1620    // An ACL on a delete is useless, we shouldn't allow it
1621    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1622      throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1623          delete.toString());
1624    }
1625    // Require READ and WRITE permissions on the table, CF, and the KV covered
1626    // by the delete
1627    RegionCoprocessorEnvironment env = c.getEnvironment();
1628    Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1629    User user = getActiveUser(c);
1630    AuthResult authResult = permissionGranted(
1631        OpType.CHECK_AND_DELETE, user, env, families, Action.READ, Action.WRITE);
1632    AccessChecker.logResult(authResult);
1633    if (!authResult.isAllowed()) {
1634      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1635        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1636      } else if (authorizationEnabled) {
1637        throw new AccessDeniedException("Insufficient permissions " +
1638          authResult.toContextString());
1639      }
1640    }
1641    return result;
1642  }
1643
1644  @Override
1645  public boolean preCheckAndDeleteAfterRowLock(
1646      final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row,
1647      final byte[] family, final byte[] qualifier, final CompareOperator op,
1648      final ByteArrayComparable comparator, final Delete delete, final boolean result)
1649      throws IOException {
1650    if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1651      // We had failure with table, cf and q perm checks and now giving a chance for cell
1652      // perm check
1653      TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1654      Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1655      AuthResult authResult = null;
1656      User user = getActiveUser(c);
1657      if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(),
1658          row, families, HConstants.LATEST_TIMESTAMP, Action.READ)) {
1659        authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(),
1660            "Covering cell set", user, Action.READ, table, families);
1661      } else {
1662        authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(),
1663            "Covering cell set", user, Action.READ, table, families);
1664      }
1665      AccessChecker.logResult(authResult);
1666      if (authorizationEnabled && !authResult.isAllowed()) {
1667        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1668      }
1669    }
1670    return result;
1671  }
1672
1673  @Override
1674  public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1675      throws IOException {
1676    User user = getActiveUser(c);
1677    checkForReservedTagPresence(user, append);
1678
1679    // Require WRITE permission to the table, CF, and the KV to be appended
1680    RegionCoprocessorEnvironment env = c.getEnvironment();
1681    Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1682    AuthResult authResult = permissionGranted(OpType.APPEND, user,
1683        env, families, Action.WRITE);
1684    AccessChecker.logResult(authResult);
1685    if (!authResult.isAllowed()) {
1686      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1687        append.setAttribute(CHECK_COVERING_PERM, TRUE);
1688      } else if (authorizationEnabled)  {
1689        throw new AccessDeniedException("Insufficient permissions " +
1690          authResult.toContextString());
1691      }
1692    }
1693
1694    byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1695    if (bytes != null) {
1696      if (cellFeaturesEnabled) {
1697        addCellPermissions(bytes, append.getFamilyCellMap());
1698      } else {
1699        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1700      }
1701    }
1702
1703    return null;
1704  }
1705
1706  @Override
1707  public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1708      final Increment increment)
1709      throws IOException {
1710    User user = getActiveUser(c);
1711    checkForReservedTagPresence(user, increment);
1712
1713    // Require WRITE permission to the table, CF, and the KV to be replaced by
1714    // the incremented value
1715    RegionCoprocessorEnvironment env = c.getEnvironment();
1716    Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1717    AuthResult authResult = permissionGranted(OpType.INCREMENT,
1718        user, env, families, Action.WRITE);
1719    AccessChecker.logResult(authResult);
1720    if (!authResult.isAllowed()) {
1721      if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1722        increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1723      } else if (authorizationEnabled) {
1724        throw new AccessDeniedException("Insufficient permissions " +
1725          authResult.toContextString());
1726      }
1727    }
1728
1729    byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1730    if (bytes != null) {
1731      if (cellFeaturesEnabled) {
1732        addCellPermissions(bytes, increment.getFamilyCellMap());
1733      } else {
1734        throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1735      }
1736    }
1737
1738    return null;
1739  }
1740
1741  @Override
1742  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
1743      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
1744      List<Pair<Cell, Cell>> cellPairs) throws IOException {
1745    // If the HFile version is insufficient to persist tags, we won't have any
1746    // work to do here
1747    if (!cellFeaturesEnabled) {
1748      return cellPairs;
1749    }
1750    return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
1751        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1752        .collect(Collectors.toList());
1753  }
1754
1755  @Override
1756  public List<Pair<Cell, Cell>> postAppendBeforeWAL(
1757      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
1758      List<Pair<Cell, Cell>> cellPairs) throws IOException {
1759    // If the HFile version is insufficient to persist tags, we won't have any
1760    // work to do here
1761    if (!cellFeaturesEnabled) {
1762      return cellPairs;
1763    }
1764    return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
1765        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
1766        .collect(Collectors.toList());
1767  }
1768
1769  private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
1770    // Collect any ACLs from the old cell
1771    List<Tag> tags = Lists.newArrayList();
1772    List<Tag> aclTags = Lists.newArrayList();
1773    ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1774    if (oldCell != null) {
1775      Iterator<Tag> tagIterator = PrivateCellUtil.tagsIterator(oldCell);
1776      while (tagIterator.hasNext()) {
1777        Tag tag = tagIterator.next();
1778        if (tag.getType() != PermissionStorage.ACL_TAG_TYPE) {
1779          // Not an ACL tag, just carry it through
1780          if (LOG.isTraceEnabled()) {
1781            LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
1782                + " length " + tag.getValueLength());
1783          }
1784          tags.add(tag);
1785        } else {
1786          aclTags.add(tag);
1787        }
1788      }
1789    }
1790
1791    // Do we have an ACL on the operation?
1792    byte[] aclBytes = mutation.getACL();
1793    if (aclBytes != null) {
1794      // Yes, use it
1795      tags.add(new ArrayBackedTag(PermissionStorage.ACL_TAG_TYPE, aclBytes));
1796    } else {
1797      // No, use what we carried forward
1798      if (perms != null) {
1799        // TODO: If we collected ACLs from more than one tag we may have a
1800        // List<Permission> of size > 1, this can be collapsed into a single
1801        // Permission
1802        if (LOG.isTraceEnabled()) {
1803          LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1804        }
1805        tags.addAll(aclTags);
1806      }
1807    }
1808
1809    // If we have no tags to add, just return
1810    if (tags.isEmpty()) {
1811      return newCell;
1812    }
1813
1814    return PrivateCellUtil.createCell(newCell, tags);
1815  }
1816
1817  @Override
1818  public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan)
1819      throws IOException {
1820    internalPreRead(c, scan, OpType.SCAN);
1821  }
1822
1823  @Override
1824  public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1825      final Scan scan, final RegionScanner s) throws IOException {
1826    User user = getActiveUser(c);
1827    if (user != null && user.getShortName() != null) {
1828      // store reference to scanner owner for later checks
1829      scannerOwners.put(s, user.getShortName());
1830    }
1831    return s;
1832  }
1833
1834  @Override
1835  public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1836      final InternalScanner s, final List<Result> result,
1837      final int limit, final boolean hasNext) throws IOException {
1838    requireScannerOwner(s);
1839    return hasNext;
1840  }
1841
1842  @Override
1843  public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1844      final InternalScanner s) throws IOException {
1845    requireScannerOwner(s);
1846  }
1847
1848  @Override
1849  public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1850      final InternalScanner s) throws IOException {
1851    // clean up any associated owner mapping
1852    scannerOwners.remove(s);
1853  }
1854
1855  @Override
1856  public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
1857      final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
1858    // 'default' in RegionObserver might do unnecessary copy for Off heap backed Cells.
1859    return hasMore;
1860  }
1861
1862  /**
1863   * Verify, when servicing an RPC, that the caller is the scanner owner.
1864   * If so, we assume that access control is correctly enforced based on
1865   * the checks performed in preScannerOpen()
1866   */
1867  private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
1868    if (!RpcServer.isInRpcCallContext()) {
1869      return;
1870    }
1871    String requestUserName = RpcServer.getRequestUserName().orElse(null);
1872    String owner = scannerOwners.get(s);
1873    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
1874      throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1875    }
1876  }
1877
1878  /**
1879   * Verifies user has CREATE or ADMIN privileges on
1880   * the Column Families involved in the bulkLoadHFile
1881   * request. Specific Column Write privileges are presently
1882   * ignored.
1883   */
1884  @Override
1885  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1886      List<Pair<byte[], String>> familyPaths) throws IOException {
1887    User user = getActiveUser(ctx);
1888    for(Pair<byte[],String> el : familyPaths) {
1889      accessChecker.requirePermission(user, "preBulkLoadHFile",
1890        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), el.getFirst(), null,
1891        null, Action.ADMIN, Action.CREATE);
1892    }
1893  }
1894
1895  /**
1896   * Authorization check for
1897   * SecureBulkLoadProtocol.prepareBulkLoad()
1898   * @param ctx the context
1899   * @throws IOException
1900   */
1901  @Override
1902  public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1903  throws IOException {
1904    requireAccess(ctx, "prePrepareBulkLoad",
1905        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1906        Action.CREATE);
1907  }
1908
1909  /**
1910   * Authorization security check for
1911   * SecureBulkLoadProtocol.cleanupBulkLoad()
1912   * @param ctx the context
1913   * @throws IOException
1914   */
1915  @Override
1916  public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx)
1917  throws IOException {
1918    requireAccess(ctx, "preCleanupBulkLoad",
1919        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), Action.ADMIN,
1920        Action.CREATE);
1921  }
1922
1923  /* ---- EndpointObserver implementation ---- */
1924
1925  @Override
1926  public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1927      Service service, String methodName, Message request) throws IOException {
1928    // Don't intercept calls to our own AccessControlService, we check for
1929    // appropriate permissions in the service handlers
1930    if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1931      requirePermission(ctx,
1932          "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
1933          getTableName(ctx.getEnvironment()), null, null,
1934          Action.EXEC);
1935    }
1936    return request;
1937  }
1938
1939  @Override
1940  public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1941      Service service, String methodName, Message request, Message.Builder responseBuilder)
1942      throws IOException { }
1943
1944  /* ---- Protobuf AccessControlService implementation ---- */
1945
1946  /**
1947   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
1948   *   {@link Admin#grant(UserPermission, boolean)} instead.
1949   * @see Admin#grant(UserPermission, boolean)
1950   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1951   */
1952  @Deprecated
1953  @Override
1954  public void grant(RpcController controller,
1955      AccessControlProtos.GrantRequest request,
1956      RpcCallback<AccessControlProtos.GrantResponse> done) {
1957    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
1958    AccessControlProtos.GrantResponse response = null;
1959    try {
1960      // verify it's only running at .acl.
1961      if (aclRegion) {
1962        if (!initialized) {
1963          throw new CoprocessorException("AccessController not yet initialized");
1964        }
1965        User caller = RpcServer.getRequestUser().orElse(null);
1966        if (LOG.isDebugEnabled()) {
1967          LOG.debug("Received request from {} to grant access permission {}",
1968            caller.getName(), perm.toString());
1969        }
1970        preGrantOrRevoke(caller, "grant", perm);
1971
1972        // regionEnv is set at #start. Hopefully not null at this point.
1973        regionEnv.getConnection().getAdmin().grant(
1974          new UserPermission(perm.getUser(), perm.getPermission()),
1975          request.getMergeExistingPermissions());
1976        if (AUDITLOG.isTraceEnabled()) {
1977          // audit log should store permission changes in addition to auth results
1978          AUDITLOG.trace("Granted permission " + perm.toString());
1979        }
1980      } else {
1981        throw new CoprocessorException(AccessController.class, "This method "
1982            + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
1983      }
1984      response = AccessControlProtos.GrantResponse.getDefaultInstance();
1985    } catch (IOException ioe) {
1986      // pass exception back up
1987      CoprocessorRpcUtils.setControllerException(controller, ioe);
1988    }
1989    done.run(response);
1990  }
1991
1992  /**
1993   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use {@link Admin#revoke(UserPermission)}
1994   *   instead.
1995   * @see Admin#revoke(UserPermission)
1996   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
1997   */
1998  @Deprecated
1999  @Override
2000  public void revoke(RpcController controller, AccessControlProtos.RevokeRequest request,
2001      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2002    final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
2003    AccessControlProtos.RevokeResponse response = null;
2004    try {
2005      // only allowed to be called on _acl_ region
2006      if (aclRegion) {
2007        if (!initialized) {
2008          throw new CoprocessorException("AccessController not yet initialized");
2009        }
2010        User caller = RpcServer.getRequestUser().orElse(null);
2011        if (LOG.isDebugEnabled()) {
2012          LOG.debug("Received request from {} to revoke access permission {}",
2013            caller.getShortName(), perm.toString());
2014        }
2015        preGrantOrRevoke(caller, "revoke", perm);
2016        // regionEnv is set at #start. Hopefully not null here.
2017        regionEnv.getConnection().getAdmin()
2018            .revoke(new UserPermission(perm.getUser(), perm.getPermission()));
2019        if (AUDITLOG.isTraceEnabled()) {
2020          // audit log should record all permission changes
2021          AUDITLOG.trace("Revoked permission " + perm.toString());
2022        }
2023      } else {
2024        throw new CoprocessorException(AccessController.class, "This method "
2025            + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
2026      }
2027      response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2028    } catch (IOException ioe) {
2029      // pass exception back up
2030      CoprocessorRpcUtils.setControllerException(controller, ioe);
2031    }
2032    done.run(response);
2033  }
2034
2035  /**
2036   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
2037   *   {@link Admin#getUserPermissions(GetUserPermissionsRequest)} instead.
2038   * @see Admin#getUserPermissions(GetUserPermissionsRequest)
2039   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21911">HBASE-21911</a>
2040   */
2041  @Deprecated
2042  @Override
2043  public void getUserPermissions(RpcController controller,
2044      AccessControlProtos.GetUserPermissionsRequest request,
2045      RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2046    AccessControlProtos.GetUserPermissionsResponse response = null;
2047    try {
2048      // only allowed to be called on _acl_ region
2049      if (aclRegion) {
2050        if (!initialized) {
2051          throw new CoprocessorException("AccessController not yet initialized");
2052        }
2053        User caller = RpcServer.getRequestUser().orElse(null);
2054        final String userName = request.hasUserName() ? request.getUserName().toStringUtf8() : null;
2055        final String namespace =
2056            request.hasNamespaceName() ? request.getNamespaceName().toStringUtf8() : null;
2057        final TableName table =
2058            request.hasTableName() ? ProtobufUtil.toTableName(request.getTableName()) : null;
2059        final byte[] cf =
2060            request.hasColumnFamily() ? request.getColumnFamily().toByteArray() : null;
2061        final byte[] cq =
2062            request.hasColumnQualifier() ? request.getColumnQualifier().toByteArray() : null;
2063        preGetUserPermissions(caller, userName, namespace, table, cf, cq);
2064        GetUserPermissionsRequest getUserPermissionsRequest = null;
2065        if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2066          getUserPermissionsRequest = GetUserPermissionsRequest.newBuilder(table).withFamily(cf)
2067              .withQualifier(cq).withUserName(userName).build();
2068        } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2069          getUserPermissionsRequest =
2070              GetUserPermissionsRequest.newBuilder(namespace).withUserName(userName).build();
2071        } else {
2072          getUserPermissionsRequest =
2073              GetUserPermissionsRequest.newBuilder().withUserName(userName).build();
2074        }
2075        List<UserPermission> perms =
2076            regionEnv.getConnection().getAdmin().getUserPermissions(getUserPermissionsRequest);
2077        response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
2078      } else {
2079        throw new CoprocessorException(AccessController.class, "This method "
2080            + "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
2081      }
2082    } catch (IOException ioe) {
2083      // pass exception back up
2084      CoprocessorRpcUtils.setControllerException(controller, ioe);
2085    }
2086    done.run(response);
2087  }
2088
2089  /**
2090   * @deprecated since 2.2.0 and will be removed 4.0.0. Use {@link Admin#hasUserPermissions(List)}
2091   *   instead.
2092   * @see Admin#hasUserPermissions(List)
2093   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2094   */
2095  @Deprecated
2096  @Override
2097  public void checkPermissions(RpcController controller,
2098      AccessControlProtos.CheckPermissionsRequest request,
2099      RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2100    AccessControlProtos.CheckPermissionsResponse response = null;
2101    try {
2102      User user = RpcServer.getRequestUser().orElse(null);
2103      TableName tableName = regionEnv.getRegion().getTableDescriptor().getTableName();
2104      List<Permission> permissions = new ArrayList<>();
2105      for (int i = 0; i < request.getPermissionCount(); i++) {
2106        Permission permission = AccessControlUtil.toPermission(request.getPermission(i));
2107        permissions.add(permission);
2108        if (permission instanceof TablePermission) {
2109          TablePermission tperm = (TablePermission) permission;
2110          if (!tperm.getTableName().equals(tableName)) {
2111            throw new CoprocessorException(AccessController.class,
2112                String.format(
2113                  "This method can only execute at the table specified in "
2114                      + "TablePermission. Table of the region:%s , requested table:%s",
2115                  tableName, tperm.getTableName()));
2116          }
2117        }
2118      }
2119      for (Permission permission : permissions) {
2120        boolean hasPermission =
2121            accessChecker.hasUserPermission(user, "checkPermissions", permission);
2122        if (!hasPermission) {
2123          throw new AccessDeniedException("Insufficient permissions " + permission.toString());
2124        }
2125      }
2126      response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2127    } catch (IOException ioe) {
2128      CoprocessorRpcUtils.setControllerException(controller, ioe);
2129    }
2130    done.run(response);
2131  }
2132
2133  private Region getRegion(RegionCoprocessorEnvironment e) {
2134    return e.getRegion();
2135  }
2136
2137  private TableName getTableName(RegionCoprocessorEnvironment e) {
2138    Region region = e.getRegion();
2139    if (region != null) {
2140      return getTableName(region);
2141    }
2142    return null;
2143  }
2144
2145  private TableName getTableName(Region region) {
2146    RegionInfo regionInfo = region.getRegionInfo();
2147    if (regionInfo != null) {
2148      return regionInfo.getTable();
2149    }
2150    return null;
2151  }
2152
2153  @Override
2154  public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2155      throws IOException {
2156    requirePermission(c, "preClose", Action.ADMIN);
2157  }
2158
2159  private void checkSystemOrSuperUser(User activeUser) throws IOException {
2160    // No need to check if we're not going to throw
2161    if (!authorizationEnabled) {
2162      return;
2163    }
2164    if (!Superusers.isSuperUser(activeUser)) {
2165      throw new AccessDeniedException("User '" + (activeUser != null ?
2166        activeUser.getShortName() : "null") + "' is not system or super user.");
2167    }
2168  }
2169
2170  @Override
2171  public void preStopRegionServer(
2172      ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2173      throws IOException {
2174    requirePermission(ctx, "preStopRegionServer", Action.ADMIN);
2175  }
2176
2177  private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2178      byte[] qualifier) {
2179    if (family == null) {
2180      return null;
2181    }
2182
2183    Map<byte[], Collection<byte[]>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2184    familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2185    return familyMap;
2186  }
2187
2188  @Override
2189  public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2190       List<TableName> tableNamesList, List<TableDescriptor> descriptors,
2191       String regex) throws IOException {
2192    // We are delegating the authorization check to postGetTableDescriptors as we don't have
2193    // any concrete set of table names when a regex is present or the full list is requested.
2194    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2195      // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2196      // request can be granted.
2197      TableName [] sns = null;
2198      try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
2199        sns = admin.listTableNames();
2200        if (sns == null) return;
2201        for (TableName tableName: tableNamesList) {
2202          // Skip checks for a table that does not exist
2203          if (!admin.tableExists(tableName)) continue;
2204          requirePermission(ctx, "getTableDescriptors", tableName, null, null,
2205            Action.ADMIN, Action.CREATE);
2206        }
2207      }
2208    }
2209  }
2210
2211  @Override
2212  public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2213      List<TableName> tableNamesList, List<TableDescriptor> descriptors,
2214      String regex) throws IOException {
2215    // Skipping as checks in this case are already done by preGetTableDescriptors.
2216    if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2217      return;
2218    }
2219
2220    // Retains only those which passes authorization checks, as the checks weren't done as part
2221    // of preGetTableDescriptors.
2222    Iterator<TableDescriptor> itr = descriptors.iterator();
2223    while (itr.hasNext()) {
2224      TableDescriptor htd = itr.next();
2225      try {
2226        requirePermission(ctx, "getTableDescriptors", htd.getTableName(), null, null,
2227            Action.ADMIN, Action.CREATE);
2228      } catch (AccessDeniedException e) {
2229        itr.remove();
2230      }
2231    }
2232  }
2233
2234  @Override
2235  public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2236      List<TableDescriptor> descriptors, String regex) throws IOException {
2237    // Retains only those which passes authorization checks.
2238    Iterator<TableDescriptor> itr = descriptors.iterator();
2239    while (itr.hasNext()) {
2240      TableDescriptor htd = itr.next();
2241      try {
2242        requireAccess(ctx, "getTableNames", htd.getTableName(), Action.values());
2243      } catch (AccessDeniedException e) {
2244        itr.remove();
2245      }
2246    }
2247  }
2248
2249  @Override
2250  public void preMergeRegions(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2251                              final RegionInfo[] regionsToMerge) throws IOException {
2252    requirePermission(ctx, "mergeRegions", regionsToMerge[0].getTable(), null, null,
2253      Action.ADMIN);
2254  }
2255
2256  @Override
2257  public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2258      throws IOException {
2259    requirePermission(ctx, "preRollLogWriterRequest", Permission.Action.ADMIN);
2260  }
2261
2262  @Override
2263  public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2264      throws IOException { }
2265
2266  @Override
2267  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2268      final String userName, final GlobalQuotaSettings quotas) throws IOException {
2269    requirePermission(ctx, "setUserQuota", Action.ADMIN);
2270  }
2271
2272  @Override
2273  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2274      final String userName, final TableName tableName, final GlobalQuotaSettings quotas)
2275          throws IOException {
2276    requirePermission(ctx, "setUserTableQuota", tableName, null, null, Action.ADMIN);
2277  }
2278
2279  @Override
2280  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2281      final String userName, final String namespace, final GlobalQuotaSettings quotas)
2282          throws IOException {
2283    requirePermission(ctx, "setUserNamespaceQuota", Action.ADMIN);
2284  }
2285
2286  @Override
2287  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2288      final TableName tableName, final GlobalQuotaSettings quotas) throws IOException {
2289    requirePermission(ctx, "setTableQuota", tableName, null, null, Action.ADMIN);
2290  }
2291
2292  @Override
2293  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2294      final String namespace, final GlobalQuotaSettings quotas) throws IOException {
2295    requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
2296  }
2297
2298  @Override
2299  public void preSetRegionServerQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
2300      final String regionServer, GlobalQuotaSettings quotas) throws IOException {
2301    requirePermission(ctx, "setRegionServerQuota", Action.ADMIN);
2302  }
2303
2304  @Override
2305  public ReplicationEndpoint postCreateReplicationEndPoint(
2306      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2307    return endpoint;
2308  }
2309
2310  @Override
2311  public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2312      throws IOException {
2313    requirePermission(ctx, "replicateLogEntries", Action.WRITE);
2314  }
2315  
2316  @Override
2317  public void  preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2318          throws IOException {
2319    requirePermission(ctx, "preClearCompactionQueues", Permission.Action.ADMIN);
2320  }
2321
2322  @Override
2323  public void preAddReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2324      String peerId, ReplicationPeerConfig peerConfig) throws IOException {
2325    requirePermission(ctx, "addReplicationPeer", Action.ADMIN);
2326  }
2327
2328  @Override
2329  public void preRemoveReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2330      String peerId) throws IOException {
2331    requirePermission(ctx, "removeReplicationPeer", Action.ADMIN);
2332  }
2333
2334  @Override
2335  public void preEnableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2336      String peerId) throws IOException {
2337    requirePermission(ctx, "enableReplicationPeer", Action.ADMIN);
2338  }
2339
2340  @Override
2341  public void preDisableReplicationPeer(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2342      String peerId) throws IOException {
2343    requirePermission(ctx, "disableReplicationPeer", Action.ADMIN);
2344  }
2345
2346  @Override
2347  public void preGetReplicationPeerConfig(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2348      String peerId) throws IOException {
2349    requirePermission(ctx, "getReplicationPeerConfig", Action.ADMIN);
2350  }
2351
2352  @Override
2353  public void preUpdateReplicationPeerConfig(
2354      final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
2355      ReplicationPeerConfig peerConfig) throws IOException {
2356    requirePermission(ctx, "updateReplicationPeerConfig", Action.ADMIN);
2357  }
2358
2359  @Override
2360  public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2361      String regex) throws IOException {
2362    requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
2363  }
2364
2365  @Override
2366  public void preRequestLock(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace,
2367      TableName tableName, RegionInfo[] regionInfos, String description) throws IOException {
2368    // There are operations in the CREATE and ADMIN domain which may require lock, READ
2369    // or WRITE. So for any lock request, we check for these two perms irrespective of lock type.
2370    String reason = String.format("Description=%s", description);
2371    checkLockPermissions(ctx, namespace, tableName, regionInfos, reason);
2372  }
2373
2374  @Override
2375  public void preLockHeartbeat(ObserverContext<MasterCoprocessorEnvironment> ctx,
2376      TableName tableName, String description) throws IOException {
2377    checkLockPermissions(ctx, null, tableName, null, description);
2378  }
2379
2380  @Override
2381  public void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2382      throws IOException {
2383    checkSystemOrSuperUser(getActiveUser(ctx));
2384  }
2385
2386  @Override
2387  public void preSwitchRpcThrottle(ObserverContext<MasterCoprocessorEnvironment> ctx,
2388      boolean enable) throws IOException {
2389    requirePermission(ctx, "switchRpcThrottle", Action.ADMIN);
2390  }
2391
2392  @Override
2393  public void preIsRpcThrottleEnabled(ObserverContext<MasterCoprocessorEnvironment> ctx)
2394      throws IOException {
2395    requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
2396  }
2397
2398  @Override
2399  public void preSwitchExceedThrottleQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
2400      boolean enable) throws IOException {
2401    requirePermission(ctx, "switchExceedThrottleQuota", Action.ADMIN);
2402  }
2403
2404  /**
2405   * Returns the active user to which authorization checks should be applied.
2406   * If we are in the context of an RPC call, the remote user is used,
2407   * otherwise the currently logged in user is used.
2408   */
2409  private User getActiveUser(ObserverContext<?> ctx) throws IOException {
2410    // for non-rpc handling, fallback to system user
2411    Optional<User> optionalUser = ctx.getCaller();
2412    if (optionalUser.isPresent()) {
2413      return optionalUser.get();
2414    }
2415    return userProvider.getCurrent();
2416  }
2417
2418  /**
2419   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
2420   *   {@link Admin#hasUserPermissions(String, List)} instead.
2421   * @see Admin#hasUserPermissions(String, List)
2422   * @see <a href="https://issues.apache.org/jira/browse/HBASE-22117">HBASE-22117</a>
2423   */
2424  @Deprecated
2425  @Override
2426  public void hasPermission(RpcController controller, HasPermissionRequest request,
2427      RpcCallback<HasPermissionResponse> done) {
2428    // Converts proto to a TablePermission object.
2429    TablePermission tPerm = AccessControlUtil.toTablePermission(request.getTablePermission());
2430    // Check input user name
2431    if (!request.hasUserName()) {
2432      throw new IllegalStateException("Input username cannot be empty");
2433    }
2434    final String inputUserName = request.getUserName().toStringUtf8();
2435    AccessControlProtos.HasPermissionResponse response = null;
2436    try {
2437      User caller = RpcServer.getRequestUser().orElse(null);
2438      List<Permission> permissions = Lists.newArrayList(tPerm);
2439      preHasUserPermissions(caller, inputUserName, permissions);
2440      boolean hasPermission = regionEnv.getConnection().getAdmin()
2441          .hasUserPermissions(inputUserName, permissions).get(0);
2442      response = ResponseConverter.buildHasPermissionResponse(hasPermission);
2443    } catch (IOException ioe) {
2444      ResponseConverter.setControllerException(controller, ioe);
2445    }
2446    done.run(response);
2447  }
2448
2449  @Override
2450  public void preGrant(ObserverContext<MasterCoprocessorEnvironment> ctx,
2451      UserPermission userPermission, boolean mergeExistingPermissions) throws IOException {
2452    preGrantOrRevoke(getActiveUser(ctx), "grant", userPermission);
2453  }
2454
2455  @Override
2456  public void preRevoke(ObserverContext<MasterCoprocessorEnvironment> ctx,
2457      UserPermission userPermission) throws IOException {
2458    preGrantOrRevoke(getActiveUser(ctx), "revoke", userPermission);
2459  }
2460
2461  private void preGrantOrRevoke(User caller, String request, UserPermission userPermission)
2462      throws IOException {
2463    switch (userPermission.getPermission().scope) {
2464      case GLOBAL:
2465        accessChecker.requireGlobalPermission(caller, request, Action.ADMIN, "");
2466        break;
2467      case NAMESPACE:
2468        NamespacePermission namespacePerm = (NamespacePermission) userPermission.getPermission();
2469        accessChecker.requireNamespacePermission(caller, request, namespacePerm.getNamespace(),
2470          null, Action.ADMIN);
2471        break;
2472      case TABLE:
2473        TablePermission tablePerm = (TablePermission) userPermission.getPermission();
2474        accessChecker.requirePermission(caller, request, tablePerm.getTableName(),
2475          tablePerm.getFamily(), tablePerm.getQualifier(), null, Action.ADMIN);
2476        break;
2477      default:
2478    }
2479    if (!Superusers.isSuperUser(caller)) {
2480      accessChecker.performOnSuperuser(request, caller, userPermission.getUser());
2481    }
2482  }
2483
2484  @Override
2485  public void preGetUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2486      String userName, String namespace, TableName tableName, byte[] family, byte[] qualifier)
2487      throws IOException {
2488    preGetUserPermissions(getActiveUser(ctx), userName, namespace, tableName, family, qualifier);
2489  }
2490
2491  private void preGetUserPermissions(User caller, String userName, String namespace,
2492      TableName tableName, byte[] family, byte[] qualifier) throws IOException {
2493    if (tableName != null) {
2494      accessChecker.requirePermission(caller, "getUserPermissions", tableName, family, qualifier,
2495        userName, Action.ADMIN);
2496    } else if (namespace != null) {
2497      accessChecker.requireNamespacePermission(caller, "getUserPermissions", namespace, userName,
2498        Action.ADMIN);
2499    } else {
2500      accessChecker.requirePermission(caller, "getUserPermissions", userName, Action.ADMIN);
2501    }
2502  }
2503
2504  @Override
2505  public void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
2506      String userName, List<Permission> permissions) throws IOException {
2507    preHasUserPermissions(getActiveUser(ctx), userName, permissions);
2508  }
2509
2510  private void preHasUserPermissions(User caller, String userName, List<Permission> permissions)
2511      throws IOException {
2512    String request = "hasUserPermissions";
2513    for (Permission permission : permissions) {
2514      if (!caller.getShortName().equals(userName)) {
2515        // User should have admin privilege if checking permission for other users
2516        if (permission instanceof TablePermission) {
2517          TablePermission tPerm = (TablePermission) permission;
2518          accessChecker.requirePermission(caller, request, tPerm.getTableName(), tPerm.getFamily(),
2519            tPerm.getQualifier(), userName, Action.ADMIN);
2520        } else if (permission instanceof NamespacePermission) {
2521          NamespacePermission nsPerm = (NamespacePermission) permission;
2522          accessChecker.requireNamespacePermission(caller, request, nsPerm.getNamespace(), userName,
2523            Action.ADMIN);
2524        } else {
2525          accessChecker.requirePermission(caller, request, userName, Action.ADMIN);
2526        }
2527      } else {
2528        // User don't need ADMIN privilege for self check.
2529        // Setting action as null in AuthResult to display empty action in audit log
2530        AuthResult result;
2531        if (permission instanceof TablePermission) {
2532          TablePermission tPerm = (TablePermission) permission;
2533          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2534            tPerm.getTableName(), tPerm.getFamily(), tPerm.getQualifier());
2535        } else if (permission instanceof NamespacePermission) {
2536          NamespacePermission nsPerm = (NamespacePermission) permission;
2537          result = AuthResult.allow(request, "Self user validation allowed", caller, null,
2538            nsPerm.getNamespace());
2539        } else {
2540          result = AuthResult.allow(request, "Self user validation allowed", caller, null, null,
2541            null, null);
2542        }
2543        AccessChecker.logResult(result);
2544      }
2545    }
2546  }
2547}