View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.CoprocessorEnvironment;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.Type;
46  import org.apache.hadoop.hbase.MetaTableAccessor;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.TagRewriteCell;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Query;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.Scan;
63  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
70  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
73  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
74  import org.apache.hadoop.hbase.filter.CompareFilter;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.FilterList;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RpcServer;
79  import org.apache.hadoop.hbase.master.MasterServices;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
82  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.InternalScanner;
90  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
91  import org.apache.hadoop.hbase.regionserver.Region;
92  import org.apache.hadoop.hbase.regionserver.RegionScanner;
93  import org.apache.hadoop.hbase.regionserver.ScanType;
94  import org.apache.hadoop.hbase.regionserver.ScannerContext;
95  import org.apache.hadoop.hbase.regionserver.Store;
96  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
97  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
98  import org.apache.hadoop.hbase.security.AccessDeniedException;
99  import org.apache.hadoop.hbase.security.Superusers;
100 import org.apache.hadoop.hbase.security.User;
101 import org.apache.hadoop.hbase.security.UserProvider;
102 import org.apache.hadoop.hbase.security.access.Permission.Action;
103 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
104 import org.apache.hadoop.hbase.util.ByteRange;
105 import org.apache.hadoop.hbase.util.Bytes;
106 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
107 import org.apache.hadoop.hbase.util.Pair;
108 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
109 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
110 
111 import com.google.common.collect.ArrayListMultimap;
112 import com.google.common.collect.ImmutableSet;
113 import com.google.common.collect.ListMultimap;
114 import com.google.common.collect.Lists;
115 import com.google.common.collect.MapMaker;
116 import com.google.common.collect.Maps;
117 import com.google.common.collect.Sets;
118 import com.google.protobuf.Message;
119 import com.google.protobuf.RpcCallback;
120 import com.google.protobuf.RpcController;
121 import com.google.protobuf.Service;
122 
123 /**
124  * Provides basic authorization checks for data access and administrative
125  * operations.
126  *
127  * <p>
128  * {@code AccessController} performs authorization checks for HBase operations
129  * based on:
130  * </p>
131  * <ul>
132  *   <li>the identity of the user performing the operation</li>
133  *   <li>the scope over which the operation is performed, in increasing
134  *   specificity: global, table, column family, or qualifier</li>
135  *   <li>the type of action being performed (as mapped to
136  *   {@link Permission.Action} values)</li>
137  * </ul>
138  * <p>
139  * If the authorization check fails, an {@link AccessDeniedException}
140  * will be thrown for the operation.
141  * </p>
142  *
143  * <p>
144  * To perform authorization checks, {@code AccessController} relies on the
145  * RpcServerEngine being loaded to provide
146  * the user identities for remote requests.
147  * </p>
148  *
149  * <p>
150  * The access control lists used for authorization can be manipulated via the
151  * exposed {@link AccessControlService} Interface implementation, and the associated
152  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
153  * commands.
154  * </p>
155  */
156 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
157 public class AccessController extends BaseMasterAndRegionObserver
158     implements RegionServerObserver,
159       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
160 
161   private static final Log LOG = LogFactory.getLog(AccessController.class);
162 
163   private static final Log AUDITLOG =
164     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
165   private static final String CHECK_COVERING_PERM = "check_covering_perm";
166   private static final String TAG_CHECK_PASSED = "tag_check_passed";
167   private static final byte[] TRUE = Bytes.toBytes(true);
168 
169   TableAuthManager authManager = null;
170 
171   /** flags if we are running on a region of the _acl_ table */
172   boolean aclRegion = false;
173 
174   /** defined only for Endpoint implementation, so it can have way to
175    access region services */
176   private RegionCoprocessorEnvironment regionEnv;
177 
178   /** Mapping of scanner instances to the user who created them */
179   private Map<InternalScanner,String> scannerOwners =
180       new MapMaker().weakKeys().makeMap();
181 
182   private Map<TableName, List<UserPermission>> tableAcls;
183 
184   /** Provider for mapping principal names to Users */
185   private UserProvider userProvider;
186 
187   /** if we are active, usually true, only not true if "hbase.security.authorization"
188    has been set to false in site configuration */
189   boolean authorizationEnabled;
190 
191   /** if we are able to support cell ACLs */
192   boolean cellFeaturesEnabled;
193 
194   /** if we should check EXEC permissions */
195   boolean shouldCheckExecPermission;
196 
197   /** if we should terminate access checks early as soon as table or CF grants
198     allow access; pre-0.98 compatible behavior */
199   boolean compatibleEarlyTermination;
200 
201   /** if we have been successfully initialized */
202   private volatile boolean initialized = false;
203 
204   /** if the ACL table is available, only relevant in the master */
205   private volatile boolean aclTabAvailable = false;
206 
207   public Region getRegion() {
208     return regionEnv != null ? regionEnv.getRegion() : null;
209   }
210 
211   public TableAuthManager getAuthManager() {
212     return authManager;
213   }
214 
215   void initialize(RegionCoprocessorEnvironment e) throws IOException {
216     final Region region = e.getRegion();
217     Configuration conf = e.getConfiguration();
218     Map<byte[], ListMultimap<String,TablePermission>> tables =
219         AccessControlLists.loadAll(region);
220     // For each table, write out the table's permissions to the respective
221     // znode for that table.
222     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
223       tables.entrySet()) {
224       byte[] entry = t.getKey();
225       ListMultimap<String,TablePermission> perms = t.getValue();
226       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
227       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
228     }
229     initialized = true;
230   }
231 
232   /**
233    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
234    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
235    * table updates.
236    */
237   void updateACL(RegionCoprocessorEnvironment e,
238       final Map<byte[], List<Cell>> familyMap) {
239     Set<byte[]> entries =
240         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
241     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
242       List<Cell> cells = f.getValue();
243       for (Cell cell: cells) {
244         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
245             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
246             AccessControlLists.ACL_LIST_FAMILY.length)) {
247           entries.add(CellUtil.cloneRow(cell));
248         }
249       }
250     }
251     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
252     Configuration conf = regionEnv.getConfiguration();
253     for (byte[] entry: entries) {
254       try {
255         ListMultimap<String,TablePermission> perms =
256           AccessControlLists.getPermissions(conf, entry);
257         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
258         zkw.writeToZookeeper(entry, serialized);
259       } catch (IOException ex) {
260         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
261             ex);
262       }
263     }
264   }
265 
266   /**
267    * Check the current user for authorization to perform a specific action
268    * against the given set of row data.
269    *
270    * <p>Note: Ordering of the authorization checks
271    * has been carefully optimized to short-circuit the most common requests
272    * and minimize the amount of processing required.</p>
273    *
274    * @param permRequest the action being requested
275    * @param e the coprocessor environment
276    * @param families the map of column families to qualifiers present in
277    * the request
278    * @return an authorization result
279    */
280   AuthResult permissionGranted(String request, User user, Action permRequest,
281       RegionCoprocessorEnvironment e,
282       Map<byte [], ? extends Collection<?>> families) {
283     HRegionInfo hri = e.getRegion().getRegionInfo();
284     TableName tableName = hri.getTable();
285 
286     // 1. All users need read access to hbase:meta table.
287     // this is a very common operation, so deal with it quickly.
288     if (hri.isMetaRegion()) {
289       if (permRequest == Action.READ) {
290         return AuthResult.allow(request, "All users allowed", user,
291           permRequest, tableName, families);
292       }
293     }
294 
295     if (user == null) {
296       return AuthResult.deny(request, "No user associated with request!", null,
297         permRequest, tableName, families);
298     }
299 
300     // 2. check for the table-level, if successful we can short-circuit
301     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
302       return AuthResult.allow(request, "Table permission granted", user,
303         permRequest, tableName, families);
304     }
305 
306     // 3. check permissions against the requested families
307     if (families != null && families.size() > 0) {
308       // all families must pass
309       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
310         // a) check for family level access
311         if (authManager.authorize(user, tableName, family.getKey(),
312             permRequest)) {
313           continue;  // family-level permission overrides per-qualifier
314         }
315 
316         // b) qualifier level access can still succeed
317         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
318           if (family.getValue() instanceof Set) {
319             // for each qualifier of the family
320             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
321             for (byte[] qualifier : familySet) {
322               if (!authManager.authorize(user, tableName, family.getKey(),
323                                          qualifier, permRequest)) {
324                 return AuthResult.deny(request, "Failed qualifier check", user,
325                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
326               }
327             }
328           } else if (family.getValue() instanceof List) { // List<KeyValue>
329             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
330             for (KeyValue kv : kvList) {
331               if (!authManager.authorize(user, tableName, family.getKey(),
332                       kv.getQualifier(), permRequest)) {
333                 return AuthResult.deny(request, "Failed qualifier check", user,
334                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
335               }
336             }
337           }
338         } else {
339           // no qualifiers and family-level check already failed
340           return AuthResult.deny(request, "Failed family check", user, permRequest,
341               tableName, makeFamilyMap(family.getKey(), null));
342         }
343       }
344 
345       // all family checks passed
346       return AuthResult.allow(request, "All family checks passed", user, permRequest,
347           tableName, families);
348     }
349 
350     // 4. no families to check and table level access failed
351     return AuthResult.deny(request, "No families to check and table permission failed",
352         user, permRequest, tableName, families);
353   }
354 
355   /**
356    * Check the current user for authorization to perform a specific action
357    * against the given set of row data.
358    * @param opType the operation type
359    * @param user the user
360    * @param e the coprocessor environment
361    * @param families the map of column families to qualifiers present in
362    * the request
363    * @param actions the desired actions
364    * @return an authorization result
365    */
366   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
367       Map<byte [], ? extends Collection<?>> families, Action... actions) {
368     AuthResult result = null;
369     for (Action action: actions) {
370       result = permissionGranted(opType.toString(), user, action, e, families);
371       if (!result.isAllowed()) {
372         return result;
373       }
374     }
375     return result;
376   }
377 
378   private void logResult(AuthResult result) {
379     if (AUDITLOG.isTraceEnabled()) {
380       InetAddress remoteAddr = RpcServer.getRemoteAddress();
381       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
382           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
383           "; reason: " + result.getReason() +
384           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
385           "; request: " + result.getRequest() +
386           "; context: " + result.toContextString());
387     }
388   }
389 
390   /**
391    * Returns the active user to which authorization checks should be applied.
392    * If we are in the context of an RPC call, the remote user is used,
393    * otherwise the currently logged in user is used.
394    */
395   private User getActiveUser() throws IOException {
396     User user = RpcServer.getRequestUser();
397     if (user == null) {
398       // for non-rpc handling, fallback to system user
399       user = userProvider.getCurrent();
400     }
401     return user;
402   }
403 
404   /**
405    * Authorizes that the current user has any of the given permissions for the
406    * given table, column family and column qualifier.
407    * @param tableName Table requested
408    * @param family Column family requested
409    * @param qualifier Column qualifier requested
410    * @throws IOException if obtaining the current user fails
411    * @throws AccessDeniedException if user has no authorization
412    */
413   private void requirePermission(String request, TableName tableName, byte[] family,
414       byte[] qualifier, Action... permissions) throws IOException {
415     User user = getActiveUser();
416     AuthResult result = null;
417 
418     for (Action permission : permissions) {
419       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
420         result = AuthResult.allow(request, "Table permission granted", user,
421                                   permission, tableName, family, qualifier);
422         break;
423       } else {
424         // rest of the world
425         result = AuthResult.deny(request, "Insufficient permissions", user,
426                                  permission, tableName, family, qualifier);
427       }
428     }
429     logResult(result);
430     if (authorizationEnabled && !result.isAllowed()) {
431       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
432     }
433   }
434 
435   /**
436    * Authorizes that the current user has any of the given permissions for the
437    * given table, column family and column qualifier.
438    * @param tableName Table requested
439    * @param family Column family param
440    * @param qualifier Column qualifier param
441    * @throws IOException if obtaining the current user fails
442    * @throws AccessDeniedException if user has no authorization
443    */
444   private void requireTablePermission(String request, TableName tableName, byte[] family,
445       byte[] qualifier, Action... permissions) throws IOException {
446     User user = getActiveUser();
447     AuthResult result = null;
448 
449     for (Action permission : permissions) {
450       if (authManager.authorize(user, tableName, null, null, permission)) {
451         result = AuthResult.allow(request, "Table permission granted", user,
452             permission, tableName, null, null);
453         result.getParams().setFamily(family).setQualifier(qualifier);
454         break;
455       } else {
456         // rest of the world
457         result = AuthResult.deny(request, "Insufficient permissions", user,
458             permission, tableName, family, qualifier);
459         result.getParams().setFamily(family).setQualifier(qualifier);
460       }
461     }
462     logResult(result);
463     if (authorizationEnabled && !result.isAllowed()) {
464       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
465     }
466   }
467 
468   /**
469    * Authorizes that the current user has any of the given permissions to access the table.
470    *
471    * @param tableName Table requested
472    * @param permissions Actions being requested
473    * @throws IOException if obtaining the current user fails
474    * @throws AccessDeniedException if user has no authorization
475    */
476   private void requireAccess(String request, TableName tableName,
477       Action... permissions) throws IOException {
478     User user = getActiveUser();
479     AuthResult result = null;
480 
481     for (Action permission : permissions) {
482       if (authManager.hasAccess(user, tableName, permission)) {
483         result = AuthResult.allow(request, "Table permission granted", user,
484                                   permission, tableName, null, null);
485         break;
486       } else {
487         // rest of the world
488         result = AuthResult.deny(request, "Insufficient permissions", user,
489                                  permission, tableName, null, null);
490       }
491     }
492     logResult(result);
493     if (authorizationEnabled && !result.isAllowed()) {
494       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
495     }
496   }
497 
498   /**
499    * Authorizes that the current user has global privileges for the given action.
500    * @param perm The action being requested
501    * @throws IOException if obtaining the current user fails
502    * @throws AccessDeniedException if authorization is denied
503    */
504   private void requirePermission(String request, Action perm) throws IOException {
505     requireGlobalPermission(request, perm, null, null);
506   }
507 
508   /**
509    * Checks that the user has the given global permission. The generated
510    * audit log message will contain context information for the operation
511    * being authorized, based on the given parameters.
512    * @param perm Action being requested
513    * @param tableName Affected table name.
514    * @param familyMap Affected column families.
515    */
516   private void requireGlobalPermission(String request, Action perm, TableName tableName,
517       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
518     User user = getActiveUser();
519     AuthResult result = null;
520     if (authManager.authorize(user, perm)) {
521       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
522       result.getParams().setTableName(tableName).setFamilies(familyMap);
523       logResult(result);
524     } else {
525       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
526       result.getParams().setTableName(tableName).setFamilies(familyMap);
527       logResult(result);
528       if (authorizationEnabled) {
529         throw new AccessDeniedException("Insufficient permissions for user '" +
530           (user != null ? user.getShortName() : "null") +"' (global, action=" +
531           perm.toString() + ")");
532       }
533     }
534   }
535 
536   /**
537    * Checks that the user has the given global permission. The generated
538    * audit log message will contain context information for the operation
539    * being authorized, based on the given parameters.
540    * @param perm Action being requested
541    * @param namespace
542    */
543   private void requireGlobalPermission(String request, Action perm,
544                                        String namespace) throws IOException {
545     User user = getActiveUser();
546     AuthResult authResult = null;
547     if (authManager.authorize(user, perm)) {
548       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
549       authResult.getParams().setNamespace(namespace);
550       logResult(authResult);
551     } else {
552       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
553       authResult.getParams().setNamespace(namespace);
554       logResult(authResult);
555       if (authorizationEnabled) {
556         throw new AccessDeniedException("Insufficient permissions for user '" +
557           (user != null ? user.getShortName() : "null") +"' (global, action=" +
558           perm.toString() + ")");
559       }
560     }
561   }
562 
563   /**
564    * Checks that the user has the given global or namespace permission.
565    * @param namespace
566    * @param permissions Actions being requested
567    */
568   public void requireNamespacePermission(String request, String namespace,
569       Action... permissions) throws IOException {
570     User user = getActiveUser();
571     AuthResult result = null;
572 
573     for (Action permission : permissions) {
574       if (authManager.authorize(user, namespace, permission)) {
575         result = AuthResult.allow(request, "Namespace permission granted",
576             user, permission, namespace);
577         break;
578       } else {
579         // rest of the world
580         result = AuthResult.deny(request, "Insufficient permissions", user,
581             permission, namespace);
582       }
583     }
584     logResult(result);
585     if (authorizationEnabled && !result.isAllowed()) {
586       throw new AccessDeniedException("Insufficient permissions "
587           + result.toContextString());
588     }
589   }
590 
591   /**
592    * Checks that the user has the given global or namespace permission.
593    * @param namespace
594    * @param permissions Actions being requested
595    */
596   public void requireNamespacePermission(String request, String namespace, TableName tableName,
597       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
598       throws IOException {
599     User user = getActiveUser();
600     AuthResult result = null;
601 
602     for (Action permission : permissions) {
603       if (authManager.authorize(user, namespace, permission)) {
604         result = AuthResult.allow(request, "Namespace permission granted",
605             user, permission, namespace);
606         result.getParams().setTableName(tableName).setFamilies(familyMap);
607         break;
608       } else {
609         // rest of the world
610         result = AuthResult.deny(request, "Insufficient permissions", user,
611             permission, namespace);
612         result.getParams().setTableName(tableName).setFamilies(familyMap);
613       }
614     }
615     logResult(result);
616     if (authorizationEnabled && !result.isAllowed()) {
617       throw new AccessDeniedException("Insufficient permissions "
618           + result.toContextString());
619     }
620   }
621 
622   /**
623    * Returns <code>true</code> if the current user is allowed the given action
624    * over at least one of the column qualifiers in the given column families.
625    */
626   private boolean hasFamilyQualifierPermission(User user,
627       Action perm,
628       RegionCoprocessorEnvironment env,
629       Map<byte[], ? extends Collection<byte[]>> familyMap)
630     throws IOException {
631     HRegionInfo hri = env.getRegion().getRegionInfo();
632     TableName tableName = hri.getTable();
633 
634     if (user == null) {
635       return false;
636     }
637 
638     if (familyMap != null && familyMap.size() > 0) {
639       // at least one family must be allowed
640       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
641           familyMap.entrySet()) {
642         if (family.getValue() != null && !family.getValue().isEmpty()) {
643           for (byte[] qualifier : family.getValue()) {
644             if (authManager.matchPermission(user, tableName,
645                 family.getKey(), qualifier, perm)) {
646               return true;
647             }
648           }
649         } else {
650           if (authManager.matchPermission(user, tableName, family.getKey(),
651               perm)) {
652             return true;
653           }
654         }
655       }
656     } else if (LOG.isDebugEnabled()) {
657       LOG.debug("Empty family map passed for permission check");
658     }
659 
660     return false;
661   }
662 
663   private enum OpType {
664     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
665     GET("get"),
666     EXISTS("exists"),
667     SCAN("scan"),
668     PUT("put"),
669     DELETE("delete"),
670     CHECK_AND_PUT("checkAndPut"),
671     CHECK_AND_DELETE("checkAndDelete"),
672     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
673     APPEND("append"),
674     INCREMENT("increment");
675 
676     private String type;
677 
678     private OpType(String type) {
679       this.type = type;
680     }
681 
682     @Override
683     public String toString() {
684       return type;
685     }
686   }
687 
688   /**
689    * Determine if cell ACLs covered by the operation grant access. This is expensive.
690    * @return false if cell ACLs failed to grant access, true otherwise
691    * @throws IOException
692    */
693   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
694       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
695       throws IOException {
696     if (!cellFeaturesEnabled) {
697       return false;
698     }
699     long cellGrants = 0;
700     User user = getActiveUser();
701     long latestCellTs = 0;
702     Get get = new Get(row);
703     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
704     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
705     // version. We have to get every cell version and check its TS against the TS asked for in
706     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
707     // consider only one such passing cell. In case of Delete we have to consider all the cell
708     // versions under this passing version. When Delete Mutation contains columns which are a
709     // version delete just consider only one version for those column cells.
710     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
711     if (considerCellTs) {
712       get.setMaxVersions();
713     } else {
714       get.setMaxVersions(1);
715     }
716     boolean diffCellTsFromOpTs = false;
717     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
718       byte[] col = entry.getKey();
719       // TODO: HBASE-7114 could possibly unify the collection type in family
720       // maps so we would not need to do this
721       if (entry.getValue() instanceof Set) {
722         Set<byte[]> set = (Set<byte[]>)entry.getValue();
723         if (set == null || set.isEmpty()) {
724           get.addFamily(col);
725         } else {
726           for (byte[] qual: set) {
727             get.addColumn(col, qual);
728           }
729         }
730       } else if (entry.getValue() instanceof List) {
731         List<Cell> list = (List<Cell>)entry.getValue();
732         if (list == null || list.isEmpty()) {
733           get.addFamily(col);
734         } else {
735           // In case of family delete, a Cell will be added into the list with Qualifier as null.
736           for (Cell cell : list) {
737             if (cell.getQualifierLength() == 0
738                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
739                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
740               get.addFamily(col);
741             } else {
742               get.addColumn(col, CellUtil.cloneQualifier(cell));
743             }
744             if (considerCellTs) {
745               long cellTs = cell.getTimestamp();
746               latestCellTs = Math.max(latestCellTs, cellTs);
747               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
748             }
749           }
750         }
751       } else if (entry.getValue() == null) {
752         get.addFamily(col);        
753       } else {
754         throw new RuntimeException("Unhandled collection type " +
755           entry.getValue().getClass().getName());
756       }
757     }
758     // We want to avoid looking into the future. So, if the cells of the
759     // operation specify a timestamp, or the operation itself specifies a
760     // timestamp, then we use the maximum ts found. Otherwise, we bound
761     // the Get to the current server time. We add 1 to the timerange since
762     // the upper bound of a timerange is exclusive yet we need to examine
763     // any cells found there inclusively.
764     long latestTs = Math.max(opTs, latestCellTs);
765     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
766       latestTs = EnvironmentEdgeManager.currentTime();
767     }
768     get.setTimeRange(0, latestTs + 1);
769     // In case of Put operation we set to read all versions. This was done to consider the case
770     // where columns are added with TS other than the Mutation TS. But normally this wont be the
771     // case with Put. There no need to get all versions but get latest version only.
772     if (!diffCellTsFromOpTs && request == OpType.PUT) {
773       get.setMaxVersions(1);
774     }
775     if (LOG.isTraceEnabled()) {
776       LOG.trace("Scanning for cells with " + get);
777     }
778     // This Map is identical to familyMap. The key is a BR rather than byte[].
779     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
780     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
781     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
782     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
783       if (entry.getValue() instanceof List) {
784         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
785       }
786     }
787     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
788     List<Cell> cells = Lists.newArrayList();
789     Cell prevCell = null;
790     ByteRange curFam = new SimpleMutableByteRange();
791     boolean curColAllVersions = (request == OpType.DELETE);
792     long curColCheckTs = opTs;
793     boolean foundColumn = false;
794     try {
795       boolean more = false;
796       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
797 
798       do {
799         cells.clear();
800         // scan with limit as 1 to hold down memory use on wide rows
801         more = scanner.next(cells, scannerContext);
802         for (Cell cell: cells) {
803           if (LOG.isTraceEnabled()) {
804             LOG.trace("Found cell " + cell);
805           }
806           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
807           if (colChange) foundColumn = false;
808           prevCell = cell;
809           if (!curColAllVersions && foundColumn) {
810             continue;
811           }
812           if (colChange && considerCellTs) {
813             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
814             List<Cell> cols = familyMap1.get(curFam);
815             for (Cell col : cols) {
816               // null/empty qualifier is used to denote a Family delete. The TS and delete type
817               // associated with this is applicable for all columns within the family. That is
818               // why the below (col.getQualifierLength() == 0) check.
819               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
820                   || CellUtil.matchingQualifier(cell, col)) {
821                 byte type = col.getTypeByte();
822                 if (considerCellTs) {
823                   curColCheckTs = col.getTimestamp();
824                 }
825                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
826                 // a version delete for a column no need to check all the covering cells within
827                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
828                 // One version delete types are Delete/DeleteFamilyVersion
829                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
830                     || (KeyValue.Type.DeleteFamily.getCode() == type);
831                 break;
832               }
833             }
834           }
835           if (cell.getTimestamp() > curColCheckTs) {
836             // Just ignore this cell. This is not a covering cell.
837             continue;
838           }
839           foundColumn = true;
840           for (Action action: actions) {
841             // Are there permissions for this user for the cell?
842             if (!authManager.authorize(user, getTableName(e), cell, action)) {
843               // We can stop if the cell ACL denies access
844               return false;
845             }
846           }
847           cellGrants++;
848         }
849       } while (more);
850     } catch (AccessDeniedException ex) {
851       throw ex;
852     } catch (IOException ex) {
853       LOG.error("Exception while getting cells to calculate covering permission", ex);
854     } finally {
855       scanner.close();
856     }
857     // We should not authorize unless we have found one or more cell ACLs that
858     // grant access. This code is used to check for additional permissions
859     // after no table or CF grants are found.
860     return cellGrants > 0;
861   }
862 
863   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
864     // Iterate over the entries in the familyMap, replacing the cells therein
865     // with new cells including the ACL data
866     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
867       List<Cell> newCells = Lists.newArrayList();
868       for (Cell cell: e.getValue()) {
869         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
870         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
871         if (cell.getTagsLength() > 0) {
872           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
873             cell.getTagsOffset(), cell.getTagsLength());
874           while (tagIterator.hasNext()) {
875             tags.add(tagIterator.next());
876           }
877         }
878         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
879       }
880       // This is supposed to be safe, won't CME
881       e.setValue(newCells);
882     }
883   }
884 
885   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
886   // type is reserved and should not be explicitly set by user.
887   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
888     // No need to check if we're not going to throw
889     if (!authorizationEnabled) {
890       m.setAttribute(TAG_CHECK_PASSED, TRUE);
891       return;
892     }
893     // Superusers are allowed to store cells unconditionally.
894     if (Superusers.isSuperUser(user)) {
895       m.setAttribute(TAG_CHECK_PASSED, TRUE);
896       return;
897     }
898     // We already checked (prePut vs preBatchMutation)
899     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
900       return;
901     }
902     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
903       Cell cell = cellScanner.current();
904       if (cell.getTagsLength() > 0) {
905         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
906           cell.getTagsLength());
907         while (tagsItr.hasNext()) {
908           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
909             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
910           }
911         }
912       }
913     }
914     m.setAttribute(TAG_CHECK_PASSED, TRUE);
915   }
916 
917   /* ---- MasterObserver implementation ---- */
918   @Override
919   public void start(CoprocessorEnvironment env) throws IOException {
920     CompoundConfiguration conf = new CompoundConfiguration();
921     conf.add(env.getConfiguration());
922 
923     authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
924     if (!authorizationEnabled) {
925       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
926     }
927 
928     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
929       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
930 
931     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
932     if (!cellFeaturesEnabled) {
933       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
934           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
935           + " accordingly.");
936     }
937 
938     ZooKeeperWatcher zk = null;
939     if (env instanceof MasterCoprocessorEnvironment) {
940       // if running on HMaster
941       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
942       zk = mEnv.getMasterServices().getZooKeeper();
943     } else if (env instanceof RegionServerCoprocessorEnvironment) {
944       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
945       zk = rsEnv.getRegionServerServices().getZooKeeper();
946     } else if (env instanceof RegionCoprocessorEnvironment) {
947       // if running at region
948       regionEnv = (RegionCoprocessorEnvironment) env;
949       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
950       zk = regionEnv.getRegionServerServices().getZooKeeper();
951       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
952         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
953     }
954 
955     // set the user-provider.
956     this.userProvider = UserProvider.instantiate(env.getConfiguration());
957 
958     // If zk is null or IOException while obtaining auth manager,
959     // throw RuntimeException so that the coprocessor is unloaded.
960     if (zk != null) {
961       try {
962         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
963       } catch (IOException ioe) {
964         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
965       }
966     } else {
967       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
968     }
969 
970     tableAcls = new MapMaker().weakValues().makeMap();
971   }
972 
973   @Override
974   public void stop(CoprocessorEnvironment env) {
975 
976   }
977 
978   @Override
979   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
980       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
981     Set<byte[]> families = desc.getFamiliesKeys();
982     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
983     for (byte[] family: families) {
984       familyMap.put(family, null);
985     }
986     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
987         desc.getTableName(), familyMap, Action.CREATE);
988   }
989 
990   @Override
991   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
992       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
993     // When AC is used, it should be configured as the 1st CP.
994     // In Master, the table operations like create, are handled by a Thread pool but the max size
995     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
996     // sequentially only.
997     // Related code in HMaster#startServiceThreads
998     // {code}
999     //   // We depend on there being only one instance of this executor running
1000     //   // at a time. To do concurrency, would need fencing of enable/disable of
1001     //   // tables.
1002     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1003     // {code}
1004     // In future if we change this pool to have more threads, then there is a chance for thread,
1005     // creating acl table, getting delayed and by that time another table creation got over and
1006     // this hook is getting called. In such a case, we will need a wait logic here which will
1007     // wait till the acl table is created.
1008     if (AccessControlLists.isAclTable(desc)) {
1009       this.aclTabAvailable = true;
1010     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1011       if (!aclTabAvailable) {
1012         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1013             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1014             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1015       } else {
1016         String owner = desc.getOwnerString();
1017         // default the table owner to current user, if not specified.
1018         if (owner == null)
1019           owner = getActiveUser().getShortName();
1020         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1021             desc.getTableName(), null, Action.values());
1022         // switch to the real hbase master user for doing the RPC on the ACL table
1023         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1024           @Override
1025           public Void run() throws Exception {
1026             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1027                 userperm);
1028             return null;
1029           }
1030         });
1031       }
1032     }
1033   }
1034 
1035   @Override
1036   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1037       throws IOException {
1038     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1039   }
1040 
1041   @Override
1042   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1043       final TableName tableName) throws IOException {
1044     final Configuration conf = c.getEnvironment().getConfiguration();
1045     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1046       @Override
1047       public Void run() throws Exception {
1048         AccessControlLists.removeTablePermissions(conf, tableName);
1049         return null;
1050       }
1051     });
1052     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1053   }
1054 
1055   @Override
1056   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1057       final TableName tableName) throws IOException {
1058     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1059 
1060     final Configuration conf = c.getEnvironment().getConfiguration();
1061     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1062       @Override
1063       public Void run() throws Exception {
1064         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1065         if (acls != null) {
1066           tableAcls.put(tableName, acls);
1067         }
1068         return null;
1069       }
1070     });
1071   }
1072 
1073   @Override
1074   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1075       final TableName tableName) throws IOException {
1076     final Configuration conf = ctx.getEnvironment().getConfiguration();
1077     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1078       @Override
1079       public Void run() throws Exception {
1080         List<UserPermission> perms = tableAcls.get(tableName);
1081         if (perms != null) {
1082           for (UserPermission perm : perms) {
1083             AccessControlLists.addUserPermission(conf, perm);
1084           }
1085         }
1086         tableAcls.remove(tableName);
1087         return null;
1088       }
1089     });
1090   }
1091 
1092   @Override
1093   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1094       HTableDescriptor htd) throws IOException {
1095     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1096   }
1097 
1098   @Override
1099   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1100       TableName tableName, final HTableDescriptor htd) throws IOException {
1101     final Configuration conf = c.getEnvironment().getConfiguration();
1102     // default the table owner to current user, if not specified.
1103     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1104       getActiveUser().getShortName();
1105     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1106       @Override
1107       public Void run() throws Exception {
1108         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1109           htd.getTableName(), null, Action.values());
1110         AccessControlLists.addUserPermission(conf, userperm);
1111         return null;
1112       }
1113     });
1114   }
1115 
1116   @Override
1117   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1118                                  TableName tableName, HColumnDescriptor columnFamily)
1119       throws IOException {
1120     requireTablePermission("addColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1121                            Action.CREATE);
1122   }
1123 
1124   @Override
1125   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1126                                     TableName tableName, HColumnDescriptor columnFamily)
1127       throws IOException {
1128     requirePermission("modifyColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1129       Action.CREATE);
1130   }
1131 
1132   @Override
1133   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1134                                     TableName tableName, byte[] columnFamily) throws IOException {
1135     requirePermission("deleteColumn", tableName, columnFamily, null, Action.ADMIN, Action.CREATE);
1136   }
1137 
1138   @Override
1139   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1140       final TableName tableName, final byte[] columnFamily) throws IOException {
1141     final Configuration conf = ctx.getEnvironment().getConfiguration();
1142     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1143       @Override
1144       public Void run() throws Exception {
1145         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1146         return null;
1147       }
1148     });
1149   }
1150 
1151   @Override
1152   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1153       throws IOException {
1154     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1155   }
1156 
1157   @Override
1158   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1159       throws IOException {
1160     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1161       // We have to unconditionally disallow disable of the ACL table when we are installed,
1162       // even if not enforcing authorizations. We are still allowing grants and revocations,
1163       // checking permissions and logging audit messages, etc. If the ACL table is not
1164       // available we will fail random actions all over the place.
1165       throw new AccessDeniedException("Not allowed to disable "
1166           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1167     }
1168     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1169   }
1170 
1171   @Override
1172   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1173       ServerName srcServer, ServerName destServer) throws IOException {
1174     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1175   }
1176 
1177   @Override
1178   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1179       throws IOException {
1180     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1181   }
1182 
1183   @Override
1184   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1185       boolean force) throws IOException {
1186     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1187   }
1188 
1189   @Override
1190   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1191       HRegionInfo regionInfo) throws IOException {
1192     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1193   }
1194 
1195   @Override
1196   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1197       throws IOException {
1198     requirePermission("balance", Action.ADMIN);
1199   }
1200 
1201   @Override
1202   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1203       boolean newValue) throws IOException {
1204     requirePermission("balanceSwitch", Action.ADMIN);
1205     return newValue;
1206   }
1207 
1208   @Override
1209   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1210       throws IOException {
1211     requirePermission("shutdown", Action.ADMIN);
1212   }
1213 
1214   @Override
1215   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1216       throws IOException {
1217     requirePermission("stopMaster", Action.ADMIN);
1218   }
1219 
1220   @Override
1221   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1222       throws IOException {
1223     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1224       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1225       // initialize the ACL storage table
1226       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1227     } else {
1228       aclTabAvailable = true;
1229     }
1230   }
1231 
1232   @Override
1233   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1234       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1235       throws IOException {
1236     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1237       Permission.Action.ADMIN);
1238   }
1239 
1240   @Override
1241   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1242       final SnapshotDescription snapshot) throws IOException {
1243     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1244       // list it, if user is the owner of snapshot
1245       // TODO: We are not logging this for audit
1246     } else {
1247       requirePermission("listSnapshot", Action.ADMIN);
1248     }
1249   }
1250 
1251   @Override
1252   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1253       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1254       throws IOException {
1255     requirePermission("clone", Action.ADMIN);
1256   }
1257 
1258   @Override
1259   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1260       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1261       throws IOException {
1262     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1263       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1264         Permission.Action.ADMIN);
1265     } else {
1266       requirePermission("restoreSnapshot", Action.ADMIN);
1267     }
1268   }
1269 
1270   @Override
1271   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1272       final SnapshotDescription snapshot) throws IOException {
1273     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1274       // Snapshot owner is allowed to delete the snapshot
1275       // TODO: We are not logging this for audit
1276     } else {
1277       requirePermission("deleteSnapshot", Action.ADMIN);
1278     }
1279   }
1280 
1281   @Override
1282   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1283       NamespaceDescriptor ns) throws IOException {
1284     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1285   }
1286 
1287   @Override
1288   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1289       throws IOException {
1290     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1291   }
1292 
1293   @Override
1294   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1295       final String namespace) throws IOException {
1296     final Configuration conf = ctx.getEnvironment().getConfiguration();
1297     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1298       @Override
1299       public Void run() throws Exception {
1300         AccessControlLists.removeNamespacePermissions(conf, namespace);
1301         return null;
1302       }
1303     });
1304     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1305     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1306   }
1307 
1308   @Override
1309   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1310       NamespaceDescriptor ns) throws IOException {
1311     // We require only global permission so that 
1312     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1313     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1314   }
1315 
1316   @Override
1317   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1318       throws IOException {
1319     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1320   }
1321 
1322   @Override
1323   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1324       List<NamespaceDescriptor> descriptors) throws IOException {
1325     // Retains only those which passes authorization checks, as the checks weren't done as part
1326     // of preGetTableDescriptors.
1327     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1328     while (itr.hasNext()) {
1329       NamespaceDescriptor desc = itr.next();
1330       try {
1331         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1332       } catch (AccessDeniedException e) {
1333         itr.remove();
1334       }
1335     }
1336   }
1337 
1338   @Override
1339   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1340       final TableName tableName) throws IOException {
1341     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1342   }
1343 
1344   /* ---- RegionObserver implementation ---- */
1345 
1346   @Override
1347   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1348       throws IOException {
1349     RegionCoprocessorEnvironment env = e.getEnvironment();
1350     final Region region = env.getRegion();
1351     if (region == null) {
1352       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1353     } else {
1354       HRegionInfo regionInfo = region.getRegionInfo();
1355       if (regionInfo.getTable().isSystemTable()) {
1356         checkSystemOrSuperUser();
1357       } else {
1358         requirePermission("preOpen", Action.ADMIN);
1359       }
1360     }
1361   }
1362 
1363   @Override
1364   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1365     RegionCoprocessorEnvironment env = c.getEnvironment();
1366     final Region region = env.getRegion();
1367     if (region == null) {
1368       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1369       return;
1370     }
1371     if (AccessControlLists.isAclRegion(region)) {
1372       aclRegion = true;
1373       // When this region is under recovering state, initialize will be handled by postLogReplay
1374       if (!region.isRecovering()) {
1375         try {
1376           initialize(env);
1377         } catch (IOException ex) {
1378           // if we can't obtain permissions, it's better to fail
1379           // than perform checks incorrectly
1380           throw new RuntimeException("Failed to initialize permissions cache", ex);
1381         }
1382       }
1383     } else {
1384       initialized = true;
1385     }
1386   }
1387 
1388   @Override
1389   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1390     if (aclRegion) {
1391       try {
1392         initialize(c.getEnvironment());
1393       } catch (IOException ex) {
1394         // if we can't obtain permissions, it's better to fail
1395         // than perform checks incorrectly
1396         throw new RuntimeException("Failed to initialize permissions cache", ex);
1397       }
1398     }
1399   }
1400 
1401   @Override
1402   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1403     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1404         Action.CREATE);
1405   }
1406 
1407   @Override
1408   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1409     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1410   }
1411 
1412   @Override
1413   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1414       byte[] splitRow) throws IOException {
1415     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1416   }
1417 
1418   @Override
1419   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1420       final Store store, final InternalScanner scanner, final ScanType scanType)
1421           throws IOException {
1422     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1423         Action.CREATE);
1424     return scanner;
1425   }
1426 
1427   @Override
1428   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1429       final byte [] row, final byte [] family, final Result result)
1430       throws IOException {
1431     assert family != null;
1432     RegionCoprocessorEnvironment env = c.getEnvironment();
1433     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1434     User user = getActiveUser();
1435     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1436       Action.READ);
1437     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1438       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1439         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1440       authResult.setReason("Covering cell set");
1441     }
1442     logResult(authResult);
1443     if (authorizationEnabled && !authResult.isAllowed()) {
1444       throw new AccessDeniedException("Insufficient permissions " +
1445         authResult.toContextString());
1446     }
1447   }
1448 
1449   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1450       final Query query, OpType opType) throws IOException {
1451     Filter filter = query.getFilter();
1452     // Don't wrap an AccessControlFilter
1453     if (filter != null && filter instanceof AccessControlFilter) {
1454       return;
1455     }
1456     User user = getActiveUser();
1457     RegionCoprocessorEnvironment env = c.getEnvironment();
1458     Map<byte[],? extends Collection<byte[]>> families = null;
1459     switch (opType) {
1460     case GET:
1461     case EXISTS:
1462       families = ((Get)query).getFamilyMap();
1463       break;
1464     case SCAN:
1465       families = ((Scan)query).getFamilyMap();
1466       break;
1467     default:
1468       throw new RuntimeException("Unhandled operation " + opType);
1469     }
1470     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1471     Region region = getRegion(env);
1472     TableName table = getTableName(region);
1473     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1474     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1475       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1476     }
1477     if (!authResult.isAllowed()) {
1478       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1479         // Old behavior: Scan with only qualifier checks if we have partial
1480         // permission. Backwards compatible behavior is to throw an
1481         // AccessDeniedException immediately if there are no grants for table
1482         // or CF or CF+qual. Only proceed with an injected filter if there are
1483         // grants for qualifiers. Otherwise we will fall through below and log
1484         // the result and throw an ADE. We may end up checking qualifier
1485         // grants three times (permissionGranted above, here, and in the
1486         // filter) but that's the price of backwards compatibility.
1487         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1488           authResult.setAllowed(true);
1489           authResult.setReason("Access allowed with filter");
1490           // Only wrap the filter if we are enforcing authorizations
1491           if (authorizationEnabled) {
1492             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1493               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1494               cfVsMaxVersions);
1495             // wrap any existing filter
1496             if (filter != null) {
1497               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1498                 Lists.newArrayList(ourFilter, filter));
1499             }
1500             switch (opType) {
1501               case GET:
1502               case EXISTS:
1503                 ((Get)query).setFilter(ourFilter);
1504                 break;
1505               case SCAN:
1506                 ((Scan)query).setFilter(ourFilter);
1507                 break;
1508               default:
1509                 throw new RuntimeException("Unhandled operation " + opType);
1510             }
1511           }
1512         }
1513       } else {
1514         // New behavior: Any access we might be granted is more fine-grained
1515         // than whole table or CF. Simply inject a filter and return what is
1516         // allowed. We will not throw an AccessDeniedException. This is a
1517         // behavioral change since 0.96.
1518         authResult.setAllowed(true);
1519         authResult.setReason("Access allowed with filter");
1520         // Only wrap the filter if we are enforcing authorizations
1521         if (authorizationEnabled) {
1522           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1523             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1524           // wrap any existing filter
1525           if (filter != null) {
1526             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1527               Lists.newArrayList(ourFilter, filter));
1528           }
1529           switch (opType) {
1530             case GET:
1531             case EXISTS:
1532               ((Get)query).setFilter(ourFilter);
1533               break;
1534             case SCAN:
1535               ((Scan)query).setFilter(ourFilter);
1536               break;
1537             default:
1538               throw new RuntimeException("Unhandled operation " + opType);
1539           }
1540         }
1541       }
1542     }
1543 
1544     logResult(authResult);
1545     if (authorizationEnabled && !authResult.isAllowed()) {
1546       throw new AccessDeniedException("Insufficient permissions for user '"
1547           + (user != null ? user.getShortName() : "null")
1548           + "' (table=" + table + ", action=READ)");
1549     }
1550   }
1551 
1552   @Override
1553   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1554       final Get get, final List<Cell> result) throws IOException {
1555     internalPreRead(c, get, OpType.GET);
1556   }
1557 
1558   @Override
1559   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1560       final Get get, final boolean exists) throws IOException {
1561     internalPreRead(c, get, OpType.EXISTS);
1562     return exists;
1563   }
1564 
1565   @Override
1566   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1567       final Put put, final WALEdit edit, final Durability durability)
1568       throws IOException {
1569     User user = getActiveUser();
1570     checkForReservedTagPresence(user, put);
1571 
1572     // Require WRITE permission to the table, CF, or top visible value, if any.
1573     // NOTE: We don't need to check the permissions for any earlier Puts
1574     // because we treat the ACLs in each Put as timestamped like any other
1575     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1576     // change the ACL of any previous Put. This allows simple evolution of
1577     // security policy over time without requiring expensive updates.
1578     RegionCoprocessorEnvironment env = c.getEnvironment();
1579     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1580     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1581     logResult(authResult);
1582     if (!authResult.isAllowed()) {
1583       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1584         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1585       } else if (authorizationEnabled) {
1586         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1587       }
1588     }
1589 
1590     // Add cell ACLs from the operation to the cells themselves
1591     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1592     if (bytes != null) {
1593       if (cellFeaturesEnabled) {
1594         addCellPermissions(bytes, put.getFamilyCellMap());
1595       } else {
1596         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1597       }
1598     }
1599   }
1600 
1601   @Override
1602   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1603       final Put put, final WALEdit edit, final Durability durability) {
1604     if (aclRegion) {
1605       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1606     }
1607   }
1608 
1609   @Override
1610   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1611       final Delete delete, final WALEdit edit, final Durability durability)
1612       throws IOException {
1613     // An ACL on a delete is useless, we shouldn't allow it
1614     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1615       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1616     }
1617     // Require WRITE permissions on all cells covered by the delete. Unlike
1618     // for Puts we need to check all visible prior versions, because a major
1619     // compaction could remove them. If the user doesn't have permission to
1620     // overwrite any of the visible versions ('visible' defined as not covered
1621     // by a tombstone already) then we have to disallow this operation.
1622     RegionCoprocessorEnvironment env = c.getEnvironment();
1623     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1624     User user = getActiveUser();
1625     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1626     logResult(authResult);
1627     if (!authResult.isAllowed()) {
1628       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1629         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1630       } else if (authorizationEnabled) {
1631         throw new AccessDeniedException("Insufficient permissions " +
1632           authResult.toContextString());
1633       }
1634     }
1635   }
1636 
1637   @Override
1638   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1639       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1640     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1641       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1642       for (int i = 0; i < miniBatchOp.size(); i++) {
1643         Mutation m = miniBatchOp.getOperation(i);
1644         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1645           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1646           // perm check
1647           OpType opType;
1648           if (m instanceof Put) {
1649             checkForReservedTagPresence(getActiveUser(), m);
1650             opType = OpType.PUT;
1651           } else {
1652             opType = OpType.DELETE;
1653           }
1654           AuthResult authResult = null;
1655           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1656             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1657             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1658               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1659           } else {
1660             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1661               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1662           }
1663           logResult(authResult);
1664           if (authorizationEnabled && !authResult.isAllowed()) {
1665             throw new AccessDeniedException("Insufficient permissions "
1666               + authResult.toContextString());
1667           }
1668         }
1669       }
1670     }
1671   }
1672 
1673   @Override
1674   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1675       final Delete delete, final WALEdit edit, final Durability durability)
1676       throws IOException {
1677     if (aclRegion) {
1678       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1679     }
1680   }
1681 
1682   @Override
1683   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1684       final byte [] row, final byte [] family, final byte [] qualifier,
1685       final CompareFilter.CompareOp compareOp,
1686       final ByteArrayComparable comparator, final Put put,
1687       final boolean result) throws IOException {
1688     User user = getActiveUser();
1689     checkForReservedTagPresence(user, put);
1690 
1691     // Require READ and WRITE permissions on the table, CF, and KV to update
1692     RegionCoprocessorEnvironment env = c.getEnvironment();
1693     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1694     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1695       Action.READ, Action.WRITE);
1696     logResult(authResult);
1697     if (!authResult.isAllowed()) {
1698       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1699         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1700       } else if (authorizationEnabled) {
1701         throw new AccessDeniedException("Insufficient permissions " +
1702           authResult.toContextString());
1703       }
1704     }
1705 
1706     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1707     if (bytes != null) {
1708       if (cellFeaturesEnabled) {
1709         addCellPermissions(bytes, put.getFamilyCellMap());
1710       } else {
1711         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1712       }
1713     }
1714     return result;
1715   }
1716 
1717   @Override
1718   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1719       final byte[] row, final byte[] family, final byte[] qualifier,
1720       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1721       final boolean result) throws IOException {
1722     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1723       // We had failure with table, cf and q perm checks and now giving a chance for cell
1724       // perm check
1725       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1726       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1727       AuthResult authResult = null;
1728       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1729           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1730         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1731             getActiveUser(), Action.READ, table, families);
1732       } else {
1733         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1734             getActiveUser(), Action.READ, table, families);
1735       }
1736       logResult(authResult);
1737       if (authorizationEnabled && !authResult.isAllowed()) {
1738         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1739       }
1740     }
1741     return result;
1742   }
1743 
1744   @Override
1745   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1746       final byte [] row, final byte [] family, final byte [] qualifier,
1747       final CompareFilter.CompareOp compareOp,
1748       final ByteArrayComparable comparator, final Delete delete,
1749       final boolean result) throws IOException {
1750     // An ACL on a delete is useless, we shouldn't allow it
1751     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1752       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1753           delete.toString());
1754     }
1755     // Require READ and WRITE permissions on the table, CF, and the KV covered
1756     // by the delete
1757     RegionCoprocessorEnvironment env = c.getEnvironment();
1758     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1759     User user = getActiveUser();
1760     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1761       Action.READ, Action.WRITE);
1762     logResult(authResult);
1763     if (!authResult.isAllowed()) {
1764       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1765         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1766       } else if (authorizationEnabled) {
1767         throw new AccessDeniedException("Insufficient permissions " +
1768           authResult.toContextString());
1769       }
1770     }
1771     return result;
1772   }
1773 
1774   @Override
1775   public boolean preCheckAndDeleteAfterRowLock(
1776       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1777       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1778       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1779       throws IOException {
1780     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1781       // We had failure with table, cf and q perm checks and now giving a chance for cell
1782       // perm check
1783       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1784       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1785       AuthResult authResult = null;
1786       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1787           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1788         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1789             getActiveUser(), Action.READ, table, families);
1790       } else {
1791         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1792             getActiveUser(), Action.READ, table, families);
1793       }
1794       logResult(authResult);
1795       if (authorizationEnabled && !authResult.isAllowed()) {
1796         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1797       }
1798     }
1799     return result;
1800   }
1801 
1802   @Override
1803   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1804       final byte [] row, final byte [] family, final byte [] qualifier,
1805       final long amount, final boolean writeToWAL)
1806       throws IOException {
1807     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1808     // incremented value
1809     RegionCoprocessorEnvironment env = c.getEnvironment();
1810     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1811     User user = getActiveUser();
1812     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1813       Action.WRITE);
1814     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1815       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1816         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1817       authResult.setReason("Covering cell set");
1818     }
1819     logResult(authResult);
1820     if (authorizationEnabled && !authResult.isAllowed()) {
1821       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1822     }
1823     return -1;
1824   }
1825 
1826   @Override
1827   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1828       throws IOException {
1829     User user = getActiveUser();
1830     checkForReservedTagPresence(user, append);
1831 
1832     // Require WRITE permission to the table, CF, and the KV to be appended
1833     RegionCoprocessorEnvironment env = c.getEnvironment();
1834     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1835     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1836     logResult(authResult);
1837     if (!authResult.isAllowed()) {
1838       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1839         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1840       } else if (authorizationEnabled)  {
1841         throw new AccessDeniedException("Insufficient permissions " +
1842           authResult.toContextString());
1843       }
1844     }
1845 
1846     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1847     if (bytes != null) {
1848       if (cellFeaturesEnabled) {
1849         addCellPermissions(bytes, append.getFamilyCellMap());
1850       } else {
1851         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1852       }
1853     }
1854 
1855     return null;
1856   }
1857 
1858   @Override
1859   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1860       final Append append) throws IOException {
1861     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1862       // We had failure with table, cf and q perm checks and now giving a chance for cell
1863       // perm check
1864       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1865       AuthResult authResult = null;
1866       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1867           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1868         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1869             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1870       } else {
1871         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1872             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1873       }
1874       logResult(authResult);
1875       if (authorizationEnabled && !authResult.isAllowed()) {
1876         throw new AccessDeniedException("Insufficient permissions " +
1877           authResult.toContextString());
1878       }
1879     }
1880     return null;
1881   }
1882 
1883   @Override
1884   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1885       final Increment increment)
1886       throws IOException {
1887     User user = getActiveUser();
1888     checkForReservedTagPresence(user, increment);
1889 
1890     // Require WRITE permission to the table, CF, and the KV to be replaced by
1891     // the incremented value
1892     RegionCoprocessorEnvironment env = c.getEnvironment();
1893     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1894     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1895       Action.WRITE);
1896     logResult(authResult);
1897     if (!authResult.isAllowed()) {
1898       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1899         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1900       } else if (authorizationEnabled) {
1901         throw new AccessDeniedException("Insufficient permissions " +
1902           authResult.toContextString());
1903       }
1904     }
1905 
1906     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1907     if (bytes != null) {
1908       if (cellFeaturesEnabled) {
1909         addCellPermissions(bytes, increment.getFamilyCellMap());
1910       } else {
1911         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1912       }
1913     }
1914 
1915     return null;
1916   }
1917 
1918   @Override
1919   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1920       final Increment increment) throws IOException {
1921     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1922       // We had failure with table, cf and q perm checks and now giving a chance for cell
1923       // perm check
1924       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1925       AuthResult authResult = null;
1926       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1927           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1928         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1929             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1930       } else {
1931         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1932             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1933       }
1934       logResult(authResult);
1935       if (authorizationEnabled && !authResult.isAllowed()) {
1936         throw new AccessDeniedException("Insufficient permissions " +
1937           authResult.toContextString());
1938       }
1939     }
1940     return null;
1941   }
1942 
1943   @Override
1944   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1945       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1946     // If the HFile version is insufficient to persist tags, we won't have any
1947     // work to do here
1948     if (!cellFeaturesEnabled) {
1949       return newCell;
1950     }
1951 
1952     // Collect any ACLs from the old cell
1953     List<Tag> tags = Lists.newArrayList();
1954     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1955     if (oldCell != null) {
1956       // Save an object allocation where we can
1957       if (oldCell.getTagsLength() > 0) {
1958         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1959           oldCell.getTagsOffset(), oldCell.getTagsLength());
1960         while (tagIterator.hasNext()) {
1961           Tag tag = tagIterator.next();
1962           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1963             // Not an ACL tag, just carry it through
1964             if (LOG.isTraceEnabled()) {
1965               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1966                 " length " + tag.getTagLength());
1967             }
1968             tags.add(tag);
1969           } else {
1970             // Merge the perms from the older ACL into the current permission set
1971             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1972               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1973                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1974             perms.putAll(kvPerms);
1975           }
1976         }
1977       }
1978     }
1979 
1980     // Do we have an ACL on the operation?
1981     byte[] aclBytes = mutation.getACL();
1982     if (aclBytes != null) {
1983       // Yes, use it
1984       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1985     } else {
1986       // No, use what we carried forward
1987       if (perms != null) {
1988         // TODO: If we collected ACLs from more than one tag we may have a
1989         // List<Permission> of size > 1, this can be collapsed into a single
1990         // Permission
1991         if (LOG.isTraceEnabled()) {
1992           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1993         }
1994         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1995           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1996       }
1997     }
1998 
1999     // If we have no tags to add, just return
2000     if (tags.isEmpty()) {
2001       return newCell;
2002     }
2003 
2004     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
2005     return rewriteCell;
2006   }
2007 
2008   @Override
2009   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2010       final Scan scan, final RegionScanner s) throws IOException {
2011     internalPreRead(c, scan, OpType.SCAN);
2012     return s;
2013   }
2014 
2015   @Override
2016   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2017       final Scan scan, final RegionScanner s) throws IOException {
2018     User user = getActiveUser();
2019     if (user != null && user.getShortName() != null) {
2020       // store reference to scanner owner for later checks
2021       scannerOwners.put(s, user.getShortName());
2022     }
2023     return s;
2024   }
2025 
2026   @Override
2027   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2028       final InternalScanner s, final List<Result> result,
2029       final int limit, final boolean hasNext) throws IOException {
2030     requireScannerOwner(s);
2031     return hasNext;
2032   }
2033 
2034   @Override
2035   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2036       final InternalScanner s) throws IOException {
2037     requireScannerOwner(s);
2038   }
2039 
2040   @Override
2041   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2042       final InternalScanner s) throws IOException {
2043     // clean up any associated owner mapping
2044     scannerOwners.remove(s);
2045   }
2046 
2047   @Override
2048   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2049       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2050     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2051     return hasMore;
2052   }
2053 
2054   /**
2055    * Verify, when servicing an RPC, that the caller is the scanner owner.
2056    * If so, we assume that access control is correctly enforced based on
2057    * the checks performed in preScannerOpen()
2058    */
2059   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2060     if (!RpcServer.isInRpcCallContext())
2061       return;
2062     String requestUserName = RpcServer.getRequestUserName();
2063     String owner = scannerOwners.get(s);
2064     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2065       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2066     }
2067   }
2068 
2069   /**
2070    * Verifies user has CREATE privileges on
2071    * the Column Families involved in the bulkLoadHFile
2072    * request. Specific Column Write privileges are presently
2073    * ignored.
2074    */
2075   @Override
2076   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2077       List<Pair<byte[], String>> familyPaths) throws IOException {
2078     for(Pair<byte[],String> el : familyPaths) {
2079       requirePermission("preBulkLoadHFile",
2080           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2081           el.getFirst(),
2082           null,
2083           Action.CREATE);
2084     }
2085   }
2086 
2087   /**
2088    * Authorization check for
2089    * SecureBulkLoadProtocol.prepareBulkLoad()
2090    * @param ctx the context
2091    * @param request the request
2092    * @throws IOException
2093    */
2094   @Override
2095   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2096                                  PrepareBulkLoadRequest request) throws IOException {
2097     requireAccess("prePareBulkLoad",
2098         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2099   }
2100 
2101   /**
2102    * Authorization security check for
2103    * SecureBulkLoadProtocol.cleanupBulkLoad()
2104    * @param ctx the context
2105    * @param request the request
2106    * @throws IOException
2107    */
2108   @Override
2109   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2110                                  CleanupBulkLoadRequest request) throws IOException {
2111     requireAccess("preCleanupBulkLoad",
2112         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2113   }
2114 
2115   /* ---- EndpointObserver implementation ---- */
2116 
2117   @Override
2118   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2119       Service service, String methodName, Message request) throws IOException {
2120     // Don't intercept calls to our own AccessControlService, we check for
2121     // appropriate permissions in the service handlers
2122     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2123       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2124         methodName + ")",
2125         getTableName(ctx.getEnvironment()), null, null,
2126         Action.EXEC);
2127     }
2128     return request;
2129   }
2130 
2131   @Override
2132   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2133       Service service, String methodName, Message request, Message.Builder responseBuilder)
2134       throws IOException { }
2135 
2136   /* ---- Protobuf AccessControlService implementation ---- */
2137 
2138   @Override
2139   public void grant(RpcController controller,
2140                     AccessControlProtos.GrantRequest request,
2141                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2142     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2143     AccessControlProtos.GrantResponse response = null;
2144     try {
2145       // verify it's only running at .acl.
2146       if (aclRegion) {
2147         if (!initialized) {
2148           throw new CoprocessorException("AccessController not yet initialized");
2149         }
2150         if (LOG.isDebugEnabled()) {
2151           LOG.debug("Received request to grant access permission " + perm.toString());
2152         }
2153 
2154         switch(request.getUserPermission().getPermission().getType()) {
2155           case Global :
2156           case Table :
2157             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2158               perm.getQualifier(), Action.ADMIN);
2159             break;
2160           case Namespace :
2161             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2162            break;
2163         }
2164 
2165         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2166           @Override
2167           public Void run() throws Exception {
2168             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2169             return null;
2170           }
2171         });
2172 
2173         if (AUDITLOG.isTraceEnabled()) {
2174           // audit log should store permission changes in addition to auth results
2175           AUDITLOG.trace("Granted permission " + perm.toString());
2176         }
2177       } else {
2178         throw new CoprocessorException(AccessController.class, "This method "
2179             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2180       }
2181       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2182     } catch (IOException ioe) {
2183       // pass exception back up
2184       ResponseConverter.setControllerException(controller, ioe);
2185     }
2186     done.run(response);
2187   }
2188 
2189   @Override
2190   public void revoke(RpcController controller,
2191                      AccessControlProtos.RevokeRequest request,
2192                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2193     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2194     AccessControlProtos.RevokeResponse response = null;
2195     try {
2196       // only allowed to be called on _acl_ region
2197       if (aclRegion) {
2198         if (!initialized) {
2199           throw new CoprocessorException("AccessController not yet initialized");
2200         }
2201         if (LOG.isDebugEnabled()) {
2202           LOG.debug("Received request to revoke access permission " + perm.toString());
2203         }
2204 
2205         switch(request.getUserPermission().getPermission().getType()) {
2206           case Global :
2207           case Table :
2208             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2209               perm.getQualifier(), Action.ADMIN);
2210             break;
2211           case Namespace :
2212             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2213             break;
2214         }
2215 
2216         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2217           @Override
2218           public Void run() throws Exception {
2219             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2220             return null;
2221           }
2222         });
2223 
2224         if (AUDITLOG.isTraceEnabled()) {
2225           // audit log should record all permission changes
2226           AUDITLOG.trace("Revoked permission " + perm.toString());
2227         }
2228       } else {
2229         throw new CoprocessorException(AccessController.class, "This method "
2230             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2231       }
2232       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2233     } catch (IOException ioe) {
2234       // pass exception back up
2235       ResponseConverter.setControllerException(controller, ioe);
2236     }
2237     done.run(response);
2238   }
2239 
2240   @Override
2241   public void getUserPermissions(RpcController controller,
2242                                  AccessControlProtos.GetUserPermissionsRequest request,
2243                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2244     AccessControlProtos.GetUserPermissionsResponse response = null;
2245     try {
2246       // only allowed to be called on _acl_ region
2247       if (aclRegion) {
2248         if (!initialized) {
2249           throw new CoprocessorException("AccessController not yet initialized");
2250         }
2251         List<UserPermission> perms = null;
2252         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2253           final TableName table = request.hasTableName() ?
2254             ProtobufUtil.toTableName(request.getTableName()) : null;
2255           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2256           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2257             @Override
2258             public List<UserPermission> run() throws Exception {
2259               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2260             }
2261           });
2262         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2263           final String namespace = request.getNamespaceName().toStringUtf8();
2264           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2265           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2266             @Override
2267             public List<UserPermission> run() throws Exception {
2268               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2269                 namespace);
2270             }
2271           });
2272         } else {
2273           requirePermission("userPermissions", Action.ADMIN);
2274           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2275             @Override
2276             public List<UserPermission> run() throws Exception {
2277               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2278             }
2279           });
2280         }
2281         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2282       } else {
2283         throw new CoprocessorException(AccessController.class, "This method "
2284             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2285       }
2286     } catch (IOException ioe) {
2287       // pass exception back up
2288       ResponseConverter.setControllerException(controller, ioe);
2289     }
2290     done.run(response);
2291   }
2292 
2293   @Override
2294   public void checkPermissions(RpcController controller,
2295                                AccessControlProtos.CheckPermissionsRequest request,
2296                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2297     Permission[] permissions = new Permission[request.getPermissionCount()];
2298     for (int i=0; i < request.getPermissionCount(); i++) {
2299       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2300     }
2301     AccessControlProtos.CheckPermissionsResponse response = null;
2302     try {
2303       User user = getActiveUser();
2304       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2305       for (Permission permission : permissions) {
2306         if (permission instanceof TablePermission) {
2307           // Check table permissions
2308 
2309           TablePermission tperm = (TablePermission) permission;
2310           for (Action action : permission.getActions()) {
2311             if (!tperm.getTableName().equals(tableName)) {
2312               throw new CoprocessorException(AccessController.class, String.format("This method "
2313                   + "can only execute at the table specified in TablePermission. " +
2314                   "Table of the region:%s , requested table:%s", tableName,
2315                   tperm.getTableName()));
2316             }
2317 
2318             Map<byte[], Set<byte[]>> familyMap =
2319                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2320             if (tperm.getFamily() != null) {
2321               if (tperm.getQualifier() != null) {
2322                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2323                 qualifiers.add(tperm.getQualifier());
2324                 familyMap.put(tperm.getFamily(), qualifiers);
2325               } else {
2326                 familyMap.put(tperm.getFamily(), null);
2327               }
2328             }
2329 
2330             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2331               familyMap);
2332             logResult(result);
2333             if (!result.isAllowed()) {
2334               // Even if passive we need to throw an exception here, we support checking
2335               // effective permissions, so throw unconditionally
2336               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2337                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2338                 ", action=" + action.toString() + ")");
2339             }
2340           }
2341 
2342         } else {
2343           // Check global permissions
2344 
2345           for (Action action : permission.getActions()) {
2346             AuthResult result;
2347             if (authManager.authorize(user, action)) {
2348               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2349                 action, null, null);
2350             } else {
2351               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2352                 null, null);
2353             }
2354             logResult(result);
2355             if (!result.isAllowed()) {
2356               // Even if passive we need to throw an exception here, we support checking
2357               // effective permissions, so throw unconditionally
2358               throw new AccessDeniedException("Insufficient permissions (action=" +
2359                 action.toString() + ")");
2360             }
2361           }
2362         }
2363       }
2364       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2365     } catch (IOException ioe) {
2366       ResponseConverter.setControllerException(controller, ioe);
2367     }
2368     done.run(response);
2369   }
2370 
2371   @Override
2372   public Service getService() {
2373     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2374   }
2375 
2376   private Region getRegion(RegionCoprocessorEnvironment e) {
2377     return e.getRegion();
2378   }
2379 
2380   private TableName getTableName(RegionCoprocessorEnvironment e) {
2381     Region region = e.getRegion();
2382     if (region != null) {
2383       return getTableName(region);
2384     }
2385     return null;
2386   }
2387 
2388   private TableName getTableName(Region region) {
2389     HRegionInfo regionInfo = region.getRegionInfo();
2390     if (regionInfo != null) {
2391       return regionInfo.getTable();
2392     }
2393     return null;
2394   }
2395 
2396   @Override
2397   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2398       throws IOException {
2399     requirePermission("preClose", Action.ADMIN);
2400   }
2401 
2402   private void checkSystemOrSuperUser() throws IOException {
2403     // No need to check if we're not going to throw
2404     if (!authorizationEnabled) {
2405       return;
2406     }
2407     User activeUser = getActiveUser();
2408     if (!Superusers.isSuperUser(activeUser)) {
2409       throw new AccessDeniedException("User '" + (activeUser != null ?
2410         activeUser.getShortName() : "null") + "is not system or super user.");
2411     }
2412   }
2413 
2414   @Override
2415   public void preStopRegionServer(
2416       ObserverContext<RegionServerCoprocessorEnvironment> env)
2417       throws IOException {
2418     requirePermission("preStopRegionServer", Action.ADMIN);
2419   }
2420 
2421   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2422       byte[] qualifier) {
2423     if (family == null) {
2424       return null;
2425     }
2426 
2427     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2428     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2429     return familyMap;
2430   }
2431 
2432   @Override
2433   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2434        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2435        String regex) throws IOException {
2436     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2437     // any concrete set of table names when a regex is present or the full list is requested.
2438     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2439       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2440       // request can be granted.
2441       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2442       for (TableName tableName: tableNamesList) {
2443         // Skip checks for a table that does not exist
2444         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2445           continue;
2446         requirePermission("getTableDescriptors", tableName, null, null,
2447             Action.ADMIN, Action.CREATE);
2448       }
2449     }
2450   }
2451 
2452   @Override
2453   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2454       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2455       String regex) throws IOException {
2456     // Skipping as checks in this case are already done by preGetTableDescriptors.
2457     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2458       return;
2459     }
2460 
2461     // Retains only those which passes authorization checks, as the checks weren't done as part
2462     // of preGetTableDescriptors.
2463     Iterator<HTableDescriptor> itr = descriptors.iterator();
2464     while (itr.hasNext()) {
2465       HTableDescriptor htd = itr.next();
2466       try {
2467         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2468             Action.ADMIN, Action.CREATE);
2469       } catch (AccessDeniedException e) {
2470         itr.remove();
2471       }
2472     }
2473   }
2474 
2475   @Override
2476   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2477       List<HTableDescriptor> descriptors, String regex) throws IOException {
2478     // Retains only those which passes authorization checks.
2479     Iterator<HTableDescriptor> itr = descriptors.iterator();
2480     while (itr.hasNext()) {
2481       HTableDescriptor htd = itr.next();
2482       try {
2483         requireAccess("getTableNames", htd.getTableName(), Action.values());
2484       } catch (AccessDeniedException e) {
2485         itr.remove();
2486       }
2487     }
2488   }
2489 
2490   @Override
2491   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2492       Region regionB) throws IOException {
2493     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2494       Action.ADMIN);
2495   }
2496 
2497   @Override
2498   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2499       Region regionB, Region mergedRegion) throws IOException { }
2500 
2501   @Override
2502   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2503       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2504 
2505   @Override
2506   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2507       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2508 
2509   @Override
2510   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2511       Region regionA, Region regionB) throws IOException { }
2512 
2513   @Override
2514   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2515       Region regionA, Region regionB) throws IOException { }
2516 
2517   @Override
2518   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2519       throws IOException {
2520     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2521   }
2522 
2523   @Override
2524   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2525       throws IOException { }
2526 
2527   @Override
2528   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2529       final String userName, final Quotas quotas) throws IOException {
2530     requirePermission("setUserQuota", Action.ADMIN);
2531   }
2532 
2533   @Override
2534   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2535       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2536     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2537   }
2538 
2539   @Override
2540   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2541       final String userName, final String namespace, final Quotas quotas) throws IOException {
2542     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2543   }
2544 
2545   @Override
2546   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2547       final TableName tableName, final Quotas quotas) throws IOException {
2548     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2549   }
2550 
2551   @Override
2552   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2553       final String namespace, final Quotas quotas) throws IOException {
2554     requirePermission("setNamespaceQuota", Action.ADMIN);
2555   }
2556 
2557   @Override
2558   public ReplicationEndpoint postCreateReplicationEndPoint(
2559       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2560     return endpoint;
2561   }
2562 
2563   @Override
2564   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2565       List<WALEntry> entries, CellScanner cells) throws IOException {
2566     requirePermission("replicateLogEntries", Action.WRITE);
2567   }
2568 
2569   @Override
2570   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2571       List<WALEntry> entries, CellScanner cells) throws IOException {
2572   }
2573 }