View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellScanner;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.CompoundConfiguration;
41  import org.apache.hadoop.hbase.CoprocessorEnvironment;
42  import org.apache.hadoop.hbase.DoNotRetryIOException;
43  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
44  import org.apache.hadoop.hbase.HColumnDescriptor;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HRegionInfo;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.KeyValue.Type;
50  import org.apache.hadoop.hbase.MetaTableAccessor;
51  import org.apache.hadoop.hbase.NamespaceDescriptor;
52  import org.apache.hadoop.hbase.ProcedureInfo;
53  import org.apache.hadoop.hbase.ServerName;
54  import org.apache.hadoop.hbase.TableName;
55  import org.apache.hadoop.hbase.Tag;
56  import org.apache.hadoop.hbase.TagRewriteCell;
57  import org.apache.hadoop.hbase.classification.InterfaceAudience;
58  import org.apache.hadoop.hbase.client.Append;
59  import org.apache.hadoop.hbase.client.Delete;
60  import org.apache.hadoop.hbase.client.Durability;
61  import org.apache.hadoop.hbase.client.Get;
62  import org.apache.hadoop.hbase.client.Increment;
63  import org.apache.hadoop.hbase.client.Mutation;
64  import org.apache.hadoop.hbase.client.Put;
65  import org.apache.hadoop.hbase.client.Query;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
69  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
70  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
71  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
72  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
73  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
74  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
75  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
76  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
77  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
78  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79  import org.apache.hadoop.hbase.filter.CompareFilter;
80  import org.apache.hadoop.hbase.filter.Filter;
81  import org.apache.hadoop.hbase.filter.FilterList;
82  import org.apache.hadoop.hbase.io.hfile.HFile;
83  import org.apache.hadoop.hbase.ipc.RpcServer;
84  import org.apache.hadoop.hbase.master.MasterServices;
85  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
86  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
87  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
88  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
89  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
90  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
93  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
94  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
95  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
96  import org.apache.hadoop.hbase.regionserver.InternalScanner;
97  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
98  import org.apache.hadoop.hbase.regionserver.Region;
99  import org.apache.hadoop.hbase.regionserver.RegionScanner;
100 import org.apache.hadoop.hbase.regionserver.ScanType;
101 import org.apache.hadoop.hbase.regionserver.ScannerContext;
102 import org.apache.hadoop.hbase.regionserver.Store;
103 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
104 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
105 import org.apache.hadoop.hbase.security.AccessDeniedException;
106 import org.apache.hadoop.hbase.security.Superusers;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.UserProvider;
109 import org.apache.hadoop.hbase.security.access.Permission.Action;
110 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
111 import org.apache.hadoop.hbase.util.ByteRange;
112 import org.apache.hadoop.hbase.util.Bytes;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.Pair;
115 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
116 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
117 
118 import com.google.common.collect.ArrayListMultimap;
119 import com.google.common.collect.ImmutableSet;
120 import com.google.common.collect.ListMultimap;
121 import com.google.common.collect.Lists;
122 import com.google.common.collect.MapMaker;
123 import com.google.common.collect.Maps;
124 import com.google.common.collect.Sets;
125 import com.google.protobuf.Message;
126 import com.google.protobuf.RpcCallback;
127 import com.google.protobuf.RpcController;
128 import com.google.protobuf.Service;
129 
130 /**
131  * Provides basic authorization checks for data access and administrative
132  * operations.
133  *
134  * <p>
135  * {@code AccessController} performs authorization checks for HBase operations
136  * based on:
137  * </p>
138  * <ul>
139  *   <li>the identity of the user performing the operation</li>
140  *   <li>the scope over which the operation is performed, in increasing
141  *   specificity: global, table, column family, or qualifier</li>
142  *   <li>the type of action being performed (as mapped to
143  *   {@link Permission.Action} values)</li>
144  * </ul>
145  * <p>
146  * If the authorization check fails, an {@link AccessDeniedException}
147  * will be thrown for the operation.
148  * </p>
149  *
150  * <p>
151  * To perform authorization checks, {@code AccessController} relies on the
152  * RpcServerEngine being loaded to provide
153  * the user identities for remote requests.
154  * </p>
155  *
156  * <p>
157  * The access control lists used for authorization can be manipulated via the
158  * exposed {@link AccessControlService} Interface implementation, and the associated
159  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
160  * commands.
161  * </p>
162  */
163 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
164 public class AccessController extends BaseMasterAndRegionObserver
165     implements RegionServerObserver,
166       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
167 
  /** General-purpose logger for this coprocessor. */
  private static final Log LOG = LogFactory.getLog(AccessController.class);

  /**
   * Audit logger, registered under the name "SecurityLogger." followed by this
   * class's fully qualified name. Authorization outcomes are written to it at
   * TRACE level.
   */
  private static final Log AUDITLOG =
    LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
  // NOTE(review): the two keys below are not used in the visible part of the
  // file; presumably they are attribute names attached to operations -- confirm.
  private static final String CHECK_COVERING_PERM = "check_covering_perm";
  private static final String TAG_CHECK_PASSED = "tag_check_passed";
  /** Serialized form of boolean {@code true}. */
  private static final byte[] TRUE = Bytes.toBytes(true);

  /** Authorization manager consulted for every permission check in this class. */
  TableAuthManager authManager = null;

  /** Flags whether we are running on a region of the _acl_ table. */
  boolean aclRegion = false;

  /** Defined only for the Endpoint implementation, so that it has a way to
   access region services. */
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them (weakly keyed). */
  private Map<InternalScanner,String> scannerOwners =
      new MapMaker().weakKeys().makeMap();

  // NOTE(review): not referenced in the visible part of the file; presumably a
  // per-table cache of user permissions -- confirm against its users.
  private Map<TableName, List<UserPermission>> tableAcls;

  /** Provider for mapping principal names to Users. */
  private UserProvider userProvider;

  /** Whether we are active; usually true, and false only if "hbase.security.authorization"
   has been set to false in the site configuration. */
  boolean authorizationEnabled;

  /** Whether we are able to support cell ACLs. */
  boolean cellFeaturesEnabled;

  /** Whether we should check EXEC permissions. */
  boolean shouldCheckExecPermission;

  /** Whether we should terminate access checks early, as soon as table or CF grants
    allow access; pre-0.98 compatible behavior. */
  boolean compatibleEarlyTermination;

  /** Whether we have been successfully initialized. */
  private volatile boolean initialized = false;

  /** Whether the ACL table is available; only relevant in the master. */
  private volatile boolean aclTabAvailable = false;
