View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellScanner;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CompoundConfiguration;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.MetaTableAccessor;
49  import org.apache.hadoop.hbase.NamespaceDescriptor;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.Tag;
53  import org.apache.hadoop.hbase.TagRewriteCell;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.Mutation;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Query;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
68  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
69  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.HRegion;
90  import org.apache.hadoop.hbase.regionserver.InternalScanner;
91  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
92  import org.apache.hadoop.hbase.regionserver.RegionScanner;
93  import org.apache.hadoop.hbase.regionserver.ScanType;
94  import org.apache.hadoop.hbase.regionserver.Store;
95  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
96  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
97  import org.apache.hadoop.hbase.security.AccessDeniedException;
98  import org.apache.hadoop.hbase.security.User;
99  import org.apache.hadoop.hbase.security.UserProvider;
100 import org.apache.hadoop.hbase.security.access.Permission.Action;
101 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
102 import org.apache.hadoop.hbase.util.ByteRange;
103 import org.apache.hadoop.hbase.util.Bytes;
104 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
107 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
108 
109 import com.google.common.collect.ArrayListMultimap;
110 import com.google.common.collect.ImmutableSet;
111 import com.google.common.collect.ListMultimap;
112 import com.google.common.collect.Lists;
113 import com.google.common.collect.MapMaker;
114 import com.google.common.collect.Maps;
115 import com.google.common.collect.Sets;
116 import com.google.protobuf.Message;
117 import com.google.protobuf.RpcCallback;
118 import com.google.protobuf.RpcController;
119 import com.google.protobuf.Service;
120 
121 /**
122  * Provides basic authorization checks for data access and administrative
123  * operations.
124  *
125  * <p>
126  * {@code AccessController} performs authorization checks for HBase operations
127  * based on:
128  * <ul>
129  *   <li>the identity of the user performing the operation</li>
130  *   <li>the scope over which the operation is performed, in increasing
131  *   specificity: global, table, column family, or qualifier</li>
132  *   <li>the type of action being performed (as mapped to
133  *   {@link Permission.Action} values)</li>
134  * </ul>
135  * If the authorization check fails, an {@link AccessDeniedException}
136  * will be thrown for the operation.
137  * </p>
138  *
139  * <p>
140  * To perform authorization checks, {@code AccessController} relies on the
141  * RpcServerEngine being loaded to provide
142  * the user identities for remote requests.
143  * </p>
144  *
145  * <p>
146  * The access control lists used for authorization can be manipulated via the
147  * exposed {@link AccessControlService} Interface implementation, and the associated
148  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
149  * commands.
150  * </p>
151  */
152 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
153 public class AccessController extends BaseMasterAndRegionObserver
154     implements RegionServerObserver,
155       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
156 
157   public static final Log LOG = LogFactory.getLog(AccessController.class);
158 
159   private static final Log AUDITLOG =
160     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
161   private static final String CHECK_COVERING_PERM = "check_covering_perm";
162   private static final String TAG_CHECK_PASSED = "tag_check_passed";
163   private static final byte[] TRUE = Bytes.toBytes(true);
164 
165   TableAuthManager authManager = null;
166 
167   // flags if we are running on a region of the _acl_ table
168   boolean aclRegion = false;
169 
170   // defined only for Endpoint implementation, so it can have way to
171   // access region services.
172   private RegionCoprocessorEnvironment regionEnv;
173 
174   /** Mapping of scanner instances to the user who created them */
175   private Map<InternalScanner,String> scannerOwners =
176       new MapMaker().weakKeys().makeMap();
177 
178   private Map<TableName, List<UserPermission>> tableAcls;
179 
180   // Provider for mapping principal names to Users
181   private UserProvider userProvider;
182 
183   // The list of users with superuser authority
184   private List<String> superusers;
185 
186   // if we are able to support cell ACLs
187   boolean cellFeaturesEnabled;
188 
189   // if we should check EXEC permissions
190   boolean shouldCheckExecPermission;
191 
192   // if we should terminate access checks early as soon as table or CF grants
193   // allow access; pre-0.98 compatible behavior
194   boolean compatibleEarlyTermination;
195 
196   private volatile boolean initialized = false;
197 
198   // This boolean having relevance only in the Master.
199   private volatile boolean aclTabAvailable = false;
200 
201   public HRegion getRegion() {
202     return regionEnv != null ? regionEnv.getRegion() : null;
203   }
204 
205   public TableAuthManager getAuthManager() {
206     return authManager;
207   }
208 
209   void initialize(RegionCoprocessorEnvironment e) throws IOException {
210     final HRegion region = e.getRegion();
211     Configuration conf = e.getConfiguration();
212     Map<byte[], ListMultimap<String,TablePermission>> tables =
213         AccessControlLists.loadAll(region);
214     // For each table, write out the table's permissions to the respective
215     // znode for that table.
216     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
217       tables.entrySet()) {
218       byte[] entry = t.getKey();
219       ListMultimap<String,TablePermission> perms = t.getValue();
220       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
221       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
222     }
223     initialized = true;
224   }
225 
226   /**
227    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
228    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
229    * table updates.
230    */
231   void updateACL(RegionCoprocessorEnvironment e,
232       final Map<byte[], List<Cell>> familyMap) {
233     Set<byte[]> entries =
234         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
235     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
236       List<Cell> cells = f.getValue();
237       for (Cell cell: cells) {
238         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
239             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
240             AccessControlLists.ACL_LIST_FAMILY.length)) {
241           entries.add(CellUtil.cloneRow(cell));
242         }
243       }
244     }
245     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
246     Configuration conf = regionEnv.getConfiguration();
247     for (byte[] entry: entries) {
248       try {
249         ListMultimap<String,TablePermission> perms =
250           AccessControlLists.getPermissions(conf, entry);
251         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
252         zkw.writeToZookeeper(entry, serialized);
253       } catch (IOException ex) {
254         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
255             ex);
256       }
257     }
258   }
259 
260   /**
261    * Check the current user for authorization to perform a specific action
262    * against the given set of row data.
263    *
264    * <p>Note: Ordering of the authorization checks
265    * has been carefully optimized to short-circuit the most common requests
266    * and minimize the amount of processing required.</p>
267    *
268    * @param permRequest the action being requested
269    * @param e the coprocessor environment
270    * @param families the map of column families to qualifiers present in
271    * the request
272    * @return an authorization result
273    */
274   AuthResult permissionGranted(String request, User user, Action permRequest,
275       RegionCoprocessorEnvironment e,
276       Map<byte [], ? extends Collection<?>> families) {
277     HRegionInfo hri = e.getRegion().getRegionInfo();
278     TableName tableName = hri.getTable();
279 
280     // 1. All users need read access to hbase:meta table.
281     // this is a very common operation, so deal with it quickly.
282     if (hri.isMetaRegion()) {
283       if (permRequest == Action.READ) {
284         return AuthResult.allow(request, "All users allowed", user,
285           permRequest, tableName, families);
286       }
287     }
288 
289     if (user == null) {
290       return AuthResult.deny(request, "No user associated with request!", null,
291         permRequest, tableName, families);
292     }
293 
294     // 2. check for the table-level, if successful we can short-circuit
295     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
296       return AuthResult.allow(request, "Table permission granted", user,
297         permRequest, tableName, families);
298     }
299 
300     // 3. check permissions against the requested families
301     if (families != null && families.size() > 0) {
302       // all families must pass
303       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
304         // a) check for family level access
305         if (authManager.authorize(user, tableName, family.getKey(),
306             permRequest)) {
307           continue;  // family-level permission overrides per-qualifier
308         }
309 
310         // b) qualifier level access can still succeed
311         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
312           if (family.getValue() instanceof Set) {
313             // for each qualifier of the family
314             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
315             for (byte[] qualifier : familySet) {
316               if (!authManager.authorize(user, tableName, family.getKey(),
317                                          qualifier, permRequest)) {
318                 return AuthResult.deny(request, "Failed qualifier check", user,
319                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
320               }
321             }
322           } else if (family.getValue() instanceof List) { // List<KeyValue>
323             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
324             for (KeyValue kv : kvList) {
325               if (!authManager.authorize(user, tableName, family.getKey(),
326                       kv.getQualifier(), permRequest)) {
327                 return AuthResult.deny(request, "Failed qualifier check", user,
328                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
329               }
330             }
331           }
332         } else {
333           // no qualifiers and family-level check already failed
334           return AuthResult.deny(request, "Failed family check", user, permRequest,
335               tableName, makeFamilyMap(family.getKey(), null));
336         }
337       }
338 
339       // all family checks passed
340       return AuthResult.allow(request, "All family checks passed", user, permRequest,
341           tableName, families);
342     }
343 
344     // 4. no families to check and table level access failed
345     return AuthResult.deny(request, "No families to check and table permission failed",
346         user, permRequest, tableName, families);
347   }
348 
349   /**
350    * Check the current user for authorization to perform a specific action
351    * against the given set of row data.
352    * @param opType the operation type
353    * @param user the user
354    * @param e the coprocessor environment
355    * @param families the map of column families to qualifiers present in
356    * the request
357    * @param actions the desired actions
358    * @return an authorization result
359    */
360   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
361       Map<byte [], ? extends Collection<?>> families, Action... actions) {
362     AuthResult result = null;
363     for (Action action: actions) {
364       result = permissionGranted(opType.toString(), user, action, e, families);
365       if (!result.isAllowed()) {
366         return result;
367       }
368     }
369     return result;
370   }
371 
372   private void logResult(AuthResult result) {
373     if (AUDITLOG.isTraceEnabled()) {
374       RequestContext ctx = RequestContext.get();
375       InetAddress remoteAddr = null;
376       if (ctx != null) {
377         remoteAddr = ctx.getRemoteAddress();
378       }
379       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
380           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
381           "; reason: " + result.getReason() +
382           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
383           "; request: " + result.getRequest() +
384           "; context: " + result.toContextString());
385     }
386   }
387 
388   /**
389    * Returns the active user to which authorization checks should be applied.
390    * If we are in the context of an RPC call, the remote user is used,
391    * otherwise the currently logged in user is used.
392    */
393   private User getActiveUser() throws IOException {
394     User user = RequestContext.getRequestUser();
395     if (!RequestContext.isInRequestContext()) {
396       // for non-rpc handling, fallback to system user
397       user = userProvider.getCurrent();
398     }
399     return user;
400   }
401 
402   /**
403    * Authorizes that the current user has any of the given permissions for the
404    * given table, column family and column qualifier.
405    * @param tableName Table requested
406    * @param family Column family requested
407    * @param qualifier Column qualifier requested
408    * @throws IOException if obtaining the current user fails
409    * @throws AccessDeniedException if user has no authorization
410    */
411   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
412       Action... permissions) throws IOException {
413     User user = getActiveUser();
414     AuthResult result = null;
415 
416     for (Action permission : permissions) {
417       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
418         result = AuthResult.allow(request, "Table permission granted", user,
419                                   permission, tableName, family, qualifier);
420         break;
421       } else {
422         // rest of the world
423         result = AuthResult.deny(request, "Insufficient permissions", user,
424                                  permission, tableName, family, qualifier);
425       }
426     }
427     logResult(result);
428     if (!result.isAllowed()) {
429       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
430     }
431   }
432 
433   /**
434    * Authorizes that the current user has global privileges for the given action.
435    * @param perm The action being requested
436    * @throws IOException if obtaining the current user fails
437    * @throws AccessDeniedException if authorization is denied
438    */
439   private void requirePermission(String request, Action perm) throws IOException {
440     requireGlobalPermission(request, perm, null, null);
441   }
442 
443   /**
444    * Authorizes that the current user has permission to perform the given
445    * action on the set of table column families.
446    * @param perm Action that is required
447    * @param env The current coprocessor environment
448    * @param families The map of column families-qualifiers.
449    * @throws AccessDeniedException if the authorization check failed
450    */
451   private void requirePermission(String request, Action perm,
452         RegionCoprocessorEnvironment env,
453         Map<byte[], ? extends Collection<?>> families)
454       throws IOException {
455     User user = getActiveUser();
456     AuthResult result = permissionGranted(request, user, perm, env, families);
457     logResult(result);
458 
459     if (!result.isAllowed()) {
460       throw new AccessDeniedException("Insufficient permissions (table=" +
461         env.getRegion().getTableDesc().getTableName()+
462         ((families != null && families.size() > 0) ? ", family: " +
463         result.toFamilyString() : "") + ", action=" +
464         perm.toString() + ")");
465     }
466   }
467 
468   /**
469    * Checks that the user has the given global permission. The generated
470    * audit log message will contain context information for the operation
471    * being authorized, based on the given parameters.
472    * @param perm Action being requested
473    * @param tableName Affected table name.
474    * @param familyMap Affected column families.
475    */
476   private void requireGlobalPermission(String request, Action perm, TableName tableName,
477       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
478     User user = getActiveUser();
479     if (authManager.authorize(user, perm) || (tableName != null &&
480         authManager.authorize(user, tableName.getNamespaceAsString(), perm))) {
481       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
482     } else {
483       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
484       throw new AccessDeniedException("Insufficient permissions for user '" +
485           (user != null ? user.getShortName() : "null") +"' (global, action=" +
486           perm.toString() + ")");
487     }
488   }
489 
490   /**
491    * Checks that the user has the given global permission. The generated
492    * audit log message will contain context information for the operation
493    * being authorized, based on the given parameters.
494    * @param perm Action being requested
495    * @param namespace
496    */
497   private void requireGlobalPermission(String request, Action perm,
498                                        String namespace) throws IOException {
499     User user = getActiveUser();
500     if (authManager.authorize(user, perm)) {
501       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
502     } else {
503       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
504       throw new AccessDeniedException("Insufficient permissions for user '" +
505           (user != null ? user.getShortName() : "null") +"' (global, action=" +
506           perm.toString() + ")");
507     }
508   }
509 
510   /**
511    * Returns <code>true</code> if the current user is allowed the given action
512    * over at least one of the column qualifiers in the given column families.
513    */
514   private boolean hasFamilyQualifierPermission(User user,
515       Action perm,
516       RegionCoprocessorEnvironment env,
517       Map<byte[], ? extends Collection<byte[]>> familyMap)
518     throws IOException {
519     HRegionInfo hri = env.getRegion().getRegionInfo();
520     TableName tableName = hri.getTable();
521 
522     if (user == null) {
523       return false;
524     }
525 
526     if (familyMap != null && familyMap.size() > 0) {
527       // at least one family must be allowed
528       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
529           familyMap.entrySet()) {
530         if (family.getValue() != null && !family.getValue().isEmpty()) {
531           for (byte[] qualifier : family.getValue()) {
532             if (authManager.matchPermission(user, tableName,
533                 family.getKey(), qualifier, perm)) {
534               return true;
535             }
536           }
537         } else {
538           if (authManager.matchPermission(user, tableName, family.getKey(),
539               perm)) {
540             return true;
541           }
542         }
543       }
544     } else if (LOG.isDebugEnabled()) {
545       LOG.debug("Empty family map passed for permission check");
546     }
547 
548     return false;
549   }
550 
551   private enum OpType {
552     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
553     GET("get"),
554     EXISTS("exists"),
555     SCAN("scan"),
556     PUT("put"),
557     DELETE("delete"),
558     CHECK_AND_PUT("checkAndPut"),
559     CHECK_AND_DELETE("checkAndDelete"),
560     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
561     APPEND("append"),
562     INCREMENT("increment");
563 
564     private String type;
565 
566     private OpType(String type) {
567       this.type = type;
568     }
569 
570     @Override
571     public String toString() {
572       return type;
573     }
574   }
575 
576   /**
577    * Determine if cell ACLs covered by the operation grant access. This is expensive.
578    * @return false if cell ACLs failed to grant access, true otherwise
579    * @throws IOException
580    */
581   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
582       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
583       throws IOException {
584     if (!cellFeaturesEnabled) {
585       return false;
586     }
587     long cellGrants = 0;
588     User user = getActiveUser();
589     long latestCellTs = 0;
590     Get get = new Get(row);
591     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
592     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
593     // version. We have to get every cell version and check its TS against the TS asked for in
594     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
595     // consider only one such passing cell. In case of Delete we have to consider all the cell
596     // versions under this passing version. When Delete Mutation contains columns which are a
597     // version delete just consider only one version for those column cells.
598     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
599     if (considerCellTs) {
600       get.setMaxVersions();
601     } else {
602       get.setMaxVersions(1);
603     }
604     boolean diffCellTsFromOpTs = false;
605     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
606       byte[] col = entry.getKey();
607       // TODO: HBASE-7114 could possibly unify the collection type in family
608       // maps so we would not need to do this
609       if (entry.getValue() instanceof Set) {
610         Set<byte[]> set = (Set<byte[]>)entry.getValue();
611         if (set == null || set.isEmpty()) {
612           get.addFamily(col);
613         } else {
614           for (byte[] qual: set) {
615             get.addColumn(col, qual);
616           }
617         }
618       } else if (entry.getValue() instanceof List) {
619         List<Cell> list = (List<Cell>)entry.getValue();
620         if (list == null || list.isEmpty()) {
621           get.addFamily(col);
622         } else {
623           // In case of family delete, a Cell will be added into the list with Qualifier as null.
624           for (Cell cell : list) {
625             if (cell.getQualifierLength() == 0
626                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
627                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
628               get.addFamily(col);
629             } else {
630               get.addColumn(col, CellUtil.cloneQualifier(cell));
631             }
632             if (considerCellTs) {
633               long cellTs = cell.getTimestamp();
634               latestCellTs = Math.max(latestCellTs, cellTs);
635               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
636             }
637           }
638         }
639       } else {
640         throw new RuntimeException("Unhandled collection type " +
641           entry.getValue().getClass().getName());
642       }
643     }
644     // We want to avoid looking into the future. So, if the cells of the
645     // operation specify a timestamp, or the operation itself specifies a
646     // timestamp, then we use the maximum ts found. Otherwise, we bound
647     // the Get to the current server time. We add 1 to the timerange since
648     // the upper bound of a timerange is exclusive yet we need to examine
649     // any cells found there inclusively.
650     long latestTs = Math.max(opTs, latestCellTs);
651     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
652       latestTs = EnvironmentEdgeManager.currentTime();
653     }
654     get.setTimeRange(0, latestTs + 1);
655     // In case of Put operation we set to read all versions. This was done to consider the case
656     // where columns are added with TS other than the Mutation TS. But normally this wont be the
657     // case with Put. There no need to get all versions but get latest version only.
658     if (!diffCellTsFromOpTs && request == OpType.PUT) {
659       get.setMaxVersions(1);
660     }
661     if (LOG.isTraceEnabled()) {
662       LOG.trace("Scanning for cells with " + get);
663     }
664     // This Map is identical to familyMap. The key is a BR rather than byte[].
665     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
666     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
667     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
668     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
669       if (entry.getValue() instanceof List) {
670         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
671       }
672     }
673     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
674     List<Cell> cells = Lists.newArrayList();
675     Cell prevCell = null;
676     ByteRange curFam = new SimpleMutableByteRange();
677     boolean curColAllVersions = (request == OpType.DELETE);
678     long curColCheckTs = opTs;
679     boolean foundColumn = false;
680     try {
681       boolean more = false;
682       do {
683         cells.clear();
684         // scan with limit as 1 to hold down memory use on wide rows
685         more = scanner.next(cells, 1);
686         for (Cell cell: cells) {
687           if (LOG.isTraceEnabled()) {
688             LOG.trace("Found cell " + cell);
689           }
690           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
691           if (colChange) foundColumn = false;
692           prevCell = cell;
693           if (!curColAllVersions && foundColumn) {
694             continue;
695           }
696           if (colChange && considerCellTs) {
697             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
698             List<Cell> cols = familyMap1.get(curFam);
699             for (Cell col : cols) {
700               // null/empty qualifier is used to denote a Family delete. The TS and delete type
701               // associated with this is applicable for all columns within the family. That is
702               // why the below (col.getQualifierLength() == 0) check.
703               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
704                   || CellUtil.matchingQualifier(cell, col)) {
705                 byte type = col.getTypeByte();
706                 if (considerCellTs) {
707                   curColCheckTs = col.getTimestamp();
708                 }
709                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
710                 // a version delete for a column no need to check all the covering cells within
711                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
712                 // One version delete types are Delete/DeleteFamilyVersion
713                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
714                     || (KeyValue.Type.DeleteFamily.getCode() == type);
715                 break;
716               }
717             }
718           }
719           if (cell.getTimestamp() > curColCheckTs) {
720             // Just ignore this cell. This is not a covering cell.
721             continue;
722           }
723           foundColumn = true;
724           for (Action action: actions) {
725             // Are there permissions for this user for the cell?
726             if (!authManager.authorize(user, getTableName(e), cell, action)) {
727               // We can stop if the cell ACL denies access
728               return false;
729             }
730           }
731           cellGrants++;
732         }
733       } while (more);
734     } catch (AccessDeniedException ex) {
735       throw ex;
736     } catch (IOException ex) {
737       LOG.error("Exception while getting cells to calculate covering permission", ex);
738     } finally {
739       scanner.close();
740     }
741     // We should not authorize unless we have found one or more cell ACLs that
742     // grant access. This code is used to check for additional permissions
743     // after no table or CF grants are found.
744     return cellGrants > 0;
745   }
746 
747   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
748     // Iterate over the entries in the familyMap, replacing the cells therein
749     // with new cells including the ACL data
750     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
751       List<Cell> newCells = Lists.newArrayList();
752       for (Cell cell: e.getValue()) {
753         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
754         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
755         if (cell.getTagsLength() > 0) {
756           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
757             cell.getTagsOffset(), cell.getTagsLength());
758           while (tagIterator.hasNext()) {
759             tags.add(tagIterator.next());
760           }
761         }
762         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
763       }
764       // This is supposed to be safe, won't CME
765       e.setValue(newCells);
766     }
767   }
768 
769   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
770   // type is reserved and should not be explicitly set by user.
771   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
772     // Superusers are allowed to store cells unconditionally.
773     if (superusers.contains(user.getShortName())) {
774       return;
775     }
776     // We already checked (prePut vs preBatchMutation)
777     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
778       return;
779     }
780     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
781       Cell cell = cellScanner.current();
782       if (cell.getTagsLength() > 0) {
783         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
784           cell.getTagsLength());
785         while (tagsItr.hasNext()) {
786           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
787             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
788           }
789         }
790       }
791     }
792     m.setAttribute(TAG_CHECK_PASSED, TRUE);
793   }
794 
795   /* ---- MasterObserver implementation ---- */
796 
797   public void start(CoprocessorEnvironment env) throws IOException {
        // Initializes this coprocessor from the given environment: reads feature flags, picks up
        // the ZooKeeperWatcher of the hosting process, builds the superuser list and obtains the
        // TableAuthManager. A RuntimeException is thrown when no ZooKeeperWatcher was found or
        // when the TableAuthManager lookup fails with an IOException.
798     CompoundConfiguration conf = new CompoundConfiguration();
799     conf.add(env.getConfiguration());
800 
801     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
802       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
803 
        // Cell-level features are switched on only when the configured HFile format version is
        // at least the minimum version that supports tags.
804     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
805     if (!cellFeaturesEnabled) {
806       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
807           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
808           + " accordingly.");
809     }
810 
        // zk stays null when env is none of the three environment types handled below.
