View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.util.Collection;
20  import java.util.Collections;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellScanner;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.CompoundConfiguration;
38  import org.apache.hadoop.hbase.CoprocessorEnvironment;
39  import org.apache.hadoop.hbase.DoNotRetryIOException;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.Type;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.TableNotDisabledException;
51  import org.apache.hadoop.hbase.TableNotFoundException;
52  import org.apache.hadoop.hbase.Tag;
53  import org.apache.hadoop.hbase.MetaTableAccessor;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.Mutation;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Query;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.MasterObserver;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.regionserver.HRegion;
87  import org.apache.hadoop.hbase.regionserver.InternalScanner;
88  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
89  import org.apache.hadoop.hbase.regionserver.RegionScanner;
90  import org.apache.hadoop.hbase.regionserver.ScanType;
91  import org.apache.hadoop.hbase.regionserver.Store;
92  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
93  import org.apache.hadoop.hbase.security.AccessDeniedException;
94  import org.apache.hadoop.hbase.security.User;
95  import org.apache.hadoop.hbase.security.UserProvider;
96  import org.apache.hadoop.hbase.security.access.Permission.Action;
97  import org.apache.hadoop.hbase.util.ByteRange;
98  import org.apache.hadoop.hbase.util.Bytes;
99  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
102 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
103 
104 import com.google.common.collect.ArrayListMultimap;
105 import com.google.common.collect.ImmutableSet;
106 import com.google.common.collect.ListMultimap;
107 import com.google.common.collect.Lists;
108 import com.google.common.collect.MapMaker;
109 import com.google.common.collect.Maps;
110 import com.google.common.collect.Sets;
111 import com.google.protobuf.Message;
112 import com.google.protobuf.RpcCallback;
113 import com.google.protobuf.RpcController;
114 import com.google.protobuf.Service;
115 
116 /**
117  * Provides basic authorization checks for data access and administrative
118  * operations.
119  *
120  * <p>
121  * {@code AccessController} performs authorization checks for HBase operations
122  * based on:
123  * <ul>
124  *   <li>the identity of the user performing the operation</li>
125  *   <li>the scope over which the operation is performed, in increasing
126  *   specificity: global, table, column family, or qualifier</li>
127  *   <li>the type of action being performed (as mapped to
128  *   {@link Permission.Action} values)</li>
129  * </ul>
130  * If the authorization check fails, an {@link AccessDeniedException}
131  * will be thrown for the operation.
132  * </p>
133  *
134  * <p>
135  * To perform authorization checks, {@code AccessController} relies on the
136  * RpcServerEngine being loaded to provide
137  * the user identities for remote requests.
138  * </p>
139  *
140  * <p>
141  * The access control lists used for authorization can be manipulated via the
142  * exposed {@link AccessControlService} Interface implementation, and the associated
143  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
144  * commands.
145  * </p>
146  */
147 @InterfaceAudience.Private
148 public class AccessController extends BaseMasterAndRegionObserver
149     implements RegionServerObserver,
150       AccessControlService.Interface, CoprocessorService, EndpointObserver {
151 
152   public static final Log LOG = LogFactory.getLog(AccessController.class);
153 
      // Audit logger; logResult() writes each allow/deny decision to it at TRACE level.
154   private static final Log AUDITLOG =
155     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
156   private static final String CHECK_COVERING_PERM = "check_covering_perm";
157   private static final String TAG_CHECK_PASSED = "tag_check_passed";
158   private static final byte[] TRUE = Bytes.toBytes(true);
159 
      // Authorization manager consulted by the permission checks in this class.
160   TableAuthManager authManager = null;
161 
162   // Flags whether we are running on a region of the _acl_ table.
163   boolean aclRegion = false;
164 
165   // Defined only for the Endpoint implementation, so that it has a way to
166   // access region services. May be null; getRegion() checks for that.
167   private RegionCoprocessorEnvironment regionEnv;
168 
169   /** Mapping of scanner instances to the user who created them (weak keys). */
170   private Map<InternalScanner,String> scannerOwners =
171       new MapMaker().weakKeys().makeMap();
172 
173   // Provider for mapping principal names to Users; also supplies the current
      // user when getActiveUser() is called outside of a request context.
174   private UserProvider userProvider;
175 
176   // The list of users with superuser authority.
177   private List<String> superusers;
178 
179   // Whether we are able to support cell ACLs.
180   boolean cellFeaturesEnabled;
181 
182   // Whether we should check EXEC permissions.
183   boolean shouldCheckExecPermission;
184 
185   // Whether we should terminate access checks early, as soon as table or CF
186   // grants allow access; pre-0.98 compatible behavior.
187   boolean compatibleEarlyTermination;
188 
      // Set to true at the end of initialize(RegionCoprocessorEnvironment).
189   private volatile boolean initialized = false;
190 
191   // This boolean has relevance only in the Master.
192   private volatile boolean aclTabAvailable = false;
193 
194   public HRegion getRegion() {
195     return regionEnv != null ? regionEnv.getRegion() : null;
196   }
197 
198   public TableAuthManager getAuthManager() {
199     return authManager;
200   }
201 
202   void initialize(RegionCoprocessorEnvironment e) throws IOException {
203     final HRegion region = e.getRegion();
204     Configuration conf = e.getConfiguration();
205     Map<byte[], ListMultimap<String,TablePermission>> tables =
206         AccessControlLists.loadAll(region);
207     // For each table, write out the table's permissions to the respective
208     // znode for that table.
209     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
210       tables.entrySet()) {
211       byte[] entry = t.getKey();
212       ListMultimap<String,TablePermission> perms = t.getValue();
213       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
214       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
215     }
216     initialized = true;
217   }
218 
219   /**
220    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
221    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
222    * table updates.
223    */
224   void updateACL(RegionCoprocessorEnvironment e,
225       final Map<byte[], List<Cell>> familyMap) {
226     Set<byte[]> entries =
227         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
228     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
229       List<Cell> cells = f.getValue();
230       for (Cell cell: cells) {
231         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
232         if (Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(),
233             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
234             AccessControlLists.ACL_LIST_FAMILY.length)) {
235           entries.add(kv.getRow());
236         }
237       }
238     }
239     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
240     Configuration conf = regionEnv.getConfiguration();
241     for (byte[] entry: entries) {
242       try {
243         ListMultimap<String,TablePermission> perms =
244           AccessControlLists.getPermissions(conf, entry);
245         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
246         zkw.writeToZookeeper(entry, serialized);
247       } catch (IOException ex) {
248         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
249             ex);
250       }
251     }
252   }
253 
254   /**
255    * Check the current user for authorization to perform a specific action
256    * against the given set of row data.
257    *
258    * <p>Note: Ordering of the authorization checks
259    * has been carefully optimized to short-circuit the most common requests
260    * and minimize the amount of processing required.</p>
261    *
262    * @param permRequest the action being requested
263    * @param e the coprocessor environment
264    * @param families the map of column families to qualifiers present in
265    * the request
266    * @return an authorization result
267    */
268   AuthResult permissionGranted(String request, User user, Action permRequest,
269       RegionCoprocessorEnvironment e,
270       Map<byte [], ? extends Collection<?>> families) {
271     HRegionInfo hri = e.getRegion().getRegionInfo();
272     TableName tableName = hri.getTable();
273 
274     // 1. All users need read access to hbase:meta table.
275     // this is a very common operation, so deal with it quickly.
276     if (hri.isMetaRegion()) {
277       if (permRequest == Action.READ) {
278         return AuthResult.allow(request, "All users allowed", user,
279           permRequest, tableName, families);
280       }
281     }
282 
283     if (user == null) {
284       return AuthResult.deny(request, "No user associated with request!", null,
285         permRequest, tableName, families);
286     }
287 
288     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
289     // e.g. When a new table is created a new entry in hbase:meta is added,
290     // so the user need to be allowed to write on it.
291     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
292     // and the user need to be allowed to write on both tables.
293     if (permRequest == Action.WRITE &&
294        (hri.isMetaRegion() ||
295         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
296        (authManager.authorize(user, Action.CREATE) ||
297         authManager.authorize(user, Action.ADMIN)))
298     {
299        return AuthResult.allow(request, "Table permission granted", user,
300         permRequest, tableName, families);
301     }
302 
303     // 2. check for the table-level, if successful we can short-circuit
304     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
305       return AuthResult.allow(request, "Table permission granted", user,
306         permRequest, tableName, families);
307     }
308 
309     // 3. check permissions against the requested families
310     if (families != null && families.size() > 0) {
311       // all families must pass
312       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
313         // a) check for family level access
314         if (authManager.authorize(user, tableName, family.getKey(),
315             permRequest)) {
316           continue;  // family-level permission overrides per-qualifier
317         }
318 
319         // b) qualifier level access can still succeed
320         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
321           if (family.getValue() instanceof Set) {
322             // for each qualifier of the family
323             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
324             for (byte[] qualifier : familySet) {
325               if (!authManager.authorize(user, tableName, family.getKey(),
326                                          qualifier, permRequest)) {
327                 return AuthResult.deny(request, "Failed qualifier check", user,
328                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
329               }
330             }
331           } else if (family.getValue() instanceof List) { // List<KeyValue>
332             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
333             for (KeyValue kv : kvList) {
334               if (!authManager.authorize(user, tableName, family.getKey(),
335                       kv.getQualifier(), permRequest)) {
336                 return AuthResult.deny(request, "Failed qualifier check", user,
337                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
338               }
339             }
340           }
341         } else {
342           // no qualifiers and family-level check already failed
343           return AuthResult.deny(request, "Failed family check", user, permRequest,
344               tableName, makeFamilyMap(family.getKey(), null));
345         }
346       }
347 
348       // all family checks passed
349       return AuthResult.allow(request, "All family checks passed", user, permRequest,
350           tableName, families);
351     }
352 
353     // 4. no families to check and table level access failed
354     return AuthResult.deny(request, "No families to check and table permission failed",
355         user, permRequest, tableName, families);
356   }
357 
358   /**
359    * Check the current user for authorization to perform a specific action
360    * against the given set of row data.
361    * @param opType the operation type
362    * @param user the user
363    * @param e the coprocessor environment
364    * @param families the map of column families to qualifiers present in
365    * the request
366    * @param actions the desired actions
367    * @return an authorization result
368    */
369   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
370       Map<byte [], ? extends Collection<?>> families, Action... actions) {
371     AuthResult result = null;
372     for (Action action: actions) {
373       result = permissionGranted(opType.toString(), user, action, e, families);
374       if (!result.isAllowed()) {
375         return result;
376       }
377     }
378     return result;
379   }
380 
381   private void logResult(AuthResult result) {
382     if (AUDITLOG.isTraceEnabled()) {
383       RequestContext ctx = RequestContext.get();
384       InetAddress remoteAddr = null;
385       if (ctx != null) {
386         remoteAddr = ctx.getRemoteAddress();
387       }
388       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
389           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
390           "; reason: " + result.getReason() +
391           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
392           "; request: " + result.getRequest() +
393           "; context: " + result.toContextString());
394     }
395   }
396 
397   /**
398    * Returns the active user to which authorization checks should be applied.
399    * If we are in the context of an RPC call, the remote user is used,
400    * otherwise the currently logged in user is used.
401    */
402   private User getActiveUser() throws IOException {
403     User user = RequestContext.getRequestUser();
404     if (!RequestContext.isInRequestContext()) {
405       // for non-rpc handling, fallback to system user
406       user = userProvider.getCurrent();
407     }
408     return user;
409   }
410 
411   /**
412    * Authorizes that the current user has any of the given permissions for the
413    * given table, column family and column qualifier.
414    * @param tableName Table requested
415    * @param family Column family requested
416    * @param qualifier Column qualifier requested
417    * @throws IOException if obtaining the current user fails
418    * @throws AccessDeniedException if user has no authorization
419    */
420   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
421       Action... permissions) throws IOException {
422     User user = getActiveUser();
423     AuthResult result = null;
424 
425     for (Action permission : permissions) {
426       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
427         result = AuthResult.allow(request, "Table permission granted", user,
428                                   permission, tableName, family, qualifier);
429         break;
430       } else {
431         // rest of the world
432         result = AuthResult.deny(request, "Insufficient permissions", user,
433                                  permission, tableName, family, qualifier);
434       }
435     }
436     logResult(result);
437     if (!result.isAllowed()) {
438       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
439     }
440   }
441 
442   /**
443    * Authorizes that the current user has global privileges for the given action.
444    * @param perm The action being requested
445    * @throws IOException if obtaining the current user fails
446    * @throws AccessDeniedException if authorization is denied
447    */
448   private void requirePermission(String request, Action perm) throws IOException {
449     requireGlobalPermission(request, perm, null, null);
450   }
451 
452   /**
453    * Authorizes that the current user has permission to perform the given
454    * action on the set of table column families.
455    * @param perm Action that is required
456    * @param env The current coprocessor environment
457    * @param families The map of column families-qualifiers.
458    * @throws AccessDeniedException if the authorization check failed
459    */
460   private void requirePermission(String request, Action perm,
461         RegionCoprocessorEnvironment env,
462         Map<byte[], ? extends Collection<?>> families)
463       throws IOException {
464     User user = getActiveUser();
465     AuthResult result = permissionGranted(request, user, perm, env, families);
466     logResult(result);
467 
468     if (!result.isAllowed()) {
469       throw new AccessDeniedException("Insufficient permissions (table=" +
470         env.getRegion().getTableDesc().getTableName()+
471         ((families != null && families.size() > 0) ? ", family: " +
472         result.toFamilyString() : "") + ", action=" +
473         perm.toString() + ")");
474     }
475   }
476 
477   /**
478    * Checks that the user has the given global permission. The generated
479    * audit log message will contain context information for the operation
480    * being authorized, based on the given parameters.
481    * @param perm Action being requested
482    * @param tableName Affected table name.
483    * @param familyMap Affected column families.
484    */
485   private void requireGlobalPermission(String request, Action perm, TableName tableName,
486       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
487     User user = getActiveUser();
488     if (authManager.authorize(user, perm)) {
489       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
490     } else {
491       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
492       throw new AccessDeniedException("Insufficient permissions for user '" +
493           (user != null ? user.getShortName() : "null") +"' (global, action=" +
494           perm.toString() + ")");
495     }
496   }
497 
498   /**
499    * Checks that the user has the given global permission. The generated
500    * audit log message will contain context information for the operation
501    * being authorized, based on the given parameters.
502    * @param perm Action being requested
503    * @param namespace
504    */
505   private void requireGlobalPermission(String request, Action perm,
506                                        String namespace) throws IOException {
507     User user = getActiveUser();
508     if (authManager.authorize(user, perm)) {
509       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
510     } else {
511       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
512       throw new AccessDeniedException("Insufficient permissions for user '" +
513           (user != null ? user.getShortName() : "null") +"' (global, action=" +
514           perm.toString() + ")");
515     }
516   }
517 
518   /**
519    * Returns <code>true</code> if the current user is allowed the given action
520    * over at least one of the column qualifiers in the given column families.
521    */
522   private boolean hasFamilyQualifierPermission(User user,
523       Action perm,
524       RegionCoprocessorEnvironment env,
525       Map<byte[], ? extends Collection<byte[]>> familyMap)
526     throws IOException {
527     HRegionInfo hri = env.getRegion().getRegionInfo();
528     TableName tableName = hri.getTable();
529 
530     if (user == null) {
531       return false;
532     }
533 
534     if (familyMap != null && familyMap.size() > 0) {
535       // at least one family must be allowed
536       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
537           familyMap.entrySet()) {
538         if (family.getValue() != null && !family.getValue().isEmpty()) {
539           for (byte[] qualifier : family.getValue()) {
540             if (authManager.matchPermission(user, tableName,
541                 family.getKey(), qualifier, perm)) {
542               return true;
543             }
544           }
545         } else {
546           if (authManager.matchPermission(user, tableName, family.getKey(),
547               perm)) {
548             return true;
549           }
550         }
551       }
552     } else if (LOG.isDebugEnabled()) {
553       LOG.debug("Empty family map passed for permission check");
554     }
555 
556     return false;
557   }
558 
559   private enum OpType {
560     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
561     GET("get"),
562     EXISTS("exists"),
563     SCAN("scan"),
564     PUT("put"),
565     DELETE("delete"),
566     CHECK_AND_PUT("checkAndPut"),
567     CHECK_AND_DELETE("checkAndDelete"),
568     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
569     APPEND("append"),
570     INCREMENT("increment");
571 
572     private String type;
573 
574     private OpType(String type) {
575       this.type = type;
576     }
577 
578     @Override
579     public String toString() {
580       return type;
581     }
582   }
583 
584   /**
585    * Determine if cell ACLs covered by the operation grant access. This is expensive.
586    * @return false if cell ACLs failed to grant access, true otherwise
587    * @throws IOException
588    */
589   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
590       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
591       throws IOException {
592     if (!cellFeaturesEnabled) {
593       return false;
594     }
595     long cellGrants = 0;
596     User user = getActiveUser();
597     long latestCellTs = 0;
598     Get get = new Get(row);
599     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
600     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
601     // version. We have to get every cell version and check its TS against the TS asked for in
602     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
603     // consider only one such passing cell. In case of Delete we have to consider all the cell
604     // versions under this passing version. When Delete Mutation contains columns which are a
605     // version delete just consider only one version for those column cells.
606     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
607     if (considerCellTs) {
608       get.setMaxVersions();
609     } else {
610       get.setMaxVersions(1);
611     }
612     boolean diffCellTsFromOpTs = false;
613     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
614       byte[] col = entry.getKey();
615       // TODO: HBASE-7114 could possibly unify the collection type in family
616       // maps so we would not need to do this
617       if (entry.getValue() instanceof Set) {
618         Set<byte[]> set = (Set<byte[]>)entry.getValue();
619         if (set == null || set.isEmpty()) {
620           get.addFamily(col);
621         } else {
622           for (byte[] qual: set) {
623             get.addColumn(col, qual);
624           }
625         }
626       } else if (entry.getValue() instanceof List) {
627         List<Cell> list = (List<Cell>)entry.getValue();
628         if (list == null || list.isEmpty()) {
629           get.addFamily(col);
630         } else {
631           // In case of family delete, a Cell will be added into the list with Qualifier as null.
632           for (Cell cell : list) {
633             if (cell.getQualifierLength() == 0
634                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
635                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
636               get.addFamily(col);
637             } else {
638               get.addColumn(col, CellUtil.cloneQualifier(cell));
639             }
640             if (considerCellTs) {
641               long cellTs = cell.getTimestamp();
642               latestCellTs = Math.max(latestCellTs, cellTs);
643               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
644             }
645           }
646         }
647       } else {
648         throw new RuntimeException("Unhandled collection type " +
649           entry.getValue().getClass().getName());
650       }
651     }
652     // We want to avoid looking into the future. So, if the cells of the
653     // operation specify a timestamp, or the operation itself specifies a
654     // timestamp, then we use the maximum ts found. Otherwise, we bound
655     // the Get to the current server time. We add 1 to the timerange since
656     // the upper bound of a timerange is exclusive yet we need to examine
657     // any cells found there inclusively.
658     long latestTs = Math.max(opTs, latestCellTs);
659     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
660       latestTs = EnvironmentEdgeManager.currentTimeMillis();
661     }
662     get.setTimeRange(0, latestTs + 1);
663     // In case of Put operation we set to read all versions. This was done to consider the case
664     // where columns are added with TS other than the Mutation TS. But normally this wont be the
665     // case with Put. There no need to get all versions but get latest version only.
666     if (!diffCellTsFromOpTs && request == OpType.PUT) {
667       get.setMaxVersions(1);
668     }
669     if (LOG.isTraceEnabled()) {
670       LOG.trace("Scanning for cells with " + get);
671     }
672     // This Map is identical to familyMap. The key is a BR rather than byte[].
673     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
674     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
675     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
676     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
677       if (entry.getValue() instanceof List) {
678         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
679       }
680     }
681     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
682     List<Cell> cells = Lists.newArrayList();
683     Cell prevCell = null;
684     ByteRange curFam = new SimpleMutableByteRange();
685     boolean curColAllVersions = (request == OpType.DELETE);
686     long curColCheckTs = opTs;
687     boolean foundColumn = false;
688     try {
689       boolean more = false;
690       do {
691         cells.clear();
692         // scan with limit as 1 to hold down memory use on wide rows
693         more = scanner.next(cells, 1);
694         for (Cell cell: cells) {
695           if (LOG.isTraceEnabled()) {
696             LOG.trace("Found cell " + cell);
697           }
698           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
699           if (colChange) foundColumn = false;
700           prevCell = cell;
701           if (!curColAllVersions && foundColumn) {
702             continue;
703           }
704           if (colChange && considerCellTs) {
705             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
706             List<Cell> cols = familyMap1.get(curFam);
707             for (Cell col : cols) {
708               // null/empty qualifier is used to denote a Family delete. The TS and delete type
709               // associated with this is applicable for all columns within the family. That is
710               // why the below (col.getQualifierLength() == 0) check.
711               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
712                   || CellUtil.matchingQualifier(cell, col)) {
713                 byte type = col.getTypeByte();
714                 if (considerCellTs) {
715                   curColCheckTs = col.getTimestamp();
716                 }
717                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
718                 // a version delete for a column no need to check all the covering cells within
719                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
720                 // One version delete types are Delete/DeleteFamilyVersion
721                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
722                     || (KeyValue.Type.DeleteFamily.getCode() == type);
723                 break;
724               }
725             }
726           }
727           if (cell.getTimestamp() > curColCheckTs) {
728             // Just ignore this cell. This is not a covering cell.
729             continue;
730           }
731           foundColumn = true;
732           for (Action action: actions) {
733             // Are there permissions for this user for the cell?
734             if (!authManager.authorize(user, getTableName(e), cell, action)) {
735               // We can stop if the cell ACL denies access
736               return false;
737             }
738           }
739           cellGrants++;
740         }
741       } while (more);
742     } catch (AccessDeniedException ex) {
743       throw ex;
744     } catch (IOException ex) {
745       LOG.error("Exception while getting cells to calculate covering permission", ex);
746     } finally {
747       scanner.close();
748     }
749     // We should not authorize unless we have found one or more cell ACLs that
750     // grant access. This code is used to check for additional permissions
751     // after no table or CF grants are found.
752     return cellGrants > 0;
753   }
754 
755   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
756     // Iterate over the entries in the familyMap, replacing the cells therein
757     // with new cells including the ACL data
758     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
759       List<Cell> newCells = Lists.newArrayList();
760       for (Cell cell: e.getValue()) {
761         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
762         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
763         if (cell.getTagsLength() > 0) {
764           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
765             cell.getTagsOffset(), cell.getTagsLength());
766           while (tagIterator.hasNext()) {
767             tags.add(tagIterator.next());
768           }
769         }
770         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
771         // incoming cell type is actually KeyValue.
772         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
773         newCells.add(
774           new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
775             kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
776             kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
777             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
778             kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
779             tags));
780       }
781       // This is supposed to be safe, won't CME
782       e.setValue(newCells);
783     }
784   }
785 
786   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
787   // type is reserved and should not be explicitly set by user.
788   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
789     // Superusers are allowed to store cells unconditionally.
790     if (superusers.contains(user.getShortName())) {
791       return;
792     }
793     // We already checked (prePut vs preBatchMutation)
794     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
795       return;
796     }
797     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
798       Cell cell = cellScanner.current();
799       if (cell.getTagsLength() > 0) {
800         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
801           cell.getTagsLength());
802         while (tagsItr.hasNext()) {
803           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
804             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
805           }
806         }
807       }
808     }
809     m.setAttribute(TAG_CHECK_PASSED, TRUE);
810   }
811 
812   /* ---- MasterObserver implementation ---- */
813 
814   public void start(CoprocessorEnvironment env) throws IOException {
815     CompoundConfiguration conf = new CompoundConfiguration();
816     conf.add(env.getConfiguration());
817 
818     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
819       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
820 
821     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
822     if (!cellFeaturesEnabled) {
823       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
824           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
825           + " accordingly.");
826     }
827 
828     ZooKeeperWatcher zk = null;
829     if (env instanceof MasterCoprocessorEnvironment) {
830       // if running on HMaster
831       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
832       zk = mEnv.getMasterServices().getZooKeeper();
833     } else if (env instanceof RegionServerCoprocessorEnvironment) {
834       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
835       zk = rsEnv.getRegionServerServices().getZooKeeper();
836     } else if (env instanceof RegionCoprocessorEnvironment) {
837       // if running at region
838       regionEnv = (RegionCoprocessorEnvironment) env;
839       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
840       zk = regionEnv.getRegionServerServices().getZooKeeper();
841       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
842         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
843     }
844 
845     // set the user-provider.
846     this.userProvider = UserProvider.instantiate(env.getConfiguration());
847 
848     // set up the list of users with superuser privilege
849     User user = userProvider.getCurrent();
850     superusers = Lists.asList(user.getShortName(),
851       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
852 
853     // If zk is null or IOException while obtaining auth manager,
854     // throw RuntimeException so that the coprocessor is unloaded.
855     if (zk != null) {
856       try {
857         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
858       } catch (IOException ioe) {
859         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
860       }
861     } else {
862       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
863     }
864   }
865 
  /**
   * Counterpart of {@code start}; the body is empty, so calling it has no effect.
   *
   * @param env the coprocessor environment (unused)
   */
  public void stop(CoprocessorEnvironment env) {

  }
