View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.CoprocessorEnvironment;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.Type;
46  import org.apache.hadoop.hbase.MetaTableAccessor;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.TagRewriteCell;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Query;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.Scan;
63  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
70  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
73  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
74  import org.apache.hadoop.hbase.filter.CompareFilter;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.FilterList;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RpcServer;
79  import org.apache.hadoop.hbase.master.MasterServices;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
82  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.InternalScanner;
90  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
91  import org.apache.hadoop.hbase.regionserver.Region;
92  import org.apache.hadoop.hbase.regionserver.RegionScanner;
93  import org.apache.hadoop.hbase.regionserver.ScanType;
94  import org.apache.hadoop.hbase.regionserver.ScannerContext;
95  import org.apache.hadoop.hbase.regionserver.Store;
96  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
97  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
98  import org.apache.hadoop.hbase.security.AccessDeniedException;
99  import org.apache.hadoop.hbase.security.User;
100 import org.apache.hadoop.hbase.security.UserProvider;
101 import org.apache.hadoop.hbase.security.access.Permission.Action;
102 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
103 import org.apache.hadoop.hbase.util.ByteRange;
104 import org.apache.hadoop.hbase.util.Bytes;
105 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
106 import org.apache.hadoop.hbase.util.Pair;
107 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
109 
110 import com.google.common.collect.ArrayListMultimap;
111 import com.google.common.collect.ImmutableSet;
112 import com.google.common.collect.ListMultimap;
113 import com.google.common.collect.Lists;
114 import com.google.common.collect.MapMaker;
115 import com.google.common.collect.Maps;
116 import com.google.common.collect.Sets;
117 import com.google.protobuf.Message;
118 import com.google.protobuf.RpcCallback;
119 import com.google.protobuf.RpcController;
120 import com.google.protobuf.Service;
121 
122 /**
123  * Provides basic authorization checks for data access and administrative
124  * operations.
125  *
126  * <p>
127  * {@code AccessController} performs authorization checks for HBase operations
128  * based on:
129  * <ul>
130  *   <li>the identity of the user performing the operation</li>
131  *   <li>the scope over which the operation is performed, in increasing
132  *   specificity: global, table, column family, or qualifier</li>
133  *   <li>the type of action being performed (as mapped to
134  *   {@link Permission.Action} values)</li>
135  * </ul>
136  * If the authorization check fails, an {@link AccessDeniedException}
137  * will be thrown for the operation.
138  * </p>
139  *
140  * <p>
141  * To perform authorization checks, {@code AccessController} relies on the
142  * RpcServerEngine being loaded to provide
143  * the user identities for remote requests.
144  * </p>
145  *
146  * <p>
147  * The access control lists used for authorization can be manipulated via the
148  * exposed {@link AccessControlService} Interface implementation, and the associated
149  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
150  * commands.
151  * </p>
152  */
153 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
154 public class AccessController extends BaseMasterAndRegionObserver
155     implements RegionServerObserver,
156       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
157 
158   public static final Log LOG = LogFactory.getLog(AccessController.class);
159 
160   private static final Log AUDITLOG =
161     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
162   private static final String CHECK_COVERING_PERM = "check_covering_perm";
163   private static final String TAG_CHECK_PASSED = "tag_check_passed";
164   private static final byte[] TRUE = Bytes.toBytes(true);
165 
166   TableAuthManager authManager = null;
167 
168   /** flags if we are running on a region of the _acl_ table */
169   boolean aclRegion = false;
170 
171   /** defined only for Endpoint implementation, so it can have way to
172    access region services */
173   private RegionCoprocessorEnvironment regionEnv;
174 
175   /** Mapping of scanner instances to the user who created them */
176   private Map<InternalScanner,String> scannerOwners =
177       new MapMaker().weakKeys().makeMap();
178 
179   private Map<TableName, List<UserPermission>> tableAcls;
180 
181   /** Provider for mapping principal names to Users */
182   private UserProvider userProvider;
183 
184   /** The list of users with superuser authority */
185   private List<String> superusers;
186 
187   /** if we are active, usually true, only not true if "hbase.security.authorization"
188    has been set to false in site configuration */
189   boolean authorizationEnabled;
190 
191   /** if we are able to support cell ACLs */
192   boolean cellFeaturesEnabled;
193 
194   /** if we should check EXEC permissions */
195   boolean shouldCheckExecPermission;
196 
197   /** if we should terminate access checks early as soon as table or CF grants
198     allow access; pre-0.98 compatible behavior */
199   boolean compatibleEarlyTermination;
200 
201   /** if we have been successfully initialized */
202   private volatile boolean initialized = false;
203 
204   /** if the ACL table is available, only relevant in the master */
205   private volatile boolean aclTabAvailable = false;
206 
207   public Region getRegion() {
208     return regionEnv != null ? regionEnv.getRegion() : null;
209   }
210 
211   public TableAuthManager getAuthManager() {
212     return authManager;
213   }
214 
215   void initialize(RegionCoprocessorEnvironment e) throws IOException {
216     final Region region = e.getRegion();
217     Configuration conf = e.getConfiguration();
218     Map<byte[], ListMultimap<String,TablePermission>> tables =
219         AccessControlLists.loadAll(region);
220     // For each table, write out the table's permissions to the respective
221     // znode for that table.
222     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
223       tables.entrySet()) {
224       byte[] entry = t.getKey();
225       ListMultimap<String,TablePermission> perms = t.getValue();
226       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
227       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
228     }
229     initialized = true;
230   }
231 
232   /**
233    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
234    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
235    * table updates.
236    */
237   void updateACL(RegionCoprocessorEnvironment e,
238       final Map<byte[], List<Cell>> familyMap) {
239     Set<byte[]> entries =
240         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
241     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
242       List<Cell> cells = f.getValue();
243       for (Cell cell: cells) {
244         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
245             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
246             AccessControlLists.ACL_LIST_FAMILY.length)) {
247           entries.add(CellUtil.cloneRow(cell));
248         }
249       }
250     }
251     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
252     Configuration conf = regionEnv.getConfiguration();
253     for (byte[] entry: entries) {
254       try {
255         ListMultimap<String,TablePermission> perms =
256           AccessControlLists.getPermissions(conf, entry);
257         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
258         zkw.writeToZookeeper(entry, serialized);
259       } catch (IOException ex) {
260         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
261             ex);
262       }
263     }
264   }
265 
266   /**
267    * Check the current user for authorization to perform a specific action
268    * against the given set of row data.
269    *
270    * <p>Note: Ordering of the authorization checks
271    * has been carefully optimized to short-circuit the most common requests
272    * and minimize the amount of processing required.</p>
273    *
274    * @param permRequest the action being requested
275    * @param e the coprocessor environment
276    * @param families the map of column families to qualifiers present in
277    * the request
278    * @return an authorization result
279    */
280   AuthResult permissionGranted(String request, User user, Action permRequest,
281       RegionCoprocessorEnvironment e,
282       Map<byte [], ? extends Collection<?>> families) {
283     HRegionInfo hri = e.getRegion().getRegionInfo();
284     TableName tableName = hri.getTable();
285 
286     // 1. All users need read access to hbase:meta table.
287     // this is a very common operation, so deal with it quickly.
288     if (hri.isMetaRegion()) {
289       if (permRequest == Action.READ) {
290         return AuthResult.allow(request, "All users allowed", user,
291           permRequest, tableName, families);
292       }
293     }
294 
295     if (user == null) {
296       return AuthResult.deny(request, "No user associated with request!", null,
297         permRequest, tableName, families);
298     }
299 
300     // 2. check for the table-level, if successful we can short-circuit
301     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
302       return AuthResult.allow(request, "Table permission granted", user,
303         permRequest, tableName, families);
304     }
305 
306     // 3. check permissions against the requested families
307     if (families != null && families.size() > 0) {
308       // all families must pass
309       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
310         // a) check for family level access
311         if (authManager.authorize(user, tableName, family.getKey(),
312             permRequest)) {
313           continue;  // family-level permission overrides per-qualifier
314         }
315 
316         // b) qualifier level access can still succeed
317         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
318           if (family.getValue() instanceof Set) {
319             // for each qualifier of the family
320             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
321             for (byte[] qualifier : familySet) {
322               if (!authManager.authorize(user, tableName, family.getKey(),
323                                          qualifier, permRequest)) {
324                 return AuthResult.deny(request, "Failed qualifier check", user,
325                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
326               }
327             }
328           } else if (family.getValue() instanceof List) { // List<KeyValue>
329             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
330             for (KeyValue kv : kvList) {
331               if (!authManager.authorize(user, tableName, family.getKey(),
332                       kv.getQualifier(), permRequest)) {
333                 return AuthResult.deny(request, "Failed qualifier check", user,
334                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
335               }
336             }
337           }
338         } else {
339           // no qualifiers and family-level check already failed
340           return AuthResult.deny(request, "Failed family check", user, permRequest,
341               tableName, makeFamilyMap(family.getKey(), null));
342         }
343       }
344 
345       // all family checks passed
346       return AuthResult.allow(request, "All family checks passed", user, permRequest,
347           tableName, families);
348     }
349 
350     // 4. no families to check and table level access failed
351     return AuthResult.deny(request, "No families to check and table permission failed",
352         user, permRequest, tableName, families);
353   }
354 
355   /**
356    * Check the current user for authorization to perform a specific action
357    * against the given set of row data.
358    * @param opType the operation type
359    * @param user the user
360    * @param e the coprocessor environment
361    * @param families the map of column families to qualifiers present in
362    * the request
363    * @param actions the desired actions
364    * @return an authorization result
365    */
366   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
367       Map<byte [], ? extends Collection<?>> families, Action... actions) {
368     AuthResult result = null;
369     for (Action action: actions) {
370       result = permissionGranted(opType.toString(), user, action, e, families);
371       if (!result.isAllowed()) {
372         return result;
373       }
374     }
375     return result;
376   }
377 
378   private void logResult(AuthResult result) {
379     if (AUDITLOG.isTraceEnabled()) {
380       InetAddress remoteAddr = RpcServer.getRemoteAddress();
381       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
382           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
383           "; reason: " + result.getReason() +
384           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
385           "; request: " + result.getRequest() +
386           "; context: " + result.toContextString());
387     }
388   }
389 
390   /**
391    * Returns the active user to which authorization checks should be applied.
392    * If we are in the context of an RPC call, the remote user is used,
393    * otherwise the currently logged in user is used.
394    */
395   private User getActiveUser() throws IOException {
396     User user = RpcServer.getRequestUser();
397     if (user == null) {
398       // for non-rpc handling, fallback to system user
399       user = userProvider.getCurrent();
400     }
401     return user;
402   }
403 
404   /**
405    * Authorizes that the current user has any of the given permissions for the
406    * given table, column family and column qualifier.
407    * @param tableName Table requested
408    * @param family Column family requested
409    * @param qualifier Column qualifier requested
410    * @throws IOException if obtaining the current user fails
411    * @throws AccessDeniedException if user has no authorization
412    */
413   private void requirePermission(String request, TableName tableName, byte[] family,
414       byte[] qualifier, Action... permissions) throws IOException {
415     User user = getActiveUser();
416     AuthResult result = null;
417 
418     for (Action permission : permissions) {
419       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
420         result = AuthResult.allow(request, "Table permission granted", user,
421                                   permission, tableName, family, qualifier);
422         break;
423       } else {
424         // rest of the world
425         result = AuthResult.deny(request, "Insufficient permissions", user,
426                                  permission, tableName, family, qualifier);
427       }
428     }
429     logResult(result);
430     if (authorizationEnabled && !result.isAllowed()) {
431       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
432     }
433   }
434 
435   /**
436    * Authorizes that the current user has any of the given permissions for the
437    * given table, column family and column qualifier.
438    * @param tableName Table requested
439    * @param family Column family param
440    * @param qualifier Column qualifier param
441    * @throws IOException if obtaining the current user fails
442    * @throws AccessDeniedException if user has no authorization
443    */
444   private void requireTablePermission(String request, TableName tableName, byte[] family,
445       byte[] qualifier, Action... permissions) throws IOException {
446     User user = getActiveUser();
447     AuthResult result = null;
448 
449     for (Action permission : permissions) {
450       if (authManager.authorize(user, tableName, null, null, permission)) {
451         result = AuthResult.allow(request, "Table permission granted", user,
452             permission, tableName, null, null);
453         result.getParams().setFamily(family).setQualifier(qualifier);
454         break;
455       } else {
456         // rest of the world
457         result = AuthResult.deny(request, "Insufficient permissions", user,
458             permission, tableName, family, qualifier);
459         result.getParams().setFamily(family).setQualifier(qualifier);
460       }
461     }
462     logResult(result);
463     if (authorizationEnabled && !result.isAllowed()) {
464       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
465     }
466   }
467 
468   /**
469    * Authorizes that the current user has any of the given permissions to access the table.
470    *
471    * @param tableName Table requested
472    * @param permissions Actions being requested
473    * @throws IOException if obtaining the current user fails
474    * @throws AccessDeniedException if user has no authorization
475    */
476   private void requireAccess(String request, TableName tableName,
477       Action... permissions) throws IOException {
478     User user = getActiveUser();
479     AuthResult result = null;
480 
481     for (Action permission : permissions) {
482       if (authManager.hasAccess(user, tableName, permission)) {
483         result = AuthResult.allow(request, "Table permission granted", user,
484                                   permission, tableName, null, null);
485         break;
486       } else {
487         // rest of the world
488         result = AuthResult.deny(request, "Insufficient permissions", user,
489                                  permission, tableName, null, null);
490       }
491     }
492     logResult(result);
493     if (authorizationEnabled && !result.isAllowed()) {
494       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
495     }
496   }
497 
498   /**
499    * Authorizes that the current user has global privileges for the given action.
500    * @param perm The action being requested
501    * @throws IOException if obtaining the current user fails
502    * @throws AccessDeniedException if authorization is denied
503    */
504   private void requirePermission(String request, Action perm) throws IOException {
505     requireGlobalPermission(request, perm, null, null);
506   }
507 
508   /**
509    * Checks that the user has the given global permission. The generated
510    * audit log message will contain context information for the operation
511    * being authorized, based on the given parameters.
512    * @param perm Action being requested
513    * @param tableName Affected table name.
514    * @param familyMap Affected column families.
515    */
516   private void requireGlobalPermission(String request, Action perm, TableName tableName,
517       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
518     User user = getActiveUser();
519     AuthResult result = null;
520     if (authManager.authorize(user, perm)) {
521       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
522       result.getParams().setTableName(tableName).setFamilies(familyMap);
523       logResult(result);
524     } else {
525       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
526       result.getParams().setTableName(tableName).setFamilies(familyMap);
527       logResult(result);
528       if (authorizationEnabled) {
529         throw new AccessDeniedException("Insufficient permissions for user '" +
530           (user != null ? user.getShortName() : "null") +"' (global, action=" +
531           perm.toString() + ")");
532       }
533     }
534   }
535 
536   /**
537    * Checks that the user has the given global permission. The generated
538    * audit log message will contain context information for the operation
539    * being authorized, based on the given parameters.
540    * @param perm Action being requested
541    * @param namespace
542    */
543   private void requireGlobalPermission(String request, Action perm,
544                                        String namespace) throws IOException {
545     User user = getActiveUser();
546     AuthResult authResult = null;
547     if (authManager.authorize(user, perm)) {
548       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
549       authResult.getParams().setNamespace(namespace);
550       logResult(authResult);
551     } else {
552       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
553       authResult.getParams().setNamespace(namespace);
554       logResult(authResult);
555       if (authorizationEnabled) {
556         throw new AccessDeniedException("Insufficient permissions for user '" +
557           (user != null ? user.getShortName() : "null") +"' (global, action=" +
558           perm.toString() + ")");
559       }
560     }
561   }
562 
563   /**
564    * Checks that the user has the given global or namespace permission.
565    * @param namespace
566    * @param permissions Actions being requested
567    */
568   public void requireNamespacePermission(String request, String namespace,
569       Action... permissions) throws IOException {
570     User user = getActiveUser();
571     AuthResult result = null;
572 
573     for (Action permission : permissions) {
574       if (authManager.authorize(user, namespace, permission)) {
575         result = AuthResult.allow(request, "Namespace permission granted",
576             user, permission, namespace);
577         break;
578       } else {
579         // rest of the world
580         result = AuthResult.deny(request, "Insufficient permissions", user,
581             permission, namespace);
582       }
583     }
584     logResult(result);
585     if (authorizationEnabled && !result.isAllowed()) {
586       throw new AccessDeniedException("Insufficient permissions "
587           + result.toContextString());
588     }
589   }
590 
591   /**
592    * Checks that the user has the given global or namespace permission.
593    * @param namespace
594    * @param permissions Actions being requested
595    */
596   public void requireNamespacePermission(String request, String namespace, TableName tableName,
597       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
598       throws IOException {
599     User user = getActiveUser();
600     AuthResult result = null;
601 
602     for (Action permission : permissions) {
603       if (authManager.authorize(user, namespace, permission)) {
604         result = AuthResult.allow(request, "Namespace permission granted",
605             user, permission, namespace);
606         result.getParams().setTableName(tableName).setFamilies(familyMap);
607         break;
608       } else {
609         // rest of the world
610         result = AuthResult.deny(request, "Insufficient permissions", user,
611             permission, namespace);
612         result.getParams().setTableName(tableName).setFamilies(familyMap);
613       }
614     }
615     logResult(result);
616     if (authorizationEnabled && !result.isAllowed()) {
617       throw new AccessDeniedException("Insufficient permissions "
618           + result.toContextString());
619     }
620   }
621 
622   /**
623    * Returns <code>true</code> if the current user is allowed the given action
624    * over at least one of the column qualifiers in the given column families.
625    */
626   private boolean hasFamilyQualifierPermission(User user,
627       Action perm,
628       RegionCoprocessorEnvironment env,
629       Map<byte[], ? extends Collection<byte[]>> familyMap)
630     throws IOException {
631     HRegionInfo hri = env.getRegion().getRegionInfo();
632     TableName tableName = hri.getTable();
633 
634     if (user == null) {
635       return false;
636     }
637 
638     if (familyMap != null && familyMap.size() > 0) {
639       // at least one family must be allowed
640       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
641           familyMap.entrySet()) {
642         if (family.getValue() != null && !family.getValue().isEmpty()) {
643           for (byte[] qualifier : family.getValue()) {
644             if (authManager.matchPermission(user, tableName,
645                 family.getKey(), qualifier, perm)) {
646               return true;
647             }
648           }
649         } else {
650           if (authManager.matchPermission(user, tableName, family.getKey(),
651               perm)) {
652             return true;
653           }
654         }
655       }
656     } else if (LOG.isDebugEnabled()) {
657       LOG.debug("Empty family map passed for permission check");
658     }
659 
660     return false;
661   }
662 
663   private enum OpType {
664     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
665     GET("get"),
666     EXISTS("exists"),
667     SCAN("scan"),
668     PUT("put"),
669     DELETE("delete"),
670     CHECK_AND_PUT("checkAndPut"),
671     CHECK_AND_DELETE("checkAndDelete"),
672     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
673     APPEND("append"),
674     INCREMENT("increment");
675 
676     private String type;
677 
678     private OpType(String type) {
679       this.type = type;
680     }
681 
682     @Override
683     public String toString() {
684       return type;
685     }
686   }
687 
688   /**
689    * Determine if cell ACLs covered by the operation grant access. This is expensive.
690    * @return false if cell ACLs failed to grant access, true otherwise
691    * @throws IOException
692    */
693   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
694       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
695       throws IOException {
696     if (!cellFeaturesEnabled) {
697       return false;
698     }
699     long cellGrants = 0;
700     User user = getActiveUser();
701     long latestCellTs = 0;
702     Get get = new Get(row);
703     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
704     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
705     // version. We have to get every cell version and check its TS against the TS asked for in
706     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
707     // consider only one such passing cell. In case of Delete we have to consider all the cell
708     // versions under this passing version. When Delete Mutation contains columns which are a
709     // version delete just consider only one version for those column cells.
710     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
711     if (considerCellTs) {
712       get.setMaxVersions();
713     } else {
714       get.setMaxVersions(1);
715     }
716     boolean diffCellTsFromOpTs = false;
717     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
718       byte[] col = entry.getKey();
719       // TODO: HBASE-7114 could possibly unify the collection type in family
720       // maps so we would not need to do this
721       if (entry.getValue() instanceof Set) {
722         Set<byte[]> set = (Set<byte[]>)entry.getValue();
723         if (set == null || set.isEmpty()) {
724           get.addFamily(col);
725         } else {
726           for (byte[] qual: set) {
727             get.addColumn(col, qual);
728           }
729         }
730       } else if (entry.getValue() instanceof List) {
731         List<Cell> list = (List<Cell>)entry.getValue();
732         if (list == null || list.isEmpty()) {
733           get.addFamily(col);
734         } else {
735           // In case of family delete, a Cell will be added into the list with Qualifier as null.
736           for (Cell cell : list) {
737             if (cell.getQualifierLength() == 0
738                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
739                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
740               get.addFamily(col);
741             } else {
742               get.addColumn(col, CellUtil.cloneQualifier(cell));
743             }
744             if (considerCellTs) {
745               long cellTs = cell.getTimestamp();
746               latestCellTs = Math.max(latestCellTs, cellTs);
747               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
748             }
749           }
750         }
751       } else if (entry.getValue() == null) {
752         get.addFamily(col);        
753       } else {
754         throw new RuntimeException("Unhandled collection type " +
755           entry.getValue().getClass().getName());
756       }
757     }
758     // We want to avoid looking into the future. So, if the cells of the
759     // operation specify a timestamp, or the operation itself specifies a
760     // timestamp, then we use the maximum ts found. Otherwise, we bound
761     // the Get to the current server time. We add 1 to the timerange since
762     // the upper bound of a timerange is exclusive yet we need to examine
763     // any cells found there inclusively.
764     long latestTs = Math.max(opTs, latestCellTs);
765     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
766       latestTs = EnvironmentEdgeManager.currentTime();
767     }
768     get.setTimeRange(0, latestTs + 1);
769     // In case of Put operation we set to read all versions. This was done to consider the case
770     // where columns are added with TS other than the Mutation TS. But normally this wont be the
771     // case with Put. There no need to get all versions but get latest version only.
772     if (!diffCellTsFromOpTs && request == OpType.PUT) {
773       get.setMaxVersions(1);
774     }
775     if (LOG.isTraceEnabled()) {
776       LOG.trace("Scanning for cells with " + get);
777     }
778     // This Map is identical to familyMap. The key is a BR rather than byte[].
779     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
780     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
781     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
782     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
783       if (entry.getValue() instanceof List) {
784         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
785       }
786     }
787     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
788     List<Cell> cells = Lists.newArrayList();
789     Cell prevCell = null;
790     ByteRange curFam = new SimpleMutableByteRange();
791     boolean curColAllVersions = (request == OpType.DELETE);
792     long curColCheckTs = opTs;
793     boolean foundColumn = false;
794     try {
795       boolean more = false;
796       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
797 
798       do {
799         cells.clear();
800         // scan with limit as 1 to hold down memory use on wide rows
801         more = scanner.next(cells, scannerContext);
802         for (Cell cell: cells) {
803           if (LOG.isTraceEnabled()) {
804             LOG.trace("Found cell " + cell);
805           }
806           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
807           if (colChange) foundColumn = false;
808           prevCell = cell;
809           if (!curColAllVersions && foundColumn) {
810             continue;
811           }
812           if (colChange && considerCellTs) {
813             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
814             List<Cell> cols = familyMap1.get(curFam);
815             for (Cell col : cols) {
816               // null/empty qualifier is used to denote a Family delete. The TS and delete type
817               // associated with this is applicable for all columns within the family. That is
818               // why the below (col.getQualifierLength() == 0) check.
819               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
820                   || CellUtil.matchingQualifier(cell, col)) {
821                 byte type = col.getTypeByte();
822                 if (considerCellTs) {
823                   curColCheckTs = col.getTimestamp();
824                 }
825                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
826                 // a version delete for a column no need to check all the covering cells within
827                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
828                 // One version delete types are Delete/DeleteFamilyVersion
829                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
830                     || (KeyValue.Type.DeleteFamily.getCode() == type);
831                 break;
832               }
833             }
834           }
835           if (cell.getTimestamp() > curColCheckTs) {
836             // Just ignore this cell. This is not a covering cell.
837             continue;
838           }
839           foundColumn = true;
840           for (Action action: actions) {
841             // Are there permissions for this user for the cell?
842             if (!authManager.authorize(user, getTableName(e), cell, action)) {
843               // We can stop if the cell ACL denies access
844               return false;
845             }
846           }
847           cellGrants++;
848         }
849       } while (more);
850     } catch (AccessDeniedException ex) {
851       throw ex;
852     } catch (IOException ex) {
853       LOG.error("Exception while getting cells to calculate covering permission", ex);
854     } finally {
855       scanner.close();
856     }
857     // We should not authorize unless we have found one or more cell ACLs that
858     // grant access. This code is used to check for additional permissions
859     // after no table or CF grants are found.
860     return cellGrants > 0;
861   }
862 
863   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
864     // Iterate over the entries in the familyMap, replacing the cells therein
865     // with new cells including the ACL data
866     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
867       List<Cell> newCells = Lists.newArrayList();
868       for (Cell cell: e.getValue()) {
869         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
870         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
871         if (cell.getTagsLength() > 0) {
872           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
873             cell.getTagsOffset(), cell.getTagsLength());
874           while (tagIterator.hasNext()) {
875             tags.add(tagIterator.next());
876           }
877         }
878         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
879       }
880       // This is supposed to be safe, won't CME
881       e.setValue(newCells);
882     }
883   }
884 
885   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
886   // type is reserved and should not be explicitly set by user.
887   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
888     // No need to check if we're not going to throw
889     if (!authorizationEnabled) {
890       m.setAttribute(TAG_CHECK_PASSED, TRUE);
891       return;
892     }
893     // Superusers are allowed to store cells unconditionally.
894     if (superusers.contains(user.getShortName())) {
895       m.setAttribute(TAG_CHECK_PASSED, TRUE);
896       return;
897     }
898     // We already checked (prePut vs preBatchMutation)
899     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
900       return;
901     }
902     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
903       Cell cell = cellScanner.current();
904       if (cell.getTagsLength() > 0) {
905         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
906           cell.getTagsLength());
907         while (tagsItr.hasNext()) {
908           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
909             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
910           }
911         }
912       }
913     }
914     m.setAttribute(TAG_CHECK_PASSED, TRUE);
915   }
916 
917   /* ---- MasterObserver implementation ---- */
918   @Override
919   public void start(CoprocessorEnvironment env) throws IOException {
920     CompoundConfiguration conf = new CompoundConfiguration();
921     conf.add(env.getConfiguration());
922 
923     authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
924     if (!authorizationEnabled) {
925       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
926     }
927 
928     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
929       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
930 
931     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
932     if (!cellFeaturesEnabled) {
933       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
934           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
935           + " accordingly.");
936     }
937 
938     ZooKeeperWatcher zk = null;
939     if (env instanceof MasterCoprocessorEnvironment) {
940       // if running on HMaster
941       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
942       zk = mEnv.getMasterServices().getZooKeeper();
943     } else if (env instanceof RegionServerCoprocessorEnvironment) {
944       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
945       zk = rsEnv.getRegionServerServices().getZooKeeper();
946     } else if (env instanceof RegionCoprocessorEnvironment) {
947       // if running at region
948       regionEnv = (RegionCoprocessorEnvironment) env;
949       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
950       zk = regionEnv.getRegionServerServices().getZooKeeper();
951       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
952         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
953     }
954 
955     // set the user-provider.
956     this.userProvider = UserProvider.instantiate(env.getConfiguration());
957 
958     // set up the list of users with superuser privilege
959     User user = userProvider.getCurrent();
960     superusers = Lists.asList(user.getShortName(),
961       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
962 
963     // If zk is null or IOException while obtaining auth manager,
964     // throw RuntimeException so that the coprocessor is unloaded.
965     if (zk != null) {
966       try {
967         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
968       } catch (IOException ioe) {
969         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
970       }
971     } else {
972       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
973     }
974 
975     tableAcls = new MapMaker().weakValues().makeMap();
976   }
977 
978   @Override
979   public void stop(CoprocessorEnvironment env) {
980 
981   }
982 
983   @Override
984   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
985       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
986     Set<byte[]> families = desc.getFamiliesKeys();
987     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
988     for (byte[] family: families) {
989       familyMap.put(family, null);
990     }
991     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
992         desc.getTableName(), familyMap, Action.CREATE);
993   }
994 
995   @Override
996   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
997       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
998     // When AC is used, it should be configured as the 1st CP.
999     // In Master, the table operations like create, are handled by a Thread pool but the max size
1000     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1001     // sequentially only.
1002     // Related code in HMaster#startServiceThreads
1003     // {code}
1004     //   // We depend on there being only one instance of this executor running
1005     //   // at a time. To do concurrency, would need fencing of enable/disable of
1006     //   // tables.
1007     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1008     // {code}
1009     // In future if we change this pool to have more threads, then there is a chance for thread,
1010     // creating acl table, getting delayed and by that time another table creation got over and
1011     // this hook is getting called. In such a case, we will need a wait logic here which will
1012     // wait till the acl table is created.
1013     if (AccessControlLists.isAclTable(desc)) {
1014       this.aclTabAvailable = true;
1015     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1016       if (!aclTabAvailable) {
1017         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1018             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1019             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1020       } else {
1021         String owner = desc.getOwnerString();
1022         // default the table owner to current user, if not specified.
1023         if (owner == null)
1024           owner = getActiveUser().getShortName();
1025         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1026             desc.getTableName(), null, Action.values());
1027         // switch to the real hbase master user for doing the RPC on the ACL table
1028         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1029           @Override
1030           public Void run() throws Exception {
1031             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1032                 userperm);
1033             return null;
1034           }
1035         });
1036       }
1037     }
1038   }
1039 
1040   @Override
1041   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1042       throws IOException {
1043     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1044   }
1045 
1046   @Override
1047   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1048       final TableName tableName) throws IOException {
1049     final Configuration conf = c.getEnvironment().getConfiguration();
1050     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1051       @Override
1052       public Void run() throws Exception {
1053         AccessControlLists.removeTablePermissions(conf, tableName);
1054         return null;
1055       }
1056     });
1057     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1058   }
1059 
1060   @Override
1061   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1062       final TableName tableName) throws IOException {
1063     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1064 
1065     final Configuration conf = c.getEnvironment().getConfiguration();
1066     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1067       @Override
1068       public Void run() throws Exception {
1069         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1070         if (acls != null) {
1071           tableAcls.put(tableName, acls);
1072         }
1073         return null;
1074       }
1075     });
1076   }
1077 
1078   @Override
1079   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1080       final TableName tableName) throws IOException {
1081     final Configuration conf = ctx.getEnvironment().getConfiguration();
1082     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1083       @Override
1084       public Void run() throws Exception {
1085         List<UserPermission> perms = tableAcls.get(tableName);
1086         if (perms != null) {
1087           for (UserPermission perm : perms) {
1088             AccessControlLists.addUserPermission(conf, perm);
1089           }
1090         }
1091         tableAcls.remove(tableName);
1092         return null;
1093       }
1094     });
1095   }
1096 
1097   @Override
1098   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1099       HTableDescriptor htd) throws IOException {
1100     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1101   }
1102 
1103   @Override
1104   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1105       TableName tableName, final HTableDescriptor htd) throws IOException {
1106     final Configuration conf = c.getEnvironment().getConfiguration();
1107     // default the table owner to current user, if not specified.
1108     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1109       getActiveUser().getShortName();
1110     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1111       @Override
1112       public Void run() throws Exception {
1113         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1114           htd.getTableName(), null, Action.values());
1115         AccessControlLists.addUserPermission(conf, userperm);
1116         return null;
1117       }
1118     });
1119   }
1120 
1121   @Override
1122   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1123       HColumnDescriptor column) throws IOException {
1124     requireTablePermission("addColumn", tableName, column.getName(), null, Action.ADMIN,
1125         Action.CREATE);
1126   }
1127 
1128   @Override
1129   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1130       HColumnDescriptor descriptor) throws IOException {
1131     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1132       Action.CREATE);
1133   }
1134 
1135   @Override
1136   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1137       byte[] col) throws IOException {
1138     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1139   }
1140 
1141   @Override
1142   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1143       final TableName tableName, final byte[] col) throws IOException {
1144     final Configuration conf = c.getEnvironment().getConfiguration();
1145     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1146       @Override
1147       public Void run() throws Exception {
1148         AccessControlLists.removeTablePermissions(conf, tableName, col);
1149         return null;
1150       }
1151     });
1152   }
1153 
1154   @Override
1155   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1156       throws IOException {
1157     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1158   }
1159 
1160   @Override
1161   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1162       throws IOException {
1163     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1164       // We have to unconditionally disallow disable of the ACL table when we are installed,
1165       // even if not enforcing authorizations. We are still allowing grants and revocations,
1166       // checking permissions and logging audit messages, etc. If the ACL table is not
1167       // available we will fail random actions all over the place.
1168       throw new AccessDeniedException("Not allowed to disable "
1169           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1170     }
1171     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1172   }
1173 
1174   @Override
1175   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1176       ServerName srcServer, ServerName destServer) throws IOException {
1177     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1178   }
1179 
1180   @Override
1181   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1182       throws IOException {
1183     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1184   }
1185 
1186   @Override
1187   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1188       boolean force) throws IOException {
1189     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1190   }
1191 
1192   @Override
1193   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1194       HRegionInfo regionInfo) throws IOException {
1195     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1196   }
1197 
1198   @Override
1199   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1200       throws IOException {
1201     requirePermission("balance", Action.ADMIN);
1202   }
1203 
1204   @Override
1205   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1206       boolean newValue) throws IOException {
1207     requirePermission("balanceSwitch", Action.ADMIN);
1208     return newValue;
1209   }
1210 
1211   @Override
1212   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1213       throws IOException {
1214     requirePermission("shutdown", Action.ADMIN);
1215   }
1216 
1217   @Override
1218   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1219       throws IOException {
1220     requirePermission("stopMaster", Action.ADMIN);
1221   }
1222 
1223   @Override
1224   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1225       throws IOException {
1226     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1227       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1228       // initialize the ACL storage table
1229       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1230     } else {
1231       aclTabAvailable = true;
1232     }
1233   }
1234 
1235   @Override
1236   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1237       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1238       throws IOException {
1239     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1240       Permission.Action.ADMIN);
1241   }
1242 
1243   @Override
1244   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1245       final SnapshotDescription snapshot) throws IOException {
1246     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1247       // list it, if user is the owner of snapshot
1248       // TODO: We are not logging this for audit
1249     } else {
1250       requirePermission("listSnapshot", Action.ADMIN);
1251     }
1252   }
1253 
1254   @Override
1255   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1256       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1257       throws IOException {
1258     requirePermission("clone", Action.ADMIN);
1259   }
1260 
1261   @Override
1262   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1263       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1264       throws IOException {
1265     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1266       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1267         Permission.Action.ADMIN);
1268     } else {
1269       requirePermission("restoreSnapshot", Action.ADMIN);
1270     }
1271   }
1272 
1273   @Override
1274   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1275       final SnapshotDescription snapshot) throws IOException {
1276     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1277       // Snapshot owner is allowed to delete the snapshot
1278       // TODO: We are not logging this for audit
1279     } else {
1280       requirePermission("deleteSnapshot", Action.ADMIN);
1281     }
1282   }
1283 
1284   @Override
1285   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1286       NamespaceDescriptor ns) throws IOException {
1287     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1288   }
1289 
1290   @Override
1291   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1292       throws IOException {
1293     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1294   }
1295 
1296   @Override
1297   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1298       final String namespace) throws IOException {
1299     final Configuration conf = ctx.getEnvironment().getConfiguration();
1300     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1301       @Override
1302       public Void run() throws Exception {
1303         AccessControlLists.removeNamespacePermissions(conf, namespace);
1304         return null;
1305       }
1306     });
1307     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1308     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1309   }
1310 
1311   @Override
1312   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1313       NamespaceDescriptor ns) throws IOException {
1314     // We require only global permission so that 
1315     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1316     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1317   }
1318 
1319   @Override
1320   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1321       throws IOException {
1322     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1323   }
1324 
1325   @Override
1326   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1327       List<NamespaceDescriptor> descriptors) throws IOException {
1328     // Retains only those which passes authorization checks, as the checks weren't done as part
1329     // of preGetTableDescriptors.
1330     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1331     while (itr.hasNext()) {
1332       NamespaceDescriptor desc = itr.next();
1333       try {
1334         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1335       } catch (AccessDeniedException e) {
1336         itr.remove();
1337       }
1338     }
1339   }
1340 
1341   @Override
1342   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1343       final TableName tableName) throws IOException {
1344     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1345   }
1346 
1347   /* ---- RegionObserver implementation ---- */
1348 
1349   @Override
1350   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1351       throws IOException {
1352     RegionCoprocessorEnvironment env = e.getEnvironment();
1353     final Region region = env.getRegion();
1354     if (region == null) {
1355       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1356     } else {
1357       HRegionInfo regionInfo = region.getRegionInfo();
1358       if (regionInfo.getTable().isSystemTable()) {
1359         isSystemOrSuperUser(regionEnv.getConfiguration());
1360       } else {
1361         requirePermission("preOpen", Action.ADMIN);
1362       }
1363     }
1364   }
1365 
1366   @Override
1367   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1368     RegionCoprocessorEnvironment env = c.getEnvironment();
1369     final Region region = env.getRegion();
1370     if (region == null) {
1371       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1372       return;
1373     }
1374     if (AccessControlLists.isAclRegion(region)) {
1375       aclRegion = true;
1376       // When this region is under recovering state, initialize will be handled by postLogReplay
1377       if (!region.isRecovering()) {
1378         try {
1379           initialize(env);
1380         } catch (IOException ex) {
1381           // if we can't obtain permissions, it's better to fail
1382           // than perform checks incorrectly
1383           throw new RuntimeException("Failed to initialize permissions cache", ex);
1384         }
1385       }
1386     } else {
1387       initialized = true;
1388     }
1389   }
1390 
1391   @Override
1392   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1393     if (aclRegion) {
1394       try {
1395         initialize(c.getEnvironment());
1396       } catch (IOException ex) {
1397         // if we can't obtain permissions, it's better to fail
1398         // than perform checks incorrectly
1399         throw new RuntimeException("Failed to initialize permissions cache", ex);
1400       }
1401     }
1402   }
1403 
1404   @Override
1405   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1406     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1407         Action.CREATE);
1408   }
1409 
1410   @Override
1411   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1412     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1413   }
1414 
1415   @Override
1416   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1417       byte[] splitRow) throws IOException {
1418     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1419   }
1420 
1421   @Override
1422   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1423       final Store store, final InternalScanner scanner, final ScanType scanType)
1424           throws IOException {
1425     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1426         Action.CREATE);
1427     return scanner;
1428   }
1429 
1430   @Override
1431   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1432       final byte [] row, final byte [] family, final Result result)
1433       throws IOException {
1434     assert family != null;
1435     RegionCoprocessorEnvironment env = c.getEnvironment();
1436     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1437     User user = getActiveUser();
1438     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1439       Action.READ);
1440     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1441       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1442         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1443       authResult.setReason("Covering cell set");
1444     }
1445     logResult(authResult);
1446     if (authorizationEnabled && !authResult.isAllowed()) {
1447       throw new AccessDeniedException("Insufficient permissions " +
1448         authResult.toContextString());
1449     }
1450   }
1451 
1452   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1453       final Query query, OpType opType) throws IOException {
1454     Filter filter = query.getFilter();
1455     // Don't wrap an AccessControlFilter
1456     if (filter != null && filter instanceof AccessControlFilter) {
1457       return;
1458     }
1459     User user = getActiveUser();
1460     RegionCoprocessorEnvironment env = c.getEnvironment();
1461     Map<byte[],? extends Collection<byte[]>> families = null;
1462     switch (opType) {
1463     case GET:
1464     case EXISTS:
1465       families = ((Get)query).getFamilyMap();
1466       break;
1467     case SCAN:
1468       families = ((Scan)query).getFamilyMap();
1469       break;
1470     default:
1471       throw new RuntimeException("Unhandled operation " + opType);
1472     }
1473     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1474     Region region = getRegion(env);
1475     TableName table = getTableName(region);
1476     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1477     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1478       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1479     }
1480     if (!authResult.isAllowed()) {
1481       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1482         // Old behavior: Scan with only qualifier checks if we have partial
1483         // permission. Backwards compatible behavior is to throw an
1484         // AccessDeniedException immediately if there are no grants for table
1485         // or CF or CF+qual. Only proceed with an injected filter if there are
1486         // grants for qualifiers. Otherwise we will fall through below and log
1487         // the result and throw an ADE. We may end up checking qualifier
1488         // grants three times (permissionGranted above, here, and in the
1489         // filter) but that's the price of backwards compatibility.
1490         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1491           authResult.setAllowed(true);
1492           authResult.setReason("Access allowed with filter");
1493           // Only wrap the filter if we are enforcing authorizations
1494           if (authorizationEnabled) {
1495             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1496               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1497               cfVsMaxVersions);
1498             // wrap any existing filter
1499             if (filter != null) {
1500               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1501                 Lists.newArrayList(ourFilter, filter));
1502             }
1503             switch (opType) {
1504               case GET:
1505               case EXISTS:
1506                 ((Get)query).setFilter(ourFilter);
1507                 break;
1508               case SCAN:
1509                 ((Scan)query).setFilter(ourFilter);
1510                 break;
1511               default:
1512                 throw new RuntimeException("Unhandled operation " + opType);
1513             }
1514           }
1515         }
1516       } else {
1517         // New behavior: Any access we might be granted is more fine-grained
1518         // than whole table or CF. Simply inject a filter and return what is
1519         // allowed. We will not throw an AccessDeniedException. This is a
1520         // behavioral change since 0.96.
1521         authResult.setAllowed(true);
1522         authResult.setReason("Access allowed with filter");
1523         // Only wrap the filter if we are enforcing authorizations
1524         if (authorizationEnabled) {
1525           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1526             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1527           // wrap any existing filter
1528           if (filter != null) {
1529             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1530               Lists.newArrayList(ourFilter, filter));
1531           }
1532           switch (opType) {
1533             case GET:
1534             case EXISTS:
1535               ((Get)query).setFilter(ourFilter);
1536               break;
1537             case SCAN:
1538               ((Scan)query).setFilter(ourFilter);
1539               break;
1540             default:
1541               throw new RuntimeException("Unhandled operation " + opType);
1542           }
1543         }
1544       }
1545     }
1546 
1547     logResult(authResult);
1548     if (authorizationEnabled && !authResult.isAllowed()) {
1549       throw new AccessDeniedException("Insufficient permissions for user '"
1550           + (user != null ? user.getShortName() : "null")
1551           + "' (table=" + table + ", action=READ)");
1552     }
1553   }
1554 
1555   @Override
1556   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1557       final Get get, final List<Cell> result) throws IOException {
1558     internalPreRead(c, get, OpType.GET);
1559   }
1560 
1561   @Override
1562   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1563       final Get get, final boolean exists) throws IOException {
1564     internalPreRead(c, get, OpType.EXISTS);
1565     return exists;
1566   }
1567 
1568   @Override
1569   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1570       final Put put, final WALEdit edit, final Durability durability)
1571       throws IOException {
1572     User user = getActiveUser();
1573     checkForReservedTagPresence(user, put);
1574 
1575     // Require WRITE permission to the table, CF, or top visible value, if any.
1576     // NOTE: We don't need to check the permissions for any earlier Puts
1577     // because we treat the ACLs in each Put as timestamped like any other
1578     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1579     // change the ACL of any previous Put. This allows simple evolution of
1580     // security policy over time without requiring expensive updates.
1581     RegionCoprocessorEnvironment env = c.getEnvironment();
1582     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1583     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1584     logResult(authResult);
1585     if (!authResult.isAllowed()) {
1586       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1587         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1588       } else if (authorizationEnabled) {
1589         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1590       }
1591     }
1592 
1593     // Add cell ACLs from the operation to the cells themselves
1594     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1595     if (bytes != null) {
1596       if (cellFeaturesEnabled) {
1597         addCellPermissions(bytes, put.getFamilyCellMap());
1598       } else {
1599         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1600       }
1601     }
1602   }
1603 
1604   @Override
1605   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1606       final Put put, final WALEdit edit, final Durability durability) {
1607     if (aclRegion) {
1608       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1609     }
1610   }
1611 
1612   @Override
1613   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1614       final Delete delete, final WALEdit edit, final Durability durability)
1615       throws IOException {
1616     // An ACL on a delete is useless, we shouldn't allow it
1617     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1618       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1619     }
1620     // Require WRITE permissions on all cells covered by the delete. Unlike
1621     // for Puts we need to check all visible prior versions, because a major
1622     // compaction could remove them. If the user doesn't have permission to
1623     // overwrite any of the visible versions ('visible' defined as not covered
1624     // by a tombstone already) then we have to disallow this operation.
1625     RegionCoprocessorEnvironment env = c.getEnvironment();
1626     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1627     User user = getActiveUser();
1628     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1629     logResult(authResult);
1630     if (!authResult.isAllowed()) {
1631       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1632         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1633       } else if (authorizationEnabled) {
1634         throw new AccessDeniedException("Insufficient permissions " +
1635           authResult.toContextString());
1636       }
1637     }
1638   }
1639 
1640   @Override
1641   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1642       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1643     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1644       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1645       for (int i = 0; i < miniBatchOp.size(); i++) {
1646         Mutation m = miniBatchOp.getOperation(i);
1647         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1648           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1649           // perm check
1650           OpType opType;
1651           if (m instanceof Put) {
1652             checkForReservedTagPresence(getActiveUser(), m);
1653             opType = OpType.PUT;
1654           } else {
1655             opType = OpType.DELETE;
1656           }
1657           AuthResult authResult = null;
1658           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1659             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1660             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1661               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1662           } else {
1663             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1664               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1665           }
1666           logResult(authResult);
1667           if (authorizationEnabled && !authResult.isAllowed()) {
1668             throw new AccessDeniedException("Insufficient permissions "
1669               + authResult.toContextString());
1670           }
1671         }
1672       }
1673     }
1674   }
1675 
1676   @Override
1677   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1678       final Delete delete, final WALEdit edit, final Durability durability)
1679       throws IOException {
1680     if (aclRegion) {
1681       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1682     }
1683   }
1684 
1685   @Override
1686   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1687       final byte [] row, final byte [] family, final byte [] qualifier,
1688       final CompareFilter.CompareOp compareOp,
1689       final ByteArrayComparable comparator, final Put put,
1690       final boolean result) throws IOException {
1691     User user = getActiveUser();
1692     checkForReservedTagPresence(user, put);
1693 
1694     // Require READ and WRITE permissions on the table, CF, and KV to update
1695     RegionCoprocessorEnvironment env = c.getEnvironment();
1696     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1697     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1698       Action.READ, Action.WRITE);
1699     logResult(authResult);
1700     if (!authResult.isAllowed()) {
1701       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1702         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1703       } else if (authorizationEnabled) {
1704         throw new AccessDeniedException("Insufficient permissions " +
1705           authResult.toContextString());
1706       }
1707     }
1708 
1709     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1710     if (bytes != null) {
1711       if (cellFeaturesEnabled) {
1712         addCellPermissions(bytes, put.getFamilyCellMap());
1713       } else {
1714         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1715       }
1716     }
1717     return result;
1718   }
1719 
1720   @Override
1721   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1722       final byte[] row, final byte[] family, final byte[] qualifier,
1723       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1724       final boolean result) throws IOException {
1725     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1726       // We had failure with table, cf and q perm checks and now giving a chance for cell
1727       // perm check
1728       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1729       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1730       AuthResult authResult = null;
1731       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1732           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1733         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1734             getActiveUser(), Action.READ, table, families);
1735       } else {
1736         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1737             getActiveUser(), Action.READ, table, families);
1738       }
1739       logResult(authResult);
1740       if (authorizationEnabled && !authResult.isAllowed()) {
1741         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1742       }
1743     }
1744     return result;
1745   }
1746 
1747   @Override
1748   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1749       final byte [] row, final byte [] family, final byte [] qualifier,
1750       final CompareFilter.CompareOp compareOp,
1751       final ByteArrayComparable comparator, final Delete delete,
1752       final boolean result) throws IOException {
1753     // An ACL on a delete is useless, we shouldn't allow it
1754     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1755       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1756           delete.toString());
1757     }
1758     // Require READ and WRITE permissions on the table, CF, and the KV covered
1759     // by the delete
1760     RegionCoprocessorEnvironment env = c.getEnvironment();
1761     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1762     User user = getActiveUser();
1763     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1764       Action.READ, Action.WRITE);
1765     logResult(authResult);
1766     if (!authResult.isAllowed()) {
1767       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1768         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1769       } else if (authorizationEnabled) {
1770         throw new AccessDeniedException("Insufficient permissions " +
1771           authResult.toContextString());
1772       }
1773     }
1774     return result;
1775   }
1776 
1777   @Override
1778   public boolean preCheckAndDeleteAfterRowLock(
1779       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1780       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1781       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1782       throws IOException {
1783     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1784       // We had failure with table, cf and q perm checks and now giving a chance for cell
1785       // perm check
1786       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1787       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1788       AuthResult authResult = null;
1789       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1790           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1791         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1792             getActiveUser(), Action.READ, table, families);
1793       } else {
1794         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1795             getActiveUser(), Action.READ, table, families);
1796       }
1797       logResult(authResult);
1798       if (authorizationEnabled && !authResult.isAllowed()) {
1799         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1800       }
1801     }
1802     return result;
1803   }
1804 
1805   @Override
1806   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1807       final byte [] row, final byte [] family, final byte [] qualifier,
1808       final long amount, final boolean writeToWAL)
1809       throws IOException {
1810     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1811     // incremented value
1812     RegionCoprocessorEnvironment env = c.getEnvironment();
1813     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1814     User user = getActiveUser();
1815     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1816       Action.WRITE);
1817     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1818       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1819         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1820       authResult.setReason("Covering cell set");
1821     }
1822     logResult(authResult);
1823     if (authorizationEnabled && !authResult.isAllowed()) {
1824       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1825     }
1826     return -1;
1827   }
1828 
1829   @Override
1830   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1831       throws IOException {
1832     User user = getActiveUser();
1833     checkForReservedTagPresence(user, append);
1834 
1835     // Require WRITE permission to the table, CF, and the KV to be appended
1836     RegionCoprocessorEnvironment env = c.getEnvironment();
1837     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1838     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1839     logResult(authResult);
1840     if (!authResult.isAllowed()) {
1841       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1842         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1843       } else if (authorizationEnabled)  {
1844         throw new AccessDeniedException("Insufficient permissions " +
1845           authResult.toContextString());
1846       }
1847     }
1848 
1849     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1850     if (bytes != null) {
1851       if (cellFeaturesEnabled) {
1852         addCellPermissions(bytes, append.getFamilyCellMap());
1853       } else {
1854         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1855       }
1856     }
1857 
1858     return null;
1859   }
1860 
1861   @Override
1862   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1863       final Append append) throws IOException {
1864     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1865       // We had failure with table, cf and q perm checks and now giving a chance for cell
1866       // perm check
1867       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1868       AuthResult authResult = null;
1869       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1870           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1871         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1872             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1873       } else {
1874         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1875             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1876       }
1877       logResult(authResult);
1878       if (authorizationEnabled && !authResult.isAllowed()) {
1879         throw new AccessDeniedException("Insufficient permissions " +
1880           authResult.toContextString());
1881       }
1882     }
1883     return null;
1884   }
1885 
1886   @Override
1887   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1888       final Increment increment)
1889       throws IOException {
1890     User user = getActiveUser();
1891     checkForReservedTagPresence(user, increment);
1892 
1893     // Require WRITE permission to the table, CF, and the KV to be replaced by
1894     // the incremented value
1895     RegionCoprocessorEnvironment env = c.getEnvironment();
1896     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1897     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1898       Action.WRITE);
1899     logResult(authResult);
1900     if (!authResult.isAllowed()) {
1901       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1902         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1903       } else if (authorizationEnabled) {
1904         throw new AccessDeniedException("Insufficient permissions " +
1905           authResult.toContextString());
1906       }
1907     }
1908 
1909     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1910     if (bytes != null) {
1911       if (cellFeaturesEnabled) {
1912         addCellPermissions(bytes, increment.getFamilyCellMap());
1913       } else {
1914         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1915       }
1916     }
1917 
1918     return null;
1919   }
1920 
1921   @Override
1922   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1923       final Increment increment) throws IOException {
1924     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1925       // We had failure with table, cf and q perm checks and now giving a chance for cell
1926       // perm check
1927       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1928       AuthResult authResult = null;
1929       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1930           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1931         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1932             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1933       } else {
1934         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1935             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1936       }
1937       logResult(authResult);
1938       if (authorizationEnabled && !authResult.isAllowed()) {
1939         throw new AccessDeniedException("Insufficient permissions " +
1940           authResult.toContextString());
1941       }
1942     }
1943     return null;
1944   }
1945 
1946   @Override
1947   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1948       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1949     // If the HFile version is insufficient to persist tags, we won't have any
1950     // work to do here
1951     if (!cellFeaturesEnabled) {
1952       return newCell;
1953     }
1954 
1955     // Collect any ACLs from the old cell
1956     List<Tag> tags = Lists.newArrayList();
1957     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1958     if (oldCell != null) {
1959       // Save an object allocation where we can
1960       if (oldCell.getTagsLength() > 0) {
1961         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1962           oldCell.getTagsOffset(), oldCell.getTagsLength());
1963         while (tagIterator.hasNext()) {
1964           Tag tag = tagIterator.next();
1965           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1966             // Not an ACL tag, just carry it through
1967             if (LOG.isTraceEnabled()) {
1968               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1969                 " length " + tag.getTagLength());
1970             }
1971             tags.add(tag);
1972           } else {
1973             // Merge the perms from the older ACL into the current permission set
1974             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1975               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1976                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1977             perms.putAll(kvPerms);
1978           }
1979         }
1980       }
1981     }
1982 
1983     // Do we have an ACL on the operation?
1984     byte[] aclBytes = mutation.getACL();
1985     if (aclBytes != null) {
1986       // Yes, use it
1987       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1988     } else {
1989       // No, use what we carried forward
1990       if (perms != null) {
1991         // TODO: If we collected ACLs from more than one tag we may have a
1992         // List<Permission> of size > 1, this can be collapsed into a single
1993         // Permission
1994         if (LOG.isTraceEnabled()) {
1995           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1996         }
1997         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1998           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1999       }
2000     }
2001 
2002     // If we have no tags to add, just return
2003     if (tags.isEmpty()) {
2004       return newCell;
2005     }
2006 
2007     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
2008     return rewriteCell;
2009   }
2010 
2011   @Override
2012   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2013       final Scan scan, final RegionScanner s) throws IOException {
2014     internalPreRead(c, scan, OpType.SCAN);
2015     return s;
2016   }
2017 
2018   @Override
2019   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2020       final Scan scan, final RegionScanner s) throws IOException {
2021     User user = getActiveUser();
2022     if (user != null && user.getShortName() != null) {
2023       // store reference to scanner owner for later checks
2024       scannerOwners.put(s, user.getShortName());
2025     }
2026     return s;
2027   }
2028 
2029   @Override
2030   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2031       final InternalScanner s, final List<Result> result,
2032       final int limit, final boolean hasNext) throws IOException {
2033     requireScannerOwner(s);
2034     return hasNext;
2035   }
2036 
2037   @Override
2038   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2039       final InternalScanner s) throws IOException {
2040     requireScannerOwner(s);
2041   }
2042 
2043   @Override
2044   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2045       final InternalScanner s) throws IOException {
2046     // clean up any associated owner mapping
2047     scannerOwners.remove(s);
2048   }
2049 
2050   /**
2051    * Verify, when servicing an RPC, that the caller is the scanner owner.
2052    * If so, we assume that access control is correctly enforced based on
2053    * the checks performed in preScannerOpen()
2054    */
2055   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2056     if (!RpcServer.isInRpcCallContext())
2057       return;
2058     String requestUserName = RpcServer.getRequestUserName();
2059     String owner = scannerOwners.get(s);
2060     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2061       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2062     }
2063   }
2064 
2065   /**
2066    * Verifies user has CREATE privileges on
2067    * the Column Families involved in the bulkLoadHFile
2068    * request. Specific Column Write privileges are presently
2069    * ignored.
2070    */
2071   @Override
2072   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2073       List<Pair<byte[], String>> familyPaths) throws IOException {
2074     for(Pair<byte[],String> el : familyPaths) {
2075       requirePermission("preBulkLoadHFile",
2076           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2077           el.getFirst(),
2078           null,
2079           Action.CREATE);
2080     }
2081   }
2082 
2083   /**
2084    * Authorization check for
2085    * SecureBulkLoadProtocol.prepareBulkLoad()
2086    * @param ctx the context
2087    * @param request the request
2088    * @throws IOException
2089    */
2090   @Override
2091   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2092                                  PrepareBulkLoadRequest request) throws IOException {
2093     requireAccess("prePareBulkLoad",
2094         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2095   }
2096 
2097   /**
2098    * Authorization security check for
2099    * SecureBulkLoadProtocol.cleanupBulkLoad()
2100    * @param ctx the context
2101    * @param request the request
2102    * @throws IOException
2103    */
2104   @Override
2105   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2106                                  CleanupBulkLoadRequest request) throws IOException {
2107     requireAccess("preCleanupBulkLoad",
2108         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2109   }
2110 
2111   /* ---- EndpointObserver implementation ---- */
2112 
2113   @Override
2114   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2115       Service service, String methodName, Message request) throws IOException {
2116     // Don't intercept calls to our own AccessControlService, we check for
2117     // appropriate permissions in the service handlers
2118     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2119       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2120         methodName + ")",
2121         getTableName(ctx.getEnvironment()), null, null,
2122         Action.EXEC);
2123     }
2124     return request;
2125   }
2126 
2127   @Override
2128   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2129       Service service, String methodName, Message request, Message.Builder responseBuilder)
2130       throws IOException { }
2131 
2132   /* ---- Protobuf AccessControlService implementation ---- */
2133 
2134   @Override
2135   public void grant(RpcController controller,
2136                     AccessControlProtos.GrantRequest request,
2137                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2138     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2139     AccessControlProtos.GrantResponse response = null;
2140     try {
2141       // verify it's only running at .acl.
2142       if (aclRegion) {
2143         if (!initialized) {
2144           throw new CoprocessorException("AccessController not yet initialized");
2145         }
2146         if (LOG.isDebugEnabled()) {
2147           LOG.debug("Received request to grant access permission " + perm.toString());
2148         }
2149 
2150         switch(request.getUserPermission().getPermission().getType()) {
2151           case Global :
2152           case Table :
2153             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2154               perm.getQualifier(), Action.ADMIN);
2155             break;
2156           case Namespace :
2157             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2158            break;
2159         }
2160 
2161         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2162           @Override
2163           public Void run() throws Exception {
2164             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2165             return null;
2166           }
2167         });
2168 
2169         if (AUDITLOG.isTraceEnabled()) {
2170           // audit log should store permission changes in addition to auth results
2171           AUDITLOG.trace("Granted permission " + perm.toString());
2172         }
2173       } else {
2174         throw new CoprocessorException(AccessController.class, "This method "
2175             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2176       }
2177       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2178     } catch (IOException ioe) {
2179       // pass exception back up
2180       ResponseConverter.setControllerException(controller, ioe);
2181     }
2182     done.run(response);
2183   }
2184 
2185   @Override
2186   public void revoke(RpcController controller,
2187                      AccessControlProtos.RevokeRequest request,
2188                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2189     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2190     AccessControlProtos.RevokeResponse response = null;
2191     try {
2192       // only allowed to be called on _acl_ region
2193       if (aclRegion) {
2194         if (!initialized) {
2195           throw new CoprocessorException("AccessController not yet initialized");
2196         }
2197         if (LOG.isDebugEnabled()) {
2198           LOG.debug("Received request to revoke access permission " + perm.toString());
2199         }
2200 
2201         switch(request.getUserPermission().getPermission().getType()) {
2202           case Global :
2203           case Table :
2204             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2205               perm.getQualifier(), Action.ADMIN);
2206             break;
2207           case Namespace :
2208             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2209             break;
2210         }
2211 
2212         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2213           @Override
2214           public Void run() throws Exception {
2215             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2216             return null;
2217           }
2218         });
2219 
2220         if (AUDITLOG.isTraceEnabled()) {
2221           // audit log should record all permission changes
2222           AUDITLOG.trace("Revoked permission " + perm.toString());
2223         }
2224       } else {
2225         throw new CoprocessorException(AccessController.class, "This method "
2226             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2227       }
2228       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2229     } catch (IOException ioe) {
2230       // pass exception back up
2231       ResponseConverter.setControllerException(controller, ioe);
2232     }
2233     done.run(response);
2234   }
2235 
2236   @Override
2237   public void getUserPermissions(RpcController controller,
2238                                  AccessControlProtos.GetUserPermissionsRequest request,
2239                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2240     AccessControlProtos.GetUserPermissionsResponse response = null;
2241     try {
2242       // only allowed to be called on _acl_ region
2243       if (aclRegion) {
2244         if (!initialized) {
2245           throw new CoprocessorException("AccessController not yet initialized");
2246         }
2247         List<UserPermission> perms = null;
2248         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2249           final TableName table = request.hasTableName() ?
2250             ProtobufUtil.toTableName(request.getTableName()) : null;
2251           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2252           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2253             @Override
2254             public List<UserPermission> run() throws Exception {
2255               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2256             }
2257           });
2258         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2259           final String namespace = request.getNamespaceName().toStringUtf8();
2260           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2261           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2262             @Override
2263             public List<UserPermission> run() throws Exception {
2264               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2265                 namespace);
2266             }
2267           });
2268         } else {
2269           requirePermission("userPermissions", Action.ADMIN);
2270           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2271             @Override
2272             public List<UserPermission> run() throws Exception {
2273               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2274             }
2275           });
2276         }
2277         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2278       } else {
2279         throw new CoprocessorException(AccessController.class, "This method "
2280             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2281       }
2282     } catch (IOException ioe) {
2283       // pass exception back up
2284       ResponseConverter.setControllerException(controller, ioe);
2285     }
2286     done.run(response);
2287   }
2288 
2289   @Override
2290   public void checkPermissions(RpcController controller,
2291                                AccessControlProtos.CheckPermissionsRequest request,
2292                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2293     Permission[] permissions = new Permission[request.getPermissionCount()];
2294     for (int i=0; i < request.getPermissionCount(); i++) {
2295       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2296     }
2297     AccessControlProtos.CheckPermissionsResponse response = null;
2298     try {
2299       User user = getActiveUser();
2300       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2301       for (Permission permission : permissions) {
2302         if (permission instanceof TablePermission) {
2303           // Check table permissions
2304 
2305           TablePermission tperm = (TablePermission) permission;
2306           for (Action action : permission.getActions()) {
2307             if (!tperm.getTableName().equals(tableName)) {
2308               throw new CoprocessorException(AccessController.class, String.format("This method "
2309                   + "can only execute at the table specified in TablePermission. " +
2310                   "Table of the region:%s , requested table:%s", tableName,
2311                   tperm.getTableName()));
2312             }
2313 
2314             Map<byte[], Set<byte[]>> familyMap =
2315                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2316             if (tperm.getFamily() != null) {
2317               if (tperm.getQualifier() != null) {
2318                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2319                 qualifiers.add(tperm.getQualifier());
2320                 familyMap.put(tperm.getFamily(), qualifiers);
2321               } else {
2322                 familyMap.put(tperm.getFamily(), null);
2323               }
2324             }
2325 
2326             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2327               familyMap);
2328             logResult(result);
2329             if (!result.isAllowed()) {
2330               // Even if passive we need to throw an exception here, we support checking
2331               // effective permissions, so throw unconditionally
2332               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2333                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2334                 ", action=" + action.toString() + ")");
2335             }
2336           }
2337 
2338         } else {
2339           // Check global permissions
2340 
2341           for (Action action : permission.getActions()) {
2342             AuthResult result;
2343             if (authManager.authorize(user, action)) {
2344               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2345                 action, null, null);
2346             } else {
2347               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2348                 null, null);
2349             }
2350             logResult(result);
2351             if (!result.isAllowed()) {
2352               // Even if passive we need to throw an exception here, we support checking
2353               // effective permissions, so throw unconditionally
2354               throw new AccessDeniedException("Insufficient permissions (action=" +
2355                 action.toString() + ")");
2356             }
2357           }
2358         }
2359       }
2360       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2361     } catch (IOException ioe) {
2362       ResponseConverter.setControllerException(controller, ioe);
2363     }
2364     done.run(response);
2365   }
2366 
2367   @Override
2368   public Service getService() {
2369     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2370   }
2371 
2372   private Region getRegion(RegionCoprocessorEnvironment e) {
2373     return e.getRegion();
2374   }
2375 
2376   private TableName getTableName(RegionCoprocessorEnvironment e) {
2377     Region region = e.getRegion();
2378     if (region != null) {
2379       return getTableName(region);
2380     }
2381     return null;
2382   }
2383 
2384   private TableName getTableName(Region region) {
2385     HRegionInfo regionInfo = region.getRegionInfo();
2386     if (regionInfo != null) {
2387       return regionInfo.getTable();
2388     }
2389     return null;
2390   }
2391 
2392   @Override
2393   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2394       throws IOException {
2395     requirePermission("preClose", Action.ADMIN);
2396   }
2397 
2398   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2399     // No need to check if we're not going to throw
2400     if (!authorizationEnabled) {
2401       return;
2402     }
2403     User user = userProvider.getCurrent();
2404     if (user == null) {
2405       throw new IOException("Unable to obtain the current user, " +
2406         "authorization checks for internal operations will not work correctly!");
2407     }
2408     User activeUser = getActiveUser();
2409     if (!(superusers.contains(activeUser.getShortName()))) {
2410       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2411         "is not system or super user.");
2412     }
2413   }
2414 
2415   @Override
2416   public void preStopRegionServer(
2417       ObserverContext<RegionServerCoprocessorEnvironment> env)
2418       throws IOException {
2419     requirePermission("preStopRegionServer", Action.ADMIN);
2420   }
2421 
2422   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2423       byte[] qualifier) {
2424     if (family == null) {
2425       return null;
2426     }
2427 
2428     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2429     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2430     return familyMap;
2431   }
2432 
2433   @Override
2434   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2435        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2436        String regex) throws IOException {
2437     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2438     // any concrete set of table names when a regex is present or the full list is requested.
2439     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2440       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2441       // request can be granted.
2442       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2443       for (TableName tableName: tableNamesList) {
2444         // Skip checks for a table that does not exist
2445         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2446           continue;
2447         requirePermission("getTableDescriptors", tableName, null, null,
2448             Action.ADMIN, Action.CREATE);
2449       }
2450     }
2451   }
2452 
2453   @Override
2454   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2455       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2456       String regex) throws IOException {
2457     // Skipping as checks in this case are already done by preGetTableDescriptors.
2458     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2459       return;
2460     }
2461 
2462     // Retains only those which passes authorization checks, as the checks weren't done as part
2463     // of preGetTableDescriptors.
2464     Iterator<HTableDescriptor> itr = descriptors.iterator();
2465     while (itr.hasNext()) {
2466       HTableDescriptor htd = itr.next();
2467       try {
2468         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2469             Action.ADMIN, Action.CREATE);
2470       } catch (AccessDeniedException e) {
2471         itr.remove();
2472       }
2473     }
2474   }
2475 
2476   @Override
2477   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2478       List<HTableDescriptor> descriptors, String regex) throws IOException {
2479     // Retains only those which passes authorization checks.
2480     Iterator<HTableDescriptor> itr = descriptors.iterator();
2481     while (itr.hasNext()) {
2482       HTableDescriptor htd = itr.next();
2483       try {
2484         requireAccess("getTableNames", htd.getTableName(), Action.values());
2485       } catch (AccessDeniedException e) {
2486         itr.remove();
2487       }
2488     }
2489   }
2490 
2491   @Override
2492   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2493       Region regionB) throws IOException {
2494     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2495       Action.ADMIN);
2496   }
2497 
2498   @Override
2499   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2500       Region regionB, Region mergedRegion) throws IOException { }
2501 
2502   @Override
2503   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2504       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2505 
2506   @Override
2507   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2508       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2509 
2510   @Override
2511   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2512       Region regionA, Region regionB) throws IOException { }
2513 
2514   @Override
2515   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2516       Region regionA, Region regionB) throws IOException { }
2517 
2518   @Override
2519   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2520       throws IOException {
2521     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2522   }
2523 
2524   @Override
2525   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2526       throws IOException { }
2527 
2528   @Override
2529   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2530       final String userName, final Quotas quotas) throws IOException {
2531     requirePermission("setUserQuota", Action.ADMIN);
2532   }
2533 
2534   @Override
2535   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2536       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2537     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2538   }
2539 
2540   @Override
2541   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2542       final String userName, final String namespace, final Quotas quotas) throws IOException {
2543     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2544   }
2545 
2546   @Override
2547   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2548       final TableName tableName, final Quotas quotas) throws IOException {
2549     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2550   }
2551 
2552   @Override
2553   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2554       final String namespace, final Quotas quotas) throws IOException {
2555     requirePermission("setNamespaceQuota", Action.ADMIN);
2556   }
2557 
2558   @Override
2559   public ReplicationEndpoint postCreateReplicationEndPoint(
2560       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2561     return endpoint;
2562   }
2563 
2564   @Override
2565   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2566       List<WALEntry> entries, CellScanner cells) throws IOException {
2567     requirePermission("replicateLogEntries", Action.WRITE);
2568   }
2569 
2570   @Override
2571   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2572       List<WALEntry> entries, CellScanner cells) throws IOException {
2573   }
2574 }