View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellScanner;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CompoundConfiguration;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.MetaTableAccessor;
49  import org.apache.hadoop.hbase.NamespaceDescriptor;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.Tag;
53  import org.apache.hadoop.hbase.TagRewriteCell;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.Mutation;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Query;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
68  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
69  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.HRegion;
90  import org.apache.hadoop.hbase.regionserver.InternalScanner;
91  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
92  import org.apache.hadoop.hbase.regionserver.RegionScanner;
93  import org.apache.hadoop.hbase.regionserver.ScanType;
94  import org.apache.hadoop.hbase.regionserver.Store;
95  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
96  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
97  import org.apache.hadoop.hbase.security.AccessDeniedException;
98  import org.apache.hadoop.hbase.security.User;
99  import org.apache.hadoop.hbase.security.UserProvider;
100 import org.apache.hadoop.hbase.security.access.Permission.Action;
101 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
102 import org.apache.hadoop.hbase.util.ByteRange;
103 import org.apache.hadoop.hbase.util.Bytes;
104 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
107 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
108 
109 import com.google.common.collect.ArrayListMultimap;
110 import com.google.common.collect.ImmutableSet;
111 import com.google.common.collect.ListMultimap;
112 import com.google.common.collect.Lists;
113 import com.google.common.collect.MapMaker;
114 import com.google.common.collect.Maps;
115 import com.google.common.collect.Sets;
116 import com.google.protobuf.Message;
117 import com.google.protobuf.RpcCallback;
118 import com.google.protobuf.RpcController;
119 import com.google.protobuf.Service;
120 
121 /**
122  * Provides basic authorization checks for data access and administrative
123  * operations.
124  *
125  * <p>
126  * {@code AccessController} performs authorization checks for HBase operations
127  * based on:
128  * <ul>
129  *   <li>the identity of the user performing the operation</li>
130  *   <li>the scope over which the operation is performed, in increasing
131  *   specificity: global, table, column family, or qualifier</li>
132  *   <li>the type of action being performed (as mapped to
133  *   {@link Permission.Action} values)</li>
134  * </ul>
135  * If the authorization check fails, an {@link AccessDeniedException}
136  * will be thrown for the operation.
137  * </p>
138  *
139  * <p>
140  * To perform authorization checks, {@code AccessController} relies on the
141  * RpcServerEngine being loaded to provide
142  * the user identities for remote requests.
143  * </p>
144  *
145  * <p>
146  * The access control lists used for authorization can be manipulated via the
147  * exposed {@link AccessControlService} Interface implementation, and the associated
148  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
149  * commands.
150  * </p>
151  */
152 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
153 public class AccessController extends BaseMasterAndRegionObserver
154     implements RegionServerObserver,
155       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
156 
157   public static final Log LOG = LogFactory.getLog(AccessController.class);
158 
159   private static final Log AUDITLOG =
160     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
161   private static final String CHECK_COVERING_PERM = "check_covering_perm";
162   private static final String TAG_CHECK_PASSED = "tag_check_passed";
163   private static final byte[] TRUE = Bytes.toBytes(true);
164 
165   TableAuthManager authManager = null;
166 
167   // flags if we are running on a region of the _acl_ table
168   boolean aclRegion = false;
169 
170   // Defined only for the Endpoint implementation, so that it has a way to
171   // access region services.
172   private RegionCoprocessorEnvironment regionEnv;
173 
174   /** Mapping of scanner instances to the user who created them */
175   private Map<InternalScanner,String> scannerOwners =
176       new MapMaker().weakKeys().makeMap();
177 
178   private Map<TableName, List<UserPermission>> tableAcls;
179 
180   // Provider for mapping principal names to Users
181   private UserProvider userProvider;
182 
183   // The list of users with superuser authority
184   private List<String> superusers;
185 
186   // if we are able to support cell ACLs
187   boolean cellFeaturesEnabled;
188 
189   // if we should check EXEC permissions
190   boolean shouldCheckExecPermission;
191 
192   // if we should terminate access checks early as soon as table or CF grants
193   // allow access; pre-0.98 compatible behavior
194   boolean compatibleEarlyTermination;
195 
196   private volatile boolean initialized = false;
197 
198   // This flag is relevant only in the Master.
199   private volatile boolean aclTabAvailable = false;
200 
201   public HRegion getRegion() {
202     return regionEnv != null ? regionEnv.getRegion() : null;
203   }
204 
205   public TableAuthManager getAuthManager() {
206     return authManager;
207   }
208 
209   void initialize(RegionCoprocessorEnvironment e) throws IOException {
210     final HRegion region = e.getRegion();
211     Configuration conf = e.getConfiguration();
212     Map<byte[], ListMultimap<String,TablePermission>> tables =
213         AccessControlLists.loadAll(region);
214     // For each table, write out the table's permissions to the respective
215     // znode for that table.
216     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
217       tables.entrySet()) {
218       byte[] entry = t.getKey();
219       ListMultimap<String,TablePermission> perms = t.getValue();
220       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
221       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
222     }
223     initialized = true;
224   }
225 
226   /**
227    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
228    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
229    * table updates.
230    */
231   void updateACL(RegionCoprocessorEnvironment e,
232       final Map<byte[], List<Cell>> familyMap) {
233     Set<byte[]> entries =
234         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
235     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
236       List<Cell> cells = f.getValue();
237       for (Cell cell: cells) {
238         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
239             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
240             AccessControlLists.ACL_LIST_FAMILY.length)) {
241           entries.add(CellUtil.cloneRow(cell));
242         }
243       }
244     }
245     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
246     Configuration conf = regionEnv.getConfiguration();
247     for (byte[] entry: entries) {
248       try {
249         ListMultimap<String,TablePermission> perms =
250           AccessControlLists.getPermissions(conf, entry);
251         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
252         zkw.writeToZookeeper(entry, serialized);
253       } catch (IOException ex) {
254         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
255             ex);
256       }
257     }
258   }
259 
260   /**
261    * Check the current user for authorization to perform a specific action
262    * against the given set of row data.
263    *
264    * <p>Note: Ordering of the authorization checks
265    * has been carefully optimized to short-circuit the most common requests
266    * and minimize the amount of processing required.</p>
267    *
268    * @param permRequest the action being requested
269    * @param e the coprocessor environment
270    * @param families the map of column families to qualifiers present in
271    * the request
272    * @return an authorization result
273    */
274   AuthResult permissionGranted(String request, User user, Action permRequest,
275       RegionCoprocessorEnvironment e,
276       Map<byte [], ? extends Collection<?>> families) {
277     HRegionInfo hri = e.getRegion().getRegionInfo();
278     TableName tableName = hri.getTable();
279 
280     // 1. All users need read access to hbase:meta table.
281     // this is a very common operation, so deal with it quickly.
282     if (hri.isMetaRegion()) {
283       if (permRequest == Action.READ) {
284         return AuthResult.allow(request, "All users allowed", user,
285           permRequest, tableName, families);
286       }
287     }
288 
289     if (user == null) {
290       return AuthResult.deny(request, "No user associated with request!", null,
291         permRequest, tableName, families);
292     }
293 
294     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
295     // e.g. When a new table is created a new entry in hbase:meta is added,
296     // so the user need to be allowed to write on it.
297     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
298     // and the user need to be allowed to write on both tables.
299     if (permRequest == Action.WRITE &&
300        (hri.isMetaRegion() ||
301         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
302        (authManager.authorize(user, Action.CREATE) ||
303         authManager.authorize(user, Action.ADMIN)))
304     {
305        return AuthResult.allow(request, "Table permission granted", user,
306         permRequest, tableName, families);
307     }
308 
309     // 2. check for the table-level, if successful we can short-circuit
310     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
311       return AuthResult.allow(request, "Table permission granted", user,
312         permRequest, tableName, families);
313     }
314 
315     // 3. check permissions against the requested families
316     if (families != null && families.size() > 0) {
317       // all families must pass
318       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
319         // a) check for family level access
320         if (authManager.authorize(user, tableName, family.getKey(),
321             permRequest)) {
322           continue;  // family-level permission overrides per-qualifier
323         }
324 
325         // b) qualifier level access can still succeed
326         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
327           if (family.getValue() instanceof Set) {
328             // for each qualifier of the family
329             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
330             for (byte[] qualifier : familySet) {
331               if (!authManager.authorize(user, tableName, family.getKey(),
332                                          qualifier, permRequest)) {
333                 return AuthResult.deny(request, "Failed qualifier check", user,
334                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
335               }
336             }
337           } else if (family.getValue() instanceof List) { // List<KeyValue>
338             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
339             for (KeyValue kv : kvList) {
340               if (!authManager.authorize(user, tableName, family.getKey(),
341                       kv.getQualifier(), permRequest)) {
342                 return AuthResult.deny(request, "Failed qualifier check", user,
343                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
344               }
345             }
346           }
347         } else {
348           // no qualifiers and family-level check already failed
349           return AuthResult.deny(request, "Failed family check", user, permRequest,
350               tableName, makeFamilyMap(family.getKey(), null));
351         }
352       }
353 
354       // all family checks passed
355       return AuthResult.allow(request, "All family checks passed", user, permRequest,
356           tableName, families);
357     }
358 
359     // 4. no families to check and table level access failed
360     return AuthResult.deny(request, "No families to check and table permission failed",
361         user, permRequest, tableName, families);
362   }
363 
364   /**
365    * Check the current user for authorization to perform a specific action
366    * against the given set of row data.
367    * @param opType the operation type
368    * @param user the user
369    * @param e the coprocessor environment
370    * @param families the map of column families to qualifiers present in
371    * the request
372    * @param actions the desired actions
373    * @return an authorization result
374    */
375   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
376       Map<byte [], ? extends Collection<?>> families, Action... actions) {
377     AuthResult result = null;
378     for (Action action: actions) {
379       result = permissionGranted(opType.toString(), user, action, e, families);
380       if (!result.isAllowed()) {
381         return result;
382       }
383     }
384     return result;
385   }
386 
387   private void logResult(AuthResult result) {
388     if (AUDITLOG.isTraceEnabled()) {
389       RequestContext ctx = RequestContext.get();
390       InetAddress remoteAddr = null;
391       if (ctx != null) {
392         remoteAddr = ctx.getRemoteAddress();
393       }
394       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
395           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
396           "; reason: " + result.getReason() +
397           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
398           "; request: " + result.getRequest() +
399           "; context: " + result.toContextString());
400     }
401   }
402 
403   /**
404    * Returns the active user to which authorization checks should be applied.
405    * If we are in the context of an RPC call, the remote user is used,
406    * otherwise the currently logged in user is used.
407    */
408   private User getActiveUser() throws IOException {
409     User user = RequestContext.getRequestUser();
410     if (!RequestContext.isInRequestContext()) {
411       // for non-rpc handling, fallback to system user
412       user = userProvider.getCurrent();
413     }
414     return user;
415   }
416 
417   /**
418    * Authorizes that the current user has any of the given permissions for the
419    * given table, column family and column qualifier.
420    * @param tableName Table requested
421    * @param family Column family requested
422    * @param qualifier Column qualifier requested
423    * @throws IOException if obtaining the current user fails
424    * @throws AccessDeniedException if user has no authorization
425    */
426   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
427       Action... permissions) throws IOException {
428     User user = getActiveUser();
429     AuthResult result = null;
430 
431     for (Action permission : permissions) {
432       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
433         result = AuthResult.allow(request, "Table permission granted", user,
434                                   permission, tableName, family, qualifier);
435         break;
436       } else {
437         // rest of the world
438         result = AuthResult.deny(request, "Insufficient permissions", user,
439                                  permission, tableName, family, qualifier);
440       }
441     }
442     logResult(result);
443     if (!result.isAllowed()) {
444       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
445     }
446   }
447 
448   /**
449    * Authorizes that the current user has global privileges for the given action.
450    * @param perm The action being requested
451    * @throws IOException if obtaining the current user fails
452    * @throws AccessDeniedException if authorization is denied
453    */
454   private void requirePermission(String request, Action perm) throws IOException {
455     requireGlobalPermission(request, perm, null, null);
456   }
457 
458   /**
459    * Authorizes that the current user has permission to perform the given
460    * action on the set of table column families.
461    * @param perm Action that is required
462    * @param env The current coprocessor environment
463    * @param families The map of column families-qualifiers.
464    * @throws AccessDeniedException if the authorization check failed
465    */
466   private void requirePermission(String request, Action perm,
467         RegionCoprocessorEnvironment env,
468         Map<byte[], ? extends Collection<?>> families)
469       throws IOException {
470     User user = getActiveUser();
471     AuthResult result = permissionGranted(request, user, perm, env, families);
472     logResult(result);
473 
474     if (!result.isAllowed()) {
475       throw new AccessDeniedException("Insufficient permissions (table=" +
476         env.getRegion().getTableDesc().getTableName()+
477         ((families != null && families.size() > 0) ? ", family: " +
478         result.toFamilyString() : "") + ", action=" +
479         perm.toString() + ")");
480     }
481   }
482 
483   /**
484    * Checks that the user has the given global permission. The generated
485    * audit log message will contain context information for the operation
486    * being authorized, based on the given parameters.
487    * @param perm Action being requested
488    * @param tableName Affected table name.
489    * @param familyMap Affected column families.
490    */
491   private void requireGlobalPermission(String request, Action perm, TableName tableName,
492       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
493     User user = getActiveUser();
494     if (authManager.authorize(user, perm) || (tableName != null &&
495         authManager.authorize(user, tableName.getNamespaceAsString(), perm))) {
496       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
497     } else {
498       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
499       throw new AccessDeniedException("Insufficient permissions for user '" +
500           (user != null ? user.getShortName() : "null") +"' (global, action=" +
501           perm.toString() + ")");
502     }
503   }
504 
505   /**
506    * Checks that the user has the given global permission. The generated
507    * audit log message will contain context information for the operation
508    * being authorized, based on the given parameters.
509    * @param perm Action being requested
510    * @param namespace
511    */
512   private void requireGlobalPermission(String request, Action perm,
513                                        String namespace) throws IOException {
514     User user = getActiveUser();
515     if (authManager.authorize(user, perm)) {
516       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
517     } else {
518       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
519       throw new AccessDeniedException("Insufficient permissions for user '" +
520           (user != null ? user.getShortName() : "null") +"' (global, action=" +
521           perm.toString() + ")");
522     }
523   }
524 
525   /**
526    * Returns <code>true</code> if the current user is allowed the given action
527    * over at least one of the column qualifiers in the given column families.
528    */
529   private boolean hasFamilyQualifierPermission(User user,
530       Action perm,
531       RegionCoprocessorEnvironment env,
532       Map<byte[], ? extends Collection<byte[]>> familyMap)
533     throws IOException {
534     HRegionInfo hri = env.getRegion().getRegionInfo();
535     TableName tableName = hri.getTable();
536 
537     if (user == null) {
538       return false;
539     }
540 
541     if (familyMap != null && familyMap.size() > 0) {
542       // at least one family must be allowed
543       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
544           familyMap.entrySet()) {
545         if (family.getValue() != null && !family.getValue().isEmpty()) {
546           for (byte[] qualifier : family.getValue()) {
547             if (authManager.matchPermission(user, tableName,
548                 family.getKey(), qualifier, perm)) {
549               return true;
550             }
551           }
552         } else {
553           if (authManager.matchPermission(user, tableName, family.getKey(),
554               perm)) {
555             return true;
556           }
557         }
558       }
559     } else if (LOG.isDebugEnabled()) {
560       LOG.debug("Empty family map passed for permission check");
561     }
562 
563     return false;
564   }
565 
566   private enum OpType {
567     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
568     GET("get"),
569     EXISTS("exists"),
570     SCAN("scan"),
571     PUT("put"),
572     DELETE("delete"),
573     CHECK_AND_PUT("checkAndPut"),
574     CHECK_AND_DELETE("checkAndDelete"),
575     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
576     APPEND("append"),
577     INCREMENT("increment");
578 
579     private String type;
580 
581     private OpType(String type) {
582       this.type = type;
583     }
584 
585     @Override
586     public String toString() {
587       return type;
588     }
589   }
590 
591   /**
592    * Determine if cell ACLs covered by the operation grant access. This is expensive.
593    * @return false if cell ACLs failed to grant access, true otherwise
594    * @throws IOException
595    */
596   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
597       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
598       throws IOException {
599     if (!cellFeaturesEnabled) {
600       return false;
601     }
602     long cellGrants = 0;
603     User user = getActiveUser();
604     long latestCellTs = 0;
605     Get get = new Get(row);
606     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
607     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
608     // version. We have to get every cell version and check its TS against the TS asked for in
609     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
610     // consider only one such passing cell. In case of Delete we have to consider all the cell
611     // versions under this passing version. When Delete Mutation contains columns which are a
612     // version delete just consider only one version for those column cells.
613     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
614     if (considerCellTs) {
615       get.setMaxVersions();
616     } else {
617       get.setMaxVersions(1);
618     }
619     boolean diffCellTsFromOpTs = false;
620     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
621       byte[] col = entry.getKey();
622       // TODO: HBASE-7114 could possibly unify the collection type in family
623       // maps so we would not need to do this
624       if (entry.getValue() instanceof Set) {
625         Set<byte[]> set = (Set<byte[]>)entry.getValue();
626         if (set == null || set.isEmpty()) {
627           get.addFamily(col);
628         } else {
629           for (byte[] qual: set) {
630             get.addColumn(col, qual);
631           }
632         }
633       } else if (entry.getValue() instanceof List) {
634         List<Cell> list = (List<Cell>)entry.getValue();
635         if (list == null || list.isEmpty()) {
636           get.addFamily(col);
637         } else {
638           // In case of family delete, a Cell will be added into the list with Qualifier as null.
639           for (Cell cell : list) {
640             if (cell.getQualifierLength() == 0
641                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
642                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
643               get.addFamily(col);
644             } else {
645               get.addColumn(col, CellUtil.cloneQualifier(cell));
646             }
647             if (considerCellTs) {
648               long cellTs = cell.getTimestamp();
649               latestCellTs = Math.max(latestCellTs, cellTs);
650               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
651             }
652           }
653         }
654       } else {
655         throw new RuntimeException("Unhandled collection type " +
656           entry.getValue().getClass().getName());
657       }
658     }
659     // We want to avoid looking into the future. So, if the cells of the
660     // operation specify a timestamp, or the operation itself specifies a
661     // timestamp, then we use the maximum ts found. Otherwise, we bound
662     // the Get to the current server time. We add 1 to the timerange since
663     // the upper bound of a timerange is exclusive yet we need to examine
664     // any cells found there inclusively.
665     long latestTs = Math.max(opTs, latestCellTs);
666     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
667       latestTs = EnvironmentEdgeManager.currentTime();
668     }
669     get.setTimeRange(0, latestTs + 1);
670     // In case of Put operation we set to read all versions. This was done to consider the case
671     // where columns are added with TS other than the Mutation TS. But normally this wont be the
672     // case with Put. There no need to get all versions but get latest version only.
673     if (!diffCellTsFromOpTs && request == OpType.PUT) {
674       get.setMaxVersions(1);
675     }
676     if (LOG.isTraceEnabled()) {
677       LOG.trace("Scanning for cells with " + get);
678     }
679     // This Map is identical to familyMap. The key is a BR rather than byte[].
680     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
681     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
682     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
683     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
684       if (entry.getValue() instanceof List) {
685         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
686       }
687     }
688     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
689     List<Cell> cells = Lists.newArrayList();
690     Cell prevCell = null;
691     ByteRange curFam = new SimpleMutableByteRange();
692     boolean curColAllVersions = (request == OpType.DELETE);
693     long curColCheckTs = opTs;
694     boolean foundColumn = false;
695     try {
696       boolean more = false;
697       do {
698         cells.clear();
699         // scan with limit as 1 to hold down memory use on wide rows
700         more = scanner.next(cells, 1);
701         for (Cell cell: cells) {
702           if (LOG.isTraceEnabled()) {
703             LOG.trace("Found cell " + cell);
704           }
705           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
706           if (colChange) foundColumn = false;
707           prevCell = cell;
708           if (!curColAllVersions && foundColumn) {
709             continue;
710           }
711           if (colChange && considerCellTs) {
712             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
713             List<Cell> cols = familyMap1.get(curFam);
714             for (Cell col : cols) {
715               // null/empty qualifier is used to denote a Family delete. The TS and delete type
716               // associated with this is applicable for all columns within the family. That is
717               // why the below (col.getQualifierLength() == 0) check.
718               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
719                   || CellUtil.matchingQualifier(cell, col)) {
720                 byte type = col.getTypeByte();
721                 if (considerCellTs) {
722                   curColCheckTs = col.getTimestamp();
723                 }
724                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
725                 // a version delete for a column no need to check all the covering cells within
726                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
727                 // One version delete types are Delete/DeleteFamilyVersion
728                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
729                     || (KeyValue.Type.DeleteFamily.getCode() == type);
730                 break;
731               }
732             }
733           }
734           if (cell.getTimestamp() > curColCheckTs) {
735             // Just ignore this cell. This is not a covering cell.
736             continue;
737           }
738           foundColumn = true;
739           for (Action action: actions) {
740             // Are there permissions for this user for the cell?
741             if (!authManager.authorize(user, getTableName(e), cell, action)) {
742               // We can stop if the cell ACL denies access
743               return false;
744             }
745           }
746           cellGrants++;
747         }
748       } while (more);
749     } catch (AccessDeniedException ex) {
750       throw ex;
751     } catch (IOException ex) {
752       LOG.error("Exception while getting cells to calculate covering permission", ex);
753     } finally {
754       scanner.close();
755     }
756     // We should not authorize unless we have found one or more cell ACLs that
757     // grant access. This code is used to check for additional permissions
758     // after no table or CF grants are found.
759     return cellGrants > 0;
760   }
761 
762   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
763     // Iterate over the entries in the familyMap, replacing the cells therein
764     // with new cells including the ACL data
765     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
766       List<Cell> newCells = Lists.newArrayList();
767       for (Cell cell: e.getValue()) {
768         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
769         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
770         if (cell.getTagsLength() > 0) {
771           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
772             cell.getTagsOffset(), cell.getTagsLength());
773           while (tagIterator.hasNext()) {
774             tags.add(tagIterator.next());
775           }
776         }
777         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
778       }
779       // This is supposed to be safe, won't CME
780       e.setValue(newCells);
781     }
782   }
783 
784   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
785   // type is reserved and should not be explicitly set by user.
786   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
787     // Superusers are allowed to store cells unconditionally.
788     if (superusers.contains(user.getShortName())) {
789       return;
790     }
791     // We already checked (prePut vs preBatchMutation)
792     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
793       return;
794     }
795     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
796       Cell cell = cellScanner.current();
797       if (cell.getTagsLength() > 0) {
798         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
799           cell.getTagsLength());
800         while (tagsItr.hasNext()) {
801           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
802             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
803           }
804         }
805       }
806     }
807     m.setAttribute(TAG_CHECK_PASSED, TRUE);
808   }
809 
810   /* ---- MasterObserver implementation ---- */
811 
812   public void start(CoprocessorEnvironment env) throws IOException {
813     CompoundConfiguration conf = new CompoundConfiguration();
814     conf.add(env.getConfiguration());
815 
816     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
817       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
818 
819     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
820     if (!cellFeaturesEnabled) {
821       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
822           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
823           + " accordingly.");
824     }
825 
826     ZooKeeperWatcher zk = null;
827     if (env instanceof MasterCoprocessorEnvironment) {
828       // if running on HMaster
829       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
830       zk = mEnv.getMasterServices().getZooKeeper();
831     } else if (env instanceof RegionServerCoprocessorEnvironment) {
832       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
833       zk = rsEnv.getRegionServerServices().getZooKeeper();
834     } else if (env instanceof RegionCoprocessorEnvironment) {
835       // if running at region
836       regionEnv = (RegionCoprocessorEnvironment) env;
837       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
838       zk = regionEnv.getRegionServerServices().getZooKeeper();
839       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
840         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
841     }
842 
843     // set the user-provider.
844     this.userProvider = UserProvider.instantiate(env.getConfiguration());
845 
846     // set up the list of users with superuser privilege
847     User user = userProvider.getCurrent();
848     superusers = Lists.asList(user.getShortName(),
849       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
850 
851     // If zk is null or IOException while obtaining auth manager,
852     // throw RuntimeException so that the coprocessor is unloaded.
853     if (zk != null) {
854       try {
855         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
856       } catch (IOException ioe) {
857         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
858       }
859     } else {
860       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
861     }
862 
863     tableAcls = new MapMaker().weakValues().makeMap();
864   }
865 
866   public void stop(CoprocessorEnvironment env) {
867 
868   }
869 
870   @Override
871   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
872       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
873     Set<byte[]> families = desc.getFamiliesKeys();
874     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
875     for (byte[] family: families) {
876       familyMap.put(family, null);
877     }
878     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
879   }
880 
881   @Override
882   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
883       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
884     // When AC is used, it should be configured as the 1st CP.
885     // In Master, the table operations like create, are handled by a Thread pool but the max size
886     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
887     // sequentially only.
888     // Related code in HMaster#startServiceThreads
889     // {code}
890     //   // We depend on there being only one instance of this executor running
891     //   // at a time. To do concurrency, would need fencing of enable/disable of
892     //   // tables.
893     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
894     // {code}
895     // In future if we change this pool to have more threads, then there is a chance for thread,
896     // creating acl table, getting delayed and by that time another table creation got over and
897     // this hook is getting called. In such a case, we will need a wait logic here which will
898     // wait till the acl table is created.
899     if (AccessControlLists.isAclTable(desc)) {
900       this.aclTabAvailable = true;
901     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
902       if (!aclTabAvailable) {
903         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
904             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
905             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
906       } else {
907         String owner = desc.getOwnerString();
908         // default the table owner to current user, if not specified.
909         if (owner == null)
910           owner = getActiveUser().getShortName();
911         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
912             desc.getTableName(), null, Action.values());
913         // switch to the real hbase master user for doing the RPC on the ACL table
914         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
915           @Override
916           public Void run() throws Exception {
917             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
918                 userperm);
919             return null;
920           }
921         });
922       }
923     }
924   }
925 
926   @Override
927   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
928       throws IOException {
929     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
930   }
931 
932   @Override
933   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
934       TableName tableName) throws IOException {
935     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
936   }
937 
938   @Override
939   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
940       throws IOException {
941     requirePermission("truncateTable", tableName, null, null, Action.ADMIN);
942     List<UserPermission> acls = AccessControlLists.getUserTablePermissions(c.getEnvironment()
943         .getConfiguration(), tableName);
944     if (acls != null) {
945       tableAcls.put(tableName, acls);
946     }
947   }
948 
949   @Override
950   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
951       TableName tableName) throws IOException {
952     List<UserPermission> perms = tableAcls.get(tableName);
953     if (perms != null) {
954       for (UserPermission perm : perms) {
955         AccessControlLists.addUserPermission(ctx.getEnvironment().getConfiguration(), perm);
956       }
957     }
958     tableAcls.remove(tableName);
959   }
960 
961   @Override
962   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
963       HTableDescriptor htd) throws IOException {
964     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
965   }
966 
967   @Override
968   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
969       TableName tableName, HTableDescriptor htd) throws IOException {
970     String owner = htd.getOwnerString();
971     // default the table owner to current user, if not specified.
972     if (owner == null) owner = getActiveUser().getShortName();
973     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
974         Action.values());
975     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
976   }
977 
978   @Override
979   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
980       HColumnDescriptor column) throws IOException {
981     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
982   }
983 
984   @Override
985   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
986       HColumnDescriptor descriptor) throws IOException {
987     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
988   }
989 
990   @Override
991   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
992       byte[] col) throws IOException {
993     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
994   }
995 
996   @Override
997   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
998       TableName tableName, byte[] col) throws IOException {
999     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
1000   }
1001 
1002   @Override
1003   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1004       throws IOException {
1005     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1006   }
1007 
1008   @Override
1009   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1010       throws IOException {
1011     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1012       throw new AccessDeniedException("Not allowed to disable "
1013           + AccessControlLists.ACL_TABLE_NAME + " table.");
1014     }
1015     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1016   }
1017 
1018   @Override
1019   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1020       ServerName srcServer, ServerName destServer) throws IOException {
1021     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1022   }
1023 
1024   @Override
1025   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1026       throws IOException {
1027     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1028   }
1029 
1030   @Override
1031   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1032       boolean force) throws IOException {
1033     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1034   }
1035 
1036   @Override
1037   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1038       HRegionInfo regionInfo) throws IOException {
1039     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1040   }
1041 
1042   @Override
1043   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1044       throws IOException {
1045     requirePermission("balance", Action.ADMIN);
1046   }
1047 
1048   @Override
1049   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1050       boolean newValue) throws IOException {
1051     requirePermission("balanceSwitch", Action.ADMIN);
1052     return newValue;
1053   }
1054 
1055   @Override
1056   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1057       throws IOException {
1058     requirePermission("shutdown", Action.ADMIN);
1059   }
1060 
1061   @Override
1062   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1063       throws IOException {
1064     requirePermission("stopMaster", Action.ADMIN);
1065   }
1066 
1067   @Override
1068   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1069       throws IOException {
1070     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1071       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1072       // initialize the ACL storage table
1073       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1074     } else {
1075       aclTabAvailable = true;
1076     }
1077   }
1078 
1079   @Override
1080   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1081       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1082       throws IOException {
1083     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1084       Permission.Action.ADMIN);
1085   }
1086 
1087   @Override
1088   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1089       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1090       throws IOException {
1091     requirePermission("clone", Action.ADMIN);
1092   }
1093 
1094   @Override
1095   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1096       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1097       throws IOException {
1098     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1099       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1100         Permission.Action.ADMIN);
1101     } else {
1102       requirePermission("restore", Action.ADMIN);
1103     }
1104   }
1105 
1106   @Override
1107   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1108       final SnapshotDescription snapshot) throws IOException {
1109     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1110       // Snapshot owner is allowed to delete the snapshot
1111     } else {
1112       requirePermission("deleteSnapshot", Action.ADMIN);
1113     }
1114   }
1115 
1116   @Override
1117   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1118       NamespaceDescriptor ns) throws IOException {
1119     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1120   }
1121 
1122   @Override
1123   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1124       throws IOException {
1125     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1126   }
1127 
1128   @Override
1129   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1130                                   String namespace) throws IOException {
1131     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1132         namespace);
1133     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1134   }
1135 
1136   @Override
1137   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1138       NamespaceDescriptor ns) throws IOException {
1139     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1140   }
1141 
1142   @Override
1143   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1144       final TableName tableName) throws IOException {
1145     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1146   }
1147 
1148   /* ---- RegionObserver implementation ---- */
1149 
1150   @Override
1151   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1152       throws IOException {
1153     RegionCoprocessorEnvironment env = e.getEnvironment();
1154     final HRegion region = env.getRegion();
1155     if (region == null) {
1156       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1157     } else {
1158       HRegionInfo regionInfo = region.getRegionInfo();
1159       if (regionInfo.getTable().isSystemTable()) {
1160         isSystemOrSuperUser(regionEnv.getConfiguration());
1161       } else {
1162         requirePermission("preOpen", Action.ADMIN);
1163       }
1164     }
1165   }
1166 
1167   @Override
1168   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1169     RegionCoprocessorEnvironment env = c.getEnvironment();
1170     final HRegion region = env.getRegion();
1171     if (region == null) {
1172       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1173       return;
1174     }
1175     if (AccessControlLists.isAclRegion(region)) {
1176       aclRegion = true;
1177       // When this region is under recovering state, initialize will be handled by postLogReplay
1178       if (!region.isRecovering()) {
1179         try {
1180           initialize(env);
1181         } catch (IOException ex) {
1182           // if we can't obtain permissions, it's better to fail
1183           // than perform checks incorrectly
1184           throw new RuntimeException("Failed to initialize permissions cache", ex);
1185         }
1186       }
1187     } else {
1188       initialized = true;
1189     }
1190   }
1191 
1192   @Override
1193   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1194     if (aclRegion) {
1195       try {
1196         initialize(c.getEnvironment());
1197       } catch (IOException ex) {
1198         // if we can't obtain permissions, it's better to fail
1199         // than perform checks incorrectly
1200         throw new RuntimeException("Failed to initialize permissions cache", ex);
1201       }
1202     }
1203   }
1204 
1205   @Override
1206   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1207     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1208         Action.CREATE);
1209   }
1210 
1211   @Override
1212   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1213     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1214   }
1215 
1216   @Override
1217   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1218       byte[] splitRow) throws IOException {
1219     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1220   }
1221 
1222   @Override
1223   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1224       final Store store, final InternalScanner scanner, final ScanType scanType)
1225           throws IOException {
1226     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1227         Action.CREATE);
1228     return scanner;
1229   }
1230 
1231   @Override
1232   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1233       final byte [] row, final byte [] family, final Result result)
1234       throws IOException {
1235     assert family != null;
1236     RegionCoprocessorEnvironment env = c.getEnvironment();
1237     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1238     User user = getActiveUser();
1239     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1240       Action.READ);
1241     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1242       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1243         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1244       authResult.setReason("Covering cell set");
1245     }
1246     logResult(authResult);
1247     if (!authResult.isAllowed()) {
1248       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1249     }
1250   }
1251 
1252   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1253       final Query query, OpType opType) throws IOException {
1254     Filter filter = query.getFilter();
1255     // Don't wrap an AccessControlFilter
1256     if (filter != null && filter instanceof AccessControlFilter) {
1257       return;
1258     }
1259     User user = getActiveUser();
1260     RegionCoprocessorEnvironment env = c.getEnvironment();
1261     Map<byte[],? extends Collection<byte[]>> families = null;
1262     switch (opType) {
1263     case GET:
1264     case EXISTS:
1265       families = ((Get)query).getFamilyMap();
1266       break;
1267     case SCAN:
1268       families = ((Scan)query).getFamilyMap();
1269       break;
1270     default:
1271       throw new RuntimeException("Unhandled operation " + opType);
1272     }
1273     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1274     HRegion region = getRegion(env);
1275     TableName table = getTableName(region);
1276     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1277     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1278       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1279     }
1280     if (!authResult.isAllowed()) {
1281       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1282         // Old behavior: Scan with only qualifier checks if we have partial
1283         // permission. Backwards compatible behavior is to throw an
1284         // AccessDeniedException immediately if there are no grants for table
1285         // or CF or CF+qual. Only proceed with an injected filter if there are
1286         // grants for qualifiers. Otherwise we will fall through below and log
1287         // the result and throw an ADE. We may end up checking qualifier
1288         // grants three times (permissionGranted above, here, and in the
1289         // filter) but that's the price of backwards compatibility.
1290         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1291           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1292             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1293             cfVsMaxVersions);
1294           // wrap any existing filter
1295           if (filter != null) {
1296             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1297               Lists.newArrayList(ourFilter, filter));
1298           }
1299           authResult.setAllowed(true);;
1300           authResult.setReason("Access allowed with filter");
1301           switch (opType) {
1302           case GET:
1303           case EXISTS:
1304             ((Get)query).setFilter(ourFilter);
1305             break;
1306           case SCAN:
1307             ((Scan)query).setFilter(ourFilter);
1308             break;
1309           default:
1310             throw new RuntimeException("Unhandled operation " + opType);
1311           }
1312         }
1313       } else {
1314         // New behavior: Any access we might be granted is more fine-grained
1315         // than whole table or CF. Simply inject a filter and return what is
1316         // allowed. We will not throw an AccessDeniedException. This is a
1317         // behavioral change since 0.96.
1318         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1319           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1320         // wrap any existing filter
1321         if (filter != null) {
1322           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1323             Lists.newArrayList(ourFilter, filter));
1324         }
1325         authResult.setAllowed(true);;
1326         authResult.setReason("Access allowed with filter");
1327         switch (opType) {
1328         case GET:
1329         case EXISTS:
1330           ((Get)query).setFilter(ourFilter);
1331           break;
1332         case SCAN:
1333           ((Scan)query).setFilter(ourFilter);
1334           break;
1335         default:
1336           throw new RuntimeException("Unhandled operation " + opType);
1337         }
1338       }
1339     }
1340 
1341     logResult(authResult);
1342     if (!authResult.isAllowed()) {
1343       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1344         ", action=READ)");
1345     }
1346   }
1347 
1348   @Override
1349   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1350       final Get get, final List<Cell> result) throws IOException {
1351     internalPreRead(c, get, OpType.GET);
1352   }
1353 
1354   @Override
1355   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1356       final Get get, final boolean exists) throws IOException {
1357     internalPreRead(c, get, OpType.EXISTS);
1358     return exists;
1359   }
1360 
1361   @Override
1362   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1363       final Put put, final WALEdit edit, final Durability durability)
1364       throws IOException {
1365     // Require WRITE permission to the table, CF, or top visible value, if any.
1366     // NOTE: We don't need to check the permissions for any earlier Puts
1367     // because we treat the ACLs in each Put as timestamped like any other
1368     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1369     // change the ACL of any previous Put. This allows simple evolution of
1370     // security policy over time without requiring expensive updates.
1371     User user = getActiveUser();
1372     checkForReservedTagPresence(user, put);
1373     RegionCoprocessorEnvironment env = c.getEnvironment();
1374     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1375     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1376     logResult(authResult);
1377     if (!authResult.isAllowed()) {
1378       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1379         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1380       } else {
1381         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1382       }
1383     }
1384     // Add cell ACLs from the operation to the cells themselves
1385     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1386     if (bytes != null) {
1387       if (cellFeaturesEnabled) {
1388         addCellPermissions(bytes, put.getFamilyCellMap());
1389       } else {
1390         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1391       }
1392     }
1393   }
1394 
1395   @Override
1396   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1397       final Put put, final WALEdit edit, final Durability durability) {
1398     if (aclRegion) {
1399       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1400     }
1401   }
1402 
1403   @Override
1404   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1405       final Delete delete, final WALEdit edit, final Durability durability)
1406       throws IOException {
1407     // An ACL on a delete is useless, we shouldn't allow it
1408     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1409       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1410     }
1411     // Require WRITE permissions on all cells covered by the delete. Unlike
1412     // for Puts we need to check all visible prior versions, because a major
1413     // compaction could remove them. If the user doesn't have permission to
1414     // overwrite any of the visible versions ('visible' defined as not covered
1415     // by a tombstone already) then we have to disallow this operation.
1416     RegionCoprocessorEnvironment env = c.getEnvironment();
1417     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1418     User user = getActiveUser();
1419     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1420     logResult(authResult);
1421     if (!authResult.isAllowed()) {
1422       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1423         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1424       } else {
1425         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1426       }
1427     }
1428   }
1429 
1430   @Override
1431   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1432       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1433     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1434       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1435       for (int i = 0; i < miniBatchOp.size(); i++) {
1436         Mutation m = miniBatchOp.getOperation(i);
1437         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1438           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1439           // perm check
1440           OpType opType;
1441           if (m instanceof Put) {
1442             checkForReservedTagPresence(getActiveUser(), m);
1443             opType = OpType.PUT;
1444           } else {
1445             opType = OpType.DELETE;
1446           }
1447           AuthResult authResult = null;
1448           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1449               m.getTimeStamp(), Action.WRITE)) {
1450             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1451                 Action.WRITE, table, m.getFamilyCellMap());
1452           } else {
1453             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1454                 Action.WRITE, table, m.getFamilyCellMap());
1455           }
1456           logResult(authResult);
1457           if (!authResult.isAllowed()) {
1458             throw new AccessDeniedException("Insufficient permissions "
1459                 + authResult.toContextString());
1460           }
1461         }
1462       }
1463     }
1464   }
1465 
1466   @Override
1467   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1468       final Delete delete, final WALEdit edit, final Durability durability)
1469       throws IOException {
1470     if (aclRegion) {
1471       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1472     }
1473   }
1474 
1475   @Override
1476   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1477       final byte [] row, final byte [] family, final byte [] qualifier,
1478       final CompareFilter.CompareOp compareOp,
1479       final ByteArrayComparable comparator, final Put put,
1480       final boolean result) throws IOException {
1481     // Require READ and WRITE permissions on the table, CF, and KV to update
1482     User user = getActiveUser();
1483     checkForReservedTagPresence(user, put);
1484     RegionCoprocessorEnvironment env = c.getEnvironment();
1485     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1486     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1487       Action.READ, Action.WRITE);
1488     logResult(authResult);
1489     if (!authResult.isAllowed()) {
1490       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1491         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1492       } else {
1493         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1494       }
1495     }
1496     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1497     if (bytes != null) {
1498       if (cellFeaturesEnabled) {
1499         addCellPermissions(bytes, put.getFamilyCellMap());
1500       } else {
1501         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1502       }
1503     }
1504     return result;
1505   }
1506 
1507   @Override
1508   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1509       final byte[] row, final byte[] family, final byte[] qualifier,
1510       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1511       final boolean result) throws IOException {
1512     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1513       // We had failure with table, cf and q perm checks and now giving a chance for cell
1514       // perm check
1515       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1516       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1517       AuthResult authResult = null;
1518       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1519           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1520         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1521             getActiveUser(), Action.READ, table, families);
1522       } else {
1523         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1524             getActiveUser(), Action.READ, table, families);
1525       }
1526       logResult(authResult);
1527       if (!authResult.isAllowed()) {
1528         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1529       }
1530     }
1531     return result;
1532   }
1533 
1534   @Override
1535   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1536       final byte [] row, final byte [] family, final byte [] qualifier,
1537       final CompareFilter.CompareOp compareOp,
1538       final ByteArrayComparable comparator, final Delete delete,
1539       final boolean result) throws IOException {
1540     // An ACL on a delete is useless, we shouldn't allow it
1541     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1542       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1543           delete.toString());
1544     }
1545     // Require READ and WRITE permissions on the table, CF, and the KV covered
1546     // by the delete
1547     RegionCoprocessorEnvironment env = c.getEnvironment();
1548     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1549     User user = getActiveUser();
1550     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1551       Action.READ, Action.WRITE);
1552     logResult(authResult);
1553     if (!authResult.isAllowed()) {
1554       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1555         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1556       } else {
1557         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1558       }
1559     }
1560     return result;
1561   }
1562 
1563   @Override
1564   public boolean preCheckAndDeleteAfterRowLock(
1565       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1566       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1567       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1568       throws IOException {
1569     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1570       // We had failure with table, cf and q perm checks and now giving a chance for cell
1571       // perm check
1572       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1573       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1574       AuthResult authResult = null;
1575       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1576           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1577         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1578             getActiveUser(), Action.READ, table, families);
1579       } else {
1580         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1581             getActiveUser(), Action.READ, table, families);
1582       }
1583       logResult(authResult);
1584       if (!authResult.isAllowed()) {
1585         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1586       }
1587     }
1588     return result;
1589   }
1590 
1591   @Override
1592   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1593       final byte [] row, final byte [] family, final byte [] qualifier,
1594       final long amount, final boolean writeToWAL)
1595       throws IOException {
1596     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1597     // incremented value
1598     RegionCoprocessorEnvironment env = c.getEnvironment();
1599     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1600     User user = getActiveUser();
1601     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1602       Action.WRITE);
1603     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1604       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1605         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1606       authResult.setReason("Covering cell set");
1607     }
1608     logResult(authResult);
1609     if (!authResult.isAllowed()) {
1610       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1611     }
1612     return -1;
1613   }
1614 
1615   @Override
1616   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1617       throws IOException {
1618     // Require WRITE permission to the table, CF, and the KV to be appended
1619     User user = getActiveUser();
1620     checkForReservedTagPresence(user, append);
1621     RegionCoprocessorEnvironment env = c.getEnvironment();
1622     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1623     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1624     logResult(authResult);
1625     if (!authResult.isAllowed()) {
1626       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1627         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1628       } else {
1629         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1630       }
1631     }
1632     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1633     if (bytes != null) {
1634       if (cellFeaturesEnabled) {
1635         addCellPermissions(bytes, append.getFamilyCellMap());
1636       } else {
1637         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1638       }
1639     }
1640     return null;
1641   }
1642 
1643   @Override
1644   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1645       final Append append) throws IOException {
1646     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1647       // We had failure with table, cf and q perm checks and now giving a chance for cell
1648       // perm check
1649       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1650       AuthResult authResult = null;
1651       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1652           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1653         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1654             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1655       } else {
1656         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1657             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1658       }
1659       logResult(authResult);
1660       if (!authResult.isAllowed()) {
1661         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1662       }
1663     }
1664     return null;
1665   }
1666 
1667   @Override
1668   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1669       final Increment increment)
1670       throws IOException {
1671     // Require WRITE permission to the table, CF, and the KV to be replaced by
1672     // the incremented value
1673     User user = getActiveUser();
1674     checkForReservedTagPresence(user, increment);
1675     RegionCoprocessorEnvironment env = c.getEnvironment();
1676     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1677     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1678       Action.WRITE);
1679     logResult(authResult);
1680     if (!authResult.isAllowed()) {
1681       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1682         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1683       } else {
1684         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1685       }
1686     }
1687     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1688     if (bytes != null) {
1689       if (cellFeaturesEnabled) {
1690         addCellPermissions(bytes, increment.getFamilyCellMap());
1691       } else {
1692         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1693       }
1694     }
1695     return null;
1696   }
1697 
1698   @Override
1699   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1700       final Increment increment) throws IOException {
1701     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1702       // We had failure with table, cf and q perm checks and now giving a chance for cell
1703       // perm check
1704       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1705       AuthResult authResult = null;
1706       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1707           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1708         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1709             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1710       } else {
1711         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1712             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1713       }
1714       logResult(authResult);
1715       if (!authResult.isAllowed()) {
1716         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1717       }
1718     }
1719     return null;
1720   }
1721 
1722   @Override
1723   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1724       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1725     // If the HFile version is insufficient to persist tags, we won't have any
1726     // work to do here
1727     if (!cellFeaturesEnabled) {
1728       return newCell;
1729     }
1730 
1731     // Collect any ACLs from the old cell
1732     List<Tag> tags = Lists.newArrayList();
1733     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1734     if (oldCell != null) {
1735       // Save an object allocation where we can
1736       if (oldCell.getTagsLength() > 0) {
1737         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1738           oldCell.getTagsOffset(), oldCell.getTagsLength());
1739         while (tagIterator.hasNext()) {
1740           Tag tag = tagIterator.next();
1741           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1742             // Not an ACL tag, just carry it through
1743             if (LOG.isTraceEnabled()) {
1744               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1745                 " length " + tag.getTagLength());
1746             }
1747             tags.add(tag);
1748           } else {
1749             // Merge the perms from the older ACL into the current permission set
1750             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1751               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1752                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1753             perms.putAll(kvPerms);
1754           }
1755         }
1756       }
1757     }
1758 
1759     // Do we have an ACL on the operation?
1760     byte[] aclBytes = mutation.getACL();
1761     if (aclBytes != null) {
1762       // Yes, use it
1763       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1764     } else {
1765       // No, use what we carried forward
1766       if (perms != null) {
1767         // TODO: If we collected ACLs from more than one tag we may have a
1768         // List<Permission> of size > 1, this can be collapsed into a single
1769         // Permission
1770         if (LOG.isTraceEnabled()) {
1771           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1772         }
1773         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1774           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1775       }
1776     }
1777 
1778     // If we have no tags to add, just return
1779     if (tags.isEmpty()) {
1780       return newCell;
1781     }
1782 
1783     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
1784     return rewriteCell;
1785   }
1786 
1787   @Override
1788   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1789       final Scan scan, final RegionScanner s) throws IOException {
1790     internalPreRead(c, scan, OpType.SCAN);
1791     return s;
1792   }
1793 
1794   @Override
1795   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1796       final Scan scan, final RegionScanner s) throws IOException {
1797     User user = getActiveUser();
1798     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1799       scannerOwners.put(s, user.getShortName());
1800     }
1801     return s;
1802   }
1803 
1804   @Override
1805   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1806       final InternalScanner s, final List<Result> result,
1807       final int limit, final boolean hasNext) throws IOException {
1808     requireScannerOwner(s);
1809     return hasNext;
1810   }
1811 
1812   @Override
1813   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1814       final InternalScanner s) throws IOException {
1815     requireScannerOwner(s);
1816   }
1817 
1818   @Override
1819   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1820       final InternalScanner s) throws IOException {
1821     // clean up any associated owner mapping
1822     scannerOwners.remove(s);
1823   }
1824 
1825   /**
1826    * Verify, when servicing an RPC, that the caller is the scanner owner.
1827    * If so, we assume that access control is correctly enforced based on
1828    * the checks performed in preScannerOpen()
1829    */
1830   private void requireScannerOwner(InternalScanner s)
1831       throws AccessDeniedException {
1832     if (RequestContext.isInRequestContext()) {
1833       String requestUserName = RequestContext.getRequestUserName();
1834       String owner = scannerOwners.get(s);
1835       if (owner != null && !owner.equals(requestUserName)) {
1836         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1837       }
1838     }
1839   }
1840 
1841   /**
1842    * Verifies user has WRITE privileges on
1843    * the Column Families involved in the bulkLoadHFile
1844    * request. Specific Column Write privileges are presently
1845    * ignored.
1846    */
1847   @Override
1848   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1849       List<Pair<byte[], String>> familyPaths) throws IOException {
1850     for(Pair<byte[],String> el : familyPaths) {
1851       requirePermission("preBulkLoadHFile",
1852           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1853           el.getFirst(),
1854           null,
1855           Action.CREATE);
1856     }
1857   }
1858 
1859   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1860     User requestUser = getActiveUser();
1861     TableName tableName = e.getRegion().getTableDesc().getTableName();
1862     AuthResult authResult = permissionGranted(method, requestUser,
1863         action, e, Collections.EMPTY_MAP);
1864     if (!authResult.isAllowed()) {
1865       for(UserPermission userPerm:
1866           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1867         for(Action userAction: userPerm.getActions()) {
1868           if(userAction.equals(action)) {
1869             return AuthResult.allow(method, "Access allowed", requestUser,
1870                 action, tableName, null, null);
1871           }
1872         }
1873       }
1874     }
1875     return authResult;
1876   }
1877 
1878   /**
1879    * Authorization check for
1880    * SecureBulkLoadProtocol.prepareBulkLoad()
1881    * @param ctx the context
1882    * @param request the request
1883    * @throws IOException
1884    */
1885   @Override
1886   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
1887                                  PrepareBulkLoadRequest request) throws IOException {
1888     RegionCoprocessorEnvironment e = ctx.getEnvironment();
1889 
1890     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1891     logResult(authResult);
1892     if (!authResult.isAllowed()) {
1893       throw new AccessDeniedException("Insufficient permissions (table=" +
1894         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1895     }
1896   }
1897 
1898   /**
1899    * Authorization security check for
1900    * SecureBulkLoadProtocol.cleanupBulkLoad()
1901    * @param ctx the context
1902    * @param request the request
1903    * @throws IOException
1904    */
1905   @Override
1906   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
1907                                  CleanupBulkLoadRequest request) throws IOException {
1908     RegionCoprocessorEnvironment e = ctx.getEnvironment();
1909 
1910     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1911     logResult(authResult);
1912     if (!authResult.isAllowed()) {
1913       throw new AccessDeniedException("Insufficient permissions (table=" +
1914         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1915     }
1916   }
1917 
1918   /* ---- EndpointObserver implementation ---- */
1919 
1920   @Override
1921   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1922       Service service, String methodName, Message request) throws IOException {
1923     // Don't intercept calls to our own AccessControlService, we check for
1924     // appropriate permissions in the service handlers
1925     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1926       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
1927         methodName + ")",
1928         getTableName(ctx.getEnvironment()), null, null,
1929         Action.EXEC);
1930     }
1931     return request;
1932   }
1933 
1934   @Override
1935   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1936       Service service, String methodName, Message request, Message.Builder responseBuilder)
1937       throws IOException { }
1938 
1939   /* ---- Protobuf AccessControlService implementation ---- */
1940 
1941   @Override
1942   public void grant(RpcController controller,
1943                     AccessControlProtos.GrantRequest request,
1944                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1945     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1946     AccessControlProtos.GrantResponse response = null;
1947     try {
1948       // verify it's only running at .acl.
1949       if (aclRegion) {
1950         if (!initialized) {
1951           throw new CoprocessorException("AccessController not yet initialized");
1952         }
1953         if (LOG.isDebugEnabled()) {
1954           LOG.debug("Received request to grant access permission " + perm.toString());
1955         }
1956 
1957         switch(request.getUserPermission().getPermission().getType()) {
1958           case Global :
1959           case Table :
1960             requirePermission("grant", perm.getTableName(), perm.getFamily(),
1961                 perm.getQualifier(), Action.ADMIN);
1962             break;
1963           case Namespace :
1964             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
1965         }
1966 
1967         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
1968         if (AUDITLOG.isTraceEnabled()) {
1969           // audit log should store permission changes in addition to auth results
1970           AUDITLOG.trace("Granted permission " + perm.toString());
1971         }
1972       } else {
1973         throw new CoprocessorException(AccessController.class, "This method "
1974             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1975       }
1976       response = AccessControlProtos.GrantResponse.getDefaultInstance();
1977     } catch (IOException ioe) {
1978       // pass exception back up
1979       ResponseConverter.setControllerException(controller, ioe);
1980     }
1981     done.run(response);
1982   }
1983 
1984   @Override
1985   public void revoke(RpcController controller,
1986                      AccessControlProtos.RevokeRequest request,
1987                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
1988     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1989     AccessControlProtos.RevokeResponse response = null;
1990     try {
1991       // only allowed to be called on _acl_ region
1992       if (aclRegion) {
1993         if (!initialized) {
1994           throw new CoprocessorException("AccessController not yet initialized");
1995         }
1996         if (LOG.isDebugEnabled()) {
1997           LOG.debug("Received request to revoke access permission " + perm.toString());
1998         }
1999 
2000         switch(request.getUserPermission().getPermission().getType()) {
2001           case Global :
2002           case Table :
2003             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2004                               perm.getQualifier(), Action.ADMIN);
2005             break;
2006           case Namespace :
2007             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2008         }
2009 
2010         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2011         if (AUDITLOG.isTraceEnabled()) {
2012           // audit log should record all permission changes
2013           AUDITLOG.trace("Revoked permission " + perm.toString());
2014         }
2015       } else {
2016         throw new CoprocessorException(AccessController.class, "This method "
2017             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2018       }
2019       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2020     } catch (IOException ioe) {
2021       // pass exception back up
2022       ResponseConverter.setControllerException(controller, ioe);
2023     }
2024     done.run(response);
2025   }
2026 
2027   @Override
2028   public void getUserPermissions(RpcController controller,
2029                                  AccessControlProtos.GetUserPermissionsRequest request,
2030                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2031     AccessControlProtos.GetUserPermissionsResponse response = null;
2032     try {
2033       // only allowed to be called on _acl_ region
2034       if (aclRegion) {
2035         if (!initialized) {
2036           throw new CoprocessorException("AccessController not yet initialized");
2037         }
2038         List<UserPermission> perms = null;
2039         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2040           TableName table = null;
2041           if (request.hasTableName()) {
2042             table = ProtobufUtil.toTableName(request.getTableName());
2043           }
2044           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2045 
2046           perms = AccessControlLists.getUserTablePermissions(
2047               regionEnv.getConfiguration(), table);
2048         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2049           perms = AccessControlLists.getUserNamespacePermissions(
2050               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2051         } else {
2052           perms = AccessControlLists.getUserPermissions(
2053               regionEnv.getConfiguration(), null);
2054         }
2055         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2056       } else {
2057         throw new CoprocessorException(AccessController.class, "This method "
2058             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2059       }
2060     } catch (IOException ioe) {
2061       // pass exception back up
2062       ResponseConverter.setControllerException(controller, ioe);
2063     }
2064     done.run(response);
2065   }
2066 
2067   @Override
2068   public void checkPermissions(RpcController controller,
2069                                AccessControlProtos.CheckPermissionsRequest request,
2070                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2071     Permission[] permissions = new Permission[request.getPermissionCount()];
2072     for (int i=0; i < request.getPermissionCount(); i++) {
2073       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2074     }
2075     AccessControlProtos.CheckPermissionsResponse response = null;
2076     try {
2077       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2078       for (Permission permission : permissions) {
2079         if (permission instanceof TablePermission) {
2080           TablePermission tperm = (TablePermission) permission;
2081           for (Action action : permission.getActions()) {
2082             if (!tperm.getTableName().equals(tableName)) {
2083               throw new CoprocessorException(AccessController.class, String.format("This method "
2084                   + "can only execute at the table specified in TablePermission. " +
2085                   "Table of the region:%s , requested table:%s", tableName,
2086                   tperm.getTableName()));
2087             }
2088 
2089             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2090             if (tperm.getFamily() != null) {
2091               if (tperm.getQualifier() != null) {
2092                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2093                 qualifiers.add(tperm.getQualifier());
2094                 familyMap.put(tperm.getFamily(), qualifiers);
2095               } else {
2096                 familyMap.put(tperm.getFamily(), null);
2097               }
2098             }
2099 
2100             requirePermission("checkPermissions", action, regionEnv, familyMap);
2101           }
2102 
2103         } else {
2104           for (Action action : permission.getActions()) {
2105             requirePermission("checkPermissions", action);
2106           }
2107         }
2108       }
2109       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2110     } catch (IOException ioe) {
2111       ResponseConverter.setControllerException(controller, ioe);
2112     }
2113     done.run(response);
2114   }
2115 
2116   @Override
2117   public Service getService() {
2118     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2119   }
2120 
2121   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2122     return e.getRegion();
2123   }
2124 
2125   private TableName getTableName(RegionCoprocessorEnvironment e) {
2126     HRegion region = e.getRegion();
2127     if (region != null) {
2128       return getTableName(region);
2129     }
2130     return null;
2131   }
2132 
2133   private TableName getTableName(HRegion region) {
2134     HRegionInfo regionInfo = region.getRegionInfo();
2135     if (regionInfo != null) {
2136       return regionInfo.getTable();
2137     }
2138     return null;
2139   }
2140 
2141   @Override
2142   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2143       throws IOException {
2144     requirePermission("preClose", Action.ADMIN);
2145   }
2146 
2147   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2148     User user = userProvider.getCurrent();
2149     if (user == null) {
2150       throw new IOException("Unable to obtain the current user, " +
2151         "authorization checks for internal operations will not work correctly!");
2152     }
2153     User activeUser = getActiveUser();
2154     if (!(superusers.contains(activeUser.getShortName()))) {
2155       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2156         "is not system or super user.");
2157     }
2158   }
2159 
2160   @Override
2161   public void preStopRegionServer(
2162       ObserverContext<RegionServerCoprocessorEnvironment> env)
2163       throws IOException {
2164     requirePermission("preStopRegionServer", Action.ADMIN);
2165   }
2166 
2167   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2168       byte[] qualifier) {
2169     if (family == null) {
2170       return null;
2171     }
2172 
2173     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2174     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2175     return familyMap;
2176   }
2177 
2178   @Override
2179   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2180       List<TableName> tableNamesList,
2181       List<HTableDescriptor> descriptors) throws IOException {
2182     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2183     // ADMIN privs.
2184     if (tableNamesList == null || tableNamesList.isEmpty()) {
2185       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2186     }
2187     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2188     // request can be granted.
2189     else {
2190       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2191       for (TableName tableName: tableNamesList) {
2192         // Skip checks for a table that does not exist
2193         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2194           continue;
2195         requirePermission("getTableDescriptors", tableName, null, null,
2196           Action.ADMIN, Action.CREATE);
2197       }
2198     }
2199   }
2200 
2201   @Override
2202   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2203       HRegion regionB) throws IOException {
2204     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2205       Action.ADMIN);
2206   }
2207 
2208   @Override
2209   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2210       HRegion regionB, HRegion mergedRegion) throws IOException { }
2211 
2212   @Override
2213   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2214       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2215 
2216   @Override
2217   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2218       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2219 
2220   @Override
2221   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2222       HRegion regionA, HRegion regionB) throws IOException { }
2223 
2224   @Override
2225   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2226       HRegion regionA, HRegion regionB) throws IOException { }
2227 
2228   @Override
2229   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2230       throws IOException {
2231     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2232   }
2233 
2234   @Override
2235   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2236       throws IOException { }
2237 
2238   @Override
2239   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2240       final String userName, final Quotas quotas) throws IOException {
2241     requirePermission("setUserQuota", Action.ADMIN);
2242   }
2243 
2244   @Override
2245   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2246       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2247     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2248   }
2249 
2250   @Override
2251   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2252       final String userName, final String namespace, final Quotas quotas) throws IOException {
2253     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2254   }
2255 
2256   @Override
2257   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2258       final TableName tableName, final Quotas quotas) throws IOException {
2259     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2260   }
2261 
2262   @Override
2263   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2264       final String namespace, final Quotas quotas) throws IOException {
2265     requirePermission("setNamespaceQuota", Action.ADMIN);
2266   }
2267   
2268   @Override
2269   public ReplicationEndpoint postCreateReplicationEndPoint(
2270       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2271     return endpoint;
2272   }
2273 }