869 
870   @Override
871   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
872       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
873     Set<byte[]> families = desc.getFamiliesKeys();
874     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
875     for (byte[] family: families) {
876       familyMap.put(family, null);
877     }
878     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
879   }
880 
881   @Override
882   public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
883       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
884     // When AC is used, it should be configured as the 1st CP.
885     // In Master, the table operations like create, are handled by a Thread pool but the max size
886     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
887     // sequentially only.
888     // Related code in HMaster#startServiceThreads
889     // {code}
890     //   // We depend on there being only one instance of this executor running
891     //   // at a time. To do concurrency, would need fencing of enable/disable of
892     //   // tables.
893     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
894     // {code}
895     // In future if we change this pool to have more threads, then there is a chance for thread,
896     // creating acl table, getting delayed and by that time another table creation got over and
897     // this hook is getting called. In such a case, we will need a wait logic here which will
898     // wait till the acl table is created.
899     if (AccessControlLists.isAclTable(desc)) {
900       this.aclTabAvailable = true;
901     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
902       if (!aclTabAvailable) {
903         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
904             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
905             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
906       } else {
907         String owner = desc.getOwnerString();
908         // default the table owner to current user, if not specified.
909         if (owner == null)
910           owner = getActiveUser().getShortName();
911         UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getTableName(),
912             null, Action.values());
913         AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
914       }
915     }
916   }
917 
  /**
   * Permission check run before a table is deleted. Delegates to {@code requirePermission}
   * for request "deleteTable" on {@code tableName}, with no family or qualifier, passing
   * {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table about to be deleted
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
      throws IOException {
    requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
923 
  /**
   * Runs after a table has been deleted and removes the permissions stored for it via
   * {@code AccessControlLists.removeTablePermissions}.
   *
   * @param c the master observer context, used to reach the configuration
   * @param tableName the table that was deleted
   * @throws IOException propagated from {@code removeTablePermissions}
   */
  @Override
  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
      TableName tableName) throws IOException {
    AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
  }
