View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.util.Collection;
20  import java.util.Collections;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.CoprocessorEnvironment;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.KeyValue.Type;
45  import org.apache.hadoop.hbase.KeyValueUtil;
46  import org.apache.hadoop.hbase.NamespaceDescriptor;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.TableNotDisabledException;
50  import org.apache.hadoop.hbase.TableNotFoundException;
51  import org.apache.hadoop.hbase.Tag;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Query;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.Scan;
63  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
66  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
67  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
68  import org.apache.hadoop.hbase.coprocessor.MasterObserver;
69  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
70  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
73  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
74  import org.apache.hadoop.hbase.filter.CompareFilter;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.FilterList;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RequestContext;
79  import org.apache.hadoop.hbase.master.MasterServices;
80  import org.apache.hadoop.hbase.master.RegionPlan;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.regionserver.HRegion;
87  import org.apache.hadoop.hbase.regionserver.InternalScanner;
88  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
89  import org.apache.hadoop.hbase.regionserver.RegionScanner;
90  import org.apache.hadoop.hbase.regionserver.ScanType;
91  import org.apache.hadoop.hbase.regionserver.Store;
92  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
93  import org.apache.hadoop.hbase.security.AccessDeniedException;
94  import org.apache.hadoop.hbase.security.User;
95  import org.apache.hadoop.hbase.security.UserProvider;
96  import org.apache.hadoop.hbase.security.access.Permission.Action;
97  import org.apache.hadoop.hbase.util.ByteRange;
98  import org.apache.hadoop.hbase.util.Bytes;
99  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
102 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
103 
104 import com.google.common.collect.ArrayListMultimap;
105 import com.google.common.collect.ImmutableSet;
106 import com.google.common.collect.ListMultimap;
107 import com.google.common.collect.Lists;
108 import com.google.common.collect.MapMaker;
109 import com.google.common.collect.Maps;
110 import com.google.common.collect.Sets;
111 import com.google.protobuf.Message;
112 import com.google.protobuf.RpcCallback;
113 import com.google.protobuf.RpcController;
114 import com.google.protobuf.Service;
115 
116 /**
117  * Provides basic authorization checks for data access and administrative
118  * operations.
119  *
120  * <p>
121  * {@code AccessController} performs authorization checks for HBase operations
122  * based on:
123  * <ul>
124  *   <li>the identity of the user performing the operation</li>
125  *   <li>the scope over which the operation is performed, in increasing
126  *   specificity: global, table, column family, or qualifier</li>
127  *   <li>the type of action being performed (as mapped to
128  *   {@link Permission.Action} values)</li>
129  * </ul>
130  * If the authorization check fails, an {@link AccessDeniedException}
131  * will be thrown for the operation.
132  * </p>
133  *
134  * <p>
135  * To perform authorization checks, {@code AccessController} relies on the
136  * RpcServerEngine being loaded to provide
137  * the user identities for remote requests.
138  * </p>
139  *
140  * <p>
141  * The access control lists used for authorization can be manipulated via the
142  * exposed {@link AccessControlService} Interface implementation, and the associated
143  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
144  * commands.
145  * </p>
146  */
147 public class AccessController extends BaseRegionObserver
148     implements MasterObserver, RegionServerObserver,
149       AccessControlService.Interface, CoprocessorService, EndpointObserver {
150 
151   public static final Log LOG = LogFactory.getLog(AccessController.class);
152 
153   private static final Log AUDITLOG =
154     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
155   private static final String CHECK_COVERING_PERM = "check_covering_perm";
156   private static final String TAG_CHECK_PASSED = "tag_check_passed";
157   private static final byte[] TRUE = Bytes.toBytes(true);
158 
159   TableAuthManager authManager = null;
160 
161   // flags if we are running on a region of the _acl_ table
162   boolean aclRegion = false;
163 
164   // defined only for Endpoint implementation, so it can have way to
165   // access region services.
166   private RegionCoprocessorEnvironment regionEnv;
167 
168   /** Mapping of scanner instances to the user who created them */
169   private Map<InternalScanner,String> scannerOwners =
170       new MapMaker().weakKeys().makeMap();
171 
172   // Provider for mapping principal names to Users
173   private UserProvider userProvider;
174 
175   // The list of users with superuser authority
176   private List<String> superusers;
177 
178   // if we are able to support cell ACLs
179   boolean cellFeaturesEnabled;
180 
181   // if we should check EXEC permissions
182   boolean shouldCheckExecPermission;
183 
184   // if we should terminate access checks early as soon as table or CF grants
185   // allow access; pre-0.98 compatible behavior
186   boolean compatibleEarlyTermination;
187 
188   private volatile boolean initialized = false;
189 
190   // This boolean having relevance only in the Master.
191   private volatile boolean aclTabAvailable = false;
192 
193   public HRegion getRegion() {
194     return regionEnv != null ? regionEnv.getRegion() : null;
195   }
196 
197   public TableAuthManager getAuthManager() {
198     return authManager;
199   }
200 
201   void initialize(RegionCoprocessorEnvironment e) throws IOException {
202     final HRegion region = e.getRegion();
203     Configuration conf = e.getConfiguration();
204     Map<byte[], ListMultimap<String,TablePermission>> tables =
205         AccessControlLists.loadAll(region);
206     // For each table, write out the table's permissions to the respective
207     // znode for that table.
208     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
209       tables.entrySet()) {
210       byte[] entry = t.getKey();
211       ListMultimap<String,TablePermission> perms = t.getValue();
212       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
213       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
214     }
215     initialized = true;
216   }
217 
218   /**
219    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
220    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
221    * table updates.
222    */
223   void updateACL(RegionCoprocessorEnvironment e,
224       final Map<byte[], List<Cell>> familyMap) {
225     Set<byte[]> entries =
226         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
227     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
228       List<Cell> cells = f.getValue();
229       for (Cell cell: cells) {
230         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
231         if (Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(),
232             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
233             AccessControlLists.ACL_LIST_FAMILY.length)) {
234           entries.add(kv.getRow());
235         }
236       }
237     }
238     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
239     Configuration conf = regionEnv.getConfiguration();
240     for (byte[] entry: entries) {
241       try {
242         ListMultimap<String,TablePermission> perms =
243           AccessControlLists.getPermissions(conf, entry);
244         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
245         zkw.writeToZookeeper(entry, serialized);
246       } catch (IOException ex) {
247         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
248             ex);
249       }
250     }
251   }
252 
253   /**
254    * Check the current user for authorization to perform a specific action
255    * against the given set of row data.
256    *
257    * <p>Note: Ordering of the authorization checks
258    * has been carefully optimized to short-circuit the most common requests
259    * and minimize the amount of processing required.</p>
260    *
261    * @param permRequest the action being requested
262    * @param e the coprocessor environment
263    * @param families the map of column families to qualifiers present in
264    * the request
265    * @return an authorization result
266    */
267   AuthResult permissionGranted(String request, User user, Action permRequest,
268       RegionCoprocessorEnvironment e,
269       Map<byte [], ? extends Collection<?>> families) {
270     HRegionInfo hri = e.getRegion().getRegionInfo();
271     TableName tableName = hri.getTable();
272 
273     // 1. All users need read access to hbase:meta table.
274     // this is a very common operation, so deal with it quickly.
275     if (hri.isMetaRegion()) {
276       if (permRequest == Action.READ) {
277         return AuthResult.allow(request, "All users allowed", user,
278           permRequest, tableName, families);
279       }
280     }
281 
282     if (user == null) {
283       return AuthResult.deny(request, "No user associated with request!", null,
284         permRequest, tableName, families);
285     }
286 
287     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
288     // e.g. When a new table is created a new entry in hbase:meta is added,
289     // so the user need to be allowed to write on it.
290     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
291     // and the user need to be allowed to write on both tables.
292     if (permRequest == Action.WRITE &&
293        (hri.isMetaRegion() ||
294         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
295        (authManager.authorize(user, Action.CREATE) ||
296         authManager.authorize(user, Action.ADMIN)))
297     {
298        return AuthResult.allow(request, "Table permission granted", user,
299         permRequest, tableName, families);
300     }
301 
302     // 2. check for the table-level, if successful we can short-circuit
303     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
304       return AuthResult.allow(request, "Table permission granted", user,
305         permRequest, tableName, families);
306     }
307 
308     // 3. check permissions against the requested families
309     if (families != null && families.size() > 0) {
310       // all families must pass
311       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
312         // a) check for family level access
313         if (authManager.authorize(user, tableName, family.getKey(),
314             permRequest)) {
315           continue;  // family-level permission overrides per-qualifier
316         }
317 
318         // b) qualifier level access can still succeed
319         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
320           if (family.getValue() instanceof Set) {
321             // for each qualifier of the family
322             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
323             for (byte[] qualifier : familySet) {
324               if (!authManager.authorize(user, tableName, family.getKey(),
325                                          qualifier, permRequest)) {
326                 return AuthResult.deny(request, "Failed qualifier check", user,
327                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
328               }
329             }
330           } else if (family.getValue() instanceof List) { // List<KeyValue>
331             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
332             for (KeyValue kv : kvList) {
333               if (!authManager.authorize(user, tableName, family.getKey(),
334                       kv.getQualifier(), permRequest)) {
335                 return AuthResult.deny(request, "Failed qualifier check", user,
336                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
337               }
338             }
339           }
340         } else {
341           // no qualifiers and family-level check already failed
342           return AuthResult.deny(request, "Failed family check", user, permRequest,
343               tableName, makeFamilyMap(family.getKey(), null));
344         }
345       }
346 
347       // all family checks passed
348       return AuthResult.allow(request, "All family checks passed", user, permRequest,
349           tableName, families);
350     }
351 
352     // 4. no families to check and table level access failed
353     return AuthResult.deny(request, "No families to check and table permission failed",
354         user, permRequest, tableName, families);
355   }
356 
357   /**
358    * Check the current user for authorization to perform a specific action
359    * against the given set of row data.
360    * @param opType the operation type
361    * @param user the user
362    * @param e the coprocessor environment
363    * @param families the map of column families to qualifiers present in
364    * the request
365    * @param actions the desired actions
366    * @return an authorization result
367    */
368   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
369       Map<byte [], ? extends Collection<?>> families, Action... actions) {
370     AuthResult result = null;
371     for (Action action: actions) {
372       result = permissionGranted(opType.toString(), user, action, e, families);
373       if (!result.isAllowed()) {
374         return result;
375       }
376     }
377     return result;
378   }
379 
380   private void logResult(AuthResult result) {
381     if (AUDITLOG.isTraceEnabled()) {
382       RequestContext ctx = RequestContext.get();
383       InetAddress remoteAddr = null;
384       if (ctx != null) {
385         remoteAddr = ctx.getRemoteAddress();
386       }
387       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
388           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
389           "; reason: " + result.getReason() +
390           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
391           "; request: " + result.getRequest() +
392           "; context: " + result.toContextString());
393     }
394   }
395 
396   /**
397    * Returns the active user to which authorization checks should be applied.
398    * If we are in the context of an RPC call, the remote user is used,
399    * otherwise the currently logged in user is used.
400    */
401   private User getActiveUser() throws IOException {
402     User user = RequestContext.getRequestUser();
403     if (!RequestContext.isInRequestContext()) {
404       // for non-rpc handling, fallback to system user
405       user = userProvider.getCurrent();
406     }
407     return user;
408   }
409 
410   /**
411    * Authorizes that the current user has any of the given permissions for the
412    * given table, column family and column qualifier.
413    * @param tableName Table requested
414    * @param family Column family requested
415    * @param qualifier Column qualifier requested
416    * @throws IOException if obtaining the current user fails
417    * @throws AccessDeniedException if user has no authorization
418    */
419   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
420       Action... permissions) throws IOException {
421     User user = getActiveUser();
422     AuthResult result = null;
423 
424     for (Action permission : permissions) {
425       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
426         result = AuthResult.allow(request, "Table permission granted", user,
427                                   permission, tableName, family, qualifier);
428         break;
429       } else {
430         // rest of the world
431         result = AuthResult.deny(request, "Insufficient permissions", user,
432                                  permission, tableName, family, qualifier);
433       }
434     }
435     logResult(result);
436     if (!result.isAllowed()) {
437       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
438     }
439   }
440 
441   /**
442    * Authorizes that the current user has global privileges for the given action.
443    * @param perm The action being requested
444    * @throws IOException if obtaining the current user fails
445    * @throws AccessDeniedException if authorization is denied
446    */
447   private void requirePermission(String request, Action perm) throws IOException {
448     requireGlobalPermission(request, perm, null, null);
449   }
450 
451   /**
452    * Authorizes that the current user has permission to perform the given
453    * action on the set of table column families.
454    * @param perm Action that is required
455    * @param env The current coprocessor environment
456    * @param families The map of column families-qualifiers.
457    * @throws AccessDeniedException if the authorization check failed
458    */
459   private void requirePermission(String request, Action perm,
460         RegionCoprocessorEnvironment env,
461         Map<byte[], ? extends Collection<?>> families)
462       throws IOException {
463     User user = getActiveUser();
464     AuthResult result = permissionGranted(request, user, perm, env, families);
465     logResult(result);
466 
467     if (!result.isAllowed()) {
468       throw new AccessDeniedException("Insufficient permissions (table=" +
469         env.getRegion().getTableDesc().getTableName()+
470         ((families != null && families.size() > 0) ? ", family: " +
471         result.toFamilyString() : "") + ", action=" +
472         perm.toString() + ")");
473     }
474   }
475 
476   /**
477    * Checks that the user has the given global permission. The generated
478    * audit log message will contain context information for the operation
479    * being authorized, based on the given parameters.
480    * @param perm Action being requested
481    * @param tableName Affected table name.
482    * @param familyMap Affected column families.
483    */
484   private void requireGlobalPermission(String request, Action perm, TableName tableName,
485       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
486     User user = getActiveUser();
487     if (authManager.authorize(user, perm)) {
488       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
489     } else {
490       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
491       throw new AccessDeniedException("Insufficient permissions for user '" +
492           (user != null ? user.getShortName() : "null") +"' (global, action=" +
493           perm.toString() + ")");
494     }
495   }
496 
497   /**
498    * Checks that the user has the given global permission. The generated
499    * audit log message will contain context information for the operation
500    * being authorized, based on the given parameters.
501    * @param perm Action being requested
502    * @param namespace
503    */
504   private void requireGlobalPermission(String request, Action perm,
505                                        String namespace) throws IOException {
506     User user = getActiveUser();
507     if (authManager.authorize(user, perm)) {
508       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
509     } else {
510       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
511       throw new AccessDeniedException("Insufficient permissions for user '" +
512           (user != null ? user.getShortName() : "null") +"' (global, action=" +
513           perm.toString() + ")");
514     }
515   }
516 
517   /**
518    * Returns <code>true</code> if the current user is allowed the given action
519    * over at least one of the column qualifiers in the given column families.
520    */
521   private boolean hasFamilyQualifierPermission(User user,
522       Action perm,
523       RegionCoprocessorEnvironment env,
524       Map<byte[], ? extends Collection<byte[]>> familyMap)
525     throws IOException {
526     HRegionInfo hri = env.getRegion().getRegionInfo();
527     TableName tableName = hri.getTable();
528 
529     if (user == null) {
530       return false;
531     }
532 
533     if (familyMap != null && familyMap.size() > 0) {
534       // at least one family must be allowed
535       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
536           familyMap.entrySet()) {
537         if (family.getValue() != null && !family.getValue().isEmpty()) {
538           for (byte[] qualifier : family.getValue()) {
539             if (authManager.matchPermission(user, tableName,
540                 family.getKey(), qualifier, perm)) {
541               return true;
542             }
543           }
544         } else {
545           if (authManager.matchPermission(user, tableName, family.getKey(),
546               perm)) {
547             return true;
548           }
549         }
550       }
551     } else if (LOG.isDebugEnabled()) {
552       LOG.debug("Empty family map passed for permission check");
553     }
554 
555     return false;
556   }
557 
558   private enum OpType {
559     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
560     GET("get"),
561     EXISTS("exists"),
562     SCAN("scan"),
563     PUT("put"),
564     DELETE("delete"),
565     CHECK_AND_PUT("checkAndPut"),
566     CHECK_AND_DELETE("checkAndDelete"),
567     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
568     APPEND("append"),
569     INCREMENT("increment");
570 
571     private String type;
572 
573     private OpType(String type) {
574       this.type = type;
575     }
576 
577     @Override
578     public String toString() {
579       return type;
580     }
581   }
582 
583   /**
584    * Determine if cell ACLs covered by the operation grant access. This is expensive.
585    * @return false if cell ACLs failed to grant access, true otherwise
586    * @throws IOException
587    */
588   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
589       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
590       throws IOException {
591     if (!cellFeaturesEnabled) {
592       return false;
593     }
594     long cellGrants = 0;
595     User user = getActiveUser();
596     long latestCellTs = 0;
597     Get get = new Get(row);
598     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
599     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
600     // version. We have to get every cell version and check its TS against the TS asked for in
601     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
602     // consider only one such passing cell. In case of Delete we have to consider all the cell
603     // versions under this passing version. When Delete Mutation contains columns which are a
604     // version delete just consider only one version for those column cells.
605     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
606     if (considerCellTs) {
607       get.setMaxVersions();
608     } else {
609       get.setMaxVersions(1);
610     }
611     boolean diffCellTsFromOpTs = false;
612     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
613       byte[] col = entry.getKey();
614       // TODO: HBASE-7114 could possibly unify the collection type in family
615       // maps so we would not need to do this
616       if (entry.getValue() instanceof Set) {
617         Set<byte[]> set = (Set<byte[]>)entry.getValue();
618         if (set == null || set.isEmpty()) {
619           get.addFamily(col);
620         } else {
621           for (byte[] qual: set) {
622             get.addColumn(col, qual);
623           }
624         }
625       } else if (entry.getValue() instanceof List) {
626         List<Cell> list = (List<Cell>)entry.getValue();
627         if (list == null || list.isEmpty()) {
628           get.addFamily(col);
629         } else {
630           // In case of family delete, a Cell will be added into the list with Qualifier as null.
631           for (Cell cell : list) {
632             if (cell.getQualifierLength() == 0
633                 && (cell.getTypeByte() == Type.DeleteFamily.getCode() 
634                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
635               get.addFamily(col);
636             } else {
637               get.addColumn(col, CellUtil.cloneQualifier(cell));
638             }
639             if (considerCellTs) {
640               long cellTs = cell.getTimestamp();
641               latestCellTs = Math.max(latestCellTs, cellTs);
642               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
643             }
644           }
645         }
646       } else {
647         throw new RuntimeException("Unhandled collection type " +
648           entry.getValue().getClass().getName());
649       }
650     }
651     // We want to avoid looking into the future. So, if the cells of the
652     // operation specify a timestamp, or the operation itself specifies a
653     // timestamp, then we use the maximum ts found. Otherwise, we bound
654     // the Get to the current server time. We add 1 to the timerange since
655     // the upper bound of a timerange is exclusive yet we need to examine
656     // any cells found there inclusively.
657     long latestTs = Math.max(opTs, latestCellTs);
658     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
659       latestTs = EnvironmentEdgeManager.currentTimeMillis();
660     }
661     get.setTimeRange(0, latestTs + 1);
662     // In case of Put operation we set to read all versions. This was done to consider the case
663     // where columns are added with TS other than the Mutation TS. But normally this wont be the
664     // case with Put. There no need to get all versions but get latest version only.
665     if (!diffCellTsFromOpTs && request == OpType.PUT) {
666       get.setMaxVersions(1);
667     }
668     if (LOG.isTraceEnabled()) {
669       LOG.trace("Scanning for cells with " + get);
670     }
671     // This Map is identical to familyMap. The key is a BR rather than byte[].
672     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
673     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
674     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
675     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
676       if (entry.getValue() instanceof List) {
677         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
678       }
679     }
680     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
681     List<Cell> cells = Lists.newArrayList();
682     Cell prevCell = null;
683     ByteRange curFam = new SimpleMutableByteRange();
684     boolean curColAllVersions = (request == OpType.DELETE);
685     long curColCheckTs = opTs;
686     boolean foundColumn = false;
687     try {
688       boolean more = false;
689       do {
690         cells.clear();
691         // scan with limit as 1 to hold down memory use on wide rows
692         more = scanner.next(cells, 1);
693         for (Cell cell: cells) {
694           if (LOG.isTraceEnabled()) {
695             LOG.trace("Found cell " + cell);
696           }
697           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
698           if (colChange) foundColumn = false;
699           prevCell = cell;
700           if (!curColAllVersions && foundColumn) {
701             continue;
702           }
703           if (colChange && considerCellTs) {
704             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
705             List<Cell> cols = familyMap1.get(curFam);
706             for (Cell col : cols) {
707               // null/empty qualifier is used to denote a Family delete. The TS and delete type
708               // associated with this is applicable for all columns within the family. That is
709               // why the below (col.getQualifierLength() == 0) check.
710               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
711                   || CellUtil.matchingQualifier(cell, col)) {
712                 byte type = col.getTypeByte();
713                 if (considerCellTs) {
714                   curColCheckTs = col.getTimestamp();
715                 }
716                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
717                 // a version delete for a column no need to check all the covering cells within
718                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
719                 // One version delete types are Delete/DeleteFamilyVersion
720                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
721                     || (KeyValue.Type.DeleteFamily.getCode() == type);
722                 break;
723               }
724             }
725           }
726           if (cell.getTimestamp() > curColCheckTs) {
727             // Just ignore this cell. This is not a covering cell.
728             continue;
729           }
730           foundColumn = true;
731           for (Action action: actions) {
732             // Are there permissions for this user for the cell?
733             if (!authManager.authorize(user, getTableName(e), cell, action)) {
734               // We can stop if the cell ACL denies access
735               return false;
736             }
737           }
738           cellGrants++;
739         }
740       } while (more);
741     } catch (AccessDeniedException ex) {
742       throw ex;
743     } catch (IOException ex) {
744       LOG.error("Exception while getting cells to calculate covering permission", ex);
745     } finally {
746       scanner.close();
747     }
748     // We should not authorize unless we have found one or more cell ACLs that
749     // grant access. This code is used to check for additional permissions
750     // after no table or CF grants are found.
751     return cellGrants > 0;
752   }
753 
754   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
755     // Iterate over the entries in the familyMap, replacing the cells therein
756     // with new cells including the ACL data
757     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
758       List<Cell> newCells = Lists.newArrayList();
759       for (Cell cell: e.getValue()) {
760         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
761         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
762             cell.getTagsOffset(), cell.getTagsLength());
763         while (tagIterator.hasNext()) {
764           tags.add(tagIterator.next());
765         }
766         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
767         // incoming cell type is actually KeyValue.
768         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
769         newCells.add(
770           new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
771             kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
772             kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
773             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
774             kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
775             tags));
776       }
777       // This is supposed to be safe, won't CME
778       e.setValue(newCells);
779     }
780   }
781 
782   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
783   // type is reserved and should not be explicitly set by user.
784   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
785     // Superusers are allowed to store cells unconditionally.
786     if (superusers.contains(user.getShortName())) {
787       return;
788     }
789     // We already checked (prePut vs preBatchMutation)
790     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
791       return;
792     }
793     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
794       Cell cell = cellScanner.current();
795       if (cell.getTagsLength() > 0) {
796         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
797           cell.getTagsLength());
798         while (tagsItr.hasNext()) {
799           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
800             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
801           }
802         }
803       }
804     }
805     m.setAttribute(TAG_CHECK_PASSED, TRUE);
806   }
807 
808   /* ---- MasterObserver implementation ---- */
809 
810   public void start(CoprocessorEnvironment env) throws IOException {
811     CompoundConfiguration conf = new CompoundConfiguration();
812     conf.add(env.getConfiguration());
813 
814     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
815       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
816 
817     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
818     if (!cellFeaturesEnabled) {
819       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
820           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
821           + " accordingly.");
822     }
823 
824     ZooKeeperWatcher zk = null;
825     if (env instanceof MasterCoprocessorEnvironment) {
826       // if running on HMaster
827       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
828       zk = mEnv.getMasterServices().getZooKeeper();
829     } else if (env instanceof RegionServerCoprocessorEnvironment) {      
830       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
831       zk = rsEnv.getRegionServerServices().getZooKeeper();      
832     } else if (env instanceof RegionCoprocessorEnvironment) {
833       // if running at region
834       regionEnv = (RegionCoprocessorEnvironment) env;
835       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
836       zk = regionEnv.getRegionServerServices().getZooKeeper();
837       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
838         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
839     }
840 
841     // set the user-provider.
842     this.userProvider = UserProvider.instantiate(env.getConfiguration());
843 
844     // set up the list of users with superuser privilege
845     User user = userProvider.getCurrent();
846     superusers = Lists.asList(user.getShortName(),
847       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
848 
849     // If zk is null or IOException while obtaining auth manager,
850     // throw RuntimeException so that the coprocessor is unloaded.
851     if (zk != null) {
852       try {
853         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
854       } catch (IOException ioe) {
855         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
856       }
857     } else {
858       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
859     }
860   }
861 
862   public void stop(CoprocessorEnvironment env) {
863 
864   }
865 
866   @Override
867   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
868       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
869     Set<byte[]> families = desc.getFamiliesKeys();
870     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
871     for (byte[] family: families) {
872       familyMap.put(family, null);
873     }
874     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
875   }
876 
877   @Override
878   public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
879       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
880 
881   @Override
882   public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
883       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
884 
885   @Override
886   public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
887       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
888     // When AC is used, it should be configured as the 1st CP.
889     // In Master, the table operations like create, are handled by a Thread pool but the max size
890     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
891     // sequentially only.
892     // Related code in HMaster#startServiceThreads
893     // {code}
894     //   // We depend on there being only one instance of this executor running
895     //   // at a time. To do concurrency, would need fencing of enable/disable of
896     //   // tables.
897     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
898     // {code}
899     // In future if we change this pool to have more threads, then there is a chance for thread,
900     // creating acl table, getting delayed and by that time another table creation got over and
901     // this hook is getting called. In such a case, we will need a wait logic here which will
902     // wait till the acl table is created.
903     if (AccessControlLists.isAclTable(desc)) {
904       this.aclTabAvailable = true;
905     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
906       if (!aclTabAvailable) {
907         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
908             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
909             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
910       } else {
911         String owner = desc.getOwnerString();
912         // default the table owner to current user, if not specified.
913         if (owner == null)
914           owner = getActiveUser().getShortName();
915         UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getTableName(),
916             null, Action.values());
917         AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
918       }
919     }
920   }
921 
922   @Override
923   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
924       throws IOException {
925     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
926   }
927 
928   @Override
929   public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
930       TableName tableName) throws IOException {}
931 
932   @Override
933   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
934       TableName tableName) throws IOException {
935     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
936   }
937 
938   @Override
939   public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
940       TableName tableName) throws IOException {}
941 
942    @Override
943   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
944       throws IOException {
945     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
946   }
947   @Override
948   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
949       TableName tableName) throws IOException {
950   }
951   @Override
952   public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
953       TableName tableName) throws IOException {}
954   @Override
955   public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
956       TableName tableName) throws IOException {}
957 
958   @Override
959   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
960       HTableDescriptor htd) throws IOException {
961     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
962   }
963 
964   @Override
965   public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
966       TableName tableName, HTableDescriptor htd) throws IOException {}
967 
968   @Override
969   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
970       TableName tableName, HTableDescriptor htd) throws IOException {
971     String owner = htd.getOwnerString();
972     // default the table owner to current user, if not specified.
973     if (owner == null) owner = getActiveUser().getShortName();
974     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
975         Action.values());
976     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
977   }
978 
979   @Override
980   public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
981       TableName tableName, HTableDescriptor htd) throws IOException {}
982 
983 
984   @Override
985   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
986       HColumnDescriptor column) throws IOException {
987     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
988   }
989 
990   @Override
991   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
992       TableName tableName, HColumnDescriptor column) throws IOException {}
993 
994   @Override
995   public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
996       TableName tableName, HColumnDescriptor column) throws IOException {}
997 
998   @Override
999   public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1000       TableName tableName, HColumnDescriptor column) throws IOException {}
1001 
1002   @Override
1003   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1004       HColumnDescriptor descriptor) throws IOException {
1005     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1006   }
1007 
1008   @Override
1009   public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1010       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
1011 
1012   @Override
1013   public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1014       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
1015 
1016   @Override
1017   public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1018       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
1019 
1020   @Override
1021   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1022       byte[] col) throws IOException {
1023     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1024   }
1025 
1026   @Override
1027   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1028       TableName tableName, byte[] col) throws IOException {}
1029 
1030   @Override
1031   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1032       TableName tableName, byte[] col) throws IOException {
1033     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
1034   }
1035 
1036   @Override
1037   public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1038       TableName tableName, byte[] col) throws IOException {}
1039 
1040   @Override
1041   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1042       throws IOException {
1043     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1044   }
1045 
1046   @Override
1047   public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1048       TableName tableName) throws IOException {}
1049 
1050   @Override
1051   public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
1052       TableName tableName) throws IOException {}
1053 
1054   @Override
1055   public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1056       TableName tableName) throws IOException {}
1057 
1058   @Override
1059   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1060       throws IOException {
1061     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1062       throw new AccessDeniedException("Not allowed to disable "
1063           + AccessControlLists.ACL_TABLE_NAME + " table.");
1064     }
1065     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1066   }
1067 
1068   @Override
1069   public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1070       TableName tableName) throws IOException {}
1071 
1072   @Override
1073   public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
1074       TableName tableName) throws IOException {}
1075 
1076   @Override
1077   public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1078       TableName tableName) throws IOException {}
1079 
1080   @Override
1081   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1082       ServerName srcServer, ServerName destServer) throws IOException {
1083     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1084   }
1085 
1086   @Override
1087   public void postMove(ObserverContext<MasterCoprocessorEnvironment> c,
1088       HRegionInfo region, ServerName srcServer, ServerName destServer)
1089     throws IOException {}
1090 
1091   @Override
1092   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1093       throws IOException {
1094     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1095   }
1096 
1097   @Override
1098   public void postAssign(ObserverContext<MasterCoprocessorEnvironment> c,
1099       HRegionInfo regionInfo) throws IOException {}
1100 
1101   @Override
1102   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1103       boolean force) throws IOException {
1104     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1105   }
1106 
1107   @Override
1108   public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
1109       HRegionInfo regionInfo, boolean force) throws IOException {}
1110 
1111   @Override
1112   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1113       HRegionInfo regionInfo) throws IOException {
1114     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1115   }
1116 
1117   @Override
1118   public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1119       HRegionInfo regionInfo) throws IOException {
1120   }
1121 
1122   @Override
1123   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1124       throws IOException {
1125     requirePermission("balance", Action.ADMIN);
1126   }
1127 
1128   @Override
1129   public void postBalance(ObserverContext<MasterCoprocessorEnvironment> c, List<RegionPlan> plans)
1130       throws IOException {}
1131 
1132   @Override
1133   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1134       boolean newValue) throws IOException {
1135     requirePermission("balanceSwitch", Action.ADMIN);
1136     return newValue;
1137   }
1138 
1139   @Override
1140   public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1141       boolean oldValue, boolean newValue) throws IOException {}
1142 
1143   @Override
1144   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1145       throws IOException {
1146     requirePermission("shutdown", Action.ADMIN);
1147   }
1148 
1149   @Override
1150   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1151       throws IOException {
1152     requirePermission("stopMaster", Action.ADMIN);
1153   }
1154 
1155   @Override
1156   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1157       throws IOException {
1158     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1159       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1160       // initialize the ACL storage table
1161       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1162     } else {
1163       aclTabAvailable = true;
1164     }
1165   }
1166 
1167   @Override
1168   public void preMasterInitialization(
1169       ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
1170   }
1171 
1172   @Override
1173   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1174       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1175       throws IOException {
1176     requirePermission("snapshot", Action.ADMIN);
1177   }
1178 
1179   @Override
1180   public void postSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1181       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1182       throws IOException {
1183   }
1184 
1185   @Override
1186   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1187       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1188       throws IOException {
1189     requirePermission("clone", Action.ADMIN);
1190   }
1191 
1192   @Override
1193   public void postCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1194       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1195       throws IOException {
1196   }
1197 
1198   @Override
1199   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1200       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1201       throws IOException {
1202     requirePermission("restore", Action.ADMIN);
1203   }
1204 
1205   @Override
1206   public void postRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1207       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1208       throws IOException {
1209   }
1210 
1211   @Override
1212   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1213       final SnapshotDescription snapshot) throws IOException {
1214     requirePermission("deleteSnapshot", Action.ADMIN);
1215   }
1216 
1217   @Override
1218   public void postDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1219       final SnapshotDescription snapshot) throws IOException {
1220   }
1221 
1222   @Override
1223   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1224       NamespaceDescriptor ns) throws IOException {
1225     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1226   }
1227 
1228   @Override
1229   public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1230       NamespaceDescriptor ns) throws IOException {
1231   }
1232 
1233   @Override
1234   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1235       throws IOException {
1236     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1237   }
1238 
1239   @Override
1240   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1241                                   String namespace) throws IOException {
1242     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1243         namespace);
1244     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1245   }
1246 
1247   @Override
1248   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1249       NamespaceDescriptor ns) throws IOException {
1250     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1251   }
1252 
1253   @Override
1254   public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1255                                   NamespaceDescriptor ns) throws IOException {
1256   }
1257 
1258   @Override
1259   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1260       final TableName tableName) throws IOException {
1261     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1262   }
1263 
1264   @Override
1265   public void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1266       final TableName tableName) throws IOException {
1267   }
1268 
1269   /* ---- RegionObserver implementation ---- */
1270 
1271   @Override
1272   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1273       throws IOException {
1274     RegionCoprocessorEnvironment env = e.getEnvironment();
1275     final HRegion region = env.getRegion();
1276     if (region == null) {
1277       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1278     } else {
1279       HRegionInfo regionInfo = region.getRegionInfo();
1280       if (regionInfo.getTable().isSystemTable()) {
1281         isSystemOrSuperUser(regionEnv.getConfiguration());
1282       } else {
1283         requirePermission("preOpen", Action.ADMIN);
1284       }
1285     }
1286   }
1287 
1288   @Override
1289   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1290     RegionCoprocessorEnvironment env = c.getEnvironment();
1291     final HRegion region = env.getRegion();
1292     if (region == null) {
1293       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1294       return;
1295     }
1296     if (AccessControlLists.isAclRegion(region)) {
1297       aclRegion = true;
1298       // When this region is under recovering state, initialize will be handled by postLogReplay
1299       if (!region.isRecovering()) {
1300         try {
1301           initialize(env);
1302         } catch (IOException ex) {
1303           // if we can't obtain permissions, it's better to fail
1304           // than perform checks incorrectly
1305           throw new RuntimeException("Failed to initialize permissions cache", ex);
1306         }
1307       }
1308     } else {
1309       initialized = true;
1310     }
1311   }
1312 
1313   @Override
1314   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1315     if (aclRegion) {
1316       try {
1317         initialize(c.getEnvironment());
1318       } catch (IOException ex) {
1319         // if we can't obtain permissions, it's better to fail
1320         // than perform checks incorrectly
1321         throw new RuntimeException("Failed to initialize permissions cache", ex);
1322       }
1323     }
1324   }
1325 
1326   @Override
1327   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1328     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1329         Action.CREATE);
1330   }
1331 
1332   @Override
1333   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1334     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1335   }
1336   
1337   @Override
1338   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1339       byte[] splitRow) throws IOException {
1340     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1341   }
1342 
1343   @Override
1344   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1345       final Store store, final InternalScanner scanner, final ScanType scanType)
1346           throws IOException {
1347     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1348         Action.CREATE);
1349     return scanner;
1350   }
1351 
1352   @Override
1353   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1354       final byte [] row, final byte [] family, final Result result)
1355       throws IOException {
1356     assert family != null;
1357     RegionCoprocessorEnvironment env = c.getEnvironment();
1358     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1359     User user = getActiveUser();
1360     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1361       Action.READ);
1362     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1363       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1364         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1365       authResult.setReason("Covering cell set");
1366     }
1367     logResult(authResult);
1368     if (!authResult.isAllowed()) {
1369       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1370     }
1371   }
1372 
1373   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1374       final Query query, OpType opType) throws IOException {
1375     Filter filter = query.getFilter();
1376     // Don't wrap an AccessControlFilter
1377     if (filter != null && filter instanceof AccessControlFilter) {
1378       return;
1379     }
1380     User user = getActiveUser();
1381     RegionCoprocessorEnvironment env = c.getEnvironment();
1382     Map<byte[],? extends Collection<byte[]>> families = null;
1383     switch (opType) {
1384     case GET:
1385     case EXISTS:
1386       families = ((Get)query).getFamilyMap();
1387       break;
1388     case SCAN:
1389       families = ((Scan)query).getFamilyMap();
1390       break;
1391     default:
1392       throw new RuntimeException("Unhandled operation " + opType);
1393     }
1394     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1395     HRegion region = getRegion(env);
1396     TableName table = getTableName(region);
1397     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1398     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1399       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1400     }
1401     if (!authResult.isAllowed()) {
1402       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1403         // Old behavior: Scan with only qualifier checks if we have partial
1404         // permission. Backwards compatible behavior is to throw an
1405         // AccessDeniedException immediately if there are no grants for table
1406         // or CF or CF+qual. Only proceed with an injected filter if there are
1407         // grants for qualifiers. Otherwise we will fall through below and log
1408         // the result and throw an ADE. We may end up checking qualifier
1409         // grants three times (permissionGranted above, here, and in the
1410         // filter) but that's the price of backwards compatibility. 
1411         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1412           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1413             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1414             cfVsMaxVersions);
1415           // wrap any existing filter
1416           if (filter != null) {
1417             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1418               Lists.newArrayList(ourFilter, filter));
1419           }
1420           authResult.setAllowed(true);;
1421           authResult.setReason("Access allowed with filter");
1422           switch (opType) {
1423           case GET:
1424           case EXISTS:
1425             ((Get)query).setFilter(ourFilter);
1426             break;
1427           case SCAN:
1428             ((Scan)query).setFilter(ourFilter);
1429             break;
1430           default:
1431             throw new RuntimeException("Unhandled operation " + opType);
1432           }
1433         }
1434       } else {
1435         // New behavior: Any access we might be granted is more fine-grained
1436         // than whole table or CF. Simply inject a filter and return what is
1437         // allowed. We will not throw an AccessDeniedException. This is a
1438         // behavioral change since 0.96. 
1439         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1440           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1441         // wrap any existing filter
1442         if (filter != null) {
1443           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1444             Lists.newArrayList(ourFilter, filter));
1445         }
1446         authResult.setAllowed(true);;
1447         authResult.setReason("Access allowed with filter");
1448         switch (opType) {
1449         case GET:
1450         case EXISTS:
1451           ((Get)query).setFilter(ourFilter);
1452           break;
1453         case SCAN:
1454           ((Scan)query).setFilter(ourFilter);
1455           break;
1456         default:
1457           throw new RuntimeException("Unhandled operation " + opType);
1458         }
1459       }
1460     }
1461 
1462     logResult(authResult);
1463     if (!authResult.isAllowed()) {
1464       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1465         ", action=READ)");
1466     }
1467   }
1468 
1469   @Override
1470   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1471       final Get get, final List<Cell> result) throws IOException {
1472     internalPreRead(c, get, OpType.GET);
1473   }
1474 
1475   @Override
1476   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1477       final Get get, final boolean exists) throws IOException {
1478     internalPreRead(c, get, OpType.EXISTS);
1479     return exists;
1480   }
1481 
1482   @Override
1483   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1484       final Put put, final WALEdit edit, final Durability durability)
1485       throws IOException {
1486     // Require WRITE permission to the table, CF, or top visible value, if any.
1487     // NOTE: We don't need to check the permissions for any earlier Puts
1488     // because we treat the ACLs in each Put as timestamped like any other
1489     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1490     // change the ACL of any previous Put. This allows simple evolution of
1491     // security policy over time without requiring expensive updates.
1492     User user = getActiveUser();
1493     checkForReservedTagPresence(user, put);
1494     RegionCoprocessorEnvironment env = c.getEnvironment();
1495     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1496     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1497     logResult(authResult);
1498     if (!authResult.isAllowed()) {
1499       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1500         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1501       } else {
1502         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1503       }
1504     }
1505     // Add cell ACLs from the operation to the cells themselves
1506     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1507     if (bytes != null) {
1508       if (cellFeaturesEnabled) {
1509         addCellPermissions(bytes, put.getFamilyCellMap());
1510       } else {
1511         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1512       }
1513     }
1514   }
1515 
1516   @Override
1517   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1518       final Put put, final WALEdit edit, final Durability durability) {
1519     if (aclRegion) {
1520       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1521     }
1522   }
1523 
1524   @Override
1525   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1526       final Delete delete, final WALEdit edit, final Durability durability)
1527       throws IOException {
1528     // An ACL on a delete is useless, we shouldn't allow it
1529     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1530       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1531     }
1532     // Require WRITE permissions on all cells covered by the delete. Unlike
1533     // for Puts we need to check all visible prior versions, because a major
1534     // compaction could remove them. If the user doesn't have permission to
1535     // overwrite any of the visible versions ('visible' defined as not covered
1536     // by a tombstone already) then we have to disallow this operation.
1537     RegionCoprocessorEnvironment env = c.getEnvironment();
1538     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1539     User user = getActiveUser();
1540     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1541     logResult(authResult);
1542     if (!authResult.isAllowed()) {
1543       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1544         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1545       } else {
1546         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1547       }
1548     }
1549   }
1550 
1551   @Override
1552   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1553       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1554     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1555       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1556       for (int i = 0; i < miniBatchOp.size(); i++) {
1557         Mutation m = miniBatchOp.getOperation(i);
1558         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1559           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1560           // perm check
1561           OpType opType;
1562           if (m instanceof Put) {
1563             checkForReservedTagPresence(getActiveUser(), m);
1564             opType = OpType.PUT;
1565           } else {
1566             opType = OpType.DELETE;
1567           }
1568           AuthResult authResult = null;
1569           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1570               m.getTimeStamp(), Action.WRITE)) {
1571             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1572                 Action.WRITE, table, m.getFamilyCellMap());
1573           } else {
1574             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1575                 Action.WRITE, table, m.getFamilyCellMap());
1576           }
1577           logResult(authResult);
1578           if (!authResult.isAllowed()) {
1579             throw new AccessDeniedException("Insufficient permissions "
1580                 + authResult.toContextString());
1581           }
1582         }
1583       }
1584     }
1585   }
1586 
1587   @Override
1588   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1589       final Delete delete, final WALEdit edit, final Durability durability)
1590       throws IOException {
1591     if (aclRegion) {
1592       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1593     }
1594   }
1595 
1596   @Override
1597   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1598       final byte [] row, final byte [] family, final byte [] qualifier,
1599       final CompareFilter.CompareOp compareOp,
1600       final ByteArrayComparable comparator, final Put put,
1601       final boolean result) throws IOException {
1602     // Require READ and WRITE permissions on the table, CF, and KV to update
1603     User user = getActiveUser();
1604     checkForReservedTagPresence(user, put);
1605     RegionCoprocessorEnvironment env = c.getEnvironment();
1606     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1607     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1608       Action.READ, Action.WRITE);
1609     logResult(authResult);
1610     if (!authResult.isAllowed()) {
1611       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1612         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1613       } else {
1614         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1615       }
1616     }
1617     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1618     if (bytes != null) {
1619       if (cellFeaturesEnabled) {
1620         addCellPermissions(bytes, put.getFamilyCellMap());
1621       } else {
1622         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1623       }
1624     }
1625     return result;
1626   }
1627 
1628   @Override
1629   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1630       final byte[] row, final byte[] family, final byte[] qualifier,
1631       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1632       final boolean result) throws IOException {
1633     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1634       // We had failure with table, cf and q perm checks and now giving a chance for cell
1635       // perm check
1636       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1637       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1638       AuthResult authResult = null;
1639       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1640           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1641         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1642             getActiveUser(), Action.READ, table, families);
1643       } else {
1644         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1645             getActiveUser(), Action.READ, table, families);
1646       }
1647       logResult(authResult);
1648       if (!authResult.isAllowed()) {
1649         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1650       }
1651     }
1652     return result;
1653   }
1654 
1655   @Override
1656   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1657       final byte [] row, final byte [] family, final byte [] qualifier,
1658       final CompareFilter.CompareOp compareOp,
1659       final ByteArrayComparable comparator, final Delete delete,
1660       final boolean result) throws IOException {
1661     // An ACL on a delete is useless, we shouldn't allow it
1662     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1663       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1664           delete.toString());
1665     }
1666     // Require READ and WRITE permissions on the table, CF, and the KV covered
1667     // by the delete
1668     RegionCoprocessorEnvironment env = c.getEnvironment();
1669     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1670     User user = getActiveUser();
1671     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1672       Action.READ, Action.WRITE);
1673     logResult(authResult);
1674     if (!authResult.isAllowed()) {
1675       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1676         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1677       } else {
1678         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1679       }
1680     }
1681     return result;
1682   }
1683 
1684   @Override
1685   public boolean preCheckAndDeleteAfterRowLock(
1686       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1687       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1688       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1689       throws IOException {
1690     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1691       // We had failure with table, cf and q perm checks and now giving a chance for cell
1692       // perm check
1693       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1694       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1695       AuthResult authResult = null;
1696       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1697           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1698         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1699             getActiveUser(), Action.READ, table, families);
1700       } else {
1701         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1702             getActiveUser(), Action.READ, table, families);
1703       }
1704       logResult(authResult);
1705       if (!authResult.isAllowed()) {
1706         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1707       }
1708     }
1709     return result;
1710   }
1711 
1712   @Override
1713   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1714       final byte [] row, final byte [] family, final byte [] qualifier,
1715       final long amount, final boolean writeToWAL)
1716       throws IOException {
1717     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1718     // incremented value
1719     RegionCoprocessorEnvironment env = c.getEnvironment();
1720     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1721     User user = getActiveUser();
1722     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1723       Action.WRITE);
1724     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1725       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1726         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1727       authResult.setReason("Covering cell set");
1728     }
1729     logResult(authResult);
1730     if (!authResult.isAllowed()) {
1731       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1732     }
1733     return -1;
1734   }
1735 
1736   @Override
1737   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1738       throws IOException {
1739     // Require WRITE permission to the table, CF, and the KV to be appended
1740     User user = getActiveUser();
1741     checkForReservedTagPresence(user, append);
1742     RegionCoprocessorEnvironment env = c.getEnvironment();
1743     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1744     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1745     logResult(authResult);
1746     if (!authResult.isAllowed()) {
1747       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1748         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1749       } else {
1750         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1751       }
1752     }
1753     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1754     if (bytes != null) {
1755       if (cellFeaturesEnabled) {
1756         addCellPermissions(bytes, append.getFamilyCellMap());
1757       } else {
1758         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1759       }
1760     }
1761     return null;
1762   }
1763 
1764   @Override
1765   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1766       final Append append) throws IOException {
1767     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1768       // We had failure with table, cf and q perm checks and now giving a chance for cell
1769       // perm check
1770       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1771       AuthResult authResult = null;
1772       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1773           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1774         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1775             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1776       } else {
1777         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1778             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1779       }
1780       logResult(authResult);
1781       if (!authResult.isAllowed()) {
1782         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1783       }
1784     }
1785     return null;
1786   }
1787 
1788   @Override
1789   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1790       final Increment increment)
1791       throws IOException {
1792     // Require WRITE permission to the table, CF, and the KV to be replaced by
1793     // the incremented value
1794     User user = getActiveUser();
1795     checkForReservedTagPresence(user, increment);
1796     RegionCoprocessorEnvironment env = c.getEnvironment();
1797     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1798     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1799       Action.WRITE);
1800     logResult(authResult);
1801     if (!authResult.isAllowed()) {
1802       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1803         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1804       } else {
1805         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1806       }
1807     }
1808     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1809     if (bytes != null) {
1810       if (cellFeaturesEnabled) {
1811         addCellPermissions(bytes, increment.getFamilyCellMap());
1812       } else {
1813         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1814       }
1815     }
1816     return null;
1817   }
1818 
1819   @Override
1820   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1821       final Increment increment) throws IOException {
1822     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1823       // We had failure with table, cf and q perm checks and now giving a chance for cell
1824       // perm check
1825       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1826       AuthResult authResult = null;
1827       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1828           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1829         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1830             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1831       } else {
1832         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1833             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1834       }
1835       logResult(authResult);
1836       if (!authResult.isAllowed()) {
1837         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1838       }
1839     }
1840     return null;
1841   }
1842 
1843   @Override
1844   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1845       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1846     // If the HFile version is insufficient to persist tags, we won't have any
1847     // work to do here
1848     if (!cellFeaturesEnabled) {
1849       return newCell;
1850     }
1851 
1852     // Collect any ACLs from the old cell
1853     List<Tag> tags = Lists.newArrayList();
1854     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1855     if (oldCell != null) {
1856       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1857           oldCell.getTagsOffset(), oldCell.getTagsLength());
1858       while (tagIterator.hasNext()) {
1859         Tag tag = tagIterator.next();
1860         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1861           if (LOG.isTraceEnabled()) {
1862             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1863               " length " + tag.getTagLength());
1864           }
1865           tags.add(tag);
1866         } else {
1867           ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1868             AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1869               tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1870           perms.putAll(kvPerms);
1871         }
1872       }
1873     }
1874 
1875     // Do we have an ACL on the operation?
1876     byte[] aclBytes = mutation.getACL();
1877     if (aclBytes != null) {
1878       // Yes, use it
1879       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1880     } else {
1881       // No, use what we carried forward
1882       if (perms != null) {
1883         // TODO: If we collected ACLs from more than one tag we may have a
1884         // List<Permission> of size > 1, this can be collapsed into a single
1885         // Permission
1886         if (LOG.isTraceEnabled()) {
1887           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1888         }
1889         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1890           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1891       }
1892     }
1893 
1894     // If we have no tags to add, just return
1895     if (tags.isEmpty()) {
1896       return newCell;
1897     }
1898 
1899     // We need to create another KV, unfortunately, because the current new KV
1900     // has no space for tags
1901     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1902     KeyValue rewriteKv = new KeyValue(newKv.getRowArray(), newKv.getRowOffset(), newKv.getRowLength(),
1903       newKv.getFamilyArray(), newKv.getFamilyOffset(), newKv.getFamilyLength(),
1904       newKv.getQualifierArray(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
1905       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1906       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
1907       tags);
1908     // Preserve mvcc data
1909     rewriteKv.setSequenceId(newKv.getMvccVersion());
1910     return rewriteKv;
1911   }
1912 
1913   @Override
1914   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1915       final Scan scan, final RegionScanner s) throws IOException {
1916     internalPreRead(c, scan, OpType.SCAN);
1917     return s;
1918   }
1919 
1920   @Override
1921   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1922       final Scan scan, final RegionScanner s) throws IOException {
1923     User user = getActiveUser();
1924     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1925       scannerOwners.put(s, user.getShortName());
1926     }
1927     return s;
1928   }
1929 
1930   @Override
1931   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1932       final InternalScanner s, final List<Result> result,
1933       final int limit, final boolean hasNext) throws IOException {
1934     requireScannerOwner(s);
1935     return hasNext;
1936   }
1937 
1938   @Override
1939   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1940       final InternalScanner s) throws IOException {
1941     requireScannerOwner(s);
1942   }
1943 
1944   @Override
1945   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1946       final InternalScanner s) throws IOException {
1947     // clean up any associated owner mapping
1948     scannerOwners.remove(s);
1949   }
1950 
1951   /**
1952    * Verify, when servicing an RPC, that the caller is the scanner owner.
1953    * If so, we assume that access control is correctly enforced based on
1954    * the checks performed in preScannerOpen()
1955    */
1956   private void requireScannerOwner(InternalScanner s)
1957       throws AccessDeniedException {
1958     if (RequestContext.isInRequestContext()) {
1959       String requestUserName = RequestContext.getRequestUserName();
1960       String owner = scannerOwners.get(s);
1961       if (owner != null && !owner.equals(requestUserName)) {
1962         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1963       }
1964     }
1965   }
1966 
1967   /**
1968    * Verifies user has WRITE privileges on
1969    * the Column Families involved in the bulkLoadHFile
1970    * request. Specific Column Write privileges are presently
1971    * ignored.
1972    */
1973   @Override
1974   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1975       List<Pair<byte[], String>> familyPaths) throws IOException {
1976     for(Pair<byte[],String> el : familyPaths) {
1977       requirePermission("preBulkLoadHFile",
1978           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1979           el.getFirst(),
1980           null,
1981           Action.CREATE);
1982     }
1983   }
1984 
1985   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1986     User requestUser = getActiveUser();
1987     TableName tableName = e.getRegion().getTableDesc().getTableName();
1988     AuthResult authResult = permissionGranted(method, requestUser,
1989         action, e, Collections.EMPTY_MAP);
1990     if (!authResult.isAllowed()) {
1991       for(UserPermission userPerm:
1992           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1993         for(Action userAction: userPerm.getActions()) {
1994           if(userAction.equals(action)) {
1995             return AuthResult.allow(method, "Access allowed", requestUser,
1996                 action, tableName, null, null);
1997           }
1998         }
1999       }
2000     }
2001     return authResult;
2002   }
2003 
2004   /**
2005    * Authorization check for
2006    * SecureBulkLoadProtocol.prepareBulkLoad()
2007    * @param e
2008    * @throws IOException
2009    */
2010   //TODO this should end up as a coprocessor hook
2011   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
2012     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
2013     logResult(authResult);
2014     if (!authResult.isAllowed()) {
2015       throw new AccessDeniedException("Insufficient permissions (table=" +
2016         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
2017     }
2018   }
2019 
2020   /**
2021    * Authorization security check for
2022    * SecureBulkLoadProtocol.cleanupBulkLoad()
2023    * @param e
2024    * @throws IOException
2025    */
2026   //TODO this should end up as a coprocessor hook
2027   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
2028     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
2029     logResult(authResult);
2030     if (!authResult.isAllowed()) {
2031       throw new AccessDeniedException("Insufficient permissions (table=" +
2032         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
2033     }
2034   }
2035 
2036   /* ---- EndpointObserver implementation ---- */
2037 
2038   @Override
2039   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2040       Service service, String methodName, Message request) throws IOException {
2041     // Don't intercept calls to our own AccessControlService, we check for
2042     // appropriate permissions in the service handlers
2043     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2044       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2045         methodName + ")",
2046         getTableName(ctx.getEnvironment()), null, null,
2047         Action.EXEC);
2048     }
2049     return request;
2050   }
2051 
2052   @Override
2053   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2054       Service service, String methodName, Message request, Message.Builder responseBuilder)
2055       throws IOException { }
2056 
2057   /* ---- Protobuf AccessControlService implementation ---- */
2058 
2059   @Override
2060   public void grant(RpcController controller,
2061                     AccessControlProtos.GrantRequest request,
2062                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2063     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2064     AccessControlProtos.GrantResponse response = null;
2065     try {
2066       // verify it's only running at .acl.
2067       if (aclRegion) {
2068         if (!initialized) {
2069           throw new CoprocessorException("AccessController not yet initialized");
2070         }
2071         if (LOG.isDebugEnabled()) {
2072           LOG.debug("Received request to grant access permission " + perm.toString());
2073         }
2074 
2075         switch(request.getUserPermission().getPermission().getType()) {
2076           case Global :
2077           case Table :
2078             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2079                 perm.getQualifier(), Action.ADMIN);
2080             break;
2081           case Namespace :
2082             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2083         }
2084 
2085         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2086         if (AUDITLOG.isTraceEnabled()) {
2087           // audit log should store permission changes in addition to auth results
2088           AUDITLOG.trace("Granted permission " + perm.toString());
2089         }
2090       } else {
2091         throw new CoprocessorException(AccessController.class, "This method "
2092             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2093       }
2094       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2095     } catch (IOException ioe) {
2096       // pass exception back up
2097       ResponseConverter.setControllerException(controller, ioe);
2098     }
2099     done.run(response);
2100   }
2101 
2102   @Override
2103   public void revoke(RpcController controller,
2104                      AccessControlProtos.RevokeRequest request,
2105                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2106     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2107     AccessControlProtos.RevokeResponse response = null;
2108     try {
2109       // only allowed to be called on _acl_ region
2110       if (aclRegion) {
2111         if (!initialized) {
2112           throw new CoprocessorException("AccessController not yet initialized");
2113         }
2114         if (LOG.isDebugEnabled()) {
2115           LOG.debug("Received request to revoke access permission " + perm.toString());
2116         }
2117 
2118         switch(request.getUserPermission().getPermission().getType()) {
2119           case Global :
2120           case Table :
2121             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2122                               perm.getQualifier(), Action.ADMIN);
2123             break;
2124           case Namespace :
2125             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2126         }
2127 
2128         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2129         if (AUDITLOG.isTraceEnabled()) {
2130           // audit log should record all permission changes
2131           AUDITLOG.trace("Revoked permission " + perm.toString());
2132         }
2133       } else {
2134         throw new CoprocessorException(AccessController.class, "This method "
2135             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2136       }
2137       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2138     } catch (IOException ioe) {
2139       // pass exception back up
2140       ResponseConverter.setControllerException(controller, ioe);
2141     }
2142     done.run(response);
2143   }
2144 
2145   @Override
2146   public void getUserPermissions(RpcController controller,
2147                                  AccessControlProtos.GetUserPermissionsRequest request,
2148                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2149     AccessControlProtos.GetUserPermissionsResponse response = null;
2150     try {
2151       // only allowed to be called on _acl_ region
2152       if (aclRegion) {
2153         if (!initialized) {
2154           throw new CoprocessorException("AccessController not yet initialized");
2155         }
2156         List<UserPermission> perms = null;
2157         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2158           TableName table = null;
2159           if (request.hasTableName()) {
2160             table = ProtobufUtil.toTableName(request.getTableName());
2161           }
2162           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2163 
2164           perms = AccessControlLists.getUserTablePermissions(
2165               regionEnv.getConfiguration(), table);
2166         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2167           perms = AccessControlLists.getUserNamespacePermissions(
2168               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2169         } else {
2170           perms = AccessControlLists.getUserPermissions(
2171               regionEnv.getConfiguration(), null);
2172         }
2173         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2174       } else {
2175         throw new CoprocessorException(AccessController.class, "This method "
2176             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2177       }
2178     } catch (IOException ioe) {
2179       // pass exception back up
2180       ResponseConverter.setControllerException(controller, ioe);
2181     }
2182     done.run(response);
2183   }
2184 
2185   @Override
2186   public void checkPermissions(RpcController controller,
2187                                AccessControlProtos.CheckPermissionsRequest request,
2188                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2189     Permission[] permissions = new Permission[request.getPermissionCount()];
2190     for (int i=0; i < request.getPermissionCount(); i++) {
2191       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2192     }
2193     AccessControlProtos.CheckPermissionsResponse response = null;
2194     try {
2195       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2196       for (Permission permission : permissions) {
2197         if (permission instanceof TablePermission) {
2198           TablePermission tperm = (TablePermission) permission;
2199           for (Action action : permission.getActions()) {
2200             if (!tperm.getTableName().equals(tableName)) {
2201               throw new CoprocessorException(AccessController.class, String.format("This method "
2202                   + "can only execute at the table specified in TablePermission. " +
2203                   "Table of the region:%s , requested table:%s", tableName,
2204                   tperm.getTableName()));
2205             }
2206 
2207             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2208             if (tperm.getFamily() != null) {
2209               if (tperm.getQualifier() != null) {
2210                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2211                 qualifiers.add(tperm.getQualifier());
2212                 familyMap.put(tperm.getFamily(), qualifiers);
2213               } else {
2214                 familyMap.put(tperm.getFamily(), null);
2215               }
2216             }
2217 
2218             requirePermission("checkPermissions", action, regionEnv, familyMap);
2219           }
2220 
2221         } else {
2222           for (Action action : permission.getActions()) {
2223             requirePermission("checkPermissions", action);
2224           }
2225         }
2226       }
2227       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2228     } catch (IOException ioe) {
2229       ResponseConverter.setControllerException(controller, ioe);
2230     }
2231     done.run(response);
2232   }
2233 
2234   @Override
2235   public Service getService() {
2236     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2237   }
2238 
2239   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2240     return e.getRegion();
2241   }
2242 
2243   private TableName getTableName(RegionCoprocessorEnvironment e) {
2244     HRegion region = e.getRegion();
2245     if (region != null) {
2246       return getTableName(region);
2247     }
2248     return null;
2249   }
2250 
2251   private TableName getTableName(HRegion region) {
2252     HRegionInfo regionInfo = region.getRegionInfo();
2253     if (regionInfo != null) {
2254       return regionInfo.getTable();
2255     }
2256     return null;
2257   }
2258 
2259   @Override
2260   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2261       throws IOException {
2262     requirePermission("preClose", Action.ADMIN);
2263   }
2264 
2265   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2266     User user = userProvider.getCurrent();
2267     if (user == null) {
2268       throw new IOException("Unable to obtain the current user, " +
2269         "authorization checks for internal operations will not work correctly!");
2270     }
2271     User activeUser = getActiveUser();
2272     if (!(superusers.contains(activeUser.getShortName()))) {
2273       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2274         "is not system or super user.");
2275     }
2276   }
2277 
2278   @Override
2279   public void preStopRegionServer(
2280       ObserverContext<RegionServerCoprocessorEnvironment> env)
2281       throws IOException {
2282     requirePermission("preStopRegionServer", Action.ADMIN);
2283   }
2284 
2285   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2286       byte[] qualifier) {
2287     if (family == null) {
2288       return null;
2289     }
2290 
2291     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2292     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2293     return familyMap;
2294   }
2295 
2296   @Override
2297   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2298       List<TableName> tableNamesList,
2299       List<HTableDescriptor> descriptors) throws IOException {
2300     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2301     // ADMIN privs.
2302     if (tableNamesList == null || tableNamesList.isEmpty()) {
2303       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2304     }
2305     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2306     // request can be granted.
2307     else {
2308       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2309       for (TableName tableName: tableNamesList) {
2310         // Do not deny if the table does not exist
2311         try {
2312           masterServices.checkTableModifiable(tableName);
2313         } catch (TableNotFoundException ex) {
2314           // Skip checks for a table that does not exist
2315           continue;
2316         } catch (TableNotDisabledException ex) {
2317           // We don't care about this
2318         }
2319         requirePermission("getTableDescriptors", tableName, null, null,
2320           Action.ADMIN, Action.CREATE);
2321       }
2322     }
2323   }
2324 
2325   @Override
2326   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2327       List<HTableDescriptor> descriptors) throws IOException {
2328   }
2329 
2330   @Override
2331   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2332       HRegion regionB) throws IOException {
2333     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2334       Action.ADMIN);
2335   }
2336 
2337   @Override
2338   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2339       HRegion regionB, HRegion mergedRegion) throws IOException { }
2340 
2341   @Override
2342   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2343       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2344 
2345   @Override
2346   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2347       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2348 
2349   @Override
2350   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2351       HRegion regionA, HRegion regionB) throws IOException { }
2352 
2353   @Override
2354   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2355       HRegion regionA, HRegion regionB) throws IOException { }
2356 }