View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import com.google.common.net.HostAndPort;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ArrayBackedTag;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.CompoundConfiguration;
45  import org.apache.hadoop.hbase.CoprocessorEnvironment;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValue.Type;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NamespaceDescriptor;
56  import org.apache.hadoop.hbase.ProcedureInfo;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.Tag;
60  import org.apache.hadoop.hbase.TagUtil;
61  import org.apache.hadoop.hbase.classification.InterfaceAudience;
62  import org.apache.hadoop.hbase.client.Append;
63  import org.apache.hadoop.hbase.client.Delete;
64  import org.apache.hadoop.hbase.client.Durability;
65  import org.apache.hadoop.hbase.client.Get;
66  import org.apache.hadoop.hbase.client.Increment;
67  import org.apache.hadoop.hbase.client.MasterSwitchType;
68  import org.apache.hadoop.hbase.client.Mutation;
69  import org.apache.hadoop.hbase.client.Put;
70  import org.apache.hadoop.hbase.client.Query;
71  import org.apache.hadoop.hbase.client.Result;
72  import org.apache.hadoop.hbase.client.Scan;
73  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
74  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
75  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
76  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
77  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
78  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
79  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
80  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
81  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
82  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
83  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
84  import org.apache.hadoop.hbase.filter.CompareFilter;
85  import org.apache.hadoop.hbase.filter.Filter;
86  import org.apache.hadoop.hbase.filter.FilterList;
87  import org.apache.hadoop.hbase.io.hfile.HFile;
88  import org.apache.hadoop.hbase.ipc.RpcServer;
89  import org.apache.hadoop.hbase.master.MasterServices;
90  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
91  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
94  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
95  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
96  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
97  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
100 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
101 import org.apache.hadoop.hbase.regionserver.InternalScanner;
102 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
103 import org.apache.hadoop.hbase.regionserver.Region;
104 import org.apache.hadoop.hbase.regionserver.RegionScanner;
105 import org.apache.hadoop.hbase.regionserver.ScanType;
106 import org.apache.hadoop.hbase.regionserver.ScannerContext;
107 import org.apache.hadoop.hbase.regionserver.Store;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
110 import org.apache.hadoop.hbase.security.AccessDeniedException;
111 import org.apache.hadoop.hbase.security.Superusers;
112 import org.apache.hadoop.hbase.security.User;
113 import org.apache.hadoop.hbase.security.UserProvider;
114 import org.apache.hadoop.hbase.security.access.Permission.Action;
115 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
116 import org.apache.hadoop.hbase.util.ByteRange;
117 import org.apache.hadoop.hbase.util.Bytes;
118 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
119 import org.apache.hadoop.hbase.util.Pair;
120 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
121 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
122
123 import com.google.common.collect.ArrayListMultimap;
124 import com.google.common.collect.ImmutableSet;
125 import com.google.common.collect.ListMultimap;
126 import com.google.common.collect.Lists;
127 import com.google.common.collect.MapMaker;
128 import com.google.common.collect.Maps;
129 import com.google.common.collect.Sets;
130 import com.google.protobuf.Message;
131 import com.google.protobuf.RpcCallback;
132 import com.google.protobuf.RpcController;
133 import com.google.protobuf.Service;
134
135 /**
136  * Provides basic authorization checks for data access and administrative
137  * operations.
138  *
139  * <p>
140  * {@code AccessController} performs authorization checks for HBase operations
141  * based on:
142  * </p>
143  * <ul>
144  *   <li>the identity of the user performing the operation</li>
145  *   <li>the scope over which the operation is performed, in increasing
146  *   specificity: global, table, column family, or qualifier</li>
147  *   <li>the type of action being performed (as mapped to
148  *   {@link Permission.Action} values)</li>
149  * </ul>
150  * <p>
151  * If the authorization check fails, an {@link AccessDeniedException}
152  * will be thrown for the operation.
153  * </p>
154  *
155  * <p>
156  * To perform authorization checks, {@code AccessController} relies on the
157  * RpcServerEngine being loaded to provide
158  * the user identities for remote requests.
159  * </p>
160  *
161  * <p>
162  * The access control lists used for authorization can be manipulated via the
163  * exposed {@link AccessControlService} Interface implementation, and the associated
164  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
165  * commands.
166  * </p>
167  */
168 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
169 public class AccessController extends BaseMasterAndRegionObserver
170     implements RegionServerObserver,
171       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
172
173   private static final Log LOG = LogFactory.getLog(AccessController.class);
174
175   private static final Log AUDITLOG =
176     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
177   private static final String CHECK_COVERING_PERM = "check_covering_perm";
178   private static final String TAG_CHECK_PASSED = "tag_check_passed";
179   private static final byte[] TRUE = Bytes.toBytes(true);
180
181   TableAuthManager authManager = null;
182
183   /** flags if we are running on a region of the _acl_ table */
184   boolean aclRegion = false;
185
186   /** defined only for Endpoint implementation, so it can have way to
187    access region services */
188   private RegionCoprocessorEnvironment regionEnv;
189
190   /** Mapping of scanner instances to the user who created them */
191   private Map<InternalScanner,String> scannerOwners =
192       new MapMaker().weakKeys().makeMap();
193
194   private Map<TableName, List<UserPermission>> tableAcls;
195
196   /** Provider for mapping principal names to Users */
197   private UserProvider userProvider;
198
199   /** if we are active, usually true, only not true if "hbase.security.authorization"
200    has been set to false in site configuration */
201   boolean authorizationEnabled;
202
203   /** if we are able to support cell ACLs */
204   boolean cellFeaturesEnabled;
205
206   /** if we should check EXEC permissions */
207   boolean shouldCheckExecPermission;
208
209   /** if we should terminate access checks early as soon as table or CF grants
210     allow access; pre-0.98 compatible behavior */
211   boolean compatibleEarlyTermination;
212
213   /** if we have been successfully initialized */
214   private volatile boolean initialized = false;
215
216   /** if the ACL table is available, only relevant in the master */
217   private volatile boolean aclTabAvailable = false;
218
219   public static boolean isAuthorizationSupported(Configuration conf) {
220     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
221   }
222
223   public static boolean isCellAuthorizationSupported(Configuration conf) {
224     return isAuthorizationSupported(conf) &&
225         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
226   }
227
228   public Region getRegion() {
229     return regionEnv != null ? regionEnv.getRegion() : null;
230   }
231
232   public TableAuthManager getAuthManager() {
233     return authManager;
234   }
235
236   void initialize(RegionCoprocessorEnvironment e) throws IOException {
237     final Region region = e.getRegion();
238     Configuration conf = e.getConfiguration();
239     Map<byte[], ListMultimap<String,TablePermission>> tables =
240         AccessControlLists.loadAll(region);
241     // For each table, write out the table's permissions to the respective
242     // znode for that table.
243     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
244       tables.entrySet()) {
245       byte[] entry = t.getKey();
246       ListMultimap<String,TablePermission> perms = t.getValue();
247       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
248       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
249     }
250     initialized = true;
251   }
252
253   /**
254    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
255    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
256    * table updates.
257    */
258   void updateACL(RegionCoprocessorEnvironment e,
259       final Map<byte[], List<Cell>> familyMap) {
260     Set<byte[]> entries =
261         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
262     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
263       List<Cell> cells = f.getValue();
264       for (Cell cell: cells) {
265         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
266             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
267             AccessControlLists.ACL_LIST_FAMILY.length)) {
268           entries.add(CellUtil.cloneRow(cell));
269         }
270       }
271     }
272     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
273     Configuration conf = regionEnv.getConfiguration();
274     for (byte[] entry: entries) {
275       try {
276         ListMultimap<String,TablePermission> perms =
277           AccessControlLists.getPermissions(conf, entry);
278         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
279         zkw.writeToZookeeper(entry, serialized);
280       } catch (IOException ex) {
281         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
282             ex);
283       }
284     }
285   }
286
287   /**
288    * Check the current user for authorization to perform a specific action
289    * against the given set of row data.
290    *
291    * <p>Note: Ordering of the authorization checks
292    * has been carefully optimized to short-circuit the most common requests
293    * and minimize the amount of processing required.</p>
294    *
295    * @param permRequest the action being requested
296    * @param e the coprocessor environment
297    * @param families the map of column families to qualifiers present in
298    * the request
299    * @return an authorization result
300    */
301   AuthResult permissionGranted(String request, User user, Action permRequest,
302       RegionCoprocessorEnvironment e,
303       Map<byte [], ? extends Collection<?>> families) {
304     HRegionInfo hri = e.getRegion().getRegionInfo();
305     TableName tableName = hri.getTable();
306
307     // 1. All users need read access to hbase:meta table.
308     // this is a very common operation, so deal with it quickly.
309     if (hri.isMetaRegion()) {
310       if (permRequest == Action.READ) {
311         return AuthResult.allow(request, "All users allowed", user,
312           permRequest, tableName, families);
313       }
314     }
315
316     if (user == null) {
317       return AuthResult.deny(request, "No user associated with request!", null,
318         permRequest, tableName, families);
319     }
320
321     // 2. check for the table-level, if successful we can short-circuit
322     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
323       return AuthResult.allow(request, "Table permission granted", user,
324         permRequest, tableName, families);
325     }
326
327     // 3. check permissions against the requested families
328     if (families != null && families.size() > 0) {
329       // all families must pass
330       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
331         // a) check for family level access
332         if (authManager.authorize(user, tableName, family.getKey(),
333             permRequest)) {
334           continue;  // family-level permission overrides per-qualifier
335         }
336
337         // b) qualifier level access can still succeed
338         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
339           if (family.getValue() instanceof Set) {
340             // for each qualifier of the family
341             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
342             for (byte[] qualifier : familySet) {
343               if (!authManager.authorize(user, tableName, family.getKey(),
344                                          qualifier, permRequest)) {
345                 return AuthResult.deny(request, "Failed qualifier check", user,
346                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
347               }
348             }
349           } else if (family.getValue() instanceof List) { // List<KeyValue>
350             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
351             for (KeyValue kv : kvList) {
352               if (!authManager.authorize(user, tableName, family.getKey(),
353                 CellUtil.cloneQualifier(kv), permRequest)) {
354                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
355                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(kv)));
356               }
357             }
358           }
359         } else {
360           // no qualifiers and family-level check already failed
361           return AuthResult.deny(request, "Failed family check", user, permRequest,
362               tableName, makeFamilyMap(family.getKey(), null));
363         }
364       }
365
366       // all family checks passed
367       return AuthResult.allow(request, "All family checks passed", user, permRequest,
368           tableName, families);
369     }
370
371     // 4. no families to check and table level access failed
372     return AuthResult.deny(request, "No families to check and table permission failed",
373         user, permRequest, tableName, families);
374   }
375
376   /**
377    * Check the current user for authorization to perform a specific action
378    * against the given set of row data.
379    * @param opType the operation type
380    * @param user the user
381    * @param e the coprocessor environment
382    * @param families the map of column families to qualifiers present in
383    * the request
384    * @param actions the desired actions
385    * @return an authorization result
386    */
387   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
388       Map<byte [], ? extends Collection<?>> families, Action... actions) {
389     AuthResult result = null;
390     for (Action action: actions) {
391       result = permissionGranted(opType.toString(), user, action, e, families);
392       if (!result.isAllowed()) {
393         return result;
394       }
395     }
396     return result;
397   }
398
399   private void logResult(AuthResult result) {
400     if (AUDITLOG.isTraceEnabled()) {
401       InetAddress remoteAddr = RpcServer.getRemoteAddress();
402       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
403           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
404           "; reason: " + result.getReason() +
405           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
406           "; request: " + result.getRequest() +
407           "; context: " + result.toContextString());
408     }
409   }
410
411   /**
412    * Returns the active user to which authorization checks should be applied.
413    * If we are in the context of an RPC call, the remote user is used,
414    * otherwise the currently logged in user is used.
415    */
416   private User getActiveUser(ObserverContext ctx) throws IOException {
417     User user = ctx.getCaller();
418     if (user == null) {
419       // for non-rpc handling, fallback to system user
420       user = userProvider.getCurrent();
421     }
422     return user;
423   }
424
425   /**
426    * Authorizes that the current user has any of the given permissions for the
427    * given table, column family and column qualifier.
428    * @param tableName Table requested
429    * @param family Column family requested
430    * @param qualifier Column qualifier requested
431    * @throws IOException if obtaining the current user fails
432    * @throws AccessDeniedException if user has no authorization
433    */
434   private void requirePermission(User user, String request, TableName tableName, byte[] family,
435       byte[] qualifier, Action... permissions) throws IOException {
436     AuthResult result = null;
437
438     for (Action permission : permissions) {
439       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
440         result = AuthResult.allow(request, "Table permission granted", user,
441                                   permission, tableName, family, qualifier);
442         break;
443       } else {
444         // rest of the world
445         result = AuthResult.deny(request, "Insufficient permissions", user,
446                                  permission, tableName, family, qualifier);
447       }
448     }
449     logResult(result);
450     if (authorizationEnabled && !result.isAllowed()) {
451       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
452     }
453   }
454
455   /**
456    * Authorizes that the current user has any of the given permissions for the
457    * given table, column family and column qualifier.
458    * @param tableName Table requested
459    * @param family Column family param
460    * @param qualifier Column qualifier param
461    * @throws IOException if obtaining the current user fails
462    * @throws AccessDeniedException if user has no authorization
463    */
464   private void requireTablePermission(User user, String request, TableName tableName, byte[] family,
465       byte[] qualifier, Action... permissions) throws IOException {
466     AuthResult result = null;
467
468     for (Action permission : permissions) {
469       if (authManager.authorize(user, tableName, null, null, permission)) {
470         result = AuthResult.allow(request, "Table permission granted", user,
471             permission, tableName, null, null);
472         result.getParams().setFamily(family).setQualifier(qualifier);
473         break;
474       } else {
475         // rest of the world
476         result = AuthResult.deny(request, "Insufficient permissions", user,
477             permission, tableName, family, qualifier);
478         result.getParams().setFamily(family).setQualifier(qualifier);
479       }
480     }
481     logResult(result);
482     if (authorizationEnabled && !result.isAllowed()) {
483       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
484     }
485   }
486
487   /**
488    * Authorizes that the current user has any of the given permissions to access the table.
489    *
490    * @param tableName Table requested
491    * @param permissions Actions being requested
492    * @throws IOException if obtaining the current user fails
493    * @throws AccessDeniedException if user has no authorization
494    */
495   private void requireAccess(User user, String request, TableName tableName,
496       Action... permissions) throws IOException {
497     AuthResult result = null;
498
499     for (Action permission : permissions) {
500       if (authManager.hasAccess(user, tableName, permission)) {
501         result = AuthResult.allow(request, "Table permission granted", user,
502                                   permission, tableName, null, null);
503         break;
504       } else {
505         // rest of the world
506         result = AuthResult.deny(request, "Insufficient permissions", user,
507                                  permission, tableName, null, null);
508       }
509     }
510     logResult(result);
511     if (authorizationEnabled && !result.isAllowed()) {
512       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
513     }
514   }
515
516   /**
517    * Authorizes that the current user has global privileges for the given action.
518    * @param perm The action being requested
519    * @throws IOException if obtaining the current user fails
520    * @throws AccessDeniedException if authorization is denied
521    */
522   private void requirePermission(User user, String request, Action perm) throws IOException {
523     requireGlobalPermission(user, request, perm, null, null);
524   }
525
526   /**
527    * Checks that the user has the given global permission. The generated
528    * audit log message will contain context information for the operation
529    * being authorized, based on the given parameters.
530    * @param perm Action being requested
531    * @param tableName Affected table name.
532    * @param familyMap Affected column families.
533    */
534   private void requireGlobalPermission(User user, String request, Action perm, TableName tableName,
535       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
536     AuthResult result = null;
537     if (authManager.authorize(user, perm)) {
538       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
539       result.getParams().setTableName(tableName).setFamilies(familyMap);
540       logResult(result);
541     } else {
542       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
543       result.getParams().setTableName(tableName).setFamilies(familyMap);
544       logResult(result);
545       if (authorizationEnabled) {
546         throw new AccessDeniedException("Insufficient permissions for user '" +
547           (user != null ? user.getShortName() : "null") +"' (global, action=" +
548           perm.toString() + ")");
549       }
550     }
551   }
552
553   /**
554    * Checks that the user has the given global permission. The generated
555    * audit log message will contain context information for the operation
556    * being authorized, based on the given parameters.
557    * @param perm Action being requested
558    * @param namespace
559    */
560   private void requireGlobalPermission(User user, String request, Action perm,
561                                        String namespace) throws IOException {
562     AuthResult authResult = null;
563     if (authManager.authorize(user, perm)) {
564       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
565       authResult.getParams().setNamespace(namespace);
566       logResult(authResult);
567     } else {
568       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
569       authResult.getParams().setNamespace(namespace);
570       logResult(authResult);
571       if (authorizationEnabled) {
572         throw new AccessDeniedException("Insufficient permissions for user '" +
573           (user != null ? user.getShortName() : "null") +"' (global, action=" +
574           perm.toString() + ")");
575       }
576     }
577   }
578
579   /**
580    * Checks that the user has the given global or namespace permission.
581    * @param namespace
582    * @param permissions Actions being requested
583    */
584   public void requireNamespacePermission(User user, String request, String namespace,
585       Action... permissions) throws IOException {
586     AuthResult result = null;
587
588     for (Action permission : permissions) {
589       if (authManager.authorize(user, namespace, permission)) {
590         result = AuthResult.allow(request, "Namespace permission granted",
591             user, permission, namespace);
592         break;
593       } else {
594         // rest of the world
595         result = AuthResult.deny(request, "Insufficient permissions", user,
596             permission, namespace);
597       }
598     }
599     logResult(result);
600     if (authorizationEnabled && !result.isAllowed()) {
601       throw new AccessDeniedException("Insufficient permissions "
602           + result.toContextString());
603     }
604   }
605
606   /**
607    * Checks that the user has the given global or namespace permission.
608    * @param namespace
609    * @param permissions Actions being requested
610    */
611   public void requireNamespacePermission(User user, String request, String namespace,
612       TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
613       Action... permissions)
614       throws IOException {
615     AuthResult result = null;
616
617     for (Action permission : permissions) {
618       if (authManager.authorize(user, namespace, permission)) {
619         result = AuthResult.allow(request, "Namespace permission granted",
620             user, permission, namespace);
621         result.getParams().setTableName(tableName).setFamilies(familyMap);
622         break;
623       } else {
624         // rest of the world
625         result = AuthResult.deny(request, "Insufficient permissions", user,
626             permission, namespace);
627         result.getParams().setTableName(tableName).setFamilies(familyMap);
628       }
629     }
630     logResult(result);
631     if (authorizationEnabled && !result.isAllowed()) {
632       throw new AccessDeniedException("Insufficient permissions "
633           + result.toContextString());
634     }
635   }
636
637   /**
638    * Returns <code>true</code> if the current user is allowed the given action
639    * over at least one of the column qualifiers in the given column families.
640    */
641   private boolean hasFamilyQualifierPermission(User user,
642       Action perm,
643       RegionCoprocessorEnvironment env,
644       Map<byte[], ? extends Collection<byte[]>> familyMap)
645     throws IOException {
646     HRegionInfo hri = env.getRegion().getRegionInfo();
647     TableName tableName = hri.getTable();
648
649     if (user == null) {
650       return false;
651     }
652
653     if (familyMap != null && familyMap.size() > 0) {
654       // at least one family must be allowed
655       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
656           familyMap.entrySet()) {
657         if (family.getValue() != null && !family.getValue().isEmpty()) {
658           for (byte[] qualifier : family.getValue()) {
659             if (authManager.matchPermission(user, tableName,
660                 family.getKey(), qualifier, perm)) {
661               return true;
662             }
663           }
664         } else {
665           if (authManager.matchPermission(user, tableName, family.getKey(),
666               perm)) {
667             return true;
668           }
669         }
670       }
671     } else if (LOG.isDebugEnabled()) {
672       LOG.debug("Empty family map passed for permission check");
673     }
674
675     return false;
676   }
677
678   private enum OpType {
679     GET("get"),
680     EXISTS("exists"),
681     SCAN("scan"),
682     PUT("put"),
683     DELETE("delete"),
684     CHECK_AND_PUT("checkAndPut"),
685     CHECK_AND_DELETE("checkAndDelete"),
686     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
687     APPEND("append"),
688     INCREMENT("increment");
689
690     private String type;
691
692     private OpType(String type) {
693       this.type = type;
694     }
695
696     @Override
697     public String toString() {
698       return type;
699     }
700   }
701
702   /**
703    * Determine if cell ACLs covered by the operation grant access. This is expensive.
704    * @return false if cell ACLs failed to grant access, true otherwise
705    * @throws IOException
706    */
707   private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
708       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
709       throws IOException {
710     if (!cellFeaturesEnabled) {
711       return false;
712     }
713     long cellGrants = 0;
714     long latestCellTs = 0;
715     Get get = new Get(row);
716     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
717     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
718     // version. We have to get every cell version and check its TS against the TS asked for in
719     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
720     // consider only one such passing cell. In case of Delete we have to consider all the cell
721     // versions under this passing version. When Delete Mutation contains columns which are a
722     // version delete just consider only one version for those column cells.
723     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
724     if (considerCellTs) {
725       get.setMaxVersions();
726     } else {
727       get.setMaxVersions(1);
728     }
729     boolean diffCellTsFromOpTs = false;
730     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
731       byte[] col = entry.getKey();
732       // TODO: HBASE-7114 could possibly unify the collection type in family
733       // maps so we would not need to do this
734       if (entry.getValue() instanceof Set) {
735         Set<byte[]> set = (Set<byte[]>)entry.getValue();
736         if (set == null || set.isEmpty()) {
737           get.addFamily(col);
738         } else {
739           for (byte[] qual: set) {
740             get.addColumn(col, qual);
741           }
742         }
743       } else if (entry.getValue() instanceof List) {
744         List<Cell> list = (List<Cell>)entry.getValue();
745         if (list == null || list.isEmpty()) {
746           get.addFamily(col);
747         } else {
748           // In case of family delete, a Cell will be added into the list with Qualifier as null.
749           for (Cell cell : list) {
750             if (cell.getQualifierLength() == 0
751                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
752                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
753               get.addFamily(col);
754             } else {
755               get.addColumn(col, CellUtil.cloneQualifier(cell));
756             }
757             if (considerCellTs) {
758               long cellTs = cell.getTimestamp();
759               latestCellTs = Math.max(latestCellTs, cellTs);
760               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
761             }
762           }
763         }
764       } else if (entry.getValue() == null) {
765         get.addFamily(col);
766       } else {
767         throw new RuntimeException("Unhandled collection type " +
768           entry.getValue().getClass().getName());
769       }
770     }
771     // We want to avoid looking into the future. So, if the cells of the
772     // operation specify a timestamp, or the operation itself specifies a
773     // timestamp, then we use the maximum ts found. Otherwise, we bound
774     // the Get to the current server time. We add 1 to the timerange since
775     // the upper bound of a timerange is exclusive yet we need to examine
776     // any cells found there inclusively.
777     long latestTs = Math.max(opTs, latestCellTs);
778     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
779       latestTs = EnvironmentEdgeManager.currentTime();
780     }
781     get.setTimeRange(0, latestTs + 1);
782     // In case of Put operation we set to read all versions. This was done to consider the case
783     // where columns are added with TS other than the Mutation TS. But normally this wont be the
784     // case with Put. There no need to get all versions but get latest version only.
785     if (!diffCellTsFromOpTs && request == OpType.PUT) {
786       get.setMaxVersions(1);
787     }
788     if (LOG.isTraceEnabled()) {
789       LOG.trace("Scanning for cells with " + get);
790     }
791     // This Map is identical to familyMap. The key is a BR rather than byte[].
792     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
793     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
794     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
795     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
796       if (entry.getValue() instanceof List) {
797         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
798       }
799     }
800     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
801     List<Cell> cells = Lists.newArrayList();
802     Cell prevCell = null;
803     ByteRange curFam = new SimpleMutableByteRange();
804     boolean curColAllVersions = (request == OpType.DELETE);
805     long curColCheckTs = opTs;
806     boolean foundColumn = false;
807     try {
808       boolean more = false;
809       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
810
811       do {
812         cells.clear();
813         // scan with limit as 1 to hold down memory use on wide rows
814         more = scanner.next(cells, scannerContext);
815         for (Cell cell: cells) {
816           if (LOG.isTraceEnabled()) {
817             LOG.trace("Found cell " + cell);
818           }
819           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
820           if (colChange) foundColumn = false;
821           prevCell = cell;
822           if (!curColAllVersions && foundColumn) {
823             continue;
824           }
825           if (colChange && considerCellTs) {
826             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
827             List<Cell> cols = familyMap1.get(curFam);
828             for (Cell col : cols) {
829               // null/empty qualifier is used to denote a Family delete. The TS and delete type
830               // associated with this is applicable for all columns within the family. That is
831               // why the below (col.getQualifierLength() == 0) check.
832               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
833                   || CellUtil.matchingQualifier(cell, col)) {
834                 byte type = col.getTypeByte();
835                 if (considerCellTs) {
836                   curColCheckTs = col.getTimestamp();
837                 }
838                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
839                 // a version delete for a column no need to check all the covering cells within
840                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
841                 // One version delete types are Delete/DeleteFamilyVersion
842                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
843                     || (KeyValue.Type.DeleteFamily.getCode() == type);
844                 break;
845               }
846             }
847           }
848           if (cell.getTimestamp() > curColCheckTs) {
849             // Just ignore this cell. This is not a covering cell.
850             continue;
851           }
852           foundColumn = true;
853           for (Action action: actions) {
854             // Are there permissions for this user for the cell?
855             if (!authManager.authorize(user, getTableName(e), cell, action)) {
856               // We can stop if the cell ACL denies access
857               return false;
858             }
859           }
860           cellGrants++;
861         }
862       } while (more);
863     } catch (AccessDeniedException ex) {
864       throw ex;
865     } catch (IOException ex) {
866       LOG.error("Exception while getting cells to calculate covering permission", ex);
867     } finally {
868       scanner.close();
869     }
870     // We should not authorize unless we have found one or more cell ACLs that
871     // grant access. This code is used to check for additional permissions
872     // after no table or CF grants are found.
873     return cellGrants > 0;
874   }
875
876   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
877     // Iterate over the entries in the familyMap, replacing the cells therein
878     // with new cells including the ACL data
879     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
880       List<Cell> newCells = Lists.newArrayList();
881       for (Cell cell: e.getValue()) {
882         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
883         List<Tag> tags = new ArrayList<Tag>();
884         tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
885         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
886         while (tagIterator.hasNext()) {
887           tags.add(tagIterator.next());
888         }
889         newCells.add(CellUtil.createCell(cell, tags));
890       }
891       // This is supposed to be safe, won't CME
892       e.setValue(newCells);
893     }
894   }
895
896   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
897   // type is reserved and should not be explicitly set by user.
898   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
899     // No need to check if we're not going to throw
900     if (!authorizationEnabled) {
901       m.setAttribute(TAG_CHECK_PASSED, TRUE);
902       return;
903     }
904     // Superusers are allowed to store cells unconditionally.
905     if (Superusers.isSuperUser(user)) {
906       m.setAttribute(TAG_CHECK_PASSED, TRUE);
907       return;
908     }
909     // We already checked (prePut vs preBatchMutation)
910     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
911       return;
912     }
913     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
914       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cellScanner.current());
915       while (tagsItr.hasNext()) {
916         if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
917           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
918         }
919       }
920     }
921     m.setAttribute(TAG_CHECK_PASSED, TRUE);
922   }
923
924   /* ---- MasterObserver implementation ---- */
925   @Override
926   public void start(CoprocessorEnvironment env) throws IOException {
927     CompoundConfiguration conf = new CompoundConfiguration();
928     conf.add(env.getConfiguration());
929
930     authorizationEnabled = isAuthorizationSupported(conf);
931     if (!authorizationEnabled) {
932       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
933     }
934
935     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
936       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
937
938     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
939     if (!cellFeaturesEnabled) {
940       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
941           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
942           + " accordingly.");
943     }
944
945     ZooKeeperWatcher zk = null;
946     if (env instanceof MasterCoprocessorEnvironment) {
947       // if running on HMaster
948       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
949       zk = mEnv.getMasterServices().getZooKeeper();
950     } else if (env instanceof RegionServerCoprocessorEnvironment) {
951       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
952       zk = rsEnv.getRegionServerServices().getZooKeeper();
953     } else if (env instanceof RegionCoprocessorEnvironment) {
954       // if running at region
955       regionEnv = (RegionCoprocessorEnvironment) env;
956       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
957       zk = regionEnv.getRegionServerServices().getZooKeeper();
958       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
959         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
960     }
961
962     // set the user-provider.
963     this.userProvider = UserProvider.instantiate(env.getConfiguration());
964
965     // If zk is null or IOException while obtaining auth manager,
966     // throw RuntimeException so that the coprocessor is unloaded.
967     if (zk != null) {
968       try {
969         this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
970       } catch (IOException ioe) {
971         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
972       }
973     } else {
974       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
975     }
976
977     tableAcls = new MapMaker().weakValues().makeMap();
978   }
979
980   @Override
981   public void stop(CoprocessorEnvironment env) {
982     if (this.authManager != null) {
983       TableAuthManager.release(authManager);
984     }
985   }
986
987   @Override
988   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
989       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
990     Set<byte[]> families = desc.getFamiliesKeys();
991     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
992     for (byte[] family: families) {
993       familyMap.put(family, null);
994     }
995     requireNamespacePermission(getActiveUser(c), "createTable",
996         desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.CREATE);
997   }
998
999   @Override
1000   public void postCompletedCreateTableAction(
1001       final ObserverContext<MasterCoprocessorEnvironment> c,
1002       final HTableDescriptor desc,
1003       final HRegionInfo[] regions) throws IOException {
1004     // When AC is used, it should be configured as the 1st CP.
1005     // In Master, the table operations like create, are handled by a Thread pool but the max size
1006     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1007     // sequentially only.
1008     // Related code in HMaster#startServiceThreads
1009     // {code}
1010     //   // We depend on there being only one instance of this executor running
1011     //   // at a time. To do concurrency, would need fencing of enable/disable of
1012     //   // tables.
1013     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1014     // {code}
1015     // In future if we change this pool to have more threads, then there is a chance for thread,
1016     // creating acl table, getting delayed and by that time another table creation got over and
1017     // this hook is getting called. In such a case, we will need a wait logic here which will
1018     // wait till the acl table is created.
1019     if (AccessControlLists.isAclTable(desc)) {
1020       this.aclTabAvailable = true;
1021     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1022       if (!aclTabAvailable) {
1023         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1024             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1025             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1026       } else {
1027         String owner = desc.getOwnerString();
1028         // default the table owner to current user, if not specified.
1029         if (owner == null)
1030           owner = getActiveUser(c).getShortName();
1031         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1032             desc.getTableName(), null, Action.values());
1033         // switch to the real hbase master user for doing the RPC on the ACL table
1034         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1035           @Override
1036           public Void run() throws Exception {
1037             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1038                 userperm);
1039             return null;
1040           }
1041         });
1042       }
1043     }
1044   }
1045
1046   @Override
1047   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1048       throws IOException {
1049     requirePermission(getActiveUser(c), "deleteTable", tableName, null, null,
1050         Action.ADMIN, Action.CREATE);
1051   }
1052
1053   @Override
1054   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1055       final TableName tableName) throws IOException {
1056     final Configuration conf = c.getEnvironment().getConfiguration();
1057     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1058       @Override
1059       public Void run() throws Exception {
1060         AccessControlLists.removeTablePermissions(conf, tableName);
1061         return null;
1062       }
1063     });
1064     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1065   }
1066
1067   @Override
1068   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1069       final TableName tableName) throws IOException {
1070     requirePermission(getActiveUser(c), "truncateTable", tableName, null, null,
1071         Action.ADMIN, Action.CREATE);
1072
1073     final Configuration conf = c.getEnvironment().getConfiguration();
1074     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1075       @Override
1076       public Void run() throws Exception {
1077         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1078         if (acls != null) {
1079           tableAcls.put(tableName, acls);
1080         }
1081         return null;
1082       }
1083     });
1084   }
1085
1086   @Override
1087   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1088       final TableName tableName) throws IOException {
1089     final Configuration conf = ctx.getEnvironment().getConfiguration();
1090     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1091       @Override
1092       public Void run() throws Exception {
1093         List<UserPermission> perms = tableAcls.get(tableName);
1094         if (perms != null) {
1095           for (UserPermission perm : perms) {
1096             AccessControlLists.addUserPermission(conf, perm);
1097           }
1098         }
1099         tableAcls.remove(tableName);
1100         return null;
1101       }
1102     });
1103   }
1104
1105   @Override
1106   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1107       HTableDescriptor htd) throws IOException {
1108     requirePermission(getActiveUser(c), "modifyTable", tableName, null, null,
1109         Action.ADMIN, Action.CREATE);
1110   }
1111
1112   @Override
1113   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1114       TableName tableName, final HTableDescriptor htd) throws IOException {
1115     final Configuration conf = c.getEnvironment().getConfiguration();
1116     // default the table owner to current user, if not specified.
1117     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1118       getActiveUser(c).getShortName();
1119     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1120       @Override
1121       public Void run() throws Exception {
1122         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1123             htd.getTableName(), null, Action.values());
1124         AccessControlLists.addUserPermission(conf, userperm);
1125         return null;
1126       }
1127     });
1128   }
1129
1130   @Override
1131   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1132                                  TableName tableName, HColumnDescriptor columnFamily)
1133       throws IOException {
1134     requireTablePermission(getActiveUser(ctx), "addColumn", tableName, columnFamily.getName(), null,
1135         Action.ADMIN, Action.CREATE);
1136   }
1137
1138   @Override
1139   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1140                                     TableName tableName, HColumnDescriptor columnFamily)
1141       throws IOException {
1142     requirePermission(getActiveUser(ctx), "modifyColumn", tableName, columnFamily.getName(), null,
1143         Action.ADMIN, Action.CREATE);
1144   }
1145
1146   @Override
1147   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1148                                     TableName tableName, byte[] columnFamily) throws IOException {
1149     requirePermission(getActiveUser(ctx), "deleteColumn", tableName, columnFamily, null,
1150         Action.ADMIN, Action.CREATE);
1151   }
1152
1153   @Override
1154   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1155       final TableName tableName, final byte[] columnFamily) throws IOException {
1156     final Configuration conf = ctx.getEnvironment().getConfiguration();
1157     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1158       @Override
1159       public Void run() throws Exception {
1160         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1161         return null;
1162       }
1163     });
1164   }
1165
1166   @Override
1167   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1168       throws IOException {
1169     requirePermission(getActiveUser(c), "enableTable", tableName, null, null,
1170         Action.ADMIN, Action.CREATE);
1171   }
1172
1173   @Override
1174   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1175       throws IOException {
1176     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1177       // We have to unconditionally disallow disable of the ACL table when we are installed,
1178       // even if not enforcing authorizations. We are still allowing grants and revocations,
1179       // checking permissions and logging audit messages, etc. If the ACL table is not
1180       // available we will fail random actions all over the place.
1181       throw new AccessDeniedException("Not allowed to disable "
1182           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1183     }
1184     requirePermission(getActiveUser(c), "disableTable", tableName, null, null,
1185         Action.ADMIN, Action.CREATE);
1186   }
1187
1188   @Override
1189   public void preAbortProcedure(
1190       ObserverContext<MasterCoprocessorEnvironment> ctx,
1191       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1192       final long procId) throws IOException {
1193     if (!procEnv.isProcedureOwner(procId, getActiveUser(ctx))) {
1194       // If the user is not the procedure owner, then we should further probe whether
1195       // he can abort the procedure.
1196       requirePermission(getActiveUser(ctx), "abortProcedure", Action.ADMIN);
1197     }
1198   }
1199
1200   @Override
1201   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1202       throws IOException {
1203     // There is nothing to do at this time after the procedure abort request was sent.
1204   }
1205
1206   @Override
1207   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1208       throws IOException {
1209     // We are delegating the authorization check to postListProcedures as we don't have
1210     // any concrete set of procedures to work with
1211   }
1212
1213   @Override
1214   public void postListProcedures(
1215       ObserverContext<MasterCoprocessorEnvironment> ctx,
1216       List<ProcedureInfo> procInfoList) throws IOException {
1217     if (procInfoList.isEmpty()) {
1218       return;
1219     }
1220
1221     // Retains only those which passes authorization checks, as the checks weren't done as part
1222     // of preListProcedures.
1223     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1224     User user = getActiveUser(ctx);
1225     while (itr.hasNext()) {
1226       ProcedureInfo procInfo = itr.next();
1227       try {
1228         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1229           // If the user is not the procedure owner, then we should further probe whether
1230           // he can see the procedure.
1231           requirePermission(user, "listProcedures", Action.ADMIN);
1232         }
1233       } catch (AccessDeniedException e) {
1234         itr.remove();
1235       }
1236     }
1237   }
1238
1239   @Override
1240   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1241       ServerName srcServer, ServerName destServer) throws IOException {
1242     requirePermission(getActiveUser(c), "move", region.getTable(), null, null, Action.ADMIN);
1243   }
1244
1245   @Override
1246   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1247       throws IOException {
1248     requirePermission(getActiveUser(c), "assign", regionInfo.getTable(), null, null, Action.ADMIN);
1249   }
1250
1251   @Override
1252   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1253       boolean force) throws IOException {
1254     requirePermission(getActiveUser(c), "unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1255   }
1256
1257   @Override
1258   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1259       HRegionInfo regionInfo) throws IOException {
1260     requirePermission(getActiveUser(c), "regionOffline", regionInfo.getTable(), null, null,
1261         Action.ADMIN);
1262   }
1263
1264   @Override
1265   public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1266       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1267     requirePermission(getActiveUser(ctx), "setSplitOrMergeEnabled", Action.ADMIN);
1268     return false;
1269   }
1270
1271   @Override
1272   public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1273       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1274   }
1275
1276   @Override
1277   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1278       throws IOException {
1279     requirePermission(getActiveUser(c), "balance", Action.ADMIN);
1280   }
1281
1282   @Override
1283   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1284       boolean newValue) throws IOException {
1285     requirePermission(getActiveUser(c), "balanceSwitch", Action.ADMIN);
1286     return newValue;
1287   }
1288
1289   @Override
1290   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1291       throws IOException {
1292     requirePermission(getActiveUser(c), "shutdown", Action.ADMIN);
1293   }
1294
1295   @Override
1296   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1297       throws IOException {
1298     requirePermission(getActiveUser(c), "stopMaster", Action.ADMIN);
1299   }
1300
1301   @Override
1302   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1303       throws IOException {
1304     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1305       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1306       // initialize the ACL storage table
1307       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1308     } else {
1309       aclTabAvailable = true;
1310     }
1311   }
1312
1313   @Override
1314   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1315       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1316       throws IOException {
1317     requirePermission(getActiveUser(ctx), "snapshot", hTableDescriptor.getTableName(), null, null,
1318       Permission.Action.ADMIN);
1319   }
1320
1321   @Override
1322   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1323       final SnapshotDescription snapshot) throws IOException {
1324     User user = getActiveUser(ctx);
1325     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1326       // list it, if user is the owner of snapshot
1327       // TODO: We are not logging this for audit
1328     } else {
1329       requirePermission(user, "listSnapshot", Action.ADMIN);
1330     }
1331   }
1332
1333   @Override
1334   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1335       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1336       throws IOException {
1337     requirePermission(getActiveUser(ctx), "clone", Action.ADMIN);
1338   }
1339
1340   @Override
1341   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1342       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1343       throws IOException {
1344     User user = getActiveUser(ctx);
1345     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1346       requirePermission(user, "restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1347         Permission.Action.ADMIN);
1348     } else {
1349       requirePermission(user, "restoreSnapshot", Action.ADMIN);
1350     }
1351   }
1352
1353   @Override
1354   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1355       final SnapshotDescription snapshot) throws IOException {
1356     User user = getActiveUser(ctx);
1357     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1358       // Snapshot owner is allowed to delete the snapshot
1359       // TODO: We are not logging this for audit
1360     } else {
1361       requirePermission(user, "deleteSnapshot", Action.ADMIN);
1362     }
1363   }
1364
1365   @Override
1366   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1367       NamespaceDescriptor ns) throws IOException {
1368     requireGlobalPermission(getActiveUser(ctx), "createNamespace", Action.ADMIN, ns.getName());
1369   }
1370
1371   @Override
1372   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1373       throws IOException {
1374     requireGlobalPermission(getActiveUser(ctx), "deleteNamespace", Action.ADMIN, namespace);
1375   }
1376
1377   @Override
1378   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1379       final String namespace) throws IOException {
1380     final Configuration conf = ctx.getEnvironment().getConfiguration();
1381     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1382       @Override
1383       public Void run() throws Exception {
1384         AccessControlLists.removeNamespacePermissions(conf, namespace);
1385         return null;
1386       }
1387     });
1388     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1389     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1390   }
1391
1392   @Override
1393   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1394       NamespaceDescriptor ns) throws IOException {
1395     // We require only global permission so that
1396     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1397     requireGlobalPermission(getActiveUser(ctx), "modifyNamespace", Action.ADMIN, ns.getName());
1398   }
1399
1400   @Override
1401   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1402       throws IOException {
1403     requireNamespacePermission(getActiveUser(ctx), "getNamespaceDescriptor", namespace, Action.ADMIN);
1404   }
1405
1406   @Override
1407   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1408       List<NamespaceDescriptor> descriptors) throws IOException {
1409     // Retains only those which passes authorization checks, as the checks weren't done as part
1410     // of preGetTableDescriptors.
1411     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1412     User user = getActiveUser(ctx);
1413     while (itr.hasNext()) {
1414       NamespaceDescriptor desc = itr.next();
1415       try {
1416         requireNamespacePermission(user, "listNamespaces", desc.getName(), Action.ADMIN);
1417       } catch (AccessDeniedException e) {
1418         itr.remove();
1419       }
1420     }
1421   }
1422
1423   @Override
1424   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1425       final TableName tableName) throws IOException {
1426     requirePermission(getActiveUser(ctx), "flushTable", tableName, null, null,
1427         Action.ADMIN, Action.CREATE);
1428   }
1429
1430   /* ---- RegionObserver implementation ---- */
1431
1432   @Override
1433   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
1434       throws IOException {
1435     RegionCoprocessorEnvironment env = c.getEnvironment();
1436     final Region region = env.getRegion();
1437     if (region == null) {
1438       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1439     } else {
1440       HRegionInfo regionInfo = region.getRegionInfo();
1441       if (regionInfo.getTable().isSystemTable()) {
1442         checkSystemOrSuperUser(getActiveUser(c));
1443       } else {
1444         requirePermission(getActiveUser(c), "preOpen", Action.ADMIN);
1445       }
1446     }
1447   }
1448
1449   @Override
1450   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1451     RegionCoprocessorEnvironment env = c.getEnvironment();
1452     final Region region = env.getRegion();
1453     if (region == null) {
1454       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1455       return;
1456     }
1457     if (AccessControlLists.isAclRegion(region)) {
1458       aclRegion = true;
1459       // When this region is under recovering state, initialize will be handled by postLogReplay
1460       if (!region.isRecovering()) {
1461         try {
1462           initialize(env);
1463         } catch (IOException ex) {
1464           // if we can't obtain permissions, it's better to fail
1465           // than perform checks incorrectly
1466           throw new RuntimeException("Failed to initialize permissions cache", ex);
1467         }
1468       }
1469     } else {
1470       initialized = true;
1471     }
1472   }
1473
1474   @Override
1475   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1476     if (aclRegion) {
1477       try {
1478         initialize(c.getEnvironment());
1479       } catch (IOException ex) {
1480         // if we can't obtain permissions, it's better to fail
1481         // than perform checks incorrectly
1482         throw new RuntimeException("Failed to initialize permissions cache", ex);
1483       }
1484     }
1485   }
1486
1487   @Override
1488   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1489     requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null,
1490         Action.ADMIN, Action.CREATE);
1491   }
1492
1493   @Override
1494   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1495     requirePermission(getActiveUser(c), "split", getTableName(c.getEnvironment()), null, null,
1496         Action.ADMIN);
1497   }
1498
1499   @Override
1500   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
1501       byte[] splitRow) throws IOException {
1502     requirePermission(getActiveUser(c), "split", getTableName(c.getEnvironment()), null, null,
1503         Action.ADMIN);
1504   }
1505
1506   @Override
1507   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
1508       final Store store, final InternalScanner scanner, final ScanType scanType)
1509           throws IOException {
1510     requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
1511         Action.ADMIN, Action.CREATE);
1512     return scanner;
1513   }
1514
1515   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1516       final Query query, OpType opType) throws IOException {
1517     Filter filter = query.getFilter();
1518     // Don't wrap an AccessControlFilter
1519     if (filter != null && filter instanceof AccessControlFilter) {
1520       return;
1521     }
1522     User user = getActiveUser(c);
1523     RegionCoprocessorEnvironment env = c.getEnvironment();
1524     Map<byte[],? extends Collection<byte[]>> families = null;
1525     switch (opType) {
1526     case GET:
1527     case EXISTS:
1528       families = ((Get)query).getFamilyMap();
1529       break;
1530     case SCAN:
1531       families = ((Scan)query).getFamilyMap();
1532       break;
1533     default:
1534       throw new RuntimeException("Unhandled operation " + opType);
1535     }
1536     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1537     Region region = getRegion(env);
1538     TableName table = getTableName(region);
1539     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1540     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1541       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1542     }
1543     if (!authResult.isAllowed()) {
1544       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1545         // Old behavior: Scan with only qualifier checks if we have partial
1546         // permission. Backwards compatible behavior is to throw an
1547         // AccessDeniedException immediately if there are no grants for table
1548         // or CF or CF+qual. Only proceed with an injected filter if there are
1549         // grants for qualifiers. Otherwise we will fall through below and log
1550         // the result and throw an ADE. We may end up checking qualifier
1551         // grants three times (permissionGranted above, here, and in the
1552         // filter) but that's the price of backwards compatibility.
1553         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1554           authResult.setAllowed(true);
1555           authResult.setReason("Access allowed with filter");
1556           // Only wrap the filter if we are enforcing authorizations
1557           if (authorizationEnabled) {
1558             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1559               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1560               cfVsMaxVersions);
1561             // wrap any existing filter
1562             if (filter != null) {
1563               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1564                 Lists.newArrayList(ourFilter, filter));
1565             }
1566             switch (opType) {
1567               case GET:
1568               case EXISTS:
1569                 ((Get)query).setFilter(ourFilter);
1570                 break;
1571               case SCAN:
1572                 ((Scan)query).setFilter(ourFilter);
1573                 break;
1574               default:
1575                 throw new RuntimeException("Unhandled operation " + opType);
1576             }
1577           }
1578         }
1579       } else {
1580         // New behavior: Any access we might be granted is more fine-grained
1581         // than whole table or CF. Simply inject a filter and return what is
1582         // allowed. We will not throw an AccessDeniedException. This is a
1583         // behavioral change since 0.96.
1584         authResult.setAllowed(true);
1585         authResult.setReason("Access allowed with filter");
1586         // Only wrap the filter if we are enforcing authorizations
1587         if (authorizationEnabled) {
1588           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1589             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1590           // wrap any existing filter
1591           if (filter != null) {
1592             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1593               Lists.newArrayList(ourFilter, filter));
1594           }
1595           switch (opType) {
1596             case GET:
1597             case EXISTS:
1598               ((Get)query).setFilter(ourFilter);
1599               break;
1600             case SCAN:
1601               ((Scan)query).setFilter(ourFilter);
1602               break;
1603             default:
1604               throw new RuntimeException("Unhandled operation " + opType);
1605           }
1606         }
1607       }
1608     }
1609
1610     logResult(authResult);
1611     if (authorizationEnabled && !authResult.isAllowed()) {
1612       throw new AccessDeniedException("Insufficient permissions for user '"
1613           + (user != null ? user.getShortName() : "null")
1614           + "' (table=" + table + ", action=READ)");
1615     }
1616   }
1617
1618   @Override
1619   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1620       final Get get, final List<Cell> result) throws IOException {
1621     internalPreRead(c, get, OpType.GET);
1622   }
1623
1624   @Override
1625   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1626       final Get get, final boolean exists) throws IOException {
1627     internalPreRead(c, get, OpType.EXISTS);
1628     return exists;
1629   }
1630
1631   @Override
1632   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1633       final Put put, final WALEdit edit, final Durability durability)
1634       throws IOException {
1635     User user = getActiveUser(c);
1636     checkForReservedTagPresence(user, put);
1637
1638     // Require WRITE permission to the table, CF, or top visible value, if any.
1639     // NOTE: We don't need to check the permissions for any earlier Puts
1640     // because we treat the ACLs in each Put as timestamped like any other
1641     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1642     // change the ACL of any previous Put. This allows simple evolution of
1643     // security policy over time without requiring expensive updates.
1644     RegionCoprocessorEnvironment env = c.getEnvironment();
1645     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1646     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1647     logResult(authResult);
1648     if (!authResult.isAllowed()) {
1649       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1650         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1651       } else if (authorizationEnabled) {
1652         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1653       }
1654     }
1655
1656     // Add cell ACLs from the operation to the cells themselves
1657     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1658     if (bytes != null) {
1659       if (cellFeaturesEnabled) {
1660         addCellPermissions(bytes, put.getFamilyCellMap());
1661       } else {
1662         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1663       }
1664     }
1665   }
1666
1667   @Override
1668   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1669       final Put put, final WALEdit edit, final Durability durability) {
1670     if (aclRegion) {
1671       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1672     }
1673   }
1674
1675   @Override
1676   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1677       final Delete delete, final WALEdit edit, final Durability durability)
1678       throws IOException {
1679     // An ACL on a delete is useless, we shouldn't allow it
1680     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1681       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1682     }
1683     // Require WRITE permissions on all cells covered by the delete. Unlike
1684     // for Puts we need to check all visible prior versions, because a major
1685     // compaction could remove them. If the user doesn't have permission to
1686     // overwrite any of the visible versions ('visible' defined as not covered
1687     // by a tombstone already) then we have to disallow this operation.
1688     RegionCoprocessorEnvironment env = c.getEnvironment();
1689     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1690     User user = getActiveUser(c);
1691     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1692     logResult(authResult);
1693     if (!authResult.isAllowed()) {
1694       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1695         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1696       } else if (authorizationEnabled) {
1697         throw new AccessDeniedException("Insufficient permissions " +
1698           authResult.toContextString());
1699       }
1700     }
1701   }
1702
1703   @Override
1704   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1705       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1706     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1707       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1708       User user = getActiveUser(c);
1709       for (int i = 0; i < miniBatchOp.size(); i++) {
1710         Mutation m = miniBatchOp.getOperation(i);
1711         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1712           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1713           // perm check
1714           OpType opType;
1715           if (m instanceof Put) {
1716             checkForReservedTagPresence(user, m);
1717             opType = OpType.PUT;
1718           } else {
1719             opType = OpType.DELETE;
1720           }
1721           AuthResult authResult = null;
1722           if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1723             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1724             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1725               user, Action.WRITE, table, m.getFamilyCellMap());
1726           } else {
1727             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1728               user, Action.WRITE, table, m.getFamilyCellMap());
1729           }
1730           logResult(authResult);
1731           if (authorizationEnabled && !authResult.isAllowed()) {
1732             throw new AccessDeniedException("Insufficient permissions "
1733               + authResult.toContextString());
1734           }
1735         }
1736       }
1737     }
1738   }
1739
1740   @Override
1741   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1742       final Delete delete, final WALEdit edit, final Durability durability)
1743       throws IOException {
1744     if (aclRegion) {
1745       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1746     }
1747   }
1748
1749   @Override
1750   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1751       final byte [] row, final byte [] family, final byte [] qualifier,
1752       final CompareFilter.CompareOp compareOp,
1753       final ByteArrayComparable comparator, final Put put,
1754       final boolean result) throws IOException {
1755     User user = getActiveUser(c);
1756     checkForReservedTagPresence(user, put);
1757
1758     // Require READ and WRITE permissions on the table, CF, and KV to update
1759     RegionCoprocessorEnvironment env = c.getEnvironment();
1760     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1761     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1762       Action.READ, Action.WRITE);
1763     logResult(authResult);
1764     if (!authResult.isAllowed()) {
1765       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1766         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1767       } else if (authorizationEnabled) {
1768         throw new AccessDeniedException("Insufficient permissions " +
1769           authResult.toContextString());
1770       }
1771     }
1772
1773     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1774     if (bytes != null) {
1775       if (cellFeaturesEnabled) {
1776         addCellPermissions(bytes, put.getFamilyCellMap());
1777       } else {
1778         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1779       }
1780     }
1781     return result;
1782   }
1783
1784   @Override
1785   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1786       final byte[] row, final byte[] family, final byte[] qualifier,
1787       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1788       final boolean result) throws IOException {
1789     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1790       // We had failure with table, cf and q perm checks and now giving a chance for cell
1791       // perm check
1792       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1793       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1794       AuthResult authResult = null;
1795       User user = getActiveUser(c);
1796       if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1797           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1798         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1799             user, Action.READ, table, families);
1800       } else {
1801         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1802             user, Action.READ, table, families);
1803       }
1804       logResult(authResult);
1805       if (authorizationEnabled && !authResult.isAllowed()) {
1806         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1807       }
1808     }
1809     return result;
1810   }
1811
1812   @Override
1813   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1814       final byte [] row, final byte [] family, final byte [] qualifier,
1815       final CompareFilter.CompareOp compareOp,
1816       final ByteArrayComparable comparator, final Delete delete,
1817       final boolean result) throws IOException {
1818     // An ACL on a delete is useless, we shouldn't allow it
1819     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1820       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1821           delete.toString());
1822     }
1823     // Require READ and WRITE permissions on the table, CF, and the KV covered
1824     // by the delete
1825     RegionCoprocessorEnvironment env = c.getEnvironment();
1826     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1827     User user = getActiveUser(c);
1828     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1829         Action.READ, Action.WRITE);
1830     logResult(authResult);
1831     if (!authResult.isAllowed()) {
1832       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1833         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1834       } else if (authorizationEnabled) {
1835         throw new AccessDeniedException("Insufficient permissions " +
1836           authResult.toContextString());
1837       }
1838     }
1839     return result;
1840   }
1841
1842   @Override
1843   public boolean preCheckAndDeleteAfterRowLock(
1844       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1845       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1846       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1847       throws IOException {
1848     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1849       // We had failure with table, cf and q perm checks and now giving a chance for cell
1850       // perm check
1851       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1852       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1853       AuthResult authResult = null;
1854       User user = getActiveUser(c);
1855       if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1856           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1857         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1858             user, Action.READ, table, families);
1859       } else {
1860         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1861             user, Action.READ, table, families);
1862       }
1863       logResult(authResult);
1864       if (authorizationEnabled && !authResult.isAllowed()) {
1865         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1866       }
1867     }
1868     return result;
1869   }
1870
1871   @Override
1872   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1873       final byte [] row, final byte [] family, final byte [] qualifier,
1874       final long amount, final boolean writeToWAL)
1875       throws IOException {
1876     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1877     // incremented value
1878     RegionCoprocessorEnvironment env = c.getEnvironment();
1879     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1880     User user = getActiveUser(c);
1881     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1882         Action.WRITE);
1883     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1884       authResult.setAllowed(checkCoveringPermission(user, OpType.INCREMENT_COLUMN_VALUE, env, row,
1885         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1886       authResult.setReason("Covering cell set");
1887     }
1888     logResult(authResult);
1889     if (authorizationEnabled && !authResult.isAllowed()) {
1890       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1891     }
1892     return -1;
1893   }
1894
1895   @Override
1896   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1897       throws IOException {
1898     User user = getActiveUser(c);
1899     checkForReservedTagPresence(user, append);
1900
1901     // Require WRITE permission to the table, CF, and the KV to be appended
1902     RegionCoprocessorEnvironment env = c.getEnvironment();
1903     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1904     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1905     logResult(authResult);
1906     if (!authResult.isAllowed()) {
1907       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1908         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1909       } else if (authorizationEnabled)  {
1910         throw new AccessDeniedException("Insufficient permissions " +
1911           authResult.toContextString());
1912       }
1913     }
1914
1915     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1916     if (bytes != null) {
1917       if (cellFeaturesEnabled) {
1918         addCellPermissions(bytes, append.getFamilyCellMap());
1919       } else {
1920         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1921       }
1922     }
1923
1924     return null;
1925   }
1926
1927   @Override
1928   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1929       final Append append) throws IOException {
1930     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1931       // We had failure with table, cf and q perm checks and now giving a chance for cell
1932       // perm check
1933       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1934       AuthResult authResult = null;
1935       User user = getActiveUser(c);
1936       if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
1937           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1938         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1939             user, Action.WRITE, table, append.getFamilyCellMap());
1940       } else {
1941         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1942             user, Action.WRITE, table, append.getFamilyCellMap());
1943       }
1944       logResult(authResult);
1945       if (authorizationEnabled && !authResult.isAllowed()) {
1946         throw new AccessDeniedException("Insufficient permissions " +
1947           authResult.toContextString());
1948       }
1949     }
1950     return null;
1951   }
1952
1953   @Override
1954   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1955       final Increment increment)
1956       throws IOException {
1957     User user = getActiveUser(c);
1958     checkForReservedTagPresence(user, increment);
1959
1960     // Require WRITE permission to the table, CF, and the KV to be replaced by
1961     // the incremented value
1962     RegionCoprocessorEnvironment env = c.getEnvironment();
1963     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1964     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1965       Action.WRITE);
1966     logResult(authResult);
1967     if (!authResult.isAllowed()) {
1968       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1969         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1970       } else if (authorizationEnabled) {
1971         throw new AccessDeniedException("Insufficient permissions " +
1972           authResult.toContextString());
1973       }
1974     }
1975
1976     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1977     if (bytes != null) {
1978       if (cellFeaturesEnabled) {
1979         addCellPermissions(bytes, increment.getFamilyCellMap());
1980       } else {
1981         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1982       }
1983     }
1984
1985     return null;
1986   }
1987
1988   @Override
1989   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1990       final Increment increment) throws IOException {
1991     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1992       // We had failure with table, cf and q perm checks and now giving a chance for cell
1993       // perm check
1994       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1995       AuthResult authResult = null;
1996       User user = getActiveUser(c);
1997       if (checkCoveringPermission(user, OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1998           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1999         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
2000             user, Action.WRITE, table, increment.getFamilyCellMap());
2001       } else {
2002         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
2003             user, Action.WRITE, table, increment.getFamilyCellMap());
2004       }
2005       logResult(authResult);
2006       if (authorizationEnabled && !authResult.isAllowed()) {
2007         throw new AccessDeniedException("Insufficient permissions " +
2008           authResult.toContextString());
2009       }
2010     }
2011     return null;
2012   }
2013
2014   @Override
2015   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2016       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2017     // If the HFile version is insufficient to persist tags, we won't have any
2018     // work to do here
2019     if (!cellFeaturesEnabled) {
2020       return newCell;
2021     }
2022
2023     // Collect any ACLs from the old cell
2024     List<Tag> tags = Lists.newArrayList();
2025     List<Tag> aclTags = Lists.newArrayList();
2026     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2027     if (oldCell != null) {
2028       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell);
2029       while (tagIterator.hasNext()) {
2030         Tag tag = tagIterator.next();
2031         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2032           // Not an ACL tag, just carry it through
2033           if (LOG.isTraceEnabled()) {
2034             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
2035                 + " length " + tag.getValueLength());
2036           }
2037           tags.add(tag);
2038         } else {
2039           aclTags.add(tag);
2040         }
2041       }
2042     }
2043
2044     // Do we have an ACL on the operation?
2045     byte[] aclBytes = mutation.getACL();
2046     if (aclBytes != null) {
2047       // Yes, use it
2048       tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2049     } else {
2050       // No, use what we carried forward
2051       if (perms != null) {
2052         // TODO: If we collected ACLs from more than one tag we may have a
2053         // List<Permission> of size > 1, this can be collapsed into a single
2054         // Permission
2055         if (LOG.isTraceEnabled()) {
2056           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2057         }
2058         tags.addAll(aclTags);
2059       }
2060     }
2061
2062     // If we have no tags to add, just return
2063     if (tags.isEmpty()) {
2064       return newCell;
2065     }
2066
2067     Cell rewriteCell = CellUtil.createCell(newCell, tags);
2068     return rewriteCell;
2069   }
2070
2071   @Override
2072   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2073       final Scan scan, final RegionScanner s) throws IOException {
2074     internalPreRead(c, scan, OpType.SCAN);
2075     return s;
2076   }
2077
2078   @Override
2079   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2080       final Scan scan, final RegionScanner s) throws IOException {
2081     User user = getActiveUser(c);
2082     if (user != null && user.getShortName() != null) {
2083       // store reference to scanner owner for later checks
2084       scannerOwners.put(s, user.getShortName());
2085     }
2086     return s;
2087   }
2088
2089   @Override
2090   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2091       final InternalScanner s, final List<Result> result,
2092       final int limit, final boolean hasNext) throws IOException {
2093     requireScannerOwner(s);
2094     return hasNext;
2095   }
2096
2097   @Override
2098   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2099       final InternalScanner s) throws IOException {
2100     requireScannerOwner(s);
2101   }
2102
2103   @Override
2104   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2105       final InternalScanner s) throws IOException {
2106     // clean up any associated owner mapping
2107     scannerOwners.remove(s);
2108   }
2109
2110   @Override
2111   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2112       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2113     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2114     return hasMore;
2115   }
2116
2117   /**
2118    * Verify, when servicing an RPC, that the caller is the scanner owner.
2119    * If so, we assume that access control is correctly enforced based on
2120    * the checks performed in preScannerOpen()
2121    */
2122   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2123     if (!RpcServer.isInRpcCallContext())
2124       return;
2125     String requestUserName = RpcServer.getRequestUserName();
2126     String owner = scannerOwners.get(s);
2127     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2128       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2129     }
2130   }
2131
2132   /**
2133    * Verifies user has CREATE privileges on
2134    * the Column Families involved in the bulkLoadHFile
2135    * request. Specific Column Write privileges are presently
2136    * ignored.
2137    */
2138   @Override
2139   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2140       List<Pair<byte[], String>> familyPaths) throws IOException {
2141     User user = getActiveUser(ctx);
2142     for(Pair<byte[],String> el : familyPaths) {
2143       requirePermission(user, "preBulkLoadHFile",
2144           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2145           el.getFirst(),
2146           null,
2147           Action.CREATE);
2148     }
2149   }
2150
2151   /**
2152    * Authorization check for
2153    * SecureBulkLoadProtocol.prepareBulkLoad()
2154    * @param ctx the context
2155    * @param request the request
2156    * @throws IOException
2157    */
2158   @Override
2159   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2160       PrepareBulkLoadRequest request) throws IOException {
2161     requireAccess(getActiveUser(ctx), "prePrepareBulkLoad",
2162         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2163   }
2164
2165   /**
2166    * Authorization security check for
2167    * SecureBulkLoadProtocol.cleanupBulkLoad()
2168    * @param ctx the context
2169    * @param request the request
2170    * @throws IOException
2171    */
2172   @Override
2173   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2174       CleanupBulkLoadRequest request) throws IOException {
2175     requireAccess(getActiveUser(ctx), "preCleanupBulkLoad",
2176         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2177   }
2178
2179   /* ---- EndpointObserver implementation ---- */
2180
2181   @Override
2182   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2183       Service service, String methodName, Message request) throws IOException {
2184     // Don't intercept calls to our own AccessControlService, we check for
2185     // appropriate permissions in the service handlers
2186     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2187       requirePermission(getActiveUser(ctx),
2188           "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
2189           getTableName(ctx.getEnvironment()), null, null,
2190           Action.EXEC);
2191     }
2192     return request;
2193   }
2194
2195   @Override
2196   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2197       Service service, String methodName, Message request, Message.Builder responseBuilder)
2198       throws IOException { }
2199
2200   /* ---- Protobuf AccessControlService implementation ---- */
2201
2202   @Override
2203   public void grant(RpcController controller,
2204                     AccessControlProtos.GrantRequest request,
2205                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2206     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2207     AccessControlProtos.GrantResponse response = null;
2208     try {
2209       // verify it's only running at .acl.
2210       if (aclRegion) {
2211         if (!initialized) {
2212           throw new CoprocessorException("AccessController not yet initialized");
2213         }
2214         if (LOG.isDebugEnabled()) {
2215           LOG.debug("Received request to grant access permission " + perm.toString());
2216         }
2217         User caller = RpcServer.getRequestUser();
2218
2219         switch(request.getUserPermission().getPermission().getType()) {
2220           case Global :
2221           case Table :
2222             requirePermission(caller, "grant", perm.getTableName(),
2223                 perm.getFamily(), perm.getQualifier(), Action.ADMIN);
2224             break;
2225           case Namespace :
2226             requireNamespacePermission(caller, "grant", perm.getNamespace(), Action.ADMIN);
2227            break;
2228         }
2229
2230         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2231           @Override
2232           public Void run() throws Exception {
2233             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2234             return null;
2235           }
2236         });
2237
2238         if (AUDITLOG.isTraceEnabled()) {
2239           // audit log should store permission changes in addition to auth results
2240           AUDITLOG.trace("Granted permission " + perm.toString());
2241         }
2242       } else {
2243         throw new CoprocessorException(AccessController.class, "This method "
2244             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2245       }
2246       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2247     } catch (IOException ioe) {
2248       // pass exception back up
2249       ResponseConverter.setControllerException(controller, ioe);
2250     }
2251     done.run(response);
2252   }
2253
2254   @Override
2255   public void revoke(RpcController controller,
2256                      AccessControlProtos.RevokeRequest request,
2257                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2258     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2259     AccessControlProtos.RevokeResponse response = null;
2260     try {
2261       // only allowed to be called on _acl_ region
2262       if (aclRegion) {
2263         if (!initialized) {
2264           throw new CoprocessorException("AccessController not yet initialized");
2265         }
2266         if (LOG.isDebugEnabled()) {
2267           LOG.debug("Received request to revoke access permission " + perm.toString());
2268         }
2269         User caller = RpcServer.getRequestUser();
2270
2271         switch(request.getUserPermission().getPermission().getType()) {
2272           case Global :
2273           case Table :
2274             requirePermission(caller, "revoke", perm.getTableName(), perm.getFamily(),
2275               perm.getQualifier(), Action.ADMIN);
2276             break;
2277           case Namespace :
2278             requireNamespacePermission(caller, "revoke", perm.getNamespace(), Action.ADMIN);
2279             break;
2280         }
2281
2282         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2283           @Override
2284           public Void run() throws Exception {
2285             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2286             return null;
2287           }
2288         });
2289
2290         if (AUDITLOG.isTraceEnabled()) {
2291           // audit log should record all permission changes
2292           AUDITLOG.trace("Revoked permission " + perm.toString());
2293         }
2294       } else {
2295         throw new CoprocessorException(AccessController.class, "This method "
2296             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2297       }
2298       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2299     } catch (IOException ioe) {
2300       // pass exception back up
2301       ResponseConverter.setControllerException(controller, ioe);
2302     }
2303     done.run(response);
2304   }
2305
2306   @Override
2307   public void getUserPermissions(RpcController controller,
2308                                  AccessControlProtos.GetUserPermissionsRequest request,
2309                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2310     AccessControlProtos.GetUserPermissionsResponse response = null;
2311     try {
2312       // only allowed to be called on _acl_ region
2313       if (aclRegion) {
2314         if (!initialized) {
2315           throw new CoprocessorException("AccessController not yet initialized");
2316         }
2317         User caller = RpcServer.getRequestUser();
2318
2319         List<UserPermission> perms = null;
2320         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2321           final TableName table = request.hasTableName() ?
2322             ProtobufUtil.toTableName(request.getTableName()) : null;
2323           requirePermission(caller, "userPermissions", table, null, null, Action.ADMIN);
2324           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2325             @Override
2326             public List<UserPermission> run() throws Exception {
2327               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2328             }
2329           });
2330         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2331           final String namespace = request.getNamespaceName().toStringUtf8();
2332           requireNamespacePermission(caller, "userPermissions", namespace, Action.ADMIN);
2333           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2334             @Override
2335             public List<UserPermission> run() throws Exception {
2336               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2337                 namespace);
2338             }
2339           });
2340         } else {
2341           requirePermission(caller, "userPermissions", Action.ADMIN);
2342           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2343             @Override
2344             public List<UserPermission> run() throws Exception {
2345               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2346             }
2347           });
2348           // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2349           // Also using acl as table name to be inline  with the results of global admin and will
2350           // help in avoiding any leakage of information about being superusers.
2351           for (String user: Superusers.getSuperUsers()) {
2352             perms.add(new UserPermission(user.getBytes(), AccessControlLists.ACL_TABLE_NAME, null,
2353                 Action.values()));
2354           }
2355         }
2356         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2357       } else {
2358         throw new CoprocessorException(AccessController.class, "This method "
2359             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2360       }
2361     } catch (IOException ioe) {
2362       // pass exception back up
2363       ResponseConverter.setControllerException(controller, ioe);
2364     }
2365     done.run(response);
2366   }
2367
2368   @Override
2369   public void checkPermissions(RpcController controller,
2370                                AccessControlProtos.CheckPermissionsRequest request,
2371                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2372     Permission[] permissions = new Permission[request.getPermissionCount()];
2373     for (int i=0; i < request.getPermissionCount(); i++) {
2374       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2375     }
2376     AccessControlProtos.CheckPermissionsResponse response = null;
2377     try {
2378       User user = RpcServer.getRequestUser();
2379       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2380       for (Permission permission : permissions) {
2381         if (permission instanceof TablePermission) {
2382           // Check table permissions
2383
2384           TablePermission tperm = (TablePermission) permission;
2385           for (Action action : permission.getActions()) {
2386             if (!tperm.getTableName().equals(tableName)) {
2387               throw new CoprocessorException(AccessController.class, String.format("This method "
2388                   + "can only execute at the table specified in TablePermission. " +
2389                   "Table of the region:%s , requested table:%s", tableName,
2390                   tperm.getTableName()));
2391             }
2392
2393             Map<byte[], Set<byte[]>> familyMap =
2394                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2395             if (tperm.getFamily() != null) {
2396               if (tperm.getQualifier() != null) {
2397                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2398                 qualifiers.add(tperm.getQualifier());
2399                 familyMap.put(tperm.getFamily(), qualifiers);
2400               } else {
2401                 familyMap.put(tperm.getFamily(), null);
2402               }
2403             }
2404
2405             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2406               familyMap);
2407             logResult(result);
2408             if (!result.isAllowed()) {
2409               // Even if passive we need to throw an exception here, we support checking
2410               // effective permissions, so throw unconditionally
2411               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2412                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2413                 ", action=" + action.toString() + ")");
2414             }
2415           }
2416
2417         } else {
2418           // Check global permissions
2419
2420           for (Action action : permission.getActions()) {
2421             AuthResult result;
2422             if (authManager.authorize(user, action)) {
2423               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2424                 action, null, null);
2425             } else {
2426               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2427                 null, null);
2428             }
2429             logResult(result);
2430             if (!result.isAllowed()) {
2431               // Even if passive we need to throw an exception here, we support checking
2432               // effective permissions, so throw unconditionally
2433               throw new AccessDeniedException("Insufficient permissions (action=" +
2434                 action.toString() + ")");
2435             }
2436           }
2437         }
2438       }
2439       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2440     } catch (IOException ioe) {
2441       ResponseConverter.setControllerException(controller, ioe);
2442     }
2443     done.run(response);
2444   }
2445
2446   @Override
2447   public Service getService() {
2448     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2449   }
2450
2451   private Region getRegion(RegionCoprocessorEnvironment e) {
2452     return e.getRegion();
2453   }
2454
2455   private TableName getTableName(RegionCoprocessorEnvironment e) {
2456     Region region = e.getRegion();
2457     if (region != null) {
2458       return getTableName(region);
2459     }
2460     return null;
2461   }
2462
2463   private TableName getTableName(Region region) {
2464     HRegionInfo regionInfo = region.getRegionInfo();
2465     if (regionInfo != null) {
2466       return regionInfo.getTable();
2467     }
2468     return null;
2469   }
2470
2471   @Override
2472   public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2473       throws IOException {
2474     requirePermission(getActiveUser(c), "preClose", Action.ADMIN);
2475   }
2476
2477   private void checkSystemOrSuperUser(User activeUser) throws IOException {
2478     // No need to check if we're not going to throw
2479     if (!authorizationEnabled) {
2480       return;
2481     }
2482     if (!Superusers.isSuperUser(activeUser)) {
2483       throw new AccessDeniedException("User '" + (activeUser != null ?
2484         activeUser.getShortName() : "null") + "' is not system or super user.");
2485     }
2486   }
2487
2488   @Override
2489   public void preStopRegionServer(
2490       ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2491       throws IOException {
2492     requirePermission(getActiveUser(ctx), "preStopRegionServer", Action.ADMIN);
2493   }
2494
2495   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2496       byte[] qualifier) {
2497     if (family == null) {
2498       return null;
2499     }
2500
2501     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2502     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2503     return familyMap;
2504   }
2505
2506   @Override
2507   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2508        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2509        String regex) throws IOException {
2510     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2511     // any concrete set of table names when a regex is present or the full list is requested.
2512     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2513       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2514       // request can be granted.
2515       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2516       for (TableName tableName: tableNamesList) {
2517         // Skip checks for a table that does not exist
2518         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2519           continue;
2520         requirePermission(getActiveUser(ctx), "getTableDescriptors", tableName, null, null,
2521             Action.ADMIN, Action.CREATE);
2522       }
2523     }
2524   }
2525
2526   @Override
2527   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2528       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2529       String regex) throws IOException {
2530     // Skipping as checks in this case are already done by preGetTableDescriptors.
2531     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2532       return;
2533     }
2534
2535     // Retains only those which passes authorization checks, as the checks weren't done as part
2536     // of preGetTableDescriptors.
2537     Iterator<HTableDescriptor> itr = descriptors.iterator();
2538     while (itr.hasNext()) {
2539       HTableDescriptor htd = itr.next();
2540       try {
2541         requirePermission(getActiveUser(ctx), "getTableDescriptors", htd.getTableName(), null, null,
2542             Action.ADMIN, Action.CREATE);
2543       } catch (AccessDeniedException e) {
2544         itr.remove();
2545       }
2546     }
2547   }
2548
2549   @Override
2550   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2551       List<HTableDescriptor> descriptors, String regex) throws IOException {
2552     // Retains only those which passes authorization checks.
2553     Iterator<HTableDescriptor> itr = descriptors.iterator();
2554     while (itr.hasNext()) {
2555       HTableDescriptor htd = itr.next();
2556       try {
2557         requireAccess(getActiveUser(ctx), "getTableNames", htd.getTableName(), Action.values());
2558       } catch (AccessDeniedException e) {
2559         itr.remove();
2560       }
2561     }
2562   }
2563
2564   @Override
2565   public void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2566       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
2567     requirePermission(getActiveUser(ctx), "mergeRegions", regionA.getTable(), null, null,
2568       Action.ADMIN);
2569   }
2570
2571   @Override
2572   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2573       Region regionB) throws IOException {
2574     requirePermission(getActiveUser(ctx), "mergeRegions", regionA.getTableDesc().getTableName(),
2575         null, null, Action.ADMIN);
2576   }
2577
2578   @Override
2579   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2580       Region regionB, Region mergedRegion) throws IOException { }
2581
2582   @Override
2583   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2584       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2585
2586   @Override
2587   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2588       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2589
2590   @Override
2591   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2592       Region regionA, Region regionB) throws IOException { }
2593
2594   @Override
2595   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2596       Region regionA, Region regionB) throws IOException { }
2597
2598   @Override
2599   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2600       throws IOException {
2601     requirePermission(getActiveUser(ctx), "preRollLogWriterRequest", Permission.Action.ADMIN);
2602   }
2603
2604   @Override
2605   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2606       throws IOException { }
2607
2608   @Override
2609   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2610       final String userName, final Quotas quotas) throws IOException {
2611     requirePermission(getActiveUser(ctx), "setUserQuota", Action.ADMIN);
2612   }
2613
2614   @Override
2615   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2616       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2617     requirePermission(getActiveUser(ctx), "setUserTableQuota", tableName, null, null, Action.ADMIN);
2618   }
2619
2620   @Override
2621   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2622       final String userName, final String namespace, final Quotas quotas) throws IOException {
2623     requirePermission(getActiveUser(ctx), "setUserNamespaceQuota", Action.ADMIN);
2624   }
2625
2626   @Override
2627   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2628       final TableName tableName, final Quotas quotas) throws IOException {
2629     requirePermission(getActiveUser(ctx), "setTableQuota", tableName, null, null, Action.ADMIN);
2630   }
2631
2632   @Override
2633   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2634       final String namespace, final Quotas quotas) throws IOException {
2635     requirePermission(getActiveUser(ctx), "setNamespaceQuota", Action.ADMIN);
2636   }
2637
2638   @Override
2639   public ReplicationEndpoint postCreateReplicationEndPoint(
2640       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2641     return endpoint;
2642   }
2643
2644   @Override
2645   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2646       List<WALEntry> entries, CellScanner cells) throws IOException {
2647     requirePermission(getActiveUser(ctx), "replicateLogEntries", Action.WRITE);
2648   }
2649
2650   @Override
2651   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2652       List<WALEntry> entries, CellScanner cells) throws IOException {
2653   }
2654
2655   @Override
2656   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2657                              Set<HostAndPort> servers, String targetGroup) throws IOException {
2658     requirePermission(getActiveUser(ctx), "moveServers", Action.ADMIN);
2659   }
2660
2661   @Override
2662   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2663                             Set<TableName> tables, String targetGroup) throws IOException {
2664     requirePermission(getActiveUser(ctx), "moveTables", Action.ADMIN);
2665   }
2666
2667   @Override
2668   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2669                             String name) throws IOException {
2670     requirePermission(getActiveUser(ctx), "addRSGroup", Action.ADMIN);
2671   }
2672
2673   @Override
2674   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2675                                String name) throws IOException {
2676     requirePermission(getActiveUser(ctx), "removeRSGroup", Action.ADMIN);
2677   }
2678
2679   @Override
2680   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2681                                 String groupName) throws IOException {
2682     requirePermission(getActiveUser(ctx), "balanceRSGroup", Action.ADMIN);
2683   }
2684 }