213 
214   public static boolean isAuthorizationSupported(Configuration conf) {
215     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
216   }
217 
218   public static boolean isCellAuthorizationSupported(Configuration conf) {
219     return isAuthorizationSupported(conf) &&
220         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
221   }
222 
223   public Region getRegion() {
224     return regionEnv != null ? regionEnv.getRegion() : null;
225   }
226 
227   public TableAuthManager getAuthManager() {
228     return authManager;
229   }
230 
231   void initialize(RegionCoprocessorEnvironment e) throws IOException {
232     final Region region = e.getRegion();
233     Configuration conf = e.getConfiguration();
234     Map<byte[], ListMultimap<String,TablePermission>> tables =
235         AccessControlLists.loadAll(region);
236     // For each table, write out the table's permissions to the respective
237     // znode for that table.
238     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
239       tables.entrySet()) {
240       byte[] entry = t.getKey();
241       ListMultimap<String,TablePermission> perms = t.getValue();
242       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
243       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
244     }
245     initialized = true;
246   }
247 
248   /**
249    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
250    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
251    * table updates.
252    */
253   void updateACL(RegionCoprocessorEnvironment e,
254       final Map<byte[], List<Cell>> familyMap) {
255     Set<byte[]> entries =
256         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
257     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
258       List<Cell> cells = f.getValue();
259       for (Cell cell: cells) {
260         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
261             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
262             AccessControlLists.ACL_LIST_FAMILY.length)) {
263           entries.add(CellUtil.cloneRow(cell));
264         }
265       }
266     }
267     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
268     Configuration conf = regionEnv.getConfiguration();
269     for (byte[] entry: entries) {
270       try {
271         ListMultimap<String,TablePermission> perms =
272           AccessControlLists.getPermissions(conf, entry);
273         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
274         zkw.writeToZookeeper(entry, serialized);
275       } catch (IOException ex) {
276         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
277             ex);
278       }
279     }
280   }
281 
282   /**
283    * Check the current user for authorization to perform a specific action
284    * against the given set of row data.
285    *
286    * <p>Note: Ordering of the authorization checks
287    * has been carefully optimized to short-circuit the most common requests
288    * and minimize the amount of processing required.</p>
289    *
290    * @param permRequest the action being requested
291    * @param e the coprocessor environment
292    * @param families the map of column families to qualifiers present in
293    * the request
294    * @return an authorization result
295    */
296   AuthResult permissionGranted(String request, User user, Action permRequest,
297       RegionCoprocessorEnvironment e,
298       Map<byte [], ? extends Collection<?>> families) {
299     HRegionInfo hri = e.getRegion().getRegionInfo();
300     TableName tableName = hri.getTable();
301 
302     // 1. All users need read access to hbase:meta table.
303     // this is a very common operation, so deal with it quickly.
304     if (hri.isMetaRegion()) {
305       if (permRequest == Action.READ) {
306         return AuthResult.allow(request, "All users allowed", user,
307           permRequest, tableName, families);
308       }
309     }
310 
311     if (user == null) {
312       return AuthResult.deny(request, "No user associated with request!", null,
313         permRequest, tableName, families);
314     }
315 
316     // 2. check for the table-level, if successful we can short-circuit
317     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
318       return AuthResult.allow(request, "Table permission granted", user,
319         permRequest, tableName, families);
320     }
321 
322     // 3. check permissions against the requested families
323     if (families != null && families.size() > 0) {
324       // all families must pass
325       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
326         // a) check for family level access
327         if (authManager.authorize(user, tableName, family.getKey(),
328             permRequest)) {
329           continue;  // family-level permission overrides per-qualifier
330         }
331 
332         // b) qualifier level access can still succeed
333         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
334           if (family.getValue() instanceof Set) {
335             // for each qualifier of the family
336             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
337             for (byte[] qualifier : familySet) {
338               if (!authManager.authorize(user, tableName, family.getKey(),
339                                          qualifier, permRequest)) {
340                 return AuthResult.deny(request, "Failed qualifier check", user,
341                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
342               }
343             }
344           } else if (family.getValue() instanceof List) { // List<KeyValue>
345             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
346             for (KeyValue kv : kvList) {
347               if (!authManager.authorize(user, tableName, family.getKey(),
348                       kv.getQualifier(), permRequest)) {
349                 return AuthResult.deny(request, "Failed qualifier check", user,
350                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
351               }
352             }
353           }
354         } else {
355           // no qualifiers and family-level check already failed
356           return AuthResult.deny(request, "Failed family check", user, permRequest,
357               tableName, makeFamilyMap(family.getKey(), null));
358         }
359       }
360 
361       // all family checks passed
362       return AuthResult.allow(request, "All family checks passed", user, permRequest,
363           tableName, families);
364     }
365 
366     // 4. no families to check and table level access failed
367     return AuthResult.deny(request, "No families to check and table permission failed",
368         user, permRequest, tableName, families);
369   }
370 
371   /**
372    * Check the current user for authorization to perform a specific action
373    * against the given set of row data.
374    * @param opType the operation type
375    * @param user the user
376    * @param e the coprocessor environment
377    * @param families the map of column families to qualifiers present in
378    * the request
379    * @param actions the desired actions
380    * @return an authorization result
381    */
382   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
383       Map<byte [], ? extends Collection<?>> families, Action... actions) {
384     AuthResult result = null;
385     for (Action action: actions) {
386       result = permissionGranted(opType.toString(), user, action, e, families);
387       if (!result.isAllowed()) {
388         return result;
389       }
390     }
391     return result;
392   }
393 
394   private void logResult(AuthResult result) {
395     if (AUDITLOG.isTraceEnabled()) {
396       InetAddress remoteAddr = RpcServer.getRemoteAddress();
397       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
398           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
399           "; reason: " + result.getReason() +
400           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
401           "; request: " + result.getRequest() +
402           "; context: " + result.toContextString());
403     }
404   }
405 
406   /**
407    * Returns the active user to which authorization checks should be applied.
408    * If we are in the context of an RPC call, the remote user is used,
409    * otherwise the currently logged in user is used.
410    */
411   private User getActiveUser() throws IOException {
412     User user = RpcServer.getRequestUser();
413     if (user == null) {
414       // for non-rpc handling, fallback to system user
415       user = userProvider.getCurrent();
416     }
417     return user;
418   }
419 
420   /**
421    * Authorizes that the current user has any of the given permissions for the
422    * given table, column family and column qualifier.
423    * @param tableName Table requested
424    * @param family Column family requested
425    * @param qualifier Column qualifier requested
426    * @throws IOException if obtaining the current user fails
427    * @throws AccessDeniedException if user has no authorization
428    */
429   private void requirePermission(String request, TableName tableName, byte[] family,
430       byte[] qualifier, Action... permissions) throws IOException {
431     User user = getActiveUser();
432     AuthResult result = null;
433 
434     for (Action permission : permissions) {
435       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
436         result = AuthResult.allow(request, "Table permission granted", user,
437                                   permission, tableName, family, qualifier);
438         break;
439       } else {
440         // rest of the world
441         result = AuthResult.deny(request, "Insufficient permissions", user,
442                                  permission, tableName, family, qualifier);
443       }
444     }
445     logResult(result);
446     if (authorizationEnabled && !result.isAllowed()) {
447       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
448     }
449   }
450 
451   /**
452    * Authorizes that the current user has any of the given permissions for the
453    * given table, column family and column qualifier.
454    * @param tableName Table requested
455    * @param family Column family param
456    * @param qualifier Column qualifier param
457    * @throws IOException if obtaining the current user fails
458    * @throws AccessDeniedException if user has no authorization
459    */
460   private void requireTablePermission(String request, TableName tableName, byte[] family,
461       byte[] qualifier, Action... permissions) throws IOException {
462     User user = getActiveUser();
463     AuthResult result = null;
464 
465     for (Action permission : permissions) {
466       if (authManager.authorize(user, tableName, null, null, permission)) {
467         result = AuthResult.allow(request, "Table permission granted", user,
468             permission, tableName, null, null);
469         result.getParams().setFamily(family).setQualifier(qualifier);
470         break;
471       } else {
472         // rest of the world
473         result = AuthResult.deny(request, "Insufficient permissions", user,
474             permission, tableName, family, qualifier);
475         result.getParams().setFamily(family).setQualifier(qualifier);
476       }
477     }
478     logResult(result);
479     if (authorizationEnabled && !result.isAllowed()) {
480       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
481     }
482   }
483 
484   /**
485    * Authorizes that the current user has any of the given permissions to access the table.
486    *
487    * @param tableName Table requested
488    * @param permissions Actions being requested
489    * @throws IOException if obtaining the current user fails
490    * @throws AccessDeniedException if user has no authorization
491    */
492   private void requireAccess(String request, TableName tableName,
493       Action... permissions) throws IOException {
494     User user = getActiveUser();
495     AuthResult result = null;
496 
497     for (Action permission : permissions) {
498       if (authManager.hasAccess(user, tableName, permission)) {
499         result = AuthResult.allow(request, "Table permission granted", user,
500                                   permission, tableName, null, null);
501         break;
502       } else {
503         // rest of the world
504         result = AuthResult.deny(request, "Insufficient permissions", user,
505                                  permission, tableName, null, null);
506       }
507     }
508     logResult(result);
509     if (authorizationEnabled && !result.isAllowed()) {
510       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
511     }
512   }
513 
514   /**
515    * Authorizes that the current user has global privileges for the given action.
516    * @param perm The action being requested
517    * @throws IOException if obtaining the current user fails
518    * @throws AccessDeniedException if authorization is denied
519    */
520   private void requirePermission(String request, Action perm) throws IOException {
521     requireGlobalPermission(request, perm, null, null);
522   }
523 
524   /**
525    * Checks that the user has the given global permission. The generated
526    * audit log message will contain context information for the operation
527    * being authorized, based on the given parameters.
528    * @param perm Action being requested
529    * @param tableName Affected table name.
530    * @param familyMap Affected column families.
531    */
532   private void requireGlobalPermission(String request, Action perm, TableName tableName,
533       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
534     User user = getActiveUser();
535     AuthResult result = null;
536     if (authManager.authorize(user, perm)) {
537       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
538       result.getParams().setTableName(tableName).setFamilies(familyMap);
539       logResult(result);
540     } else {
541       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
542       result.getParams().setTableName(tableName).setFamilies(familyMap);
543       logResult(result);
544       if (authorizationEnabled) {
545         throw new AccessDeniedException("Insufficient permissions for user '" +
546           (user != null ? user.getShortName() : "null") +"' (global, action=" +
547           perm.toString() + ")");
548       }
549     }
550   }
551 
552   /**
553    * Checks that the user has the given global permission. The generated
554    * audit log message will contain context information for the operation
555    * being authorized, based on the given parameters.
556    * @param perm Action being requested
557    * @param namespace
558    */
559   private void requireGlobalPermission(String request, Action perm,
560                                        String namespace) throws IOException {
561     User user = getActiveUser();
562     AuthResult authResult = null;
563     if (authManager.authorize(user, perm)) {
564       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
565       authResult.getParams().setNamespace(namespace);
566       logResult(authResult);
567     } else {
568       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
569       authResult.getParams().setNamespace(namespace);
570       logResult(authResult);
571       if (authorizationEnabled) {
572         throw new AccessDeniedException("Insufficient permissions for user '" +
573           (user != null ? user.getShortName() : "null") +"' (global, action=" +
574           perm.toString() + ")");
575       }
576     }
577   }
578 
579   /**
580    * Checks that the user has the given global or namespace permission.
581    * @param namespace
582    * @param permissions Actions being requested
583    */
584   public void requireNamespacePermission(String request, String namespace,
585       Action... permissions) throws IOException {
586     User user = getActiveUser();
587     AuthResult result = null;
588 
589     for (Action permission : permissions) {
590       if (authManager.authorize(user, namespace, permission)) {
591         result = AuthResult.allow(request, "Namespace permission granted",
592             user, permission, namespace);
593         break;
594       } else {
595         // rest of the world
596         result = AuthResult.deny(request, "Insufficient permissions", user,
597             permission, namespace);
598       }
599     }
600     logResult(result);
601     if (authorizationEnabled && !result.isAllowed()) {
602       throw new AccessDeniedException("Insufficient permissions "
603           + result.toContextString());
604     }
605   }
606 
607   /**
608    * Checks that the user has the given global or namespace permission.
609    * @param namespace
610    * @param permissions Actions being requested
611    */
612   public void requireNamespacePermission(String request, String namespace, TableName tableName,
613       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
614       throws IOException {
615     User user = getActiveUser();
616     AuthResult result = null;
617 
618     for (Action permission : permissions) {
619       if (authManager.authorize(user, namespace, permission)) {
620         result = AuthResult.allow(request, "Namespace permission granted",
621             user, permission, namespace);
622         result.getParams().setTableName(tableName).setFamilies(familyMap);
623         break;
624       } else {
625         // rest of the world
626         result = AuthResult.deny(request, "Insufficient permissions", user,
627             permission, namespace);
628         result.getParams().setTableName(tableName).setFamilies(familyMap);
629       }
630     }
631     logResult(result);
632     if (authorizationEnabled && !result.isAllowed()) {
633       throw new AccessDeniedException("Insufficient permissions "
634           + result.toContextString());
635     }
636   }
637 
638   /**
639    * Returns <code>true</code> if the current user is allowed the given action
640    * over at least one of the column qualifiers in the given column families.
641    */
642   private boolean hasFamilyQualifierPermission(User user,
643       Action perm,
644       RegionCoprocessorEnvironment env,
645       Map<byte[], ? extends Collection<byte[]>> familyMap)
646     throws IOException {
647     HRegionInfo hri = env.getRegion().getRegionInfo();
648     TableName tableName = hri.getTable();
649 
650     if (user == null) {
651       return false;
652     }
653 
654     if (familyMap != null && familyMap.size() > 0) {
655       // at least one family must be allowed
656       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
657           familyMap.entrySet()) {
658         if (family.getValue() != null && !family.getValue().isEmpty()) {
659           for (byte[] qualifier : family.getValue()) {
660             if (authManager.matchPermission(user, tableName,
661                 family.getKey(), qualifier, perm)) {
662               return true;
663             }
664           }
665         } else {
666           if (authManager.matchPermission(user, tableName, family.getKey(),
667               perm)) {
668             return true;
669           }
670         }
671       }
672     } else if (LOG.isDebugEnabled()) {
673       LOG.debug("Empty family map passed for permission check");
674     }
675 
676     return false;
677   }
678 
679   private enum OpType {
680     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
681     GET("get"),
682     EXISTS("exists"),
683     SCAN("scan"),
684     PUT("put"),
685     DELETE("delete"),
686     CHECK_AND_PUT("checkAndPut"),
687     CHECK_AND_DELETE("checkAndDelete"),
688     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
689     APPEND("append"),
690     INCREMENT("increment");
691 
692     private String type;
693 
694     private OpType(String type) {
695       this.type = type;
696     }
697 
698     @Override
699     public String toString() {
700       return type;
701     }
702   }
703 
704   /**
705    * Determine if cell ACLs covered by the operation grant access. This is expensive.
706    * @return false if cell ACLs failed to grant access, true otherwise
707    * @throws IOException
708    */
709   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
710       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
711       throws IOException {
712     if (!cellFeaturesEnabled) {
713       return false;
714     }
715     long cellGrants = 0;
716     User user = getActiveUser();
717     long latestCellTs = 0;
718     Get get = new Get(row);
719     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
720     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
721     // version. We have to get every cell version and check its TS against the TS asked for in
722     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
723     // consider only one such passing cell. In case of Delete we have to consider all the cell
724     // versions under this passing version. When Delete Mutation contains columns which are a
725     // version delete just consider only one version for those column cells.
726     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
727     if (considerCellTs) {
728       get.setMaxVersions();
729     } else {
730       get.setMaxVersions(1);
731     }
732     boolean diffCellTsFromOpTs = false;
733     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
734       byte[] col = entry.getKey();
735       // TODO: HBASE-7114 could possibly unify the collection type in family
736       // maps so we would not need to do this
737       if (entry.getValue() instanceof Set) {
738         Set<byte[]> set = (Set<byte[]>)entry.getValue();
739         if (set == null || set.isEmpty()) {
740           get.addFamily(col);
741         } else {
742           for (byte[] qual: set) {
743             get.addColumn(col, qual);
744           }
745         }
746       } else if (entry.getValue() instanceof List) {
747         List<Cell> list = (List<Cell>)entry.getValue();
748         if (list == null || list.isEmpty()) {
749           get.addFamily(col);
750         } else {
751           // In case of family delete, a Cell will be added into the list with Qualifier as null.
752           for (Cell cell : list) {
753             if (cell.getQualifierLength() == 0
754                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
755                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
756               get.addFamily(col);
757             } else {
758               get.addColumn(col, CellUtil.cloneQualifier(cell));
759             }
760             if (considerCellTs) {
761               long cellTs = cell.getTimestamp();
762               latestCellTs = Math.max(latestCellTs, cellTs);
763               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
764             }
765           }
766         }
767       } else if (entry.getValue() == null) {
768         get.addFamily(col);        
769       } else {
770         throw new RuntimeException("Unhandled collection type " +
771           entry.getValue().getClass().getName());
772       }
773     }
774     // We want to avoid looking into the future. So, if the cells of the
775     // operation specify a timestamp, or the operation itself specifies a
776     // timestamp, then we use the maximum ts found. Otherwise, we bound
777     // the Get to the current server time. We add 1 to the timerange since
778     // the upper bound of a timerange is exclusive yet we need to examine
779     // any cells found there inclusively.
780     long latestTs = Math.max(opTs, latestCellTs);
781     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
782       latestTs = EnvironmentEdgeManager.currentTime();
783     }
784     get.setTimeRange(0, latestTs + 1);
785     // In case of Put operation we set to read all versions. This was done to consider the case
786     // where columns are added with TS other than the Mutation TS. But normally this wont be the
787     // case with Put. There no need to get all versions but get latest version only.
788     if (!diffCellTsFromOpTs && request == OpType.PUT) {
789       get.setMaxVersions(1);
790     }
791     if (LOG.isTraceEnabled()) {
792       LOG.trace("Scanning for cells with " + get);
793     }
794     // This Map is identical to familyMap. The key is a BR rather than byte[].
795     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
796     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
797     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
798     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
799       if (entry.getValue() instanceof List) {
800         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
801       }
802     }
803     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
804     List<Cell> cells = Lists.newArrayList();
805     Cell prevCell = null;
806     ByteRange curFam = new SimpleMutableByteRange();
807     boolean curColAllVersions = (request == OpType.DELETE);
808     long curColCheckTs = opTs;
809     boolean foundColumn = false;
810     try {
811       boolean more = false;
812       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
813 
814       do {
815         cells.clear();
816         // scan with limit as 1 to hold down memory use on wide rows
817         more = scanner.next(cells, scannerContext);
818         for (Cell cell: cells) {
819           if (LOG.isTraceEnabled()) {
820             LOG.trace("Found cell " + cell);
821           }
822           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
823           if (colChange) foundColumn = false;
824           prevCell = cell;
825           if (!curColAllVersions && foundColumn) {
826             continue;
827           }
828           if (colChange && considerCellTs) {
829             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
830             List<Cell> cols = familyMap1.get(curFam);
831             for (Cell col : cols) {
832               // null/empty qualifier is used to denote a Family delete. The TS and delete type
833               // associated with this is applicable for all columns within the family. That is
834               // why the below (col.getQualifierLength() == 0) check.
835               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
836                   || CellUtil.matchingQualifier(cell, col)) {
837                 byte type = col.getTypeByte();
838                 if (considerCellTs) {
839                   curColCheckTs = col.getTimestamp();
840                 }
841                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
842                 // a version delete for a column no need to check all the covering cells within
843                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
844                 // One version delete types are Delete/DeleteFamilyVersion
845                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
846                     || (KeyValue.Type.DeleteFamily.getCode() == type);
847                 break;
848               }
849             }
850           }
851           if (cell.getTimestamp() > curColCheckTs) {
852             // Just ignore this cell. This is not a covering cell.
853             continue;
854           }
855           foundColumn = true;
856           for (Action action: actions) {
857             // Are there permissions for this user for the cell?
858             if (!authManager.authorize(user, getTableName(e), cell, action)) {
859               // We can stop if the cell ACL denies access
860               return false;
861             }
862           }
863           cellGrants++;
864         }
865       } while (more);
866     } catch (AccessDeniedException ex) {
867       throw ex;
868     } catch (IOException ex) {
869       LOG.error("Exception while getting cells to calculate covering permission", ex);
870     } finally {
871       scanner.close();
872     }
873     // We should not authorize unless we have found one or more cell ACLs that
874     // grant access. This code is used to check for additional permissions
875     // after no table or CF grants are found.
876     return cellGrants > 0;
877   }
878 
879   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
880     // Iterate over the entries in the familyMap, replacing the cells therein
881     // with new cells including the ACL data
882     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
883       List<Cell> newCells = Lists.newArrayList();
884       for (Cell cell: e.getValue()) {
885         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
886         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
887         if (cell.getTagsLength() > 0) {
888           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
889             cell.getTagsOffset(), cell.getTagsLength());
890           while (tagIterator.hasNext()) {
891             tags.add(tagIterator.next());
892           }
893         }
894         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
895       }
896       // This is supposed to be safe, won't CME
897       e.setValue(newCells);
898     }
899   }
900 
901   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
902   // type is reserved and should not be explicitly set by user.
903   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
904     // No need to check if we're not going to throw
905     if (!authorizationEnabled) {
906       m.setAttribute(TAG_CHECK_PASSED, TRUE);
907       return;
908     }
909     // Superusers are allowed to store cells unconditionally.
910     if (Superusers.isSuperUser(user)) {
911       m.setAttribute(TAG_CHECK_PASSED, TRUE);
912       return;
913     }
914     // We already checked (prePut vs preBatchMutation)
915     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
916       return;
917     }
918     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
919       Cell cell = cellScanner.current();
920       if (cell.getTagsLength() > 0) {
921         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
922           cell.getTagsLength());
923         while (tagsItr.hasNext()) {
924           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
925             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
926           }
927         }
928       }
929     }
930     m.setAttribute(TAG_CHECK_PASSED, TRUE);
931   }
932 
933   /* ---- MasterObserver implementation ---- */
934   @Override
935   public void start(CoprocessorEnvironment env) throws IOException {
936     CompoundConfiguration conf = new CompoundConfiguration();
937     conf.add(env.getConfiguration());
938 
939     authorizationEnabled = isAuthorizationSupported(conf);
940     if (!authorizationEnabled) {
941       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
942     }
943 
944     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
945       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
946 
947     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
948     if (!cellFeaturesEnabled) {
949       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
950           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
951           + " accordingly.");
952     }
953 
954     ZooKeeperWatcher zk = null;
955     if (env instanceof MasterCoprocessorEnvironment) {
956       // if running on HMaster
957       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
958       zk = mEnv.getMasterServices().getZooKeeper();
959     } else if (env instanceof RegionServerCoprocessorEnvironment) {
960       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
961       zk = rsEnv.getRegionServerServices().getZooKeeper();
962     } else if (env instanceof RegionCoprocessorEnvironment) {
963       // if running at region
964       regionEnv = (RegionCoprocessorEnvironment) env;
965       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
966       zk = regionEnv.getRegionServerServices().getZooKeeper();
967       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
968         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
969     }
970 
971     // set the user-provider.
972     this.userProvider = UserProvider.instantiate(env.getConfiguration());
973 
974     // If zk is null or IOException while obtaining auth manager,
975     // throw RuntimeException so that the coprocessor is unloaded.
976     if (zk != null) {
977       try {
978         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
979       } catch (IOException ioe) {
980         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
981       }
982     } else {
983       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
984     }
985 
986     tableAcls = new MapMaker().weakValues().makeMap();
987   }
988 
989   @Override
990   public void stop(CoprocessorEnvironment env) {
991 
992   }
993 
994   @Override
995   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
996       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
997     Set<byte[]> families = desc.getFamiliesKeys();
998     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
999     for (byte[] family: families) {
1000       familyMap.put(family, null);
1001     }
1002     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
1003         desc.getTableName(), familyMap, Action.CREATE);
1004   }
1005 
1006   @Override
1007   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
1008       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
1009     // When AC is used, it should be configured as the 1st CP.
1010     // In Master, the table operations like create, are handled by a Thread pool but the max size
1011     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1012     // sequentially only.
1013     // Related code in HMaster#startServiceThreads
1014     // {code}
1015     //   // We depend on there being only one instance of this executor running
1016     //   // at a time. To do concurrency, would need fencing of enable/disable of
1017     //   // tables.
1018     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1019     // {code}
1020     // In future if we change this pool to have more threads, then there is a chance for thread,
1021     // creating acl table, getting delayed and by that time another table creation got over and
1022     // this hook is getting called. In such a case, we will need a wait logic here which will
1023     // wait till the acl table is created.
1024     if (AccessControlLists.isAclTable(desc)) {
1025       this.aclTabAvailable = true;
1026     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1027       if (!aclTabAvailable) {
1028         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1029             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1030             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1031       } else {
1032         String owner = desc.getOwnerString();
1033         // default the table owner to current user, if not specified.
1034         if (owner == null)
1035           owner = getActiveUser().getShortName();
1036         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1037             desc.getTableName(), null, Action.values());
1038         // switch to the real hbase master user for doing the RPC on the ACL table
1039         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1040           @Override
1041           public Void run() throws Exception {
1042             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1043                 userperm);
1044             return null;
1045           }
1046         });
1047       }
1048     }
1049   }
1050 
1051   @Override
1052   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1053       throws IOException {
1054     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1055   }
1056 
1057   @Override
1058   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1059       final TableName tableName) throws IOException {
1060     final Configuration conf = c.getEnvironment().getConfiguration();
1061     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1062       @Override
1063       public Void run() throws Exception {
1064         AccessControlLists.removeTablePermissions(conf, tableName);
1065         return null;
1066       }
1067     });
1068     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1069   }
1070 
1071   @Override
1072   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1073       final TableName tableName) throws IOException {
1074     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1075 
1076     final Configuration conf = c.getEnvironment().getConfiguration();
1077     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1078       @Override
1079       public Void run() throws Exception {
1080         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1081         if (acls != null) {
1082           tableAcls.put(tableName, acls);
1083         }
1084         return null;
1085       }
1086     });
1087   }
1088 
1089   @Override
1090   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1091       final TableName tableName) throws IOException {
1092     final Configuration conf = ctx.getEnvironment().getConfiguration();
1093     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1094       @Override
1095       public Void run() throws Exception {
1096         List<UserPermission> perms = tableAcls.get(tableName);
1097         if (perms != null) {
1098           for (UserPermission perm : perms) {
1099             AccessControlLists.addUserPermission(conf, perm);
1100           }
1101         }
1102         tableAcls.remove(tableName);
1103         return null;
1104       }
1105     });
1106   }
1107 
1108   @Override
1109   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1110       HTableDescriptor htd) throws IOException {
1111     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1112   }
1113 
1114   @Override
1115   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1116       TableName tableName, final HTableDescriptor htd) throws IOException {
1117     final Configuration conf = c.getEnvironment().getConfiguration();
1118     // default the table owner to current user, if not specified.
1119     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1120       getActiveUser().getShortName();
1121     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1122       @Override
1123       public Void run() throws Exception {
1124         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1125           htd.getTableName(), null, Action.values());
1126         AccessControlLists.addUserPermission(conf, userperm);
1127         return null;
1128       }
1129     });
1130   }
1131 
1132   @Override
1133   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1134       HColumnDescriptor column) throws IOException {
1135     requireTablePermission("addColumn", tableName, column.getName(), null, Action.ADMIN,
1136         Action.CREATE);
1137   }
1138 
1139   @Override
1140   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1141       HColumnDescriptor descriptor) throws IOException {
1142     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1143       Action.CREATE);
1144   }
1145 
1146   @Override
1147   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1148       byte[] col) throws IOException {
1149     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1150   }
1151 
1152   @Override
1153   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1154       final TableName tableName, final byte[] col) throws IOException {
1155     final Configuration conf = c.getEnvironment().getConfiguration();
1156     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1157       @Override
1158       public Void run() throws Exception {
1159         AccessControlLists.removeTablePermissions(conf, tableName, col);
1160         return null;
1161       }
1162     });
1163   }
1164 
1165   @Override
1166   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1167       throws IOException {
1168     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1169   }
1170 
1171   @Override
1172   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1173       throws IOException {
1174     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1175       // We have to unconditionally disallow disable of the ACL table when we are installed,
1176       // even if not enforcing authorizations. We are still allowing grants and revocations,
1177       // checking permissions and logging audit messages, etc. If the ACL table is not
1178       // available we will fail random actions all over the place.
1179       throw new AccessDeniedException("Not allowed to disable "
1180           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1181     }
1182     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1183   }
1184 
1185   @Override
1186   public void preAbortProcedure(
1187       ObserverContext<MasterCoprocessorEnvironment> ctx,
1188       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1189       final long procId) throws IOException {
1190     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
1191       // If the user is not the procedure owner, then we should further probe whether
1192       // he can abort the procedure.
1193       requirePermission("abortProcedure", Action.ADMIN);
1194     }
1195   }
1196 
1197   @Override
1198   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1199       throws IOException {
1200     // There is nothing to do at this time after the procedure abort request was sent.
1201   }
1202 
1203   @Override
1204   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1205       throws IOException {
1206     // We are delegating the authorization check to postListProcedures as we don't have
1207     // any concrete set of procedures to work with
1208   }
1209 
1210   @Override
1211   public void postListProcedures(
1212       ObserverContext<MasterCoprocessorEnvironment> ctx,
1213       List<ProcedureInfo> procInfoList) throws IOException {
1214     if (procInfoList.isEmpty()) {
1215       return;
1216     }
1217 
1218     // Retains only those which passes authorization checks, as the checks weren't done as part
1219     // of preListProcedures.
1220     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1221     User user = getActiveUser();
1222     while (itr.hasNext()) {
1223       ProcedureInfo procInfo = itr.next();
1224       try {
1225         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1226           // If the user is not the procedure owner, then we should further probe whether
1227           // he can see the procedure.
1228           requirePermission("listProcedures", Action.ADMIN);
1229         }
1230       } catch (AccessDeniedException e) {
1231         itr.remove();
1232       }
1233     }
1234   }
1235 
1236   @Override
1237   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1238       ServerName srcServer, ServerName destServer) throws IOException {
1239     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1240   }
1241 
1242   @Override
1243   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1244       throws IOException {
1245     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1246   }
1247 
1248   @Override
1249   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1250       boolean force) throws IOException {
1251     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1252   }
1253 
1254   @Override
1255   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1256       HRegionInfo regionInfo) throws IOException {
1257     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1258   }
1259 
1260   @Override
1261   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1262       throws IOException {
1263     requirePermission("balance", Action.ADMIN);
1264   }
1265 
1266   @Override
1267   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1268       boolean newValue) throws IOException {
1269     requirePermission("balanceSwitch", Action.ADMIN);
1270     return newValue;
1271   }
1272 
1273   @Override
1274   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1275       throws IOException {
1276     requirePermission("shutdown", Action.ADMIN);
1277   }
1278 
1279   @Override
1280   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1281       throws IOException {
1282     requirePermission("stopMaster", Action.ADMIN);
1283   }
1284 
1285   @Override
1286   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1287       throws IOException {
1288     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1289       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1290       // initialize the ACL storage table
1291       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1292     } else {
1293       aclTabAvailable = true;
1294     }
1295   }
1296 
1297   @Override
1298   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1299       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1300       throws IOException {
1301     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1302       Permission.Action.ADMIN);
1303   }
1304 
1305   @Override
1306   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1307       final SnapshotDescription snapshot) throws IOException {
1308     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1309       // list it, if user is the owner of snapshot
1310     } else {
1311       requirePermission("listSnapshot", Action.ADMIN);
1312     }
1313   }
1314   
1315   @Override
1316   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1317       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1318       throws IOException {
1319     requirePermission("clone", Action.ADMIN);
1320   }
1321 
1322   @Override
1323   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1324       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1325       throws IOException {
1326     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1327       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1328         Permission.Action.ADMIN);
1329     } else {
1330       requirePermission("restore", Action.ADMIN);
1331     }
1332   }
1333 
1334   @Override
1335   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1336       final SnapshotDescription snapshot) throws IOException {
1337     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1338       // Snapshot owner is allowed to delete the snapshot
1339       // TODO: We are not logging this for audit
1340     } else {
1341       requirePermission("deleteSnapshot", Action.ADMIN);
1342     }
1343   }
1344 
1345   @Override
1346   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1347       NamespaceDescriptor ns) throws IOException {
1348     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1349   }
1350 
1351   @Override
1352   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1353       throws IOException {
1354     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1355   }
1356 
1357   @Override
1358   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1359       final String namespace) throws IOException {
1360     final Configuration conf = ctx.getEnvironment().getConfiguration();
1361     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1362       @Override
1363       public Void run() throws Exception {
1364         AccessControlLists.removeNamespacePermissions(conf, namespace);
1365         return null;
1366       }
1367     });
1368     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1369     LOG.info(namespace + " entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1370   }
1371 
1372   @Override
1373   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1374       NamespaceDescriptor ns) throws IOException {
1375     // We require only global permission so that 
1376     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1377     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1378   }
1379 
1380   @Override
1381   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1382       throws IOException {
1383     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1384   }
1385 
1386   @Override
1387   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1388       List<NamespaceDescriptor> descriptors) throws IOException {
1389     // Retains only those which passes authorization checks, as the checks weren't done as part
1390     // of preGetTableDescriptors.
1391     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1392     while (itr.hasNext()) {
1393       NamespaceDescriptor desc = itr.next();
1394       try {
1395         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1396       } catch (AccessDeniedException e) {
1397         itr.remove();
1398       }
1399     }
1400   }
1401 
1402   @Override
1403   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1404       final TableName tableName) throws IOException {
1405     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1406   }
1407 
1408   /* ---- RegionObserver implementation ---- */
1409 
1410   @Override
1411   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1412       throws IOException {
1413     RegionCoprocessorEnvironment env = e.getEnvironment();
1414     final Region region = env.getRegion();
1415     if (region == null) {
1416       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1417     } else {
1418       HRegionInfo regionInfo = region.getRegionInfo();
1419       if (regionInfo.getTable().isSystemTable()) {
1420         checkSystemOrSuperUser();
1421       } else {
1422         requirePermission("preOpen", Action.ADMIN);
1423       }
1424     }
1425   }
1426 
1427   @Override
1428   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1429     RegionCoprocessorEnvironment env = c.getEnvironment();
1430     final Region region = env.getRegion();
1431     if (region == null) {
1432       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1433       return;
1434     }
1435     if (AccessControlLists.isAclRegion(region)) {
1436       aclRegion = true;
1437       // When this region is under recovering state, initialize will be handled by postLogReplay
1438       if (!region.isRecovering()) {
1439         try {
1440           initialize(env);
1441         } catch (IOException ex) {
1442           // if we can't obtain permissions, it's better to fail
1443           // than perform checks incorrectly
1444           throw new RuntimeException("Failed to initialize permissions cache", ex);
1445         }
1446       }
1447     } else {
1448       initialized = true;
1449     }
1450   }
1451 
1452   @Override
1453   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1454     if (aclRegion) {
1455       try {
1456         initialize(c.getEnvironment());
1457       } catch (IOException ex) {
1458         // if we can't obtain permissions, it's better to fail
1459         // than perform checks incorrectly
1460         throw new RuntimeException("Failed to initialize permissions cache", ex);
1461       }
1462     }
1463   }
1464 
1465   @Override
1466   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1467     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1468         Action.CREATE);
1469   }
1470 
1471   @Override
1472   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1473     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1474   }
1475 
1476   @Override
1477   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1478       byte[] splitRow) throws IOException {
1479     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1480   }
1481 
1482   @Override
1483   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1484       final Store store, final InternalScanner scanner, final ScanType scanType)
1485           throws IOException {
1486     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1487         Action.CREATE);
1488     return scanner;
1489   }
1490 
1491   @Override
1492   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1493       final byte [] row, final byte [] family, final Result result)
1494       throws IOException {
1495     assert family != null;
1496     RegionCoprocessorEnvironment env = c.getEnvironment();
1497     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1498     User user = getActiveUser();
1499     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1500       Action.READ);
1501     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1502       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1503         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1504       authResult.setReason("Covering cell set");
1505     }
1506     logResult(authResult);
1507     if (authorizationEnabled && !authResult.isAllowed()) {
1508       throw new AccessDeniedException("Insufficient permissions " +
1509         authResult.toContextString());
1510     }
1511   }
1512 
1513   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1514       final Query query, OpType opType) throws IOException {
1515     Filter filter = query.getFilter();
1516     // Don't wrap an AccessControlFilter
1517     if (filter != null && filter instanceof AccessControlFilter) {
1518       return;
1519     }
1520     User user = getActiveUser();
1521     RegionCoprocessorEnvironment env = c.getEnvironment();
1522     Map<byte[],? extends Collection<byte[]>> families = null;
1523     switch (opType) {
1524     case GET:
1525     case EXISTS:
1526       families = ((Get)query).getFamilyMap();
1527       break;
1528     case SCAN:
1529       families = ((Scan)query).getFamilyMap();
1530       break;
1531     default:
1532       throw new RuntimeException("Unhandled operation " + opType);
1533     }
1534     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1535     Region region = getRegion(env);
1536     TableName table = getTableName(region);
1537     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1538     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1539       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1540     }
1541     if (!authResult.isAllowed()) {
1542       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1543         // Old behavior: Scan with only qualifier checks if we have partial
1544         // permission. Backwards compatible behavior is to throw an
1545         // AccessDeniedException immediately if there are no grants for table
1546         // or CF or CF+qual. Only proceed with an injected filter if there are
1547         // grants for qualifiers. Otherwise we will fall through below and log
1548         // the result and throw an ADE. We may end up checking qualifier
1549         // grants three times (permissionGranted above, here, and in the
1550         // filter) but that's the price of backwards compatibility.
1551         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1552           authResult.setAllowed(true);
1553           authResult.setReason("Access allowed with filter");
1554           // Only wrap the filter if we are enforcing authorizations
1555           if (authorizationEnabled) {
1556             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1557               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1558               cfVsMaxVersions);
1559             // wrap any existing filter
1560             if (filter != null) {
1561               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1562                 Lists.newArrayList(ourFilter, filter));
1563             }
1564             switch (opType) {
1565               case GET:
1566               case EXISTS:
1567                 ((Get)query).setFilter(ourFilter);
1568                 break;
1569               case SCAN:
1570                 ((Scan)query).setFilter(ourFilter);
1571                 break;
1572               default:
1573                 throw new RuntimeException("Unhandled operation " + opType);
1574             }
1575           }
1576         }
1577       } else {
1578         // New behavior: Any access we might be granted is more fine-grained
1579         // than whole table or CF. Simply inject a filter and return what is
1580         // allowed. We will not throw an AccessDeniedException. This is a
1581         // behavioral change since 0.96.
1582         authResult.setAllowed(true);
1583         authResult.setReason("Access allowed with filter");
1584         // Only wrap the filter if we are enforcing authorizations
1585         if (authorizationEnabled) {
1586           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1587             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1588           // wrap any existing filter
1589           if (filter != null) {
1590             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1591               Lists.newArrayList(ourFilter, filter));
1592           }
1593           switch (opType) {
1594             case GET:
1595             case EXISTS:
1596               ((Get)query).setFilter(ourFilter);
1597               break;
1598             case SCAN:
1599               ((Scan)query).setFilter(ourFilter);
1600               break;
1601             default:
1602               throw new RuntimeException("Unhandled operation " + opType);
1603           }
1604         }
1605       }
1606     }
1607 
1608     logResult(authResult);
1609     if (authorizationEnabled && !authResult.isAllowed()) {
1610       throw new AccessDeniedException("Insufficient permissions for user '"
1611           + (user != null ? user.getShortName() : "null")
1612           + "' (table=" + table + ", action=READ)");
1613     }
1614   }
1615 
1616   @Override
1617   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1618       final Get get, final List<Cell> result) throws IOException {
1619     internalPreRead(c, get, OpType.GET);
1620   }
1621 
1622   @Override
1623   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1624       final Get get, final boolean exists) throws IOException {
1625     internalPreRead(c, get, OpType.EXISTS);
1626     return exists;
1627   }
1628 
1629   @Override
1630   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1631       final Put put, final WALEdit edit, final Durability durability)
1632       throws IOException {
1633     User user = getActiveUser();
1634     checkForReservedTagPresence(user, put);
1635 
1636     // Require WRITE permission to the table, CF, or top visible value, if any.
1637     // NOTE: We don't need to check the permissions for any earlier Puts
1638     // because we treat the ACLs in each Put as timestamped like any other
1639     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1640     // change the ACL of any previous Put. This allows simple evolution of
1641     // security policy over time without requiring expensive updates.
1642     RegionCoprocessorEnvironment env = c.getEnvironment();
1643     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1644     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1645     logResult(authResult);
1646     if (!authResult.isAllowed()) {
1647       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1648         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1649       } else if (authorizationEnabled) {
1650         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1651       }
1652     }
1653 
1654     // Add cell ACLs from the operation to the cells themselves
1655     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1656     if (bytes != null) {
1657       if (cellFeaturesEnabled) {
1658         addCellPermissions(bytes, put.getFamilyCellMap());
1659       } else {
1660         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1661       }
1662     }
1663   }
1664 
1665   @Override
1666   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1667       final Put put, final WALEdit edit, final Durability durability) {
1668     if (aclRegion) {
1669       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1670     }
1671   }
1672 
1673   @Override
1674   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1675       final Delete delete, final WALEdit edit, final Durability durability)
1676       throws IOException {
1677     // An ACL on a delete is useless, we shouldn't allow it
1678     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1679       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1680     }
1681     // Require WRITE permissions on all cells covered by the delete. Unlike
1682     // for Puts we need to check all visible prior versions, because a major
1683     // compaction could remove them. If the user doesn't have permission to
1684     // overwrite any of the visible versions ('visible' defined as not covered
1685     // by a tombstone already) then we have to disallow this operation.
1686     RegionCoprocessorEnvironment env = c.getEnvironment();
1687     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1688     User user = getActiveUser();
1689     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1690     logResult(authResult);
1691     if (!authResult.isAllowed()) {
1692       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1693         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1694       } else if (authorizationEnabled) {
1695         throw new AccessDeniedException("Insufficient permissions " +
1696           authResult.toContextString());
1697       }
1698     }
1699   }
1700 
1701   @Override
1702   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1703       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1704     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1705       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1706       for (int i = 0; i < miniBatchOp.size(); i++) {
1707         Mutation m = miniBatchOp.getOperation(i);
1708         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1709           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1710           // perm check
1711           OpType opType;
1712           if (m instanceof Put) {
1713             checkForReservedTagPresence(getActiveUser(), m);
1714             opType = OpType.PUT;
1715           } else {
1716             opType = OpType.DELETE;
1717           }
1718           AuthResult authResult = null;
1719           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1720             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1721             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1722               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1723           } else {
1724             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1725               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1726           }
1727           logResult(authResult);
1728           if (authorizationEnabled && !authResult.isAllowed()) {
1729             throw new AccessDeniedException("Insufficient permissions "
1730               + authResult.toContextString());
1731           }
1732         }
1733       }
1734     }
1735   }
1736 
1737   @Override
1738   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1739       final Delete delete, final WALEdit edit, final Durability durability)
1740       throws IOException {
1741     if (aclRegion) {
1742       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1743     }
1744   }
1745 
1746   @Override
1747   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1748       final byte [] row, final byte [] family, final byte [] qualifier,
1749       final CompareFilter.CompareOp compareOp,
1750       final ByteArrayComparable comparator, final Put put,
1751       final boolean result) throws IOException {
1752     User user = getActiveUser();
1753     checkForReservedTagPresence(user, put);
1754 
1755     // Require READ and WRITE permissions on the table, CF, and KV to update
1756     RegionCoprocessorEnvironment env = c.getEnvironment();
1757     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1758     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1759       Action.READ, Action.WRITE);
1760     logResult(authResult);
1761     if (!authResult.isAllowed()) {
1762       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1763         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1764       } else if (authorizationEnabled) {
1765         throw new AccessDeniedException("Insufficient permissions " +
1766           authResult.toContextString());
1767       }
1768     }
1769 
1770     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1771     if (bytes != null) {
1772       if (cellFeaturesEnabled) {
1773         addCellPermissions(bytes, put.getFamilyCellMap());
1774       } else {
1775         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1776       }
1777     }
1778     return result;
1779   }
1780 
1781   @Override
1782   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1783       final byte[] row, final byte[] family, final byte[] qualifier,
1784       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1785       final boolean result) throws IOException {
1786     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1787       // We had failure with table, cf and q perm checks and now giving a chance for cell
1788       // perm check
1789       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1790       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1791       AuthResult authResult = null;
1792       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1793           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1794         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1795             getActiveUser(), Action.READ, table, families);
1796       } else {
1797         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1798             getActiveUser(), Action.READ, table, families);
1799       }
1800       logResult(authResult);
1801       if (authorizationEnabled && !authResult.isAllowed()) {
1802         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1803       }
1804     }
1805     return result;
1806   }
1807 
1808   @Override
1809   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1810       final byte [] row, final byte [] family, final byte [] qualifier,
1811       final CompareFilter.CompareOp compareOp,
1812       final ByteArrayComparable comparator, final Delete delete,
1813       final boolean result) throws IOException {
1814     // An ACL on a delete is useless, we shouldn't allow it
1815     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1816       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1817           delete.toString());
1818     }
1819     // Require READ and WRITE permissions on the table, CF, and the KV covered
1820     // by the delete
1821     RegionCoprocessorEnvironment env = c.getEnvironment();
1822     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1823     User user = getActiveUser();
1824     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1825       Action.READ, Action.WRITE);
1826     logResult(authResult);
1827     if (!authResult.isAllowed()) {
1828       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1829         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1830       } else if (authorizationEnabled) {
1831         throw new AccessDeniedException("Insufficient permissions " +
1832           authResult.toContextString());
1833       }
1834     }
1835     return result;
1836   }
1837 
1838   @Override
1839   public boolean preCheckAndDeleteAfterRowLock(
1840       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1841       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1842       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1843       throws IOException {
1844     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1845       // We had failure with table, cf and q perm checks and now giving a chance for cell
1846       // perm check
1847       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1848       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1849       AuthResult authResult = null;
1850       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1851           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1852         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1853             getActiveUser(), Action.READ, table, families);
1854       } else {
1855         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1856             getActiveUser(), Action.READ, table, families);
1857       }
1858       logResult(authResult);
1859       if (authorizationEnabled && !authResult.isAllowed()) {
1860         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1861       }
1862     }
1863     return result;
1864   }
1865 
1866   @Override
1867   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1868       final byte [] row, final byte [] family, final byte [] qualifier,
1869       final long amount, final boolean writeToWAL)
1870       throws IOException {
1871     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1872     // incremented value
1873     RegionCoprocessorEnvironment env = c.getEnvironment();
1874     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1875     User user = getActiveUser();
1876     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1877       Action.WRITE);
1878     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1879       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1880         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1881       authResult.setReason("Covering cell set");
1882     }
1883     logResult(authResult);
1884     if (authorizationEnabled && !authResult.isAllowed()) {
1885       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1886     }
1887     return -1;
1888   }
1889 
1890   @Override
1891   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1892       throws IOException {
1893     User user = getActiveUser();
1894     checkForReservedTagPresence(user, append);
1895 
1896     // Require WRITE permission to the table, CF, and the KV to be appended
1897     RegionCoprocessorEnvironment env = c.getEnvironment();
1898     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1899     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1900     logResult(authResult);
1901     if (!authResult.isAllowed()) {
1902       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1903         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1904       } else if (authorizationEnabled)  {
1905         throw new AccessDeniedException("Insufficient permissions " +
1906           authResult.toContextString());
1907       }
1908     }
1909 
1910     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1911     if (bytes != null) {
1912       if (cellFeaturesEnabled) {
1913         addCellPermissions(bytes, append.getFamilyCellMap());
1914       } else {
1915         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1916       }
1917     }
1918 
1919     return null;
1920   }
1921 
1922   @Override
1923   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1924       final Append append) throws IOException {
1925     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1926       // We had failure with table, cf and q perm checks and now giving a chance for cell
1927       // perm check
1928       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1929       AuthResult authResult = null;
1930       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1931           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1932         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1933             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1934       } else {
1935         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1936             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1937       }
1938       logResult(authResult);
1939       if (authorizationEnabled && !authResult.isAllowed()) {
1940         throw new AccessDeniedException("Insufficient permissions " +
1941           authResult.toContextString());
1942       }
1943     }
1944     return null;
1945   }
1946 
1947   @Override
1948   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1949       final Increment increment)
1950       throws IOException {
1951     User user = getActiveUser();
1952     checkForReservedTagPresence(user, increment);
1953 
1954     // Require WRITE permission to the table, CF, and the KV to be replaced by
1955     // the incremented value
1956     RegionCoprocessorEnvironment env = c.getEnvironment();
1957     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1958     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1959       Action.WRITE);
1960     logResult(authResult);
1961     if (!authResult.isAllowed()) {
1962       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1963         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1964       } else if (authorizationEnabled) {
1965         throw new AccessDeniedException("Insufficient permissions " +
1966           authResult.toContextString());
1967       }
1968     }
1969 
1970     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1971     if (bytes != null) {
1972       if (cellFeaturesEnabled) {
1973         addCellPermissions(bytes, increment.getFamilyCellMap());
1974       } else {
1975         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1976       }
1977     }
1978 
1979     return null;
1980   }
1981 
1982   @Override
1983   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1984       final Increment increment) throws IOException {
1985     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1986       // We had failure with table, cf and q perm checks and now giving a chance for cell
1987       // perm check
1988       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1989       AuthResult authResult = null;
1990       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1991           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1992         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1993             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1994       } else {
1995         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1996             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1997       }
1998       logResult(authResult);
1999       if (authorizationEnabled && !authResult.isAllowed()) {
2000         throw new AccessDeniedException("Insufficient permissions " +
2001           authResult.toContextString());
2002       }
2003     }
2004     return null;
2005   }
2006 
2007   @Override
2008   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2009       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2010     // If the HFile version is insufficient to persist tags, we won't have any
2011     // work to do here
2012     if (!cellFeaturesEnabled) {
2013       return newCell;
2014     }
2015 
2016     // Collect any ACLs from the old cell
2017     List<Tag> tags = Lists.newArrayList();
2018     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2019     if (oldCell != null) {
2020       // Save an object allocation where we can
2021       if (oldCell.getTagsLength() > 0) {
2022         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
2023           oldCell.getTagsOffset(), oldCell.getTagsLength());
2024         while (tagIterator.hasNext()) {
2025           Tag tag = tagIterator.next();
2026           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2027             // Not an ACL tag, just carry it through
2028             if (LOG.isTraceEnabled()) {
2029               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
2030                 " length " + tag.getTagLength());
2031             }
2032             tags.add(tag);
2033           } else {
2034             // Merge the perms from the older ACL into the current permission set
2035             // TODO: The efficiency of this can be improved. Don't build just to unpack
2036             // again, use the builder
2037             AccessControlProtos.UsersAndPermissions.Builder builder =
2038               AccessControlProtos.UsersAndPermissions.newBuilder();
2039             ProtobufUtil.mergeFrom(builder, tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
2040             ListMultimap<String,Permission> kvPerms =
2041               ProtobufUtil.toUsersAndPermissions(builder.build());
2042             perms.putAll(kvPerms);
2043           }
2044         }
2045       }
2046     }
2047 
2048     // Do we have an ACL on the operation?
2049     byte[] aclBytes = mutation.getACL();
2050     if (aclBytes != null) {
2051       // Yes, use it
2052       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2053     } else {
2054       // No, use what we carried forward
2055       if (perms != null) {
2056         // TODO: If we collected ACLs from more than one tag we may have a
2057         // List<Permission> of size > 1, this can be collapsed into a single
2058         // Permission
2059         if (LOG.isTraceEnabled()) {
2060           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2061         }
2062         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
2063           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
2064       }
2065     }
2066 
2067     // If we have no tags to add, just return
2068     if (tags.isEmpty()) {
2069       return newCell;
2070     }
2071 
2072     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
2073     return rewriteCell;
2074   }
2075 
2076   @Override
2077   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2078       final Scan scan, final RegionScanner s) throws IOException {
2079     internalPreRead(c, scan, OpType.SCAN);
2080     return s;
2081   }
2082 
2083   @Override
2084   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2085       final Scan scan, final RegionScanner s) throws IOException {
2086     User user = getActiveUser();
2087     if (user != null && user.getShortName() != null) {
2088       // store reference to scanner owner for later checks
2089       scannerOwners.put(s, user.getShortName());
2090     }
2091     return s;
2092   }
2093 
2094   @Override
2095   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2096       final InternalScanner s, final List<Result> result,
2097       final int limit, final boolean hasNext) throws IOException {
2098     requireScannerOwner(s);
2099     return hasNext;
2100   }
2101 
2102   @Override
2103   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2104       final InternalScanner s) throws IOException {
2105     requireScannerOwner(s);
2106   }
2107 
2108   @Override
2109   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2110       final InternalScanner s) throws IOException {
2111     // clean up any associated owner mapping
2112     scannerOwners.remove(s);
2113   }
2114 
2115   /**
2116    * Verify, when servicing an RPC, that the caller is the scanner owner.
2117    * If so, we assume that access control is correctly enforced based on
2118    * the checks performed in preScannerOpen()
2119    */
2120   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2121     if (!RpcServer.isInRpcCallContext())
2122       return;
2123     String requestUserName = RpcServer.getRequestUserName();
2124     String owner = scannerOwners.get(s);
2125     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2126       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2127     }
2128   }
2129 
2130   /**
2131    * Verifies user has CREATE privileges on
2132    * the Column Families involved in the bulkLoadHFile
2133    * request. Specific Column Write privileges are presently
2134    * ignored.
2135    */
2136   @Override
2137   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2138       List<Pair<byte[], String>> familyPaths) throws IOException {
2139     for(Pair<byte[],String> el : familyPaths) {
2140       requirePermission("preBulkLoadHFile",
2141           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2142           el.getFirst(),
2143           null,
2144           Action.CREATE);
2145     }
2146   }
2147 
2148   /**
2149    * Authorization check for
2150    * SecureBulkLoadProtocol.prepareBulkLoad()
2151    * @param ctx the context
2152    * @param request the request
2153    * @throws IOException
2154    */
2155   @Override
2156   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2157                                  PrepareBulkLoadRequest request) throws IOException {
2158     requireAccess("prePareBulkLoad",
2159         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2160   }
2161 
2162   /**
2163    * Authorization security check for
2164    * SecureBulkLoadProtocol.cleanupBulkLoad()
2165    * @param ctx the context
2166    * @param request the request
2167    * @throws IOException
2168    */
2169   @Override
2170   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2171                                  CleanupBulkLoadRequest request) throws IOException {
2172     requireAccess("preCleanupBulkLoad",
2173         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2174   }
2175 
2176   /* ---- EndpointObserver implementation ---- */
2177 
2178   @Override
2179   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2180       Service service, String methodName, Message request) throws IOException {
2181     // Don't intercept calls to our own AccessControlService, we check for
2182     // appropriate permissions in the service handlers
2183     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2184       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2185         methodName + ")",
2186         getTableName(ctx.getEnvironment()), null, null,
2187         Action.EXEC);
2188     }
2189     return request;
2190   }
2191 
2192   @Override
2193   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2194       Service service, String methodName, Message request, Message.Builder responseBuilder)
2195       throws IOException { }
2196 
2197   /* ---- Protobuf AccessControlService implementation ---- */
2198 
2199   @Override
2200   public void grant(RpcController controller,
2201                     AccessControlProtos.GrantRequest request,
2202                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2203     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2204     AccessControlProtos.GrantResponse response = null;
2205     try {
2206       // verify it's only running at .acl.
2207       if (aclRegion) {
2208         if (!initialized) {
2209           throw new CoprocessorException("AccessController not yet initialized");
2210         }
2211         if (LOG.isDebugEnabled()) {
2212           LOG.debug("Received request to grant access permission " + perm.toString());
2213         }
2214 
2215         switch(request.getUserPermission().getPermission().getType()) {
2216           case Global :
2217           case Table :
2218             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2219               perm.getQualifier(), Action.ADMIN);
2220             break;
2221           case Namespace :
2222             requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
2223            break;
2224         }
2225 
2226         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2227           @Override
2228           public Void run() throws Exception {
2229             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2230             return null;
2231           }
2232         });
2233 
2234         if (AUDITLOG.isTraceEnabled()) {
2235           // audit log should store permission changes in addition to auth results
2236           AUDITLOG.trace("Granted permission " + perm.toString());
2237         }
2238       } else {
2239         throw new CoprocessorException(AccessController.class, "This method "
2240             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2241       }
2242       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2243     } catch (IOException ioe) {
2244       // pass exception back up
2245       ResponseConverter.setControllerException(controller, ioe);
2246     }
2247     done.run(response);
2248   }
2249 
2250   @Override
2251   public void revoke(RpcController controller,
2252                      AccessControlProtos.RevokeRequest request,
2253                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2254     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2255     AccessControlProtos.RevokeResponse response = null;
2256     try {
2257       // only allowed to be called on _acl_ region
2258       if (aclRegion) {
2259         if (!initialized) {
2260           throw new CoprocessorException("AccessController not yet initialized");
2261         }
2262         if (LOG.isDebugEnabled()) {
2263           LOG.debug("Received request to revoke access permission " + perm.toString());
2264         }
2265 
2266         switch(request.getUserPermission().getPermission().getType()) {
2267           case Global :
2268           case Table :
2269             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2270               perm.getQualifier(), Action.ADMIN);
2271             break;
2272           case Namespace :
2273             requireNamespacePermission("revoke", perm.getNamespace(), Action.ADMIN);
2274             break;
2275         }
2276 
2277         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2278           @Override
2279           public Void run() throws Exception {
2280             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2281             return null;
2282           }
2283         });
2284 
2285         if (AUDITLOG.isTraceEnabled()) {
2286           // audit log should record all permission changes
2287           AUDITLOG.trace("Revoked permission " + perm.toString());
2288         }
2289       } else {
2290         throw new CoprocessorException(AccessController.class, "This method "
2291             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2292       }
2293       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2294     } catch (IOException ioe) {
2295       // pass exception back up
2296       ResponseConverter.setControllerException(controller, ioe);
2297     }
2298     done.run(response);
2299   }
2300 
2301   @Override
2302   public void getUserPermissions(RpcController controller,
2303                                  AccessControlProtos.GetUserPermissionsRequest request,
2304                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2305     AccessControlProtos.GetUserPermissionsResponse response = null;
2306     try {
2307       // only allowed to be called on _acl_ region
2308       if (aclRegion) {
2309         if (!initialized) {
2310           throw new CoprocessorException("AccessController not yet initialized");
2311         }
2312         List<UserPermission> perms = null;
2313         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2314           final TableName table = request.hasTableName() ?
2315             ProtobufUtil.toTableName(request.getTableName()) : null;
2316           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2317           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2318             @Override
2319             public List<UserPermission> run() throws Exception {
2320               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2321             }
2322           });
2323         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2324           final String namespace = request.getNamespaceName().toStringUtf8();
2325           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2326           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2327             @Override
2328             public List<UserPermission> run() throws Exception {
2329               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2330                 namespace);
2331             }
2332           });
2333         } else {
2334           requirePermission("userPermissions", Action.ADMIN);
2335           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2336             @Override
2337             public List<UserPermission> run() throws Exception {
2338               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2339             }
2340           });
2341         }
2342         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2343       } else {
2344         throw new CoprocessorException(AccessController.class, "This method "
2345             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2346       }
2347     } catch (IOException ioe) {
2348       // pass exception back up
2349       ResponseConverter.setControllerException(controller, ioe);
2350     }
2351     done.run(response);
2352   }
2353 
2354   @Override
2355   public void checkPermissions(RpcController controller,
2356                                AccessControlProtos.CheckPermissionsRequest request,
2357                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2358     Permission[] permissions = new Permission[request.getPermissionCount()];
2359     for (int i=0; i < request.getPermissionCount(); i++) {
2360       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2361     }
2362     AccessControlProtos.CheckPermissionsResponse response = null;
2363     try {
2364       User user = getActiveUser();
2365       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2366       for (Permission permission : permissions) {
2367         if (permission instanceof TablePermission) {
2368           // Check table permissions
2369 
2370           TablePermission tperm = (TablePermission) permission;
2371           for (Action action : permission.getActions()) {
2372             if (!tperm.getTableName().equals(tableName)) {
2373               throw new CoprocessorException(AccessController.class, String.format("This method "
2374                   + "can only execute at the table specified in TablePermission. " +
2375                   "Table of the region:%s , requested table:%s", tableName,
2376                   tperm.getTableName()));
2377             }
2378 
2379             Map<byte[], Set<byte[]>> familyMap =
2380                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2381             if (tperm.getFamily() != null) {
2382               if (tperm.getQualifier() != null) {
2383                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2384                 qualifiers.add(tperm.getQualifier());
2385                 familyMap.put(tperm.getFamily(), qualifiers);
2386               } else {
2387                 familyMap.put(tperm.getFamily(), null);
2388               }
2389             }
2390 
2391             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2392               familyMap);
2393             logResult(result);
2394             if (!result.isAllowed()) {
2395               // Even if passive we need to throw an exception here, we support checking
2396               // effective permissions, so throw unconditionally
2397               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2398                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2399                 ", action=" + action.toString() + ")");
2400             }
2401           }
2402 
2403         } else {
2404           // Check global permissions
2405 
2406           for (Action action : permission.getActions()) {
2407             AuthResult result;
2408             if (authManager.authorize(user, action)) {
2409               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2410                 action, null, null);
2411             } else {
2412               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2413                 null, null);
2414             }
2415             logResult(result);
2416             if (!result.isAllowed()) {
2417               // Even if passive we need to throw an exception here, we support checking
2418               // effective permissions, so throw unconditionally
2419               throw new AccessDeniedException("Insufficient permissions (action=" +
2420                 action.toString() + ")");
2421             }
2422           }
2423         }
2424       }
2425       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2426     } catch (IOException ioe) {
2427       ResponseConverter.setControllerException(controller, ioe);
2428     }
2429     done.run(response);
2430   }
2431 
2432   @Override
2433   public Service getService() {
2434     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2435   }
2436 
2437   private Region getRegion(RegionCoprocessorEnvironment e) {
2438     return e.getRegion();
2439   }
2440 
2441   private TableName getTableName(RegionCoprocessorEnvironment e) {
2442     Region region = e.getRegion();
2443     if (region != null) {
2444       return getTableName(region);
2445     }
2446     return null;
2447   }
2448 
2449   private TableName getTableName(Region region) {
2450     HRegionInfo regionInfo = region.getRegionInfo();
2451     if (regionInfo != null) {
2452       return regionInfo.getTable();
2453     }
2454     return null;
2455   }
2456 
2457   @Override
2458   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2459       throws IOException {
2460     requirePermission("preClose", Action.ADMIN);
2461   }
2462 
2463   private void checkSystemOrSuperUser() throws IOException {
2464     // No need to check if we're not going to throw
2465     if (!authorizationEnabled) {
2466       return;
2467     }
2468     User activeUser = getActiveUser();
2469     if (!Superusers.isSuperUser(activeUser)) {
2470       throw new AccessDeniedException("User '" + (activeUser != null ?
2471         activeUser.getShortName() : "null") + "' is not system or super user.");
2472     }
2473   }
2474 
2475   @Override
2476   public void preStopRegionServer(
2477       ObserverContext<RegionServerCoprocessorEnvironment> env)
2478       throws IOException {
2479     requirePermission("preStopRegionServer", Action.ADMIN);
2480   }
2481 
2482   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2483       byte[] qualifier) {
2484     if (family == null) {
2485       return null;
2486     }
2487 
2488     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2489     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2490     return familyMap;
2491   }
2492 
2493   @Override
2494   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2495        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2496        String regex) throws IOException {
2497     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2498     // any concrete set of table names when a regex is present or the full list is requested.
2499     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2500       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2501       // request can be granted.
2502       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2503       for (TableName tableName: tableNamesList) {
2504         // Skip checks for a table that does not exist
2505         if (masterServices.getTableDescriptors().get(tableName) == null) {
2506           continue;
2507         }
2508         requirePermission("getTableDescriptors", tableName, null, null,
2509             Action.ADMIN, Action.CREATE);
2510       }
2511     }
2512   }
2513 
2514   @Override
2515   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2516       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2517       String regex) throws IOException {
2518     // Skipping as checks in this case are already done by preGetTableDescriptors.
2519     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2520       return;
2521     }
2522 
2523     // Retains only those which passes authorization checks, as the checks weren't done as part
2524     // of preGetTableDescriptors.
2525     Iterator<HTableDescriptor> itr = descriptors.iterator();
2526     while (itr.hasNext()) {
2527       HTableDescriptor htd = itr.next();
2528       try {
2529         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2530             Action.ADMIN, Action.CREATE);
2531       } catch (AccessDeniedException e) {
2532         itr.remove();
2533       }
2534     }
2535   }
2536 
2537   @Override
2538   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2539       List<HTableDescriptor> descriptors, String regex) throws IOException {
2540     // Retains only those which passes authorization checks.
2541     Iterator<HTableDescriptor> itr = descriptors.iterator();
2542     while (itr.hasNext()) {
2543       HTableDescriptor htd = itr.next();
2544       try {
2545         requireAccess("getTableNames", htd.getTableName(), Action.values());
2546       } catch (AccessDeniedException e) {
2547         itr.remove();
2548       }
2549     }
2550   }
2551 
2552   @Override
2553   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2554       Region regionB) throws IOException {
2555     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2556       Action.ADMIN);
2557   }
2558 
2559   @Override
2560   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2561       Region regionB, Region mergedRegion) throws IOException { }
2562 
2563   @Override
2564   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2565       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2566 
2567   @Override
2568   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2569       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2570 
2571   @Override
2572   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2573       Region regionA, Region regionB) throws IOException { }
2574 
2575   @Override
2576   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2577       Region regionA, Region regionB) throws IOException { }
2578 
2579   @Override
2580   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2581       throws IOException {
2582     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2583   }
2584 
2585   @Override
2586   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2587       throws IOException { }
2588 
2589   @Override
2590   public ReplicationEndpoint postCreateReplicationEndPoint(
2591       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2592     return endpoint;
2593   }
2594 
2595   @Override
2596   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2597       List<WALEntry> entries, CellScanner cells) throws IOException {
2598     requirePermission("replicateLogEntries", Action.WRITE);
2599   }
2600 
2601   @Override
2602   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2603       List<WALEntry> entries, CellScanner cells) throws IOException {
2604   }
2605   
2606   @Override
2607   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2608       final String userName, final Quotas quotas) throws IOException {
2609     requirePermission("setUserQuota", Action.ADMIN);
2610   }
2611 
2612   @Override
2613   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2614       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2615     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2616   }
2617 
2618   @Override
2619   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2620       final String userName, final String namespace, final Quotas quotas) throws IOException {
2621     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2622   }
2623 
2624   @Override
2625   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2626       final TableName tableName, final Quotas quotas) throws IOException {
2627     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2628   }
2629 
2630   @Override
2631   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2632       final String namespace, final Quotas quotas) throws IOException {
2633     requirePermission("setNamespaceQuota", Action.ADMIN);
2634   }
2635 }