View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.TreeMap;
33  import java.util.TreeSet;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.ArrayBackedTag;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellScanner;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.CompoundConfiguration;
43  import org.apache.hadoop.hbase.CoprocessorEnvironment;
44  import org.apache.hadoop.hbase.DoNotRetryIOException;
45  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.KeyValue;
51  import org.apache.hadoop.hbase.KeyValue.Type;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.NamespaceDescriptor;
54  import org.apache.hadoop.hbase.ProcedureInfo;
55  import org.apache.hadoop.hbase.ServerName;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.Tag;
58  import org.apache.hadoop.hbase.TagRewriteCell;
59  import org.apache.hadoop.hbase.TagUtil;
60  import org.apache.hadoop.hbase.classification.InterfaceAudience;
61  import org.apache.hadoop.hbase.client.Append;
62  import org.apache.hadoop.hbase.client.Delete;
63  import org.apache.hadoop.hbase.client.Durability;
64  import org.apache.hadoop.hbase.client.Get;
65  import org.apache.hadoop.hbase.client.Increment;
66  import org.apache.hadoop.hbase.client.Mutation;
67  import org.apache.hadoop.hbase.client.Put;
68  import org.apache.hadoop.hbase.client.Query;
69  import org.apache.hadoop.hbase.client.Result;
70  import org.apache.hadoop.hbase.client.Scan;
71  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
72  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
73  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
74  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
75  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
76  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
77  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
78  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
79  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
80  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
81  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
82  import org.apache.hadoop.hbase.filter.CompareFilter;
83  import org.apache.hadoop.hbase.filter.Filter;
84  import org.apache.hadoop.hbase.filter.FilterList;
85  import org.apache.hadoop.hbase.io.hfile.HFile;
86  import org.apache.hadoop.hbase.ipc.RpcServer;
87  import org.apache.hadoop.hbase.master.MasterServices;
88  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
89  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
90  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
91  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
92  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
93  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
94  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
96  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
97  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
99  import org.apache.hadoop.hbase.regionserver.InternalScanner;
100 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
101 import org.apache.hadoop.hbase.regionserver.Region;
102 import org.apache.hadoop.hbase.regionserver.RegionScanner;
103 import org.apache.hadoop.hbase.regionserver.ScanType;
104 import org.apache.hadoop.hbase.regionserver.ScannerContext;
105 import org.apache.hadoop.hbase.regionserver.Store;
106 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
107 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
108 import org.apache.hadoop.hbase.security.AccessDeniedException;
109 import org.apache.hadoop.hbase.security.Superusers;
110 import org.apache.hadoop.hbase.security.User;
111 import org.apache.hadoop.hbase.security.UserProvider;
112 import org.apache.hadoop.hbase.security.access.Permission.Action;
113 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
114 import org.apache.hadoop.hbase.util.ByteRange;
115 import org.apache.hadoop.hbase.util.Bytes;
116 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
117 import org.apache.hadoop.hbase.util.Pair;
118 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
119 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
120 
121 import com.google.common.collect.ArrayListMultimap;
122 import com.google.common.collect.ImmutableSet;
123 import com.google.common.collect.ListMultimap;
124 import com.google.common.collect.Lists;
125 import com.google.common.collect.MapMaker;
126 import com.google.common.collect.Maps;
127 import com.google.common.collect.Sets;
128 import com.google.protobuf.Message;
129 import com.google.protobuf.RpcCallback;
130 import com.google.protobuf.RpcController;
131 import com.google.protobuf.Service;
132 
133 /**
134  * Provides basic authorization checks for data access and administrative
135  * operations.
136  *
137  * <p>
138  * {@code AccessController} performs authorization checks for HBase operations
139  * based on:
140  * </p>
141  * <ul>
142  *   <li>the identity of the user performing the operation</li>
143  *   <li>the scope over which the operation is performed, in increasing
144  *   specificity: global, table, column family, or qualifier</li>
145  *   <li>the type of action being performed (as mapped to
146  *   {@link Permission.Action} values)</li>
147  * </ul>
148  * <p>
149  * If the authorization check fails, an {@link AccessDeniedException}
150  * will be thrown for the operation.
151  * </p>
152  *
153  * <p>
154  * To perform authorization checks, {@code AccessController} relies on the
155  * RpcServerEngine being loaded to provide
156  * the user identities for remote requests.
157  * </p>
158  *
159  * <p>
160  * The access control lists used for authorization can be manipulated via the
161  * exposed {@link AccessControlService} Interface implementation, and the associated
162  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
163  * commands.
164  * </p>
165  */
166 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
167 public class AccessController extends BaseMasterAndRegionObserver
168     implements RegionServerObserver,
169       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
170 
171   private static final Log LOG = LogFactory.getLog(AccessController.class);
172 
173   private static final Log AUDITLOG =
174     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
175   private static final String CHECK_COVERING_PERM = "check_covering_perm";
176   private static final String TAG_CHECK_PASSED = "tag_check_passed";
177   private static final byte[] TRUE = Bytes.toBytes(true);
178 
179   TableAuthManager authManager = null;
180 
181   /** flags if we are running on a region of the _acl_ table */
182   boolean aclRegion = false;
183 
184   /** defined only for Endpoint implementation, so it can have way to
185    access region services */
186   private RegionCoprocessorEnvironment regionEnv;
187 
188   /** Mapping of scanner instances to the user who created them */
189   private Map<InternalScanner,String> scannerOwners =
190       new MapMaker().weakKeys().makeMap();
191 
192   private Map<TableName, List<UserPermission>> tableAcls;
193 
194   /** Provider for mapping principal names to Users */
195   private UserProvider userProvider;
196 
197   /** if we are active, usually true, only not true if "hbase.security.authorization"
198    has been set to false in site configuration */
199   boolean authorizationEnabled;
200 
201   /** if we are able to support cell ACLs */
202   boolean cellFeaturesEnabled;
203 
204   /** if we should check EXEC permissions */
205   boolean shouldCheckExecPermission;
206 
207   /** if we should terminate access checks early as soon as table or CF grants
208     allow access; pre-0.98 compatible behavior */
209   boolean compatibleEarlyTermination;
210 
211   /** if we have been successfully initialized */
212   private volatile boolean initialized = false;
213 
214   /** if the ACL table is available, only relevant in the master */
215   private volatile boolean aclTabAvailable = false;
216 
217   public static boolean isAuthorizationSupported(Configuration conf) {
218     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
219   }
220 
221   public static boolean isCellAuthorizationSupported(Configuration conf) {
222     return isAuthorizationSupported(conf) &&
223         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
224   }
225 
226   public Region getRegion() {
227     return regionEnv != null ? regionEnv.getRegion() : null;
228   }
229 
230   public TableAuthManager getAuthManager() {
231     return authManager;
232   }
233 
234   void initialize(RegionCoprocessorEnvironment e) throws IOException {
235     final Region region = e.getRegion();
236     Configuration conf = e.getConfiguration();
237     Map<byte[], ListMultimap<String,TablePermission>> tables =
238         AccessControlLists.loadAll(region);
239     // For each table, write out the table's permissions to the respective
240     // znode for that table.
241     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
242       tables.entrySet()) {
243       byte[] entry = t.getKey();
244       ListMultimap<String,TablePermission> perms = t.getValue();
245       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
246       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
247     }
248     initialized = true;
249   }
250 
251   /**
252    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
253    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
254    * table updates.
255    */
256   void updateACL(RegionCoprocessorEnvironment e,
257       final Map<byte[], List<Cell>> familyMap) {
258     Set<byte[]> entries =
259         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
260     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
261       List<Cell> cells = f.getValue();
262       for (Cell cell: cells) {
263         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
264             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
265             AccessControlLists.ACL_LIST_FAMILY.length)) {
266           entries.add(CellUtil.cloneRow(cell));
267         }
268       }
269     }
270     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
271     Configuration conf = regionEnv.getConfiguration();
272     for (byte[] entry: entries) {
273       try {
274         ListMultimap<String,TablePermission> perms =
275           AccessControlLists.getPermissions(conf, entry);
276         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
277         zkw.writeToZookeeper(entry, serialized);
278       } catch (IOException ex) {
279         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
280             ex);
281       }
282     }
283   }
284 
285   /**
286    * Check the current user for authorization to perform a specific action
287    * against the given set of row data.
288    *
289    * <p>Note: Ordering of the authorization checks
290    * has been carefully optimized to short-circuit the most common requests
291    * and minimize the amount of processing required.</p>
292    *
293    * @param permRequest the action being requested
294    * @param e the coprocessor environment
295    * @param families the map of column families to qualifiers present in
296    * the request
297    * @return an authorization result
298    */
299   AuthResult permissionGranted(String request, User user, Action permRequest,
300       RegionCoprocessorEnvironment e,
301       Map<byte [], ? extends Collection<?>> families) {
302     HRegionInfo hri = e.getRegion().getRegionInfo();
303     TableName tableName = hri.getTable();
304 
305     // 1. All users need read access to hbase:meta table.
306     // this is a very common operation, so deal with it quickly.
307     if (hri.isMetaRegion()) {
308       if (permRequest == Action.READ) {
309         return AuthResult.allow(request, "All users allowed", user,
310           permRequest, tableName, families);
311       }
312     }
313 
314     if (user == null) {
315       return AuthResult.deny(request, "No user associated with request!", null,
316         permRequest, tableName, families);
317     }
318 
319     // 2. check for the table-level, if successful we can short-circuit
320     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
321       return AuthResult.allow(request, "Table permission granted", user,
322         permRequest, tableName, families);
323     }
324 
325     // 3. check permissions against the requested families
326     if (families != null && families.size() > 0) {
327       // all families must pass
328       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
329         // a) check for family level access
330         if (authManager.authorize(user, tableName, family.getKey(),
331             permRequest)) {
332           continue;  // family-level permission overrides per-qualifier
333         }
334 
335         // b) qualifier level access can still succeed
336         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
337           if (family.getValue() instanceof Set) {
338             // for each qualifier of the family
339             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
340             for (byte[] qualifier : familySet) {
341               if (!authManager.authorize(user, tableName, family.getKey(),
342                                          qualifier, permRequest)) {
343                 return AuthResult.deny(request, "Failed qualifier check", user,
344                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
345               }
346             }
347           } else if (family.getValue() instanceof List) { // List<KeyValue>
348             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
349             for (KeyValue kv : kvList) {
350               if (!authManager.authorize(user, tableName, family.getKey(),
351                 CellUtil.cloneQualifier(kv), permRequest)) {
352                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
353                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(kv)));
354               }
355             }
356           }
357         } else {
358           // no qualifiers and family-level check already failed
359           return AuthResult.deny(request, "Failed family check", user, permRequest,
360               tableName, makeFamilyMap(family.getKey(), null));
361         }
362       }
363 
364       // all family checks passed
365       return AuthResult.allow(request, "All family checks passed", user, permRequest,
366           tableName, families);
367     }
368 
369     // 4. no families to check and table level access failed
370     return AuthResult.deny(request, "No families to check and table permission failed",
371         user, permRequest, tableName, families);
372   }
373 
374   /**
375    * Check the current user for authorization to perform a specific action
376    * against the given set of row data.
377    * @param opType the operation type
378    * @param user the user
379    * @param e the coprocessor environment
380    * @param families the map of column families to qualifiers present in
381    * the request
382    * @param actions the desired actions
383    * @return an authorization result
384    */
385   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
386       Map<byte [], ? extends Collection<?>> families, Action... actions) {
387     AuthResult result = null;
388     for (Action action: actions) {
389       result = permissionGranted(opType.toString(), user, action, e, families);
390       if (!result.isAllowed()) {
391         return result;
392       }
393     }
394     return result;
395   }
396 
397   private void logResult(AuthResult result) {
398     if (AUDITLOG.isTraceEnabled()) {
399       InetAddress remoteAddr = RpcServer.getRemoteAddress();
400       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
401           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
402           "; reason: " + result.getReason() +
403           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
404           "; request: " + result.getRequest() +
405           "; context: " + result.toContextString());
406     }
407   }
408 
409   /**
410    * Returns the active user to which authorization checks should be applied.
411    * If we are in the context of an RPC call, the remote user is used,
412    * otherwise the currently logged in user is used.
413    */
414   private User getActiveUser() throws IOException {
415     User user = RpcServer.getRequestUser();
416     if (user == null) {
417       // for non-rpc handling, fallback to system user
418       user = userProvider.getCurrent();
419     }
420     return user;
421   }
422 
423   /**
424    * Authorizes that the current user has any of the given permissions for the
425    * given table, column family and column qualifier.
426    * @param tableName Table requested
427    * @param family Column family requested
428    * @param qualifier Column qualifier requested
429    * @throws IOException if obtaining the current user fails
430    * @throws AccessDeniedException if user has no authorization
431    */
432   private void requirePermission(String request, TableName tableName, byte[] family,
433       byte[] qualifier, Action... permissions) throws IOException {
434     User user = getActiveUser();
435     AuthResult result = null;
436 
437     for (Action permission : permissions) {
438       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
439         result = AuthResult.allow(request, "Table permission granted", user,
440                                   permission, tableName, family, qualifier);
441         break;
442       } else {
443         // rest of the world
444         result = AuthResult.deny(request, "Insufficient permissions", user,
445                                  permission, tableName, family, qualifier);
446       }
447     }
448     logResult(result);
449     if (authorizationEnabled && !result.isAllowed()) {
450       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
451     }
452   }
453 
454   /**
455    * Authorizes that the current user has any of the given permissions for the
456    * given table, column family and column qualifier.
457    * @param tableName Table requested
458    * @param family Column family param
459    * @param qualifier Column qualifier param
460    * @throws IOException if obtaining the current user fails
461    * @throws AccessDeniedException if user has no authorization
462    */
463   private void requireTablePermission(String request, TableName tableName, byte[] family,
464       byte[] qualifier, Action... permissions) throws IOException {
465     User user = getActiveUser();
466     AuthResult result = null;
467 
468     for (Action permission : permissions) {
469       if (authManager.authorize(user, tableName, null, null, permission)) {
470         result = AuthResult.allow(request, "Table permission granted", user,
471             permission, tableName, null, null);
472         result.getParams().setFamily(family).setQualifier(qualifier);
473         break;
474       } else {
475         // rest of the world
476         result = AuthResult.deny(request, "Insufficient permissions", user,
477             permission, tableName, family, qualifier);
478         result.getParams().setFamily(family).setQualifier(qualifier);
479       }
480     }
481     logResult(result);
482     if (authorizationEnabled && !result.isAllowed()) {
483       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
484     }
485   }
486 
487   /**
488    * Authorizes that the current user has any of the given permissions to access the table.
489    *
490    * @param tableName Table requested
491    * @param permissions Actions being requested
492    * @throws IOException if obtaining the current user fails
493    * @throws AccessDeniedException if user has no authorization
494    */
495   private void requireAccess(String request, TableName tableName,
496       Action... permissions) throws IOException {
497     User user = getActiveUser();
498     AuthResult result = null;
499 
500     for (Action permission : permissions) {
501       if (authManager.hasAccess(user, tableName, permission)) {
502         result = AuthResult.allow(request, "Table permission granted", user,
503                                   permission, tableName, null, null);
504         break;
505       } else {
506         // rest of the world
507         result = AuthResult.deny(request, "Insufficient permissions", user,
508                                  permission, tableName, null, null);
509       }
510     }
511     logResult(result);
512     if (authorizationEnabled && !result.isAllowed()) {
513       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
514     }
515   }
516 
517   /**
518    * Authorizes that the current user has global privileges for the given action.
519    * @param perm The action being requested
520    * @throws IOException if obtaining the current user fails
521    * @throws AccessDeniedException if authorization is denied
522    */
523   private void requirePermission(String request, Action perm) throws IOException {
524     requireGlobalPermission(request, perm, null, null);
525   }
526 
527   /**
528    * Checks that the user has the given global permission. The generated
529    * audit log message will contain context information for the operation
530    * being authorized, based on the given parameters.
531    * @param perm Action being requested
532    * @param tableName Affected table name.
533    * @param familyMap Affected column families.
534    */
535   private void requireGlobalPermission(String request, Action perm, TableName tableName,
536       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
537     User user = getActiveUser();
538     AuthResult result = null;
539     if (authManager.authorize(user, perm)) {
540       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
541       result.getParams().setTableName(tableName).setFamilies(familyMap);
542       logResult(result);
543     } else {
544       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
545       result.getParams().setTableName(tableName).setFamilies(familyMap);
546       logResult(result);
547       if (authorizationEnabled) {
548         throw new AccessDeniedException("Insufficient permissions for user '" +
549           (user != null ? user.getShortName() : "null") +"' (global, action=" +
550           perm.toString() + ")");
551       }
552     }
553   }
554 
555   /**
556    * Checks that the user has the given global permission. The generated
557    * audit log message will contain context information for the operation
558    * being authorized, based on the given parameters.
559    * @param perm Action being requested
560    * @param namespace
561    */
562   private void requireGlobalPermission(String request, Action perm,
563                                        String namespace) throws IOException {
564     User user = getActiveUser();
565     AuthResult authResult = null;
566     if (authManager.authorize(user, perm)) {
567       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
568       authResult.getParams().setNamespace(namespace);
569       logResult(authResult);
570     } else {
571       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
572       authResult.getParams().setNamespace(namespace);
573       logResult(authResult);
574       if (authorizationEnabled) {
575         throw new AccessDeniedException("Insufficient permissions for user '" +
576           (user != null ? user.getShortName() : "null") +"' (global, action=" +
577           perm.toString() + ")");
578       }
579     }
580   }
581 
582   /**
583    * Checks that the user has the given global or namespace permission.
584    * @param namespace
585    * @param permissions Actions being requested
586    */
587   public void requireNamespacePermission(String request, String namespace,
588       Action... permissions) throws IOException {
589     User user = getActiveUser();
590     AuthResult result = null;
591 
592     for (Action permission : permissions) {
593       if (authManager.authorize(user, namespace, permission)) {
594         result = AuthResult.allow(request, "Namespace permission granted",
595             user, permission, namespace);
596         break;
597       } else {
598         // rest of the world
599         result = AuthResult.deny(request, "Insufficient permissions", user,
600             permission, namespace);
601       }
602     }
603     logResult(result);
604     if (authorizationEnabled && !result.isAllowed()) {
605       throw new AccessDeniedException("Insufficient permissions "
606           + result.toContextString());
607     }
608   }
609 
610   /**
611    * Checks that the user has the given global or namespace permission.
612    * @param namespace
613    * @param permissions Actions being requested
614    */
615   public void requireNamespacePermission(String request, String namespace, TableName tableName,
616       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
617       throws IOException {
618     User user = getActiveUser();
619     AuthResult result = null;
620 
621     for (Action permission : permissions) {
622       if (authManager.authorize(user, namespace, permission)) {
623         result = AuthResult.allow(request, "Namespace permission granted",
624             user, permission, namespace);
625         result.getParams().setTableName(tableName).setFamilies(familyMap);
626         break;
627       } else {
628         // rest of the world
629         result = AuthResult.deny(request, "Insufficient permissions", user,
630             permission, namespace);
631         result.getParams().setTableName(tableName).setFamilies(familyMap);
632       }
633     }
634     logResult(result);
635     if (authorizationEnabled && !result.isAllowed()) {
636       throw new AccessDeniedException("Insufficient permissions "
637           + result.toContextString());
638     }
639   }
640 
641   /**
642    * Returns <code>true</code> if the current user is allowed the given action
643    * over at least one of the column qualifiers in the given column families.
644    */
645   private boolean hasFamilyQualifierPermission(User user,
646       Action perm,
647       RegionCoprocessorEnvironment env,
648       Map<byte[], ? extends Collection<byte[]>> familyMap)
649     throws IOException {
650     HRegionInfo hri = env.getRegion().getRegionInfo();
651     TableName tableName = hri.getTable();
652 
653     if (user == null) {
654       return false;
655     }
656 
657     if (familyMap != null && familyMap.size() > 0) {
658       // at least one family must be allowed
659       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
660           familyMap.entrySet()) {
661         if (family.getValue() != null && !family.getValue().isEmpty()) {
662           for (byte[] qualifier : family.getValue()) {
663             if (authManager.matchPermission(user, tableName,
664                 family.getKey(), qualifier, perm)) {
665               return true;
666             }
667           }
668         } else {
669           if (authManager.matchPermission(user, tableName, family.getKey(),
670               perm)) {
671             return true;
672           }
673         }
674       }
675     } else if (LOG.isDebugEnabled()) {
676       LOG.debug("Empty family map passed for permission check");
677     }
678 
679     return false;
680   }
681 
682   private enum OpType {
683     GET("get"),
684     EXISTS("exists"),
685     SCAN("scan"),
686     PUT("put"),
687     DELETE("delete"),
688     CHECK_AND_PUT("checkAndPut"),
689     CHECK_AND_DELETE("checkAndDelete"),
690     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
691     APPEND("append"),
692     INCREMENT("increment");
693 
694     private String type;
695 
696     private OpType(String type) {
697       this.type = type;
698     }
699 
700     @Override
701     public String toString() {
702       return type;
703     }
704   }
705 
706   /**
707    * Determine if cell ACLs covered by the operation grant access. This is expensive.
708    * @return false if cell ACLs failed to grant access, true otherwise
709    * @throws IOException
710    */
711   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
712       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
713       throws IOException {
714     if (!cellFeaturesEnabled) {
715       return false;
716     }
717     long cellGrants = 0;
718     User user = getActiveUser();
719     long latestCellTs = 0;
720     Get get = new Get(row);
721     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
722     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
723     // version. We have to get every cell version and check its TS against the TS asked for in
724     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
725     // consider only one such passing cell. In case of Delete we have to consider all the cell
726     // versions under this passing version. When Delete Mutation contains columns which are a
727     // version delete just consider only one version for those column cells.
728     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
729     if (considerCellTs) {
730       get.setMaxVersions();
731     } else {
732       get.setMaxVersions(1);
733     }
734     boolean diffCellTsFromOpTs = false;
735     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
736       byte[] col = entry.getKey();
737       // TODO: HBASE-7114 could possibly unify the collection type in family
738       // maps so we would not need to do this
739       if (entry.getValue() instanceof Set) {
740         Set<byte[]> set = (Set<byte[]>)entry.getValue();
741         if (set == null || set.isEmpty()) {
742           get.addFamily(col);
743         } else {
744           for (byte[] qual: set) {
745             get.addColumn(col, qual);
746           }
747         }
748       } else if (entry.getValue() instanceof List) {
749         List<Cell> list = (List<Cell>)entry.getValue();
750         if (list == null || list.isEmpty()) {
751           get.addFamily(col);
752         } else {
753           // In case of family delete, a Cell will be added into the list with Qualifier as null.
754           for (Cell cell : list) {
755             if (cell.getQualifierLength() == 0
756                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
757                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
758               get.addFamily(col);
759             } else {
760               get.addColumn(col, CellUtil.cloneQualifier(cell));
761             }
762             if (considerCellTs) {
763               long cellTs = cell.getTimestamp();
764               latestCellTs = Math.max(latestCellTs, cellTs);
765               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
766             }
767           }
768         }
769       } else if (entry.getValue() == null) {
770         get.addFamily(col);
771       } else {
772         throw new RuntimeException("Unhandled collection type " +
773           entry.getValue().getClass().getName());
774       }
775     }
776     // We want to avoid looking into the future. So, if the cells of the
777     // operation specify a timestamp, or the operation itself specifies a
778     // timestamp, then we use the maximum ts found. Otherwise, we bound
779     // the Get to the current server time. We add 1 to the timerange since
780     // the upper bound of a timerange is exclusive yet we need to examine
781     // any cells found there inclusively.
782     long latestTs = Math.max(opTs, latestCellTs);
783     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
784       latestTs = EnvironmentEdgeManager.currentTime();
785     }
786     get.setTimeRange(0, latestTs + 1);
787     // In case of Put operation we set to read all versions. This was done to consider the case
788     // where columns are added with TS other than the Mutation TS. But normally this wont be the
789     // case with Put. There no need to get all versions but get latest version only.
790     if (!diffCellTsFromOpTs && request == OpType.PUT) {
791       get.setMaxVersions(1);
792     }
793     if (LOG.isTraceEnabled()) {
794       LOG.trace("Scanning for cells with " + get);
795     }
796     // This Map is identical to familyMap. The key is a BR rather than byte[].
797     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
798     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
799     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
800     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
801       if (entry.getValue() instanceof List) {
802         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
803       }
804     }
805     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
806     List<Cell> cells = Lists.newArrayList();
807     Cell prevCell = null;
808     ByteRange curFam = new SimpleMutableByteRange();
809     boolean curColAllVersions = (request == OpType.DELETE);
810     long curColCheckTs = opTs;
811     boolean foundColumn = false;
812     try {
813       boolean more = false;
814       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
815 
816       do {
817         cells.clear();
818         // scan with limit as 1 to hold down memory use on wide rows
819         more = scanner.next(cells, scannerContext);
820         for (Cell cell: cells) {
821           if (LOG.isTraceEnabled()) {
822             LOG.trace("Found cell " + cell);
823           }
824           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
825           if (colChange) foundColumn = false;
826           prevCell = cell;
827           if (!curColAllVersions && foundColumn) {
828             continue;
829           }
830           if (colChange && considerCellTs) {
831             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
832             List<Cell> cols = familyMap1.get(curFam);
833             for (Cell col : cols) {
834               // null/empty qualifier is used to denote a Family delete. The TS and delete type
835               // associated with this is applicable for all columns within the family. That is
836               // why the below (col.getQualifierLength() == 0) check.
837               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
838                   || CellUtil.matchingQualifier(cell, col)) {
839                 byte type = col.getTypeByte();
840                 if (considerCellTs) {
841                   curColCheckTs = col.getTimestamp();
842                 }
843                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
844                 // a version delete for a column no need to check all the covering cells within
845                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
846                 // One version delete types are Delete/DeleteFamilyVersion
847                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
848                     || (KeyValue.Type.DeleteFamily.getCode() == type);
849                 break;
850               }
851             }
852           }
853           if (cell.getTimestamp() > curColCheckTs) {
854             // Just ignore this cell. This is not a covering cell.
855             continue;
856           }
857           foundColumn = true;
858           for (Action action: actions) {
859             // Are there permissions for this user for the cell?
860             if (!authManager.authorize(user, getTableName(e), cell, action)) {
861               // We can stop if the cell ACL denies access
862               return false;
863             }
864           }
865           cellGrants++;
866         }
867       } while (more);
868     } catch (AccessDeniedException ex) {
869       throw ex;
870     } catch (IOException ex) {
871       LOG.error("Exception while getting cells to calculate covering permission", ex);
872     } finally {
873       scanner.close();
874     }
875     // We should not authorize unless we have found one or more cell ACLs that
876     // grant access. This code is used to check for additional permissions
877     // after no table or CF grants are found.
878     return cellGrants > 0;
879   }
880 
881   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
882     // Iterate over the entries in the familyMap, replacing the cells therein
883     // with new cells including the ACL data
884     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
885       List<Cell> newCells = Lists.newArrayList();
886       for (Cell cell: e.getValue()) {
887         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
888         List<Tag> tags = new ArrayList<Tag>();
889         tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
890         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
891         while (tagIterator.hasNext()) {
892           tags.add(tagIterator.next());
893         }
894         newCells.add(new TagRewriteCell(cell, TagUtil.fromList(tags)));
895       }
896       // This is supposed to be safe, won't CME
897       e.setValue(newCells);
898     }
899   }
900 
901   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
902   // type is reserved and should not be explicitly set by user.
903   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
904     // No need to check if we're not going to throw
905     if (!authorizationEnabled) {
906       m.setAttribute(TAG_CHECK_PASSED, TRUE);
907       return;
908     }
909     // Superusers are allowed to store cells unconditionally.
910     if (Superusers.isSuperUser(user)) {
911       m.setAttribute(TAG_CHECK_PASSED, TRUE);
912       return;
913     }
914     // We already checked (prePut vs preBatchMutation)
915     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
916       return;
917     }
918     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
919       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cellScanner.current());
920       while (tagsItr.hasNext()) {
921         if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
922           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
923         }
924       }
925     }
926     m.setAttribute(TAG_CHECK_PASSED, TRUE);
927   }
928 
929   /* ---- MasterObserver implementation ---- */
930   @Override
931   public void start(CoprocessorEnvironment env) throws IOException {
932     CompoundConfiguration conf = new CompoundConfiguration();
933     conf.add(env.getConfiguration());
934 
935     authorizationEnabled = isAuthorizationSupported(conf);
936     if (!authorizationEnabled) {
937       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
938     }
939 
940     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
941       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
942 
943     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
944     if (!cellFeaturesEnabled) {
945       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
946           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
947           + " accordingly.");
948     }
949 
950     ZooKeeperWatcher zk = null;
951     if (env instanceof MasterCoprocessorEnvironment) {
952       // if running on HMaster
953       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
954       zk = mEnv.getMasterServices().getZooKeeper();
955     } else if (env instanceof RegionServerCoprocessorEnvironment) {
956       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
957       zk = rsEnv.getRegionServerServices().getZooKeeper();
958     } else if (env instanceof RegionCoprocessorEnvironment) {
959       // if running at region
960       regionEnv = (RegionCoprocessorEnvironment) env;
961       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
962       zk = regionEnv.getRegionServerServices().getZooKeeper();
963       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
964         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
965     }
966 
967     // set the user-provider.
968     this.userProvider = UserProvider.instantiate(env.getConfiguration());
969 
970     // If zk is null or IOException while obtaining auth manager,
971     // throw RuntimeException so that the coprocessor is unloaded.
972     if (zk != null) {
973       try {
974         this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
975       } catch (IOException ioe) {
976         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
977       }
978     } else {
979       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
980     }
981 
982     tableAcls = new MapMaker().weakValues().makeMap();
983   }
984 
985   @Override
986   public void stop(CoprocessorEnvironment env) {
987     if (this.authManager != null) {
988       TableAuthManager.release(authManager);
989     }
990   }
991 
992   @Override
993   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
994       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
995     Set<byte[]> families = desc.getFamiliesKeys();
996     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
997     for (byte[] family: families) {
998       familyMap.put(family, null);
999     }
1000     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
1001         desc.getTableName(), familyMap, Action.CREATE);
1002   }
1003 
1004   @Override
1005   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
1006       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
1007     // When AC is used, it should be configured as the 1st CP.
1008     // In Master, the table operations like create, are handled by a Thread pool but the max size
1009     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1010     // sequentially only.
1011     // Related code in HMaster#startServiceThreads
1012     // {code}
1013     //   // We depend on there being only one instance of this executor running
1014     //   // at a time. To do concurrency, would need fencing of enable/disable of
1015     //   // tables.
1016     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1017     // {code}
1018     // In future if we change this pool to have more threads, then there is a chance for thread,
1019     // creating acl table, getting delayed and by that time another table creation got over and
1020     // this hook is getting called. In such a case, we will need a wait logic here which will
1021     // wait till the acl table is created.
1022     if (AccessControlLists.isAclTable(desc)) {
1023       this.aclTabAvailable = true;
1024     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1025       if (!aclTabAvailable) {
1026         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1027             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1028             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1029       } else {
1030         String owner = desc.getOwnerString();
1031         // default the table owner to current user, if not specified.
1032         if (owner == null)
1033           owner = getActiveUser().getShortName();
1034         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1035             desc.getTableName(), null, Action.values());
1036         // switch to the real hbase master user for doing the RPC on the ACL table
1037         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1038           @Override
1039           public Void run() throws Exception {
1040             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1041                 userperm);
1042             return null;
1043           }
1044         });
1045       }
1046     }
1047   }
1048 
1049   @Override
1050   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1051       throws IOException {
1052     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1053   }
1054 
1055   @Override
1056   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1057       final TableName tableName) throws IOException {
1058     final Configuration conf = c.getEnvironment().getConfiguration();
1059     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1060       @Override
1061       public Void run() throws Exception {
1062         AccessControlLists.removeTablePermissions(conf, tableName);
1063         return null;
1064       }
1065     });
1066     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1067   }
1068 
1069   @Override
1070   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1071       final TableName tableName) throws IOException {
1072     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1073 
1074     final Configuration conf = c.getEnvironment().getConfiguration();
1075     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1076       @Override
1077       public Void run() throws Exception {
1078         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1079         if (acls != null) {
1080           tableAcls.put(tableName, acls);
1081         }
1082         return null;
1083       }
1084     });
1085   }
1086 
1087   @Override
1088   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1089       final TableName tableName) throws IOException {
1090     final Configuration conf = ctx.getEnvironment().getConfiguration();
1091     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1092       @Override
1093       public Void run() throws Exception {
1094         List<UserPermission> perms = tableAcls.get(tableName);
1095         if (perms != null) {
1096           for (UserPermission perm : perms) {
1097             AccessControlLists.addUserPermission(conf, perm);
1098           }
1099         }
1100         tableAcls.remove(tableName);
1101         return null;
1102       }
1103     });
1104   }
1105 
1106   @Override
1107   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1108       HTableDescriptor htd) throws IOException {
1109     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1110   }
1111 
1112   @Override
1113   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1114       TableName tableName, final HTableDescriptor htd) throws IOException {
1115     final Configuration conf = c.getEnvironment().getConfiguration();
1116     // default the table owner to current user, if not specified.
1117     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1118       getActiveUser().getShortName();
1119     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1120       @Override
1121       public Void run() throws Exception {
1122         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1123             htd.getTableName(), null, Action.values());
1124         AccessControlLists.addUserPermission(conf, userperm);
1125         return null;
1126       }
1127     });
1128   }
1129 
1130   @Override
1131   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1132                                  TableName tableName, HColumnDescriptor columnFamily)
1133       throws IOException {
1134     requireTablePermission("addColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1135                            Action.CREATE);
1136   }
1137 
1138   @Override
1139   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1140                                     TableName tableName, HColumnDescriptor columnFamily)
1141       throws IOException {
1142     requirePermission("modifyColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1143       Action.CREATE);
1144   }
1145 
1146   @Override
1147   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1148                                     TableName tableName, byte[] columnFamily) throws IOException {
1149     requirePermission("deleteColumn", tableName, columnFamily, null, Action.ADMIN, Action.CREATE);
1150   }
1151 
1152   @Override
1153   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1154       final TableName tableName, final byte[] columnFamily) throws IOException {
1155     final Configuration conf = ctx.getEnvironment().getConfiguration();
1156     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1157       @Override
1158       public Void run() throws Exception {
1159         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1160         return null;
1161       }
1162     });
1163   }
1164 
1165   @Override
1166   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1167       throws IOException {
1168     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1169   }
1170 
1171   @Override
1172   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1173       throws IOException {
1174     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1175       // We have to unconditionally disallow disable of the ACL table when we are installed,
1176       // even if not enforcing authorizations. We are still allowing grants and revocations,
1177       // checking permissions and logging audit messages, etc. If the ACL table is not
1178       // available we will fail random actions all over the place.
1179       throw new AccessDeniedException("Not allowed to disable "
1180           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1181     }
1182     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1183   }
1184 
1185   @Override
1186   public void preAbortProcedure(
1187       ObserverContext<MasterCoprocessorEnvironment> ctx,
1188       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1189       final long procId) throws IOException {
1190     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
1191       // If the user is not the procedure owner, then we should further probe whether
1192       // he can abort the procedure.
1193       requirePermission("abortProcedure", Action.ADMIN);
1194     }
1195   }
1196 
1197   @Override
1198   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1199       throws IOException {
1200     // There is nothing to do at this time after the procedure abort request was sent.
1201   }
1202 
1203   @Override
1204   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1205       throws IOException {
1206     // We are delegating the authorization check to postListProcedures as we don't have
1207     // any concrete set of procedures to work with
1208   }
1209 
1210   @Override
1211   public void postListProcedures(
1212       ObserverContext<MasterCoprocessorEnvironment> ctx,
1213       List<ProcedureInfo> procInfoList) throws IOException {
1214     if (procInfoList.isEmpty()) {
1215       return;
1216     }
1217 
1218     // Retains only those which passes authorization checks, as the checks weren't done as part
1219     // of preListProcedures.
1220     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1221     User user = getActiveUser();
1222     while (itr.hasNext()) {
1223       ProcedureInfo procInfo = itr.next();
1224       try {
1225         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1226           // If the user is not the procedure owner, then we should further probe whether
1227           // he can see the procedure.
1228           requirePermission("listProcedures", Action.ADMIN);
1229         }
1230       } catch (AccessDeniedException e) {
1231         itr.remove();
1232       }
1233     }
1234   }
1235 
1236   @Override
1237   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1238       ServerName srcServer, ServerName destServer) throws IOException {
1239     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1240   }
1241 
1242   @Override
1243   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1244       throws IOException {
1245     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1246   }
1247 
1248   @Override
1249   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1250       boolean force) throws IOException {
1251     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1252   }
1253 
1254   @Override
1255   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1256       HRegionInfo regionInfo) throws IOException {
1257     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1258   }
1259 
1260   @Override
1261   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1262       throws IOException {
1263     requirePermission("balance", Action.ADMIN);
1264   }
1265 
1266   @Override
1267   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1268       boolean newValue) throws IOException {
1269     requirePermission("balanceSwitch", Action.ADMIN);
1270     return newValue;
1271   }
1272 
1273   @Override
1274   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1275       throws IOException {
1276     requirePermission("shutdown", Action.ADMIN);
1277   }
1278 
1279   @Override
1280   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1281       throws IOException {
1282     requirePermission("stopMaster", Action.ADMIN);
1283   }
1284 
1285   @Override
1286   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1287       throws IOException {
1288     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1289       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1290       // initialize the ACL storage table
1291       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1292     } else {
1293       aclTabAvailable = true;
1294     }
1295   }
1296 
1297   @Override
1298   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1299       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1300       throws IOException {
1301     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1302       Permission.Action.ADMIN);
1303   }
1304 
1305   @Override
1306   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1307       final SnapshotDescription snapshot) throws IOException {
1308     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1309       // list it, if user is the owner of snapshot
1310       // TODO: We are not logging this for audit
1311     } else {
1312       requirePermission("listSnapshot", Action.ADMIN);
1313     }
1314   }
1315 
1316   @Override
1317   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1318       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1319       throws IOException {
1320     requirePermission("clone", Action.ADMIN);
1321   }
1322 
1323   @Override
1324   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1325       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1326       throws IOException {
1327     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1328       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1329         Permission.Action.ADMIN);
1330     } else {
1331       requirePermission("restoreSnapshot", Action.ADMIN);
1332     }
1333   }
1334 
1335   @Override
1336   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1337       final SnapshotDescription snapshot) throws IOException {
1338     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1339       // Snapshot owner is allowed to delete the snapshot
1340       // TODO: We are not logging this for audit
1341     } else {
1342       requirePermission("deleteSnapshot", Action.ADMIN);
1343     }
1344   }
1345 
1346   @Override
1347   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1348       NamespaceDescriptor ns) throws IOException {
1349     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1350   }
1351 
1352   @Override
1353   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1354       throws IOException {
1355     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1356   }
1357 
1358   @Override
1359   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1360       final String namespace) throws IOException {
1361     final Configuration conf = ctx.getEnvironment().getConfiguration();
1362     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1363       @Override
1364       public Void run() throws Exception {
1365         AccessControlLists.removeNamespacePermissions(conf, namespace);
1366         return null;
1367       }
1368     });
1369     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1370     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1371   }
1372 
1373   @Override
1374   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1375       NamespaceDescriptor ns) throws IOException {
1376     // We require only global permission so that
1377     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1378     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1379   }
1380 
1381   @Override
1382   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1383       throws IOException {
1384     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1385   }
1386 
1387   @Override
1388   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1389       List<NamespaceDescriptor> descriptors) throws IOException {
1390     // Retains only those which passes authorization checks, as the checks weren't done as part
1391     // of preGetTableDescriptors.
1392     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1393     while (itr.hasNext()) {
1394       NamespaceDescriptor desc = itr.next();
1395       try {
1396         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1397       } catch (AccessDeniedException e) {
1398         itr.remove();
1399       }
1400     }
1401   }
1402 
1403   @Override
1404   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1405       final TableName tableName) throws IOException {
1406     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1407   }
1408 
1409   /* ---- RegionObserver implementation ---- */
1410 
1411   @Override
1412   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1413       throws IOException {
1414     RegionCoprocessorEnvironment env = e.getEnvironment();
1415     final Region region = env.getRegion();
1416     if (region == null) {
1417       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1418     } else {
1419       HRegionInfo regionInfo = region.getRegionInfo();
1420       if (regionInfo.getTable().isSystemTable()) {
1421         checkSystemOrSuperUser();
1422       } else {
1423         requirePermission("preOpen", Action.ADMIN);
1424       }
1425     }
1426   }
1427 
1428   @Override
1429   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1430     RegionCoprocessorEnvironment env = c.getEnvironment();
1431     final Region region = env.getRegion();
1432     if (region == null) {
1433       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1434       return;
1435     }
1436     if (AccessControlLists.isAclRegion(region)) {
1437       aclRegion = true;
1438       // When this region is under recovering state, initialize will be handled by postLogReplay
1439       if (!region.isRecovering()) {
1440         try {
1441           initialize(env);
1442         } catch (IOException ex) {
1443           // if we can't obtain permissions, it's better to fail
1444           // than perform checks incorrectly
1445           throw new RuntimeException("Failed to initialize permissions cache", ex);
1446         }
1447       }
1448     } else {
1449       initialized = true;
1450     }
1451   }
1452 
1453   @Override
1454   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1455     if (aclRegion) {
1456       try {
1457         initialize(c.getEnvironment());
1458       } catch (IOException ex) {
1459         // if we can't obtain permissions, it's better to fail
1460         // than perform checks incorrectly
1461         throw new RuntimeException("Failed to initialize permissions cache", ex);
1462       }
1463     }
1464   }
1465 
1466   @Override
1467   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1468     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1469         Action.CREATE);
1470   }
1471 
1472   @Override
1473   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1474     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1475   }
1476 
1477   @Override
1478   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1479       byte[] splitRow) throws IOException {
1480     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1481   }
1482 
1483   @Override
1484   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1485       final Store store, final InternalScanner scanner, final ScanType scanType)
1486           throws IOException {
1487     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1488         Action.CREATE);
1489     return scanner;
1490   }
1491 
1492   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1493       final Query query, OpType opType) throws IOException {
1494     Filter filter = query.getFilter();
1495     // Don't wrap an AccessControlFilter
1496     if (filter != null && filter instanceof AccessControlFilter) {
1497       return;
1498     }
1499     User user = getActiveUser();
1500     RegionCoprocessorEnvironment env = c.getEnvironment();
1501     Map<byte[],? extends Collection<byte[]>> families = null;
1502     switch (opType) {
1503     case GET:
1504     case EXISTS:
1505       families = ((Get)query).getFamilyMap();
1506       break;
1507     case SCAN:
1508       families = ((Scan)query).getFamilyMap();
1509       break;
1510     default:
1511       throw new RuntimeException("Unhandled operation " + opType);
1512     }
1513     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1514     Region region = getRegion(env);
1515     TableName table = getTableName(region);
1516     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1517     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1518       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1519     }
1520     if (!authResult.isAllowed()) {
1521       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1522         // Old behavior: Scan with only qualifier checks if we have partial
1523         // permission. Backwards compatible behavior is to throw an
1524         // AccessDeniedException immediately if there are no grants for table
1525         // or CF or CF+qual. Only proceed with an injected filter if there are
1526         // grants for qualifiers. Otherwise we will fall through below and log
1527         // the result and throw an ADE. We may end up checking qualifier
1528         // grants three times (permissionGranted above, here, and in the
1529         // filter) but that's the price of backwards compatibility.
1530         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1531           authResult.setAllowed(true);
1532           authResult.setReason("Access allowed with filter");
1533           // Only wrap the filter if we are enforcing authorizations
1534           if (authorizationEnabled) {
1535             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1536               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1537               cfVsMaxVersions);
1538             // wrap any existing filter
1539             if (filter != null) {
1540               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1541                 Lists.newArrayList(ourFilter, filter));
1542             }
1543             switch (opType) {
1544               case GET:
1545               case EXISTS:
1546                 ((Get)query).setFilter(ourFilter);
1547                 break;
1548               case SCAN:
1549                 ((Scan)query).setFilter(ourFilter);
1550                 break;
1551               default:
1552                 throw new RuntimeException("Unhandled operation " + opType);
1553             }
1554           }
1555         }
1556       } else {
1557         // New behavior: Any access we might be granted is more fine-grained
1558         // than whole table or CF. Simply inject a filter and return what is
1559         // allowed. We will not throw an AccessDeniedException. This is a
1560         // behavioral change since 0.96.
1561         authResult.setAllowed(true);
1562         authResult.setReason("Access allowed with filter");
1563         // Only wrap the filter if we are enforcing authorizations
1564         if (authorizationEnabled) {
1565           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1566             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1567           // wrap any existing filter
1568           if (filter != null) {
1569             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1570               Lists.newArrayList(ourFilter, filter));
1571           }
1572           switch (opType) {
1573             case GET:
1574             case EXISTS:
1575               ((Get)query).setFilter(ourFilter);
1576               break;
1577             case SCAN:
1578               ((Scan)query).setFilter(ourFilter);
1579               break;
1580             default:
1581               throw new RuntimeException("Unhandled operation " + opType);
1582           }
1583         }
1584       }
1585     }
1586 
1587     logResult(authResult);
1588     if (authorizationEnabled && !authResult.isAllowed()) {
1589       throw new AccessDeniedException("Insufficient permissions for user '"
1590           + (user != null ? user.getShortName() : "null")
1591           + "' (table=" + table + ", action=READ)");
1592     }
1593   }
1594 
1595   @Override
1596   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1597       final Get get, final List<Cell> result) throws IOException {
1598     internalPreRead(c, get, OpType.GET);
1599   }
1600 
1601   @Override
1602   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1603       final Get get, final boolean exists) throws IOException {
1604     internalPreRead(c, get, OpType.EXISTS);
1605     return exists;
1606   }
1607 
1608   @Override
1609   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1610       final Put put, final WALEdit edit, final Durability durability)
1611       throws IOException {
1612     User user = getActiveUser();
1613     checkForReservedTagPresence(user, put);
1614 
1615     // Require WRITE permission to the table, CF, or top visible value, if any.
1616     // NOTE: We don't need to check the permissions for any earlier Puts
1617     // because we treat the ACLs in each Put as timestamped like any other
1618     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1619     // change the ACL of any previous Put. This allows simple evolution of
1620     // security policy over time without requiring expensive updates.
1621     RegionCoprocessorEnvironment env = c.getEnvironment();
1622     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1623     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1624     logResult(authResult);
1625     if (!authResult.isAllowed()) {
1626       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1627         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1628       } else if (authorizationEnabled) {
1629         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1630       }
1631     }
1632 
1633     // Add cell ACLs from the operation to the cells themselves
1634     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1635     if (bytes != null) {
1636       if (cellFeaturesEnabled) {
1637         addCellPermissions(bytes, put.getFamilyCellMap());
1638       } else {
1639         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1640       }
1641     }
1642   }
1643 
1644   @Override
1645   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1646       final Put put, final WALEdit edit, final Durability durability) {
1647     if (aclRegion) {
1648       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1649     }
1650   }
1651 
1652   @Override
1653   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1654       final Delete delete, final WALEdit edit, final Durability durability)
1655       throws IOException {
1656     // An ACL on a delete is useless, we shouldn't allow it
1657     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1658       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1659     }
1660     // Require WRITE permissions on all cells covered by the delete. Unlike
1661     // for Puts we need to check all visible prior versions, because a major
1662     // compaction could remove them. If the user doesn't have permission to
1663     // overwrite any of the visible versions ('visible' defined as not covered
1664     // by a tombstone already) then we have to disallow this operation.
1665     RegionCoprocessorEnvironment env = c.getEnvironment();
1666     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1667     User user = getActiveUser();
1668     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1669     logResult(authResult);
1670     if (!authResult.isAllowed()) {
1671       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1672         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1673       } else if (authorizationEnabled) {
1674         throw new AccessDeniedException("Insufficient permissions " +
1675           authResult.toContextString());
1676       }
1677     }
1678   }
1679 
1680   @Override
1681   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1682       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1683     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1684       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1685       for (int i = 0; i < miniBatchOp.size(); i++) {
1686         Mutation m = miniBatchOp.getOperation(i);
1687         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1688           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1689           // perm check
1690           OpType opType;
1691           if (m instanceof Put) {
1692             checkForReservedTagPresence(getActiveUser(), m);
1693             opType = OpType.PUT;
1694           } else {
1695             opType = OpType.DELETE;
1696           }
1697           AuthResult authResult = null;
1698           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1699             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1700             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1701               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1702           } else {
1703             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1704               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1705           }
1706           logResult(authResult);
1707           if (authorizationEnabled && !authResult.isAllowed()) {
1708             throw new AccessDeniedException("Insufficient permissions "
1709               + authResult.toContextString());
1710           }
1711         }
1712       }
1713     }
1714   }
1715 
1716   @Override
1717   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1718       final Delete delete, final WALEdit edit, final Durability durability)
1719       throws IOException {
1720     if (aclRegion) {
1721       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1722     }
1723   }
1724 
1725   @Override
1726   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1727       final byte [] row, final byte [] family, final byte [] qualifier,
1728       final CompareFilter.CompareOp compareOp,
1729       final ByteArrayComparable comparator, final Put put,
1730       final boolean result) throws IOException {
1731     User user = getActiveUser();
1732     checkForReservedTagPresence(user, put);
1733 
1734     // Require READ and WRITE permissions on the table, CF, and KV to update
1735     RegionCoprocessorEnvironment env = c.getEnvironment();
1736     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1737     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1738       Action.READ, Action.WRITE);
1739     logResult(authResult);
1740     if (!authResult.isAllowed()) {
1741       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1742         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1743       } else if (authorizationEnabled) {
1744         throw new AccessDeniedException("Insufficient permissions " +
1745           authResult.toContextString());
1746       }
1747     }
1748 
1749     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1750     if (bytes != null) {
1751       if (cellFeaturesEnabled) {
1752         addCellPermissions(bytes, put.getFamilyCellMap());
1753       } else {
1754         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1755       }
1756     }
1757     return result;
1758   }
1759 
1760   @Override
1761   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1762       final byte[] row, final byte[] family, final byte[] qualifier,
1763       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1764       final boolean result) throws IOException {
1765     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1766       // We had failure with table, cf and q perm checks and now giving a chance for cell
1767       // perm check
1768       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1769       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1770       AuthResult authResult = null;
1771       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1772           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1773         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1774             getActiveUser(), Action.READ, table, families);
1775       } else {
1776         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1777             getActiveUser(), Action.READ, table, families);
1778       }
1779       logResult(authResult);
1780       if (authorizationEnabled && !authResult.isAllowed()) {
1781         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1782       }
1783     }
1784     return result;
1785   }
1786 
1787   @Override
1788   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1789       final byte [] row, final byte [] family, final byte [] qualifier,
1790       final CompareFilter.CompareOp compareOp,
1791       final ByteArrayComparable comparator, final Delete delete,
1792       final boolean result) throws IOException {
1793     // An ACL on a delete is useless, we shouldn't allow it
1794     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1795       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1796           delete.toString());
1797     }
1798     // Require READ and WRITE permissions on the table, CF, and the KV covered
1799     // by the delete
1800     RegionCoprocessorEnvironment env = c.getEnvironment();
1801     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1802     User user = getActiveUser();
1803     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1804         Action.READ, Action.WRITE);
1805     logResult(authResult);
1806     if (!authResult.isAllowed()) {
1807       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1808         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1809       } else if (authorizationEnabled) {
1810         throw new AccessDeniedException("Insufficient permissions " +
1811           authResult.toContextString());
1812       }
1813     }
1814     return result;
1815   }
1816 
1817   @Override
1818   public boolean preCheckAndDeleteAfterRowLock(
1819       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1820       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1821       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1822       throws IOException {
1823     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1824       // We had failure with table, cf and q perm checks and now giving a chance for cell
1825       // perm check
1826       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1827       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1828       AuthResult authResult = null;
1829       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1830           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1831         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1832             getActiveUser(), Action.READ, table, families);
1833       } else {
1834         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1835             getActiveUser(), Action.READ, table, families);
1836       }
1837       logResult(authResult);
1838       if (authorizationEnabled && !authResult.isAllowed()) {
1839         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1840       }
1841     }
1842     return result;
1843   }
1844 
1845   @Override
1846   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1847       final byte [] row, final byte [] family, final byte [] qualifier,
1848       final long amount, final boolean writeToWAL)
1849       throws IOException {
1850     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1851     // incremented value
1852     RegionCoprocessorEnvironment env = c.getEnvironment();
1853     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1854     User user = getActiveUser();
1855     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1856         Action.WRITE);
1857     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1858       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1859         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1860       authResult.setReason("Covering cell set");
1861     }
1862     logResult(authResult);
1863     if (authorizationEnabled && !authResult.isAllowed()) {
1864       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1865     }
1866     return -1;
1867   }
1868 
1869   @Override
1870   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1871       throws IOException {
1872     User user = getActiveUser();
1873     checkForReservedTagPresence(user, append);
1874 
1875     // Require WRITE permission to the table, CF, and the KV to be appended
1876     RegionCoprocessorEnvironment env = c.getEnvironment();
1877     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1878     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1879     logResult(authResult);
1880     if (!authResult.isAllowed()) {
1881       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1882         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1883       } else if (authorizationEnabled)  {
1884         throw new AccessDeniedException("Insufficient permissions " +
1885           authResult.toContextString());
1886       }
1887     }
1888 
1889     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1890     if (bytes != null) {
1891       if (cellFeaturesEnabled) {
1892         addCellPermissions(bytes, append.getFamilyCellMap());
1893       } else {
1894         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1895       }
1896     }
1897 
1898     return null;
1899   }
1900 
1901   @Override
1902   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1903       final Append append) throws IOException {
1904     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1905       // We had failure with table, cf and q perm checks and now giving a chance for cell
1906       // perm check
1907       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1908       AuthResult authResult = null;
1909       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1910           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1911         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1912             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1913       } else {
1914         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1915             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1916       }
1917       logResult(authResult);
1918       if (authorizationEnabled && !authResult.isAllowed()) {
1919         throw new AccessDeniedException("Insufficient permissions " +
1920           authResult.toContextString());
1921       }
1922     }
1923     return null;
1924   }
1925 
1926   @Override
1927   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1928       final Increment increment)
1929       throws IOException {
1930     User user = getActiveUser();
1931     checkForReservedTagPresence(user, increment);
1932 
1933     // Require WRITE permission to the table, CF, and the KV to be replaced by
1934     // the incremented value
1935     RegionCoprocessorEnvironment env = c.getEnvironment();
1936     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1937     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1938       Action.WRITE);
1939     logResult(authResult);
1940     if (!authResult.isAllowed()) {
1941       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1942         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1943       } else if (authorizationEnabled) {
1944         throw new AccessDeniedException("Insufficient permissions " +
1945           authResult.toContextString());
1946       }
1947     }
1948 
1949     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1950     if (bytes != null) {
1951       if (cellFeaturesEnabled) {
1952         addCellPermissions(bytes, increment.getFamilyCellMap());
1953       } else {
1954         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1955       }
1956     }
1957 
1958     return null;
1959   }
1960 
1961   @Override
1962   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1963       final Increment increment) throws IOException {
1964     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1965       // We had failure with table, cf and q perm checks and now giving a chance for cell
1966       // perm check
1967       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1968       AuthResult authResult = null;
1969       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1970           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1971         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1972             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1973       } else {
1974         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1975             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1976       }
1977       logResult(authResult);
1978       if (authorizationEnabled && !authResult.isAllowed()) {
1979         throw new AccessDeniedException("Insufficient permissions " +
1980           authResult.toContextString());
1981       }
1982     }
1983     return null;
1984   }
1985 
1986   @Override
1987   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1988       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1989     // If the HFile version is insufficient to persist tags, we won't have any
1990     // work to do here
1991     if (!cellFeaturesEnabled) {
1992       return newCell;
1993     }
1994 
1995     // Collect any ACLs from the old cell
1996     List<Tag> tags = Lists.newArrayList();
1997     List<Tag> aclTags = Lists.newArrayList();
1998     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1999     if (oldCell != null) {
2000       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell);
2001       while (tagIterator.hasNext()) {
2002         Tag tag = tagIterator.next();
2003         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2004           // Not an ACL tag, just carry it through
2005           if (LOG.isTraceEnabled()) {
2006             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
2007                 + " length " + tag.getValueLength());
2008           }
2009           tags.add(tag);
2010         } else {
2011           aclTags.add(tag);
2012         }
2013       }
2014     }
2015 
2016     // Do we have an ACL on the operation?
2017     byte[] aclBytes = mutation.getACL();
2018     if (aclBytes != null) {
2019       // Yes, use it
2020       tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2021     } else {
2022       // No, use what we carried forward
2023       if (perms != null) {
2024         // TODO: If we collected ACLs from more than one tag we may have a
2025         // List<Permission> of size > 1, this can be collapsed into a single
2026         // Permission
2027         if (LOG.isTraceEnabled()) {
2028           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2029         }
2030         tags.addAll(aclTags);
2031       }
2032     }
2033 
2034     // If we have no tags to add, just return
2035     if (tags.isEmpty()) {
2036       return newCell;
2037     }
2038 
2039     Cell rewriteCell = new TagRewriteCell(newCell, TagUtil.fromList(tags));
2040     return rewriteCell;
2041   }
2042 
2043   @Override
2044   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2045       final Scan scan, final RegionScanner s) throws IOException {
2046     internalPreRead(c, scan, OpType.SCAN);
2047     return s;
2048   }
2049 
2050   @Override
2051   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2052       final Scan scan, final RegionScanner s) throws IOException {
2053     User user = getActiveUser();
2054     if (user != null && user.getShortName() != null) {
2055       // store reference to scanner owner for later checks
2056       scannerOwners.put(s, user.getShortName());
2057     }
2058     return s;
2059   }
2060 
2061   @Override
2062   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2063       final InternalScanner s, final List<Result> result,
2064       final int limit, final boolean hasNext) throws IOException {
2065     requireScannerOwner(s);
2066     return hasNext;
2067   }
2068 
2069   @Override
2070   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2071       final InternalScanner s) throws IOException {
2072     requireScannerOwner(s);
2073   }
2074 
2075   @Override
2076   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2077       final InternalScanner s) throws IOException {
2078     // clean up any associated owner mapping
2079     scannerOwners.remove(s);
2080   }
2081 
2082   @Override
2083   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2084       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2085     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2086     return hasMore;
2087   }
2088 
2089   /**
2090    * Verify, when servicing an RPC, that the caller is the scanner owner.
2091    * If so, we assume that access control is correctly enforced based on
2092    * the checks performed in preScannerOpen()
2093    */
2094   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2095     if (!RpcServer.isInRpcCallContext())
2096       return;
2097     String requestUserName = RpcServer.getRequestUserName();
2098     String owner = scannerOwners.get(s);
2099     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2100       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2101     }
2102   }
2103 
2104   /**
2105    * Verifies user has CREATE privileges on
2106    * the Column Families involved in the bulkLoadHFile
2107    * request. Specific Column Write privileges are presently
2108    * ignored.
2109    */
2110   @Override
2111   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2112       List<Pair<byte[], String>> familyPaths) throws IOException {
2113     for(Pair<byte[],String> el : familyPaths) {
2114       requirePermission("preBulkLoadHFile",
2115           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2116           el.getFirst(),
2117           null,
2118           Action.CREATE);
2119     }
2120   }
2121 
2122   /**
2123    * Authorization check for
2124    * SecureBulkLoadProtocol.prepareBulkLoad()
2125    * @param ctx the context
2126    * @param request the request
2127    * @throws IOException
2128    */
2129   @Override
2130   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2131                                  PrepareBulkLoadRequest request) throws IOException {
2132     requireAccess("prePareBulkLoad",
2133         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2134   }
2135 
2136   /**
2137    * Authorization security check for
2138    * SecureBulkLoadProtocol.cleanupBulkLoad()
2139    * @param ctx the context
2140    * @param request the request
2141    * @throws IOException
2142    */
2143   @Override
2144   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2145                                  CleanupBulkLoadRequest request) throws IOException {
2146     requireAccess("preCleanupBulkLoad",
2147         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2148   }
2149 
2150   /* ---- EndpointObserver implementation ---- */
2151 
2152   @Override
2153   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2154       Service service, String methodName, Message request) throws IOException {
2155     // Don't intercept calls to our own AccessControlService, we check for
2156     // appropriate permissions in the service handlers
2157     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2158       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2159         methodName + ")",
2160         getTableName(ctx.getEnvironment()), null, null,
2161         Action.EXEC);
2162     }
2163     return request;
2164   }
2165 
2166   @Override
2167   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2168       Service service, String methodName, Message request, Message.Builder responseBuilder)
2169       throws IOException { }
2170 
2171   /* ---- Protobuf AccessControlService implementation ---- */
2172 
2173   @Override
2174   public void grant(RpcController controller,
2175                     AccessControlProtos.GrantRequest request,
2176                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2177     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2178     AccessControlProtos.GrantResponse response = null;
2179     try {
2180       // verify it's only running at .acl.
2181       if (aclRegion) {
2182         if (!initialized) {
2183           throw new CoprocessorException("AccessController not yet initialized");
2184         }
2185         if (LOG.isDebugEnabled()) {
2186           LOG.debug("Received request to grant access permission " + perm.toString());
2187         }
2188 
2189         switch(request.getUserPermission().getPermission().getType()) {
2190           case Global :
2191           case Table :
2192             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2193               perm.getQualifier(), Action.ADMIN);
2194             break;
2195           case Namespace :
2196             requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
2197            break;
2198         }
2199 
2200         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2201           @Override
2202           public Void run() throws Exception {
2203             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2204             return null;
2205           }
2206         });
2207 
2208         if (AUDITLOG.isTraceEnabled()) {
2209           // audit log should store permission changes in addition to auth results
2210           AUDITLOG.trace("Granted permission " + perm.toString());
2211         }
2212       } else {
2213         throw new CoprocessorException(AccessController.class, "This method "
2214             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2215       }
2216       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2217     } catch (IOException ioe) {
2218       // pass exception back up
2219       ResponseConverter.setControllerException(controller, ioe);
2220     }
2221     done.run(response);
2222   }
2223 
2224   @Override
2225   public void revoke(RpcController controller,
2226                      AccessControlProtos.RevokeRequest request,
2227                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2228     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2229     AccessControlProtos.RevokeResponse response = null;
2230     try {
2231       // only allowed to be called on _acl_ region
2232       if (aclRegion) {
2233         if (!initialized) {
2234           throw new CoprocessorException("AccessController not yet initialized");
2235         }
2236         if (LOG.isDebugEnabled()) {
2237           LOG.debug("Received request to revoke access permission " + perm.toString());
2238         }
2239 
2240         switch(request.getUserPermission().getPermission().getType()) {
2241           case Global :
2242           case Table :
2243             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2244               perm.getQualifier(), Action.ADMIN);
2245             break;
2246           case Namespace :
2247             requireNamespacePermission("revoke", perm.getNamespace(), Action.ADMIN);
2248             break;
2249         }
2250 
2251         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2252           @Override
2253           public Void run() throws Exception {
2254             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2255             return null;
2256           }
2257         });
2258 
2259         if (AUDITLOG.isTraceEnabled()) {
2260           // audit log should record all permission changes
2261           AUDITLOG.trace("Revoked permission " + perm.toString());
2262         }
2263       } else {
2264         throw new CoprocessorException(AccessController.class, "This method "
2265             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2266       }
2267       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2268     } catch (IOException ioe) {
2269       // pass exception back up
2270       ResponseConverter.setControllerException(controller, ioe);
2271     }
2272     done.run(response);
2273   }
2274 
2275   @Override
2276   public void getUserPermissions(RpcController controller,
2277                                  AccessControlProtos.GetUserPermissionsRequest request,
2278                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2279     AccessControlProtos.GetUserPermissionsResponse response = null;
2280     try {
2281       // only allowed to be called on _acl_ region
2282       if (aclRegion) {
2283         if (!initialized) {
2284           throw new CoprocessorException("AccessController not yet initialized");
2285         }
2286         List<UserPermission> perms = null;
2287         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2288           final TableName table = request.hasTableName() ?
2289             ProtobufUtil.toTableName(request.getTableName()) : null;
2290           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2291           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2292             @Override
2293             public List<UserPermission> run() throws Exception {
2294               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2295             }
2296           });
2297         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2298           final String namespace = request.getNamespaceName().toStringUtf8();
2299           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2300           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2301             @Override
2302             public List<UserPermission> run() throws Exception {
2303               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2304                 namespace);
2305             }
2306           });
2307         } else {
2308           requirePermission("userPermissions", Action.ADMIN);
2309           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2310             @Override
2311             public List<UserPermission> run() throws Exception {
2312               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2313             }
2314           });
2315           // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2316           // Also using acl as table name to be inline  with the results of global admin and will
2317           // help in avoiding any leakage of information about being superusers.
2318           for (String user: Superusers.getSuperUsers()) {
2319             perms.add(new UserPermission(user.getBytes(), AccessControlLists.ACL_TABLE_NAME, null,
2320                 Action.values()));
2321           }
2322         }
2323         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2324       } else {
2325         throw new CoprocessorException(AccessController.class, "This method "
2326             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2327       }
2328     } catch (IOException ioe) {
2329       // pass exception back up
2330       ResponseConverter.setControllerException(controller, ioe);
2331     }
2332     done.run(response);
2333   }
2334 
2335   @Override
2336   public void checkPermissions(RpcController controller,
2337                                AccessControlProtos.CheckPermissionsRequest request,
2338                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2339     Permission[] permissions = new Permission[request.getPermissionCount()];
2340     for (int i=0; i < request.getPermissionCount(); i++) {
2341       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2342     }
2343     AccessControlProtos.CheckPermissionsResponse response = null;
2344     try {
2345       User user = getActiveUser();
2346       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2347       for (Permission permission : permissions) {
2348         if (permission instanceof TablePermission) {
2349           // Check table permissions
2350 
2351           TablePermission tperm = (TablePermission) permission;
2352           for (Action action : permission.getActions()) {
2353             if (!tperm.getTableName().equals(tableName)) {
2354               throw new CoprocessorException(AccessController.class, String.format("This method "
2355                   + "can only execute at the table specified in TablePermission. " +
2356                   "Table of the region:%s , requested table:%s", tableName,
2357                   tperm.getTableName()));
2358             }
2359 
2360             Map<byte[], Set<byte[]>> familyMap =
2361                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2362             if (tperm.getFamily() != null) {
2363               if (tperm.getQualifier() != null) {
2364                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2365                 qualifiers.add(tperm.getQualifier());
2366                 familyMap.put(tperm.getFamily(), qualifiers);
2367               } else {
2368                 familyMap.put(tperm.getFamily(), null);
2369               }
2370             }
2371 
2372             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2373               familyMap);
2374             logResult(result);
2375             if (!result.isAllowed()) {
2376               // Even if passive we need to throw an exception here, we support checking
2377               // effective permissions, so throw unconditionally
2378               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2379                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2380                 ", action=" + action.toString() + ")");
2381             }
2382           }
2383 
2384         } else {
2385           // Check global permissions
2386 
2387           for (Action action : permission.getActions()) {
2388             AuthResult result;
2389             if (authManager.authorize(user, action)) {
2390               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2391                 action, null, null);
2392             } else {
2393               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2394                 null, null);
2395             }
2396             logResult(result);
2397             if (!result.isAllowed()) {
2398               // Even if passive we need to throw an exception here, we support checking
2399               // effective permissions, so throw unconditionally
2400               throw new AccessDeniedException("Insufficient permissions (action=" +
2401                 action.toString() + ")");
2402             }
2403           }
2404         }
2405       }
2406       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2407     } catch (IOException ioe) {
2408       ResponseConverter.setControllerException(controller, ioe);
2409     }
2410     done.run(response);
2411   }
2412 
2413   @Override
2414   public Service getService() {
2415     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2416   }
2417 
2418   private Region getRegion(RegionCoprocessorEnvironment e) {
2419     return e.getRegion();
2420   }
2421 
2422   private TableName getTableName(RegionCoprocessorEnvironment e) {
2423     Region region = e.getRegion();
2424     if (region != null) {
2425       return getTableName(region);
2426     }
2427     return null;
2428   }
2429 
2430   private TableName getTableName(Region region) {
2431     HRegionInfo regionInfo = region.getRegionInfo();
2432     if (regionInfo != null) {
2433       return regionInfo.getTable();
2434     }
2435     return null;
2436   }
2437 
2438   @Override
2439   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2440       throws IOException {
2441     requirePermission("preClose", Action.ADMIN);
2442   }
2443 
2444   private void checkSystemOrSuperUser() throws IOException {
2445     // No need to check if we're not going to throw
2446     if (!authorizationEnabled) {
2447       return;
2448     }
2449     User activeUser = getActiveUser();
2450     if (!Superusers.isSuperUser(activeUser)) {
2451       throw new AccessDeniedException("User '" + (activeUser != null ?
2452         activeUser.getShortName() : "null") + "is not system or super user.");
2453     }
2454   }
2455 
2456   @Override
2457   public void preStopRegionServer(
2458       ObserverContext<RegionServerCoprocessorEnvironment> env)
2459       throws IOException {
2460     requirePermission("preStopRegionServer", Action.ADMIN);
2461   }
2462 
2463   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2464       byte[] qualifier) {
2465     if (family == null) {
2466       return null;
2467     }
2468 
2469     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2470     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2471     return familyMap;
2472   }
2473 
2474   @Override
2475   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2476        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2477        String regex) throws IOException {
2478     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2479     // any concrete set of table names when a regex is present or the full list is requested.
2480     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2481       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2482       // request can be granted.
2483       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2484       for (TableName tableName: tableNamesList) {
2485         // Skip checks for a table that does not exist
2486         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2487           continue;
2488         requirePermission("getTableDescriptors", tableName, null, null,
2489             Action.ADMIN, Action.CREATE);
2490       }
2491     }
2492   }
2493 
2494   @Override
2495   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2496       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2497       String regex) throws IOException {
2498     // Skipping as checks in this case are already done by preGetTableDescriptors.
2499     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2500       return;
2501     }
2502 
2503     // Retains only those which passes authorization checks, as the checks weren't done as part
2504     // of preGetTableDescriptors.
2505     Iterator<HTableDescriptor> itr = descriptors.iterator();
2506     while (itr.hasNext()) {
2507       HTableDescriptor htd = itr.next();
2508       try {
2509         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2510             Action.ADMIN, Action.CREATE);
2511       } catch (AccessDeniedException e) {
2512         itr.remove();
2513       }
2514     }
2515   }
2516 
2517   @Override
2518   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2519       List<HTableDescriptor> descriptors, String regex) throws IOException {
2520     // Retains only those which passes authorization checks.
2521     Iterator<HTableDescriptor> itr = descriptors.iterator();
2522     while (itr.hasNext()) {
2523       HTableDescriptor htd = itr.next();
2524       try {
2525         requireAccess("getTableNames", htd.getTableName(), Action.values());
2526       } catch (AccessDeniedException e) {
2527         itr.remove();
2528       }
2529     }
2530   }
2531 
2532   @Override
2533   public void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2534       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
2535     requirePermission("mergeRegions", regionA.getTable(), null, null,
2536       Action.ADMIN);
2537   }
2538 
2539   @Override
2540   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2541       Region regionB) throws IOException {
2542     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2543       Action.ADMIN);
2544   }
2545 
2546   @Override
2547   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2548       Region regionB, Region mergedRegion) throws IOException { }
2549 
2550   @Override
2551   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2552       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2553 
2554   @Override
2555   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2556       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2557 
2558   @Override
2559   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2560       Region regionA, Region regionB) throws IOException { }
2561 
2562   @Override
2563   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2564       Region regionA, Region regionB) throws IOException { }
2565 
2566   @Override
2567   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2568       throws IOException {
2569     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2570   }
2571 
2572   @Override
2573   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2574       throws IOException { }
2575 
2576   @Override
2577   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2578       final String userName, final Quotas quotas) throws IOException {
2579     requirePermission("setUserQuota", Action.ADMIN);
2580   }
2581 
2582   @Override
2583   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2584       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2585     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2586   }
2587 
2588   @Override
2589   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2590       final String userName, final String namespace, final Quotas quotas) throws IOException {
2591     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2592   }
2593 
2594   @Override
2595   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2596       final TableName tableName, final Quotas quotas) throws IOException {
2597     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2598   }
2599 
2600   @Override
2601   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2602       final String namespace, final Quotas quotas) throws IOException {
2603     requirePermission("setNamespaceQuota", Action.ADMIN);
2604   }
2605 
2606   @Override
2607   public ReplicationEndpoint postCreateReplicationEndPoint(
2608       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2609     return endpoint;
2610   }
2611 
2612   @Override
2613   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2614       List<WALEntry> entries, CellScanner cells) throws IOException {
2615     requirePermission("replicateLogEntries", Action.WRITE);
2616   }
2617 
2618   @Override
2619   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2620       List<WALEntry> entries, CellScanner cells) throws IOException {
2621   }
2622 }