View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellScanner;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.CompoundConfiguration;
41  import org.apache.hadoop.hbase.CoprocessorEnvironment;
42  import org.apache.hadoop.hbase.DoNotRetryIOException;
43  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
44  import org.apache.hadoop.hbase.HColumnDescriptor;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HRegionInfo;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.KeyValue.Type;
50  import org.apache.hadoop.hbase.MetaTableAccessor;
51  import org.apache.hadoop.hbase.NamespaceDescriptor;
52  import org.apache.hadoop.hbase.ProcedureInfo;
53  import org.apache.hadoop.hbase.ServerName;
54  import org.apache.hadoop.hbase.TableName;
55  import org.apache.hadoop.hbase.Tag;
56  import org.apache.hadoop.hbase.TagRewriteCell;
57  import org.apache.hadoop.hbase.classification.InterfaceAudience;
58  import org.apache.hadoop.hbase.client.Append;
59  import org.apache.hadoop.hbase.client.Delete;
60  import org.apache.hadoop.hbase.client.Durability;
61  import org.apache.hadoop.hbase.client.Get;
62  import org.apache.hadoop.hbase.client.Increment;
63  import org.apache.hadoop.hbase.client.Mutation;
64  import org.apache.hadoop.hbase.client.Put;
65  import org.apache.hadoop.hbase.client.Query;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
69  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
70  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
71  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
72  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
73  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
74  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
75  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
76  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
77  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
78  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79  import org.apache.hadoop.hbase.filter.CompareFilter;
80  import org.apache.hadoop.hbase.filter.Filter;
81  import org.apache.hadoop.hbase.filter.FilterList;
82  import org.apache.hadoop.hbase.io.hfile.HFile;
83  import org.apache.hadoop.hbase.ipc.RpcServer;
84  import org.apache.hadoop.hbase.master.MasterServices;
85  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
86  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
87  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
88  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
89  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
90  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
92  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
93  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
94  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
95  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
96  import org.apache.hadoop.hbase.regionserver.InternalScanner;
97  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
98  import org.apache.hadoop.hbase.regionserver.Region;
99  import org.apache.hadoop.hbase.regionserver.RegionScanner;
100 import org.apache.hadoop.hbase.regionserver.ScanType;
101 import org.apache.hadoop.hbase.regionserver.ScannerContext;
102 import org.apache.hadoop.hbase.regionserver.Store;
103 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
104 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
105 import org.apache.hadoop.hbase.security.AccessDeniedException;
106 import org.apache.hadoop.hbase.security.Superusers;
107 import org.apache.hadoop.hbase.security.User;
108 import org.apache.hadoop.hbase.security.UserProvider;
109 import org.apache.hadoop.hbase.security.access.Permission.Action;
110 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
111 import org.apache.hadoop.hbase.util.ByteRange;
112 import org.apache.hadoop.hbase.util.Bytes;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.Pair;
115 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
116 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
117 
118 import com.google.common.collect.ArrayListMultimap;
119 import com.google.common.collect.ImmutableSet;
120 import com.google.common.collect.ListMultimap;
121 import com.google.common.collect.Lists;
122 import com.google.common.collect.MapMaker;
123 import com.google.common.collect.Maps;
124 import com.google.common.collect.Sets;
125 import com.google.protobuf.Message;
126 import com.google.protobuf.RpcCallback;
127 import com.google.protobuf.RpcController;
128 import com.google.protobuf.Service;
129 
130 /**
131  * Provides basic authorization checks for data access and administrative
132  * operations.
133  *
134  * <p>
135  * {@code AccessController} performs authorization checks for HBase operations
136  * based on:
137  * <ul>
138  *   <li>the identity of the user performing the operation</li>
139  *   <li>the scope over which the operation is performed, in increasing
140  *   specificity: global, table, column family, or qualifier</li>
141  *   <li>the type of action being performed (as mapped to
142  *   {@link Permission.Action} values)</li>
143  * </ul>
144  * If the authorization check fails, an {@link AccessDeniedException}
145  * will be thrown for the operation.
146  * </p>
147  *
148  * <p>
149  * To perform authorization checks, {@code AccessController} relies on the
150  * RpcServerEngine being loaded to provide
151  * the user identities for remote requests.
152  * </p>
153  *
154  * <p>
155  * The access control lists used for authorization can be manipulated via the
156  * exposed {@link AccessControlService} Interface implementation, and the associated
157  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
158  * commands.
159  * </p>
160  */
161 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
162 public class AccessController extends BaseMasterAndRegionObserver
163     implements RegionServerObserver,
164       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
165 
166   public static final Log LOG = LogFactory.getLog(AccessController.class);
167 
168   private static final Log AUDITLOG = // audit logger; logResult() writes access decisions to it at TRACE level
169     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
170   private static final String CHECK_COVERING_PERM = "check_covering_perm";
171   private static final String TAG_CHECK_PASSED = "tag_check_passed";
172   private static final byte[] TRUE = Bytes.toBytes(true);
173 
174   TableAuthManager authManager = null; // consulted by the authorize()/hasAccess()/matchPermission() checks below
175 
176   /** Flags whether we are running on a region of the _acl_ table. */
177   boolean aclRegion = false;
178 
179   /** Defined only for the Endpoint implementation, so that it has a way to
180    access region services; getRegion() returns null while this is unset. */
181   private RegionCoprocessorEnvironment regionEnv;
182 
183   /** Mapping of scanner instances to the user who created them (weak keys). */
184   private Map<InternalScanner,String> scannerOwners =
185       new MapMaker().weakKeys().makeMap();
186 
187   private Map<TableName, List<UserPermission>> tableAcls; // NOTE(review): not used in the visible part of the file - confirm where it is populated
188 
189   /** Provider for mapping principal names to Users; also supplies the current user outside of RPC calls. */
190   private UserProvider userProvider;
191 
192   /** Whether authorization is enforced. Usually true; false only if "hbase.security.authorization"
193    has been set to false in site configuration, in which case denials are logged but not thrown. */
194   boolean authorizationEnabled;
195 
196   /** Whether we are able to support cell ACLs. */
197   boolean cellFeaturesEnabled;
198 
199   /** Whether we should check EXEC permissions. */
200   boolean shouldCheckExecPermission;
201 
202   /** Whether we should terminate access checks early, as soon as table or CF grants
203     allow access; pre-0.98 compatible behavior. */
204   boolean compatibleEarlyTermination;
205 
206   /** Whether we have been successfully initialized. */
207   private volatile boolean initialized = false;
208 
209   /** Whether the ACL table is available; only relevant in the master. */
210   private volatile boolean aclTabAvailable = false;
211 
212   public Region getRegion() {
213     return regionEnv != null ? regionEnv.getRegion() : null;
214   }
215 
216   public TableAuthManager getAuthManager() {
217     return authManager;
218   }
219 
220   void initialize(RegionCoprocessorEnvironment e) throws IOException {
221     final Region region = e.getRegion();
222     Configuration conf = e.getConfiguration();
223     Map<byte[], ListMultimap<String,TablePermission>> tables =
224         AccessControlLists.loadAll(region);
225     // For each table, write out the table's permissions to the respective
226     // znode for that table.
227     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
228       tables.entrySet()) {
229       byte[] entry = t.getKey();
230       ListMultimap<String,TablePermission> perms = t.getValue();
231       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
232       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
233     }
234     initialized = true;
235   }
236 
237   /**
238    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
239    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
240    * table updates.
241    */
242   void updateACL(RegionCoprocessorEnvironment e,
243       final Map<byte[], List<Cell>> familyMap) {
244     Set<byte[]> entries =
245         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
246     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
247       List<Cell> cells = f.getValue();
248       for (Cell cell: cells) {
249         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
250             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
251             AccessControlLists.ACL_LIST_FAMILY.length)) {
252           entries.add(CellUtil.cloneRow(cell));
253         }
254       }
255     }
256     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
257     Configuration conf = regionEnv.getConfiguration();
258     for (byte[] entry: entries) {
259       try {
260         ListMultimap<String,TablePermission> perms =
261           AccessControlLists.getPermissions(conf, entry);
262         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
263         zkw.writeToZookeeper(entry, serialized);
264       } catch (IOException ex) {
265         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
266             ex);
267       }
268     }
269   }
270 
271   /**
272    * Check the current user for authorization to perform a specific action
273    * against the given set of row data.
274    *
275    * <p>Note: Ordering of the authorization checks
276    * has been carefully optimized to short-circuit the most common requests
277    * and minimize the amount of processing required.</p>
278    *
279    * @param permRequest the action being requested
280    * @param e the coprocessor environment
281    * @param families the map of column families to qualifiers present in
282    * the request
283    * @return an authorization result
284    */
285   AuthResult permissionGranted(String request, User user, Action permRequest,
286       RegionCoprocessorEnvironment e,
287       Map<byte [], ? extends Collection<?>> families) {
288     HRegionInfo hri = e.getRegion().getRegionInfo();
289     TableName tableName = hri.getTable();
290 
291     // 1. All users need read access to hbase:meta table.
292     // this is a very common operation, so deal with it quickly.
293     if (hri.isMetaRegion()) {
294       if (permRequest == Action.READ) {
295         return AuthResult.allow(request, "All users allowed", user,
296           permRequest, tableName, families);
297       }
298     }
299 
300     if (user == null) {
301       return AuthResult.deny(request, "No user associated with request!", null,
302         permRequest, tableName, families);
303     }
304 
305     // 2. check for the table-level, if successful we can short-circuit
306     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
307       return AuthResult.allow(request, "Table permission granted", user,
308         permRequest, tableName, families);
309     }
310 
311     // 3. check permissions against the requested families
312     if (families != null && families.size() > 0) {
313       // all families must pass
314       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
315         // a) check for family level access
316         if (authManager.authorize(user, tableName, family.getKey(),
317             permRequest)) {
318           continue;  // family-level permission overrides per-qualifier
319         }
320 
321         // b) qualifier level access can still succeed
322         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
323           if (family.getValue() instanceof Set) {
324             // for each qualifier of the family
325             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
326             for (byte[] qualifier : familySet) {
327               if (!authManager.authorize(user, tableName, family.getKey(),
328                                          qualifier, permRequest)) {
329                 return AuthResult.deny(request, "Failed qualifier check", user,
330                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
331               }
332             }
333           } else if (family.getValue() instanceof List) { // List<KeyValue>
334             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
335             for (KeyValue kv : kvList) {
336               if (!authManager.authorize(user, tableName, family.getKey(),
337                       kv.getQualifier(), permRequest)) {
338                 return AuthResult.deny(request, "Failed qualifier check", user,
339                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
340               }
341             }
342           }
343         } else {
344           // no qualifiers and family-level check already failed
345           return AuthResult.deny(request, "Failed family check", user, permRequest,
346               tableName, makeFamilyMap(family.getKey(), null));
347         }
348       }
349 
350       // all family checks passed
351       return AuthResult.allow(request, "All family checks passed", user, permRequest,
352           tableName, families);
353     }
354 
355     // 4. no families to check and table level access failed
356     return AuthResult.deny(request, "No families to check and table permission failed",
357         user, permRequest, tableName, families);
358   }
359 
360   /**
361    * Check the current user for authorization to perform a specific action
362    * against the given set of row data.
363    * @param opType the operation type
364    * @param user the user
365    * @param e the coprocessor environment
366    * @param families the map of column families to qualifiers present in
367    * the request
368    * @param actions the desired actions
369    * @return an authorization result
370    */
371   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
372       Map<byte [], ? extends Collection<?>> families, Action... actions) {
373     AuthResult result = null;
374     for (Action action: actions) {
375       result = permissionGranted(opType.toString(), user, action, e, families);
376       if (!result.isAllowed()) {
377         return result;
378       }
379     }
380     return result;
381   }
382 
383   private void logResult(AuthResult result) {
384     if (AUDITLOG.isTraceEnabled()) {
385       InetAddress remoteAddr = RpcServer.getRemoteAddress();
386       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
387           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
388           "; reason: " + result.getReason() +
389           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
390           "; request: " + result.getRequest() +
391           "; context: " + result.toContextString());
392     }
393   }
394 
395   /**
396    * Returns the active user to which authorization checks should be applied.
397    * If we are in the context of an RPC call, the remote user is used,
398    * otherwise the currently logged in user is used.
399    */
400   private User getActiveUser() throws IOException {
401     User user = RpcServer.getRequestUser();
402     if (user == null) {
403       // for non-rpc handling, fallback to system user
404       user = userProvider.getCurrent();
405     }
406     return user;
407   }
408 
409   /**
410    * Authorizes that the current user has any of the given permissions for the
411    * given table, column family and column qualifier.
412    * @param tableName Table requested
413    * @param family Column family requested
414    * @param qualifier Column qualifier requested
415    * @throws IOException if obtaining the current user fails
416    * @throws AccessDeniedException if user has no authorization
417    */
418   private void requirePermission(String request, TableName tableName, byte[] family,
419       byte[] qualifier, Action... permissions) throws IOException {
420     User user = getActiveUser();
421     AuthResult result = null;
422 
423     for (Action permission : permissions) {
424       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
425         result = AuthResult.allow(request, "Table permission granted", user,
426                                   permission, tableName, family, qualifier);
427         break;
428       } else {
429         // rest of the world
430         result = AuthResult.deny(request, "Insufficient permissions", user,
431                                  permission, tableName, family, qualifier);
432       }
433     }
434     logResult(result);
435     if (authorizationEnabled && !result.isAllowed()) {
436       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
437     }
438   }
439 
440   /**
441    * Authorizes that the current user has any of the given permissions for the
442    * given table, column family and column qualifier.
443    * @param tableName Table requested
444    * @param family Column family param
445    * @param qualifier Column qualifier param
446    * @throws IOException if obtaining the current user fails
447    * @throws AccessDeniedException if user has no authorization
448    */
449   private void requireTablePermission(String request, TableName tableName, byte[] family,
450       byte[] qualifier, Action... permissions) throws IOException {
451     User user = getActiveUser();
452     AuthResult result = null;
453 
454     for (Action permission : permissions) {
455       if (authManager.authorize(user, tableName, null, null, permission)) {
456         result = AuthResult.allow(request, "Table permission granted", user,
457             permission, tableName, null, null);
458         result.getParams().setFamily(family).setQualifier(qualifier);
459         break;
460       } else {
461         // rest of the world
462         result = AuthResult.deny(request, "Insufficient permissions", user,
463             permission, tableName, family, qualifier);
464         result.getParams().setFamily(family).setQualifier(qualifier);
465       }
466     }
467     logResult(result);
468     if (authorizationEnabled && !result.isAllowed()) {
469       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
470     }
471   }
472 
473   /**
474    * Authorizes that the current user has any of the given permissions to access the table.
475    *
476    * @param tableName Table requested
477    * @param permissions Actions being requested
478    * @throws IOException if obtaining the current user fails
479    * @throws AccessDeniedException if user has no authorization
480    */
481   private void requireAccess(String request, TableName tableName,
482       Action... permissions) throws IOException {
483     User user = getActiveUser();
484     AuthResult result = null;
485 
486     for (Action permission : permissions) {
487       if (authManager.hasAccess(user, tableName, permission)) {
488         result = AuthResult.allow(request, "Table permission granted", user,
489                                   permission, tableName, null, null);
490         break;
491       } else {
492         // rest of the world
493         result = AuthResult.deny(request, "Insufficient permissions", user,
494                                  permission, tableName, null, null);
495       }
496     }
497     logResult(result);
498     if (authorizationEnabled && !result.isAllowed()) {
499       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
500     }
501   }
502 
503   /**
504    * Authorizes that the current user has global privileges for the given action.
505    * @param perm The action being requested
506    * @throws IOException if obtaining the current user fails
507    * @throws AccessDeniedException if authorization is denied
508    */
509   private void requirePermission(String request, Action perm) throws IOException {
510     requireGlobalPermission(request, perm, null, null);
511   }
512 
513   /**
514    * Checks that the user has the given global permission. The generated
515    * audit log message will contain context information for the operation
516    * being authorized, based on the given parameters.
517    * @param perm Action being requested
518    * @param tableName Affected table name.
519    * @param familyMap Affected column families.
520    */
521   private void requireGlobalPermission(String request, Action perm, TableName tableName,
522       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
523     User user = getActiveUser();
524     AuthResult result = null;
525     if (authManager.authorize(user, perm)) {
526       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
527       result.getParams().setTableName(tableName).setFamilies(familyMap);
528       logResult(result);
529     } else {
530       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
531       result.getParams().setTableName(tableName).setFamilies(familyMap);
532       logResult(result);
533       if (authorizationEnabled) {
534         throw new AccessDeniedException("Insufficient permissions for user '" +
535           (user != null ? user.getShortName() : "null") +"' (global, action=" +
536           perm.toString() + ")");
537       }
538     }
539   }
540 
541   /**
542    * Checks that the user has the given global permission. The generated
543    * audit log message will contain context information for the operation
544    * being authorized, based on the given parameters.
545    * @param perm Action being requested
546    * @param namespace
547    */
548   private void requireGlobalPermission(String request, Action perm,
549                                        String namespace) throws IOException {
550     User user = getActiveUser();
551     AuthResult authResult = null;
552     if (authManager.authorize(user, perm)) {
553       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
554       authResult.getParams().setNamespace(namespace);
555       logResult(authResult);
556     } else {
557       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
558       authResult.getParams().setNamespace(namespace);
559       logResult(authResult);
560       if (authorizationEnabled) {
561         throw new AccessDeniedException("Insufficient permissions for user '" +
562           (user != null ? user.getShortName() : "null") +"' (global, action=" +
563           perm.toString() + ")");
564       }
565     }
566   }
567 
568   /**
569    * Checks that the user has the given global or namespace permission.
570    * @param namespace
571    * @param permissions Actions being requested
572    */
573   public void requireNamespacePermission(String request, String namespace,
574       Action... permissions) throws IOException {
575     User user = getActiveUser();
576     AuthResult result = null;
577 
578     for (Action permission : permissions) {
579       if (authManager.authorize(user, namespace, permission)) {
580         result = AuthResult.allow(request, "Namespace permission granted",
581             user, permission, namespace);
582         break;
583       } else {
584         // rest of the world
585         result = AuthResult.deny(request, "Insufficient permissions", user,
586             permission, namespace);
587       }
588     }
589     logResult(result);
590     if (authorizationEnabled && !result.isAllowed()) {
591       throw new AccessDeniedException("Insufficient permissions "
592           + result.toContextString());
593     }
594   }
595 
596   /**
597    * Checks that the user has the given global or namespace permission.
598    * @param namespace
599    * @param permissions Actions being requested
600    */
601   public void requireNamespacePermission(String request, String namespace, TableName tableName,
602       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
603       throws IOException {
604     User user = getActiveUser();
605     AuthResult result = null;
606 
607     for (Action permission : permissions) {
608       if (authManager.authorize(user, namespace, permission)) {
609         result = AuthResult.allow(request, "Namespace permission granted",
610             user, permission, namespace);
611         result.getParams().setTableName(tableName).setFamilies(familyMap);
612         break;
613       } else {
614         // rest of the world
615         result = AuthResult.deny(request, "Insufficient permissions", user,
616             permission, namespace);
617         result.getParams().setTableName(tableName).setFamilies(familyMap);
618       }
619     }
620     logResult(result);
621     if (authorizationEnabled && !result.isAllowed()) {
622       throw new AccessDeniedException("Insufficient permissions "
623           + result.toContextString());
624     }
625   }
626 
627   /**
628    * Returns <code>true</code> if the current user is allowed the given action
629    * over at least one of the column qualifiers in the given column families.
630    */
631   private boolean hasFamilyQualifierPermission(User user,
632       Action perm,
633       RegionCoprocessorEnvironment env,
634       Map<byte[], ? extends Collection<byte[]>> familyMap)
635     throws IOException {
636     HRegionInfo hri = env.getRegion().getRegionInfo();
637     TableName tableName = hri.getTable();
638 
639     if (user == null) {
640       return false;
641     }
642 
643     if (familyMap != null && familyMap.size() > 0) {
644       // at least one family must be allowed
645       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
646           familyMap.entrySet()) {
647         if (family.getValue() != null && !family.getValue().isEmpty()) {
648           for (byte[] qualifier : family.getValue()) {
649             if (authManager.matchPermission(user, tableName,
650                 family.getKey(), qualifier, perm)) {
651               return true;
652             }
653           }
654         } else {
655           if (authManager.matchPermission(user, tableName, family.getKey(),
656               perm)) {
657             return true;
658           }
659         }
660       }
661     } else if (LOG.isDebugEnabled()) {
662       LOG.debug("Empty family map passed for permission check");
663     }
664 
665     return false;
666   }
667 
668   private enum OpType {
669     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
670     GET("get"),
671     EXISTS("exists"),
672     SCAN("scan"),
673     PUT("put"),
674     DELETE("delete"),
675     CHECK_AND_PUT("checkAndPut"),
676     CHECK_AND_DELETE("checkAndDelete"),
677     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
678     APPEND("append"),
679     INCREMENT("increment");
680 
681     private String type;
682 
683     private OpType(String type) {
684       this.type = type;
685     }
686 
687     @Override
688     public String toString() {
689       return type;
690     }
691   }
692 
693   /**
694    * Determine if cell ACLs covered by the operation grant access. This is expensive.
695    * @return false if cell ACLs failed to grant access, true otherwise
696    * @throws IOException
697    */
698   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
699       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
700       throws IOException {
701     if (!cellFeaturesEnabled) {
702       return false;
703     }
704     long cellGrants = 0;
705     User user = getActiveUser();
706     long latestCellTs = 0;
707     Get get = new Get(row);
708     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
709     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
710     // version. We have to get every cell version and check its TS against the TS asked for in
711     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
712     // consider only one such passing cell. In case of Delete we have to consider all the cell
713     // versions under this passing version. When Delete Mutation contains columns which are a
714     // version delete just consider only one version for those column cells.
715     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
716     if (considerCellTs) {
717       get.setMaxVersions();
718     } else {
719       get.setMaxVersions(1);
720     }
721     boolean diffCellTsFromOpTs = false;
722     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
723       byte[] col = entry.getKey();
724       // TODO: HBASE-7114 could possibly unify the collection type in family
725       // maps so we would not need to do this
726       if (entry.getValue() instanceof Set) {
727         Set<byte[]> set = (Set<byte[]>)entry.getValue();
728         if (set == null || set.isEmpty()) {
729           get.addFamily(col);
730         } else {
731           for (byte[] qual: set) {
732             get.addColumn(col, qual);
733           }
734         }
735       } else if (entry.getValue() instanceof List) {
736         List<Cell> list = (List<Cell>)entry.getValue();
737         if (list == null || list.isEmpty()) {
738           get.addFamily(col);
739         } else {
740           // In case of family delete, a Cell will be added into the list with Qualifier as null.
741           for (Cell cell : list) {
742             if (cell.getQualifierLength() == 0
743                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
744                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
745               get.addFamily(col);
746             } else {
747               get.addColumn(col, CellUtil.cloneQualifier(cell));
748             }
749             if (considerCellTs) {
750               long cellTs = cell.getTimestamp();
751               latestCellTs = Math.max(latestCellTs, cellTs);
752               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
753             }
754           }
755         }
756       } else if (entry.getValue() == null) {
757         get.addFamily(col);        
758       } else {
759         throw new RuntimeException("Unhandled collection type " +
760           entry.getValue().getClass().getName());
761       }
762     }
763     // We want to avoid looking into the future. So, if the cells of the
764     // operation specify a timestamp, or the operation itself specifies a
765     // timestamp, then we use the maximum ts found. Otherwise, we bound
766     // the Get to the current server time. We add 1 to the timerange since
767     // the upper bound of a timerange is exclusive yet we need to examine
768     // any cells found there inclusively.
769     long latestTs = Math.max(opTs, latestCellTs);
770     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
771       latestTs = EnvironmentEdgeManager.currentTime();
772     }
773     get.setTimeRange(0, latestTs + 1);
774     // In case of Put operation we set to read all versions. This was done to consider the case
775     // where columns are added with TS other than the Mutation TS. But normally this wont be the
776     // case with Put. There no need to get all versions but get latest version only.
777     if (!diffCellTsFromOpTs && request == OpType.PUT) {
778       get.setMaxVersions(1);
779     }
780     if (LOG.isTraceEnabled()) {
781       LOG.trace("Scanning for cells with " + get);
782     }
783     // This Map is identical to familyMap. The key is a BR rather than byte[].
784     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
785     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
786     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
787     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
788       if (entry.getValue() instanceof List) {
789         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
790       }
791     }
792     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
793     List<Cell> cells = Lists.newArrayList();
794     Cell prevCell = null;
795     ByteRange curFam = new SimpleMutableByteRange();
796     boolean curColAllVersions = (request == OpType.DELETE);
797     long curColCheckTs = opTs;
798     boolean foundColumn = false;
799     try {
800       boolean more = false;
801       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
802 
803       do {
804         cells.clear();
805         // scan with limit as 1 to hold down memory use on wide rows
806         more = scanner.next(cells, scannerContext);
807         for (Cell cell: cells) {
808           if (LOG.isTraceEnabled()) {
809             LOG.trace("Found cell " + cell);
810           }
811           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
812           if (colChange) foundColumn = false;
813           prevCell = cell;
814           if (!curColAllVersions && foundColumn) {
815             continue;
816           }
817           if (colChange && considerCellTs) {
818             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
819             List<Cell> cols = familyMap1.get(curFam);
820             for (Cell col : cols) {
821               // null/empty qualifier is used to denote a Family delete. The TS and delete type
822               // associated with this is applicable for all columns within the family. That is
823               // why the below (col.getQualifierLength() == 0) check.
824               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
825                   || CellUtil.matchingQualifier(cell, col)) {
826                 byte type = col.getTypeByte();
827                 if (considerCellTs) {
828                   curColCheckTs = col.getTimestamp();
829                 }
830                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
831                 // a version delete for a column no need to check all the covering cells within
832                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
833                 // One version delete types are Delete/DeleteFamilyVersion
834                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
835                     || (KeyValue.Type.DeleteFamily.getCode() == type);
836                 break;
837               }
838             }
839           }
840           if (cell.getTimestamp() > curColCheckTs) {
841             // Just ignore this cell. This is not a covering cell.
842             continue;
843           }
844           foundColumn = true;
845           for (Action action: actions) {
846             // Are there permissions for this user for the cell?
847             if (!authManager.authorize(user, getTableName(e), cell, action)) {
848               // We can stop if the cell ACL denies access
849               return false;
850             }
851           }
852           cellGrants++;
853         }
854       } while (more);
855     } catch (AccessDeniedException ex) {
856       throw ex;
857     } catch (IOException ex) {
858       LOG.error("Exception while getting cells to calculate covering permission", ex);
859     } finally {
860       scanner.close();
861     }
862     // We should not authorize unless we have found one or more cell ACLs that
863     // grant access. This code is used to check for additional permissions
864     // after no table or CF grants are found.
865     return cellGrants > 0;
866   }
867 
868   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
869     // Iterate over the entries in the familyMap, replacing the cells therein
870     // with new cells including the ACL data
871     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
872       List<Cell> newCells = Lists.newArrayList();
873       for (Cell cell: e.getValue()) {
874         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
875         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
876         if (cell.getTagsLength() > 0) {
877           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
878             cell.getTagsOffset(), cell.getTagsLength());
879           while (tagIterator.hasNext()) {
880             tags.add(tagIterator.next());
881           }
882         }
883         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
884       }
885       // This is supposed to be safe, won't CME
886       e.setValue(newCells);
887     }
888   }
889 
890   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
891   // type is reserved and should not be explicitly set by user.
892   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
893     // No need to check if we're not going to throw
894     if (!authorizationEnabled) {
895       m.setAttribute(TAG_CHECK_PASSED, TRUE);
896       return;
897     }
898     // Superusers are allowed to store cells unconditionally.
899     if (Superusers.isSuperUser(user)) {
900       m.setAttribute(TAG_CHECK_PASSED, TRUE);
901       return;
902     }
903     // We already checked (prePut vs preBatchMutation)
904     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
905       return;
906     }
907     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
908       Cell cell = cellScanner.current();
909       if (cell.getTagsLength() > 0) {
910         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
911           cell.getTagsLength());
912         while (tagsItr.hasNext()) {
913           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
914             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
915           }
916         }
917       }
918     }
919     m.setAttribute(TAG_CHECK_PASSED, TRUE);
920   }
921 
922   /* ---- MasterObserver implementation ---- */
923   @Override
924   public void start(CoprocessorEnvironment env) throws IOException {
925     CompoundConfiguration conf = new CompoundConfiguration();
926     conf.add(env.getConfiguration());
927 
928     authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
929     if (!authorizationEnabled) {
930       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
931     }
932 
933     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
934       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
935 
936     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
937     if (!cellFeaturesEnabled) {
938       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
939           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
940           + " accordingly.");
941     }
942 
943     ZooKeeperWatcher zk = null;
944     if (env instanceof MasterCoprocessorEnvironment) {
945       // if running on HMaster
946       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
947       zk = mEnv.getMasterServices().getZooKeeper();
948     } else if (env instanceof RegionServerCoprocessorEnvironment) {
949       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
950       zk = rsEnv.getRegionServerServices().getZooKeeper();
951     } else if (env instanceof RegionCoprocessorEnvironment) {
952       // if running at region
953       regionEnv = (RegionCoprocessorEnvironment) env;
954       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
955       zk = regionEnv.getRegionServerServices().getZooKeeper();
956       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
957         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
958     }
959 
960     // set the user-provider.
961     this.userProvider = UserProvider.instantiate(env.getConfiguration());
962 
963     // If zk is null or IOException while obtaining auth manager,
964     // throw RuntimeException so that the coprocessor is unloaded.
965     if (zk != null) {
966       try {
967         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
968       } catch (IOException ioe) {
969         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
970       }
971     } else {
972       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
973     }
974 
975     tableAcls = new MapMaker().weakValues().makeMap();
976   }
977 
978   @Override
979   public void stop(CoprocessorEnvironment env) {
980 
981   }
982 
983   @Override
984   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
985       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
986     Set<byte[]> families = desc.getFamiliesKeys();
987     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
988     for (byte[] family: families) {
989       familyMap.put(family, null);
990     }
991     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
992         desc.getTableName(), familyMap, Action.CREATE);
993   }
994 
995   @Override
996   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
997       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
998     // When AC is used, it should be configured as the 1st CP.
999     // In Master, the table operations like create, are handled by a Thread pool but the max size
1000     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1001     // sequentially only.
1002     // Related code in HMaster#startServiceThreads
1003     // {code}
1004     //   // We depend on there being only one instance of this executor running
1005     //   // at a time. To do concurrency, would need fencing of enable/disable of
1006     //   // tables.
1007     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1008     // {code}
1009     // In future if we change this pool to have more threads, then there is a chance for thread,
1010     // creating acl table, getting delayed and by that time another table creation got over and
1011     // this hook is getting called. In such a case, we will need a wait logic here which will
1012     // wait till the acl table is created.
1013     if (AccessControlLists.isAclTable(desc)) {
1014       this.aclTabAvailable = true;
1015     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1016       if (!aclTabAvailable) {
1017         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1018             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1019             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1020       } else {
1021         String owner = desc.getOwnerString();
1022         // default the table owner to current user, if not specified.
1023         if (owner == null)
1024           owner = getActiveUser().getShortName();
1025         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1026             desc.getTableName(), null, Action.values());
1027         // switch to the real hbase master user for doing the RPC on the ACL table
1028         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1029           @Override
1030           public Void run() throws Exception {
1031             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1032                 userperm);
1033             return null;
1034           }
1035         });
1036       }
1037     }
1038   }
1039 
1040   @Override
1041   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1042       throws IOException {
1043     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1044   }
1045 
1046   @Override
1047   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1048       final TableName tableName) throws IOException {
1049     final Configuration conf = c.getEnvironment().getConfiguration();
1050     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1051       @Override
1052       public Void run() throws Exception {
1053         AccessControlLists.removeTablePermissions(conf, tableName);
1054         return null;
1055       }
1056     });
1057     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1058   }
1059 
1060   @Override
1061   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1062       final TableName tableName) throws IOException {
1063     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1064 
1065     final Configuration conf = c.getEnvironment().getConfiguration();
1066     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1067       @Override
1068       public Void run() throws Exception {
1069         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1070         if (acls != null) {
1071           tableAcls.put(tableName, acls);
1072         }
1073         return null;
1074       }
1075     });
1076   }
1077 
1078   @Override
1079   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1080       final TableName tableName) throws IOException {
1081     final Configuration conf = ctx.getEnvironment().getConfiguration();
1082     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1083       @Override
1084       public Void run() throws Exception {
1085         List<UserPermission> perms = tableAcls.get(tableName);
1086         if (perms != null) {
1087           for (UserPermission perm : perms) {
1088             AccessControlLists.addUserPermission(conf, perm);
1089           }
1090         }
1091         tableAcls.remove(tableName);
1092         return null;
1093       }
1094     });
1095   }
1096 
1097   @Override
1098   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1099       HTableDescriptor htd) throws IOException {
1100     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1101   }
1102 
1103   @Override
1104   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1105       TableName tableName, final HTableDescriptor htd) throws IOException {
1106     final Configuration conf = c.getEnvironment().getConfiguration();
1107     // default the table owner to current user, if not specified.
1108     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1109       getActiveUser().getShortName();
1110     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1111       @Override
1112       public Void run() throws Exception {
1113         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1114           htd.getTableName(), null, Action.values());
1115         AccessControlLists.addUserPermission(conf, userperm);
1116         return null;
1117       }
1118     });
1119   }
1120 
1121   @Override
1122   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1123       HColumnDescriptor column) throws IOException {
1124     requireTablePermission("addColumn", tableName, column.getName(), null, Action.ADMIN,
1125         Action.CREATE);
1126   }
1127 
1128   @Override
1129   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1130       HColumnDescriptor descriptor) throws IOException {
1131     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1132       Action.CREATE);
1133   }
1134 
1135   @Override
1136   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1137       byte[] col) throws IOException {
1138     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1139   }
1140 
1141   @Override
1142   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1143       final TableName tableName, final byte[] col) throws IOException {
1144     final Configuration conf = c.getEnvironment().getConfiguration();
1145     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1146       @Override
1147       public Void run() throws Exception {
1148         AccessControlLists.removeTablePermissions(conf, tableName, col);
1149         return null;
1150       }
1151     });
1152   }
1153 
1154   @Override
1155   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1156       throws IOException {
1157     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1158   }
1159 
1160   @Override
1161   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1162       throws IOException {
1163     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1164       // We have to unconditionally disallow disable of the ACL table when we are installed,
1165       // even if not enforcing authorizations. We are still allowing grants and revocations,
1166       // checking permissions and logging audit messages, etc. If the ACL table is not
1167       // available we will fail random actions all over the place.
1168       throw new AccessDeniedException("Not allowed to disable "
1169           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1170     }
1171     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1172   }
1173 
1174   @Override
1175   public void preAbortProcedure(
1176       ObserverContext<MasterCoprocessorEnvironment> ctx,
1177       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1178       final long procId) throws IOException {
1179     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
1180       // If the user is not the procedure owner, then we should further probe whether
1181       // he can abort the procedure.
1182       requirePermission("abortProcedure", Action.ADMIN);
1183     }
1184   }
1185 
1186   @Override
1187   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1188       throws IOException {
1189     // There is nothing to do at this time after the procedure abort request was sent.
1190   }
1191 
1192   @Override
1193   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1194       throws IOException {
1195     // We are delegating the authorization check to postListProcedures as we don't have
1196     // any concrete set of procedures to work with
1197   }
1198 
1199   @Override
1200   public void postListProcedures(
1201       ObserverContext<MasterCoprocessorEnvironment> ctx,
1202       List<ProcedureInfo> procInfoList) throws IOException {
1203     if (procInfoList.isEmpty()) {
1204       return;
1205     }
1206 
1207     // Retains only those which passes authorization checks, as the checks weren't done as part
1208     // of preListProcedures.
1209     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1210     User user = getActiveUser();
1211     while (itr.hasNext()) {
1212       ProcedureInfo procInfo = itr.next();
1213       try {
1214         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1215           // If the user is not the procedure owner, then we should further probe whether
1216           // he can see the procedure.
1217           requirePermission("listProcedures", Action.ADMIN);
1218         }
1219       } catch (AccessDeniedException e) {
1220         itr.remove();
1221       }
1222     }
1223   }
1224 
1225   @Override
1226   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1227       ServerName srcServer, ServerName destServer) throws IOException {
1228     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1229   }
1230 
1231   @Override
1232   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1233       throws IOException {
1234     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1235   }
1236 
1237   @Override
1238   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1239       boolean force) throws IOException {
1240     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1241   }
1242 
1243   @Override
1244   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1245       HRegionInfo regionInfo) throws IOException {
1246     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1247   }
1248 
1249   @Override
1250   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1251       throws IOException {
1252     requirePermission("balance", Action.ADMIN);
1253   }
1254 
1255   @Override
1256   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1257       boolean newValue) throws IOException {
1258     requirePermission("balanceSwitch", Action.ADMIN);
1259     return newValue;
1260   }
1261 
1262   @Override
1263   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1264       throws IOException {
1265     requirePermission("shutdown", Action.ADMIN);
1266   }
1267 
1268   @Override
1269   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1270       throws IOException {
1271     requirePermission("stopMaster", Action.ADMIN);
1272   }
1273 
1274   @Override
1275   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1276       throws IOException {
1277     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1278       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1279       // initialize the ACL storage table
1280       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1281     } else {
1282       aclTabAvailable = true;
1283     }
1284   }
1285 
1286   @Override
1287   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1288       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1289       throws IOException {
1290     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1291       Permission.Action.ADMIN);
1292   }
1293 
1294   @Override
1295   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1296       final SnapshotDescription snapshot) throws IOException {
1297     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1298       // list it, if user is the owner of snapshot
1299     } else {
1300       requirePermission("listSnapshot", Action.ADMIN);
1301     }
1302   }
1303   
1304   @Override
1305   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1306       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1307       throws IOException {
1308     requirePermission("clone", Action.ADMIN);
1309   }
1310 
1311   @Override
1312   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1313       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1314       throws IOException {
1315     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1316       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1317         Permission.Action.ADMIN);
1318     } else {
1319       requirePermission("restore", Action.ADMIN);
1320     }
1321   }
1322 
1323   @Override
1324   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1325       final SnapshotDescription snapshot) throws IOException {
1326     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1327       // Snapshot owner is allowed to delete the snapshot
1328       // TODO: We are not logging this for audit
1329     } else {
1330       requirePermission("deleteSnapshot", Action.ADMIN);
1331     }
1332   }
1333 
1334   @Override
1335   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1336       NamespaceDescriptor ns) throws IOException {
1337     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1338   }
1339 
1340   @Override
1341   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1342       throws IOException {
1343     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1344   }
1345 
1346   @Override
1347   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1348       final String namespace) throws IOException {
1349     final Configuration conf = ctx.getEnvironment().getConfiguration();
1350     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1351       @Override
1352       public Void run() throws Exception {
1353         AccessControlLists.removeNamespacePermissions(conf, namespace);
1354         return null;
1355       }
1356     });
1357     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1358     LOG.info(namespace + " entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1359   }
1360 
1361   @Override
1362   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1363       NamespaceDescriptor ns) throws IOException {
1364     // We require only global permission so that 
1365     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1366     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1367   }
1368 
1369   @Override
1370   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1371       throws IOException {
1372     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1373   }
1374 
1375   @Override
1376   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1377       List<NamespaceDescriptor> descriptors) throws IOException {
1378     // Retains only those which passes authorization checks, as the checks weren't done as part
1379     // of preGetTableDescriptors.
1380     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1381     while (itr.hasNext()) {
1382       NamespaceDescriptor desc = itr.next();
1383       try {
1384         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1385       } catch (AccessDeniedException e) {
1386         itr.remove();
1387       }
1388     }
1389   }
1390 
1391   @Override
1392   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1393       final TableName tableName) throws IOException {
1394     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1395   }
1396 
1397   /* ---- RegionObserver implementation ---- */
1398 
1399   @Override
1400   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1401       throws IOException {
1402     RegionCoprocessorEnvironment env = e.getEnvironment();
1403     final Region region = env.getRegion();
1404     if (region == null) {
1405       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1406     } else {
1407       HRegionInfo regionInfo = region.getRegionInfo();
1408       if (regionInfo.getTable().isSystemTable()) {
1409         checkSystemOrSuperUser();
1410       } else {
1411         requirePermission("preOpen", Action.ADMIN);
1412       }
1413     }
1414   }
1415 
1416   @Override
1417   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1418     RegionCoprocessorEnvironment env = c.getEnvironment();
1419     final Region region = env.getRegion();
1420     if (region == null) {
1421       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1422       return;
1423     }
1424     if (AccessControlLists.isAclRegion(region)) {
1425       aclRegion = true;
1426       // When this region is under recovering state, initialize will be handled by postLogReplay
1427       if (!region.isRecovering()) {
1428         try {
1429           initialize(env);
1430         } catch (IOException ex) {
1431           // if we can't obtain permissions, it's better to fail
1432           // than perform checks incorrectly
1433           throw new RuntimeException("Failed to initialize permissions cache", ex);
1434         }
1435       }
1436     } else {
1437       initialized = true;
1438     }
1439   }
1440 
1441   @Override
1442   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1443     if (aclRegion) {
1444       try {
1445         initialize(c.getEnvironment());
1446       } catch (IOException ex) {
1447         // if we can't obtain permissions, it's better to fail
1448         // than perform checks incorrectly
1449         throw new RuntimeException("Failed to initialize permissions cache", ex);
1450       }
1451     }
1452   }
1453 
1454   @Override
1455   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1456     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1457         Action.CREATE);
1458   }
1459 
1460   @Override
1461   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1462     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1463   }
1464 
1465   @Override
1466   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1467       byte[] splitRow) throws IOException {
1468     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1469   }
1470 
1471   @Override
1472   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1473       final Store store, final InternalScanner scanner, final ScanType scanType)
1474           throws IOException {
1475     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1476         Action.CREATE);
1477     return scanner;
1478   }
1479 
1480   @Override
1481   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1482       final byte [] row, final byte [] family, final Result result)
1483       throws IOException {
1484     assert family != null;
1485     RegionCoprocessorEnvironment env = c.getEnvironment();
1486     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1487     User user = getActiveUser();
1488     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1489       Action.READ);
1490     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1491       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1492         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1493       authResult.setReason("Covering cell set");
1494     }
1495     logResult(authResult);
1496     if (authorizationEnabled && !authResult.isAllowed()) {
1497       throw new AccessDeniedException("Insufficient permissions " +
1498         authResult.toContextString());
1499     }
1500   }
1501 
1502   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1503       final Query query, OpType opType) throws IOException {
1504     Filter filter = query.getFilter();
1505     // Don't wrap an AccessControlFilter
1506     if (filter != null && filter instanceof AccessControlFilter) {
1507       return;
1508     }
1509     User user = getActiveUser();
1510     RegionCoprocessorEnvironment env = c.getEnvironment();
1511     Map<byte[],? extends Collection<byte[]>> families = null;
1512     switch (opType) {
1513     case GET:
1514     case EXISTS:
1515       families = ((Get)query).getFamilyMap();
1516       break;
1517     case SCAN:
1518       families = ((Scan)query).getFamilyMap();
1519       break;
1520     default:
1521       throw new RuntimeException("Unhandled operation " + opType);
1522     }
1523     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1524     Region region = getRegion(env);
1525     TableName table = getTableName(region);
1526     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1527     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1528       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1529     }
1530     if (!authResult.isAllowed()) {
1531       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1532         // Old behavior: Scan with only qualifier checks if we have partial
1533         // permission. Backwards compatible behavior is to throw an
1534         // AccessDeniedException immediately if there are no grants for table
1535         // or CF or CF+qual. Only proceed with an injected filter if there are
1536         // grants for qualifiers. Otherwise we will fall through below and log
1537         // the result and throw an ADE. We may end up checking qualifier
1538         // grants three times (permissionGranted above, here, and in the
1539         // filter) but that's the price of backwards compatibility.
1540         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1541           authResult.setAllowed(true);
1542           authResult.setReason("Access allowed with filter");
1543           // Only wrap the filter if we are enforcing authorizations
1544           if (authorizationEnabled) {
1545             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1546               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1547               cfVsMaxVersions);
1548             // wrap any existing filter
1549             if (filter != null) {
1550               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1551                 Lists.newArrayList(ourFilter, filter));
1552             }
1553             switch (opType) {
1554               case GET:
1555               case EXISTS:
1556                 ((Get)query).setFilter(ourFilter);
1557                 break;
1558               case SCAN:
1559                 ((Scan)query).setFilter(ourFilter);
1560                 break;
1561               default:
1562                 throw new RuntimeException("Unhandled operation " + opType);
1563             }
1564           }
1565         }
1566       } else {
1567         // New behavior: Any access we might be granted is more fine-grained
1568         // than whole table or CF. Simply inject a filter and return what is
1569         // allowed. We will not throw an AccessDeniedException. This is a
1570         // behavioral change since 0.96.
1571         authResult.setAllowed(true);
1572         authResult.setReason("Access allowed with filter");
1573         // Only wrap the filter if we are enforcing authorizations
1574         if (authorizationEnabled) {
1575           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1576             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1577           // wrap any existing filter
1578           if (filter != null) {
1579             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1580               Lists.newArrayList(ourFilter, filter));
1581           }
1582           switch (opType) {
1583             case GET:
1584             case EXISTS:
1585               ((Get)query).setFilter(ourFilter);
1586               break;
1587             case SCAN:
1588               ((Scan)query).setFilter(ourFilter);
1589               break;
1590             default:
1591               throw new RuntimeException("Unhandled operation " + opType);
1592           }
1593         }
1594       }
1595     }
1596 
1597     logResult(authResult);
1598     if (authorizationEnabled && !authResult.isAllowed()) {
1599       throw new AccessDeniedException("Insufficient permissions for user '"
1600           + (user != null ? user.getShortName() : "null")
1601           + "' (table=" + table + ", action=READ)");
1602     }
1603   }
1604 
1605   @Override
1606   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1607       final Get get, final List<Cell> result) throws IOException {
1608     internalPreRead(c, get, OpType.GET);
1609   }
1610 
1611   @Override
1612   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1613       final Get get, final boolean exists) throws IOException {
1614     internalPreRead(c, get, OpType.EXISTS);
1615     return exists;
1616   }
1617 
1618   @Override
1619   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1620       final Put put, final WALEdit edit, final Durability durability)
1621       throws IOException {
1622     User user = getActiveUser();
1623     checkForReservedTagPresence(user, put);
1624 
1625     // Require WRITE permission to the table, CF, or top visible value, if any.
1626     // NOTE: We don't need to check the permissions for any earlier Puts
1627     // because we treat the ACLs in each Put as timestamped like any other
1628     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1629     // change the ACL of any previous Put. This allows simple evolution of
1630     // security policy over time without requiring expensive updates.
1631     RegionCoprocessorEnvironment env = c.getEnvironment();
1632     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1633     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1634     logResult(authResult);
1635     if (!authResult.isAllowed()) {
1636       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1637         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1638       } else if (authorizationEnabled) {
1639         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1640       }
1641     }
1642 
1643     // Add cell ACLs from the operation to the cells themselves
1644     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1645     if (bytes != null) {
1646       if (cellFeaturesEnabled) {
1647         addCellPermissions(bytes, put.getFamilyCellMap());
1648       } else {
1649         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1650       }
1651     }
1652   }
1653 
1654   @Override
1655   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1656       final Put put, final WALEdit edit, final Durability durability) {
1657     if (aclRegion) {
1658       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1659     }
1660   }
1661 
1662   @Override
1663   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1664       final Delete delete, final WALEdit edit, final Durability durability)
1665       throws IOException {
1666     // An ACL on a delete is useless, we shouldn't allow it
1667     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1668       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1669     }
1670     // Require WRITE permissions on all cells covered by the delete. Unlike
1671     // for Puts we need to check all visible prior versions, because a major
1672     // compaction could remove them. If the user doesn't have permission to
1673     // overwrite any of the visible versions ('visible' defined as not covered
1674     // by a tombstone already) then we have to disallow this operation.
1675     RegionCoprocessorEnvironment env = c.getEnvironment();
1676     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1677     User user = getActiveUser();
1678     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1679     logResult(authResult);
1680     if (!authResult.isAllowed()) {
1681       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1682         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1683       } else if (authorizationEnabled) {
1684         throw new AccessDeniedException("Insufficient permissions " +
1685           authResult.toContextString());
1686       }
1687     }
1688   }
1689 
1690   @Override
1691   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1692       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1693     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1694       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1695       for (int i = 0; i < miniBatchOp.size(); i++) {
1696         Mutation m = miniBatchOp.getOperation(i);
1697         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1698           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1699           // perm check
1700           OpType opType;
1701           if (m instanceof Put) {
1702             checkForReservedTagPresence(getActiveUser(), m);
1703             opType = OpType.PUT;
1704           } else {
1705             opType = OpType.DELETE;
1706           }
1707           AuthResult authResult = null;
1708           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1709             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1710             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1711               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1712           } else {
1713             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1714               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1715           }
1716           logResult(authResult);
1717           if (authorizationEnabled && !authResult.isAllowed()) {
1718             throw new AccessDeniedException("Insufficient permissions "
1719               + authResult.toContextString());
1720           }
1721         }
1722       }
1723     }
1724   }
1725 
1726   @Override
1727   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1728       final Delete delete, final WALEdit edit, final Durability durability)
1729       throws IOException {
1730     if (aclRegion) {
1731       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1732     }
1733   }
1734 
1735   @Override
1736   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1737       final byte [] row, final byte [] family, final byte [] qualifier,
1738       final CompareFilter.CompareOp compareOp,
1739       final ByteArrayComparable comparator, final Put put,
1740       final boolean result) throws IOException {
1741     User user = getActiveUser();
1742     checkForReservedTagPresence(user, put);
1743 
1744     // Require READ and WRITE permissions on the table, CF, and KV to update
1745     RegionCoprocessorEnvironment env = c.getEnvironment();
1746     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1747     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1748       Action.READ, Action.WRITE);
1749     logResult(authResult);
1750     if (!authResult.isAllowed()) {
1751       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1752         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1753       } else if (authorizationEnabled) {
1754         throw new AccessDeniedException("Insufficient permissions " +
1755           authResult.toContextString());
1756       }
1757     }
1758 
1759     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1760     if (bytes != null) {
1761       if (cellFeaturesEnabled) {
1762         addCellPermissions(bytes, put.getFamilyCellMap());
1763       } else {
1764         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1765       }
1766     }
1767     return result;
1768   }
1769 
1770   @Override
1771   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1772       final byte[] row, final byte[] family, final byte[] qualifier,
1773       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1774       final boolean result) throws IOException {
1775     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1776       // We had failure with table, cf and q perm checks and now giving a chance for cell
1777       // perm check
1778       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1779       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1780       AuthResult authResult = null;
1781       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1782           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1783         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1784             getActiveUser(), Action.READ, table, families);
1785       } else {
1786         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1787             getActiveUser(), Action.READ, table, families);
1788       }
1789       logResult(authResult);
1790       if (authorizationEnabled && !authResult.isAllowed()) {
1791         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1792       }
1793     }
1794     return result;
1795   }
1796 
1797   @Override
1798   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1799       final byte [] row, final byte [] family, final byte [] qualifier,
1800       final CompareFilter.CompareOp compareOp,
1801       final ByteArrayComparable comparator, final Delete delete,
1802       final boolean result) throws IOException {
1803     // An ACL on a delete is useless, we shouldn't allow it
1804     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1805       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1806           delete.toString());
1807     }
1808     // Require READ and WRITE permissions on the table, CF, and the KV covered
1809     // by the delete
1810     RegionCoprocessorEnvironment env = c.getEnvironment();
1811     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1812     User user = getActiveUser();
1813     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1814       Action.READ, Action.WRITE);
1815     logResult(authResult);
1816     if (!authResult.isAllowed()) {
1817       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1818         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1819       } else if (authorizationEnabled) {
1820         throw new AccessDeniedException("Insufficient permissions " +
1821           authResult.toContextString());
1822       }
1823     }
1824     return result;
1825   }
1826 
1827   @Override
1828   public boolean preCheckAndDeleteAfterRowLock(
1829       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1830       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1831       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1832       throws IOException {
1833     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1834       // We had failure with table, cf and q perm checks and now giving a chance for cell
1835       // perm check
1836       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1837       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1838       AuthResult authResult = null;
1839       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1840           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1841         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1842             getActiveUser(), Action.READ, table, families);
1843       } else {
1844         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1845             getActiveUser(), Action.READ, table, families);
1846       }
1847       logResult(authResult);
1848       if (authorizationEnabled && !authResult.isAllowed()) {
1849         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1850       }
1851     }
1852     return result;
1853   }
1854 
1855   @Override
1856   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1857       final byte [] row, final byte [] family, final byte [] qualifier,
1858       final long amount, final boolean writeToWAL)
1859       throws IOException {
1860     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1861     // incremented value
1862     RegionCoprocessorEnvironment env = c.getEnvironment();
1863     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1864     User user = getActiveUser();
1865     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1866       Action.WRITE);
1867     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1868       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1869         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1870       authResult.setReason("Covering cell set");
1871     }
1872     logResult(authResult);
1873     if (authorizationEnabled && !authResult.isAllowed()) {
1874       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1875     }
1876     return -1;
1877   }
1878 
1879   @Override
1880   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1881       throws IOException {
1882     User user = getActiveUser();
1883     checkForReservedTagPresence(user, append);
1884 
1885     // Require WRITE permission to the table, CF, and the KV to be appended
1886     RegionCoprocessorEnvironment env = c.getEnvironment();
1887     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1888     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1889     logResult(authResult);
1890     if (!authResult.isAllowed()) {
1891       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1892         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1893       } else if (authorizationEnabled)  {
1894         throw new AccessDeniedException("Insufficient permissions " +
1895           authResult.toContextString());
1896       }
1897     }
1898 
1899     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1900     if (bytes != null) {
1901       if (cellFeaturesEnabled) {
1902         addCellPermissions(bytes, append.getFamilyCellMap());
1903       } else {
1904         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1905       }
1906     }
1907 
1908     return null;
1909   }
1910 
1911   @Override
1912   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1913       final Append append) throws IOException {
1914     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1915       // We had failure with table, cf and q perm checks and now giving a chance for cell
1916       // perm check
1917       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1918       AuthResult authResult = null;
1919       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1920           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1921         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1922             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1923       } else {
1924         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1925             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1926       }
1927       logResult(authResult);
1928       if (authorizationEnabled && !authResult.isAllowed()) {
1929         throw new AccessDeniedException("Insufficient permissions " +
1930           authResult.toContextString());
1931       }
1932     }
1933     return null;
1934   }
1935 
1936   @Override
1937   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1938       final Increment increment)
1939       throws IOException {
1940     User user = getActiveUser();
1941     checkForReservedTagPresence(user, increment);
1942 
1943     // Require WRITE permission to the table, CF, and the KV to be replaced by
1944     // the incremented value
1945     RegionCoprocessorEnvironment env = c.getEnvironment();
1946     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1947     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1948       Action.WRITE);
1949     logResult(authResult);
1950     if (!authResult.isAllowed()) {
1951       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1952         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1953       } else if (authorizationEnabled) {
1954         throw new AccessDeniedException("Insufficient permissions " +
1955           authResult.toContextString());
1956       }
1957     }
1958 
1959     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1960     if (bytes != null) {
1961       if (cellFeaturesEnabled) {
1962         addCellPermissions(bytes, increment.getFamilyCellMap());
1963       } else {
1964         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1965       }
1966     }
1967 
1968     return null;
1969   }
1970 
1971   @Override
1972   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1973       final Increment increment) throws IOException {
1974     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1975       // We had failure with table, cf and q perm checks and now giving a chance for cell
1976       // perm check
1977       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1978       AuthResult authResult = null;
1979       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1980           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1981         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1982             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1983       } else {
1984         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1985             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1986       }
1987       logResult(authResult);
1988       if (authorizationEnabled && !authResult.isAllowed()) {
1989         throw new AccessDeniedException("Insufficient permissions " +
1990           authResult.toContextString());
1991       }
1992     }
1993     return null;
1994   }
1995 
1996   @Override
1997   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1998       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1999     // If the HFile version is insufficient to persist tags, we won't have any
2000     // work to do here
2001     if (!cellFeaturesEnabled) {
2002       return newCell;
2003     }
2004 
2005     // Collect any ACLs from the old cell
2006     List<Tag> tags = Lists.newArrayList();
2007     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2008     if (oldCell != null) {
2009       // Save an object allocation where we can
2010       if (oldCell.getTagsLength() > 0) {
2011         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
2012           oldCell.getTagsOffset(), oldCell.getTagsLength());
2013         while (tagIterator.hasNext()) {
2014           Tag tag = tagIterator.next();
2015           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2016             // Not an ACL tag, just carry it through
2017             if (LOG.isTraceEnabled()) {
2018               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
2019                 " length " + tag.getTagLength());
2020             }
2021             tags.add(tag);
2022           } else {
2023             // Merge the perms from the older ACL into the current permission set
2024             // TODO: The efficiency of this can be improved. Don't build just to unpack
2025             // again, use the builder
2026             AccessControlProtos.UsersAndPermissions.Builder builder =
2027               AccessControlProtos.UsersAndPermissions.newBuilder();
2028             ProtobufUtil.mergeFrom(builder, tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
2029             ListMultimap<String,Permission> kvPerms =
2030               ProtobufUtil.toUsersAndPermissions(builder.build());
2031             perms.putAll(kvPerms);
2032           }
2033         }
2034       }
2035     }
2036 
2037     // Do we have an ACL on the operation?
2038     byte[] aclBytes = mutation.getACL();
2039     if (aclBytes != null) {
2040       // Yes, use it
2041       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2042     } else {
2043       // No, use what we carried forward
2044       if (perms != null) {
2045         // TODO: If we collected ACLs from more than one tag we may have a
2046         // List<Permission> of size > 1, this can be collapsed into a single
2047         // Permission
2048         if (LOG.isTraceEnabled()) {
2049           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2050         }
2051         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
2052           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
2053       }
2054     }
2055 
2056     // If we have no tags to add, just return
2057     if (tags.isEmpty()) {
2058       return newCell;
2059     }
2060 
2061     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
2062     return rewriteCell;
2063   }
2064 
2065   @Override
2066   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2067       final Scan scan, final RegionScanner s) throws IOException {
2068     internalPreRead(c, scan, OpType.SCAN);
2069     return s;
2070   }
2071 
2072   @Override
2073   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2074       final Scan scan, final RegionScanner s) throws IOException {
2075     User user = getActiveUser();
2076     if (user != null && user.getShortName() != null) {
2077       // store reference to scanner owner for later checks
2078       scannerOwners.put(s, user.getShortName());
2079     }
2080     return s;
2081   }
2082 
2083   @Override
2084   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2085       final InternalScanner s, final List<Result> result,
2086       final int limit, final boolean hasNext) throws IOException {
2087     requireScannerOwner(s);
2088     return hasNext;
2089   }
2090 
2091   @Override
2092   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2093       final InternalScanner s) throws IOException {
2094     requireScannerOwner(s);
2095   }
2096 
2097   @Override
2098   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2099       final InternalScanner s) throws IOException {
2100     // clean up any associated owner mapping
2101     scannerOwners.remove(s);
2102   }
2103 
2104   /**
2105    * Verify, when servicing an RPC, that the caller is the scanner owner.
2106    * If so, we assume that access control is correctly enforced based on
2107    * the checks performed in preScannerOpen()
2108    */
2109   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2110     if (!RpcServer.isInRpcCallContext())
2111       return;
2112     String requestUserName = RpcServer.getRequestUserName();
2113     String owner = scannerOwners.get(s);
2114     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2115       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2116     }
2117   }
2118 
2119   /**
2120    * Verifies user has CREATE privileges on
2121    * the Column Families involved in the bulkLoadHFile
2122    * request. Specific Column Write privileges are presently
2123    * ignored.
2124    */
2125   @Override
2126   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2127       List<Pair<byte[], String>> familyPaths) throws IOException {
2128     for(Pair<byte[],String> el : familyPaths) {
2129       requirePermission("preBulkLoadHFile",
2130           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2131           el.getFirst(),
2132           null,
2133           Action.CREATE);
2134     }
2135   }
2136 
2137   /**
2138    * Authorization check for
2139    * SecureBulkLoadProtocol.prepareBulkLoad()
2140    * @param ctx the context
2141    * @param request the request
2142    * @throws IOException
2143    */
2144   @Override
2145   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2146                                  PrepareBulkLoadRequest request) throws IOException {
2147     requireAccess("prePareBulkLoad",
2148         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2149   }
2150 
2151   /**
2152    * Authorization security check for
2153    * SecureBulkLoadProtocol.cleanupBulkLoad()
2154    * @param ctx the context
2155    * @param request the request
2156    * @throws IOException
2157    */
2158   @Override
2159   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2160                                  CleanupBulkLoadRequest request) throws IOException {
2161     requireAccess("preCleanupBulkLoad",
2162         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2163   }
2164 
2165   /* ---- EndpointObserver implementation ---- */
2166 
2167   @Override
2168   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2169       Service service, String methodName, Message request) throws IOException {
2170     // Don't intercept calls to our own AccessControlService, we check for
2171     // appropriate permissions in the service handlers
2172     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2173       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2174         methodName + ")",
2175         getTableName(ctx.getEnvironment()), null, null,
2176         Action.EXEC);
2177     }
2178     return request;
2179   }
2180 
2181   @Override
2182   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2183       Service service, String methodName, Message request, Message.Builder responseBuilder)
2184       throws IOException { }
2185 
2186   /* ---- Protobuf AccessControlService implementation ---- */
2187 
2188   @Override
2189   public void grant(RpcController controller,
2190                     AccessControlProtos.GrantRequest request,
2191                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2192     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2193     AccessControlProtos.GrantResponse response = null;
2194     try {
2195       // verify it's only running at .acl.
2196       if (aclRegion) {
2197         if (!initialized) {
2198           throw new CoprocessorException("AccessController not yet initialized");
2199         }
2200         if (LOG.isDebugEnabled()) {
2201           LOG.debug("Received request to grant access permission " + perm.toString());
2202         }
2203 
2204         switch(request.getUserPermission().getPermission().getType()) {
2205           case Global :
2206           case Table :
2207             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2208               perm.getQualifier(), Action.ADMIN);
2209             break;
2210           case Namespace :
2211             requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
2212            break;
2213         }
2214 
2215         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2216           @Override
2217           public Void run() throws Exception {
2218             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2219             return null;
2220           }
2221         });
2222 
2223         if (AUDITLOG.isTraceEnabled()) {
2224           // audit log should store permission changes in addition to auth results
2225           AUDITLOG.trace("Granted permission " + perm.toString());
2226         }
2227       } else {
2228         throw new CoprocessorException(AccessController.class, "This method "
2229             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2230       }
2231       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2232     } catch (IOException ioe) {
2233       // pass exception back up
2234       ResponseConverter.setControllerException(controller, ioe);
2235     }
2236     done.run(response);
2237   }
2238 
2239   @Override
2240   public void revoke(RpcController controller,
2241                      AccessControlProtos.RevokeRequest request,
2242                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2243     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2244     AccessControlProtos.RevokeResponse response = null;
2245     try {
2246       // only allowed to be called on _acl_ region
2247       if (aclRegion) {
2248         if (!initialized) {
2249           throw new CoprocessorException("AccessController not yet initialized");
2250         }
2251         if (LOG.isDebugEnabled()) {
2252           LOG.debug("Received request to revoke access permission " + perm.toString());
2253         }
2254 
2255         switch(request.getUserPermission().getPermission().getType()) {
2256           case Global :
2257           case Table :
2258             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2259               perm.getQualifier(), Action.ADMIN);
2260             break;
2261           case Namespace :
2262             requireNamespacePermission("revoke", perm.getNamespace(), Action.ADMIN);
2263             break;
2264         }
2265 
2266         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2267           @Override
2268           public Void run() throws Exception {
2269             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2270             return null;
2271           }
2272         });
2273 
2274         if (AUDITLOG.isTraceEnabled()) {
2275           // audit log should record all permission changes
2276           AUDITLOG.trace("Revoked permission " + perm.toString());
2277         }
2278       } else {
2279         throw new CoprocessorException(AccessController.class, "This method "
2280             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2281       }
2282       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2283     } catch (IOException ioe) {
2284       // pass exception back up
2285       ResponseConverter.setControllerException(controller, ioe);
2286     }
2287     done.run(response);
2288   }
2289 
2290   @Override
2291   public void getUserPermissions(RpcController controller,
2292                                  AccessControlProtos.GetUserPermissionsRequest request,
2293                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2294     AccessControlProtos.GetUserPermissionsResponse response = null;
2295     try {
2296       // only allowed to be called on _acl_ region
2297       if (aclRegion) {
2298         if (!initialized) {
2299           throw new CoprocessorException("AccessController not yet initialized");
2300         }
2301         List<UserPermission> perms = null;
2302         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2303           final TableName table = request.hasTableName() ?
2304             ProtobufUtil.toTableName(request.getTableName()) : null;
2305           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2306           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2307             @Override
2308             public List<UserPermission> run() throws Exception {
2309               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2310             }
2311           });
2312         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2313           final String namespace = request.getNamespaceName().toStringUtf8();
2314           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2315           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2316             @Override
2317             public List<UserPermission> run() throws Exception {
2318               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2319                 namespace);
2320             }
2321           });
2322         } else {
2323           requirePermission("userPermissions", Action.ADMIN);
2324           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2325             @Override
2326             public List<UserPermission> run() throws Exception {
2327               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2328             }
2329           });
2330         }
2331         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2332       } else {
2333         throw new CoprocessorException(AccessController.class, "This method "
2334             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2335       }
2336     } catch (IOException ioe) {
2337       // pass exception back up
2338       ResponseConverter.setControllerException(controller, ioe);
2339     }
2340     done.run(response);
2341   }
2342 
2343   @Override
2344   public void checkPermissions(RpcController controller,
2345                                AccessControlProtos.CheckPermissionsRequest request,
2346                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2347     Permission[] permissions = new Permission[request.getPermissionCount()];
2348     for (int i=0; i < request.getPermissionCount(); i++) {
2349       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2350     }
2351     AccessControlProtos.CheckPermissionsResponse response = null;
2352     try {
2353       User user = getActiveUser();
2354       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2355       for (Permission permission : permissions) {
2356         if (permission instanceof TablePermission) {
2357           // Check table permissions
2358 
2359           TablePermission tperm = (TablePermission) permission;
2360           for (Action action : permission.getActions()) {
2361             if (!tperm.getTableName().equals(tableName)) {
2362               throw new CoprocessorException(AccessController.class, String.format("This method "
2363                   + "can only execute at the table specified in TablePermission. " +
2364                   "Table of the region:%s , requested table:%s", tableName,
2365                   tperm.getTableName()));
2366             }
2367 
2368             Map<byte[], Set<byte[]>> familyMap =
2369                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2370             if (tperm.getFamily() != null) {
2371               if (tperm.getQualifier() != null) {
2372                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2373                 qualifiers.add(tperm.getQualifier());
2374                 familyMap.put(tperm.getFamily(), qualifiers);
2375               } else {
2376                 familyMap.put(tperm.getFamily(), null);
2377               }
2378             }
2379 
2380             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2381               familyMap);
2382             logResult(result);
2383             if (!result.isAllowed()) {
2384               // Even if passive we need to throw an exception here, we support checking
2385               // effective permissions, so throw unconditionally
2386               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2387                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2388                 ", action=" + action.toString() + ")");
2389             }
2390           }
2391 
2392         } else {
2393           // Check global permissions
2394 
2395           for (Action action : permission.getActions()) {
2396             AuthResult result;
2397             if (authManager.authorize(user, action)) {
2398               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2399                 action, null, null);
2400             } else {
2401               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2402                 null, null);
2403             }
2404             logResult(result);
2405             if (!result.isAllowed()) {
2406               // Even if passive we need to throw an exception here, we support checking
2407               // effective permissions, so throw unconditionally
2408               throw new AccessDeniedException("Insufficient permissions (action=" +
2409                 action.toString() + ")");
2410             }
2411           }
2412         }
2413       }
2414       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2415     } catch (IOException ioe) {
2416       ResponseConverter.setControllerException(controller, ioe);
2417     }
2418     done.run(response);
2419   }
2420 
2421   @Override
2422   public Service getService() {
2423     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2424   }
2425 
2426   private Region getRegion(RegionCoprocessorEnvironment e) {
2427     return e.getRegion();
2428   }
2429 
2430   private TableName getTableName(RegionCoprocessorEnvironment e) {
2431     Region region = e.getRegion();
2432     if (region != null) {
2433       return getTableName(region);
2434     }
2435     return null;
2436   }
2437 
2438   private TableName getTableName(Region region) {
2439     HRegionInfo regionInfo = region.getRegionInfo();
2440     if (regionInfo != null) {
2441       return regionInfo.getTable();
2442     }
2443     return null;
2444   }
2445 
2446   @Override
2447   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2448       throws IOException {
2449     requirePermission("preClose", Action.ADMIN);
2450   }
2451 
2452   private void checkSystemOrSuperUser() throws IOException {
2453     // No need to check if we're not going to throw
2454     if (!authorizationEnabled) {
2455       return;
2456     }
2457     User activeUser = getActiveUser();
2458     if (!Superusers.isSuperUser(activeUser)) {
2459       throw new AccessDeniedException("User '" + (activeUser != null ?
2460         activeUser.getShortName() : "null") + "' is not system or super user.");
2461     }
2462   }
2463 
2464   @Override
2465   public void preStopRegionServer(
2466       ObserverContext<RegionServerCoprocessorEnvironment> env)
2467       throws IOException {
2468     requirePermission("preStopRegionServer", Action.ADMIN);
2469   }
2470 
2471   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2472       byte[] qualifier) {
2473     if (family == null) {
2474       return null;
2475     }
2476 
2477     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2478     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2479     return familyMap;
2480   }
2481 
2482   @Override
2483   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2484        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2485        String regex) throws IOException {
2486     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2487     // any concrete set of table names when a regex is present or the full list is requested.
2488     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2489       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2490       // request can be granted.
2491       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2492       for (TableName tableName: tableNamesList) {
2493         // Skip checks for a table that does not exist
2494         if (masterServices.getTableDescriptors().get(tableName) == null) {
2495           continue;
2496         }
2497         requirePermission("getTableDescriptors", tableName, null, null,
2498             Action.ADMIN, Action.CREATE);
2499       }
2500     }
2501   }
2502 
2503   @Override
2504   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2505       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2506       String regex) throws IOException {
2507     // Skipping as checks in this case are already done by preGetTableDescriptors.
2508     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2509       return;
2510     }
2511 
2512     // Retains only those which passes authorization checks, as the checks weren't done as part
2513     // of preGetTableDescriptors.
2514     Iterator<HTableDescriptor> itr = descriptors.iterator();
2515     while (itr.hasNext()) {
2516       HTableDescriptor htd = itr.next();
2517       try {
2518         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2519             Action.ADMIN, Action.CREATE);
2520       } catch (AccessDeniedException e) {
2521         itr.remove();
2522       }
2523     }
2524   }
2525 
2526   @Override
2527   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2528       List<HTableDescriptor> descriptors, String regex) throws IOException {
2529     // Retains only those which passes authorization checks.
2530     Iterator<HTableDescriptor> itr = descriptors.iterator();
2531     while (itr.hasNext()) {
2532       HTableDescriptor htd = itr.next();
2533       try {
2534         requireAccess("getTableNames", htd.getTableName(), Action.values());
2535       } catch (AccessDeniedException e) {
2536         itr.remove();
2537       }
2538     }
2539   }
2540 
2541   @Override
2542   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2543       Region regionB) throws IOException {
2544     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2545       Action.ADMIN);
2546   }
2547 
2548   @Override
2549   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2550       Region regionB, Region mergedRegion) throws IOException { }
2551 
2552   @Override
2553   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2554       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2555 
2556   @Override
2557   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2558       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2559 
2560   @Override
2561   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2562       Region regionA, Region regionB) throws IOException { }
2563 
2564   @Override
2565   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2566       Region regionA, Region regionB) throws IOException { }
2567 
2568   @Override
2569   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2570       throws IOException {
2571     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2572   }
2573 
2574   @Override
2575   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2576       throws IOException { }
2577 
2578   @Override
2579   public ReplicationEndpoint postCreateReplicationEndPoint(
2580       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2581     return endpoint;
2582   }
2583 
2584   @Override
2585   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2586       List<WALEntry> entries, CellScanner cells) throws IOException {
2587     requirePermission("replicateLogEntries", Action.WRITE);
2588   }
2589 
2590   @Override
2591   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2592       List<WALEntry> entries, CellScanner cells) throws IOException {
2593   }
2594   
2595   @Override
2596   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2597       final String userName, final Quotas quotas) throws IOException {
2598     requirePermission("setUserQuota", Action.ADMIN);
2599   }
2600 
2601   @Override
2602   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2603       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2604     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2605   }
2606 
2607   @Override
2608   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2609       final String userName, final String namespace, final Quotas quotas) throws IOException {
2610     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2611   }
2612 
2613   @Override
2614   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2615       final TableName tableName, final Quotas quotas) throws IOException {
2616     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2617   }
2618 
2619   @Override
2620   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2621       final String namespace, final Quotas quotas) throws IOException {
2622     requirePermission("setNamespaceQuota", Action.ADMIN);
2623   }
2624 }