929 
  /**
   * Permission check run before a table is truncated. Delegates to {@code requirePermission}
   * for request "truncateTable" on {@code tableName}, with no family or qualifier, passing
   * {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table about to be truncated
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
      throws IOException {
    requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
935 
  /**
   * Permission check run before a table is modified. Delegates to {@code requirePermission}
   * for request "modifyTable" on {@code tableName}, with no family or qualifier, passing
   * {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table about to be modified
   * @param htd the new table descriptor (not consulted by this check)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
      HTableDescriptor htd) throws IOException {
    requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
941 
942   @Override
943   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
944       TableName tableName, HTableDescriptor htd) throws IOException {
945     String owner = htd.getOwnerString();
946     // default the table owner to current user, if not specified.
947     if (owner == null) owner = getActiveUser().getShortName();
948     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
949         Action.values());
950     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
951   }
952 
  /**
   * Permission check run before a column family is added. Delegates to
   * {@code requirePermission} for request "addColumn" on {@code tableName}, with no family or
   * qualifier, passing {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table receiving the new column family
   * @param column descriptor of the family being added (not consulted by this check)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
      HColumnDescriptor column) throws IOException {
    requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
958 
  /**
   * Permission check run before a column family is modified. Delegates to
   * {@code requirePermission} for request "modifyColumn" on {@code tableName}, with no family
   * or qualifier, passing {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table whose column family is being modified
   * @param descriptor the new family descriptor (not consulted by this check)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
      HColumnDescriptor descriptor) throws IOException {
    requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
964 
  /**
   * Permission check run before a column family is deleted. Delegates to
   * {@code requirePermission} for request "deleteColumn" on {@code tableName}, with no family
   * or qualifier, passing {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table whose column family is being deleted
   * @param col name of the family being deleted (not consulted by this check)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
      byte[] col) throws IOException {
    requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
970 
  /**
   * Runs after a column family has been deleted and removes the permissions stored for that
   * table and column via {@code AccessControlLists.removeTablePermissions}.
   *
   * @param c the master observer context, used to reach the configuration
   * @param tableName the table the column family belonged to
   * @param col name of the deleted column family
   * @throws IOException propagated from {@code removeTablePermissions}
   */
  @Override
  public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
      TableName tableName, byte[] col) throws IOException {
    AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
  }