811     ZooKeeperWatcher zk = null;
812     if (env instanceof MasterCoprocessorEnvironment) {
813       // if running on HMaster
814       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
815       zk = mEnv.getMasterServices().getZooKeeper();
816     } else if (env instanceof RegionServerCoprocessorEnvironment) {
          // if running on a region server (not tied to a single region)
817       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
818       zk = rsEnv.getRegionServerServices().getZooKeeper();
819     } else if (env instanceof RegionCoprocessorEnvironment) {
820       // if running at region
821       regionEnv = (RegionCoprocessorEnvironment) env;
          // The table descriptor's configuration is added to conf before the early-out flag is
          // read, so that flag can be supplied by the table descriptor.
822       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
823       zk = regionEnv.getRegionServerServices().getZooKeeper();
824       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
825         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
826     }
827 
828     // set the user-provider.
        // NOTE: built from env.getConfiguration(), not from the compound conf assembled above.
829     this.userProvider = UserProvider.instantiate(env.getConfiguration());
830 
831     // set up the list of users with superuser privilege
        // The list starts with the current user's short name, followed by the names configured
        // under SUPERUSER_CONF_KEY (an empty array when the key is unset).
832     User user = userProvider.getCurrent();
833     superusers = Lists.asList(user.getShortName(),
834       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
835 
836     // If zk is null or IOException while obtaining auth manager,
837     // throw RuntimeException so that the coprocessor is unloaded.
838     if (zk != null) {
839       try {
840         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
841       } catch (IOException ioe) {
842         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
843       }
844     } else {
845       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
846     }
847 
        // NOTE(review): values in this map are weakly referenced (MapMaker.weakValues()), so an
        // entry can vanish once nothing else references its value -- confirm that the code
        // reading tableAcls tolerates a missing entry.
848     tableAcls = new MapMaker().weakValues().makeMap();
849   }
850 
851   public void stop(CoprocessorEnvironment env) {
        // No-op: this implementation does nothing when the coprocessor is stopped.
852 
853   }
854 
855   @Override
856   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
857       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
858     Set<byte[]> families = desc.getFamiliesKeys();
859     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
860     for (byte[] family: families) {
861       familyMap.put(family, null);
862     }
863     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
864   }
865 
866   @Override
867   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
868       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
869     // When AC is used, it should be configured as the 1st CP.
870     // In Master, the table operations like create, are handled by a Thread pool but the max size
871     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
872     // sequentially only.
873     // Related code in HMaster#startServiceThreads
874     // {code}
875     //   // We depend on there being only one instance of this executor running
876     //   // at a time. To do concurrency, would need fencing of enable/disable of
877     //   // tables.
878     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
879     // {code}
880     // In future if we change this pool to have more threads, then there is a chance for thread,
881     // creating acl table, getting delayed and by that time another table creation got over and
882     // this hook is getting called. In such a case, we will need a wait logic here which will
883     // wait till the acl table is created.
884     if (AccessControlLists.isAclTable(desc)) {
885       this.aclTabAvailable = true;
886     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
887       if (!aclTabAvailable) {
888         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
889             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
890             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
891       } else {
892         String owner = desc.getOwnerString();
893         // default the table owner to current user, if not specified.
894         if (owner == null)
895           owner = getActiveUser().getShortName();
896         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
897             desc.getTableName(), null, Action.values());
898         // switch to the real hbase master user for doing the RPC on the ACL table
899         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
900           @Override
901           public Void run() throws Exception {
902             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
903                 userperm);
904             return null;
905           }
906         });
907       }
908     }
909   }
910 
911   @Override
912   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
913       throws IOException {
        // Pre-delete access check: passes the table (no family or qualifier) together with the
        // ADMIN and CREATE actions to requirePermission.
        // NOTE(review): presumably either action suffices and a denial is raised as an
        // exception -- confirm in requirePermission.
914     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
915   }
916 
917   @Override
918   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
919       final TableName tableName) throws IOException {
        // Runs as the login user and removes the permissions recorded for the given table,
        // using the configuration of the observer context's environment.
920     final Configuration conf = c.getEnvironment().getConfiguration();
921     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
922       @Override
923       public Void run() throws Exception {
924         AccessControlLists.removeTablePermissions(conf, tableName);
925         return null;
926       }
927     });
928   }
929 
930   @Override
931   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
932       final TableName tableName) throws IOException {
        // Access check first: the table (no family or qualifier) and the ADMIN action are passed
        // to requirePermission.
933     requirePermission("truncateTable", tableName, null, null, Action.ADMIN);
934     final Configuration conf = c.getEnvironment().getConfiguration();
        // Then, as the login user, read the table's current permissions and keep them in
        // tableAcls under the table name; postTruncateTable reads them back from there.
        // Nothing is stored when the lookup returns null.
935     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
936       @Override
937       public Void run() throws Exception {
938         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
939         if (acls != null) {
940           tableAcls.put(tableName, acls);
941         }
942         return null;
943       }
944     });
945   }
946 
947   @Override
948   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
949       final TableName tableName) throws IOException {
        // As the login user, re-adds every permission held in tableAcls for this table name
        // (skipped when there is no entry) and then removes that entry from tableAcls.
950     final Configuration conf = ctx.getEnvironment().getConfiguration();
951     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
952       @Override
953       public Void run() throws Exception {
954         List<UserPermission> perms = tableAcls.get(tableName);
955         if (perms != null) {
956           for (UserPermission perm : perms) {
957             AccessControlLists.addUserPermission(conf, perm);
958           }
959         }
960         tableAcls.remove(tableName);
961         return null;
962       }
963     });
964   }
965 
966   @Override
967   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
968       HTableDescriptor htd) throws IOException {
        // Pre-modify access check: passes the table (no family or qualifier) with the ADMIN and
        // CREATE actions to requirePermission. The new descriptor htd is not consulted.
        // NOTE(review): presumably either action suffices -- confirm in requirePermission.
969     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
970   }
971 
972   @Override
973   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
974       TableName tableName, final HTableDescriptor htd) throws IOException {
975     final Configuration conf = c.getEnvironment().getConfiguration();
976     // default the table owner to current user, if not specified.
977     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() : 
978       getActiveUser().getShortName();
979     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
980       @Override
981       public Void run() throws Exception {
982         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
983           htd.getTableName(), null, Action.values());
984         AccessControlLists.addUserPermission(conf, userperm);
985         return null;
986       }
987     });
988   }
989 
990   @Override
991   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
992       HColumnDescriptor column) throws IOException {
        // Pre-add-column access check: passes the table (no family or qualifier) with the ADMIN
        // and CREATE actions to requirePermission. The new column descriptor is not consulted.
        // NOTE(review): presumably either action suffices -- confirm in requirePermission.
993     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
994   }
995 
996   @Override
997   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
998       HColumnDescriptor descriptor) throws IOException {
        // Pre-modify-column access check: passes the table (no family or qualifier) with the
        // ADMIN and CREATE actions to requirePermission. The column descriptor is not consulted.
        // NOTE(review): presumably either action suffices -- confirm in requirePermission.
999     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1000   }
1001 
1002   @Override
1003   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1004       byte[] col) throws IOException {
         // Pre-delete-column access check: passes the table with a null family and qualifier
         // (col itself is not passed on) and the ADMIN and CREATE actions to requirePermission.
         // NOTE(review): presumably either action suffices -- confirm in requirePermission.
1005     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1006   }
1007 
1008   @Override
1009   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1010       final TableName tableName, final byte[] col) throws IOException {
         // Runs as the login user and removes the permissions recorded for the given column of
         // the table, using the configuration of the observer context's environment.
1011     final Configuration conf = c.getEnvironment().getConfiguration();
1012     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1013       @Override
1014       public Void run() throws Exception {
1015         AccessControlLists.removeTablePermissions(conf, tableName, col);
1016         return null;
1017       }
1018     });
1019   }
1020 
1021   @Override
1022   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1023       throws IOException {
         // Pre-enable access check: passes the table (no family or qualifier) with the ADMIN and
         // CREATE actions to requirePermission.
         // NOTE(review): presumably either action suffices -- confirm in requirePermission.
1024     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1025   }
1026 
1027   @Override
1028   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1029       throws IOException {
1030     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1031       throw new AccessDeniedException("Not allowed to disable "
1032           + AccessControlLists.ACL_TABLE_NAME + " table.");
1033     }
1034     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1035   }
1036 
1037   @Override
1038   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1039       ServerName srcServer, ServerName destServer) throws IOException {
         // Pre-move access check: passes the region's table (no family or qualifier) and the
         // ADMIN action to requirePermission. Source and destination servers are not consulted.
1040     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1041   }
1042 
1043   @Override
1044   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1045       throws IOException {
         // Pre-assign access check: passes the region's table (no family or qualifier) and the
         // ADMIN action to requirePermission.
1046     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1047   }
1048 
1049   @Override
1050   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1051       boolean force) throws IOException {
         // Pre-unassign access check: passes the region's table (no family or qualifier) and the
         // ADMIN action to requirePermission. The force flag is not consulted.
1052     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1053   }
1054 
1055   @Override
1056   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1057       HRegionInfo regionInfo) throws IOException {
         // Pre-offline access check: passes the region's table (no family or qualifier) and the
         // ADMIN action to requirePermission.
1058     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1059   }
1060 
1061   @Override
1062   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1063       throws IOException {
         // Pre-balance access check: uses the requirePermission overload that takes no table,
         // with the ADMIN action.
1064     requirePermission("balance", Action.ADMIN);
1065   }
1066 
1067   @Override
1068   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1069       boolean newValue) throws IOException {
         // Access check via the table-less requirePermission overload with the ADMIN action;
         // the requested switch value is then returned unchanged.
1070     requirePermission("balanceSwitch", Action.ADMIN);
1071     return newValue;
1072   }
1073 
1074   @Override
1075   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1076       throws IOException {
         // Pre-shutdown access check: table-less requirePermission overload with the ADMIN action.
1077     requirePermission("shutdown", Action.ADMIN);
1078   }
1079 
1080   @Override
1081   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1082       throws IOException {
         // Pre-stop-master access check: table-less requirePermission overload with the ADMIN
         // action.
1083     requirePermission("stopMaster", Action.ADMIN);
1084   }
1085 
1086   @Override
1087   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1088       throws IOException {
1089     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1090       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1091       // initialize the ACL storage table
1092       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1093     } else {
1094       aclTabAvailable = true;
1095     }
1096   }
1097 
1098   @Override
1099   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1100       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1101       throws IOException {
         // Pre-snapshot access check: passes the descriptor's table (no family or qualifier) and
         // the ADMIN action to requirePermission. The snapshot description is not consulted.
1102     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1103       Permission.Action.ADMIN);
1104   }
1105 
1106   @Override
1107   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1108       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1109       throws IOException {
         // Pre-clone access check: table-less requirePermission overload with the ADMIN action.
         // Neither the snapshot description nor the table descriptor is consulted.
1110     requirePermission("clone", Action.ADMIN);
1111   }
1112 
1113   @Override
1114   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1115       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1116       throws IOException {
1117     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1118       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1119         Permission.Action.ADMIN);
1120     } else {
1121       requirePermission("restore", Action.ADMIN);
1122     }
1123   }
1124 
1125   @Override
1126   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1127       final SnapshotDescription snapshot) throws IOException {
1128     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1129       // Snapshot owner is allowed to delete the snapshot
1130     } else {
1131       requirePermission("deleteSnapshot", Action.ADMIN);
1132     }
1133   }
1134 
1135   @Override
1136   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1137       NamespaceDescriptor ns) throws IOException {
         // Pre-create-namespace access check: passes the ADMIN action and the namespace's name
         // to requireGlobalPermission.
1138     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1139   }
1140 
1141   @Override
1142   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1143       throws IOException {
         // Pre-delete-namespace access check: passes the ADMIN action and the namespace name to
         // requireGlobalPermission.
1144     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1145   }
1146 
1147   @Override
1148   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1149       final String namespace) throws IOException {
         // After a namespace has been deleted: as the login user, remove the permissions
         // recorded for it, then log the removal.
1150     final Configuration conf = ctx.getEnvironment().getConfiguration();
1151     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1152       @Override
1153       public Void run() throws Exception {
1154         AccessControlLists.removeNamespacePermissions(conf, namespace);
1155         return null;
1156       }
1157     });
         // The leading space keeps the namespace name from being fused with the word "entry".
