View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import com.google.common.net.HostAndPort;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ArrayBackedTag;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.CompoundConfiguration;
45  import org.apache.hadoop.hbase.CoprocessorEnvironment;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValue.Type;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NamespaceDescriptor;
56  import org.apache.hadoop.hbase.ProcedureInfo;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.Tag;
60  import org.apache.hadoop.hbase.TagRewriteCell;
61  import org.apache.hadoop.hbase.TagUtil;
62  import org.apache.hadoop.hbase.classification.InterfaceAudience;
63  import org.apache.hadoop.hbase.client.Append;
64  import org.apache.hadoop.hbase.client.Delete;
65  import org.apache.hadoop.hbase.client.Durability;
66  import org.apache.hadoop.hbase.client.Get;
67  import org.apache.hadoop.hbase.client.Increment;
68  import org.apache.hadoop.hbase.client.MasterSwitchType;
69  import org.apache.hadoop.hbase.client.Mutation;
70  import org.apache.hadoop.hbase.client.Put;
71  import org.apache.hadoop.hbase.client.Query;
72  import org.apache.hadoop.hbase.client.Result;
73  import org.apache.hadoop.hbase.client.Scan;
74  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
75  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
76  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
77  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
78  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
79  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
80  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
81  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
82  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
83  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
84  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
85  import org.apache.hadoop.hbase.filter.CompareFilter;
86  import org.apache.hadoop.hbase.filter.Filter;
87  import org.apache.hadoop.hbase.filter.FilterList;
88  import org.apache.hadoop.hbase.io.hfile.HFile;
89  import org.apache.hadoop.hbase.ipc.RpcServer;
90  import org.apache.hadoop.hbase.master.MasterServices;
91  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
92  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
93  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
94  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
95  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
96  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
98  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
99  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
100 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
102 import org.apache.hadoop.hbase.regionserver.InternalScanner;
103 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
104 import org.apache.hadoop.hbase.regionserver.Region;
105 import org.apache.hadoop.hbase.regionserver.RegionScanner;
106 import org.apache.hadoop.hbase.regionserver.ScanType;
107 import org.apache.hadoop.hbase.regionserver.ScannerContext;
108 import org.apache.hadoop.hbase.regionserver.Store;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
110 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
111 import org.apache.hadoop.hbase.security.AccessDeniedException;
112 import org.apache.hadoop.hbase.security.Superusers;
113 import org.apache.hadoop.hbase.security.User;
114 import org.apache.hadoop.hbase.security.UserProvider;
115 import org.apache.hadoop.hbase.security.access.Permission.Action;
116 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
117 import org.apache.hadoop.hbase.util.ByteRange;
118 import org.apache.hadoop.hbase.util.Bytes;
119 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
120 import org.apache.hadoop.hbase.util.Pair;
121 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
122 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
123 
124 import com.google.common.collect.ArrayListMultimap;
125 import com.google.common.collect.ImmutableSet;
126 import com.google.common.collect.ListMultimap;
127 import com.google.common.collect.Lists;
128 import com.google.common.collect.MapMaker;
129 import com.google.common.collect.Maps;
130 import com.google.common.collect.Sets;
131 import com.google.protobuf.Message;
132 import com.google.protobuf.RpcCallback;
133 import com.google.protobuf.RpcController;
134 import com.google.protobuf.Service;
135 
136 /**
137  * Provides basic authorization checks for data access and administrative
138  * operations.
139  *
140  * <p>
141  * {@code AccessController} performs authorization checks for HBase operations
142  * based on:
143  * </p>
144  * <ul>
145  *   <li>the identity of the user performing the operation</li>
146  *   <li>the scope over which the operation is performed, in increasing
147  *   specificity: global, table, column family, or qualifier</li>
148  *   <li>the type of action being performed (as mapped to
149  *   {@link Permission.Action} values)</li>
150  * </ul>
151  * <p>
152  * If the authorization check fails, an {@link AccessDeniedException}
153  * will be thrown for the operation.
154  * </p>
155  *
156  * <p>
157  * To perform authorization checks, {@code AccessController} relies on the
158  * RpcServerEngine being loaded to provide
159  * the user identities for remote requests.
160  * </p>
161  *
162  * <p>
163  * The access control lists used for authorization can be manipulated via the
164  * exposed {@link AccessControlService} Interface implementation, and the associated
165  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
166  * commands.
167  * </p>
168  */
169 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
170 public class AccessController extends BaseMasterAndRegionObserver
171     implements RegionServerObserver,
172       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
173 
174   private static final Log LOG = LogFactory.getLog(AccessController.class);
175 
176   private static final Log AUDITLOG =
177     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
178   private static final String CHECK_COVERING_PERM = "check_covering_perm";
179   private static final String TAG_CHECK_PASSED = "tag_check_passed";
180   private static final byte[] TRUE = Bytes.toBytes(true);
181 
182   TableAuthManager authManager = null;
183 
184   /** flags if we are running on a region of the _acl_ table */
185   boolean aclRegion = false;
186 
187   /** defined only for Endpoint implementation, so it can have way to
188    access region services */
189   private RegionCoprocessorEnvironment regionEnv;
190 
191   /** Mapping of scanner instances to the user who created them */
192   private Map<InternalScanner,String> scannerOwners =
193       new MapMaker().weakKeys().makeMap();
194 
195   private Map<TableName, List<UserPermission>> tableAcls;
196 
197   /** Provider for mapping principal names to Users */
198   private UserProvider userProvider;
199 
200   /** if we are active, usually true, only not true if "hbase.security.authorization"
201    has been set to false in site configuration */
202   boolean authorizationEnabled;
203 
204   /** if we are able to support cell ACLs */
205   boolean cellFeaturesEnabled;
206 
207   /** if we should check EXEC permissions */
208   boolean shouldCheckExecPermission;
209 
210   /** if we should terminate access checks early as soon as table or CF grants
211     allow access; pre-0.98 compatible behavior */
212   boolean compatibleEarlyTermination;
213 
214   /** if we have been successfully initialized */
215   private volatile boolean initialized = false;
216 
217   /** if the ACL table is available, only relevant in the master */
218   private volatile boolean aclTabAvailable = false;
219 
220   public static boolean isAuthorizationSupported(Configuration conf) {
221     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
222   }
223 
224   public static boolean isCellAuthorizationSupported(Configuration conf) {
225     return isAuthorizationSupported(conf) &&
226         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
227   }
228 
229   public Region getRegion() {
230     return regionEnv != null ? regionEnv.getRegion() : null;
231   }
232 
233   public TableAuthManager getAuthManager() {
234     return authManager;
235   }
236 
237   void initialize(RegionCoprocessorEnvironment e) throws IOException {
238     final Region region = e.getRegion();
239     Configuration conf = e.getConfiguration();
240     Map<byte[], ListMultimap<String,TablePermission>> tables =
241         AccessControlLists.loadAll(region);
242     // For each table, write out the table's permissions to the respective
243     // znode for that table.
244     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
245       tables.entrySet()) {
246       byte[] entry = t.getKey();
247       ListMultimap<String,TablePermission> perms = t.getValue();
248       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
249       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
250     }
251     initialized = true;
252   }
253 
254   /**
255    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
256    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
257    * table updates.
258    */
259   void updateACL(RegionCoprocessorEnvironment e,
260       final Map<byte[], List<Cell>> familyMap) {
261     Set<byte[]> entries =
262         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
263     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
264       List<Cell> cells = f.getValue();
265       for (Cell cell: cells) {
266         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
267             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
268             AccessControlLists.ACL_LIST_FAMILY.length)) {
269           entries.add(CellUtil.cloneRow(cell));
270         }
271       }
272     }
273     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
274     Configuration conf = regionEnv.getConfiguration();
275     for (byte[] entry: entries) {
276       try {
277         ListMultimap<String,TablePermission> perms =
278           AccessControlLists.getPermissions(conf, entry);
279         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
280         zkw.writeToZookeeper(entry, serialized);
281       } catch (IOException ex) {
282         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
283             ex);
284       }
285     }
286   }
287 
288   /**
289    * Check the current user for authorization to perform a specific action
290    * against the given set of row data.
291    *
292    * <p>Note: Ordering of the authorization checks
293    * has been carefully optimized to short-circuit the most common requests
294    * and minimize the amount of processing required.</p>
295    *
296    * @param permRequest the action being requested
297    * @param e the coprocessor environment
298    * @param families the map of column families to qualifiers present in
299    * the request
300    * @return an authorization result
301    */
302   AuthResult permissionGranted(String request, User user, Action permRequest,
303       RegionCoprocessorEnvironment e,
304       Map<byte [], ? extends Collection<?>> families) {
305     HRegionInfo hri = e.getRegion().getRegionInfo();
306     TableName tableName = hri.getTable();
307 
308     // 1. All users need read access to hbase:meta table.
309     // this is a very common operation, so deal with it quickly.
310     if (hri.isMetaRegion()) {
311       if (permRequest == Action.READ) {
312         return AuthResult.allow(request, "All users allowed", user,
313           permRequest, tableName, families);
314       }
315     }
316 
317     if (user == null) {
318       return AuthResult.deny(request, "No user associated with request!", null,
319         permRequest, tableName, families);
320     }
321 
322     // 2. check for the table-level, if successful we can short-circuit
323     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
324       return AuthResult.allow(request, "Table permission granted", user,
325         permRequest, tableName, families);
326     }
327 
328     // 3. check permissions against the requested families
329     if (families != null && families.size() > 0) {
330       // all families must pass
331       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
332         // a) check for family level access
333         if (authManager.authorize(user, tableName, family.getKey(),
334             permRequest)) {
335           continue;  // family-level permission overrides per-qualifier
336         }
337 
338         // b) qualifier level access can still succeed
339         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
340           if (family.getValue() instanceof Set) {
341             // for each qualifier of the family
342             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
343             for (byte[] qualifier : familySet) {
344               if (!authManager.authorize(user, tableName, family.getKey(),
345                                          qualifier, permRequest)) {
346                 return AuthResult.deny(request, "Failed qualifier check", user,
347                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
348               }
349             }
350           } else if (family.getValue() instanceof List) { // List<KeyValue>
351             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
352             for (KeyValue kv : kvList) {
353               if (!authManager.authorize(user, tableName, family.getKey(),
354                 CellUtil.cloneQualifier(kv), permRequest)) {
355                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
356                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(kv)));
357               }
358             }
359           }
360         } else {
361           // no qualifiers and family-level check already failed
362           return AuthResult.deny(request, "Failed family check", user, permRequest,
363               tableName, makeFamilyMap(family.getKey(), null));
364         }
365       }
366 
367       // all family checks passed
368       return AuthResult.allow(request, "All family checks passed", user, permRequest,
369           tableName, families);
370     }
371 
372     // 4. no families to check and table level access failed
373     return AuthResult.deny(request, "No families to check and table permission failed",
374         user, permRequest, tableName, families);
375   }
376 
377   /**
378    * Check the current user for authorization to perform a specific action
379    * against the given set of row data.
380    * @param opType the operation type
381    * @param user the user
382    * @param e the coprocessor environment
383    * @param families the map of column families to qualifiers present in
384    * the request
385    * @param actions the desired actions
386    * @return an authorization result
387    */
388   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
389       Map<byte [], ? extends Collection<?>> families, Action... actions) {
390     AuthResult result = null;
391     for (Action action: actions) {
392       result = permissionGranted(opType.toString(), user, action, e, families);
393       if (!result.isAllowed()) {
394         return result;
395       }
396     }
397     return result;
398   }
399 
400   private void logResult(AuthResult result) {
401     if (AUDITLOG.isTraceEnabled()) {
402       InetAddress remoteAddr = RpcServer.getRemoteAddress();
403       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
404           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
405           "; reason: " + result.getReason() +
406           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
407           "; request: " + result.getRequest() +
408           "; context: " + result.toContextString());
409     }
410   }
411 
412   /**
413    * Returns the active user to which authorization checks should be applied.
414    * If we are in the context of an RPC call, the remote user is used,
415    * otherwise the currently logged in user is used.
416    */
417   private User getActiveUser() throws IOException {
418     User user = RpcServer.getRequestUser();
419     if (user == null) {
420       // for non-rpc handling, fallback to system user
421       user = userProvider.getCurrent();
422     }
423     return user;
424   }
425 
426   /**
427    * Authorizes that the current user has any of the given permissions for the
428    * given table, column family and column qualifier.
429    * @param tableName Table requested
430    * @param family Column family requested
431    * @param qualifier Column qualifier requested
432    * @throws IOException if obtaining the current user fails
433    * @throws AccessDeniedException if user has no authorization
434    */
435   private void requirePermission(String request, TableName tableName, byte[] family,
436       byte[] qualifier, Action... permissions) throws IOException {
437     User user = getActiveUser();
438     AuthResult result = null;
439 
440     for (Action permission : permissions) {
441       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
442         result = AuthResult.allow(request, "Table permission granted", user,
443                                   permission, tableName, family, qualifier);
444         break;
445       } else {
446         // rest of the world
447         result = AuthResult.deny(request, "Insufficient permissions", user,
448                                  permission, tableName, family, qualifier);
449       }
450     }
451     logResult(result);
452     if (authorizationEnabled && !result.isAllowed()) {
453       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
454     }
455   }
456 
457   /**
458    * Authorizes that the current user has any of the given permissions for the
459    * given table, column family and column qualifier.
460    * @param tableName Table requested
461    * @param family Column family param
462    * @param qualifier Column qualifier param
463    * @throws IOException if obtaining the current user fails
464    * @throws AccessDeniedException if user has no authorization
465    */
466   private void requireTablePermission(String request, TableName tableName, byte[] family,
467       byte[] qualifier, Action... permissions) throws IOException {
468     User user = getActiveUser();
469     AuthResult result = null;
470 
471     for (Action permission : permissions) {
472       if (authManager.authorize(user, tableName, null, null, permission)) {
473         result = AuthResult.allow(request, "Table permission granted", user,
474             permission, tableName, null, null);
475         result.getParams().setFamily(family).setQualifier(qualifier);
476         break;
477       } else {
478         // rest of the world
479         result = AuthResult.deny(request, "Insufficient permissions", user,
480             permission, tableName, family, qualifier);
481         result.getParams().setFamily(family).setQualifier(qualifier);
482       }
483     }
484     logResult(result);
485     if (authorizationEnabled && !result.isAllowed()) {
486       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
487     }
488   }
489 
490   /**
491    * Authorizes that the current user has any of the given permissions to access the table.
492    *
493    * @param tableName Table requested
494    * @param permissions Actions being requested
495    * @throws IOException if obtaining the current user fails
496    * @throws AccessDeniedException if user has no authorization
497    */
498   private void requireAccess(String request, TableName tableName,
499       Action... permissions) throws IOException {
500     User user = getActiveUser();
501     AuthResult result = null;
502 
503     for (Action permission : permissions) {
504       if (authManager.hasAccess(user, tableName, permission)) {
505         result = AuthResult.allow(request, "Table permission granted", user,
506                                   permission, tableName, null, null);
507         break;
508       } else {
509         // rest of the world
510         result = AuthResult.deny(request, "Insufficient permissions", user,
511                                  permission, tableName, null, null);
512       }
513     }
514     logResult(result);
515     if (authorizationEnabled && !result.isAllowed()) {
516       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
517     }
518   }
519 
520   /**
521    * Authorizes that the current user has global privileges for the given action.
522    * @param perm The action being requested
523    * @throws IOException if obtaining the current user fails
524    * @throws AccessDeniedException if authorization is denied
525    */
526   private void requirePermission(String request, Action perm) throws IOException {
527     requireGlobalPermission(request, perm, null, null);
528   }
529 
530   /**
531    * Checks that the user has the given global permission. The generated
532    * audit log message will contain context information for the operation
533    * being authorized, based on the given parameters.
534    * @param perm Action being requested
535    * @param tableName Affected table name.
536    * @param familyMap Affected column families.
537    */
538   private void requireGlobalPermission(String request, Action perm, TableName tableName,
539       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
540     User user = getActiveUser();
541     AuthResult result = null;
542     if (authManager.authorize(user, perm)) {
543       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
544       result.getParams().setTableName(tableName).setFamilies(familyMap);
545       logResult(result);
546     } else {
547       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
548       result.getParams().setTableName(tableName).setFamilies(familyMap);
549       logResult(result);
550       if (authorizationEnabled) {
551         throw new AccessDeniedException("Insufficient permissions for user '" +
552           (user != null ? user.getShortName() : "null") +"' (global, action=" +
553           perm.toString() + ")");
554       }
555     }
556   }
557 
558   /**
559    * Checks that the user has the given global permission. The generated
560    * audit log message will contain context information for the operation
561    * being authorized, based on the given parameters.
562    * @param perm Action being requested
563    * @param namespace
564    */
565   private void requireGlobalPermission(String request, Action perm,
566                                        String namespace) throws IOException {
567     User user = getActiveUser();
568     AuthResult authResult = null;
569     if (authManager.authorize(user, perm)) {
570       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
571       authResult.getParams().setNamespace(namespace);
572       logResult(authResult);
573     } else {
574       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
575       authResult.getParams().setNamespace(namespace);
576       logResult(authResult);
577       if (authorizationEnabled) {
578         throw new AccessDeniedException("Insufficient permissions for user '" +
579           (user != null ? user.getShortName() : "null") +"' (global, action=" +
580           perm.toString() + ")");
581       }
582     }
583   }
584 
585   /**
586    * Checks that the user has the given global or namespace permission.
587    * @param namespace
588    * @param permissions Actions being requested
589    */
590   public void requireNamespacePermission(String request, String namespace,
591       Action... permissions) throws IOException {
592     User user = getActiveUser();
593     AuthResult result = null;
594 
595     for (Action permission : permissions) {
596       if (authManager.authorize(user, namespace, permission)) {
597         result = AuthResult.allow(request, "Namespace permission granted",
598             user, permission, namespace);
599         break;
600       } else {
601         // rest of the world
602         result = AuthResult.deny(request, "Insufficient permissions", user,
603             permission, namespace);
604       }
605     }
606     logResult(result);
607     if (authorizationEnabled && !result.isAllowed()) {
608       throw new AccessDeniedException("Insufficient permissions "
609           + result.toContextString());
610     }
611   }
612 
613   /**
614    * Checks that the user has the given global or namespace permission.
615    * @param namespace
616    * @param permissions Actions being requested
617    */
618   public void requireNamespacePermission(String request, String namespace, TableName tableName,
619       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
620       throws IOException {
621     User user = getActiveUser();
622     AuthResult result = null;
623 
624     for (Action permission : permissions) {
625       if (authManager.authorize(user, namespace, permission)) {
626         result = AuthResult.allow(request, "Namespace permission granted",
627             user, permission, namespace);
628         result.getParams().setTableName(tableName).setFamilies(familyMap);
629         break;
630       } else {
631         // rest of the world
632         result = AuthResult.deny(request, "Insufficient permissions", user,
633             permission, namespace);
634         result.getParams().setTableName(tableName).setFamilies(familyMap);
635       }
636     }
637     logResult(result);
638     if (authorizationEnabled && !result.isAllowed()) {
639       throw new AccessDeniedException("Insufficient permissions "
640           + result.toContextString());
641     }
642   }
643 
644   /**
645    * Returns <code>true</code> if the current user is allowed the given action
646    * over at least one of the column qualifiers in the given column families.
647    */
648   private boolean hasFamilyQualifierPermission(User user,
649       Action perm,
650       RegionCoprocessorEnvironment env,
651       Map<byte[], ? extends Collection<byte[]>> familyMap)
652     throws IOException {
653     HRegionInfo hri = env.getRegion().getRegionInfo();
654     TableName tableName = hri.getTable();
655 
656     if (user == null) {
657       return false;
658     }
659 
660     if (familyMap != null && familyMap.size() > 0) {
661       // at least one family must be allowed
662       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
663           familyMap.entrySet()) {
664         if (family.getValue() != null && !family.getValue().isEmpty()) {
665           for (byte[] qualifier : family.getValue()) {
666             if (authManager.matchPermission(user, tableName,
667                 family.getKey(), qualifier, perm)) {
668               return true;
669             }
670           }
671         } else {
672           if (authManager.matchPermission(user, tableName, family.getKey(),
673               perm)) {
674             return true;
675           }
676         }
677       }
678     } else if (LOG.isDebugEnabled()) {
679       LOG.debug("Empty family map passed for permission check");
680     }
681 
682     return false;
683   }
684 
685   private enum OpType {
686     GET("get"),
687     EXISTS("exists"),
688     SCAN("scan"),
689     PUT("put"),
690     DELETE("delete"),
691     CHECK_AND_PUT("checkAndPut"),
692     CHECK_AND_DELETE("checkAndDelete"),
693     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
694     APPEND("append"),
695     INCREMENT("increment");
696 
697     private String type;
698 
699     private OpType(String type) {
700       this.type = type;
701     }
702 
703     @Override
704     public String toString() {
705       return type;
706     }
707   }
708 
709   /**
710    * Determine if cell ACLs covered by the operation grant access. This is expensive.
711    * @return false if cell ACLs failed to grant access, true otherwise
712    * @throws IOException
713    */
714   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
715       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
716       throws IOException {
717     if (!cellFeaturesEnabled) {
718       return false;
719     }
720     long cellGrants = 0;
721     User user = getActiveUser();
722     long latestCellTs = 0;
723     Get get = new Get(row);
724     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
725     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
726     // version. We have to get every cell version and check its TS against the TS asked for in
727     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
728     // consider only one such passing cell. In case of Delete we have to consider all the cell
729     // versions under this passing version. When Delete Mutation contains columns which are a
730     // version delete just consider only one version for those column cells.
731     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
732     if (considerCellTs) {
733       get.setMaxVersions();
734     } else {
735       get.setMaxVersions(1);
736     }
737     boolean diffCellTsFromOpTs = false;
738     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
739       byte[] col = entry.getKey();
740       // TODO: HBASE-7114 could possibly unify the collection type in family
741       // maps so we would not need to do this
742       if (entry.getValue() instanceof Set) {
743         Set<byte[]> set = (Set<byte[]>)entry.getValue();
744         if (set == null || set.isEmpty()) {
745           get.addFamily(col);
746         } else {
747           for (byte[] qual: set) {
748             get.addColumn(col, qual);
749           }
750         }
751       } else if (entry.getValue() instanceof List) {
752         List<Cell> list = (List<Cell>)entry.getValue();
753         if (list == null || list.isEmpty()) {
754           get.addFamily(col);
755         } else {
756           // In case of family delete, a Cell will be added into the list with Qualifier as null.
757           for (Cell cell : list) {
758             if (cell.getQualifierLength() == 0
759                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
760                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
761               get.addFamily(col);
762             } else {
763               get.addColumn(col, CellUtil.cloneQualifier(cell));
764             }
765             if (considerCellTs) {
766               long cellTs = cell.getTimestamp();
767               latestCellTs = Math.max(latestCellTs, cellTs);
768               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
769             }
770           }
771         }
772       } else if (entry.getValue() == null) {
773         get.addFamily(col);
774       } else {
775         throw new RuntimeException("Unhandled collection type " +
776           entry.getValue().getClass().getName());
777       }
778     }
779     // We want to avoid looking into the future. So, if the cells of the
780     // operation specify a timestamp, or the operation itself specifies a
781     // timestamp, then we use the maximum ts found. Otherwise, we bound
782     // the Get to the current server time. We add 1 to the timerange since
783     // the upper bound of a timerange is exclusive yet we need to examine
784     // any cells found there inclusively.
785     long latestTs = Math.max(opTs, latestCellTs);
786     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
787       latestTs = EnvironmentEdgeManager.currentTime();
788     }
789     get.setTimeRange(0, latestTs + 1);
790     // In case of Put operation we set to read all versions. This was done to consider the case
791     // where columns are added with TS other than the Mutation TS. But normally this wont be the
792     // case with Put. There no need to get all versions but get latest version only.
793     if (!diffCellTsFromOpTs && request == OpType.PUT) {
794       get.setMaxVersions(1);
795     }
796     if (LOG.isTraceEnabled()) {
797       LOG.trace("Scanning for cells with " + get);
798     }
799     // This Map is identical to familyMap. The key is a BR rather than byte[].
800     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
801     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
802     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
803     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
804       if (entry.getValue() instanceof List) {
805         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
806       }
807     }
808     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
809     List<Cell> cells = Lists.newArrayList();
810     Cell prevCell = null;
811     ByteRange curFam = new SimpleMutableByteRange();
812     boolean curColAllVersions = (request == OpType.DELETE);
813     long curColCheckTs = opTs;
814     boolean foundColumn = false;
815     try {
816       boolean more = false;
817       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
818 
819       do {
820         cells.clear();
821         // scan with limit as 1 to hold down memory use on wide rows
822         more = scanner.next(cells, scannerContext);
823         for (Cell cell: cells) {
824           if (LOG.isTraceEnabled()) {
825             LOG.trace("Found cell " + cell);
826           }
827           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
828           if (colChange) foundColumn = false;
829           prevCell = cell;
830           if (!curColAllVersions && foundColumn) {
831             continue;
832           }
833           if (colChange && considerCellTs) {
834             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
835             List<Cell> cols = familyMap1.get(curFam);
836             for (Cell col : cols) {
837               // null/empty qualifier is used to denote a Family delete. The TS and delete type
838               // associated with this is applicable for all columns within the family. That is
839               // why the below (col.getQualifierLength() == 0) check.
840               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
841                   || CellUtil.matchingQualifier(cell, col)) {
842                 byte type = col.getTypeByte();
843                 if (considerCellTs) {
844                   curColCheckTs = col.getTimestamp();
845                 }
846                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
847                 // a version delete for a column no need to check all the covering cells within
848                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
849                 // One version delete types are Delete/DeleteFamilyVersion
850                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
851                     || (KeyValue.Type.DeleteFamily.getCode() == type);
852                 break;
853               }
854             }
855           }
856           if (cell.getTimestamp() > curColCheckTs) {
857             // Just ignore this cell. This is not a covering cell.
858             continue;
859           }
860           foundColumn = true;
861           for (Action action: actions) {
862             // Are there permissions for this user for the cell?
863             if (!authManager.authorize(user, getTableName(e), cell, action)) {
864               // We can stop if the cell ACL denies access
865               return false;
866             }
867           }
868           cellGrants++;
869         }
870       } while (more);
871     } catch (AccessDeniedException ex) {
872       throw ex;
873     } catch (IOException ex) {
874       LOG.error("Exception while getting cells to calculate covering permission", ex);
875     } finally {
876       scanner.close();
877     }
878     // We should not authorize unless we have found one or more cell ACLs that
879     // grant access. This code is used to check for additional permissions
880     // after no table or CF grants are found.
881     return cellGrants > 0;
882   }
883 
884   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
885     // Iterate over the entries in the familyMap, replacing the cells therein
886     // with new cells including the ACL data
887     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
888       List<Cell> newCells = Lists.newArrayList();
889       for (Cell cell: e.getValue()) {
890         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
891         List<Tag> tags = new ArrayList<Tag>();
892         tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
893         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
894         while (tagIterator.hasNext()) {
895           tags.add(tagIterator.next());
896         }
897         newCells.add(new TagRewriteCell(cell, TagUtil.fromList(tags)));
898       }
899       // This is supposed to be safe, won't CME
900       e.setValue(newCells);
901     }
902   }
903 
904   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
905   // type is reserved and should not be explicitly set by user.
906   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
907     // No need to check if we're not going to throw
908     if (!authorizationEnabled) {
909       m.setAttribute(TAG_CHECK_PASSED, TRUE);
910       return;
911     }
912     // Superusers are allowed to store cells unconditionally.
913     if (Superusers.isSuperUser(user)) {
914       m.setAttribute(TAG_CHECK_PASSED, TRUE);
915       return;
916     }
917     // We already checked (prePut vs preBatchMutation)
918     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
919       return;
920     }
921     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
922       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cellScanner.current());
923       while (tagsItr.hasNext()) {
924         if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
925           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
926         }
927       }
928     }
929     m.setAttribute(TAG_CHECK_PASSED, TRUE);
930   }
931 
932   /* ---- MasterObserver implementation ---- */
933   @Override
934   public void start(CoprocessorEnvironment env) throws IOException {
935     CompoundConfiguration conf = new CompoundConfiguration();
936     conf.add(env.getConfiguration());
937 
938     authorizationEnabled = isAuthorizationSupported(conf);
939     if (!authorizationEnabled) {
940       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
941     }
942 
943     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
944       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
945 
946     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
947     if (!cellFeaturesEnabled) {
948       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
949           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
950           + " accordingly.");
951     }
952 
953     ZooKeeperWatcher zk = null;
954     if (env instanceof MasterCoprocessorEnvironment) {
955       // if running on HMaster
956       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
957       zk = mEnv.getMasterServices().getZooKeeper();
958     } else if (env instanceof RegionServerCoprocessorEnvironment) {
959       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
960       zk = rsEnv.getRegionServerServices().getZooKeeper();
961     } else if (env instanceof RegionCoprocessorEnvironment) {
962       // if running at region
963       regionEnv = (RegionCoprocessorEnvironment) env;
964       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
965       zk = regionEnv.getRegionServerServices().getZooKeeper();
966       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
967         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
968     }
969 
970     // set the user-provider.
971     this.userProvider = UserProvider.instantiate(env.getConfiguration());
972 
973     // If zk is null or IOException while obtaining auth manager,
974     // throw RuntimeException so that the coprocessor is unloaded.
975     if (zk != null) {
976       try {
977         this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
978       } catch (IOException ioe) {
979         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
980       }
981     } else {
982       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
983     }
984 
985     tableAcls = new MapMaker().weakValues().makeMap();
986   }
987 
988   @Override
989   public void stop(CoprocessorEnvironment env) {
990     if (this.authManager != null) {
991       TableAuthManager.release(authManager);
992     }
993   }
994 
995   @Override
996   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
997       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
998     Set<byte[]> families = desc.getFamiliesKeys();
999     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
1000     for (byte[] family: families) {
1001       familyMap.put(family, null);
1002     }
1003     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
1004         desc.getTableName(), familyMap, Action.CREATE);
1005   }
1006 
1007   @Override
1008   public void postCompletedCreateTableAction(
1009       final ObserverContext<MasterCoprocessorEnvironment> c,
1010       final HTableDescriptor desc,
1011       final HRegionInfo[] regions) throws IOException {
1012     // When AC is used, it should be configured as the 1st CP.
1013     // In Master, the table operations like create, are handled by a Thread pool but the max size
1014     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1015     // sequentially only.
1016     // Related code in HMaster#startServiceThreads
1017     // {code}
1018     //   // We depend on there being only one instance of this executor running
1019     //   // at a time. To do concurrency, would need fencing of enable/disable of
1020     //   // tables.
1021     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1022     // {code}
1023     // In future if we change this pool to have more threads, then there is a chance for thread,
1024     // creating acl table, getting delayed and by that time another table creation got over and
1025     // this hook is getting called. In such a case, we will need a wait logic here which will
1026     // wait till the acl table is created.
1027     if (AccessControlLists.isAclTable(desc)) {
1028       this.aclTabAvailable = true;
1029     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1030       if (!aclTabAvailable) {
1031         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1032             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1033             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1034       } else {
1035         String owner = desc.getOwnerString();
1036         // default the table owner to current user, if not specified.
1037         if (owner == null)
1038           owner = getActiveUser().getShortName();
1039         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1040             desc.getTableName(), null, Action.values());
1041         // switch to the real hbase master user for doing the RPC on the ACL table
1042         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1043           @Override
1044           public Void run() throws Exception {
1045             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1046                 userperm);
1047             return null;
1048           }
1049         });
1050       }
1051     }
1052   }
1053
1054   @Override
1055   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1056       throws IOException {
1057     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1058   }
1059
1060   @Override
1061   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1062       final TableName tableName) throws IOException {
1063     final Configuration conf = c.getEnvironment().getConfiguration();
1064     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1065       @Override
1066       public Void run() throws Exception {
1067         AccessControlLists.removeTablePermissions(conf, tableName);
1068         return null;
1069       }
1070     });
1071     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1072   }
1073
1074   @Override
1075   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1076       final TableName tableName) throws IOException {
1077     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1078
1079     final Configuration conf = c.getEnvironment().getConfiguration();
1080     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1081       @Override
1082       public Void run() throws Exception {
1083         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1084         if (acls != null) {
1085           tableAcls.put(tableName, acls);
1086         }
1087         return null;
1088       }
1089     });
1090   }
1091
1092   @Override
1093   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1094       final TableName tableName) throws IOException {
1095     final Configuration conf = ctx.getEnvironment().getConfiguration();
1096     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1097       @Override
1098       public Void run() throws Exception {
1099         List<UserPermission> perms = tableAcls.get(tableName);
1100         if (perms != null) {
1101           for (UserPermission perm : perms) {
1102             AccessControlLists.addUserPermission(conf, perm);
1103           }
1104         }
1105         tableAcls.remove(tableName);
1106         return null;
1107       }
1108     });
1109   }
1110
1111   @Override
1112   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1113       HTableDescriptor htd) throws IOException {
1114     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1115   }
1116
1117   @Override
1118   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1119       TableName tableName, final HTableDescriptor htd) throws IOException {
1120     final Configuration conf = c.getEnvironment().getConfiguration();
1121     // default the table owner to current user, if not specified.
1122     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1123       getActiveUser().getShortName();
1124     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1125       @Override
1126       public Void run() throws Exception {
1127         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1128             htd.getTableName(), null, Action.values());
1129         AccessControlLists.addUserPermission(conf, userperm);
1130         return null;
1131       }
1132     });
1133   }
1134
1135   @Override
1136   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1137                                  TableName tableName, HColumnDescriptor columnFamily)
1138       throws IOException {
1139     requireTablePermission("addColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1140                            Action.CREATE);
1141   }
1142
1143   @Override
1144   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1145                                     TableName tableName, HColumnDescriptor columnFamily)
1146       throws IOException {
1147     requirePermission("modifyColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1148       Action.CREATE);
1149   }
1150
1151   @Override
1152   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1153                                     TableName tableName, byte[] columnFamily) throws IOException {
1154     requirePermission("deleteColumn", tableName, columnFamily, null, Action.ADMIN, Action.CREATE);
1155   }
1156
1157   @Override
1158   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1159       final TableName tableName, final byte[] columnFamily) throws IOException {
1160     final Configuration conf = ctx.getEnvironment().getConfiguration();
1161     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1162       @Override
1163       public Void run() throws Exception {
1164         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1165         return null;
1166       }
1167     });
1168   }
1169
1170   @Override
1171   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1172       throws IOException {
1173     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1174   }
1175
1176   @Override
1177   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1178       throws IOException {
1179     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1180       // We have to unconditionally disallow disable of the ACL table when we are installed,
1181       // even if not enforcing authorizations. We are still allowing grants and revocations,
1182       // checking permissions and logging audit messages, etc. If the ACL table is not
1183       // available we will fail random actions all over the place.
1184       throw new AccessDeniedException("Not allowed to disable "
1185           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1186     }
1187     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1188   }
1189
1190   @Override
1191   public void preAbortProcedure(
1192       ObserverContext<MasterCoprocessorEnvironment> ctx,
1193       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1194       final long procId) throws IOException {
1195     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
1196       // If the user is not the procedure owner, then we should further probe whether
1197       // he can abort the procedure.
1198       requirePermission("abortProcedure", Action.ADMIN);
1199     }
1200   }
1201
1202   @Override
1203   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1204       throws IOException {
1205     // There is nothing to do at this time after the procedure abort request was sent.
1206   }
1207
1208   @Override
1209   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1210       throws IOException {
1211     // We are delegating the authorization check to postListProcedures as we don't have
1212     // any concrete set of procedures to work with
1213   }
1214
1215   @Override
1216   public void postListProcedures(
1217       ObserverContext<MasterCoprocessorEnvironment> ctx,
1218       List<ProcedureInfo> procInfoList) throws IOException {
1219     if (procInfoList.isEmpty()) {
1220       return;
1221     }
1222
1223     // Retains only those which passes authorization checks, as the checks weren't done as part
1224     // of preListProcedures.
1225     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1226     User user = getActiveUser();
1227     while (itr.hasNext()) {
1228       ProcedureInfo procInfo = itr.next();
1229       try {
1230         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1231           // If the user is not the procedure owner, then we should further probe whether
1232           // he can see the procedure.
1233           requirePermission("listProcedures", Action.ADMIN);
1234         }
1235       } catch (AccessDeniedException e) {
1236         itr.remove();
1237       }
1238     }
1239   }
1240
1241   @Override
1242   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1243       ServerName srcServer, ServerName destServer) throws IOException {
1244     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1245   }
1246
1247   @Override
1248   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1249       throws IOException {
1250     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1251   }
1252
1253   @Override
1254   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1255       boolean force) throws IOException {
1256     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1257   }
1258
1259   @Override
1260   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1261       HRegionInfo regionInfo) throws IOException {
1262     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1263   }
1264
1265   @Override
1266   public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1267       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1268     requirePermission("setSplitOrMergeEnabled", Action.ADMIN);
1269     return false;
1270   }
1271
1272   @Override
1273   public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1274       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1275   }
1276
1277   @Override
1278   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1279       throws IOException {
1280     requirePermission("balance", Action.ADMIN);
1281   }
1282
1283   @Override
1284   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1285       boolean newValue) throws IOException {
1286     requirePermission("balanceSwitch", Action.ADMIN);
1287     return newValue;
1288   }
1289
1290   @Override
1291   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1292       throws IOException {
1293     requirePermission("shutdown", Action.ADMIN);
1294   }
1295
1296   @Override
1297   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1298       throws IOException {
1299     requirePermission("stopMaster", Action.ADMIN);
1300   }
1301
1302   @Override
1303   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1304       throws IOException {
1305     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1306       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1307       // initialize the ACL storage table
1308       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1309     } else {
1310       aclTabAvailable = true;
1311     }
1312   }
1313
1314   @Override
1315   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1316       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1317       throws IOException {
1318     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1319       Permission.Action.ADMIN);
1320   }
1321
1322   @Override
1323   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1324       final SnapshotDescription snapshot) throws IOException {
1325     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1326       // list it, if user is the owner of snapshot
1327       // TODO: We are not logging this for audit
1328     } else {
1329       requirePermission("listSnapshot", Action.ADMIN);
1330     }
1331   }
1332
1333   @Override
1334   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1335       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1336       throws IOException {
1337     requirePermission("clone", Action.ADMIN);
1338   }
1339
1340   @Override
1341   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1342       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1343       throws IOException {
1344     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1345       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1346         Permission.Action.ADMIN);
1347     } else {
1348       requirePermission("restoreSnapshot", Action.ADMIN);
1349     }
1350   }
1351
1352   @Override
1353   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1354       final SnapshotDescription snapshot) throws IOException {
1355     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1356       // Snapshot owner is allowed to delete the snapshot
1357       // TODO: We are not logging this for audit
1358     } else {
1359       requirePermission("deleteSnapshot", Action.ADMIN);
1360     }
1361   }
1362
1363   @Override
1364   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1365       NamespaceDescriptor ns) throws IOException {
1366     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1367   }
1368
1369   @Override
1370   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1371       throws IOException {
1372     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1373   }
1374
1375   @Override
1376   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1377       final String namespace) throws IOException {
1378     final Configuration conf = ctx.getEnvironment().getConfiguration();
1379     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1380       @Override
1381       public Void run() throws Exception {
1382         AccessControlLists.removeNamespacePermissions(conf, namespace);
1383         return null;
1384       }
1385     });
1386     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1387     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1388   }
1389
1390   @Override
1391   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1392       NamespaceDescriptor ns) throws IOException {
1393     // We require only global permission so that
1394     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1395     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1396   }
1397
1398   @Override
1399   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1400       throws IOException {
1401     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1402   }
1403
1404   @Override
1405   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1406       List<NamespaceDescriptor> descriptors) throws IOException {
1407     // Retains only those which passes authorization checks, as the checks weren't done as part
1408     // of preGetTableDescriptors.
1409     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1410     while (itr.hasNext()) {
1411       NamespaceDescriptor desc = itr.next();
1412       try {
1413         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1414       } catch (AccessDeniedException e) {
1415         itr.remove();
1416       }
1417     }
1418   }
1419
1420   @Override
1421   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1422       final TableName tableName) throws IOException {
1423     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1424   }
1425 
1426   /* ---- RegionObserver implementation ---- */
1427
1428   @Override
1429   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1430       throws IOException {
1431     RegionCoprocessorEnvironment env = e.getEnvironment();
1432     final Region region = env.getRegion();
1433     if (region == null) {
1434       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1435     } else {
1436       HRegionInfo regionInfo = region.getRegionInfo();
1437       if (regionInfo.getTable().isSystemTable()) {
1438         checkSystemOrSuperUser();
1439       } else {
1440         requirePermission("preOpen", Action.ADMIN);
1441       }
1442     }
1443   }
1444
1445   @Override
1446   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1447     RegionCoprocessorEnvironment env = c.getEnvironment();
1448     final Region region = env.getRegion();
1449     if (region == null) {
1450       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1451       return;
1452     }
1453     if (AccessControlLists.isAclRegion(region)) {
1454       aclRegion = true;
1455       // When this region is under recovering state, initialize will be handled by postLogReplay
1456       if (!region.isRecovering()) {
1457         try {
1458           initialize(env);
1459         } catch (IOException ex) {
1460           // if we can't obtain permissions, it's better to fail
1461           // than perform checks incorrectly
1462           throw new RuntimeException("Failed to initialize permissions cache", ex);
1463         }
1464       }
1465     } else {
1466       initialized = true;
1467     }
1468   }
1469
1470   @Override
1471   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1472     if (aclRegion) {
1473       try {
1474         initialize(c.getEnvironment());
1475       } catch (IOException ex) {
1476         // if we can't obtain permissions, it's better to fail
1477         // than perform checks incorrectly
1478         throw new RuntimeException("Failed to initialize permissions cache", ex);
1479       }
1480     }
1481   }
1482
1483   @Override
1484   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1485     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1486         Action.CREATE);
1487   }
1488
1489   @Override
1490   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1491     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1492   }
1493
1494   @Override
1495   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1496       byte[] splitRow) throws IOException {
1497     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1498   }
1499
1500   @Override
1501   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1502       final Store store, final InternalScanner scanner, final ScanType scanType)
1503           throws IOException {
1504     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1505         Action.CREATE);
1506     return scanner;
1507   }
1508
1509   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1510       final Query query, OpType opType) throws IOException {
1511     Filter filter = query.getFilter();
1512     // Don't wrap an AccessControlFilter
1513     if (filter != null && filter instanceof AccessControlFilter) {
1514       return;
1515     }
1516     User user = getActiveUser();
1517     RegionCoprocessorEnvironment env = c.getEnvironment();
1518     Map<byte[],? extends Collection<byte[]>> families = null;
1519     switch (opType) {
1520     case GET:
1521     case EXISTS:
1522       families = ((Get)query).getFamilyMap();
1523       break;
1524     case SCAN:
1525       families = ((Scan)query).getFamilyMap();
1526       break;
1527     default:
1528       throw new RuntimeException("Unhandled operation " + opType);
1529     }
1530     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1531     Region region = getRegion(env);
1532     TableName table = getTableName(region);
1533     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1534     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1535       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1536     }
1537     if (!authResult.isAllowed()) {
1538       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1539         // Old behavior: Scan with only qualifier checks if we have partial
1540         // permission. Backwards compatible behavior is to throw an
1541         // AccessDeniedException immediately if there are no grants for table
1542         // or CF or CF+qual. Only proceed with an injected filter if there are
1543         // grants for qualifiers. Otherwise we will fall through below and log
1544         // the result and throw an ADE. We may end up checking qualifier
1545         // grants three times (permissionGranted above, here, and in the
1546         // filter) but that's the price of backwards compatibility.
1547         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1548           authResult.setAllowed(true);
1549           authResult.setReason("Access allowed with filter");
1550           // Only wrap the filter if we are enforcing authorizations
1551           if (authorizationEnabled) {
1552             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1553               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1554               cfVsMaxVersions);
1555             // wrap any existing filter
1556             if (filter != null) {
1557               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1558                 Lists.newArrayList(ourFilter, filter));
1559             }
1560             switch (opType) {
1561               case GET:
1562               case EXISTS:
1563                 ((Get)query).setFilter(ourFilter);
1564                 break;
1565               case SCAN:
1566                 ((Scan)query).setFilter(ourFilter);
1567                 break;
1568               default:
1569                 throw new RuntimeException("Unhandled operation " + opType);
1570             }
1571           }
1572         }
1573       } else {
1574         // New behavior: Any access we might be granted is more fine-grained
1575         // than whole table or CF. Simply inject a filter and return what is
1576         // allowed. We will not throw an AccessDeniedException. This is a
1577         // behavioral change since 0.96.
1578         authResult.setAllowed(true);
1579         authResult.setReason("Access allowed with filter");
1580         // Only wrap the filter if we are enforcing authorizations
1581         if (authorizationEnabled) {
1582           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1583             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1584           // wrap any existing filter
1585           if (filter != null) {
1586             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1587               Lists.newArrayList(ourFilter, filter));
1588           }
1589           switch (opType) {
1590             case GET:
1591             case EXISTS:
1592               ((Get)query).setFilter(ourFilter);
1593               break;
1594             case SCAN:
1595               ((Scan)query).setFilter(ourFilter);
1596               break;
1597             default:
1598               throw new RuntimeException("Unhandled operation " + opType);
1599           }
1600         }
1601       }
1602     }
1603
1604     logResult(authResult);
1605     if (authorizationEnabled && !authResult.isAllowed()) {
1606       throw new AccessDeniedException("Insufficient permissions for user '"
1607           + (user != null ? user.getShortName() : "null")
1608           + "' (table=" + table + ", action=READ)");
1609     }
1610   }
1611
1612   @Override
1613   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1614       final Get get, final List<Cell> result) throws IOException {
1615     internalPreRead(c, get, OpType.GET);
1616   }
1617
1618   @Override
1619   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1620       final Get get, final boolean exists) throws IOException {
1621     internalPreRead(c, get, OpType.EXISTS);
1622     return exists;
1623   }
1624
1625   @Override
1626   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1627       final Put put, final WALEdit edit, final Durability durability)
1628       throws IOException {
1629     User user = getActiveUser();
1630     checkForReservedTagPresence(user, put);
1631
1632     // Require WRITE permission to the table, CF, or top visible value, if any.
1633     // NOTE: We don't need to check the permissions for any earlier Puts
1634     // because we treat the ACLs in each Put as timestamped like any other
1635     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1636     // change the ACL of any previous Put. This allows simple evolution of
1637     // security policy over time without requiring expensive updates.
1638     RegionCoprocessorEnvironment env = c.getEnvironment();
1639     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1640     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1641     logResult(authResult);
1642     if (!authResult.isAllowed()) {
1643       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1644         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1645       } else if (authorizationEnabled) {
1646         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1647       }
1648     }
1649
1650     // Add cell ACLs from the operation to the cells themselves
1651     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1652     if (bytes != null) {
1653       if (cellFeaturesEnabled) {
1654         addCellPermissions(bytes, put.getFamilyCellMap());
1655       } else {
1656         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1657       }
1658     }
1659   }
1660
1661   @Override
1662   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1663       final Put put, final WALEdit edit, final Durability durability) {
1664     if (aclRegion) {
1665       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1666     }
1667   }
1668
1669   @Override
1670   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1671       final Delete delete, final WALEdit edit, final Durability durability)
1672       throws IOException {
1673     // An ACL on a delete is useless, we shouldn't allow it
1674     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1675       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1676     }
1677     // Require WRITE permissions on all cells covered by the delete. Unlike
1678     // for Puts we need to check all visible prior versions, because a major
1679     // compaction could remove them. If the user doesn't have permission to
1680     // overwrite any of the visible versions ('visible' defined as not covered
1681     // by a tombstone already) then we have to disallow this operation.
1682     RegionCoprocessorEnvironment env = c.getEnvironment();
1683     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1684     User user = getActiveUser();
1685     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1686     logResult(authResult);
1687     if (!authResult.isAllowed()) {
1688       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1689         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1690       } else if (authorizationEnabled) {
1691         throw new AccessDeniedException("Insufficient permissions " +
1692           authResult.toContextString());
1693       }
1694     }
1695   }
1696
1697   @Override
1698   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1699       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1700     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1701       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1702       for (int i = 0; i < miniBatchOp.size(); i++) {
1703         Mutation m = miniBatchOp.getOperation(i);
1704         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1705           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1706           // perm check
1707           OpType opType;
1708           if (m instanceof Put) {
1709             checkForReservedTagPresence(getActiveUser(), m);
1710             opType = OpType.PUT;
1711           } else {
1712             opType = OpType.DELETE;
1713           }
1714           AuthResult authResult = null;
1715           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1716             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1717             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1718               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1719           } else {
1720             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1721               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1722           }
1723           logResult(authResult);
1724           if (authorizationEnabled && !authResult.isAllowed()) {
1725             throw new AccessDeniedException("Insufficient permissions "
1726               + authResult.toContextString());
1727           }
1728         }
1729       }
1730     }
1731   }
1732
1733   @Override
1734   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1735       final Delete delete, final WALEdit edit, final Durability durability)
1736       throws IOException {
1737     if (aclRegion) {
1738       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1739     }
1740   }
1741
1742   @Override
1743   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1744       final byte [] row, final byte [] family, final byte [] qualifier,
1745       final CompareFilter.CompareOp compareOp,
1746       final ByteArrayComparable comparator, final Put put,
1747       final boolean result) throws IOException {
1748     User user = getActiveUser();
1749     checkForReservedTagPresence(user, put);
1750
1751     // Require READ and WRITE permissions on the table, CF, and KV to update
1752     RegionCoprocessorEnvironment env = c.getEnvironment();
1753     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1754     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1755       Action.READ, Action.WRITE);
1756     logResult(authResult);
1757     if (!authResult.isAllowed()) {
1758       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1759         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1760       } else if (authorizationEnabled) {
1761         throw new AccessDeniedException("Insufficient permissions " +
1762           authResult.toContextString());
1763       }
1764     }
1765
1766     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1767     if (bytes != null) {
1768       if (cellFeaturesEnabled) {
1769         addCellPermissions(bytes, put.getFamilyCellMap());
1770       } else {
1771         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1772       }
1773     }
1774     return result;
1775   }
1776
1777   @Override
1778   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1779       final byte[] row, final byte[] family, final byte[] qualifier,
1780       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1781       final boolean result) throws IOException {
1782     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1783       // We had failure with table, cf and q perm checks and now giving a chance for cell
1784       // perm check
1785       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1786       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1787       AuthResult authResult = null;
1788       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1789           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1790         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1791             getActiveUser(), Action.READ, table, families);
1792       } else {
1793         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1794             getActiveUser(), Action.READ, table, families);
1795       }
1796       logResult(authResult);
1797       if (authorizationEnabled && !authResult.isAllowed()) {
1798         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1799       }
1800     }
1801     return result;
1802   }
1803
1804   @Override
1805   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1806       final byte [] row, final byte [] family, final byte [] qualifier,
1807       final CompareFilter.CompareOp compareOp,
1808       final ByteArrayComparable comparator, final Delete delete,
1809       final boolean result) throws IOException {
1810     // An ACL on a delete is useless, we shouldn't allow it
1811     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1812       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1813           delete.toString());
1814     }
1815     // Require READ and WRITE permissions on the table, CF, and the KV covered
1816     // by the delete
1817     RegionCoprocessorEnvironment env = c.getEnvironment();
1818     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1819     User user = getActiveUser();
1820     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1821         Action.READ, Action.WRITE);
1822     logResult(authResult);
1823     if (!authResult.isAllowed()) {
1824       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1825         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1826       } else if (authorizationEnabled) {
1827         throw new AccessDeniedException("Insufficient permissions " +
1828           authResult.toContextString());
1829       }
1830     }
1831     return result;
1832   }
1833
1834   @Override
1835   public boolean preCheckAndDeleteAfterRowLock(
1836       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1837       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1838       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1839       throws IOException {
1840     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1841       // We had failure with table, cf and q perm checks and now giving a chance for cell
1842       // perm check
1843       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1844       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1845       AuthResult authResult = null;
1846       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1847           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1848         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1849             getActiveUser(), Action.READ, table, families);
1850       } else {
1851         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1852             getActiveUser(), Action.READ, table, families);
1853       }
1854       logResult(authResult);
1855       if (authorizationEnabled && !authResult.isAllowed()) {
1856         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1857       }
1858     }
1859     return result;
1860   }
1861
1862   @Override
1863   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1864       final byte [] row, final byte [] family, final byte [] qualifier,
1865       final long amount, final boolean writeToWAL)
1866       throws IOException {
1867     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1868     // incremented value
1869     RegionCoprocessorEnvironment env = c.getEnvironment();
1870     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1871     User user = getActiveUser();
1872     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1873         Action.WRITE);
1874     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1875       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1876         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1877       authResult.setReason("Covering cell set");
1878     }
1879     logResult(authResult);
1880     if (authorizationEnabled && !authResult.isAllowed()) {
1881       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1882     }
1883     return -1;
1884   }
1885
1886   @Override
1887   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1888       throws IOException {
1889     User user = getActiveUser();
1890     checkForReservedTagPresence(user, append);
1891
1892     // Require WRITE permission to the table, CF, and the KV to be appended
1893     RegionCoprocessorEnvironment env = c.getEnvironment();
1894     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1895     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1896     logResult(authResult);
1897     if (!authResult.isAllowed()) {
1898       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1899         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1900       } else if (authorizationEnabled)  {
1901         throw new AccessDeniedException("Insufficient permissions " +
1902           authResult.toContextString());
1903       }
1904     }
1905
1906     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1907     if (bytes != null) {
1908       if (cellFeaturesEnabled) {
1909         addCellPermissions(bytes, append.getFamilyCellMap());
1910       } else {
1911         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1912       }
1913     }
1914
1915     return null;
1916   }
1917
1918   @Override
1919   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1920       final Append append) throws IOException {
1921     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1922       // We had failure with table, cf and q perm checks and now giving a chance for cell
1923       // perm check
1924       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1925       AuthResult authResult = null;
1926       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1927           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1928         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1929             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1930       } else {
1931         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1932             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1933       }
1934       logResult(authResult);
1935       if (authorizationEnabled && !authResult.isAllowed()) {
1936         throw new AccessDeniedException("Insufficient permissions " +
1937           authResult.toContextString());
1938       }
1939     }
1940     return null;
1941   }
1942
1943   @Override
1944   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1945       final Increment increment)
1946       throws IOException {
1947     User user = getActiveUser();
1948     checkForReservedTagPresence(user, increment);
1949
1950     // Require WRITE permission to the table, CF, and the KV to be replaced by
1951     // the incremented value
1952     RegionCoprocessorEnvironment env = c.getEnvironment();
1953     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1954     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1955       Action.WRITE);
1956     logResult(authResult);
1957     if (!authResult.isAllowed()) {
1958       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1959         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1960       } else if (authorizationEnabled) {
1961         throw new AccessDeniedException("Insufficient permissions " +
1962           authResult.toContextString());
1963       }
1964     }
1965
1966     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1967     if (bytes != null) {
1968       if (cellFeaturesEnabled) {
1969         addCellPermissions(bytes, increment.getFamilyCellMap());
1970       } else {
1971         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1972       }
1973     }
1974
1975     return null;
1976   }
1977
1978   @Override
1979   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1980       final Increment increment) throws IOException {
1981     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1982       // We had failure with table, cf and q perm checks and now giving a chance for cell
1983       // perm check
1984       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1985       AuthResult authResult = null;
1986       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1987           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1988         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1989             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1990       } else {
1991         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1992             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1993       }
1994       logResult(authResult);
1995       if (authorizationEnabled && !authResult.isAllowed()) {
1996         throw new AccessDeniedException("Insufficient permissions " +
1997           authResult.toContextString());
1998       }
1999     }
2000     return null;
2001   }
2002
2003   @Override
2004   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2005       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2006     // If the HFile version is insufficient to persist tags, we won't have any
2007     // work to do here
2008     if (!cellFeaturesEnabled) {
2009       return newCell;
2010     }
2011
2012     // Collect any ACLs from the old cell
2013     List<Tag> tags = Lists.newArrayList();
2014     List<Tag> aclTags = Lists.newArrayList();
2015     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2016     if (oldCell != null) {
2017       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell);
2018       while (tagIterator.hasNext()) {
2019         Tag tag = tagIterator.next();
2020         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2021           // Not an ACL tag, just carry it through
2022           if (LOG.isTraceEnabled()) {
2023             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
2024                 + " length " + tag.getValueLength());
2025           }
2026           tags.add(tag);
2027         } else {
2028           aclTags.add(tag);
2029         }
2030       }
2031     }
2032
2033     // Do we have an ACL on the operation?
2034     byte[] aclBytes = mutation.getACL();
2035     if (aclBytes != null) {
2036       // Yes, use it
2037       tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2038     } else {
2039       // No, use what we carried forward
2040       if (perms != null) {
2041         // TODO: If we collected ACLs from more than one tag we may have a
2042         // List<Permission> of size > 1, this can be collapsed into a single
2043         // Permission
2044         if (LOG.isTraceEnabled()) {
2045           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2046         }
2047         tags.addAll(aclTags);
2048       }
2049     }
2050
2051     // If we have no tags to add, just return
2052     if (tags.isEmpty()) {
2053       return newCell;
2054     }
2055
2056     Cell rewriteCell = new TagRewriteCell(newCell, TagUtil.fromList(tags));
2057     return rewriteCell;
2058   }
2059
2060   @Override
2061   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2062       final Scan scan, final RegionScanner s) throws IOException {
2063     internalPreRead(c, scan, OpType.SCAN);
2064     return s;
2065   }
2066
2067   @Override
2068   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2069       final Scan scan, final RegionScanner s) throws IOException {
2070     User user = getActiveUser();
2071     if (user != null && user.getShortName() != null) {
2072       // store reference to scanner owner for later checks
2073       scannerOwners.put(s, user.getShortName());
2074     }
2075     return s;
2076   }
2077
2078   @Override
2079   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2080       final InternalScanner s, final List<Result> result,
2081       final int limit, final boolean hasNext) throws IOException {
2082     requireScannerOwner(s);
2083     return hasNext;
2084   }
2085
2086   @Override
2087   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2088       final InternalScanner s) throws IOException {
2089     requireScannerOwner(s);
2090   }
2091
2092   @Override
2093   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2094       final InternalScanner s) throws IOException {
2095     // clean up any associated owner mapping
2096     scannerOwners.remove(s);
2097   }
2098
2099   @Override
2100   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2101       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2102     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2103     return hasMore;
2104   }
2105
2106   /**
2107    * Verify, when servicing an RPC, that the caller is the scanner owner.
2108    * If so, we assume that access control is correctly enforced based on
2109    * the checks performed in preScannerOpen()
2110    */
2111   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2112     if (!RpcServer.isInRpcCallContext())
2113       return;
2114     String requestUserName = RpcServer.getRequestUserName();
2115     String owner = scannerOwners.get(s);
2116     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2117       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2118     }
2119   }
2120
2121   /**
2122    * Verifies user has CREATE privileges on
2123    * the Column Families involved in the bulkLoadHFile
2124    * request. Specific Column Write privileges are presently
2125    * ignored.
2126    */
2127   @Override
2128   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2129       List<Pair<byte[], String>> familyPaths) throws IOException {
2130     for(Pair<byte[],String> el : familyPaths) {
2131       requirePermission("preBulkLoadHFile",
2132           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2133           el.getFirst(),
2134           null,
2135           Action.CREATE);
2136     }
2137   }
2138
2139   /**
2140    * Authorization check for
2141    * SecureBulkLoadProtocol.prepareBulkLoad()
2142    * @param ctx the context
2143    * @param request the request
2144    * @throws IOException
2145    */
2146   @Override
2147   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2148                                  PrepareBulkLoadRequest request) throws IOException {
2149     requireAccess("prePareBulkLoad",
2150         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2151   }
2152
2153   /**
2154    * Authorization security check for
2155    * SecureBulkLoadProtocol.cleanupBulkLoad()
2156    * @param ctx the context
2157    * @param request the request
2158    * @throws IOException
2159    */
2160   @Override
2161   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2162                                  CleanupBulkLoadRequest request) throws IOException {
2163     requireAccess("preCleanupBulkLoad",
2164         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2165   }
2166 
2167   /* ---- EndpointObserver implementation ---- */
2168
2169   @Override
2170   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2171       Service service, String methodName, Message request) throws IOException {
2172     // Don't intercept calls to our own AccessControlService, we check for
2173     // appropriate permissions in the service handlers
2174     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2175       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2176         methodName + ")",
2177         getTableName(ctx.getEnvironment()), null, null,
2178         Action.EXEC);
2179     }
2180     return request;
2181   }
2182
2183   @Override
2184   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2185       Service service, String methodName, Message request, Message.Builder responseBuilder)
2186       throws IOException { }
2187 
2188   /* ---- Protobuf AccessControlService implementation ---- */
2189
2190   @Override
2191   public void grant(RpcController controller,
2192                     AccessControlProtos.GrantRequest request,
2193                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2194     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2195     AccessControlProtos.GrantResponse response = null;
2196     try {
2197       // verify it's only running at .acl.
2198       if (aclRegion) {
2199         if (!initialized) {
2200           throw new CoprocessorException("AccessController not yet initialized");
2201         }
2202         if (LOG.isDebugEnabled()) {
2203           LOG.debug("Received request to grant access permission " + perm.toString());
2204         }
2205
2206         switch(request.getUserPermission().getPermission().getType()) {
2207           case Global :
2208           case Table :
2209             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2210               perm.getQualifier(), Action.ADMIN);
2211             break;
2212           case Namespace :
2213             requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
2214            break;
2215         }
2216
2217         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2218           @Override
2219           public Void run() throws Exception {
2220             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2221             return null;
2222           }
2223         });
2224
2225         if (AUDITLOG.isTraceEnabled()) {
2226           // audit log should store permission changes in addition to auth results
2227           AUDITLOG.trace("Granted permission " + perm.toString());
2228         }
2229       } else {
2230         throw new CoprocessorException(AccessController.class, "This method "
2231             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2232       }
2233       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2234     } catch (IOException ioe) {
2235       // pass exception back up
2236       ResponseConverter.setControllerException(controller, ioe);
2237     }
2238     done.run(response);
2239   }
2240
2241   @Override
2242   public void revoke(RpcController controller,
2243                      AccessControlProtos.RevokeRequest request,
2244                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2245     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2246     AccessControlProtos.RevokeResponse response = null;
2247     try {
2248       // only allowed to be called on _acl_ region
2249       if (aclRegion) {
2250         if (!initialized) {
2251           throw new CoprocessorException("AccessController not yet initialized");
2252         }
2253         if (LOG.isDebugEnabled()) {
2254           LOG.debug("Received request to revoke access permission " + perm.toString());
2255         }
2256
2257         switch(request.getUserPermission().getPermission().getType()) {
2258           case Global :
2259           case Table :
2260             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2261               perm.getQualifier(), Action.ADMIN);
2262             break;
2263           case Namespace :
2264             requireNamespacePermission("revoke", perm.getNamespace(), Action.ADMIN);
2265             break;
2266         }
2267
2268         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2269           @Override
2270           public Void run() throws Exception {
2271             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2272             return null;
2273           }
2274         });
2275
2276         if (AUDITLOG.isTraceEnabled()) {
2277           // audit log should record all permission changes
2278           AUDITLOG.trace("Revoked permission " + perm.toString());
2279         }
2280       } else {
2281         throw new CoprocessorException(AccessController.class, "This method "
2282             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2283       }
2284       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2285     } catch (IOException ioe) {
2286       // pass exception back up
2287       ResponseConverter.setControllerException(controller, ioe);
2288     }
2289     done.run(response);
2290   }
2291
2292   @Override
2293   public void getUserPermissions(RpcController controller,
2294                                  AccessControlProtos.GetUserPermissionsRequest request,
2295                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2296     AccessControlProtos.GetUserPermissionsResponse response = null;
2297     try {
2298       // only allowed to be called on _acl_ region
2299       if (aclRegion) {
2300         if (!initialized) {
2301           throw new CoprocessorException("AccessController not yet initialized");
2302         }
2303         List<UserPermission> perms = null;
2304         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2305           final TableName table = request.hasTableName() ?
2306             ProtobufUtil.toTableName(request.getTableName()) : null;
2307           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2308           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2309             @Override
2310             public List<UserPermission> run() throws Exception {
2311               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2312             }
2313           });
2314         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2315           final String namespace = request.getNamespaceName().toStringUtf8();
2316           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2317           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2318             @Override
2319             public List<UserPermission> run() throws Exception {
2320               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2321                 namespace);
2322             }
2323           });
2324         } else {
2325           requirePermission("userPermissions", Action.ADMIN);
2326           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2327             @Override
2328             public List<UserPermission> run() throws Exception {
2329               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2330             }
2331           });
2332           // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2333           // Also using acl as table name to be inline  with the results of global admin and will
2334           // help in avoiding any leakage of information about being superusers.
2335           for (String user: Superusers.getSuperUsers()) {
2336             perms.add(new UserPermission(user.getBytes(), AccessControlLists.ACL_TABLE_NAME, null,
2337                 Action.values()));
2338           }
2339         }
2340         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2341       } else {
2342         throw new CoprocessorException(AccessController.class, "This method "
2343             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2344       }
2345     } catch (IOException ioe) {
2346       // pass exception back up
2347       ResponseConverter.setControllerException(controller, ioe);
2348     }
2349     done.run(response);
2350   }
2351
2352   @Override
2353   public void checkPermissions(RpcController controller,
2354                                AccessControlProtos.CheckPermissionsRequest request,
2355                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2356     Permission[] permissions = new Permission[request.getPermissionCount()];
2357     for (int i=0; i < request.getPermissionCount(); i++) {
2358       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2359     }
2360     AccessControlProtos.CheckPermissionsResponse response = null;
2361     try {
2362       User user = getActiveUser();
2363       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2364       for (Permission permission : permissions) {
2365         if (permission instanceof TablePermission) {
2366           // Check table permissions
2367
2368           TablePermission tperm = (TablePermission) permission;
2369           for (Action action : permission.getActions()) {
2370             if (!tperm.getTableName().equals(tableName)) {
2371               throw new CoprocessorException(AccessController.class, String.format("This method "
2372                   + "can only execute at the table specified in TablePermission. " +
2373                   "Table of the region:%s , requested table:%s", tableName,
2374                   tperm.getTableName()));
2375             }
2376
2377             Map<byte[], Set<byte[]>> familyMap =
2378                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2379             if (tperm.getFamily() != null) {
2380               if (tperm.getQualifier() != null) {
2381                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2382                 qualifiers.add(tperm.getQualifier());
2383                 familyMap.put(tperm.getFamily(), qualifiers);
2384               } else {
2385                 familyMap.put(tperm.getFamily(), null);
2386               }
2387             }
2388
2389             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2390               familyMap);
2391             logResult(result);
2392             if (!result.isAllowed()) {
2393               // Even if passive we need to throw an exception here, we support checking
2394               // effective permissions, so throw unconditionally
2395               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2396                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2397                 ", action=" + action.toString() + ")");
2398             }
2399           }
2400
2401         } else {
2402           // Check global permissions
2403
2404           for (Action action : permission.getActions()) {
2405             AuthResult result;
2406             if (authManager.authorize(user, action)) {
2407               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2408                 action, null, null);
2409             } else {
2410               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2411                 null, null);
2412             }
2413             logResult(result);
2414             if (!result.isAllowed()) {
2415               // Even if passive we need to throw an exception here, we support checking
2416               // effective permissions, so throw unconditionally
2417               throw new AccessDeniedException("Insufficient permissions (action=" +
2418                 action.toString() + ")");
2419             }
2420           }
2421         }
2422       }
2423       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2424     } catch (IOException ioe) {
2425       ResponseConverter.setControllerException(controller, ioe);
2426     }
2427     done.run(response);
2428   }
2429
2430   @Override
2431   public Service getService() {
2432     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2433   }
2434
2435   private Region getRegion(RegionCoprocessorEnvironment e) {
2436     return e.getRegion();
2437   }
2438
2439   private TableName getTableName(RegionCoprocessorEnvironment e) {
2440     Region region = e.getRegion();
2441     if (region != null) {
2442       return getTableName(region);
2443     }
2444     return null;
2445   }
2446
2447   private TableName getTableName(Region region) {
2448     HRegionInfo regionInfo = region.getRegionInfo();
2449     if (regionInfo != null) {
2450       return regionInfo.getTable();
2451     }
2452     return null;
2453   }
2454
2455   @Override
2456   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2457       throws IOException {
2458     requirePermission("preClose", Action.ADMIN);
2459   }
2460
2461   private void checkSystemOrSuperUser() throws IOException {
2462     // No need to check if we're not going to throw
2463     if (!authorizationEnabled) {
2464       return;
2465     }
2466     User activeUser = getActiveUser();
2467     if (!Superusers.isSuperUser(activeUser)) {
2468       throw new AccessDeniedException("User '" + (activeUser != null ?
2469         activeUser.getShortName() : "null") + "' is not system or super user.");
2470     }
2471   }
2472
2473   @Override
2474   public void preStopRegionServer(
2475       ObserverContext<RegionServerCoprocessorEnvironment> env)
2476       throws IOException {
2477     requirePermission("preStopRegionServer", Action.ADMIN);
2478   }
2479
2480   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2481       byte[] qualifier) {
2482     if (family == null) {
2483       return null;
2484     }
2485
2486     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2487     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2488     return familyMap;
2489   }
2490
2491   @Override
2492   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2493        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2494        String regex) throws IOException {
2495     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2496     // any concrete set of table names when a regex is present or the full list is requested.
2497     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2498       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2499       // request can be granted.
2500       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2501       for (TableName tableName: tableNamesList) {
2502         // Skip checks for a table that does not exist
2503         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2504           continue;
2505         requirePermission("getTableDescriptors", tableName, null, null,
2506             Action.ADMIN, Action.CREATE);
2507       }
2508     }
2509   }
2510
2511   @Override
2512   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2513       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2514       String regex) throws IOException {
2515     // Skipping as checks in this case are already done by preGetTableDescriptors.
2516     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2517       return;
2518     }
2519
2520     // Retains only those which passes authorization checks, as the checks weren't done as part
2521     // of preGetTableDescriptors.
2522     Iterator<HTableDescriptor> itr = descriptors.iterator();
2523     while (itr.hasNext()) {
2524       HTableDescriptor htd = itr.next();
2525       try {
2526         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2527             Action.ADMIN, Action.CREATE);
2528       } catch (AccessDeniedException e) {
2529         itr.remove();
2530       }
2531     }
2532   }
2533
2534   @Override
2535   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2536       List<HTableDescriptor> descriptors, String regex) throws IOException {
2537     // Retains only those which passes authorization checks.
2538     Iterator<HTableDescriptor> itr = descriptors.iterator();
2539     while (itr.hasNext()) {
2540       HTableDescriptor htd = itr.next();
2541       try {
2542         requireAccess("getTableNames", htd.getTableName(), Action.values());
2543       } catch (AccessDeniedException e) {
2544         itr.remove();
2545       }
2546     }
2547   }
2548
2549   @Override
2550   public void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2551       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
2552     requirePermission("mergeRegions", regionA.getTable(), null, null,
2553       Action.ADMIN);
2554   }
2555
2556   @Override
2557   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2558       Region regionB) throws IOException {
2559     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2560       Action.ADMIN);
2561   }
2562
2563   @Override
2564   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2565       Region regionB, Region mergedRegion) throws IOException { }
2566
2567   @Override
2568   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2569       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2570
2571   @Override
2572   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2573       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2574
2575   @Override
2576   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2577       Region regionA, Region regionB) throws IOException { }
2578
2579   @Override
2580   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2581       Region regionA, Region regionB) throws IOException { }
2582
2583   @Override
2584   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2585       throws IOException {
2586     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2587   }
2588
2589   @Override
2590   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2591       throws IOException { }
2592
2593   @Override
2594   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2595       final String userName, final Quotas quotas) throws IOException {
2596     requirePermission("setUserQuota", Action.ADMIN);
2597   }
2598
2599   @Override
2600   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2601       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2602     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2603   }
2604
2605   @Override
2606   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2607       final String userName, final String namespace, final Quotas quotas) throws IOException {
2608     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2609   }
2610
2611   @Override
2612   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2613       final TableName tableName, final Quotas quotas) throws IOException {
2614     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2615   }
2616
2617   @Override
2618   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2619       final String namespace, final Quotas quotas) throws IOException {
2620     requirePermission("setNamespaceQuota", Action.ADMIN);
2621   }
2622
2623   @Override
2624   public ReplicationEndpoint postCreateReplicationEndPoint(
2625       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2626     return endpoint;
2627   }
2628
2629   @Override
2630   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2631       List<WALEntry> entries, CellScanner cells) throws IOException {
2632     requirePermission("replicateLogEntries", Action.WRITE);
2633   }
2634
2635   @Override
2636   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2637       List<WALEntry> entries, CellScanner cells) throws IOException {
2638   }
2639
2640   @Override
2641   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2642                              Set<HostAndPort> servers, String targetGroup) throws IOException {
2643     requirePermission("moveServers", Action.ADMIN);
2644   }
2645
2646   @Override
2647   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2648                             Set<TableName> tables, String targetGroup) throws IOException {
2649     requirePermission("moveTables", Action.ADMIN);
2650   }
2651
2652   @Override
2653   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2654                             String name) throws IOException {
2655     requirePermission("addRSGroup", Action.ADMIN);
2656   }
2657
2658   @Override
2659   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2660                                String name) throws IOException {
2661     requirePermission("removeRSGroup", Action.ADMIN);
2662   }
2663
2664   @Override
2665   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2666                                 String groupName) throws IOException {
2667     requirePermission("balanceRSGroup", Action.ADMIN);
2668   }
2669 }