976 
  /**
   * Permission check run before a table is enabled. Delegates to {@code requirePermission}
   * for request "enableTable" on {@code tableName}, with no family or qualifier, passing
   * {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param c the master observer context (unused)
   * @param tableName the table about to be enabled
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
      throws IOException {
    requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
982 
983   @Override
984   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
985       throws IOException {
986     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
987       throw new AccessDeniedException("Not allowed to disable "
988           + AccessControlLists.ACL_TABLE_NAME + " table.");
989     }
990     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
991   }
992 
  /**
   * Permission check run before a region is moved. Delegates to {@code requirePermission}
   * for request "move" on the region's table, with no family or qualifier, passing
   * {@code Action.ADMIN}.
   *
   * @param c the master observer context (unused)
   * @param region the region to move; only its table is used
   * @param srcServer the source server (not consulted by this check)
   * @param destServer the destination server (not consulted by this check)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
      ServerName srcServer, ServerName destServer) throws IOException {
    requirePermission("move", region.getTable(), null, null, Action.ADMIN);
  }
998 
  /**
   * Permission check run before a region is assigned. Delegates to {@code requirePermission}
   * for request "assign" on the region's table, with no family or qualifier, passing
   * {@code Action.ADMIN}.
   *
   * @param c the master observer context (unused)
   * @param regionInfo the region to assign; only its table is used
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
      throws IOException {
    requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
  }
1004 
  /**
   * Permission check run before a region is unassigned. Delegates to
   * {@code requirePermission} for request "unassign" on the region's table, with no family or
   * qualifier, passing {@code Action.ADMIN}.
   *
   * @param c the master observer context (unused)
   * @param regionInfo the region to unassign; only its table is used
   * @param force not consulted by this check
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
      boolean force) throws IOException {
    requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
  }
1010 
  /**
   * Permission check run before a region is taken offline. Delegates to
   * {@code requirePermission} for request "regionOffline" on the region's table, with no
   * family or qualifier, passing {@code Action.ADMIN}.
   *
   * @param c the master observer context (unused)
   * @param regionInfo the region to take offline; only its table is used
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
      HRegionInfo regionInfo) throws IOException {
    requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
  }
1016 
  /**
   * Permission check run before a balance operation. Delegates to
   * {@code requirePermission("balance", Action.ADMIN)}.
   *
   * @param c the master observer context (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
      throws IOException {
    requirePermission("balance", Action.ADMIN);
  }
1022 
  /**
   * Permission check run before the balancer switch is changed. Delegates to
   * {@code requirePermission("balanceSwitch", Action.ADMIN)}.
   *
   * @param c the master observer context (unused)
   * @param newValue the requested switch value
   * @return {@code newValue}, unchanged
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
      boolean newValue) throws IOException {
    requirePermission("balanceSwitch", Action.ADMIN);
    return newValue;
  }
1029 
  /**
   * Permission check run before a shutdown. Delegates to
   * {@code requirePermission("shutdown", Action.ADMIN)}.
   *
   * @param c the master observer context (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
      throws IOException {
    requirePermission("shutdown", Action.ADMIN);
  }
1035 
  /**
   * Permission check run before the master is stopped. Delegates to
   * {@code requirePermission("stopMaster", Action.ADMIN)}.
   *
   * @param c the master observer context (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
      throws IOException {
    requirePermission("stopMaster", Action.ADMIN);
  }
1041 
1042   @Override
1043   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1044       throws IOException {
1045     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1046       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1047       // initialize the ACL storage table
1048       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1049     } else {
1050       aclTabAvailable = true;
1051     }
1052   }
1053 
  /**
   * Permission check run before a snapshot is taken. Delegates to
   * {@code requirePermission("snapshot", Action.ADMIN)}; the snapshot and table descriptor
   * arguments are not consulted.
   *
   * @param ctx the master observer context (unused)
   * @param snapshot description of the snapshot (unused)
   * @param hTableDescriptor descriptor of the table involved (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
      throws IOException {
    requirePermission("snapshot", Action.ADMIN);
  }
1060 
  /**
   * Permission check run before a snapshot is cloned. Delegates to
   * {@code requirePermission("clone", Action.ADMIN)}; the snapshot and table descriptor
   * arguments are not consulted.
   *
   * @param ctx the master observer context (unused)
   * @param snapshot description of the snapshot (unused)
   * @param hTableDescriptor descriptor of the table involved (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
      throws IOException {
    requirePermission("clone", Action.ADMIN);
  }
1067 
  /**
   * Permission check run before a snapshot is restored. Delegates to
   * {@code requirePermission("restore", Action.ADMIN)}; the snapshot and table descriptor
   * arguments are not consulted.
   *
   * @param ctx the master observer context (unused)
   * @param snapshot description of the snapshot (unused)
   * @param hTableDescriptor descriptor of the table involved (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
      throws IOException {
    requirePermission("restore", Action.ADMIN);
  }
1074 
  /**
   * Permission check run before a snapshot is deleted. Delegates to
   * {@code requirePermission("deleteSnapshot", Action.ADMIN)}; the snapshot argument is not
   * consulted.
   *
   * @param ctx the master observer context (unused)
   * @param snapshot description of the snapshot (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      final SnapshotDescription snapshot) throws IOException {
    requirePermission("deleteSnapshot", Action.ADMIN);
  }
1080 
  /**
   * Permission check run before a namespace is created. Delegates to
   * {@code requireGlobalPermission} for request "createNamespace" with {@code Action.ADMIN}
   * and the namespace's name.
   *
   * @param ctx the master observer context (unused)
   * @param ns descriptor of the namespace about to be created
   * @throws IOException propagated from {@code requireGlobalPermission}
   */
  @Override
  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
    requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
  }