1158     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1159   }
1160 
1161   @Override
1162   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1163       NamespaceDescriptor ns) throws IOException {
         // Pre-modify-namespace access check: passes the ADMIN action and the namespace's name
         // to requireGlobalPermission.
1164     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1165   }
1166 
1167   @Override
1168   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1169       final TableName tableName) throws IOException {
         // Pre-table-flush access check: passes the table (no family or qualifier) with the
         // ADMIN and CREATE actions to requirePermission.
         // NOTE(review): presumably either action suffices -- confirm in requirePermission.
1170     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1171   }
1172 
1173   /* ---- RegionObserver implementation ---- */
1174 
1175   @Override
1176   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1177       throws IOException {
1178     RegionCoprocessorEnvironment env = e.getEnvironment();
1179     final HRegion region = env.getRegion();
1180     if (region == null) {
1181       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1182     } else {
1183       HRegionInfo regionInfo = region.getRegionInfo();
1184       if (regionInfo.getTable().isSystemTable()) {
1185         isSystemOrSuperUser(regionEnv.getConfiguration());
1186       } else {
1187         requirePermission("preOpen", Action.ADMIN);
1188       }
1189     }
1190   }
1191 
1192   @Override
1193   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1194     RegionCoprocessorEnvironment env = c.getEnvironment();
1195     final HRegion region = env.getRegion();
1196     if (region == null) {
1197       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1198       return;
1199     }
1200     if (AccessControlLists.isAclRegion(region)) {
1201       aclRegion = true;
1202       // When this region is under recovering state, initialize will be handled by postLogReplay
1203       if (!region.isRecovering()) {
1204         try {
1205           initialize(env);
1206         } catch (IOException ex) {
1207           // if we can't obtain permissions, it's better to fail
1208           // than perform checks incorrectly
1209           throw new RuntimeException("Failed to initialize permissions cache", ex);
1210         }
1211       }
1212     } else {
1213       initialized = true;
1214     }
1215   }
1216 
1217   @Override
1218   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1219     if (aclRegion) {
1220       try {
1221         initialize(c.getEnvironment());
1222       } catch (IOException ex) {
1223         // if we can't obtain permissions, it's better to fail
1224         // than perform checks incorrectly
1225         throw new RuntimeException("Failed to initialize permissions cache", ex);
1226       }
1227     }
1228   }
1229 
1230   @Override
1231   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
         // Pre-flush access check: passes the table of this region's environment (no family or
         // qualifier) with the ADMIN and CREATE actions to requirePermission.
         // NOTE(review): presumably either action suffices -- confirm in requirePermission.
