View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.util.Collection;
20  import java.util.Collections;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.CompoundConfiguration;
36  import org.apache.hadoop.hbase.CoprocessorEnvironment;
37  import org.apache.hadoop.hbase.DoNotRetryIOException;
38  import org.apache.hadoop.hbase.HColumnDescriptor;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.KeyValue.Type;
44  import org.apache.hadoop.hbase.KeyValueUtil;
45  import org.apache.hadoop.hbase.NamespaceDescriptor;
46  import org.apache.hadoop.hbase.ServerName;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.TableNotDisabledException;
49  import org.apache.hadoop.hbase.TableNotFoundException;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.catalog.MetaReader;
52  import org.apache.hadoop.hbase.client.Append;
53  import org.apache.hadoop.hbase.client.Delete;
54  import org.apache.hadoop.hbase.client.Durability;
55  import org.apache.hadoop.hbase.client.Get;
56  import org.apache.hadoop.hbase.client.Increment;
57  import org.apache.hadoop.hbase.client.Mutation;
58  import org.apache.hadoop.hbase.client.Put;
59  import org.apache.hadoop.hbase.client.Query;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.client.Scan;
62  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
63  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
64  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
65  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
66  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
67  import org.apache.hadoop.hbase.coprocessor.MasterObserver;
68  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
69  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
72  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
73  import org.apache.hadoop.hbase.filter.CompareFilter;
74  import org.apache.hadoop.hbase.filter.Filter;
75  import org.apache.hadoop.hbase.filter.FilterList;
76  import org.apache.hadoop.hbase.io.hfile.HFile;
77  import org.apache.hadoop.hbase.ipc.RequestContext;
78  import org.apache.hadoop.hbase.master.MasterServices;
79  import org.apache.hadoop.hbase.master.RegionPlan;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
82  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
84  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
85  import org.apache.hadoop.hbase.regionserver.HRegion;
86  import org.apache.hadoop.hbase.regionserver.InternalScanner;
87  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
88  import org.apache.hadoop.hbase.regionserver.RegionScanner;
89  import org.apache.hadoop.hbase.regionserver.ScanType;
90  import org.apache.hadoop.hbase.regionserver.Store;
91  import org.apache.hadoop.hbase.regionserver.StoreFile;
92  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
93  import org.apache.hadoop.hbase.security.AccessDeniedException;
94  import org.apache.hadoop.hbase.security.User;
95  import org.apache.hadoop.hbase.security.UserProvider;
96  import org.apache.hadoop.hbase.security.access.Permission.Action;
97  import org.apache.hadoop.hbase.util.ByteRange;
98  import org.apache.hadoop.hbase.util.Bytes;
99  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.SimpleByteRange;
102 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
103 
104 import com.google.common.collect.ArrayListMultimap;
105 import com.google.common.collect.ImmutableSet;
106 import com.google.common.collect.ListMultimap;
107 import com.google.common.collect.Lists;
108 import com.google.common.collect.MapMaker;
109 import com.google.common.collect.Maps;
110 import com.google.common.collect.Sets;
111 import com.google.protobuf.Message;
112 import com.google.protobuf.RpcCallback;
113 import com.google.protobuf.RpcController;
114 import com.google.protobuf.Service;
115 
116 /**
117  * Provides basic authorization checks for data access and administrative
118  * operations.
119  *
120  * <p>
121  * {@code AccessController} performs authorization checks for HBase operations
122  * based on:
123  * <ul>
124  *   <li>the identity of the user performing the operation</li>
125  *   <li>the scope over which the operation is performed, in increasing
126  *   specificity: global, table, column family, or qualifier</li>
127  *   <li>the type of action being performed (as mapped to
128  *   {@link Permission.Action} values)</li>
129  * </ul>
130  * If the authorization check fails, an {@link AccessDeniedException}
131  * will be thrown for the operation.
132  * </p>
133  *
134  * <p>
135  * To perform authorization checks, {@code AccessController} relies on the
136  * RpcServerEngine being loaded to provide
137  * the user identities for remote requests.
138  * </p>
139  *
140  * <p>
141  * The access control lists used for authorization can be manipulated via the
142  * exposed {@link AccessControlService} Interface implementation, and the associated
143  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
144  * commands.
145  * </p>
146  */
147 public class AccessController extends BaseRegionObserver
148     implements MasterObserver, RegionServerObserver,
149       AccessControlService.Interface, CoprocessorService, EndpointObserver {
150 
  /** General-purpose logger for this coprocessor. */
  public static final Log LOG = LogFactory.getLog(AccessController.class);

  /** Audit logger; access decisions are written to it at TRACE level. */
  private static final Log AUDITLOG =
    LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
  private static final String CHECK_COVERING_PERM = "check_covering_perm";
  private static final byte[] TRUE = Bytes.toBytes(true);

  /** Performs the permission lookups and gives access to the ZooKeeper permission watcher. */
  TableAuthManager authManager = null;

  // Flags whether we are running on a region of the _acl_ table.
  boolean aclRegion = false;

  // Defined only for the Endpoint implementation, so that it has a way to
  // access region services.
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them (weakly keyed). */
  private Map<InternalScanner,String> scannerOwners =
      new MapMaker().weakKeys().makeMap();

  /** Supplies the current (non-RPC) user when no request context is active. */
  private UserProvider userProvider;

  // Whether we are able to support cell ACLs.
  boolean cellFeaturesEnabled;

  // Whether we should check EXEC permissions.
  boolean shouldCheckExecPermission;

  // Whether we should terminate access checks early, as soon as table or CF
  // grants allow access; this is the pre-0.98 compatible behavior.
  boolean compatibleEarlyTermination;

  // Set to true once initialize() has finished publishing the loaded ACLs.
  private volatile boolean initialized = false;

  // This flag is relevant only in the Master.
  private volatile boolean aclTabAvailable = false;
187 
188   public HRegion getRegion() {
189     return regionEnv != null ? regionEnv.getRegion() : null;
190   }
191 
  /**
   * Returns the authorization manager this controller delegates its
   * permission checks to.
   *
   * @return the current {@link TableAuthManager}; the field is declared with a
   *         {@code null} initial value, so this may be {@code null} until it is assigned
   */
  public TableAuthManager getAuthManager() {
    return authManager;
  }
195 
196   void initialize(RegionCoprocessorEnvironment e) throws IOException {
197     final HRegion region = e.getRegion();
198     Configuration conf = e.getConfiguration();
199     Map<byte[], ListMultimap<String,TablePermission>> tables =
200         AccessControlLists.loadAll(region);
201     // For each table, write out the table's permissions to the respective
202     // znode for that table.
203     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
204       tables.entrySet()) {
205       byte[] entry = t.getKey();
206       ListMultimap<String,TablePermission> perms = t.getValue();
207       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
208       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
209     }
210     initialized = true;
211   }
212 
213   /**
214    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
215    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
216    * table updates.
217    */
218   void updateACL(RegionCoprocessorEnvironment e,
219       final Map<byte[], List<Cell>> familyMap) {
220     Set<byte[]> entries =
221         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
222     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
223       List<Cell> cells = f.getValue();
224       for (Cell cell: cells) {
225         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
226         if (Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(),
227             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
228             AccessControlLists.ACL_LIST_FAMILY.length)) {
229           entries.add(kv.getRow());
230         }
231       }
232     }
233     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
234     Configuration conf = regionEnv.getConfiguration();
235     for (byte[] entry: entries) {
236       try {
237         ListMultimap<String,TablePermission> perms =
238           AccessControlLists.getPermissions(conf, entry);
239         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
240         zkw.writeToZookeeper(entry, serialized);
241       } catch (IOException ex) {
242         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
243             ex);
244       }
245     }
246   }
247 
248   /**
249    * Check the current user for authorization to perform a specific action
250    * against the given set of row data.
251    *
252    * <p>Note: Ordering of the authorization checks
253    * has been carefully optimized to short-circuit the most common requests
254    * and minimize the amount of processing required.</p>
255    *
256    * @param permRequest the action being requested
257    * @param e the coprocessor environment
258    * @param families the map of column families to qualifiers present in
259    * the request
260    * @return an authorization result
261    */
262   AuthResult permissionGranted(String request, User user, Action permRequest,
263       RegionCoprocessorEnvironment e,
264       Map<byte [], ? extends Collection<?>> families) {
265     HRegionInfo hri = e.getRegion().getRegionInfo();
266     TableName tableName = hri.getTable();
267 
268     // 1. All users need read access to hbase:meta table.
269     // this is a very common operation, so deal with it quickly.
270     if (hri.isMetaRegion()) {
271       if (permRequest == Action.READ) {
272         return AuthResult.allow(request, "All users allowed", user,
273           permRequest, tableName, families);
274       }
275     }
276 
277     if (user == null) {
278       return AuthResult.deny(request, "No user associated with request!", null,
279         permRequest, tableName, families);
280     }
281 
282     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
283     // e.g. When a new table is created a new entry in hbase:meta is added,
284     // so the user need to be allowed to write on it.
285     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
286     // and the user need to be allowed to write on both tables.
287     if (permRequest == Action.WRITE &&
288        (hri.isMetaRegion() ||
289         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
290        (authManager.authorize(user, Action.CREATE) ||
291         authManager.authorize(user, Action.ADMIN)))
292     {
293        return AuthResult.allow(request, "Table permission granted", user,
294         permRequest, tableName, families);
295     }
296 
297     // 2. check for the table-level, if successful we can short-circuit
298     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
299       return AuthResult.allow(request, "Table permission granted", user,
300         permRequest, tableName, families);
301     }
302 
303     // 3. check permissions against the requested families
304     if (families != null && families.size() > 0) {
305       // all families must pass
306       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
307         // a) check for family level access
308         if (authManager.authorize(user, tableName, family.getKey(),
309             permRequest)) {
310           continue;  // family-level permission overrides per-qualifier
311         }
312 
313         // b) qualifier level access can still succeed
314         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
315           if (family.getValue() instanceof Set) {
316             // for each qualifier of the family
317             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
318             for (byte[] qualifier : familySet) {
319               if (!authManager.authorize(user, tableName, family.getKey(),
320                                          qualifier, permRequest)) {
321                 return AuthResult.deny(request, "Failed qualifier check", user,
322                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
323               }
324             }
325           } else if (family.getValue() instanceof List) { // List<KeyValue>
326             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
327             for (KeyValue kv : kvList) {
328               if (!authManager.authorize(user, tableName, family.getKey(),
329                       kv.getQualifier(), permRequest)) {
330                 return AuthResult.deny(request, "Failed qualifier check", user,
331                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
332               }
333             }
334           }
335         } else {
336           // no qualifiers and family-level check already failed
337           return AuthResult.deny(request, "Failed family check", user, permRequest,
338               tableName, makeFamilyMap(family.getKey(), null));
339         }
340       }
341 
342       // all family checks passed
343       return AuthResult.allow(request, "All family checks passed", user, permRequest,
344           tableName, families);
345     }
346 
347     // 4. no families to check and table level access failed
348     return AuthResult.deny(request, "No families to check and table permission failed",
349         user, permRequest, tableName, families);
350   }
351 
352   /**
353    * Check the current user for authorization to perform a specific action
354    * against the given set of row data.
355    * @param opType the operation type
356    * @param user the user
357    * @param e the coprocessor environment
358    * @param families the map of column families to qualifiers present in
359    * the request
360    * @param actions the desired actions
361    * @return an authorization result
362    */
363   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
364       Map<byte [], ? extends Collection<?>> families, Action... actions) {
365     AuthResult result = null;
366     for (Action action: actions) {
367       result = permissionGranted(opType.toString(), user, action, e, families);
368       if (!result.isAllowed()) {
369         return result;
370       }
371     }
372     return result;
373   }
374 
375   private void logResult(AuthResult result) {
376     if (AUDITLOG.isTraceEnabled()) {
377       RequestContext ctx = RequestContext.get();
378       InetAddress remoteAddr = null;
379       if (ctx != null) {
380         remoteAddr = ctx.getRemoteAddress();
381       }
382       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
383           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
384           "; reason: " + result.getReason() +
385           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
386           "; request: " + result.getRequest() +
387           "; context: " + result.toContextString());
388     }
389   }
390 
391   /**
392    * Returns the active user to which authorization checks should be applied.
393    * If we are in the context of an RPC call, the remote user is used,
394    * otherwise the currently logged in user is used.
395    */
396   private User getActiveUser() throws IOException {
397     User user = RequestContext.getRequestUser();
398     if (!RequestContext.isInRequestContext()) {
399       // for non-rpc handling, fallback to system user
400       user = userProvider.getCurrent();
401     }
402     return user;
403   }
404 
405   /**
406    * Authorizes that the current user has any of the given permissions for the
407    * given table, column family and column qualifier.
408    * @param tableName Table requested
409    * @param family Column family requested
410    * @param qualifier Column qualifier requested
411    * @throws IOException if obtaining the current user fails
412    * @throws AccessDeniedException if user has no authorization
413    */
414   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
415       Action... permissions) throws IOException {
416     User user = getActiveUser();
417     AuthResult result = null;
418 
419     for (Action permission : permissions) {
420       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
421         result = AuthResult.allow(request, "Table permission granted", user,
422                                   permission, tableName, family, qualifier);
423         break;
424       } else {
425         // rest of the world
426         result = AuthResult.deny(request, "Insufficient permissions", user,
427                                  permission, tableName, family, qualifier);
428       }
429     }
430     logResult(result);
431     if (!result.isAllowed()) {
432       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
433     }
434   }
435 
  /**
   * Authorizes that the current user has global privileges for the given action.
   * Delegates to the global permission check with no table and no family map
   * as audit context.
   * @param request name of the operation, passed on to the global check
   * @param perm The action being requested
   * @throws IOException if obtaining the current user fails
   * @throws AccessDeniedException if authorization is denied
   */
  private void requirePermission(String request, Action perm) throws IOException {
    requireGlobalPermission(request, perm, null, null);
  }