1086 
  /**
   * Permission check run before a namespace is deleted. Delegates to
   * {@code requireGlobalPermission} for request "deleteNamespace" with {@code Action.ADMIN}
   * and the namespace name.
   *
   * @param ctx the master observer context (unused)
   * @param namespace name of the namespace about to be deleted
   * @throws IOException propagated from {@code requireGlobalPermission}
   */
  @Override
  public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
      throws IOException {
    requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
  }
1092 
1093   @Override
1094   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1095                                   String namespace) throws IOException {
1096     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1097         namespace);
1098     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1099   }
1100 
  /**
   * Permission check run before a namespace is modified. Delegates to
   * {@code requireGlobalPermission} for request "modifyNamespace" with {@code Action.ADMIN}
   * and the namespace's name.
   *
   * @param ctx the master observer context (unused)
   * @param ns descriptor of the namespace about to be modified
   * @throws IOException propagated from {@code requireGlobalPermission}
   */
  @Override
  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
      NamespaceDescriptor ns) throws IOException {
    requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
  }
1106 
  /**
   * Permission check run before a table flush. Delegates to {@code requirePermission} for
   * request "flushTable" on {@code tableName}, with no family or qualifier, passing
   * {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param ctx the master observer context (unused)
   * @param tableName the table about to be flushed
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
      final TableName tableName) throws IOException {
    requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
  }
1112 
1113   /* ---- RegionObserver implementation ---- */
1114 
1115   @Override
1116   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1117       throws IOException {
1118     RegionCoprocessorEnvironment env = e.getEnvironment();
1119     final HRegion region = env.getRegion();
1120     if (region == null) {
1121       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1122     } else {
1123       HRegionInfo regionInfo = region.getRegionInfo();
1124       if (regionInfo.getTable().isSystemTable()) {
1125         isSystemOrSuperUser(regionEnv.getConfiguration());
1126       } else {
1127         requirePermission("preOpen", Action.ADMIN);
1128       }
1129     }
1130   }
1131 
1132   @Override
1133   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1134     RegionCoprocessorEnvironment env = c.getEnvironment();
1135     final HRegion region = env.getRegion();
1136     if (region == null) {
1137       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1138       return;
1139     }
1140     if (AccessControlLists.isAclRegion(region)) {
1141       aclRegion = true;
1142       // When this region is under recovering state, initialize will be handled by postLogReplay
1143       if (!region.isRecovering()) {
1144         try {
1145           initialize(env);
1146         } catch (IOException ex) {
1147           // if we can't obtain permissions, it's better to fail
1148           // than perform checks incorrectly
1149           throw new RuntimeException("Failed to initialize permissions cache", ex);
1150         }
1151       }
1152     } else {
1153       initialized = true;
1154     }
1155   }
1156 
1157   @Override
1158   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1159     if (aclRegion) {
1160       try {
1161         initialize(c.getEnvironment());
1162       } catch (IOException ex) {
1163         // if we can't obtain permissions, it's better to fail
1164         // than perform checks incorrectly
1165         throw new RuntimeException("Failed to initialize permissions cache", ex);
1166       }
1167     }
1168   }
1169 
  /**
   * Permission check run before a region flush. Delegates to {@code requirePermission} for
   * request "flush" on the table of the region in {@code e}, with no family or qualifier,
   * passing {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param e the region observer context
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
    requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
        Action.CREATE);
  }
1175 
  /**
   * Permission check run before a region split. Delegates to {@code requirePermission} for
   * request "split" on the table of the region in {@code e}, with no family or qualifier,
   * passing {@code Action.ADMIN}.
   *
   * @param e the region observer context
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
    requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
  }
1180 
  /**
   * Permission check run before a region split at a given row. Performs the same
   * {@code requirePermission} call as the single-argument overload (request "split",
   * {@code Action.ADMIN}); the split row is not consulted.
   *
   * @param e the region observer context
   * @param splitRow the row at which the region is to be split (unused)
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
      byte[] splitRow) throws IOException {
    requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
  }
1186 
  /**
   * Permission check run before a compaction. Delegates to {@code requirePermission} for
   * request "compact" on the table of the region in {@code e}, with no family or qualifier,
   * passing {@code Action.ADMIN} and {@code Action.CREATE}.
   *
   * @param e the region observer context
   * @param store the store being compacted (unused)
   * @param scanner the scanner supplied for the compaction
   * @param scanType the scan type (unused)
   * @return {@code scanner}, unchanged
   * @throws IOException propagated from {@code requirePermission}
   */
  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
      final Store store, final InternalScanner scanner, final ScanType scanType)
          throws IOException {
    requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
        Action.CREATE);
    return scanner;
  }
1195 
1196   @Override
1197   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1198       final byte [] row, final byte [] family, final Result result)
1199       throws IOException {
1200     assert family != null;
1201     RegionCoprocessorEnvironment env = c.getEnvironment();
1202     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1203     User user = getActiveUser();
1204     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1205       Action.READ);
1206     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1207       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1208         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1209       authResult.setReason("Covering cell set");
1210     }
1211     logResult(authResult);
1212     if (!authResult.isAllowed()) {
1213       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1214     }
1215   }
1216 
1217   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1218       final Query query, OpType opType) throws IOException {
1219     Filter filter = query.getFilter();
1220     // Don't wrap an AccessControlFilter
1221     if (filter != null && filter instanceof AccessControlFilter) {
1222       return;
1223     }
1224     User user = getActiveUser();
1225     RegionCoprocessorEnvironment env = c.getEnvironment();
1226     Map<byte[],? extends Collection<byte[]>> families = null;
1227     switch (opType) {
1228     case GET:
1229     case EXISTS:
1230       families = ((Get)query).getFamilyMap();
1231       break;
1232     case SCAN:
1233       families = ((Scan)query).getFamilyMap();
1234       break;
1235     default:
1236       throw new RuntimeException("Unhandled operation " + opType);
1237     }
1238     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1239     HRegion region = getRegion(env);
1240     TableName table = getTableName(region);
1241     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1242     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1243       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1244     }
1245     if (!authResult.isAllowed()) {
1246       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1247         // Old behavior: Scan with only qualifier checks if we have partial
1248         // permission. Backwards compatible behavior is to throw an
1249         // AccessDeniedException immediately if there are no grants for table
1250         // or CF or CF+qual. Only proceed with an injected filter if there are
1251         // grants for qualifiers. Otherwise we will fall through below and log
1252         // the result and throw an ADE. We may end up checking qualifier
1253         // grants three times (permissionGranted above, here, and in the
1254         // filter) but that's the price of backwards compatibility.
1255         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1256           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1257             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1258             cfVsMaxVersions);
1259           // wrap any existing filter
1260           if (filter != null) {
1261             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1262               Lists.newArrayList(ourFilter, filter));
1263           }
1264           authResult.setAllowed(true);;
1265           authResult.setReason("Access allowed with filter");
1266           switch (opType) {
1267           case GET:
1268           case EXISTS:
1269             ((Get)query).setFilter(ourFilter);
1270             break;
1271           case SCAN:
1272             ((Scan)query).setFilter(ourFilter);
1273             break;
1274           default:
1275             throw new RuntimeException("Unhandled operation " + opType);
1276           }
1277         }
1278       } else {
1279         // New behavior: Any access we might be granted is more fine-grained
1280         // than whole table or CF. Simply inject a filter and return what is
1281         // allowed. We will not throw an AccessDeniedException. This is a
1282         // behavioral change since 0.96.
1283         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1284           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1285         // wrap any existing filter
1286         if (filter != null) {
1287           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1288             Lists.newArrayList(ourFilter, filter));
1289         }
1290         authResult.setAllowed(true);;
1291         authResult.setReason("Access allowed with filter");
1292         switch (opType) {
1293         case GET:
1294         case EXISTS:
1295           ((Get)query).setFilter(ourFilter);
1296           break;
1297         case SCAN:
1298           ((Scan)query).setFilter(ourFilter);
1299           break;
1300         default:
1301           throw new RuntimeException("Unhandled operation " + opType);
1302         }
1303       }
1304     }
1305 
1306     logResult(authResult);
1307     if (!authResult.isAllowed()) {
1308       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1309         ", action=READ)");
1310     }
1311   }
1312 
  /**
   * READ permission check run before a Get. Delegates to {@code internalPreRead} with
   * {@code OpType.GET}.
   *
   * @param c the region observer context
   * @param get the Get being checked
   * @param result unused
   * @throws IOException propagated from {@code internalPreRead}
   */
  @Override
  public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
      final Get get, final List<Cell> result) throws IOException {
    internalPreRead(c, get, OpType.GET);
  }