1232     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1233         Action.CREATE);
1234   }
1235 
1236   @Override
1237   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
         // Pre-split access check: passes the table of this region's environment (no family or
         // qualifier) and the ADMIN action to requirePermission.
1238     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1239   }
1240 
1241   @Override
1242   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1243       byte[] splitRow) throws IOException {
         // Same check as the overload without a split row: the table of this region's
         // environment and the ADMIN action go to requirePermission. splitRow is not consulted.
1244     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1245   }
1246 
1247   @Override
1248   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1249       final Store store, final InternalScanner scanner, final ScanType scanType)
1250           throws IOException {
         // Pre-compaction access check: passes the table of this region's environment (no family
         // or qualifier) with the ADMIN and CREATE actions to requirePermission, then returns the
         // supplied scanner unchanged. store and scanType are not consulted.
         // NOTE(review): presumably either action suffices -- confirm in requirePermission.
1251     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1252         Action.CREATE);
1253     return scanner;
1254   }
1255 
1256   @Override
1257   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1258       final byte [] row, final byte [] family, final Result result)
1259       throws IOException {
1260     assert family != null;
1261     RegionCoprocessorEnvironment env = c.getEnvironment();
1262     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1263     User user = getActiveUser();
1264     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1265       Action.READ);
1266     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1267       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1268         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1269       authResult.setReason("Covering cell set");
1270     }
1271     logResult(authResult);
1272     if (!authResult.isAllowed()) {
1273       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1274     }
1275   }
1276 
1277   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1278       final Query query, OpType opType) throws IOException {
1279     Filter filter = query.getFilter();
1280     // Don't wrap an AccessControlFilter
1281     if (filter != null && filter instanceof AccessControlFilter) {
1282       return;
1283     }
1284     User user = getActiveUser();
1285     RegionCoprocessorEnvironment env = c.getEnvironment();
1286     Map<byte[],? extends Collection<byte[]>> families = null;
1287     switch (opType) {
1288     case GET:
1289     case EXISTS:
1290       families = ((Get)query).getFamilyMap();
1291       break;
1292     case SCAN:
1293       families = ((Scan)query).getFamilyMap();
1294       break;
1295     default:
1296       throw new RuntimeException("Unhandled operation " + opType);
1297     }
1298     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1299     HRegion region = getRegion(env);
1300     TableName table = getTableName(region);
1301     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1302     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1303       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1304     }
1305     if (!authResult.isAllowed()) {
1306       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1307         // Old behavior: Scan with only qualifier checks if we have partial
1308         // permission. Backwards compatible behavior is to throw an
1309         // AccessDeniedException immediately if there are no grants for table
1310         // or CF or CF+qual. Only proceed with an injected filter if there are
1311         // grants for qualifiers. Otherwise we will fall through below and log
1312         // the result and throw an ADE. We may end up checking qualifier
1313         // grants three times (permissionGranted above, here, and in the
1314         // filter) but that's the price of backwards compatibility.
1315         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1316           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1317             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1318             cfVsMaxVersions);
1319           // wrap any existing filter
1320           if (filter != null) {
1321             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1322               Lists.newArrayList(ourFilter, filter));
1323           }
1324           authResult.setAllowed(true);
1325           authResult.setReason("Access allowed with filter");
1326           switch (opType) {
1327           case GET:
1328           case EXISTS:
1329             ((Get)query).setFilter(ourFilter);
1330             break;
1331           case SCAN:
1332             ((Scan)query).setFilter(ourFilter);
1333             break;
1334           default:
1335             throw new RuntimeException("Unhandled operation " + opType);
1336           }
1337         }
1338       } else {
1339         // New behavior: Any access we might be granted is more fine-grained
1340         // than whole table or CF. Simply inject a filter and return what is
1341         // allowed. We will not throw an AccessDeniedException. This is a
1342         // behavioral change since 0.96.
1343         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1344           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1345         // wrap any existing filter
1346         if (filter != null) {
1347           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1348             Lists.newArrayList(ourFilter, filter));
1349         }
1350         authResult.setAllowed(true);
1351         authResult.setReason("Access allowed with filter");
1352         switch (opType) {
1353         case GET:
1354         case EXISTS:
1355           ((Get)query).setFilter(ourFilter);
1356           break;
1357         case SCAN:
1358           ((Scan)query).setFilter(ourFilter);
1359           break;
1360         default:
1361           throw new RuntimeException("Unhandled operation " + opType);
1362         }
1363       }
1364     }
1365 
1366     logResult(authResult);
1367     if (!authResult.isAllowed()) {
1368       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1369         ", action=READ)");
1370     }
1371   }
1372 
1373   @Override
1374   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1375       final Get get, final List<Cell> result) throws IOException {
         // Delegates the read check for this Get to internalPreRead with OpType.GET; the result
         // list is not touched here.
1376     internalPreRead(c, get, OpType.GET);
1377   }
1378 
1379   @Override
1380   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1381       final Get get, final boolean exists) throws IOException {
         // Delegates the read check for this Get to internalPreRead with OpType.EXISTS, then
         // returns the supplied exists value unchanged.
1382     internalPreRead(c, get, OpType.EXISTS);
1383     return exists;
1384   }
1385 
1386   @Override
1387   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1388       final Put put, final WALEdit edit, final Durability durability)
1389       throws IOException {
1390     // Require WRITE permission to the table, CF, or top visible value, if any.
1391     // NOTE: We don't need to check the permissions for any earlier Puts
1392     // because we treat the ACLs in each Put as timestamped like any other
1393     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1394     // change the ACL of any previous Put. This allows simple evolution of
1395     // security policy over time without requiring expensive updates.
1396     User user = getActiveUser();
1397     checkForReservedTagPresence(user, put);
1398     RegionCoprocessorEnvironment env = c.getEnvironment();
1399     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1400     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1401     logResult(authResult);
1402     if (!authResult.isAllowed()) {
1403       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1404         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1405       } else {
1406         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1407       }
1408     }
1409     // Add cell ACLs from the operation to the cells themselves
1410     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1411     if (bytes != null) {
1412       if (cellFeaturesEnabled) {
1413         addCellPermissions(bytes, put.getFamilyCellMap());
1414       } else {
1415         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1416       }
1417     }
1418   }
1419 
1420   @Override
1421   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1422       final Put put, final WALEdit edit, final Durability durability) {
1423     if (aclRegion) {
1424       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1425     }
1426   }
1427 
1428   @Override
1429   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1430       final Delete delete, final WALEdit edit, final Durability durability)
1431       throws IOException {
1432     // An ACL on a delete is useless, we shouldn't allow it
1433     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1434       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1435     }
1436     // Require WRITE permissions on all cells covered by the delete. Unlike
1437     // for Puts we need to check all visible prior versions, because a major
1438     // compaction could remove them. If the user doesn't have permission to
1439     // overwrite any of the visible versions ('visible' defined as not covered
1440     // by a tombstone already) then we have to disallow this operation.
1441     RegionCoprocessorEnvironment env = c.getEnvironment();
1442     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1443     User user = getActiveUser();
1444     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1445     logResult(authResult);
1446     if (!authResult.isAllowed()) {
1447       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1448         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1449       } else {
1450         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1451       }
1452     }
1453   }
1454 
1455   @Override
1456   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1457       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1458     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1459       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1460       for (int i = 0; i < miniBatchOp.size(); i++) {
1461         Mutation m = miniBatchOp.getOperation(i);
1462         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1463           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1464           // perm check
1465           OpType opType;
1466           if (m instanceof Put) {
1467             checkForReservedTagPresence(getActiveUser(), m);
1468             opType = OpType.PUT;
1469           } else {
1470             opType = OpType.DELETE;
1471           }
1472           AuthResult authResult = null;
1473           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1474               m.getTimeStamp(), Action.WRITE)) {
1475             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1476                 Action.WRITE, table, m.getFamilyCellMap());
1477           } else {
1478             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1479                 Action.WRITE, table, m.getFamilyCellMap());
1480           }
1481           logResult(authResult);
1482           if (!authResult.isAllowed()) {
1483             throw new AccessDeniedException("Insufficient permissions "
1484                 + authResult.toContextString());
1485           }
1486         }
1487       }
1488     }
1489   }
1490 
1491   @Override
1492   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1493       final Delete delete, final WALEdit edit, final Durability durability)
1494       throws IOException {
1495     if (aclRegion) {
1496       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1497     }
1498   }
1499 
1500   @Override
1501   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1502       final byte [] row, final byte [] family, final byte [] qualifier,
1503       final CompareFilter.CompareOp compareOp,
1504       final ByteArrayComparable comparator, final Put put,
1505       final boolean result) throws IOException {
1506     // Require READ and WRITE permissions on the table, CF, and KV to update
1507     User user = getActiveUser();
1508     checkForReservedTagPresence(user, put);
1509     RegionCoprocessorEnvironment env = c.getEnvironment();
1510     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1511     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1512       Action.READ, Action.WRITE);
1513     logResult(authResult);
1514     if (!authResult.isAllowed()) {
1515       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1516         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1517       } else {
1518         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1519       }
1520     }
1521     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1522     if (bytes != null) {
1523       if (cellFeaturesEnabled) {
1524         addCellPermissions(bytes, put.getFamilyCellMap());
1525       } else {
1526         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1527       }
1528     }
1529     return result;
1530   }
1531 
1532   @Override
1533   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1534       final byte[] row, final byte[] family, final byte[] qualifier,
1535       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1536       final boolean result) throws IOException {
1537     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1538       // We had failure with table, cf and q perm checks and now giving a chance for cell
1539       // perm check
1540       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1541       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1542       AuthResult authResult = null;
1543       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1544           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1545         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1546             getActiveUser(), Action.READ, table, families);
1547       } else {
1548         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1549             getActiveUser(), Action.READ, table, families);
1550       }
1551       logResult(authResult);
1552       if (!authResult.isAllowed()) {
1553         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1554       }
1555     }
1556     return result;
1557   }
1558 
1559   @Override
1560   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1561       final byte [] row, final byte [] family, final byte [] qualifier,
1562       final CompareFilter.CompareOp compareOp,
1563       final ByteArrayComparable comparator, final Delete delete,
1564       final boolean result) throws IOException {
1565     // An ACL on a delete is useless, we shouldn't allow it
1566     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1567       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1568           delete.toString());
1569     }
1570     // Require READ and WRITE permissions on the table, CF, and the KV covered
1571     // by the delete
1572     RegionCoprocessorEnvironment env = c.getEnvironment();
1573     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1574     User user = getActiveUser();
1575     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1576       Action.READ, Action.WRITE);
1577     logResult(authResult);
1578     if (!authResult.isAllowed()) {
1579       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1580         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1581       } else {
1582         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1583       }
1584     }
1585     return result;
1586   }
1587 
1588   @Override
1589   public boolean preCheckAndDeleteAfterRowLock(
1590       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1591       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1592       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1593       throws IOException {
1594     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1595       // We had failure with table, cf and q perm checks and now giving a chance for cell
1596       // perm check
1597       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1598       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1599       AuthResult authResult = null;
1600       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1601           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1602         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1603             getActiveUser(), Action.READ, table, families);
1604       } else {
1605         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1606             getActiveUser(), Action.READ, table, families);
1607       }
1608       logResult(authResult);
1609       if (!authResult.isAllowed()) {
1610         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1611       }
1612     }
1613     return result;
1614   }
1615 
1616   @Override
1617   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1618       final byte [] row, final byte [] family, final byte [] qualifier,
1619       final long amount, final boolean writeToWAL)
1620       throws IOException {
1621     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1622     // incremented value
1623     RegionCoprocessorEnvironment env = c.getEnvironment();
1624     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1625     User user = getActiveUser();
1626     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1627       Action.WRITE);
1628     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1629       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1630         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1631       authResult.setReason("Covering cell set");
1632     }
1633     logResult(authResult);
1634     if (!authResult.isAllowed()) {
1635       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1636     }
1637     return -1;
1638   }
1639 
1640   @Override
1641   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1642       throws IOException {
1643     // Require WRITE permission to the table, CF, and the KV to be appended
1644     User user = getActiveUser();
1645     checkForReservedTagPresence(user, append);
1646     RegionCoprocessorEnvironment env = c.getEnvironment();
1647     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1648     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1649     logResult(authResult);
1650     if (!authResult.isAllowed()) {
1651       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1652         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1653       } else {
1654         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1655       }
1656     }
1657     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1658     if (bytes != null) {
1659       if (cellFeaturesEnabled) {
1660         addCellPermissions(bytes, append.getFamilyCellMap());
1661       } else {
1662         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1663       }
1664     }
1665     return null;
1666   }
1667 
1668   @Override
1669   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1670       final Append append) throws IOException {
1671     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1672       // We had failure with table, cf and q perm checks and now giving a chance for cell
1673       // perm check
1674       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1675       AuthResult authResult = null;
1676       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1677           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1678         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1679             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1680       } else {
1681         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1682             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1683       }
1684       logResult(authResult);
1685       if (!authResult.isAllowed()) {
1686         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1687       }
1688     }
1689     return null;
1690   }
1691 
1692   @Override
1693   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1694       final Increment increment)
1695       throws IOException {
1696     // Require WRITE permission to the table, CF, and the KV to be replaced by
1697     // the incremented value
1698     User user = getActiveUser();
1699     checkForReservedTagPresence(user, increment);
1700     RegionCoprocessorEnvironment env = c.getEnvironment();
1701     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1702     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1703       Action.WRITE);
1704     logResult(authResult);
1705     if (!authResult.isAllowed()) {
1706       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1707         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1708       } else {
1709         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1710       }
1711     }
1712     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1713     if (bytes != null) {
1714       if (cellFeaturesEnabled) {
1715         addCellPermissions(bytes, increment.getFamilyCellMap());
1716       } else {
1717         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1718       }
1719     }
1720     return null;
1721   }
1722 
1723   @Override
1724   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1725       final Increment increment) throws IOException {
1726     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1727       // We had failure with table, cf and q perm checks and now giving a chance for cell
1728       // perm check
1729       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1730       AuthResult authResult = null;
1731       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1732           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1733         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1734             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1735       } else {
1736         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1737             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1738       }
1739       logResult(authResult);
1740       if (!authResult.isAllowed()) {
1741         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1742       }
1743     }
1744     return null;
1745   }
1746 
1747   @Override
1748   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1749       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1750     // If the HFile version is insufficient to persist tags, we won't have any
1751     // work to do here
1752     if (!cellFeaturesEnabled) {
1753       return newCell;
1754     }
1755 
1756     // Collect any ACLs from the old cell
1757     List<Tag> tags = Lists.newArrayList();
1758     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1759     if (oldCell != null) {
1760       // Save an object allocation where we can
1761       if (oldCell.getTagsLength() > 0) {
1762         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1763           oldCell.getTagsOffset(), oldCell.getTagsLength());
1764         while (tagIterator.hasNext()) {
1765           Tag tag = tagIterator.next();
1766           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1767             // Not an ACL tag, just carry it through
1768             if (LOG.isTraceEnabled()) {
1769               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1770                 " length " + tag.getTagLength());
1771             }
1772             tags.add(tag);
1773           } else {
1774             // Merge the perms from the older ACL into the current permission set
1775             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1776               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1777                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1778             perms.putAll(kvPerms);
1779           }
1780         }
1781       }
1782     }
1783 
1784     // Do we have an ACL on the operation?
1785     byte[] aclBytes = mutation.getACL();
1786     if (aclBytes != null) {
1787       // Yes, use it
1788       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1789     } else {
1790       // No, use what we carried forward
1791       if (perms != null) {
1792         // TODO: If we collected ACLs from more than one tag we may have a
1793         // List<Permission> of size > 1, this can be collapsed into a single
1794         // Permission
1795         if (LOG.isTraceEnabled()) {
1796           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1797         }
1798         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1799           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1800       }
1801     }
1802 
1803     // If we have no tags to add, just return
1804     if (tags.isEmpty()) {
1805       return newCell;
1806     }
1807 
1808     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
1809     return rewriteCell;
1810   }
1811 
1812   @Override
1813   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1814       final Scan scan, final RegionScanner s) throws IOException {
1815     internalPreRead(c, scan, OpType.SCAN);
1816     return s;
1817   }
1818 
1819   @Override
1820   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1821       final Scan scan, final RegionScanner s) throws IOException {
1822     User user = getActiveUser();
1823     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1824       scannerOwners.put(s, user.getShortName());
1825     }
1826     return s;
1827   }
1828 
1829   @Override
1830   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1831       final InternalScanner s, final List<Result> result,
1832       final int limit, final boolean hasNext) throws IOException {
1833     requireScannerOwner(s);
1834     return hasNext;
1835   }
1836 
1837   @Override
1838   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1839       final InternalScanner s) throws IOException {
1840     requireScannerOwner(s);
1841   }
1842 
1843   @Override
1844   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1845       final InternalScanner s) throws IOException {
1846     // clean up any associated owner mapping
1847     scannerOwners.remove(s);
1848   }
1849 
1850   /**
1851    * Verify, when servicing an RPC, that the caller is the scanner owner.
1852    * If so, we assume that access control is correctly enforced based on
1853    * the checks performed in preScannerOpen()
1854    */
1855   private void requireScannerOwner(InternalScanner s)
1856       throws AccessDeniedException {
1857     if (RequestContext.isInRequestContext()) {
1858       String requestUserName = RequestContext.getRequestUserName();
1859       String owner = scannerOwners.get(s);
1860       if (owner != null && !owner.equals(requestUserName)) {
1861         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1862       }
1863     }
1864   }
1865 
1866   /**
1867    * Verifies user has WRITE privileges on
1868    * the Column Families involved in the bulkLoadHFile
1869    * request. Specific Column Write privileges are presently
1870    * ignored.
1871    */
1872   @Override
1873   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1874       List<Pair<byte[], String>> familyPaths) throws IOException {
1875     for(Pair<byte[],String> el : familyPaths) {
1876       requirePermission("preBulkLoadHFile",
1877           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1878           el.getFirst(),
1879           null,
1880           Action.CREATE);
1881     }
1882   }
1883 
1884   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action)
1885       throws IOException {
1886     User requestUser = getActiveUser();
1887     final TableName tableName = e.getRegion().getTableDesc().getTableName();
1888     AuthResult authResult = permissionGranted(method, requestUser, action, e,
1889       Collections.EMPTY_MAP);
1890     if (!authResult.isAllowed()) {
1891       final Configuration conf = e.getConfiguration();
1892       // hasSomeAccess is called from bulkload pre hooks
1893       List<UserPermission> perms =
1894         User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
1895           @Override
1896           public List<UserPermission> run() throws Exception {
1897             return AccessControlLists.getUserTablePermissions(conf, tableName);
1898           }
1899         });
1900       for (UserPermission userPerm: perms) {
1901         for (Action userAction: userPerm.getActions()) {
1902           if (userAction.equals(action)) {
1903             return AuthResult.allow(method, "Access allowed", requestUser,
1904               action, tableName, null, null);
1905           }
1906         }
1907       }
1908     }
1909     return authResult;
1910   }
1911 
1912   /**
1913    * Authorization check for
1914    * SecureBulkLoadProtocol.prepareBulkLoad()
1915    * @param ctx the context
1916    * @param request the request
1917    * @throws IOException
1918    */
1919   @Override
1920   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
1921                                  PrepareBulkLoadRequest request) throws IOException {
1922     RegionCoprocessorEnvironment e = ctx.getEnvironment();
1923 
1924     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1925     logResult(authResult);
1926     if (!authResult.isAllowed()) {
1927       throw new AccessDeniedException("Insufficient permissions (table=" +
1928         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1929     }
1930   }
1931 
1932   /**
1933    * Authorization security check for
1934    * SecureBulkLoadProtocol.cleanupBulkLoad()
1935    * @param ctx the context
1936    * @param request the request
1937    * @throws IOException
1938    */
1939   @Override
1940   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
1941                                  CleanupBulkLoadRequest request) throws IOException {
1942     RegionCoprocessorEnvironment e = ctx.getEnvironment();
1943 
1944     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1945     logResult(authResult);
1946     if (!authResult.isAllowed()) {
1947       throw new AccessDeniedException("Insufficient permissions (table=" +
1948         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1949     }
1950   }
1951 
1952   /* ---- EndpointObserver implementation ---- */
1953 
1954   @Override
1955   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1956       Service service, String methodName, Message request) throws IOException {
1957     // Don't intercept calls to our own AccessControlService, we check for
1958     // appropriate permissions in the service handlers
1959     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1960       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
1961         methodName + ")",
1962         getTableName(ctx.getEnvironment()), null, null,
1963         Action.EXEC);
1964     }
1965     return request;
1966   }
1967 
1968   @Override
1969   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1970       Service service, String methodName, Message request, Message.Builder responseBuilder)
1971       throws IOException { }
1972 
1973   /* ---- Protobuf AccessControlService implementation ---- */
1974 
1975   @Override
1976   public void grant(RpcController controller,
1977                     AccessControlProtos.GrantRequest request,
1978                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1979     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1980     AccessControlProtos.GrantResponse response = null;
1981     try {
1982       // verify it's only running at .acl.
1983       if (aclRegion) {
1984         if (!initialized) {
1985           throw new CoprocessorException("AccessController not yet initialized");
1986         }
1987         if (LOG.isDebugEnabled()) {
1988           LOG.debug("Received request to grant access permission " + perm.toString());
1989         }
1990 
1991         switch(request.getUserPermission().getPermission().getType()) {
1992           case Global :
1993           case Table :
1994             requirePermission("grant", perm.getTableName(), perm.getFamily(),
1995                 perm.getQualifier(), Action.ADMIN);
1996             break;
1997           case Namespace :
1998             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
1999             break;
2000         }
2001 
2002         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2003           @Override
2004           public Void run() throws Exception {
2005             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2006             return null;
2007           }
2008         });
2009 
2010         if (AUDITLOG.isTraceEnabled()) {
2011           // audit log should store permission changes in addition to auth results
2012           AUDITLOG.trace("Granted permission " + perm.toString());
2013         }
2014       } else {
2015         throw new CoprocessorException(AccessController.class, "This method "
2016             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2017       }
2018       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2019     } catch (IOException ioe) {
2020       // pass exception back up
2021       ResponseConverter.setControllerException(controller, ioe);
2022     }
2023     done.run(response);
2024   }
2025 
2026   @Override
2027   public void revoke(RpcController controller,
2028                      AccessControlProtos.RevokeRequest request,
2029                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2030     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2031     AccessControlProtos.RevokeResponse response = null;
2032     try {
2033       // only allowed to be called on _acl_ region
2034       if (aclRegion) {
2035         if (!initialized) {
2036           throw new CoprocessorException("AccessController not yet initialized");
2037         }
2038         if (LOG.isDebugEnabled()) {
2039           LOG.debug("Received request to revoke access permission " + perm.toString());
2040         }
2041 
2042         switch(request.getUserPermission().getPermission().getType()) {
2043           case Global :
2044           case Table :
2045             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2046                               perm.getQualifier(), Action.ADMIN);
2047             break;
2048           case Namespace :
2049             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2050             break;
2051         }
2052 
2053         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2054           @Override
2055           public Void run() throws Exception {
2056             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2057             return null;
2058           }
2059         });
2060 
2061         if (AUDITLOG.isTraceEnabled()) {
2062           // audit log should record all permission changes
2063           AUDITLOG.trace("Revoked permission " + perm.toString());
2064         }
2065       } else {
2066         throw new CoprocessorException(AccessController.class, "This method "
2067             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2068       }
2069       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2070     } catch (IOException ioe) {
2071       // pass exception back up
2072       ResponseConverter.setControllerException(controller, ioe);
2073     }
2074     done.run(response);
2075   }
2076 
2077   @Override
2078   public void getUserPermissions(RpcController controller,
2079                                  AccessControlProtos.GetUserPermissionsRequest request,
2080                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2081     AccessControlProtos.GetUserPermissionsResponse response = null;
2082     try {
2083       // only allowed to be called on _acl_ region
2084       if (aclRegion) {
2085         if (!initialized) {
2086           throw new CoprocessorException("AccessController not yet initialized");
2087         }
2088         List<UserPermission> perms = null;
2089         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2090           final TableName table = request.hasTableName() ?
2091             ProtobufUtil.toTableName(request.getTableName()) : null;
2092           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2093           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2094             @Override
2095             public List<UserPermission> run() throws Exception {
2096               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2097             }
2098           });
2099         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2100           final String namespace = request.getNamespaceName().toStringUtf8();
2101           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2102             @Override
2103             public List<UserPermission> run() throws Exception {
2104               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2105                 namespace);
2106             }
2107           });
2108         } else {
2109           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2110             @Override
2111             public List<UserPermission> run() throws Exception {
2112               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2113             }
2114           });
2115         }
2116         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2117       } else {
2118         throw new CoprocessorException(AccessController.class, "This method "
2119             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2120       }
2121     } catch (IOException ioe) {
2122       // pass exception back up
2123       ResponseConverter.setControllerException(controller, ioe);
2124     }
2125     done.run(response);
2126   }
2127 
2128   @Override
2129   public void checkPermissions(RpcController controller,
2130                                AccessControlProtos.CheckPermissionsRequest request,
2131                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2132     Permission[] permissions = new Permission[request.getPermissionCount()];
2133     for (int i=0; i < request.getPermissionCount(); i++) {
2134       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2135     }
2136     AccessControlProtos.CheckPermissionsResponse response = null;
2137     try {
2138       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2139       for (Permission permission : permissions) {
2140         if (permission instanceof TablePermission) {
2141           TablePermission tperm = (TablePermission) permission;
2142           for (Action action : permission.getActions()) {
2143             if (!tperm.getTableName().equals(tableName)) {
2144               throw new CoprocessorException(AccessController.class, String.format("This method "
2145                   + "can only execute at the table specified in TablePermission. " +
2146                   "Table of the region:%s , requested table:%s", tableName,
2147                   tperm.getTableName()));
2148             }
2149 
2150             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2151             if (tperm.getFamily() != null) {
2152               if (tperm.getQualifier() != null) {
2153                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2154                 qualifiers.add(tperm.getQualifier());
2155                 familyMap.put(tperm.getFamily(), qualifiers);
2156               } else {
2157                 familyMap.put(tperm.getFamily(), null);
2158               }
2159             }
2160 
2161             requirePermission("checkPermissions", action, regionEnv, familyMap);
2162           }
2163 
2164         } else {
2165           for (Action action : permission.getActions()) {
2166             requirePermission("checkPermissions", action);
2167           }
2168         }
2169       }
2170       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2171     } catch (IOException ioe) {
2172       ResponseConverter.setControllerException(controller, ioe);
2173     }
2174     done.run(response);
2175   }
2176 
2177   @Override
2178   public Service getService() {
2179     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2180   }
2181 
2182   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2183     return e.getRegion();
2184   }
2185 
2186   private TableName getTableName(RegionCoprocessorEnvironment e) {
2187     HRegion region = e.getRegion();
2188     if (region != null) {
2189       return getTableName(region);
2190     }
2191     return null;
2192   }
2193 
2194   private TableName getTableName(HRegion region) {
2195     HRegionInfo regionInfo = region.getRegionInfo();
2196     if (regionInfo != null) {
2197       return regionInfo.getTable();
2198     }
2199     return null;
2200   }
2201 
2202   @Override
2203   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2204       throws IOException {
2205     requirePermission("preClose", Action.ADMIN);
2206   }
2207 
2208   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2209     User user = userProvider.getCurrent();
2210     if (user == null) {
2211       throw new IOException("Unable to obtain the current user, " +
2212         "authorization checks for internal operations will not work correctly!");
2213     }
2214     User activeUser = getActiveUser();
2215     if (!(superusers.contains(activeUser.getShortName()))) {
2216       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2217         "is not system or super user.");
2218     }
2219   }
2220 
2221   @Override
2222   public void preStopRegionServer(
2223       ObserverContext<RegionServerCoprocessorEnvironment> env)
2224       throws IOException {
2225     requirePermission("preStopRegionServer", Action.ADMIN);
2226   }
2227 
2228   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2229       byte[] qualifier) {
2230     if (family == null) {
2231       return null;
2232     }
2233 
2234     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2235     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2236     return familyMap;
2237   }
2238 
2239   @Override
2240   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2241       List<TableName> tableNamesList,
2242       List<HTableDescriptor> descriptors) throws IOException {
2243     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2244     // ADMIN privs.
2245     if (tableNamesList == null || tableNamesList.isEmpty()) {
2246       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2247     }
2248     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2249     // request can be granted.
2250     else {
2251       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2252       for (TableName tableName: tableNamesList) {
2253         // Skip checks for a table that does not exist
2254         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2255           continue;
2256         requirePermission("getTableDescriptors", tableName, null, null,
2257           Action.ADMIN, Action.CREATE);
2258       }
2259     }
2260   }
2261 
2262   @Override
2263   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2264       HRegion regionB) throws IOException {
2265     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2266       Action.ADMIN);
2267   }
2268 
2269   @Override
2270   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2271       HRegion regionB, HRegion mergedRegion) throws IOException { }
2272 
2273   @Override
2274   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2275       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2276 
2277   @Override
2278   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2279       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2280 
2281   @Override
2282   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2283       HRegion regionA, HRegion regionB) throws IOException { }
2284 
2285   @Override
2286   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2287       HRegion regionA, HRegion regionB) throws IOException { }
2288 
2289   @Override
2290   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2291       throws IOException {
2292     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2293   }
2294 
2295   @Override
2296   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2297       throws IOException { }
2298 
2299   @Override
2300   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2301       final String userName, final Quotas quotas) throws IOException {
2302     requirePermission("setUserQuota", Action.ADMIN);
2303   }
2304 
2305   @Override
2306   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2307       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2308     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2309   }
2310 
2311   @Override
2312   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2313       final String userName, final String namespace, final Quotas quotas) throws IOException {
2314     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2315   }
2316 
2317   @Override
2318   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2319       final TableName tableName, final Quotas quotas) throws IOException {
2320     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2321   }
2322 
2323   @Override
2324   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2325       final String namespace, final Quotas quotas) throws IOException {
2326     requirePermission("setNamespaceQuota", Action.ADMIN);
2327   }
2328   
2329   @Override
2330   public ReplicationEndpoint postCreateReplicationEndPoint(
2331       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2332     return endpoint;
2333   }
2334 }