445 
446   /**
447    * Authorizes that the current user has permission to perform the given
448    * action on the set of table column families.
449    * @param perm Action that is required
450    * @param env The current coprocessor environment
451    * @param families The map of column families-qualifiers.
452    * @throws AccessDeniedException if the authorization check failed
453    */
454   private void requirePermission(String request, Action perm,
455         RegionCoprocessorEnvironment env,
456         Map<byte[], ? extends Collection<?>> families)
457       throws IOException {
458     User user = getActiveUser();
459     AuthResult result = permissionGranted(request, user, perm, env, families);
460     logResult(result);
461 
462     if (!result.isAllowed()) {
463       throw new AccessDeniedException("Insufficient permissions (table=" +
464         env.getRegion().getTableDesc().getTableName()+
465         ((families != null && families.size() > 0) ? ", family: " +
466         result.toFamilyString() : "") + ", action=" +
467         perm.toString() + ")");
468     }
469   }
470 
471   /**
472    * Checks that the user has the given global permission. The generated
473    * audit log message will contain context information for the operation
474    * being authorized, based on the given parameters.
475    * @param perm Action being requested
476    * @param tableName Affected table name.
477    * @param familyMap Affected column families.
478    */
479   private void requireGlobalPermission(String request, Action perm, TableName tableName,
480       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
481     User user = getActiveUser();
482     if (authManager.authorize(user, perm)) {
483       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
484     } else {
485       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
486       throw new AccessDeniedException("Insufficient permissions for user '" +
487           (user != null ? user.getShortName() : "null") +"' (global, action=" +
488           perm.toString() + ")");
489     }
490   }
491 
492   /**
493    * Checks that the user has the given global permission. The generated
494    * audit log message will contain context information for the operation
495    * being authorized, based on the given parameters.
496    * @param perm Action being requested
497    * @param namespace
498    */
499   private void requireGlobalPermission(String request, Action perm,
500                                        String namespace) throws IOException {
501     User user = getActiveUser();
502     if (authManager.authorize(user, perm)) {
503       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
504     } else {
505       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
506       throw new AccessDeniedException("Insufficient permissions for user '" +
507           (user != null ? user.getShortName() : "null") +"' (global, action=" +
508           perm.toString() + ")");
509     }
510   }
511 
512   /**
513    * Returns <code>true</code> if the current user is allowed the given action
514    * over at least one of the column qualifiers in the given column families.
515    */
516   private boolean hasFamilyQualifierPermission(User user,
517       Action perm,
518       RegionCoprocessorEnvironment env,
519       Map<byte[], ? extends Collection<byte[]>> familyMap)
520     throws IOException {
521     HRegionInfo hri = env.getRegion().getRegionInfo();
522     TableName tableName = hri.getTable();
523 
524     if (user == null) {
525       return false;
526     }
527 
528     if (familyMap != null && familyMap.size() > 0) {
529       // at least one family must be allowed
530       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
531           familyMap.entrySet()) {
532         if (family.getValue() != null && !family.getValue().isEmpty()) {
533           for (byte[] qualifier : family.getValue()) {
534             if (authManager.matchPermission(user, tableName,
535                 family.getKey(), qualifier, perm)) {
536               return true;
537             }
538           }
539         } else {
540           if (authManager.matchPermission(user, tableName, family.getKey(),
541               perm)) {
542             return true;
543           }
544         }
545       }
546     } else if (LOG.isDebugEnabled()) {
547       LOG.debug("Empty family map passed for permission check");
548     }
549 
550     return false;
551   }
552 
553   private enum OpType {
554     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
555     GET("get"),
556     EXISTS("exists"),
557     SCAN("scan"),
558     PUT("put"),
559     DELETE("delete"),
560     CHECK_AND_PUT("checkAndPut"),
561     CHECK_AND_DELETE("checkAndDelete"),
562     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
563     APPEND("append"),
564     INCREMENT("increment");
565 
566     private String type;
567 
568     private OpType(String type) {
569       this.type = type;
570     }
571 
572     @Override
573     public String toString() {
574       return type;
575     }
576   }
577 
578   /**
579    * Determine if cell ACLs covered by the operation grant access. This is expensive.
580    * @return false if cell ACLs failed to grant access, true otherwise
581    * @throws IOException
582    */
583   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
584       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
585       throws IOException {
586     if (!cellFeaturesEnabled) {
587       return false;
588     }
589     long cellGrants = 0;
590     User user = getActiveUser();
591     long latestCellTs = 0;
592     Get get = new Get(row);
593     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
594     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
595     // version. We have to get every cell version and check its TS against the TS asked for in
596     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
597     // consider only one such passing cell. In case of Delete we have to consider all the cell
598     // versions under this passing version. When Delete Mutation contains columns which are a
599     // version delete just consider only one version for those column cells.
600     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
601     if (considerCellTs) {
602       get.setMaxVersions();
603     } else {
604       get.setMaxVersions(1);
605     }
606     boolean diffCellTsFromOpTs = false;
607     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
608       byte[] col = entry.getKey();
609       // TODO: HBASE-7114 could possibly unify the collection type in family
610       // maps so we would not need to do this
611       if (entry.getValue() instanceof Set) {
612         Set<byte[]> set = (Set<byte[]>)entry.getValue();
613         if (set == null || set.isEmpty()) {
614           get.addFamily(col);
615         } else {
616           for (byte[] qual: set) {
617             get.addColumn(col, qual);
618           }
619         }
620       } else if (entry.getValue() instanceof List) {
621         List<Cell> list = (List<Cell>)entry.getValue();
622         if (list == null || list.isEmpty()) {
623           get.addFamily(col);
624         } else {
625           // In case of family delete, a Cell will be added into the list with Qualifier as null.
626           for (Cell cell : list) {
627             if (cell.getQualifierLength() == 0
628                 && (cell.getTypeByte() == Type.DeleteFamily.getCode() 
629                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
630               get.addFamily(col);
631             } else {
632               get.addColumn(col, CellUtil.cloneQualifier(cell));
633             }
634             if (considerCellTs) {
635               long cellTs = cell.getTimestamp();
636               latestCellTs = Math.max(latestCellTs, cellTs);
637               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
638             }
639           }
640         }
641       } else {
642         throw new RuntimeException("Unhandled collection type " +
643           entry.getValue().getClass().getName());
644       }
645     }
646     // We want to avoid looking into the future. So, if the cells of the
647     // operation specify a timestamp, or the operation itself specifies a
648     // timestamp, then we use the maximum ts found. Otherwise, we bound
649     // the Get to the current server time. We add 1 to the timerange since
650     // the upper bound of a timerange is exclusive yet we need to examine
651     // any cells found there inclusively.
652     long latestTs = Math.max(opTs, latestCellTs);
653     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
654       latestTs = EnvironmentEdgeManager.currentTimeMillis();
655     }
656     get.setTimeRange(0, latestTs + 1);
657     // In case of Put operation we set to read all versions. This was done to consider the case
658     // where columns are added with TS other than the Mutation TS. But normally this wont be the
659     // case with Put. There no need to get all versions but get latest version only.
660     if (!diffCellTsFromOpTs && request == OpType.PUT) {
661       get.setMaxVersions(1);
662     }
663     if (LOG.isTraceEnabled()) {
664       LOG.trace("Scanning for cells with " + get);
665     }
666     // This Map is identical to familyMap. The key is a BR rather than byte[].
667     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
668     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
669     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
670     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
671       if (entry.getValue() instanceof List) {
672         familyMap1.put(new SimpleByteRange(entry.getKey()), (List<Cell>) entry.getValue());
673       }
674     }
675     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
676     List<Cell> cells = Lists.newArrayList();
677     Cell prevCell = null;
678     ByteRange curFam = new SimpleByteRange();
679     boolean curColAllVersions = (request == OpType.DELETE);
680     long curColCheckTs = opTs;
681     boolean foundColumn = false;
682     try {
683       boolean more = false;
684       do {
685         cells.clear();
686         // scan with limit as 1 to hold down memory use on wide rows
687         more = scanner.next(cells, 1);
688         for (Cell cell: cells) {
689           if (LOG.isTraceEnabled()) {
690             LOG.trace("Found cell " + cell);
691           }
692           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
693           if (colChange) foundColumn = false;
694           prevCell = cell;
695           if (!curColAllVersions && foundColumn) {
696             continue;
697           }
698           if (colChange && considerCellTs) {
699             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
700             List<Cell> cols = familyMap1.get(curFam);
701             for (Cell col : cols) {
702               // null/empty qualifier is used to denote a Family delete. The TS and delete type
703               // associated with this is applicable for all columns within the family. That is
704               // why the below (col.getQualifierLength() == 0) check.
705               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
706                   || CellUtil.matchingQualifier(cell, col)) {
707                 byte type = col.getTypeByte();
708                 if (considerCellTs) {
709                   curColCheckTs = col.getTimestamp();
710                 }
711                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
712                 // a version delete for a column no need to check all the covering cells within
713                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
714                 // One version delete types are Delete/DeleteFamilyVersion
715                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
716                     || (KeyValue.Type.DeleteFamily.getCode() == type);
717                 break;
718               }
719             }
720           }
721           if (cell.getTimestamp() > curColCheckTs) {
722             // Just ignore this cell. This is not a covering cell.
723             continue;
724           }
725           foundColumn = true;
726           for (Action action: actions) {
727             // Are there permissions for this user for the cell?
728             if (!authManager.authorize(user, getTableName(e), cell, action)) {
729               // We can stop if the cell ACL denies access
730               return false;
731             }
732           }
733           cellGrants++;
734         }
735       } while (more);
736     } catch (AccessDeniedException ex) {
737       throw ex;
738     } catch (IOException ex) {
739       LOG.error("Exception while getting cells to calculate covering permission", ex);
740     } finally {
741       scanner.close();
742     }
743     // We should not authorize unless we have found one or more cell ACLs that
744     // grant access. This code is used to check for additional permissions
745     // after no table or CF grants are found.
746     return cellGrants > 0;
747   }
748 
749   private void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
750     // Iterate over the entries in the familyMap, replacing the cells therein
751     // with new cells including the ACL data
752     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
753       List<Cell> newCells = Lists.newArrayList();
754       for (Cell cell: e.getValue()) {
755         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
756         byte[] tagBytes = CellUtil.getTagArray(cell);
757         Iterator<Tag> tagIterator = CellUtil.tagsIterator(tagBytes, 0, tagBytes.length);
758         while (tagIterator.hasNext()) {
759           tags.add(tagIterator.next());
760         }
761         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
762         // incoming cell type is actually KeyValue.
763         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
764         newCells.add(
765           new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
766             kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
767             kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
768             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
769             kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
770             tags));
771       }
772       // This is supposed to be safe, won't CME
773       e.setValue(newCells);
774     }
775   }
776 
777   /* ---- MasterObserver implementation ---- */
778 
779   public void start(CoprocessorEnvironment env) throws IOException {
780     CompoundConfiguration conf = new CompoundConfiguration();
781     conf.add(env.getConfiguration());
782 
783     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
784       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
785 
786     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
787     if (!cellFeaturesEnabled) {
788       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
789           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
790           + " accordingly.");
791     }
792 
793     ZooKeeperWatcher zk = null;
794     if (env instanceof MasterCoprocessorEnvironment) {
795       // if running on HMaster
796       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
797       zk = mEnv.getMasterServices().getZooKeeper();
798     } else if (env instanceof RegionServerCoprocessorEnvironment) {      
799       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
800       zk = rsEnv.getRegionServerServices().getZooKeeper();      
801     } else if (env instanceof RegionCoprocessorEnvironment) {
802       // if running at region
803       regionEnv = (RegionCoprocessorEnvironment) env;
804       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
805       zk = regionEnv.getRegionServerServices().getZooKeeper();
806       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
807         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
808     }
809 
810     // set the user-provider.
811     this.userProvider = UserProvider.instantiate(env.getConfiguration());
812 
813     // If zk is null or IOException while obtaining auth manager,
814     // throw RuntimeException so that the coprocessor is unloaded.
815     if (zk != null) {
816       try {
817         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
818       } catch (IOException ioe) {
819         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
820       }
821     } else {
822       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
823     }
824   }
825 
826   public void stop(CoprocessorEnvironment env) {
827 
828   }
829 
830   @Override
831   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
832       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
833     Set<byte[]> families = desc.getFamiliesKeys();
834     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
835     for (byte[] family: families) {
836       familyMap.put(family, null);
837     }
838     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
839   }
840 
841   @Override
842   public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
843       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
844 
845   @Override
846   public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
847       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
848 
849   @Override
850   public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
851       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
852     // When AC is used, it should be configured as the 1st CP.
853     // In Master, the table operations like create, are handled by a Thread pool but the max size
854     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
855     // sequentially only.
856     // Related code in HMaster#startServiceThreads
857     // {code}
858     //   // We depend on there being only one instance of this executor running
859     //   // at a time. To do concurrency, would need fencing of enable/disable of
860     //   // tables.
861     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
862     // {code}
863     // In future if we change this pool to have more threads, then there is a chance for thread,
864     // creating acl table, getting delayed and by that time another table creation got over and
865     // this hook is getting called. In such a case, we will need a wait logic here which will
866     // wait till the acl table is created.
867     if (AccessControlLists.isAclTable(desc)) {
868       this.aclTabAvailable = true;
869     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
870       if (!aclTabAvailable) {
871         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
872             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
873             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
874       } else {
875         String owner = desc.getOwnerString();
876         // default the table owner to current user, if not specified.
877         if (owner == null)
878           owner = getActiveUser().getShortName();
879         UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getTableName(),
880             null, Action.values());
881         AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
882       }
883     }
884   }
885 
886   @Override
887   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
888       throws IOException {
889     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
890   }
891 
892   @Override
893   public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
894       TableName tableName) throws IOException {}
895 
896   @Override
897   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
898       TableName tableName) throws IOException {
899     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
900   }
901 
902   @Override
903   public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
904       TableName tableName) throws IOException {}
905 
906    @Override
907   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
908       throws IOException {
909     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
910   }
911   @Override
912   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
913       TableName tableName) throws IOException {
914   }
915   @Override
916   public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
917       TableName tableName) throws IOException {}
918   @Override
919   public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
920       TableName tableName) throws IOException {}
921 
922   @Override
923   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
924       HTableDescriptor htd) throws IOException {
925     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
926   }
927 
928   @Override
929   public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
930       TableName tableName, HTableDescriptor htd) throws IOException {}
931 
932   @Override
933   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
934       TableName tableName, HTableDescriptor htd) throws IOException {
935     String owner = htd.getOwnerString();
936     // default the table owner to current user, if not specified.
937     if (owner == null) owner = getActiveUser().getShortName();
938     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
939         Action.values());
940     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
941   }
942 
943   @Override
944   public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
945       TableName tableName, HTableDescriptor htd) throws IOException {}
946 
947 
948   @Override
949   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
950       HColumnDescriptor column) throws IOException {
951     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
952   }
953 
954   @Override
955   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
956       TableName tableName, HColumnDescriptor column) throws IOException {}
957 
958   @Override
959   public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
960       TableName tableName, HColumnDescriptor column) throws IOException {}
961 
962   @Override
963   public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
964       TableName tableName, HColumnDescriptor column) throws IOException {}
965 
966   @Override
967   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
968       HColumnDescriptor descriptor) throws IOException {
969     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
970   }
971 
972   @Override
973   public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
974       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
975 
976   @Override
977   public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
978       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
979 
980   @Override
981   public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
982       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
983 
984   @Override
985   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
986       byte[] col) throws IOException {
987     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
988   }
989 
990   @Override
991   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
992       TableName tableName, byte[] col) throws IOException {}
993 
994   @Override
995   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
996       TableName tableName, byte[] col) throws IOException {
997     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
998   }
999 
1000   @Override
1001   public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1002       TableName tableName, byte[] col) throws IOException {}
1003 
1004   @Override
1005   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1006       throws IOException {
1007     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1008   }
1009 
1010   @Override
1011   public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1012       TableName tableName) throws IOException {}
1013 
1014   @Override
1015   public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
1016       TableName tableName) throws IOException {}
1017 
1018   @Override
1019   public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1020       TableName tableName) throws IOException {}
1021 
1022   @Override
1023   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1024       throws IOException {
1025     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1026       throw new AccessDeniedException("Not allowed to disable "
1027           + AccessControlLists.ACL_TABLE_NAME + " table.");
1028     }
1029     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1030   }
1031 
1032   @Override
1033   public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1034       TableName tableName) throws IOException {}
1035 
1036   @Override
1037   public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
1038       TableName tableName) throws IOException {}
1039 
1040   @Override
1041   public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1042       TableName tableName) throws IOException {}
1043 
1044   @Override
1045   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1046       ServerName srcServer, ServerName destServer) throws IOException {
1047     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1048   }
1049 
1050   @Override
1051   public void postMove(ObserverContext<MasterCoprocessorEnvironment> c,
1052       HRegionInfo region, ServerName srcServer, ServerName destServer)
1053     throws IOException {}
1054 
1055   @Override
1056   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1057       throws IOException {
1058     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1059   }
1060 
1061   @Override
1062   public void postAssign(ObserverContext<MasterCoprocessorEnvironment> c,
1063       HRegionInfo regionInfo) throws IOException {}
1064 
1065   @Override
1066   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1067       boolean force) throws IOException {
1068     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1069   }
1070 
1071   @Override
1072   public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
1073       HRegionInfo regionInfo, boolean force) throws IOException {}
1074 
1075   @Override
1076   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1077       HRegionInfo regionInfo) throws IOException {
1078     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1079   }
1080 
1081   @Override
1082   public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1083       HRegionInfo regionInfo) throws IOException {
1084   }
1085 
1086   @Override
1087   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1088       throws IOException {
1089     requirePermission("balance", Action.ADMIN);
1090   }
1091 
1092   @Override
1093   public void postBalance(ObserverContext<MasterCoprocessorEnvironment> c, List<RegionPlan> plans)
1094       throws IOException {}
1095 
1096   @Override
1097   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1098       boolean newValue) throws IOException {
1099     requirePermission("balanceSwitch", Action.ADMIN);
1100     return newValue;
1101   }
1102 
1103   @Override
1104   public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1105       boolean oldValue, boolean newValue) throws IOException {}
1106 
1107   @Override
1108   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1109       throws IOException {
1110     requirePermission("shutdown", Action.ADMIN);
1111   }
1112 
1113   @Override
1114   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1115       throws IOException {
1116     requirePermission("stopMaster", Action.ADMIN);
1117   }
1118 
1119   @Override
1120   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1121       throws IOException {
1122     if (!MetaReader.tableExists(ctx.getEnvironment().getMasterServices().getCatalogTracker(),
1123         AccessControlLists.ACL_TABLE_NAME)) {
1124       // initialize the ACL storage table
1125       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1126     } else {
1127       aclTabAvailable = true;
1128     }
1129   }
1130 
1131   @Override
1132   public void preMasterInitialization(
1133       ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
1134   }
1135 
1136   @Override
1137   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1138       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1139       throws IOException {
1140     requirePermission("snapshot", Action.ADMIN);
1141   }
1142 
1143   @Override
1144   public void postSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1145       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1146       throws IOException {
1147   }
1148 
1149   @Override
1150   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1151       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1152       throws IOException {
1153     requirePermission("clone", Action.ADMIN);
1154   }
1155 
1156   @Override
1157   public void postCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1158       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1159       throws IOException {
1160   }
1161 
1162   @Override
1163   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1164       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1165       throws IOException {
1166     requirePermission("restore", Action.ADMIN);
1167   }
1168 
1169   @Override
1170   public void postRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1171       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1172       throws IOException {
1173   }
1174 
1175   @Override
1176   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1177       final SnapshotDescription snapshot) throws IOException {
1178     requirePermission("deleteSnapshot", Action.ADMIN);
1179   }
1180 
1181   @Override
1182   public void postDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1183       final SnapshotDescription snapshot) throws IOException {
1184   }
1185 
1186   @Override
1187   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1188       NamespaceDescriptor ns) throws IOException {
1189     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1190   }
1191 
1192   @Override
1193   public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1194       NamespaceDescriptor ns) throws IOException {
1195   }
1196 
1197   @Override
1198   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1199       throws IOException {
1200     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1201   }
1202 
1203   @Override
1204   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1205                                   String namespace) throws IOException {
1206     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1207         namespace);
1208     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1209   }
1210 
1211   @Override
1212   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1213       NamespaceDescriptor ns) throws IOException {
1214     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1215   }
1216 
1217   @Override
1218   public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1219                                   NamespaceDescriptor ns) throws IOException {
1220   }
1221 
1222   @Override
1223   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1224       final TableName tableName) throws IOException {
1225     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1226   }
1227 
1228   @Override
1229   public void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1230       final TableName tableName) throws IOException {
1231   }
1232 
1233   /* ---- RegionObserver implementation ---- */
1234 
1235   @Override
1236   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1237       throws IOException {
1238     RegionCoprocessorEnvironment env = e.getEnvironment();
1239     final HRegion region = env.getRegion();
1240     if (region == null) {
1241       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1242     } else {
1243       HRegionInfo regionInfo = region.getRegionInfo();
1244       if (regionInfo.getTable().isSystemTable()) {
1245         isSystemOrSuperUser(regionEnv.getConfiguration());
1246       } else {
1247         requirePermission("preOpen", Action.ADMIN);
1248       }
1249     }
1250   }
1251 
1252   @Override
1253   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1254     RegionCoprocessorEnvironment env = c.getEnvironment();
1255     final HRegion region = env.getRegion();
1256     if (region == null) {
1257       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1258       return;
1259     }
1260     if (AccessControlLists.isAclRegion(region)) {
1261       aclRegion = true;
1262       // When this region is under recovering state, initialize will be handled by postLogReplay
1263       if (!region.isRecovering()) {
1264         try {
1265           initialize(env);
1266         } catch (IOException ex) {
1267           // if we can't obtain permissions, it's better to fail
1268           // than perform checks incorrectly
1269           throw new RuntimeException("Failed to initialize permissions cache", ex);
1270         }
1271       }
1272     } else {
1273       initialized = true;
1274     }
1275   }
1276 
1277   @Override
1278   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1279     if (aclRegion) {
1280       try {
1281         initialize(c.getEnvironment());
1282       } catch (IOException ex) {
1283         // if we can't obtain permissions, it's better to fail
1284         // than perform checks incorrectly
1285         throw new RuntimeException("Failed to initialize permissions cache", ex);
1286       }
1287     }
1288   }
1289 
1290   @Override
1291   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1292     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1293         Action.CREATE);
1294   }
1295 
1296   @Override
1297   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1298     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1299   }
1300   
1301   @Override
1302   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1303       byte[] splitRow) throws IOException {
1304     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1305   }
1306 
1307   @Override
1308   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1309       final Store store, final InternalScanner scanner, final ScanType scanType)
1310           throws IOException {
1311     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1312         Action.CREATE);
1313     return scanner;
1314   }
1315 
1316   @Override
1317   public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> e,
1318       final Store store, final List<StoreFile> candidates) throws IOException {
1319     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1320   }
1321 
1322   @Override
1323   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1324       final byte [] row, final byte [] family, final Result result)
1325       throws IOException {
1326     assert family != null;
1327     RegionCoprocessorEnvironment env = c.getEnvironment();
1328     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1329     User user = getActiveUser();
1330     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1331       Action.READ);
1332     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1333       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1334         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1335       authResult.setReason("Covering cell set");
1336     }
1337     logResult(authResult);
1338     if (!authResult.isAllowed()) {
1339       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1340     }
1341   }
1342 
1343   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1344       final Query query, OpType opType) throws IOException {
1345     Filter filter = query.getFilter();
1346     // Don't wrap an AccessControlFilter
1347     if (filter != null && filter instanceof AccessControlFilter) {
1348       return;
1349     }
1350     User user = getActiveUser();
1351     RegionCoprocessorEnvironment env = c.getEnvironment();
1352     Map<byte[],? extends Collection<byte[]>> families = null;
1353     switch (opType) {
1354     case GET:
1355     case EXISTS:
1356       families = ((Get)query).getFamilyMap();
1357       break;
1358     case SCAN:
1359       families = ((Scan)query).getFamilyMap();
1360       break;
1361     default:
1362       throw new RuntimeException("Unhandled operation " + opType);
1363     }
1364     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1365     HRegion region = getRegion(env);
1366     TableName table = getTableName(region);
1367     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1368     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1369       cfVsMaxVersions.put(new SimpleByteRange(hcd.getName()), hcd.getMaxVersions());
1370     }
1371     if (!authResult.isAllowed()) {
1372       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1373         // Old behavior: Scan with only qualifier checks if we have partial
1374         // permission. Backwards compatible behavior is to throw an
1375         // AccessDeniedException immediately if there are no grants for table
1376         // or CF or CF+qual. Only proceed with an injected filter if there are
1377         // grants for qualifiers. Otherwise we will fall through below and log
1378         // the result and throw an ADE. We may end up checking qualifier
1379         // grants three times (permissionGranted above, here, and in the
1380         // filter) but that's the price of backwards compatibility. 
1381         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1382           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1383             query.getACLStrategy() ? AccessControlFilter.Strategy.CHECK_CELL_FIRST :
1384               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1385             cfVsMaxVersions);
1386           // wrap any existing filter
1387           if (filter != null) {
1388             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1389               Lists.newArrayList(ourFilter, filter));
1390           }
1391           authResult.setAllowed(true);;
1392           authResult.setReason("Access allowed with filter");
1393           switch (opType) {
1394           case GET:
1395           case EXISTS:
1396             ((Get)query).setFilter(ourFilter);
1397             break;
1398           case SCAN:
1399             ((Scan)query).setFilter(ourFilter);
1400             break;
1401           default:
1402             throw new RuntimeException("Unhandled operation " + opType);
1403           }
1404         }
1405       } else {
1406         // New behavior: Any access we might be granted is more fine-grained
1407         // than whole table or CF. Simply inject a filter and return what is
1408         // allowed. We will not throw an AccessDeniedException. This is a
1409         // behavioral change since 0.96. 
1410         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1411           query.getACLStrategy() ? AccessControlFilter.Strategy.CHECK_CELL_FIRST :
1412             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT,
1413           cfVsMaxVersions);
1414         // wrap any existing filter
1415         if (filter != null) {
1416           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1417             Lists.newArrayList(ourFilter, filter));
1418         }
1419         authResult.setAllowed(true);;
1420         authResult.setReason("Access allowed with filter");
1421         switch (opType) {
1422         case GET:
1423         case EXISTS:
1424           ((Get)query).setFilter(ourFilter);
1425           break;
1426         case SCAN:
1427           ((Scan)query).setFilter(ourFilter);
1428           break;
1429         default:
1430           throw new RuntimeException("Unhandled operation " + opType);
1431         }
1432       }
1433     }
1434 
1435     logResult(authResult);
1436     if (!authResult.isAllowed()) {
1437       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1438         ", action=READ)");
1439     }
1440   }
1441 
1442   @Override
1443   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1444       final Get get, final List<Cell> result) throws IOException {
1445     internalPreRead(c, get, OpType.GET);
1446   }
1447 
1448   @Override
1449   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1450       final Get get, final boolean exists) throws IOException {
1451     internalPreRead(c, get, OpType.EXISTS);
1452     return exists;
1453   }
1454 
1455   @Override
1456   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1457       final Put put, final WALEdit edit, final Durability durability)
1458       throws IOException {
1459     // Require WRITE permission to the table, CF, or top visible value, if any.
1460     // NOTE: We don't need to check the permissions for any earlier Puts
1461     // because we treat the ACLs in each Put as timestamped like any other
1462     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1463     // change the ACL of any previous Put. This allows simple evolution of
1464     // security policy over time without requiring expensive updates.
1465     RegionCoprocessorEnvironment env = c.getEnvironment();
1466     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1467     User user = getActiveUser();
1468     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1469     logResult(authResult);
1470     if (!authResult.isAllowed()) {
1471       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1472         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1473       } else {
1474         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1475       }
1476     }
1477     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1478     if (bytes != null) {
1479       if (cellFeaturesEnabled) {
1480         addCellPermissions(bytes, put.getFamilyCellMap());
1481       } else {
1482         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1483       }
1484     }
1485   }
1486 
1487   @Override
1488   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1489       final Put put, final WALEdit edit, final Durability durability) {
1490     if (aclRegion) {
1491       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1492     }
1493   }
1494 
1495   @Override
1496   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1497       final Delete delete, final WALEdit edit, final Durability durability)
1498       throws IOException {
1499     // An ACL on a delete is useless, we shouldn't allow it
1500     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1501       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1502     }
1503     // Require WRITE permissions on all cells covered by the delete. Unlike
1504     // for Puts we need to check all visible prior versions, because a major
1505     // compaction could remove them. If the user doesn't have permission to
1506     // overwrite any of the visible versions ('visible' defined as not covered
1507     // by a tombstone already) then we have to disallow this operation.
1508     RegionCoprocessorEnvironment env = c.getEnvironment();
1509     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1510     User user = getActiveUser();
1511     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1512     logResult(authResult);
1513     if (!authResult.isAllowed()) {
1514       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1515         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1516       } else {
1517         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1518       }
1519     }
1520   }
1521 
1522   @Override
1523   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1524       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1525     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1526       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1527       for (int i = 0; i < miniBatchOp.size(); i++) {
1528         Mutation m = miniBatchOp.getOperation(i);
1529         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1530           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1531           // perm check
1532           OpType opType = (m instanceof Put) ? OpType.PUT : OpType.DELETE;
1533           AuthResult authResult = null;
1534           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1535               m.getTimeStamp(), Action.WRITE)) {
1536             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1537                 Action.WRITE, table, m.getFamilyCellMap());
1538           } else {
1539             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1540                 Action.WRITE, table, m.getFamilyCellMap());
1541           }
1542           logResult(authResult);
1543           if (!authResult.isAllowed()) {
1544             throw new AccessDeniedException("Insufficient permissions "
1545                 + authResult.toContextString());
1546           }
1547         }
1548       }
1549     }
1550   }
1551 
1552   @Override
1553   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1554       final Delete delete, final WALEdit edit, final Durability durability)
1555       throws IOException {
1556     if (aclRegion) {
1557       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1558     }
1559   }
1560 
1561   @Override
1562   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1563       final byte [] row, final byte [] family, final byte [] qualifier,
1564       final CompareFilter.CompareOp compareOp,
1565       final ByteArrayComparable comparator, final Put put,
1566       final boolean result) throws IOException {
1567     // Require READ and WRITE permissions on the table, CF, and KV to update
1568     RegionCoprocessorEnvironment env = c.getEnvironment();
1569     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1570     User user = getActiveUser();
1571     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1572       Action.READ, Action.WRITE);
1573     logResult(authResult);
1574     if (!authResult.isAllowed()) {
1575       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1576         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1577       } else {
1578         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1579       }
1580     }
1581     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1582     if (bytes != null) {
1583       if (cellFeaturesEnabled) {
1584         addCellPermissions(bytes, put.getFamilyCellMap());
1585       } else {
1586         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1587       }
1588     }
1589     return result;
1590   }
1591 
1592   @Override
1593   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1594       final byte[] row, final byte[] family, final byte[] qualifier,
1595       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1596       final boolean result) throws IOException {
1597     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1598       // We had failure with table, cf and q perm checks and now giving a chance for cell
1599       // perm check
1600       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1601       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1602       AuthResult authResult = null;
1603       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1604           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1605         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1606             getActiveUser(), Action.READ, table, families);
1607       } else {
1608         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1609             getActiveUser(), Action.READ, table, families);
1610       }
1611       logResult(authResult);
1612       if (!authResult.isAllowed()) {
1613         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1614       }
1615     }
1616     return result;
1617   }
1618 
1619   @Override
1620   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1621       final byte [] row, final byte [] family, final byte [] qualifier,
1622       final CompareFilter.CompareOp compareOp,
1623       final ByteArrayComparable comparator, final Delete delete,
1624       final boolean result) throws IOException {
1625     // An ACL on a delete is useless, we shouldn't allow it
1626     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1627       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1628           delete.toString());
1629     }
1630     // Require READ and WRITE permissions on the table, CF, and the KV covered
1631     // by the delete
1632     RegionCoprocessorEnvironment env = c.getEnvironment();
1633     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1634     User user = getActiveUser();
1635     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1636       Action.READ, Action.WRITE);
1637     logResult(authResult);
1638     if (!authResult.isAllowed()) {
1639       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1640         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1641       } else {
1642         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1643       }
1644     }
1645     return result;
1646   }
1647 
1648   @Override
1649   public boolean preCheckAndDeleteAfterRowLock(
1650       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1651       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1652       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1653       throws IOException {
1654     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1655       // We had failure with table, cf and q perm checks and now giving a chance for cell
1656       // perm check
1657       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1658       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1659       AuthResult authResult = null;
1660       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1661           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1662         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1663             getActiveUser(), Action.READ, table, families);
1664       } else {
1665         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1666             getActiveUser(), Action.READ, table, families);
1667       }
1668       logResult(authResult);
1669       if (!authResult.isAllowed()) {
1670         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1671       }
1672     }
1673     return result;
1674   }
1675 
1676   @Override
1677   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1678       final byte [] row, final byte [] family, final byte [] qualifier,
1679       final long amount, final boolean writeToWAL)
1680       throws IOException {
1681     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1682     // incremented value
1683     RegionCoprocessorEnvironment env = c.getEnvironment();
1684     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1685     User user = getActiveUser();
1686     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1687       Action.WRITE);
1688     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1689       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1690         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1691       authResult.setReason("Covering cell set");
1692     }
1693     logResult(authResult);
1694     if (!authResult.isAllowed()) {
1695       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1696     }
1697     return -1;
1698   }
1699 
1700   @Override
1701   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1702       throws IOException {
1703     // Require WRITE permission to the table, CF, and the KV to be appended
1704     RegionCoprocessorEnvironment env = c.getEnvironment();
1705     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1706     User user = getActiveUser();
1707     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1708     logResult(authResult);
1709     if (!authResult.isAllowed()) {
1710       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1711         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1712       } else {
1713         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1714       }
1715     }
1716     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1717     if (bytes != null) {
1718       if (cellFeaturesEnabled) {
1719         addCellPermissions(bytes, append.getFamilyCellMap());
1720       } else {
1721         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1722       }
1723     }
1724     return null;
1725   }
1726 
1727   @Override
1728   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1729       final Append append) throws IOException {
1730     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1731       // We had failure with table, cf and q perm checks and now giving a chance for cell
1732       // perm check
1733       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1734       AuthResult authResult = null;
1735       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1736           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1737         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1738             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1739       } else {
1740         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1741             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1742       }
1743       logResult(authResult);
1744       if (!authResult.isAllowed()) {
1745         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1746       }
1747     }
1748     return null;
1749   }
1750 
1751   @Override
1752   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1753       final Increment increment)
1754       throws IOException {
1755     // Require WRITE permission to the table, CF, and the KV to be replaced by
1756     // the incremented value
1757     RegionCoprocessorEnvironment env = c.getEnvironment();
1758     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1759     User user = getActiveUser();
1760     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1761       Action.WRITE);
1762     logResult(authResult);
1763     if (!authResult.isAllowed()) {
1764       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1765         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1766       } else {
1767         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1768       }
1769     }
1770     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1771     if (bytes != null) {
1772       if (cellFeaturesEnabled) {
1773         addCellPermissions(bytes, increment.getFamilyCellMap());
1774       } else {
1775         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1776       }
1777     }
1778     return null;
1779   }
1780 
1781   @Override
1782   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1783       final Increment increment) throws IOException {
1784     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1785       // We had failure with table, cf and q perm checks and now giving a chance for cell
1786       // perm check
1787       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1788       AuthResult authResult = null;
1789       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1790           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1791         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1792             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1793       } else {
1794         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1795             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1796       }
1797       logResult(authResult);
1798       if (!authResult.isAllowed()) {
1799         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1800       }
1801     }
1802     return null;
1803   }
1804 
1805   @Override
1806   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1807       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1808     // If the HFile version is insufficient to persist tags, we won't have any
1809     // work to do here
1810     if (!cellFeaturesEnabled) {
1811       return newCell;
1812     }
1813 
1814     // Collect any ACLs from the old cell
1815     List<Tag> tags = Lists.newArrayList();
1816     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1817     if (oldCell != null) {
1818       byte[] tagBytes = CellUtil.getTagArray(oldCell);
1819       Iterator<Tag> tagIterator = CellUtil.tagsIterator(tagBytes, 0, tagBytes.length);
1820       while (tagIterator.hasNext()) {
1821         Tag tag = tagIterator.next();
1822         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1823           if (LOG.isTraceEnabled()) {
1824             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1825               " length " + tag.getValue().length);
1826           }
1827           tags.add(tag);
1828         } else {
1829           ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1830             AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1831               tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1832           perms.putAll(kvPerms);
1833         }
1834       }
1835     }
1836 
1837     // Do we have an ACL on the operation?
1838     byte[] aclBytes = mutation.getACL();
1839     if (aclBytes != null) {
1840       // Yes, use it
1841       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1842     } else {
1843       // No, use what we carried forward
1844       if (perms != null) {
1845         // TODO: If we collected ACLs from more than one tag we may have a
1846         // List<Permission> of size > 1, this can be collapsed into a single
1847         // Permission
1848         if (LOG.isTraceEnabled()) {
1849           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1850         }
1851         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1852           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1853       }
1854     }
1855 
1856     // If we have no tags to add, just return
1857     if (tags.isEmpty()) {
1858       return newCell;
1859     }
1860 
1861     // We need to create another KV, unfortunately, because the current new KV
1862     // has no space for tags
1863     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1864     KeyValue rewriteKv = new KeyValue(newKv.getRowArray(), newKv.getRowOffset(), newKv.getRowLength(),
1865       newKv.getFamilyArray(), newKv.getFamilyOffset(), newKv.getFamilyLength(),
1866       newKv.getQualifierArray(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
1867       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1868       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
1869       tags);
1870     // Preserve mvcc data
1871     rewriteKv.setMvccVersion(newKv.getMvccVersion());
1872     return rewriteKv;
1873   }
1874 
1875   @Override
1876   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1877       final Scan scan, final RegionScanner s) throws IOException {
1878     internalPreRead(c, scan, OpType.SCAN);
1879     return s;
1880   }
1881 
1882   @Override
1883   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1884       final Scan scan, final RegionScanner s) throws IOException {
1885     User user = getActiveUser();
1886     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1887       scannerOwners.put(s, user.getShortName());
1888     }
1889     return s;
1890   }
1891 
1892   @Override
1893   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1894       final InternalScanner s, final List<Result> result,
1895       final int limit, final boolean hasNext) throws IOException {
1896     requireScannerOwner(s);
1897     return hasNext;
1898   }
1899 
1900   @Override
1901   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1902       final InternalScanner s) throws IOException {
1903     requireScannerOwner(s);
1904   }
1905 
1906   @Override
1907   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1908       final InternalScanner s) throws IOException {
1909     // clean up any associated owner mapping
1910     scannerOwners.remove(s);
1911   }
1912 
1913   /**
1914    * Verify, when servicing an RPC, that the caller is the scanner owner.
1915    * If so, we assume that access control is correctly enforced based on
1916    * the checks performed in preScannerOpen()
1917    */
1918   private void requireScannerOwner(InternalScanner s)
1919       throws AccessDeniedException {
1920     if (RequestContext.isInRequestContext()) {
1921       String requestUserName = RequestContext.getRequestUserName();
1922       String owner = scannerOwners.get(s);
1923       if (owner != null && !owner.equals(requestUserName)) {
1924         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1925       }
1926     }
1927   }
1928 
1929   /**
1930    * Verifies user has WRITE privileges on
1931    * the Column Families involved in the bulkLoadHFile
1932    * request. Specific Column Write privileges are presently
1933    * ignored.
1934    */
1935   @Override
1936   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1937       List<Pair<byte[], String>> familyPaths) throws IOException {
1938     for(Pair<byte[],String> el : familyPaths) {
1939       requirePermission("preBulkLoadHFile",
1940           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1941           el.getFirst(),
1942           null,
1943           Action.CREATE);
1944     }
1945   }
1946 
1947   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1948     User requestUser = getActiveUser();
1949     TableName tableName = e.getRegion().getTableDesc().getTableName();
1950     AuthResult authResult = permissionGranted(method, requestUser,
1951         action, e, Collections.EMPTY_MAP);
1952     if (!authResult.isAllowed()) {
1953       for(UserPermission userPerm:
1954           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1955         for(Action userAction: userPerm.getActions()) {
1956           if(userAction.equals(action)) {
1957             return AuthResult.allow(method, "Access allowed", requestUser,
1958                 action, tableName, null, null);
1959           }
1960         }
1961       }
1962     }
1963     return authResult;
1964   }
1965 
1966   /**
1967    * Authorization check for
1968    * SecureBulkLoadProtocol.prepareBulkLoad()
1969    * @param e
1970    * @throws IOException
1971    */
1972   //TODO this should end up as a coprocessor hook
1973   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1974     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1975     logResult(authResult);
1976     if (!authResult.isAllowed()) {
1977       throw new AccessDeniedException("Insufficient permissions (table=" +
1978         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1979     }
1980   }
1981 
1982   /**
1983    * Authorization security check for
1984    * SecureBulkLoadProtocol.cleanupBulkLoad()
1985    * @param e
1986    * @throws IOException
1987    */
1988   //TODO this should end up as a coprocessor hook
1989   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1990     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1991     logResult(authResult);
1992     if (!authResult.isAllowed()) {
1993       throw new AccessDeniedException("Insufficient permissions (table=" +
1994         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1995     }
1996   }
1997 
1998   /* ---- EndpointObserver implementation ---- */
1999 
2000   @Override
2001   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2002       Service service, String methodName, Message request) throws IOException {
2003     // Don't intercept calls to our own AccessControlService, we check for
2004     // appropriate permissions in the service handlers
2005     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2006       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2007         methodName + ")",
2008         getTableName(ctx.getEnvironment()), null, null,
2009         Action.EXEC);
2010     }
2011     return request;
2012   }
2013 
2014   @Override
2015   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2016       Service service, String methodName, Message request, Message.Builder responseBuilder)
2017       throws IOException { }
2018 
2019   /* ---- Protobuf AccessControlService implementation ---- */
2020 
2021   @Override
2022   public void grant(RpcController controller,
2023                     AccessControlProtos.GrantRequest request,
2024                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2025     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2026     AccessControlProtos.GrantResponse response = null;
2027     try {
2028       // verify it's only running at .acl.
2029       if (aclRegion) {
2030         if (!initialized) {
2031           throw new CoprocessorException("AccessController not yet initialized");
2032         }
2033         if (LOG.isDebugEnabled()) {
2034           LOG.debug("Received request to grant access permission " + perm.toString());
2035         }
2036 
2037         switch(request.getUserPermission().getPermission().getType()) {
2038           case Global :
2039           case Table :
2040             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2041                 perm.getQualifier(), Action.ADMIN);
2042             break;
2043           case Namespace :
2044             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2045         }
2046 
2047         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2048         if (AUDITLOG.isTraceEnabled()) {
2049           // audit log should store permission changes in addition to auth results
2050           AUDITLOG.trace("Granted permission " + perm.toString());
2051         }
2052       } else {
2053         throw new CoprocessorException(AccessController.class, "This method "
2054             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2055       }
2056       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2057     } catch (IOException ioe) {
2058       // pass exception back up
2059       ResponseConverter.setControllerException(controller, ioe);
2060     }
2061     done.run(response);
2062   }
2063 
2064   @Override
2065   public void revoke(RpcController controller,
2066                      AccessControlProtos.RevokeRequest request,
2067                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2068     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2069     AccessControlProtos.RevokeResponse response = null;
2070     try {
2071       // only allowed to be called on _acl_ region
2072       if (aclRegion) {
2073         if (!initialized) {
2074           throw new CoprocessorException("AccessController not yet initialized");
2075         }
2076         if (LOG.isDebugEnabled()) {
2077           LOG.debug("Received request to revoke access permission " + perm.toString());
2078         }
2079 
2080         switch(request.getUserPermission().getPermission().getType()) {
2081           case Global :
2082           case Table :
2083             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2084                               perm.getQualifier(), Action.ADMIN);
2085             break;
2086           case Namespace :
2087             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2088         }
2089 
2090         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2091         if (AUDITLOG.isTraceEnabled()) {
2092           // audit log should record all permission changes
2093           AUDITLOG.trace("Revoked permission " + perm.toString());
2094         }
2095       } else {
2096         throw new CoprocessorException(AccessController.class, "This method "
2097             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2098       }
2099       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2100     } catch (IOException ioe) {
2101       // pass exception back up
2102       ResponseConverter.setControllerException(controller, ioe);
2103     }
2104     done.run(response);
2105   }
2106 
2107   @Override
2108   public void getUserPermissions(RpcController controller,
2109                                  AccessControlProtos.GetUserPermissionsRequest request,
2110                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2111     AccessControlProtos.GetUserPermissionsResponse response = null;
2112     try {
2113       // only allowed to be called on _acl_ region
2114       if (aclRegion) {
2115         if (!initialized) {
2116           throw new CoprocessorException("AccessController not yet initialized");
2117         }
2118         List<UserPermission> perms = null;
2119         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2120           TableName table = null;
2121           if (request.hasTableName()) {
2122             table = ProtobufUtil.toTableName(request.getTableName());
2123           }
2124           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2125 
2126           perms = AccessControlLists.getUserTablePermissions(
2127               regionEnv.getConfiguration(), table);
2128         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2129           perms = AccessControlLists.getUserNamespacePermissions(
2130               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2131         } else {
2132           perms = AccessControlLists.getUserPermissions(
2133               regionEnv.getConfiguration(), null);
2134         }
2135         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2136       } else {
2137         throw new CoprocessorException(AccessController.class, "This method "
2138             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2139       }
2140     } catch (IOException ioe) {
2141       // pass exception back up
2142       ResponseConverter.setControllerException(controller, ioe);
2143     }
2144     done.run(response);
2145   }
2146 
2147   @Override
2148   public void checkPermissions(RpcController controller,
2149                                AccessControlProtos.CheckPermissionsRequest request,
2150                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2151     Permission[] permissions = new Permission[request.getPermissionCount()];
2152     for (int i=0; i < request.getPermissionCount(); i++) {
2153       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2154     }
2155     AccessControlProtos.CheckPermissionsResponse response = null;
2156     try {
2157       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2158       for (Permission permission : permissions) {
2159         if (permission instanceof TablePermission) {
2160           TablePermission tperm = (TablePermission) permission;
2161           for (Action action : permission.getActions()) {
2162             if (!tperm.getTableName().equals(tableName)) {
2163               throw new CoprocessorException(AccessController.class, String.format("This method "
2164                   + "can only execute at the table specified in TablePermission. " +
2165                   "Table of the region:%s , requested table:%s", tableName,
2166                   tperm.getTableName()));
2167             }
2168 
2169             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2170             if (tperm.getFamily() != null) {
2171               if (tperm.getQualifier() != null) {
2172                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2173                 qualifiers.add(tperm.getQualifier());
2174                 familyMap.put(tperm.getFamily(), qualifiers);
2175               } else {
2176                 familyMap.put(tperm.getFamily(), null);
2177               }
2178             }
2179 
2180             requirePermission("checkPermissions", action, regionEnv, familyMap);
2181           }
2182 
2183         } else {
2184           for (Action action : permission.getActions()) {
2185             requirePermission("checkPermissions", action);
2186           }
2187         }
2188       }
2189       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2190     } catch (IOException ioe) {
2191       ResponseConverter.setControllerException(controller, ioe);
2192     }
2193     done.run(response);
2194   }
2195 
2196   @Override
2197   public Service getService() {
2198     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2199   }
2200 
2201   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2202     return e.getRegion();
2203   }
2204 
2205   private TableName getTableName(RegionCoprocessorEnvironment e) {
2206     HRegion region = e.getRegion();
2207     if (region != null) {
2208       return getTableName(region);
2209     }
2210     return null;
2211   }
2212 
2213   private TableName getTableName(HRegion region) {
2214     HRegionInfo regionInfo = region.getRegionInfo();
2215     if (regionInfo != null) {
2216       return regionInfo.getTable();
2217     }
2218     return null;
2219   }
2220 
2221   @Override
2222   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2223       throws IOException {
2224     requirePermission("preClose", Action.ADMIN);
2225   }
2226 
2227   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2228     User user = userProvider.getCurrent();
2229     if (user == null) {
2230       throw new IOException("Unable to obtain the current user, " +
2231         "authorization checks for internal operations will not work correctly!");
2232     }
2233 
2234     String currentUser = user.getShortName();
2235     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
2236       AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
2237 
2238     User activeUser = getActiveUser();
2239     if (!(superusers.contains(activeUser.getShortName()))) {
2240       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2241         "is not system or super user.");
2242     }
2243   }
2244 
2245   @Override
2246   public void preStopRegionServer(
2247       ObserverContext<RegionServerCoprocessorEnvironment> env)
2248       throws IOException {
2249     requirePermission("preStopRegionServer", Action.ADMIN);
2250   }
2251 
2252   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2253       byte[] qualifier) {
2254     if (family == null) {
2255       return null;
2256     }
2257 
2258     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2259     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2260     return familyMap;
2261   }
2262 
2263   @Override
2264   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2265       List<TableName> tableNamesList,
2266       List<HTableDescriptor> descriptors) throws IOException {
2267     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2268     // ADMIN privs.
2269     if (tableNamesList == null || tableNamesList.isEmpty()) {
2270       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2271     }
2272     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2273     // request can be granted.
2274     else {
2275       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2276       for (TableName tableName: tableNamesList) {
2277         // Do not deny if the table does not exist
2278         try {
2279           masterServices.checkTableModifiable(tableName);
2280         } catch (TableNotFoundException ex) {
2281           // Skip checks for a table that does not exist
2282           continue;
2283         } catch (TableNotDisabledException ex) {
2284           // We don't care about this
2285         }
2286         requirePermission("getTableDescriptors", tableName, null, null,
2287           Action.ADMIN, Action.CREATE);
2288       }
2289     }
2290   }
2291 
2292   @Override
2293   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2294       List<HTableDescriptor> descriptors) throws IOException {
2295   }
2296 
2297   @Override
2298   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2299       HRegion regionB) throws IOException {
2300     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2301       Action.ADMIN);
2302   }
2303 
2304   @Override
2305   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2306       HRegion regionB, HRegion mergedRegion) throws IOException { }
2307 
2308   @Override
2309   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2310       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2311 
2312   @Override
2313   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2314       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2315 
2316   @Override
2317   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2318       HRegion regionA, HRegion regionB) throws IOException { }
2319 
2320   @Override
2321   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2322       HRegion regionA, HRegion regionB) throws IOException { }
2323 }