1318 
  /**
   * READ permission check run before an existence check. Delegates to
   * {@code internalPreRead} with {@code OpType.EXISTS}.
   *
   * @param c the region observer context
   * @param get the Get being checked
   * @param exists the value handed in by the caller
   * @return {@code exists}, unchanged
   * @throws IOException propagated from {@code internalPreRead}
   */
  @Override
  public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
      final Get get, final boolean exists) throws IOException {
    internalPreRead(c, get, OpType.EXISTS);
    return exists;
  }
1325 
1326   @Override
1327   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1328       final Put put, final WALEdit edit, final Durability durability)
1329       throws IOException {
1330     // Require WRITE permission to the table, CF, or top visible value, if any.
1331     // NOTE: We don't need to check the permissions for any earlier Puts
1332     // because we treat the ACLs in each Put as timestamped like any other
1333     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1334     // change the ACL of any previous Put. This allows simple evolution of
1335     // security policy over time without requiring expensive updates.
1336     User user = getActiveUser();
1337     checkForReservedTagPresence(user, put);
1338     RegionCoprocessorEnvironment env = c.getEnvironment();
1339     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1340     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1341     logResult(authResult);
1342     if (!authResult.isAllowed()) {
1343       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1344         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1345       } else {
1346         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1347       }
1348     }
1349     // Add cell ACLs from the operation to the cells themselves
1350     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1351     if (bytes != null) {
1352       if (cellFeaturesEnabled) {
1353         addCellPermissions(bytes, put.getFamilyCellMap());
1354       } else {
1355         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1356       }
1357     }
1358   }
1359 
  /**
   * Runs after a Put. When this coprocessor is attached to the ACL region, the Put's cells
   * are passed to {@code updateACL}; otherwise nothing happens.
   *
   * @param c the region observer context
   * @param put the Put that was applied
   * @param edit unused
   * @param durability unused
   */
  @Override
  public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
      final Put put, final WALEdit edit, final Durability durability) {
    if (aclRegion) {
      updateACL(c.getEnvironment(), put.getFamilyCellMap());
    }
  }
