View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellScanner;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CompoundConfiguration;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.MetaTableAccessor;
49  import org.apache.hadoop.hbase.NamespaceDescriptor;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.TableNotDisabledException;
53  import org.apache.hadoop.hbase.TableNotFoundException;
54  import org.apache.hadoop.hbase.Tag;
55  import org.apache.hadoop.hbase.client.Append;
56  import org.apache.hadoop.hbase.client.Delete;
57  import org.apache.hadoop.hbase.client.Durability;
58  import org.apache.hadoop.hbase.client.Get;
59  import org.apache.hadoop.hbase.client.Increment;
60  import org.apache.hadoop.hbase.client.Mutation;
61  import org.apache.hadoop.hbase.client.Put;
62  import org.apache.hadoop.hbase.client.Query;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
68  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
69  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.regionserver.HRegion;
88  import org.apache.hadoop.hbase.regionserver.InternalScanner;
89  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
90  import org.apache.hadoop.hbase.regionserver.RegionScanner;
91  import org.apache.hadoop.hbase.regionserver.ScanType;
92  import org.apache.hadoop.hbase.regionserver.Store;
93  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
94  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
95  import org.apache.hadoop.hbase.security.AccessDeniedException;
96  import org.apache.hadoop.hbase.security.User;
97  import org.apache.hadoop.hbase.security.UserProvider;
98  import org.apache.hadoop.hbase.security.access.Permission.Action;
99  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
100 import org.apache.hadoop.hbase.util.ByteRange;
101 import org.apache.hadoop.hbase.util.Bytes;
102 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
103 import org.apache.hadoop.hbase.util.Pair;
104 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
105 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
106 
107 import com.google.common.collect.ArrayListMultimap;
108 import com.google.common.collect.ImmutableSet;
109 import com.google.common.collect.ListMultimap;
110 import com.google.common.collect.Lists;
111 import com.google.common.collect.MapMaker;
112 import com.google.common.collect.Maps;
113 import com.google.common.collect.Sets;
114 import com.google.protobuf.Message;
115 import com.google.protobuf.RpcCallback;
116 import com.google.protobuf.RpcController;
117 import com.google.protobuf.Service;
118 
119 /**
120  * Provides basic authorization checks for data access and administrative
121  * operations.
122  *
123  * <p>
124  * {@code AccessController} performs authorization checks for HBase operations
125  * based on:
126  * <ul>
127  *   <li>the identity of the user performing the operation</li>
128  *   <li>the scope over which the operation is performed, in increasing
129  *   specificity: global, table, column family, or qualifier</li>
130  *   <li>the type of action being performed (as mapped to
131  *   {@link Permission.Action} values)</li>
132  * </ul>
133  * If the authorization check fails, an {@link AccessDeniedException}
134  * will be thrown for the operation.
135  * </p>
136  *
137  * <p>
138  * To perform authorization checks, {@code AccessController} relies on the
139  * RpcServerEngine being loaded to provide
140  * the user identities for remote requests.
141  * </p>
142  *
143  * <p>
144  * The access control lists used for authorization can be manipulated via the
145  * exposed {@link AccessControlService} Interface implementation, and the associated
146  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
147  * commands.
148  * </p>
149  */
150 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
151 public class AccessController extends BaseMasterAndRegionObserver
152     implements RegionServerObserver,
153       AccessControlService.Interface, CoprocessorService, EndpointObserver {
154 
      /** General-purpose logger for this class. */
155   public static final Log LOG = LogFactory.getLog(AccessController.class);
156 
      /**
       * Audit logger, named "SecurityLogger." followed by this class's name.
       * {@code logResult} writes one entry per authorization decision to it
       * when TRACE is enabled.
       */
157   private static final Log AUDITLOG =
158     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
      // NOTE(review): the two keys below are not used in this part of the file;
      // presumably they are attribute names attached to operations - confirm at the use sites.
159   private static final String CHECK_COVERING_PERM = "check_covering_perm";
160   private static final String TAG_CHECK_PASSED = "tag_check_passed";
      // Byte encoding of boolean true, as produced by Bytes.toBytes(true).
161   private static final byte[] TRUE = Bytes.toBytes(true);
162 
      // Authorization manager consulted by the permission checks in this class. It also
      // supplies the ZKPermissionWatcher through which ACLs are written to ZooKeeper.
      // Starts out null.
163   TableAuthManager authManager = null;
164 
165   // flags if we are running on a region of the _acl_ table
166   boolean aclRegion = false;
167 
168   // Defined only for the Endpoint implementation, so that it has a way to
169   // access region services. May be null (see getRegion()).
170   private RegionCoprocessorEnvironment regionEnv;
171 
172   /** Mapping of scanner instances to the user who created them; keys are held weakly. */
173   private Map<InternalScanner,String> scannerOwners =
174       new MapMaker().weakKeys().makeMap();
175 
      // NOTE(review): not read or written in this part of the file; presumably a
      // per-table cache of user permissions - confirm at the use sites.
176   private Map<TableName, List<UserPermission>> tableAcls;
177 
178   // Provider for mapping principal names to Users; also the source of the
      // current (non-RPC) user in getActiveUser()
179   private UserProvider userProvider;
180 
181   // The list of users with superuser authority
182   private List<String> superusers;
183 
184   // if we are able to support cell ACLs; when false, checkCoveringPermission
      // returns false without scanning
185   boolean cellFeaturesEnabled;
186 
187   // if we should check EXEC permissions
188   boolean shouldCheckExecPermission;
189 
190   // if we should terminate access checks early as soon as table or CF grants
191   // allow access; pre-0.98 compatible behavior
192   boolean compatibleEarlyTermination;
193 
      // Set to true at the end of initialize(RegionCoprocessorEnvironment).
194   private volatile boolean initialized = false;
195 
196   // This flag is relevant only in the Master.
197   private volatile boolean aclTabAvailable = false;
198 
199   public HRegion getRegion() {
200     return regionEnv != null ? regionEnv.getRegion() : null;
201   }
202 
203   public TableAuthManager getAuthManager() {
204     return authManager;
205   }
206 
207   void initialize(RegionCoprocessorEnvironment e) throws IOException {
208     final HRegion region = e.getRegion();
209     Configuration conf = e.getConfiguration();
210     Map<byte[], ListMultimap<String,TablePermission>> tables =
211         AccessControlLists.loadAll(region);
212     // For each table, write out the table's permissions to the respective
213     // znode for that table.
214     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
215       tables.entrySet()) {
216       byte[] entry = t.getKey();
217       ListMultimap<String,TablePermission> perms = t.getValue();
218       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
219       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
220     }
221     initialized = true;
222   }
223 
224   /**
225    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
226    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
227    * table updates.
228    */
229   void updateACL(RegionCoprocessorEnvironment e,
230       final Map<byte[], List<Cell>> familyMap) {
231     Set<byte[]> entries =
232         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
233     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
234       List<Cell> cells = f.getValue();
235       for (Cell cell: cells) {
236         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
237             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
238             AccessControlLists.ACL_LIST_FAMILY.length)) {
239           entries.add(CellUtil.cloneRow(cell));
240         }
241       }
242     }
243     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
244     Configuration conf = regionEnv.getConfiguration();
245     for (byte[] entry: entries) {
246       try {
247         ListMultimap<String,TablePermission> perms =
248           AccessControlLists.getPermissions(conf, entry);
249         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
250         zkw.writeToZookeeper(entry, serialized);
251       } catch (IOException ex) {
252         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
253             ex);
254       }
255     }
256   }
257 
258   /**
259    * Check the current user for authorization to perform a specific action
260    * against the given set of row data.
261    *
262    * <p>Note: Ordering of the authorization checks
263    * has been carefully optimized to short-circuit the most common requests
264    * and minimize the amount of processing required.</p>
265    *
266    * @param permRequest the action being requested
267    * @param e the coprocessor environment
268    * @param families the map of column families to qualifiers present in
269    * the request
270    * @return an authorization result
271    */
272   AuthResult permissionGranted(String request, User user, Action permRequest,
273       RegionCoprocessorEnvironment e,
274       Map<byte [], ? extends Collection<?>> families) {
275     HRegionInfo hri = e.getRegion().getRegionInfo();
276     TableName tableName = hri.getTable();
277 
278     // 1. All users need read access to hbase:meta table.
279     // this is a very common operation, so deal with it quickly.
280     if (hri.isMetaRegion()) {
281       if (permRequest == Action.READ) {
282         return AuthResult.allow(request, "All users allowed", user,
283           permRequest, tableName, families);
284       }
285     }
286 
287     if (user == null) {
288       return AuthResult.deny(request, "No user associated with request!", null,
289         permRequest, tableName, families);
290     }
291 
292     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
293     // e.g. When a new table is created a new entry in hbase:meta is added,
294     // so the user need to be allowed to write on it.
295     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
296     // and the user need to be allowed to write on both tables.
297     if (permRequest == Action.WRITE &&
298        (hri.isMetaRegion() ||
299         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
300        (authManager.authorize(user, Action.CREATE) ||
301         authManager.authorize(user, Action.ADMIN)))
302     {
303        return AuthResult.allow(request, "Table permission granted", user,
304         permRequest, tableName, families);
305     }
306 
307     // 2. check for the table-level, if successful we can short-circuit
308     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
309       return AuthResult.allow(request, "Table permission granted", user,
310         permRequest, tableName, families);
311     }
312 
313     // 3. check permissions against the requested families
314     if (families != null && families.size() > 0) {
315       // all families must pass
316       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
317         // a) check for family level access
318         if (authManager.authorize(user, tableName, family.getKey(),
319             permRequest)) {
320           continue;  // family-level permission overrides per-qualifier
321         }
322 
323         // b) qualifier level access can still succeed
324         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
325           if (family.getValue() instanceof Set) {
326             // for each qualifier of the family
327             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
328             for (byte[] qualifier : familySet) {
329               if (!authManager.authorize(user, tableName, family.getKey(),
330                                          qualifier, permRequest)) {
331                 return AuthResult.deny(request, "Failed qualifier check", user,
332                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
333               }
334             }
335           } else if (family.getValue() instanceof List) { // List<KeyValue>
336             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
337             for (KeyValue kv : kvList) {
338               if (!authManager.authorize(user, tableName, family.getKey(),
339                       kv.getQualifier(), permRequest)) {
340                 return AuthResult.deny(request, "Failed qualifier check", user,
341                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
342               }
343             }
344           }
345         } else {
346           // no qualifiers and family-level check already failed
347           return AuthResult.deny(request, "Failed family check", user, permRequest,
348               tableName, makeFamilyMap(family.getKey(), null));
349         }
350       }
351 
352       // all family checks passed
353       return AuthResult.allow(request, "All family checks passed", user, permRequest,
354           tableName, families);
355     }
356 
357     // 4. no families to check and table level access failed
358     return AuthResult.deny(request, "No families to check and table permission failed",
359         user, permRequest, tableName, families);
360   }
361 
362   /**
363    * Check the current user for authorization to perform a specific action
364    * against the given set of row data.
365    * @param opType the operation type
366    * @param user the user
367    * @param e the coprocessor environment
368    * @param families the map of column families to qualifiers present in
369    * the request
370    * @param actions the desired actions
371    * @return an authorization result
372    */
373   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
374       Map<byte [], ? extends Collection<?>> families, Action... actions) {
375     AuthResult result = null;
376     for (Action action: actions) {
377       result = permissionGranted(opType.toString(), user, action, e, families);
378       if (!result.isAllowed()) {
379         return result;
380       }
381     }
382     return result;
383   }
384 
385   private void logResult(AuthResult result) {
386     if (AUDITLOG.isTraceEnabled()) {
387       RequestContext ctx = RequestContext.get();
388       InetAddress remoteAddr = null;
389       if (ctx != null) {
390         remoteAddr = ctx.getRemoteAddress();
391       }
392       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
393           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
394           "; reason: " + result.getReason() +
395           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
396           "; request: " + result.getRequest() +
397           "; context: " + result.toContextString());
398     }
399   }
400 
401   /**
402    * Returns the active user to which authorization checks should be applied.
403    * If we are in the context of an RPC call, the remote user is used,
404    * otherwise the currently logged in user is used.
405    */
406   private User getActiveUser() throws IOException {
407     User user = RequestContext.getRequestUser();
408     if (!RequestContext.isInRequestContext()) {
409       // for non-rpc handling, fallback to system user
410       user = userProvider.getCurrent();
411     }
412     return user;
413   }
414 
415   /**
416    * Authorizes that the current user has any of the given permissions for the
417    * given table, column family and column qualifier.
418    * @param tableName Table requested
419    * @param family Column family requested
420    * @param qualifier Column qualifier requested
421    * @throws IOException if obtaining the current user fails
422    * @throws AccessDeniedException if user has no authorization
423    */
424   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
425       Action... permissions) throws IOException {
426     User user = getActiveUser();
427     AuthResult result = null;
428 
429     for (Action permission : permissions) {
430       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
431         result = AuthResult.allow(request, "Table permission granted", user,
432                                   permission, tableName, family, qualifier);
433         break;
434       } else {
435         // rest of the world
436         result = AuthResult.deny(request, "Insufficient permissions", user,
437                                  permission, tableName, family, qualifier);
438       }
439     }
440     logResult(result);
441     if (!result.isAllowed()) {
442       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
443     }
444   }
445 
446   /**
447    * Authorizes that the current user has global privileges for the given action.
448    * @param perm The action being requested
449    * @throws IOException if obtaining the current user fails
450    * @throws AccessDeniedException if authorization is denied
451    */
452   private void requirePermission(String request, Action perm) throws IOException {
453     requireGlobalPermission(request, perm, null, null);
454   }
455 
456   /**
457    * Authorizes that the current user has permission to perform the given
458    * action on the set of table column families.
459    * @param perm Action that is required
460    * @param env The current coprocessor environment
461    * @param families The map of column families-qualifiers.
462    * @throws AccessDeniedException if the authorization check failed
463    */
464   private void requirePermission(String request, Action perm,
465         RegionCoprocessorEnvironment env,
466         Map<byte[], ? extends Collection<?>> families)
467       throws IOException {
468     User user = getActiveUser();
469     AuthResult result = permissionGranted(request, user, perm, env, families);
470     logResult(result);
471 
472     if (!result.isAllowed()) {
473       throw new AccessDeniedException("Insufficient permissions (table=" +
474         env.getRegion().getTableDesc().getTableName()+
475         ((families != null && families.size() > 0) ? ", family: " +
476         result.toFamilyString() : "") + ", action=" +
477         perm.toString() + ")");
478     }
479   }
480 
481   /**
482    * Checks that the user has the given global permission. The generated
483    * audit log message will contain context information for the operation
484    * being authorized, based on the given parameters.
485    * @param perm Action being requested
486    * @param tableName Affected table name.
487    * @param familyMap Affected column families.
488    */
489   private void requireGlobalPermission(String request, Action perm, TableName tableName,
490       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
491     User user = getActiveUser();
492     if (authManager.authorize(user, perm) || (tableName != null &&
493         authManager.authorize(user, tableName.getNamespaceAsString(), perm))) {
494       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
495     } else {
496       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
497       throw new AccessDeniedException("Insufficient permissions for user '" +
498           (user != null ? user.getShortName() : "null") +"' (global, action=" +
499           perm.toString() + ")");
500     }
501   }
502 
503   /**
504    * Checks that the user has the given global permission. The generated
505    * audit log message will contain context information for the operation
506    * being authorized, based on the given parameters.
507    * @param perm Action being requested
508    * @param namespace
509    */
510   private void requireGlobalPermission(String request, Action perm,
511                                        String namespace) throws IOException {
512     User user = getActiveUser();
513     if (authManager.authorize(user, perm)) {
514       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
515     } else {
516       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
517       throw new AccessDeniedException("Insufficient permissions for user '" +
518           (user != null ? user.getShortName() : "null") +"' (global, action=" +
519           perm.toString() + ")");
520     }
521   }
522 
523   /**
524    * Returns <code>true</code> if the current user is allowed the given action
525    * over at least one of the column qualifiers in the given column families.
526    */
527   private boolean hasFamilyQualifierPermission(User user,
528       Action perm,
529       RegionCoprocessorEnvironment env,
530       Map<byte[], ? extends Collection<byte[]>> familyMap)
531     throws IOException {
532     HRegionInfo hri = env.getRegion().getRegionInfo();
533     TableName tableName = hri.getTable();
534 
535     if (user == null) {
536       return false;
537     }
538 
539     if (familyMap != null && familyMap.size() > 0) {
540       // at least one family must be allowed
541       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
542           familyMap.entrySet()) {
543         if (family.getValue() != null && !family.getValue().isEmpty()) {
544           for (byte[] qualifier : family.getValue()) {
545             if (authManager.matchPermission(user, tableName,
546                 family.getKey(), qualifier, perm)) {
547               return true;
548             }
549           }
550         } else {
551           if (authManager.matchPermission(user, tableName, family.getKey(),
552               perm)) {
553             return true;
554           }
555         }
556       }
557     } else if (LOG.isDebugEnabled()) {
558       LOG.debug("Empty family map passed for permission check");
559     }
560 
561     return false;
562   }
563 
564   private enum OpType {
565     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
566     GET("get"),
567     EXISTS("exists"),
568     SCAN("scan"),
569     PUT("put"),
570     DELETE("delete"),
571     CHECK_AND_PUT("checkAndPut"),
572     CHECK_AND_DELETE("checkAndDelete"),
573     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
574     APPEND("append"),
575     INCREMENT("increment");
576 
577     private String type;
578 
579     private OpType(String type) {
580       this.type = type;
581     }
582 
583     @Override
584     public String toString() {
585       return type;
586     }
587   }
588 
589   /**
590    * Determine if cell ACLs covered by the operation grant access. This is expensive.
591    * @return false if cell ACLs failed to grant access, true otherwise
592    * @throws IOException
593    */
594   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
595       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
596       throws IOException {
597     if (!cellFeaturesEnabled) {
598       return false;
599     }
600     long cellGrants = 0;
601     User user = getActiveUser();
602     long latestCellTs = 0;
603     Get get = new Get(row);
604     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
605     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
606     // version. We have to get every cell version and check its TS against the TS asked for in
607     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
608     // consider only one such passing cell. In case of Delete we have to consider all the cell
609     // versions under this passing version. When Delete Mutation contains columns which are a
610     // version delete just consider only one version for those column cells.
611     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
612     if (considerCellTs) {
613       get.setMaxVersions();
614     } else {
615       get.setMaxVersions(1);
616     }
617     boolean diffCellTsFromOpTs = false;
618     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
619       byte[] col = entry.getKey();
620       // TODO: HBASE-7114 could possibly unify the collection type in family
621       // maps so we would not need to do this
622       if (entry.getValue() instanceof Set) {
623         Set<byte[]> set = (Set<byte[]>)entry.getValue();
624         if (set == null || set.isEmpty()) {
625           get.addFamily(col);
626         } else {
627           for (byte[] qual: set) {
628             get.addColumn(col, qual);
629           }
630         }
631       } else if (entry.getValue() instanceof List) {
632         List<Cell> list = (List<Cell>)entry.getValue();
633         if (list == null || list.isEmpty()) {
634           get.addFamily(col);
635         } else {
636           // In case of family delete, a Cell will be added into the list with Qualifier as null.
637           for (Cell cell : list) {
638             if (cell.getQualifierLength() == 0
639                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
640                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
641               get.addFamily(col);
642             } else {
643               get.addColumn(col, CellUtil.cloneQualifier(cell));
644             }
645             if (considerCellTs) {
646               long cellTs = cell.getTimestamp();
647               latestCellTs = Math.max(latestCellTs, cellTs);
648               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
649             }
650           }
651         }
652       } else {
653         throw new RuntimeException("Unhandled collection type " +
654           entry.getValue().getClass().getName());
655       }
656     }
657     // We want to avoid looking into the future. So, if the cells of the
658     // operation specify a timestamp, or the operation itself specifies a
659     // timestamp, then we use the maximum ts found. Otherwise, we bound
660     // the Get to the current server time. We add 1 to the timerange since
661     // the upper bound of a timerange is exclusive yet we need to examine
662     // any cells found there inclusively.
663     long latestTs = Math.max(opTs, latestCellTs);
664     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
665       latestTs = EnvironmentEdgeManager.currentTime();
666     }
667     get.setTimeRange(0, latestTs + 1);
668     // In case of Put operation we set to read all versions. This was done to consider the case
669     // where columns are added with TS other than the Mutation TS. But normally this wont be the
670     // case with Put. There no need to get all versions but get latest version only.
671     if (!diffCellTsFromOpTs && request == OpType.PUT) {
672       get.setMaxVersions(1);
673     }
674     if (LOG.isTraceEnabled()) {
675       LOG.trace("Scanning for cells with " + get);
676     }
677     // This Map is identical to familyMap. The key is a BR rather than byte[].
678     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
679     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
680     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
681     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
682       if (entry.getValue() instanceof List) {
683         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
684       }
685     }
686     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
687     List<Cell> cells = Lists.newArrayList();
688     Cell prevCell = null;
689     ByteRange curFam = new SimpleMutableByteRange();
690     boolean curColAllVersions = (request == OpType.DELETE);
691     long curColCheckTs = opTs;
692     boolean foundColumn = false;
693     try {
694       boolean more = false;
695       do {
696         cells.clear();
697         // scan with limit as 1 to hold down memory use on wide rows
698         more = scanner.next(cells, 1);
699         for (Cell cell: cells) {
700           if (LOG.isTraceEnabled()) {
701             LOG.trace("Found cell " + cell);
702           }
703           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
704           if (colChange) foundColumn = false;
705           prevCell = cell;
706           if (!curColAllVersions && foundColumn) {
707             continue;
708           }
709           if (colChange && considerCellTs) {
710             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
711             List<Cell> cols = familyMap1.get(curFam);
712             for (Cell col : cols) {
713               // null/empty qualifier is used to denote a Family delete. The TS and delete type
714               // associated with this is applicable for all columns within the family. That is
715               // why the below (col.getQualifierLength() == 0) check.
716               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
717                   || CellUtil.matchingQualifier(cell, col)) {
718                 byte type = col.getTypeByte();
719                 if (considerCellTs) {
720                   curColCheckTs = col.getTimestamp();
721                 }
722                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
723                 // a version delete for a column no need to check all the covering cells within
724                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
725                 // One version delete types are Delete/DeleteFamilyVersion
726                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
727                     || (KeyValue.Type.DeleteFamily.getCode() == type);
728                 break;
729               }
730             }
731           }
732           if (cell.getTimestamp() > curColCheckTs) {
733             // Just ignore this cell. This is not a covering cell.
734             continue;
735           }
736           foundColumn = true;
737           for (Action action: actions) {
738             // Are there permissions for this user for the cell?
739             if (!authManager.authorize(user, getTableName(e), cell, action)) {
740               // We can stop if the cell ACL denies access
741               return false;
742             }
743           }
744           cellGrants++;
745         }
746       } while (more);
747     } catch (AccessDeniedException ex) {
748       throw ex;
749     } catch (IOException ex) {
750       LOG.error("Exception while getting cells to calculate covering permission", ex);
751     } finally {
752       scanner.close();
753     }
754     // We should not authorize unless we have found one or more cell ACLs that
755     // grant access. This code is used to check for additional permissions
756     // after no table or CF grants are found.
757     return cellGrants > 0;
758   }
759 
760   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
761     // Iterate over the entries in the familyMap, replacing the cells therein
762     // with new cells including the ACL data
763     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
764       List<Cell> newCells = Lists.newArrayList();
765       for (Cell cell: e.getValue()) {
766         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
767         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
768         if (cell.getTagsLength() > 0) {
769           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
770             cell.getTagsOffset(), cell.getTagsLength());
771           while (tagIterator.hasNext()) {
772             tags.add(tagIterator.next());
773           }
774         }
775         newCells.add(
776           new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
777             cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
778             cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
779             cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
780             cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
781             tags));
782       }
783       // This is supposed to be safe, won't CME
784       e.setValue(newCells);
785     }
786   }
787 
788   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
789   // type is reserved and should not be explicitly set by user.
790   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
791     // Superusers are allowed to store cells unconditionally.
792     if (superusers.contains(user.getShortName())) {
793       return;
794     }
795     // We already checked (prePut vs preBatchMutation)
796     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
797       return;
798     }
799     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
800       Cell cell = cellScanner.current();
801       if (cell.getTagsLength() > 0) {
802         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
803           cell.getTagsLength());
804         while (tagsItr.hasNext()) {
805           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
806             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
807           }
808         }
809       }
810     }
811     m.setAttribute(TAG_CHECK_PASSED, TRUE);
812   }
813 
814   /* ---- MasterObserver implementation ---- */
815 
816   public void start(CoprocessorEnvironment env) throws IOException {
817     CompoundConfiguration conf = new CompoundConfiguration();
818     conf.add(env.getConfiguration());
819 
820     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
821       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
822 
823     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
824     if (!cellFeaturesEnabled) {
825       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
826           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
827           + " accordingly.");
828     }
829 
830     ZooKeeperWatcher zk = null;
831     if (env instanceof MasterCoprocessorEnvironment) {
832       // if running on HMaster
833       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
834       zk = mEnv.getMasterServices().getZooKeeper();
835     } else if (env instanceof RegionServerCoprocessorEnvironment) {
836       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
837       zk = rsEnv.getRegionServerServices().getZooKeeper();
838     } else if (env instanceof RegionCoprocessorEnvironment) {
839       // if running at region
840       regionEnv = (RegionCoprocessorEnvironment) env;
841       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
842       zk = regionEnv.getRegionServerServices().getZooKeeper();
843       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
844         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
845     }
846 
847     // set the user-provider.
848     this.userProvider = UserProvider.instantiate(env.getConfiguration());
849 
850     // set up the list of users with superuser privilege
851     User user = userProvider.getCurrent();
852     superusers = Lists.asList(user.getShortName(),
853       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
854 
855     // If zk is null or IOException while obtaining auth manager,
856     // throw RuntimeException so that the coprocessor is unloaded.
857     if (zk != null) {
858       try {
859         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
860       } catch (IOException ioe) {
861         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
862       }
863     } else {
864       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
865     }
866 
867     tableAcls = new MapMaker().weakValues().makeMap();
868   }
869 
870   public void stop(CoprocessorEnvironment env) {
871 
872   }
873 
874   @Override
875   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
876       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
877     Set<byte[]> families = desc.getFamiliesKeys();
878     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
879     for (byte[] family: families) {
880       familyMap.put(family, null);
881     }
882     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
883   }
884 
885   @Override
886   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
887       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
888     // When AC is used, it should be configured as the 1st CP.
889     // In Master, the table operations like create, are handled by a Thread pool but the max size
890     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
891     // sequentially only.
892     // Related code in HMaster#startServiceThreads
893     // {code}
894     //   // We depend on there being only one instance of this executor running
895     //   // at a time. To do concurrency, would need fencing of enable/disable of
896     //   // tables.
897     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
898     // {code}
899     // In future if we change this pool to have more threads, then there is a chance for thread,
900     // creating acl table, getting delayed and by that time another table creation got over and
901     // this hook is getting called. In such a case, we will need a wait logic here which will
902     // wait till the acl table is created.
903     if (AccessControlLists.isAclTable(desc)) {
904       this.aclTabAvailable = true;
905     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
906       if (!aclTabAvailable) {
907         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
908             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
909             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
910       } else {
911         String owner = desc.getOwnerString();
912         // default the table owner to current user, if not specified.
913         if (owner == null)
914           owner = getActiveUser().getShortName();
915         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
916             desc.getTableName(), null, Action.values());
917         // switch to the real hbase master user for doing the RPC on the ACL table
918         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
919           @Override
920           public Void run() throws Exception {
921             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
922                 userperm);
923             return null;
924           }
925         });
926       }
927     }
928   }
929 
930   @Override
931   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
932       throws IOException {
933     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
934   }
935 
936   @Override
937   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
938       TableName tableName) throws IOException {
939     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
940   }
941 
942   @Override
943   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
944       throws IOException {
945     requirePermission("truncateTable", tableName, null, null, Action.ADMIN);
946     List<UserPermission> acls = AccessControlLists.getUserTablePermissions(c.getEnvironment()
947         .getConfiguration(), tableName);
948     if (acls != null) {
949       tableAcls.put(tableName, acls);
950     }
951   }
952 
953   @Override
954   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
955       TableName tableName) throws IOException {
956     List<UserPermission> perms = tableAcls.get(tableName);
957     if (perms != null) {
958       for (UserPermission perm : perms) {
959         AccessControlLists.addUserPermission(ctx.getEnvironment().getConfiguration(), perm);
960       }
961     }
962     tableAcls.remove(tableName);
963   }
964 
965   @Override
966   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
967       HTableDescriptor htd) throws IOException {
968     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
969   }
970 
971   @Override
972   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
973       TableName tableName, HTableDescriptor htd) throws IOException {
974     String owner = htd.getOwnerString();
975     // default the table owner to current user, if not specified.
976     if (owner == null) owner = getActiveUser().getShortName();
977     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
978         Action.values());
979     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
980   }
981 
982   @Override
983   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
984       HColumnDescriptor column) throws IOException {
985     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
986   }
987 
988   @Override
989   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
990       HColumnDescriptor descriptor) throws IOException {
991     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
992   }
993 
994   @Override
995   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
996       byte[] col) throws IOException {
997     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
998   }
999 
1000   @Override
1001   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1002       TableName tableName, byte[] col) throws IOException {
1003     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
1004   }
1005 
1006   @Override
1007   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1008       throws IOException {
1009     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1010   }
1011 
1012   @Override
1013   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1014       throws IOException {
1015     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1016       throw new AccessDeniedException("Not allowed to disable "
1017           + AccessControlLists.ACL_TABLE_NAME + " table.");
1018     }
1019     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1020   }
1021 
1022   @Override
1023   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1024       ServerName srcServer, ServerName destServer) throws IOException {
1025     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1026   }
1027 
1028   @Override
1029   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1030       throws IOException {
1031     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1032   }
1033 
1034   @Override
1035   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1036       boolean force) throws IOException {
1037     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1038   }
1039 
1040   @Override
1041   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1042       HRegionInfo regionInfo) throws IOException {
1043     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1044   }
1045 
1046   @Override
1047   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1048       throws IOException {
1049     requirePermission("balance", Action.ADMIN);
1050   }
1051 
1052   @Override
1053   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1054       boolean newValue) throws IOException {
1055     requirePermission("balanceSwitch", Action.ADMIN);
1056     return newValue;
1057   }
1058 
1059   @Override
1060   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1061       throws IOException {
1062     requirePermission("shutdown", Action.ADMIN);
1063   }
1064 
1065   @Override
1066   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1067       throws IOException {
1068     requirePermission("stopMaster", Action.ADMIN);
1069   }
1070 
1071   @Override
1072   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1073       throws IOException {
1074     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1075       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1076       // initialize the ACL storage table
1077       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1078     } else {
1079       aclTabAvailable = true;
1080     }
1081   }
1082 
1083   @Override
1084   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1085       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1086       throws IOException {
1087     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1088       Permission.Action.ADMIN);
1089   }
1090 
1091   @Override
1092   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1093       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1094       throws IOException {
1095     requirePermission("clone", Action.ADMIN);
1096   }
1097 
1098   @Override
1099   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1100       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1101       throws IOException {
1102     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1103       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1104         Permission.Action.ADMIN);
1105     } else {
1106       requirePermission("restore", Action.ADMIN);
1107     }
1108   }
1109 
1110   @Override
1111   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1112       final SnapshotDescription snapshot) throws IOException {
1113     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1114       // Snapshot owner is allowed to delete the snapshot
1115     } else {
1116       requirePermission("deleteSnapshot", Action.ADMIN);
1117     }
1118   }
1119 
1120   @Override
1121   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1122       NamespaceDescriptor ns) throws IOException {
1123     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1124   }
1125 
1126   @Override
1127   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1128       throws IOException {
1129     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1130   }
1131 
1132   @Override
1133   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1134                                   String namespace) throws IOException {
1135     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1136         namespace);
1137     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1138   }
1139 
1140   @Override
1141   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1142       NamespaceDescriptor ns) throws IOException {
1143     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1144   }
1145 
1146   @Override
1147   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1148       final TableName tableName) throws IOException {
1149     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1150   }
1151 
1152   /* ---- RegionObserver implementation ---- */
1153 
1154   @Override
1155   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1156       throws IOException {
1157     RegionCoprocessorEnvironment env = e.getEnvironment();
1158     final HRegion region = env.getRegion();
1159     if (region == null) {
1160       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1161     } else {
1162       HRegionInfo regionInfo = region.getRegionInfo();
1163       if (regionInfo.getTable().isSystemTable()) {
1164         isSystemOrSuperUser(regionEnv.getConfiguration());
1165       } else {
1166         requirePermission("preOpen", Action.ADMIN);
1167       }
1168     }
1169   }
1170 
1171   @Override
1172   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1173     RegionCoprocessorEnvironment env = c.getEnvironment();
1174     final HRegion region = env.getRegion();
1175     if (region == null) {
1176       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1177       return;
1178     }
1179     if (AccessControlLists.isAclRegion(region)) {
1180       aclRegion = true;
1181       // When this region is under recovering state, initialize will be handled by postLogReplay
1182       if (!region.isRecovering()) {
1183         try {
1184           initialize(env);
1185         } catch (IOException ex) {
1186           // if we can't obtain permissions, it's better to fail
1187           // than perform checks incorrectly
1188           throw new RuntimeException("Failed to initialize permissions cache", ex);
1189         }
1190       }
1191     } else {
1192       initialized = true;
1193     }
1194   }
1195 
1196   @Override
1197   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1198     if (aclRegion) {
1199       try {
1200         initialize(c.getEnvironment());
1201       } catch (IOException ex) {
1202         // if we can't obtain permissions, it's better to fail
1203         // than perform checks incorrectly
1204         throw new RuntimeException("Failed to initialize permissions cache", ex);
1205       }
1206     }
1207   }
1208 
1209   @Override
1210   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1211     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1212         Action.CREATE);
1213   }
1214 
1215   @Override
1216   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1217     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1218   }
1219 
1220   @Override
1221   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1222       byte[] splitRow) throws IOException {
1223     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1224   }
1225 
1226   @Override
1227   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1228       final Store store, final InternalScanner scanner, final ScanType scanType)
1229           throws IOException {
1230     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1231         Action.CREATE);
1232     return scanner;
1233   }
1234 
1235   @Override
1236   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1237       final byte [] row, final byte [] family, final Result result)
1238       throws IOException {
1239     assert family != null;
1240     RegionCoprocessorEnvironment env = c.getEnvironment();
1241     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1242     User user = getActiveUser();
1243     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1244       Action.READ);
1245     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1246       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1247         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1248       authResult.setReason("Covering cell set");
1249     }
1250     logResult(authResult);
1251     if (!authResult.isAllowed()) {
1252       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1253     }
1254   }
1255 
1256   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1257       final Query query, OpType opType) throws IOException {
1258     Filter filter = query.getFilter();
1259     // Don't wrap an AccessControlFilter
1260     if (filter != null && filter instanceof AccessControlFilter) {
1261       return;
1262     }
1263     User user = getActiveUser();
1264     RegionCoprocessorEnvironment env = c.getEnvironment();
1265     Map<byte[],? extends Collection<byte[]>> families = null;
1266     switch (opType) {
1267     case GET:
1268     case EXISTS:
1269       families = ((Get)query).getFamilyMap();
1270       break;
1271     case SCAN:
1272       families = ((Scan)query).getFamilyMap();
1273       break;
1274     default:
1275       throw new RuntimeException("Unhandled operation " + opType);
1276     }
1277     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1278     HRegion region = getRegion(env);
1279     TableName table = getTableName(region);
1280     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1281     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1282       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1283     }
1284     if (!authResult.isAllowed()) {
1285       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1286         // Old behavior: Scan with only qualifier checks if we have partial
1287         // permission. Backwards compatible behavior is to throw an
1288         // AccessDeniedException immediately if there are no grants for table
1289         // or CF or CF+qual. Only proceed with an injected filter if there are
1290         // grants for qualifiers. Otherwise we will fall through below and log
1291         // the result and throw an ADE. We may end up checking qualifier
1292         // grants three times (permissionGranted above, here, and in the
1293         // filter) but that's the price of backwards compatibility.
1294         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1295           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1296             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1297             cfVsMaxVersions);
1298           // wrap any existing filter
1299           if (filter != null) {
1300             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1301               Lists.newArrayList(ourFilter, filter));
1302           }
1303           authResult.setAllowed(true);;
1304           authResult.setReason("Access allowed with filter");
1305           switch (opType) {
1306           case GET:
1307           case EXISTS:
1308             ((Get)query).setFilter(ourFilter);
1309             break;
1310           case SCAN:
1311             ((Scan)query).setFilter(ourFilter);
1312             break;
1313           default:
1314             throw new RuntimeException("Unhandled operation " + opType);
1315           }
1316         }
1317       } else {
1318         // New behavior: Any access we might be granted is more fine-grained
1319         // than whole table or CF. Simply inject a filter and return what is
1320         // allowed. We will not throw an AccessDeniedException. This is a
1321         // behavioral change since 0.96.
1322         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1323           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1324         // wrap any existing filter
1325         if (filter != null) {
1326           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1327             Lists.newArrayList(ourFilter, filter));
1328         }
1329         authResult.setAllowed(true);;
1330         authResult.setReason("Access allowed with filter");
1331         switch (opType) {
1332         case GET:
1333         case EXISTS:
1334           ((Get)query).setFilter(ourFilter);
1335           break;
1336         case SCAN:
1337           ((Scan)query).setFilter(ourFilter);
1338           break;
1339         default:
1340           throw new RuntimeException("Unhandled operation " + opType);
1341         }
1342       }
1343     }
1344 
1345     logResult(authResult);
1346     if (!authResult.isAllowed()) {
1347       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1348         ", action=READ)");
1349     }
1350   }
1351 
1352   @Override
1353   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1354       final Get get, final List<Cell> result) throws IOException {
1355     internalPreRead(c, get, OpType.GET);
1356   }
1357 
1358   @Override
1359   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1360       final Get get, final boolean exists) throws IOException {
1361     internalPreRead(c, get, OpType.EXISTS);
1362     return exists;
1363   }
1364 
1365   @Override
1366   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1367       final Put put, final WALEdit edit, final Durability durability)
1368       throws IOException {
1369     // Require WRITE permission to the table, CF, or top visible value, if any.
1370     // NOTE: We don't need to check the permissions for any earlier Puts
1371     // because we treat the ACLs in each Put as timestamped like any other
1372     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1373     // change the ACL of any previous Put. This allows simple evolution of
1374     // security policy over time without requiring expensive updates.
1375     User user = getActiveUser();
1376     checkForReservedTagPresence(user, put);
1377     RegionCoprocessorEnvironment env = c.getEnvironment();
1378     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1379     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1380     logResult(authResult);
1381     if (!authResult.isAllowed()) {
1382       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1383         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1384       } else {
1385         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1386       }
1387     }
1388     // Add cell ACLs from the operation to the cells themselves
1389     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1390     if (bytes != null) {
1391       if (cellFeaturesEnabled) {
1392         addCellPermissions(bytes, put.getFamilyCellMap());
1393       } else {
1394         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1395       }
1396     }
1397   }
1398 
1399   @Override
1400   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1401       final Put put, final WALEdit edit, final Durability durability) {
1402     if (aclRegion) {
1403       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1404     }
1405   }
1406 
1407   @Override
1408   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1409       final Delete delete, final WALEdit edit, final Durability durability)
1410       throws IOException {
1411     // An ACL on a delete is useless, we shouldn't allow it
1412     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1413       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1414     }
1415     // Require WRITE permissions on all cells covered by the delete. Unlike
1416     // for Puts we need to check all visible prior versions, because a major
1417     // compaction could remove them. If the user doesn't have permission to
1418     // overwrite any of the visible versions ('visible' defined as not covered
1419     // by a tombstone already) then we have to disallow this operation.
1420     RegionCoprocessorEnvironment env = c.getEnvironment();
1421     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1422     User user = getActiveUser();
1423     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1424     logResult(authResult);
1425     if (!authResult.isAllowed()) {
1426       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1427         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1428       } else {
1429         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1430       }
1431     }
1432   }
1433 
1434   @Override
1435   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1436       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1437     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1438       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1439       for (int i = 0; i < miniBatchOp.size(); i++) {
1440         Mutation m = miniBatchOp.getOperation(i);
1441         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1442           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1443           // perm check
1444           OpType opType;
1445           if (m instanceof Put) {
1446             checkForReservedTagPresence(getActiveUser(), m);
1447             opType = OpType.PUT;
1448           } else {
1449             opType = OpType.DELETE;
1450           }
1451           AuthResult authResult = null;
1452           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1453               m.getTimeStamp(), Action.WRITE)) {
1454             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1455                 Action.WRITE, table, m.getFamilyCellMap());
1456           } else {
1457             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1458                 Action.WRITE, table, m.getFamilyCellMap());
1459           }
1460           logResult(authResult);
1461           if (!authResult.isAllowed()) {
1462             throw new AccessDeniedException("Insufficient permissions "
1463                 + authResult.toContextString());
1464           }
1465         }
1466       }
1467     }
1468   }
1469 
1470   @Override
1471   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1472       final Delete delete, final WALEdit edit, final Durability durability)
1473       throws IOException {
1474     if (aclRegion) {
1475       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1476     }
1477   }
1478 
1479   @Override
1480   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1481       final byte [] row, final byte [] family, final byte [] qualifier,
1482       final CompareFilter.CompareOp compareOp,
1483       final ByteArrayComparable comparator, final Put put,
1484       final boolean result) throws IOException {
1485     // Require READ and WRITE permissions on the table, CF, and KV to update
1486     User user = getActiveUser();
1487     checkForReservedTagPresence(user, put);
1488     RegionCoprocessorEnvironment env = c.getEnvironment();
1489     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1490     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1491       Action.READ, Action.WRITE);
1492     logResult(authResult);
1493     if (!authResult.isAllowed()) {
1494       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1495         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1496       } else {
1497         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1498       }
1499     }
1500     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1501     if (bytes != null) {
1502       if (cellFeaturesEnabled) {
1503         addCellPermissions(bytes, put.getFamilyCellMap());
1504       } else {
1505         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1506       }
1507     }
1508     return result;
1509   }
1510 
1511   @Override
1512   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1513       final byte[] row, final byte[] family, final byte[] qualifier,
1514       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1515       final boolean result) throws IOException {
1516     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1517       // We had failure with table, cf and q perm checks and now giving a chance for cell
1518       // perm check
1519       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1520       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1521       AuthResult authResult = null;
1522       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1523           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1524         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1525             getActiveUser(), Action.READ, table, families);
1526       } else {
1527         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1528             getActiveUser(), Action.READ, table, families);
1529       }
1530       logResult(authResult);
1531       if (!authResult.isAllowed()) {
1532         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1533       }
1534     }
1535     return result;
1536   }
1537 
1538   @Override
1539   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1540       final byte [] row, final byte [] family, final byte [] qualifier,
1541       final CompareFilter.CompareOp compareOp,
1542       final ByteArrayComparable comparator, final Delete delete,
1543       final boolean result) throws IOException {
1544     // An ACL on a delete is useless, we shouldn't allow it
1545     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1546       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1547           delete.toString());
1548     }
1549     // Require READ and WRITE permissions on the table, CF, and the KV covered
1550     // by the delete
1551     RegionCoprocessorEnvironment env = c.getEnvironment();
1552     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1553     User user = getActiveUser();
1554     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1555       Action.READ, Action.WRITE);
1556     logResult(authResult);
1557     if (!authResult.isAllowed()) {
1558       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1559         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1560       } else {
1561         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1562       }
1563     }
1564     return result;
1565   }
1566 
1567   @Override
1568   public boolean preCheckAndDeleteAfterRowLock(
1569       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1570       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1571       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1572       throws IOException {
1573     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1574       // We had failure with table, cf and q perm checks and now giving a chance for cell
1575       // perm check
1576       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1577       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1578       AuthResult authResult = null;
1579       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1580           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1581         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1582             getActiveUser(), Action.READ, table, families);
1583       } else {
1584         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1585             getActiveUser(), Action.READ, table, families);
1586       }
1587       logResult(authResult);
1588       if (!authResult.isAllowed()) {
1589         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1590       }
1591     }
1592     return result;
1593   }
1594 
1595   @Override
1596   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1597       final byte [] row, final byte [] family, final byte [] qualifier,
1598       final long amount, final boolean writeToWAL)
1599       throws IOException {
1600     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1601     // incremented value
1602     RegionCoprocessorEnvironment env = c.getEnvironment();
1603     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1604     User user = getActiveUser();
1605     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1606       Action.WRITE);
1607     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1608       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1609         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1610       authResult.setReason("Covering cell set");
1611     }
1612     logResult(authResult);
1613     if (!authResult.isAllowed()) {
1614       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1615     }
1616     return -1;
1617   }
1618 
1619   @Override
1620   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1621       throws IOException {
1622     // Require WRITE permission to the table, CF, and the KV to be appended
1623     User user = getActiveUser();
1624     checkForReservedTagPresence(user, append);
1625     RegionCoprocessorEnvironment env = c.getEnvironment();
1626     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1627     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1628     logResult(authResult);
1629     if (!authResult.isAllowed()) {
1630       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1631         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1632       } else {
1633         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1634       }
1635     }
1636     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1637     if (bytes != null) {
1638       if (cellFeaturesEnabled) {
1639         addCellPermissions(bytes, append.getFamilyCellMap());
1640       } else {
1641         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1642       }
1643     }
1644     return null;
1645   }
1646 
1647   @Override
1648   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1649       final Append append) throws IOException {
1650     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1651       // We had failure with table, cf and q perm checks and now giving a chance for cell
1652       // perm check
1653       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1654       AuthResult authResult = null;
1655       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1656           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1657         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1658             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1659       } else {
1660         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1661             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1662       }
1663       logResult(authResult);
1664       if (!authResult.isAllowed()) {
1665         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1666       }
1667     }
1668     return null;
1669   }
1670 
1671   @Override
1672   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1673       final Increment increment)
1674       throws IOException {
1675     // Require WRITE permission to the table, CF, and the KV to be replaced by
1676     // the incremented value
1677     User user = getActiveUser();
1678     checkForReservedTagPresence(user, increment);
1679     RegionCoprocessorEnvironment env = c.getEnvironment();
1680     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1681     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1682       Action.WRITE);
1683     logResult(authResult);
1684     if (!authResult.isAllowed()) {
1685       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1686         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1687       } else {
1688         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1689       }
1690     }
1691     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1692     if (bytes != null) {
1693       if (cellFeaturesEnabled) {
1694         addCellPermissions(bytes, increment.getFamilyCellMap());
1695       } else {
1696         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1697       }
1698     }
1699     return null;
1700   }
1701 
1702   @Override
1703   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1704       final Increment increment) throws IOException {
1705     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1706       // We had failure with table, cf and q perm checks and now giving a chance for cell
1707       // perm check
1708       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1709       AuthResult authResult = null;
1710       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1711           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1712         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1713             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1714       } else {
1715         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1716             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1717       }
1718       logResult(authResult);
1719       if (!authResult.isAllowed()) {
1720         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1721       }
1722     }
1723     return null;
1724   }
1725 
1726   @Override
1727   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1728       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1729     // If the HFile version is insufficient to persist tags, we won't have any
1730     // work to do here
1731     if (!cellFeaturesEnabled) {
1732       return newCell;
1733     }
1734 
1735     // Collect any ACLs from the old cell
1736     List<Tag> tags = Lists.newArrayList();
1737     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1738     if (oldCell != null) {
1739       // Save an object allocation where we can
1740       if (oldCell.getTagsLength() > 0) {
1741         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1742           oldCell.getTagsOffset(), oldCell.getTagsLength());
1743         while (tagIterator.hasNext()) {
1744           Tag tag = tagIterator.next();
1745           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1746             // Not an ACL tag, just carry it through
1747             if (LOG.isTraceEnabled()) {
1748               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1749                 " length " + tag.getTagLength());
1750             }
1751             tags.add(tag);
1752           } else {
1753             // Merge the perms from the older ACL into the current permission set
1754             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1755               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1756                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1757             perms.putAll(kvPerms);
1758           }
1759         }
1760       }
1761     }
1762 
1763     // Do we have an ACL on the operation?
1764     byte[] aclBytes = mutation.getACL();
1765     if (aclBytes != null) {
1766       // Yes, use it
1767       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1768     } else {
1769       // No, use what we carried forward
1770       if (perms != null) {
1771         // TODO: If we collected ACLs from more than one tag we may have a
1772         // List<Permission> of size > 1, this can be collapsed into a single
1773         // Permission
1774         if (LOG.isTraceEnabled()) {
1775           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1776         }
1777         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1778           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1779       }
1780     }
1781 
1782     // If we have no tags to add, just return
1783     if (tags.isEmpty()) {
1784       return newCell;
1785     }
1786 
1787     // We need to create another KV, unfortunately, because the current new KV
1788     // has no space for tags
1789     KeyValue rewriteKv = new KeyValue(newCell.getRowArray(), newCell.getRowOffset(),
1790         newCell.getRowLength(), newCell.getFamilyArray(), newCell.getFamilyOffset(),
1791         newCell.getFamilyLength(), newCell.getQualifierArray(), newCell.getQualifierOffset(),
1792         newCell.getQualifierLength(), newCell.getTimestamp(), KeyValue.Type.codeToType(newCell
1793             .getTypeByte()), newCell.getValueArray(), newCell.getValueOffset(),
1794         newCell.getValueLength(), tags);
1795     // Preserve mvcc data
1796     rewriteKv.setSequenceId(newCell.getSequenceId());
1797     return rewriteKv;
1798   }
1799 
1800   @Override
1801   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1802       final Scan scan, final RegionScanner s) throws IOException {
1803     internalPreRead(c, scan, OpType.SCAN);
1804     return s;
1805   }
1806 
1807   @Override
1808   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1809       final Scan scan, final RegionScanner s) throws IOException {
1810     User user = getActiveUser();
1811     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1812       scannerOwners.put(s, user.getShortName());
1813     }
1814     return s;
1815   }
1816 
1817   @Override
1818   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1819       final InternalScanner s, final List<Result> result,
1820       final int limit, final boolean hasNext) throws IOException {
1821     requireScannerOwner(s);
1822     return hasNext;
1823   }
1824 
1825   @Override
1826   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1827       final InternalScanner s) throws IOException {
1828     requireScannerOwner(s);
1829   }
1830 
1831   @Override
1832   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1833       final InternalScanner s) throws IOException {
1834     // clean up any associated owner mapping
1835     scannerOwners.remove(s);
1836   }
1837 
1838   /**
1839    * Verify, when servicing an RPC, that the caller is the scanner owner.
1840    * If so, we assume that access control is correctly enforced based on
1841    * the checks performed in preScannerOpen()
1842    */
1843   private void requireScannerOwner(InternalScanner s)
1844       throws AccessDeniedException {
1845     if (RequestContext.isInRequestContext()) {
1846       String requestUserName = RequestContext.getRequestUserName();
1847       String owner = scannerOwners.get(s);
1848       if (owner != null && !owner.equals(requestUserName)) {
1849         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1850       }
1851     }
1852   }
1853 
1854   /**
1855    * Verifies user has WRITE privileges on
1856    * the Column Families involved in the bulkLoadHFile
1857    * request. Specific Column Write privileges are presently
1858    * ignored.
1859    */
1860   @Override
1861   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1862       List<Pair<byte[], String>> familyPaths) throws IOException {
1863     for(Pair<byte[],String> el : familyPaths) {
1864       requirePermission("preBulkLoadHFile",
1865           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1866           el.getFirst(),
1867           null,
1868           Action.CREATE);
1869     }
1870   }
1871 
1872   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1873     User requestUser = getActiveUser();
1874     TableName tableName = e.getRegion().getTableDesc().getTableName();
1875     AuthResult authResult = permissionGranted(method, requestUser,
1876         action, e, Collections.EMPTY_MAP);
1877     if (!authResult.isAllowed()) {
1878       for(UserPermission userPerm:
1879           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1880         for(Action userAction: userPerm.getActions()) {
1881           if(userAction.equals(action)) {
1882             return AuthResult.allow(method, "Access allowed", requestUser,
1883                 action, tableName, null, null);
1884           }
1885         }
1886       }
1887     }
1888     return authResult;
1889   }
1890 
1891   /**
1892    * Authorization check for
1893    * SecureBulkLoadProtocol.prepareBulkLoad()
1894    * @param e
1895    * @throws IOException
1896    */
1897   //TODO this should end up as a coprocessor hook
1898   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1899     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1900     logResult(authResult);
1901     if (!authResult.isAllowed()) {
1902       throw new AccessDeniedException("Insufficient permissions (table=" +
1903         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1904     }
1905   }
1906 
1907   /**
1908    * Authorization security check for
1909    * SecureBulkLoadProtocol.cleanupBulkLoad()
1910    * @param e
1911    * @throws IOException
1912    */
1913   //TODO this should end up as a coprocessor hook
1914   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1915     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1916     logResult(authResult);
1917     if (!authResult.isAllowed()) {
1918       throw new AccessDeniedException("Insufficient permissions (table=" +
1919         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1920     }
1921   }
1922 
1923   /* ---- EndpointObserver implementation ---- */
1924 
1925   @Override
1926   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1927       Service service, String methodName, Message request) throws IOException {
1928     // Don't intercept calls to our own AccessControlService, we check for
1929     // appropriate permissions in the service handlers
1930     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1931       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
1932         methodName + ")",
1933         getTableName(ctx.getEnvironment()), null, null,
1934         Action.EXEC);
1935     }
1936     return request;
1937   }
1938 
1939   @Override
1940   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1941       Service service, String methodName, Message request, Message.Builder responseBuilder)
1942       throws IOException { }
1943 
1944   /* ---- Protobuf AccessControlService implementation ---- */
1945 
1946   @Override
1947   public void grant(RpcController controller,
1948                     AccessControlProtos.GrantRequest request,
1949                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1950     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1951     AccessControlProtos.GrantResponse response = null;
1952     try {
1953       // verify it's only running at .acl.
1954       if (aclRegion) {
1955         if (!initialized) {
1956           throw new CoprocessorException("AccessController not yet initialized");
1957         }
1958         if (LOG.isDebugEnabled()) {
1959           LOG.debug("Received request to grant access permission " + perm.toString());
1960         }
1961 
1962         switch(request.getUserPermission().getPermission().getType()) {
1963           case Global :
1964           case Table :
1965             requirePermission("grant", perm.getTableName(), perm.getFamily(),
1966                 perm.getQualifier(), Action.ADMIN);
1967             break;
1968           case Namespace :
1969             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
1970         }
1971 
1972         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
1973         if (AUDITLOG.isTraceEnabled()) {
1974           // audit log should store permission changes in addition to auth results
1975           AUDITLOG.trace("Granted permission " + perm.toString());
1976         }
1977       } else {
1978         throw new CoprocessorException(AccessController.class, "This method "
1979             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1980       }
1981       response = AccessControlProtos.GrantResponse.getDefaultInstance();
1982     } catch (IOException ioe) {
1983       // pass exception back up
1984       ResponseConverter.setControllerException(controller, ioe);
1985     }
1986     done.run(response);
1987   }
1988 
1989   @Override
1990   public void revoke(RpcController controller,
1991                      AccessControlProtos.RevokeRequest request,
1992                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
1993     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1994     AccessControlProtos.RevokeResponse response = null;
1995     try {
1996       // only allowed to be called on _acl_ region
1997       if (aclRegion) {
1998         if (!initialized) {
1999           throw new CoprocessorException("AccessController not yet initialized");
2000         }
2001         if (LOG.isDebugEnabled()) {
2002           LOG.debug("Received request to revoke access permission " + perm.toString());
2003         }
2004 
2005         switch(request.getUserPermission().getPermission().getType()) {
2006           case Global :
2007           case Table :
2008             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2009                               perm.getQualifier(), Action.ADMIN);
2010             break;
2011           case Namespace :
2012             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2013         }
2014 
2015         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2016         if (AUDITLOG.isTraceEnabled()) {
2017           // audit log should record all permission changes
2018           AUDITLOG.trace("Revoked permission " + perm.toString());
2019         }
2020       } else {
2021         throw new CoprocessorException(AccessController.class, "This method "
2022             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2023       }
2024       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2025     } catch (IOException ioe) {
2026       // pass exception back up
2027       ResponseConverter.setControllerException(controller, ioe);
2028     }
2029     done.run(response);
2030   }
2031 
2032   @Override
2033   public void getUserPermissions(RpcController controller,
2034                                  AccessControlProtos.GetUserPermissionsRequest request,
2035                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2036     AccessControlProtos.GetUserPermissionsResponse response = null;
2037     try {
2038       // only allowed to be called on _acl_ region
2039       if (aclRegion) {
2040         if (!initialized) {
2041           throw new CoprocessorException("AccessController not yet initialized");
2042         }
2043         List<UserPermission> perms = null;
2044         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2045           TableName table = null;
2046           if (request.hasTableName()) {
2047             table = ProtobufUtil.toTableName(request.getTableName());
2048           }
2049           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2050 
2051           perms = AccessControlLists.getUserTablePermissions(
2052               regionEnv.getConfiguration(), table);
2053         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2054           perms = AccessControlLists.getUserNamespacePermissions(
2055               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2056         } else {
2057           perms = AccessControlLists.getUserPermissions(
2058               regionEnv.getConfiguration(), null);
2059         }
2060         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2061       } else {
2062         throw new CoprocessorException(AccessController.class, "This method "
2063             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2064       }
2065     } catch (IOException ioe) {
2066       // pass exception back up
2067       ResponseConverter.setControllerException(controller, ioe);
2068     }
2069     done.run(response);
2070   }
2071 
2072   @Override
2073   public void checkPermissions(RpcController controller,
2074                                AccessControlProtos.CheckPermissionsRequest request,
2075                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2076     Permission[] permissions = new Permission[request.getPermissionCount()];
2077     for (int i=0; i < request.getPermissionCount(); i++) {
2078       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2079     }
2080     AccessControlProtos.CheckPermissionsResponse response = null;
2081     try {
2082       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2083       for (Permission permission : permissions) {
2084         if (permission instanceof TablePermission) {
2085           TablePermission tperm = (TablePermission) permission;
2086           for (Action action : permission.getActions()) {
2087             if (!tperm.getTableName().equals(tableName)) {
2088               throw new CoprocessorException(AccessController.class, String.format("This method "
2089                   + "can only execute at the table specified in TablePermission. " +
2090                   "Table of the region:%s , requested table:%s", tableName,
2091                   tperm.getTableName()));
2092             }
2093 
2094             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2095             if (tperm.getFamily() != null) {
2096               if (tperm.getQualifier() != null) {
2097                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2098                 qualifiers.add(tperm.getQualifier());
2099                 familyMap.put(tperm.getFamily(), qualifiers);
2100               } else {
2101                 familyMap.put(tperm.getFamily(), null);
2102               }
2103             }
2104 
2105             requirePermission("checkPermissions", action, regionEnv, familyMap);
2106           }
2107 
2108         } else {
2109           for (Action action : permission.getActions()) {
2110             requirePermission("checkPermissions", action);
2111           }
2112         }
2113       }
2114       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2115     } catch (IOException ioe) {
2116       ResponseConverter.setControllerException(controller, ioe);
2117     }
2118     done.run(response);
2119   }
2120 
2121   @Override
2122   public Service getService() {
2123     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2124   }
2125 
2126   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2127     return e.getRegion();
2128   }
2129 
2130   private TableName getTableName(RegionCoprocessorEnvironment e) {
2131     HRegion region = e.getRegion();
2132     if (region != null) {
2133       return getTableName(region);
2134     }
2135     return null;
2136   }
2137 
2138   private TableName getTableName(HRegion region) {
2139     HRegionInfo regionInfo = region.getRegionInfo();
2140     if (regionInfo != null) {
2141       return regionInfo.getTable();
2142     }
2143     return null;
2144   }
2145 
2146   @Override
2147   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2148       throws IOException {
2149     requirePermission("preClose", Action.ADMIN);
2150   }
2151 
2152   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2153     User user = userProvider.getCurrent();
2154     if (user == null) {
2155       throw new IOException("Unable to obtain the current user, " +
2156         "authorization checks for internal operations will not work correctly!");
2157     }
2158     User activeUser = getActiveUser();
2159     if (!(superusers.contains(activeUser.getShortName()))) {
2160       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2161         "is not system or super user.");
2162     }
2163   }
2164 
2165   @Override
2166   public void preStopRegionServer(
2167       ObserverContext<RegionServerCoprocessorEnvironment> env)
2168       throws IOException {
2169     requirePermission("preStopRegionServer", Action.ADMIN);
2170   }
2171 
2172   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2173       byte[] qualifier) {
2174     if (family == null) {
2175       return null;
2176     }
2177 
2178     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2179     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2180     return familyMap;
2181   }
2182 
2183   @Override
2184   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2185       List<TableName> tableNamesList,
2186       List<HTableDescriptor> descriptors) throws IOException {
2187     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2188     // ADMIN privs.
2189     if (tableNamesList == null || tableNamesList.isEmpty()) {
2190       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2191     }
2192     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2193     // request can be granted.
2194     else {
2195       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2196       for (TableName tableName: tableNamesList) {
2197         // Skip checks for a table that does not exist
2198         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2199           continue;
2200         requirePermission("getTableDescriptors", tableName, null, null,
2201           Action.ADMIN, Action.CREATE);
2202       }
2203     }
2204   }
2205 
2206   @Override
2207   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2208       HRegion regionB) throws IOException {
2209     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2210       Action.ADMIN);
2211   }
2212 
2213   @Override
2214   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2215       HRegion regionB, HRegion mergedRegion) throws IOException { }
2216 
2217   @Override
2218   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2219       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2220 
2221   @Override
2222   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2223       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2224 
2225   @Override
2226   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2227       HRegion regionA, HRegion regionB) throws IOException { }
2228 
2229   @Override
2230   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2231       HRegion regionA, HRegion regionB) throws IOException { }
2232 
2233   @Override
2234   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2235       throws IOException {
2236     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2237   }
2238 
2239   @Override
2240   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2241       throws IOException { }
2242 
2243   @Override
2244   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2245       final String userName, final Quotas quotas) throws IOException {
2246     requirePermission("setUserQuota", Action.ADMIN);
2247   }
2248 
2249   @Override
2250   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2251       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2252     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2253   }
2254 
2255   @Override
2256   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2257       final String userName, final String namespace, final Quotas quotas) throws IOException {
2258     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2259   }
2260 
2261   @Override
2262   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2263       final TableName tableName, final Quotas quotas) throws IOException {
2264     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2265   }
2266 
2267   @Override
2268   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2269       final String namespace, final Quotas quotas) throws IOException {
2270     requirePermission("setNamespaceQuota", Action.ADMIN);
2271   }
2272   
2273   @Override
2274   public ReplicationEndpoint postCreateReplicationEndPoint(
2275       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2276     return endpoint;
2277   }
2278 }