1367 
1368   @Override
1369   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1370       final Delete delete, final WALEdit edit, final Durability durability)
1371       throws IOException {
1372     // An ACL on a delete is useless, we shouldn't allow it
1373     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1374       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1375     }
1376     // Require WRITE permissions on all cells covered by the delete. Unlike
1377     // for Puts we need to check all visible prior versions, because a major
1378     // compaction could remove them. If the user doesn't have permission to
1379     // overwrite any of the visible versions ('visible' defined as not covered
1380     // by a tombstone already) then we have to disallow this operation.
1381     RegionCoprocessorEnvironment env = c.getEnvironment();
1382     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1383     User user = getActiveUser();
1384     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1385     logResult(authResult);
1386     if (!authResult.isAllowed()) {
1387       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1388         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1389       } else {
1390         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1391       }
1392     }
1393   }
1394 
1395   @Override
1396   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1397       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1398     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1399       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1400       for (int i = 0; i < miniBatchOp.size(); i++) {
1401         Mutation m = miniBatchOp.getOperation(i);
1402         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1403           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1404           // perm check
1405           OpType opType;
1406           if (m instanceof Put) {
1407             checkForReservedTagPresence(getActiveUser(), m);
1408             opType = OpType.PUT;
1409           } else {
1410             opType = OpType.DELETE;
1411           }
1412           AuthResult authResult = null;
1413           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1414               m.getTimeStamp(), Action.WRITE)) {
1415             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1416                 Action.WRITE, table, m.getFamilyCellMap());
1417           } else {
1418             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1419                 Action.WRITE, table, m.getFamilyCellMap());
1420           }
1421           logResult(authResult);
1422           if (!authResult.isAllowed()) {
1423             throw new AccessDeniedException("Insufficient permissions "
1424                 + authResult.toContextString());
1425           }
1426         }
1427       }
1428     }
1429   }
1430 
1431   @Override
1432   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1433       final Delete delete, final WALEdit edit, final Durability durability)
1434       throws IOException {
1435     if (aclRegion) {
1436       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1437     }
1438   }
1439 
1440   @Override
1441   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1442       final byte [] row, final byte [] family, final byte [] qualifier,
1443       final CompareFilter.CompareOp compareOp,
1444       final ByteArrayComparable comparator, final Put put,
1445       final boolean result) throws IOException {
1446     // Require READ and WRITE permissions on the table, CF, and KV to update
1447     User user = getActiveUser();
1448     checkForReservedTagPresence(user, put);
1449     RegionCoprocessorEnvironment env = c.getEnvironment();
1450     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1451     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1452       Action.READ, Action.WRITE);
1453     logResult(authResult);
1454     if (!authResult.isAllowed()) {
1455       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1456         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1457       } else {
1458         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1459       }
1460     }
1461     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1462     if (bytes != null) {
1463       if (cellFeaturesEnabled) {
1464         addCellPermissions(bytes, put.getFamilyCellMap());
1465       } else {
1466         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1467       }
1468     }
1469     return result;
1470   }
1471 
1472   @Override
1473   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1474       final byte[] row, final byte[] family, final byte[] qualifier,
1475       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1476       final boolean result) throws IOException {
1477     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1478       // We had failure with table, cf and q perm checks and now giving a chance for cell
1479       // perm check
1480       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1481       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1482       AuthResult authResult = null;
1483       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1484           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1485         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1486             getActiveUser(), Action.READ, table, families);
1487       } else {
1488         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1489             getActiveUser(), Action.READ, table, families);
1490       }
1491       logResult(authResult);
1492       if (!authResult.isAllowed()) {
1493         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1494       }
1495     }
1496     return result;
1497   }
1498 
1499   @Override
1500   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1501       final byte [] row, final byte [] family, final byte [] qualifier,
1502       final CompareFilter.CompareOp compareOp,
1503       final ByteArrayComparable comparator, final Delete delete,
1504       final boolean result) throws IOException {
1505     // An ACL on a delete is useless, we shouldn't allow it
1506     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1507       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1508           delete.toString());
1509     }
1510     // Require READ and WRITE permissions on the table, CF, and the KV covered
1511     // by the delete
1512     RegionCoprocessorEnvironment env = c.getEnvironment();
1513     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1514     User user = getActiveUser();
1515     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1516       Action.READ, Action.WRITE);
1517     logResult(authResult);
1518     if (!authResult.isAllowed()) {
1519       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1520         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1521       } else {
1522         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1523       }
1524     }
1525     return result;
1526   }
1527 
1528   @Override
1529   public boolean preCheckAndDeleteAfterRowLock(
1530       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1531       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1532       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1533       throws IOException {
1534     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1535       // We had failure with table, cf and q perm checks and now giving a chance for cell
1536       // perm check
1537       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1538       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1539       AuthResult authResult = null;
1540       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1541           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1542         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1543             getActiveUser(), Action.READ, table, families);
1544       } else {
1545         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1546             getActiveUser(), Action.READ, table, families);
1547       }
1548       logResult(authResult);
1549       if (!authResult.isAllowed()) {
1550         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1551       }
1552     }
1553     return result;
1554   }
1555 
1556   @Override
1557   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1558       final byte [] row, final byte [] family, final byte [] qualifier,
1559       final long amount, final boolean writeToWAL)
1560       throws IOException {
1561     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1562     // incremented value
1563     RegionCoprocessorEnvironment env = c.getEnvironment();
1564     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1565     User user = getActiveUser();
1566     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1567       Action.WRITE);
1568     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1569       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1570         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1571       authResult.setReason("Covering cell set");
1572     }
1573     logResult(authResult);
1574     if (!authResult.isAllowed()) {
1575       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1576     }
1577     return -1;
1578   }
1579 
1580   @Override
1581   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1582       throws IOException {
1583     // Require WRITE permission to the table, CF, and the KV to be appended
1584     User user = getActiveUser();
1585     checkForReservedTagPresence(user, append);
1586     RegionCoprocessorEnvironment env = c.getEnvironment();
1587     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1588     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1589     logResult(authResult);
1590     if (!authResult.isAllowed()) {
1591       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1592         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1593       } else {
1594         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1595       }
1596     }
1597     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1598     if (bytes != null) {
1599       if (cellFeaturesEnabled) {
1600         addCellPermissions(bytes, append.getFamilyCellMap());
1601       } else {
1602         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1603       }
1604     }
1605     return null;
1606   }
1607 
1608   @Override
1609   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1610       final Append append) throws IOException {
1611     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1612       // We had failure with table, cf and q perm checks and now giving a chance for cell
1613       // perm check
1614       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1615       AuthResult authResult = null;
1616       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1617           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1618         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1619             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1620       } else {
1621         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1622             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1623       }
1624       logResult(authResult);
1625       if (!authResult.isAllowed()) {
1626         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1627       }
1628     }
1629     return null;
1630   }
1631 
1632   @Override
1633   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1634       final Increment increment)
1635       throws IOException {
1636     // Require WRITE permission to the table, CF, and the KV to be replaced by
1637     // the incremented value
1638     User user = getActiveUser();
1639     checkForReservedTagPresence(user, increment);
1640     RegionCoprocessorEnvironment env = c.getEnvironment();
1641     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1642     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1643       Action.WRITE);
1644     logResult(authResult);
1645     if (!authResult.isAllowed()) {
1646       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1647         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1648       } else {
1649         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1650       }
1651     }
1652     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1653     if (bytes != null) {
1654       if (cellFeaturesEnabled) {
1655         addCellPermissions(bytes, increment.getFamilyCellMap());
1656       } else {
1657         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1658       }
1659     }
1660     return null;
1661   }
1662 
1663   @Override
1664   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1665       final Increment increment) throws IOException {
1666     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1667       // We had failure with table, cf and q perm checks and now giving a chance for cell
1668       // perm check
1669       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1670       AuthResult authResult = null;
1671       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1672           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1673         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1674             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1675       } else {
1676         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1677             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1678       }
1679       logResult(authResult);
1680       if (!authResult.isAllowed()) {
1681         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1682       }
1683     }
1684     return null;
1685   }
1686 
1687   @Override
1688   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1689       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1690     // If the HFile version is insufficient to persist tags, we won't have any
1691     // work to do here
1692     if (!cellFeaturesEnabled) {
1693       return newCell;
1694     }
1695 
1696     // Collect any ACLs from the old cell
1697     List<Tag> tags = Lists.newArrayList();
1698     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1699     if (oldCell != null) {
1700       // Save an object allocation where we can
1701       if (oldCell.getTagsLength() > 0) {
1702         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1703           oldCell.getTagsOffset(), oldCell.getTagsLength());
1704         while (tagIterator.hasNext()) {
1705           Tag tag = tagIterator.next();
1706           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1707             // Not an ACL tag, just carry it through
1708             if (LOG.isTraceEnabled()) {
1709               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1710                 " length " + tag.getTagLength());
1711             }
1712             tags.add(tag);
1713           } else {
1714             // Merge the perms from the older ACL into the current permission set
1715             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1716               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1717                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1718             perms.putAll(kvPerms);
1719           }
1720         }
1721       }
1722     }
1723 
1724     // Do we have an ACL on the operation?
1725     byte[] aclBytes = mutation.getACL();
1726     if (aclBytes != null) {
1727       // Yes, use it
1728       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1729     } else {
1730       // No, use what we carried forward
1731       if (perms != null) {
1732         // TODO: If we collected ACLs from more than one tag we may have a
1733         // List<Permission> of size > 1, this can be collapsed into a single
1734         // Permission
1735         if (LOG.isTraceEnabled()) {
1736           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1737         }
1738         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1739           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1740       }
1741     }
1742 
1743     // If we have no tags to add, just return
1744     if (tags.isEmpty()) {
1745       return newCell;
1746     }
1747 
1748     // We need to create another KV, unfortunately, because the current new KV
1749     // has no space for tags
1750     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1751     KeyValue rewriteKv = new KeyValue(newKv.getRowArray(), newKv.getRowOffset(), newKv.getRowLength(),
1752       newKv.getFamilyArray(), newKv.getFamilyOffset(), newKv.getFamilyLength(),
1753       newKv.getQualifierArray(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
1754       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1755       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
1756       tags);
1757     // Preserve mvcc data
1758     rewriteKv.setSequenceId(newKv.getMvccVersion());
1759     return rewriteKv;
1760   }
1761 
1762   @Override
1763   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1764       final Scan scan, final RegionScanner s) throws IOException {
1765     internalPreRead(c, scan, OpType.SCAN);
1766     return s;
1767   }
1768 
1769   @Override
1770   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1771       final Scan scan, final RegionScanner s) throws IOException {
1772     User user = getActiveUser();
1773     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1774       scannerOwners.put(s, user.getShortName());
1775     }
1776     return s;
1777   }
1778 
1779   @Override
1780   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1781       final InternalScanner s, final List<Result> result,
1782       final int limit, final boolean hasNext) throws IOException {
1783     requireScannerOwner(s);
1784     return hasNext;
1785   }
1786 
1787   @Override
1788   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1789       final InternalScanner s) throws IOException {
1790     requireScannerOwner(s);
1791   }
1792 
1793   @Override
1794   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1795       final InternalScanner s) throws IOException {
1796     // clean up any associated owner mapping
1797     scannerOwners.remove(s);
1798   }
1799 
1800   /**
1801    * Verify, when servicing an RPC, that the caller is the scanner owner.
1802    * If so, we assume that access control is correctly enforced based on
1803    * the checks performed in preScannerOpen()
1804    */
1805   private void requireScannerOwner(InternalScanner s)
1806       throws AccessDeniedException {
1807     if (RequestContext.isInRequestContext()) {
1808       String requestUserName = RequestContext.getRequestUserName();
1809       String owner = scannerOwners.get(s);
1810       if (owner != null && !owner.equals(requestUserName)) {
1811         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1812       }
1813     }
1814   }
1815 
1816   /**
1817    * Verifies user has WRITE privileges on
1818    * the Column Families involved in the bulkLoadHFile
1819    * request. Specific Column Write privileges are presently
1820    * ignored.
1821    */
1822   @Override
1823   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1824       List<Pair<byte[], String>> familyPaths) throws IOException {
1825     for(Pair<byte[],String> el : familyPaths) {
1826       requirePermission("preBulkLoadHFile",
1827           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1828           el.getFirst(),
1829           null,
1830           Action.CREATE);
1831     }
1832   }
1833 
1834   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1835     User requestUser = getActiveUser();
1836     TableName tableName = e.getRegion().getTableDesc().getTableName();
1837     AuthResult authResult = permissionGranted(method, requestUser,
1838         action, e, Collections.EMPTY_MAP);
1839     if (!authResult.isAllowed()) {
1840       for(UserPermission userPerm:
1841           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1842         for(Action userAction: userPerm.getActions()) {
1843           if(userAction.equals(action)) {
1844             return AuthResult.allow(method, "Access allowed", requestUser,
1845                 action, tableName, null, null);
1846           }
1847         }
1848       }
1849     }
1850     return authResult;
1851   }
1852 
1853   /**
1854    * Authorization check for
1855    * SecureBulkLoadProtocol.prepareBulkLoad()
1856    * @param e
1857    * @throws IOException
1858    */
1859   //TODO this should end up as a coprocessor hook
1860   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1861     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1862     logResult(authResult);
1863     if (!authResult.isAllowed()) {
1864       throw new AccessDeniedException("Insufficient permissions (table=" +
1865         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1866     }
1867   }
1868 
1869   /**
1870    * Authorization security check for
1871    * SecureBulkLoadProtocol.cleanupBulkLoad()
1872    * @param e
1873    * @throws IOException
1874    */
1875   //TODO this should end up as a coprocessor hook
1876   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1877     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1878     logResult(authResult);
1879     if (!authResult.isAllowed()) {
1880       throw new AccessDeniedException("Insufficient permissions (table=" +
1881         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1882     }
1883   }
1884 
1885   /* ---- EndpointObserver implementation ---- */
1886 
1887   @Override
1888   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1889       Service service, String methodName, Message request) throws IOException {
1890     // Don't intercept calls to our own AccessControlService, we check for
1891     // appropriate permissions in the service handlers
1892     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1893       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
1894         methodName + ")",
1895         getTableName(ctx.getEnvironment()), null, null,
1896         Action.EXEC);
1897     }
1898     return request;
1899   }
1900 
1901   @Override
1902   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1903       Service service, String methodName, Message request, Message.Builder responseBuilder)
1904       throws IOException { }
1905 
1906   /* ---- Protobuf AccessControlService implementation ---- */
1907 
1908   @Override
1909   public void grant(RpcController controller,
1910                     AccessControlProtos.GrantRequest request,
1911                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1912     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1913     AccessControlProtos.GrantResponse response = null;
1914     try {
1915       // verify it's only running at .acl.
1916       if (aclRegion) {
1917         if (!initialized) {
1918           throw new CoprocessorException("AccessController not yet initialized");
1919         }
1920         if (LOG.isDebugEnabled()) {
1921           LOG.debug("Received request to grant access permission " + perm.toString());
1922         }
1923 
1924         switch(request.getUserPermission().getPermission().getType()) {
1925           case Global :
1926           case Table :
1927             requirePermission("grant", perm.getTableName(), perm.getFamily(),
1928                 perm.getQualifier(), Action.ADMIN);
1929             break;
1930           case Namespace :
1931             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
1932         }
1933 
1934         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
1935         if (AUDITLOG.isTraceEnabled()) {
1936           // audit log should store permission changes in addition to auth results
1937           AUDITLOG.trace("Granted permission " + perm.toString());
1938         }
1939       } else {
1940         throw new CoprocessorException(AccessController.class, "This method "
1941             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1942       }
1943       response = AccessControlProtos.GrantResponse.getDefaultInstance();
1944     } catch (IOException ioe) {
1945       // pass exception back up
1946       ResponseConverter.setControllerException(controller, ioe);
1947     }
1948     done.run(response);
1949   }
1950 
1951   @Override
1952   public void revoke(RpcController controller,
1953                      AccessControlProtos.RevokeRequest request,
1954                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
1955     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1956     AccessControlProtos.RevokeResponse response = null;
1957     try {
1958       // only allowed to be called on _acl_ region
1959       if (aclRegion) {
1960         if (!initialized) {
1961           throw new CoprocessorException("AccessController not yet initialized");
1962         }
1963         if (LOG.isDebugEnabled()) {
1964           LOG.debug("Received request to revoke access permission " + perm.toString());
1965         }
1966 
1967         switch(request.getUserPermission().getPermission().getType()) {
1968           case Global :
1969           case Table :
1970             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
1971                               perm.getQualifier(), Action.ADMIN);
1972             break;
1973           case Namespace :
1974             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
1975         }
1976 
1977         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
1978         if (AUDITLOG.isTraceEnabled()) {
1979           // audit log should record all permission changes
1980           AUDITLOG.trace("Revoked permission " + perm.toString());
1981         }
1982       } else {
1983         throw new CoprocessorException(AccessController.class, "This method "
1984             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1985       }
1986       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
1987     } catch (IOException ioe) {
1988       // pass exception back up
1989       ResponseConverter.setControllerException(controller, ioe);
1990     }
1991     done.run(response);
1992   }
1993 
1994   @Override
1995   public void getUserPermissions(RpcController controller,
1996                                  AccessControlProtos.GetUserPermissionsRequest request,
1997                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
1998     AccessControlProtos.GetUserPermissionsResponse response = null;
1999     try {
2000       // only allowed to be called on _acl_ region
2001       if (aclRegion) {
2002         if (!initialized) {
2003           throw new CoprocessorException("AccessController not yet initialized");
2004         }
2005         List<UserPermission> perms = null;
2006         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2007           TableName table = null;
2008           if (request.hasTableName()) {
2009             table = ProtobufUtil.toTableName(request.getTableName());
2010           }
2011           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2012 
2013           perms = AccessControlLists.getUserTablePermissions(
2014               regionEnv.getConfiguration(), table);
2015         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2016           perms = AccessControlLists.getUserNamespacePermissions(
2017               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2018         } else {
2019           perms = AccessControlLists.getUserPermissions(
2020               regionEnv.getConfiguration(), null);
2021         }
2022         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2023       } else {
2024         throw new CoprocessorException(AccessController.class, "This method "
2025             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2026       }
2027     } catch (IOException ioe) {
2028       // pass exception back up
2029       ResponseConverter.setControllerException(controller, ioe);
2030     }
2031     done.run(response);
2032   }
2033 
2034   @Override
2035   public void checkPermissions(RpcController controller,
2036                                AccessControlProtos.CheckPermissionsRequest request,
2037                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2038     Permission[] permissions = new Permission[request.getPermissionCount()];
2039     for (int i=0; i < request.getPermissionCount(); i++) {
2040       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2041     }
2042     AccessControlProtos.CheckPermissionsResponse response = null;
2043     try {
2044       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2045       for (Permission permission : permissions) {
2046         if (permission instanceof TablePermission) {
2047           TablePermission tperm = (TablePermission) permission;
2048           for (Action action : permission.getActions()) {
2049             if (!tperm.getTableName().equals(tableName)) {
2050               throw new CoprocessorException(AccessController.class, String.format("This method "
2051                   + "can only execute at the table specified in TablePermission. " +
2052                   "Table of the region:%s , requested table:%s", tableName,
2053                   tperm.getTableName()));
2054             }
2055 
2056             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2057             if (tperm.getFamily() != null) {
2058               if (tperm.getQualifier() != null) {
2059                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2060                 qualifiers.add(tperm.getQualifier());
2061                 familyMap.put(tperm.getFamily(), qualifiers);
2062               } else {
2063                 familyMap.put(tperm.getFamily(), null);
2064               }
2065             }
2066 
2067             requirePermission("checkPermissions", action, regionEnv, familyMap);
2068           }
2069 
2070         } else {
2071           for (Action action : permission.getActions()) {
2072             requirePermission("checkPermissions", action);
2073           }
2074         }
2075       }
2076       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2077     } catch (IOException ioe) {
2078       ResponseConverter.setControllerException(controller, ioe);
2079     }
2080     done.run(response);
2081   }
2082 
2083   @Override
2084   public Service getService() {
2085     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2086   }
2087 
2088   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2089     return e.getRegion();
2090   }
2091 
2092   private TableName getTableName(RegionCoprocessorEnvironment e) {
2093     HRegion region = e.getRegion();
2094     if (region != null) {
2095       return getTableName(region);
2096     }
2097     return null;
2098   }
2099 
2100   private TableName getTableName(HRegion region) {
2101     HRegionInfo regionInfo = region.getRegionInfo();
2102     if (regionInfo != null) {
2103       return regionInfo.getTable();
2104     }
2105     return null;
2106   }
2107 
2108   @Override
2109   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2110       throws IOException {
2111     requirePermission("preClose", Action.ADMIN);
2112   }
2113 
2114   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2115     User user = userProvider.getCurrent();
2116     if (user == null) {
2117       throw new IOException("Unable to obtain the current user, " +
2118         "authorization checks for internal operations will not work correctly!");
2119     }
2120     User activeUser = getActiveUser();
2121     if (!(superusers.contains(activeUser.getShortName()))) {
2122       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2123         "is not system or super user.");
2124     }
2125   }
2126 
2127   @Override
2128   public void preStopRegionServer(
2129       ObserverContext<RegionServerCoprocessorEnvironment> env)
2130       throws IOException {
2131     requirePermission("preStopRegionServer", Action.ADMIN);
2132   }
2133 
2134   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2135       byte[] qualifier) {
2136     if (family == null) {
2137       return null;
2138     }
2139 
2140     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2141     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2142     return familyMap;
2143   }
2144 
2145   @Override
2146   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2147       List<TableName> tableNamesList,
2148       List<HTableDescriptor> descriptors) throws IOException {
2149     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2150     // ADMIN privs.
2151     if (tableNamesList == null || tableNamesList.isEmpty()) {
2152       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2153     }
2154     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2155     // request can be granted.
2156     else {
2157       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2158       for (TableName tableName: tableNamesList) {
2159         // Do not deny if the table does not exist
2160         try {
2161           masterServices.checkTableModifiable(tableName);
2162         } catch (TableNotFoundException ex) {
2163           // Skip checks for a table that does not exist
2164           continue;
2165         } catch (TableNotDisabledException ex) {
2166           // We don't care about this
2167         }
2168         requirePermission("getTableDescriptors", tableName, null, null,
2169           Action.ADMIN, Action.CREATE);
2170       }
2171     }
2172   }
2173 
2174   @Override
2175   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2176       HRegion regionB) throws IOException {
2177     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2178       Action.ADMIN);
2179   }
2180 
2181   @Override
2182   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2183       HRegion regionB, HRegion mergedRegion) throws IOException { }
2184 
2185   @Override
2186   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2187       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2188 
2189   @Override
2190   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2191       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2192 
2193   @Override
2194   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2195       HRegion regionA, HRegion regionB) throws IOException { }
2196 
2197   @Override
2198   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2199       HRegion regionA, HRegion regionB) throws IOException { }
2200 }