View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.util.Collection;
20  import java.util.Collections;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellScanner;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.CompoundConfiguration;
38  import org.apache.hadoop.hbase.CoprocessorEnvironment;
39  import org.apache.hadoop.hbase.DoNotRetryIOException;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.Type;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.TableNotDisabledException;
51  import org.apache.hadoop.hbase.TableNotFoundException;
52  import org.apache.hadoop.hbase.Tag;
53  import org.apache.hadoop.hbase.MetaTableAccessor;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.Mutation;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Query;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.MasterObserver;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.master.RegionPlan;
82  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
83  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
85  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
86  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
87  import org.apache.hadoop.hbase.regionserver.HRegion;
88  import org.apache.hadoop.hbase.regionserver.InternalScanner;
89  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
90  import org.apache.hadoop.hbase.regionserver.RegionScanner;
91  import org.apache.hadoop.hbase.regionserver.ScanType;
92  import org.apache.hadoop.hbase.regionserver.Store;
93  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
94  import org.apache.hadoop.hbase.security.AccessDeniedException;
95  import org.apache.hadoop.hbase.security.User;
96  import org.apache.hadoop.hbase.security.UserProvider;
97  import org.apache.hadoop.hbase.security.access.Permission.Action;
98  import org.apache.hadoop.hbase.util.ByteRange;
99  import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
101 import org.apache.hadoop.hbase.util.Pair;
102 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
103 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
104 
105 import com.google.common.collect.ArrayListMultimap;
106 import com.google.common.collect.ImmutableSet;
107 import com.google.common.collect.ListMultimap;
108 import com.google.common.collect.Lists;
109 import com.google.common.collect.MapMaker;
110 import com.google.common.collect.Maps;
111 import com.google.common.collect.Sets;
112 import com.google.protobuf.Message;
113 import com.google.protobuf.RpcCallback;
114 import com.google.protobuf.RpcController;
115 import com.google.protobuf.Service;
116 
117 /**
118  * Provides basic authorization checks for data access and administrative
119  * operations.
120  *
121  * <p>
122  * {@code AccessController} performs authorization checks for HBase operations
123  * based on:
124  * <ul>
125  *   <li>the identity of the user performing the operation</li>
126  *   <li>the scope over which the operation is performed, in increasing
127  *   specificity: global, table, column family, or qualifier</li>
128  *   <li>the type of action being performed (as mapped to
129  *   {@link Permission.Action} values)</li>
130  * </ul>
131  * If the authorization check fails, an {@link AccessDeniedException}
132  * will be thrown for the operation.
133  * </p>
134  *
135  * <p>
136  * To perform authorization checks, {@code AccessController} relies on the
137  * RpcServerEngine being loaded to provide
138  * the user identities for remote requests.
139  * </p>
140  *
141  * <p>
142  * The access control lists used for authorization can be manipulated via the
143  * exposed {@link AccessControlService} Interface implementation, and the associated
144  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
145  * commands.
146  * </p>
147  */
148 @InterfaceAudience.Private
149 public class AccessController extends BaseRegionObserver
150     implements MasterObserver, RegionServerObserver,
151       AccessControlService.Interface, CoprocessorService, EndpointObserver {
152 
153   public static final Log LOG = LogFactory.getLog(AccessController.class);
154 
155   private static final Log AUDITLOG =
156     LogFactory.getLog("SecurityLogger."+AccessController.class.getName()); // written by logResult() at TRACE level
157   private static final String CHECK_COVERING_PERM = "check_covering_perm";
158   private static final String TAG_CHECK_PASSED = "tag_check_passed";
159   private static final byte[] TRUE = Bytes.toBytes(true);
160 
161   TableAuthManager authManager = null;
162 
163   // true when this instance is running on a region of the _acl_ table
164   boolean aclRegion = false;
165 
166   // Defined only for the Endpoint implementation, so that it has a way to
167   // access region services.
168   private RegionCoprocessorEnvironment regionEnv;
169 
170   /** Maps scanner instances to the user who created them; keys are weakly referenced. */
171   private Map<InternalScanner,String> scannerOwners =
172       new MapMaker().weakKeys().makeMap();
173 
174   // Provider for mapping principal names to Users
175   private UserProvider userProvider;
176 
177   // The list of users with superuser authority
178   private List<String> superusers;
179 
180   // whether we are able to support cell ACLs
181   boolean cellFeaturesEnabled;
182 
183   // whether we should check EXEC permissions
184   boolean shouldCheckExecPermission;
185 
186   // whether access checks should terminate early, as soon as table or CF grants
187   // allow access; pre-0.98 compatible behavior
188   boolean compatibleEarlyTermination;
189 
190   private volatile boolean initialized = false; // set to true at the end of initialize()
191 
192   // This flag is relevant only in the Master.
193   private volatile boolean aclTabAvailable = false;
194 
195   public HRegion getRegion() {
196     return regionEnv != null ? regionEnv.getRegion() : null;
197   }
198 
199   public TableAuthManager getAuthManager() {
200     return authManager;
201   }
202 
203   void initialize(RegionCoprocessorEnvironment e) throws IOException {
204     final HRegion region = e.getRegion();
205     Configuration conf = e.getConfiguration();
206     Map<byte[], ListMultimap<String,TablePermission>> tables =
207         AccessControlLists.loadAll(region);
208     // For each table, write out the table's permissions to the respective
209     // znode for that table.
210     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
211       tables.entrySet()) {
212       byte[] entry = t.getKey();
213       ListMultimap<String,TablePermission> perms = t.getValue();
214       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
215       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
216     }
217     initialized = true;
218   }
219 
220   /**
221    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
222    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
223    * table updates.
224    */
225   void updateACL(RegionCoprocessorEnvironment e,
226       final Map<byte[], List<Cell>> familyMap) {
227     Set<byte[]> entries =
228         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
229     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
230       List<Cell> cells = f.getValue();
231       for (Cell cell: cells) {
232         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
233         if (Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(),
234             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
235             AccessControlLists.ACL_LIST_FAMILY.length)) {
236           entries.add(kv.getRow());
237         }
238       }
239     }
240     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
241     Configuration conf = regionEnv.getConfiguration();
242     for (byte[] entry: entries) {
243       try {
244         ListMultimap<String,TablePermission> perms =
245           AccessControlLists.getPermissions(conf, entry);
246         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
247         zkw.writeToZookeeper(entry, serialized);
248       } catch (IOException ex) {
249         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
250             ex);
251       }
252     }
253   }
254 
255   /**
256    * Check the current user for authorization to perform a specific action
257    * against the given set of row data.
258    *
259    * <p>Note: Ordering of the authorization checks
260    * has been carefully optimized to short-circuit the most common requests
261    * and minimize the amount of processing required.</p>
262    *
263    * @param permRequest the action being requested
264    * @param e the coprocessor environment
265    * @param families the map of column families to qualifiers present in
266    * the request
267    * @return an authorization result
268    */
269   AuthResult permissionGranted(String request, User user, Action permRequest,
270       RegionCoprocessorEnvironment e,
271       Map<byte [], ? extends Collection<?>> families) {
272     HRegionInfo hri = e.getRegion().getRegionInfo();
273     TableName tableName = hri.getTable();
274 
275     // 1. All users need read access to hbase:meta table.
276     // this is a very common operation, so deal with it quickly.
277     if (hri.isMetaRegion()) {
278       if (permRequest == Action.READ) {
279         return AuthResult.allow(request, "All users allowed", user,
280           permRequest, tableName, families);
281       }
282     }
283 
284     if (user == null) {
285       return AuthResult.deny(request, "No user associated with request!", null,
286         permRequest, tableName, families);
287     }
288 
289     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
290     // e.g. When a new table is created a new entry in hbase:meta is added,
291     // so the user need to be allowed to write on it.
292     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
293     // and the user need to be allowed to write on both tables.
294     if (permRequest == Action.WRITE &&
295        (hri.isMetaRegion() ||
296         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
297        (authManager.authorize(user, Action.CREATE) ||
298         authManager.authorize(user, Action.ADMIN)))
299     {
300        return AuthResult.allow(request, "Table permission granted", user,
301         permRequest, tableName, families);
302     }
303 
304     // 2. check for the table-level, if successful we can short-circuit
305     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
306       return AuthResult.allow(request, "Table permission granted", user,
307         permRequest, tableName, families);
308     }
309 
310     // 3. check permissions against the requested families
311     if (families != null && families.size() > 0) {
312       // all families must pass
313       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
314         // a) check for family level access
315         if (authManager.authorize(user, tableName, family.getKey(),
316             permRequest)) {
317           continue;  // family-level permission overrides per-qualifier
318         }
319 
320         // b) qualifier level access can still succeed
321         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
322           if (family.getValue() instanceof Set) {
323             // for each qualifier of the family
324             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
325             for (byte[] qualifier : familySet) {
326               if (!authManager.authorize(user, tableName, family.getKey(),
327                                          qualifier, permRequest)) {
328                 return AuthResult.deny(request, "Failed qualifier check", user,
329                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
330               }
331             }
332           } else if (family.getValue() instanceof List) { // List<KeyValue>
333             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
334             for (KeyValue kv : kvList) {
335               if (!authManager.authorize(user, tableName, family.getKey(),
336                       kv.getQualifier(), permRequest)) {
337                 return AuthResult.deny(request, "Failed qualifier check", user,
338                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
339               }
340             }
341           }
342         } else {
343           // no qualifiers and family-level check already failed
344           return AuthResult.deny(request, "Failed family check", user, permRequest,
345               tableName, makeFamilyMap(family.getKey(), null));
346         }
347       }
348 
349       // all family checks passed
350       return AuthResult.allow(request, "All family checks passed", user, permRequest,
351           tableName, families);
352     }
353 
354     // 4. no families to check and table level access failed
355     return AuthResult.deny(request, "No families to check and table permission failed",
356         user, permRequest, tableName, families);
357   }
358 
359   /**
360    * Check the current user for authorization to perform a specific action
361    * against the given set of row data.
362    * @param opType the operation type
363    * @param user the user
364    * @param e the coprocessor environment
365    * @param families the map of column families to qualifiers present in
366    * the request
367    * @param actions the desired actions
368    * @return an authorization result
369    */
370   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
371       Map<byte [], ? extends Collection<?>> families, Action... actions) {
372     AuthResult result = null;
373     for (Action action: actions) {
374       result = permissionGranted(opType.toString(), user, action, e, families);
375       if (!result.isAllowed()) {
376         return result;
377       }
378     }
379     return result;
380   }
381 
382   private void logResult(AuthResult result) {
383     if (AUDITLOG.isTraceEnabled()) {
384       RequestContext ctx = RequestContext.get();
385       InetAddress remoteAddr = null;
386       if (ctx != null) {
387         remoteAddr = ctx.getRemoteAddress();
388       }
389       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
390           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
391           "; reason: " + result.getReason() +
392           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
393           "; request: " + result.getRequest() +
394           "; context: " + result.toContextString());
395     }
396   }
397 
398   /**
399    * Returns the active user to which authorization checks should be applied.
400    * If we are in the context of an RPC call, the remote user is used,
401    * otherwise the currently logged in user is used.
402    */
403   private User getActiveUser() throws IOException {
404     User user = RequestContext.getRequestUser();
405     if (!RequestContext.isInRequestContext()) {
406       // for non-rpc handling, fallback to system user
407       user = userProvider.getCurrent();
408     }
409     return user;
410   }
411 
412   /**
413    * Authorizes that the current user has any of the given permissions for the
414    * given table, column family and column qualifier.
415    * @param tableName Table requested
416    * @param family Column family requested
417    * @param qualifier Column qualifier requested
418    * @throws IOException if obtaining the current user fails
419    * @throws AccessDeniedException if user has no authorization
420    */
421   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
422       Action... permissions) throws IOException {
423     User user = getActiveUser();
424     AuthResult result = null;
425 
426     for (Action permission : permissions) {
427       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
428         result = AuthResult.allow(request, "Table permission granted", user,
429                                   permission, tableName, family, qualifier);
430         break;
431       } else {
432         // rest of the world
433         result = AuthResult.deny(request, "Insufficient permissions", user,
434                                  permission, tableName, family, qualifier);
435       }
436     }
437     logResult(result);
438     if (!result.isAllowed()) {
439       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
440     }
441   }
442 
443   /**
444    * Authorizes that the current user has global privileges for the given action.
445    * @param perm The action being requested
446    * @throws IOException if obtaining the current user fails
447    * @throws AccessDeniedException if authorization is denied
448    */
449   private void requirePermission(String request, Action perm) throws IOException {
450     requireGlobalPermission(request, perm, null, null);
451   }
452 
453   /**
454    * Authorizes that the current user has permission to perform the given
455    * action on the set of table column families.
456    * @param perm Action that is required
457    * @param env The current coprocessor environment
458    * @param families The map of column families-qualifiers.
459    * @throws AccessDeniedException if the authorization check failed
460    */
461   private void requirePermission(String request, Action perm,
462         RegionCoprocessorEnvironment env,
463         Map<byte[], ? extends Collection<?>> families)
464       throws IOException {
465     User user = getActiveUser();
466     AuthResult result = permissionGranted(request, user, perm, env, families);
467     logResult(result);
468 
469     if (!result.isAllowed()) {
470       throw new AccessDeniedException("Insufficient permissions (table=" +
471         env.getRegion().getTableDesc().getTableName()+
472         ((families != null && families.size() > 0) ? ", family: " +
473         result.toFamilyString() : "") + ", action=" +
474         perm.toString() + ")");
475     }
476   }
477 
478   /**
479    * Checks that the user has the given global permission. The generated
480    * audit log message will contain context information for the operation
481    * being authorized, based on the given parameters.
482    * @param perm Action being requested
483    * @param tableName Affected table name.
484    * @param familyMap Affected column families.
485    */
486   private void requireGlobalPermission(String request, Action perm, TableName tableName,
487       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
488     User user = getActiveUser();
489     if (authManager.authorize(user, perm)) {
490       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
491     } else {
492       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
493       throw new AccessDeniedException("Insufficient permissions for user '" +
494           (user != null ? user.getShortName() : "null") +"' (global, action=" +
495           perm.toString() + ")");
496     }
497   }
498 
499   /**
500    * Checks that the user has the given global permission. The generated
501    * audit log message will contain context information for the operation
502    * being authorized, based on the given parameters.
503    * @param perm Action being requested
504    * @param namespace
505    */
506   private void requireGlobalPermission(String request, Action perm,
507                                        String namespace) throws IOException {
508     User user = getActiveUser();
509     if (authManager.authorize(user, perm)) {
510       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
511     } else {
512       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
513       throw new AccessDeniedException("Insufficient permissions for user '" +
514           (user != null ? user.getShortName() : "null") +"' (global, action=" +
515           perm.toString() + ")");
516     }
517   }
518 
519   /**
520    * Returns <code>true</code> if the current user is allowed the given action
521    * over at least one of the column qualifiers in the given column families.
522    */
523   private boolean hasFamilyQualifierPermission(User user,
524       Action perm,
525       RegionCoprocessorEnvironment env,
526       Map<byte[], ? extends Collection<byte[]>> familyMap)
527     throws IOException {
528     HRegionInfo hri = env.getRegion().getRegionInfo();
529     TableName tableName = hri.getTable();
530 
531     if (user == null) {
532       return false;
533     }
534 
535     if (familyMap != null && familyMap.size() > 0) {
536       // at least one family must be allowed
537       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
538           familyMap.entrySet()) {
539         if (family.getValue() != null && !family.getValue().isEmpty()) {
540           for (byte[] qualifier : family.getValue()) {
541             if (authManager.matchPermission(user, tableName,
542                 family.getKey(), qualifier, perm)) {
543               return true;
544             }
545           }
546         } else {
547           if (authManager.matchPermission(user, tableName, family.getKey(),
548               perm)) {
549             return true;
550           }
551         }
552       }
553     } else if (LOG.isDebugEnabled()) {
554       LOG.debug("Empty family map passed for permission check");
555     }
556 
557     return false;
558   }
559 
560   private enum OpType {
561     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
562     GET("get"),
563     EXISTS("exists"),
564     SCAN("scan"),
565     PUT("put"),
566     DELETE("delete"),
567     CHECK_AND_PUT("checkAndPut"),
568     CHECK_AND_DELETE("checkAndDelete"),
569     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
570     APPEND("append"),
571     INCREMENT("increment");
572 
573     private String type;
574 
575     private OpType(String type) {
576       this.type = type;
577     }
578 
579     @Override
580     public String toString() {
581       return type;
582     }
583   }
584 
585   /**
586    * Determine if cell ACLs covered by the operation grant access. This is expensive.
587    * @return false if cell ACLs failed to grant access, true otherwise
588    * @throws IOException
589    */
590   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
591       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
592       throws IOException {
593     if (!cellFeaturesEnabled) {
594       return false;
595     }
596     long cellGrants = 0;
597     User user = getActiveUser();
598     long latestCellTs = 0;
599     Get get = new Get(row);
600     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
601     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
602     // version. We have to get every cell version and check its TS against the TS asked for in
603     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
604     // consider only one such passing cell. In case of Delete we have to consider all the cell
605     // versions under this passing version. When Delete Mutation contains columns which are a
606     // version delete just consider only one version for those column cells.
607     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
608     if (considerCellTs) {
609       get.setMaxVersions();
610     } else {
611       get.setMaxVersions(1);
612     }
613     boolean diffCellTsFromOpTs = false;
614     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
615       byte[] col = entry.getKey();
616       // TODO: HBASE-7114 could possibly unify the collection type in family
617       // maps so we would not need to do this
618       if (entry.getValue() instanceof Set) {
619         Set<byte[]> set = (Set<byte[]>)entry.getValue();
620         if (set == null || set.isEmpty()) {
621           get.addFamily(col);
622         } else {
623           for (byte[] qual: set) {
624             get.addColumn(col, qual);
625           }
626         }
627       } else if (entry.getValue() instanceof List) {
628         List<Cell> list = (List<Cell>)entry.getValue();
629         if (list == null || list.isEmpty()) {
630           get.addFamily(col);
631         } else {
632           // In case of family delete, a Cell will be added into the list with Qualifier as null.
633           for (Cell cell : list) {
634             if (cell.getQualifierLength() == 0
635                 && (cell.getTypeByte() == Type.DeleteFamily.getCode() 
636                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
637               get.addFamily(col);
638             } else {
639               get.addColumn(col, CellUtil.cloneQualifier(cell));
640             }
641             if (considerCellTs) {
642               long cellTs = cell.getTimestamp();
643               latestCellTs = Math.max(latestCellTs, cellTs);
644               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
645             }
646           }
647         }
648       } else {
649         throw new RuntimeException("Unhandled collection type " +
650           entry.getValue().getClass().getName());
651       }
652     }
653     // We want to avoid looking into the future. So, if the cells of the
654     // operation specify a timestamp, or the operation itself specifies a
655     // timestamp, then we use the maximum ts found. Otherwise, we bound
656     // the Get to the current server time. We add 1 to the timerange since
657     // the upper bound of a timerange is exclusive yet we need to examine
658     // any cells found there inclusively.
659     long latestTs = Math.max(opTs, latestCellTs);
660     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
661       latestTs = EnvironmentEdgeManager.currentTimeMillis();
662     }
663     get.setTimeRange(0, latestTs + 1);
664     // In case of Put operation we set to read all versions. This was done to consider the case
665     // where columns are added with TS other than the Mutation TS. But normally this wont be the
666     // case with Put. There no need to get all versions but get latest version only.
667     if (!diffCellTsFromOpTs && request == OpType.PUT) {
668       get.setMaxVersions(1);
669     }
670     if (LOG.isTraceEnabled()) {
671       LOG.trace("Scanning for cells with " + get);
672     }
673     // This Map is identical to familyMap. The key is a BR rather than byte[].
674     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
675     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
676     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
677     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
678       if (entry.getValue() instanceof List) {
679         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
680       }
681     }
682     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
683     List<Cell> cells = Lists.newArrayList();
684     Cell prevCell = null;
685     ByteRange curFam = new SimpleMutableByteRange();
686     boolean curColAllVersions = (request == OpType.DELETE);
687     long curColCheckTs = opTs;
688     boolean foundColumn = false;
689     try {
690       boolean more = false;
691       do {
692         cells.clear();
693         // scan with limit as 1 to hold down memory use on wide rows
694         more = scanner.next(cells, 1);
695         for (Cell cell: cells) {
696           if (LOG.isTraceEnabled()) {
697             LOG.trace("Found cell " + cell);
698           }
699           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
700           if (colChange) foundColumn = false;
701           prevCell = cell;
702           if (!curColAllVersions && foundColumn) {
703             continue;
704           }
705           if (colChange && considerCellTs) {
706             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
707             List<Cell> cols = familyMap1.get(curFam);
708             for (Cell col : cols) {
709               // null/empty qualifier is used to denote a Family delete. The TS and delete type
710               // associated with this is applicable for all columns within the family. That is
711               // why the below (col.getQualifierLength() == 0) check.
712               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
713                   || CellUtil.matchingQualifier(cell, col)) {
714                 byte type = col.getTypeByte();
715                 if (considerCellTs) {
716                   curColCheckTs = col.getTimestamp();
717                 }
718                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
719                 // a version delete for a column no need to check all the covering cells within
720                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
721                 // One version delete types are Delete/DeleteFamilyVersion
722                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
723                     || (KeyValue.Type.DeleteFamily.getCode() == type);
724                 break;
725               }
726             }
727           }
728           if (cell.getTimestamp() > curColCheckTs) {
729             // Just ignore this cell. This is not a covering cell.
730             continue;
731           }
732           foundColumn = true;
733           for (Action action: actions) {
734             // Are there permissions for this user for the cell?
735             if (!authManager.authorize(user, getTableName(e), cell, action)) {
736               // We can stop if the cell ACL denies access
737               return false;
738             }
739           }
740           cellGrants++;
741         }
742       } while (more);
743     } catch (AccessDeniedException ex) {
744       throw ex;
745     } catch (IOException ex) {
746       LOG.error("Exception while getting cells to calculate covering permission", ex);
747     } finally {
748       scanner.close();
749     }
750     // We should not authorize unless we have found one or more cell ACLs that
751     // grant access. This code is used to check for additional permissions
752     // after no table or CF grants are found.
753     return cellGrants > 0;
754   }
755 
756   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
757     // Iterate over the entries in the familyMap, replacing the cells therein
758     // with new cells including the ACL data
759     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
760       List<Cell> newCells = Lists.newArrayList();
761       for (Cell cell: e.getValue()) {
762         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
763         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
764             cell.getTagsOffset(), cell.getTagsLength());
765         while (tagIterator.hasNext()) {
766           tags.add(tagIterator.next());
767         }
768         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
769         // incoming cell type is actually KeyValue.
770         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
771         newCells.add(
772           new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
773             kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
774             kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
775             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
776             kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
777             tags));
778       }
779       // This is supposed to be safe, won't CME
780       e.setValue(newCells);
781     }
782   }
783 
784   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
785   // type is reserved and should not be explicitly set by user.
786   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
787     // Superusers are allowed to store cells unconditionally.
788     if (superusers.contains(user.getShortName())) {
789       return;
790     }
791     // We already checked (prePut vs preBatchMutation)
792     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
793       return;
794     }
795     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
796       Cell cell = cellScanner.current();
797       if (cell.getTagsLength() > 0) {
798         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
799           cell.getTagsLength());
800         while (tagsItr.hasNext()) {
801           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
802             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
803           }
804         }
805       }
806     }
807     m.setAttribute(TAG_CHECK_PASSED, TRUE);
808   }
809 
810   /* ---- MasterObserver implementation ---- */
811 
812   public void start(CoprocessorEnvironment env) throws IOException {
813     CompoundConfiguration conf = new CompoundConfiguration();
814     conf.add(env.getConfiguration());
815 
816     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
817       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
818 
819     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
820     if (!cellFeaturesEnabled) {
821       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
822           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
823           + " accordingly.");
824     }
825 
826     ZooKeeperWatcher zk = null;
827     if (env instanceof MasterCoprocessorEnvironment) {
828       // if running on HMaster
829       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
830       zk = mEnv.getMasterServices().getZooKeeper();
831     } else if (env instanceof RegionServerCoprocessorEnvironment) {      
832       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
833       zk = rsEnv.getRegionServerServices().getZooKeeper();      
834     } else if (env instanceof RegionCoprocessorEnvironment) {
835       // if running at region
836       regionEnv = (RegionCoprocessorEnvironment) env;
837       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
838       zk = regionEnv.getRegionServerServices().getZooKeeper();
839       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
840         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
841     }
842 
843     // set the user-provider.
844     this.userProvider = UserProvider.instantiate(env.getConfiguration());
845 
846     // set up the list of users with superuser privilege
847     User user = userProvider.getCurrent();
848     superusers = Lists.asList(user.getShortName(),
849       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
850 
851     // If zk is null or IOException while obtaining auth manager,
852     // throw RuntimeException so that the coprocessor is unloaded.
853     if (zk != null) {
854       try {
855         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
856       } catch (IOException ioe) {
857         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
858       }
859     } else {
860       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
861     }
862   }
863 
864   public void stop(CoprocessorEnvironment env) {
865 
866   }
867 
868   @Override
869   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
870       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
871     Set<byte[]> families = desc.getFamiliesKeys();
872     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
873     for (byte[] family: families) {
874       familyMap.put(family, null);
875     }
876     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
877   }
878 
879   @Override
880   public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
881       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
882 
883   @Override
884   public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
885       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
886 
887   @Override
888   public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
889       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
890     // When AC is used, it should be configured as the 1st CP.
891     // In Master, the table operations like create, are handled by a Thread pool but the max size
892     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
893     // sequentially only.
894     // Related code in HMaster#startServiceThreads
895     // {code}
896     //   // We depend on there being only one instance of this executor running
897     //   // at a time. To do concurrency, would need fencing of enable/disable of
898     //   // tables.
899     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
900     // {code}
901     // In future if we change this pool to have more threads, then there is a chance for thread,
902     // creating acl table, getting delayed and by that time another table creation got over and
903     // this hook is getting called. In such a case, we will need a wait logic here which will
904     // wait till the acl table is created.
905     if (AccessControlLists.isAclTable(desc)) {
906       this.aclTabAvailable = true;
907     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
908       if (!aclTabAvailable) {
909         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
910             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
911             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
912       } else {
913         String owner = desc.getOwnerString();
914         // default the table owner to current user, if not specified.
915         if (owner == null)
916           owner = getActiveUser().getShortName();
917         UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getTableName(),
918             null, Action.values());
919         AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
920       }
921     }
922   }
923 
924   @Override
925   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
926       throws IOException {
927     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
928   }
929 
930   @Override
931   public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
932       TableName tableName) throws IOException {}
933 
934   @Override
935   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
936       TableName tableName) throws IOException {
937     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
938   }
939 
940   @Override
941   public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
942       TableName tableName) throws IOException {}
943 
944    @Override
945   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
946       throws IOException {
947     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
948   }
949   @Override
950   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
951       TableName tableName) throws IOException {
952   }
953   @Override
954   public void preTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
955       TableName tableName) throws IOException {}
956   @Override
957   public void postTruncateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
958       TableName tableName) throws IOException {}
959 
960   @Override
961   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
962       HTableDescriptor htd) throws IOException {
963     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
964   }
965 
966   @Override
967   public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
968       TableName tableName, HTableDescriptor htd) throws IOException {}
969 
970   @Override
971   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
972       TableName tableName, HTableDescriptor htd) throws IOException {
973     String owner = htd.getOwnerString();
974     // default the table owner to current user, if not specified.
975     if (owner == null) owner = getActiveUser().getShortName();
976     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
977         Action.values());
978     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
979   }
980 
981   @Override
982   public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
983       TableName tableName, HTableDescriptor htd) throws IOException {}
984 
985 
986   @Override
987   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
988       HColumnDescriptor column) throws IOException {
989     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
990   }
991 
992   @Override
993   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
994       TableName tableName, HColumnDescriptor column) throws IOException {}
995 
996   @Override
997   public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
998       TableName tableName, HColumnDescriptor column) throws IOException {}
999 
1000   @Override
1001   public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1002       TableName tableName, HColumnDescriptor column) throws IOException {}
1003 
1004   @Override
1005   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1006       HColumnDescriptor descriptor) throws IOException {
1007     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1008   }
1009 
1010   @Override
1011   public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1012       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
1013 
1014   @Override
1015   public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1016       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
1017 
1018   @Override
1019   public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1020       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
1021 
1022   @Override
1023   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1024       byte[] col) throws IOException {
1025     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1026   }
1027 
1028   @Override
1029   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1030       TableName tableName, byte[] col) throws IOException {}
1031 
1032   @Override
1033   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1034       TableName tableName, byte[] col) throws IOException {
1035     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
1036   }
1037 
1038   @Override
1039   public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1040       TableName tableName, byte[] col) throws IOException {}
1041 
1042   @Override
1043   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1044       throws IOException {
1045     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1046   }
1047 
1048   @Override
1049   public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1050       TableName tableName) throws IOException {}
1051 
1052   @Override
1053   public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
1054       TableName tableName) throws IOException {}
1055 
1056   @Override
1057   public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1058       TableName tableName) throws IOException {}
1059 
1060   @Override
1061   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1062       throws IOException {
1063     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1064       throw new AccessDeniedException("Not allowed to disable "
1065           + AccessControlLists.ACL_TABLE_NAME + " table.");
1066     }
1067     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1068   }
1069 
1070   @Override
1071   public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1072       TableName tableName) throws IOException {}
1073 
1074   @Override
1075   public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
1076       TableName tableName) throws IOException {}
1077 
1078   @Override
1079   public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
1080       TableName tableName) throws IOException {}
1081 
1082   @Override
1083   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1084       ServerName srcServer, ServerName destServer) throws IOException {
1085     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1086   }
1087 
1088   @Override
1089   public void postMove(ObserverContext<MasterCoprocessorEnvironment> c,
1090       HRegionInfo region, ServerName srcServer, ServerName destServer)
1091     throws IOException {}
1092 
1093   @Override
1094   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1095       throws IOException {
1096     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1097   }
1098 
1099   @Override
1100   public void postAssign(ObserverContext<MasterCoprocessorEnvironment> c,
1101       HRegionInfo regionInfo) throws IOException {}
1102 
1103   @Override
1104   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1105       boolean force) throws IOException {
1106     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1107   }
1108 
1109   @Override
1110   public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
1111       HRegionInfo regionInfo, boolean force) throws IOException {}
1112 
1113   @Override
1114   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1115       HRegionInfo regionInfo) throws IOException {
1116     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1117   }
1118 
1119   @Override
1120   public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1121       HRegionInfo regionInfo) throws IOException {
1122   }
1123 
1124   @Override
1125   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1126       throws IOException {
1127     requirePermission("balance", Action.ADMIN);
1128   }
1129 
1130   @Override
1131   public void postBalance(ObserverContext<MasterCoprocessorEnvironment> c, List<RegionPlan> plans)
1132       throws IOException {}
1133 
1134   @Override
1135   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1136       boolean newValue) throws IOException {
1137     requirePermission("balanceSwitch", Action.ADMIN);
1138     return newValue;
1139   }
1140 
1141   @Override
1142   public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1143       boolean oldValue, boolean newValue) throws IOException {}
1144 
1145   @Override
1146   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1147       throws IOException {
1148     requirePermission("shutdown", Action.ADMIN);
1149   }
1150 
1151   @Override
1152   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1153       throws IOException {
1154     requirePermission("stopMaster", Action.ADMIN);
1155   }
1156 
1157   @Override
1158   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1159       throws IOException {
1160     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1161       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1162       // initialize the ACL storage table
1163       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1164     } else {
1165       aclTabAvailable = true;
1166     }
1167   }
1168 
1169   @Override
1170   public void preMasterInitialization(
1171       ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
1172   }
1173 
1174   @Override
1175   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1176       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1177       throws IOException {
1178     requirePermission("snapshot", Action.ADMIN);
1179   }
1180 
1181   @Override
1182   public void postSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1183       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1184       throws IOException {
1185   }
1186 
1187   @Override
1188   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1189       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1190       throws IOException {
1191     requirePermission("clone", Action.ADMIN);
1192   }
1193 
1194   @Override
1195   public void postCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1196       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1197       throws IOException {
1198   }
1199 
1200   @Override
1201   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1202       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1203       throws IOException {
1204     requirePermission("restore", Action.ADMIN);
1205   }
1206 
1207   @Override
1208   public void postRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1209       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1210       throws IOException {
1211   }
1212 
1213   @Override
1214   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1215       final SnapshotDescription snapshot) throws IOException {
1216     requirePermission("deleteSnapshot", Action.ADMIN);
1217   }
1218 
1219   @Override
1220   public void postDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1221       final SnapshotDescription snapshot) throws IOException {
1222   }
1223 
1224   @Override
1225   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1226       NamespaceDescriptor ns) throws IOException {
1227     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1228   }
1229 
1230   @Override
1231   public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1232       NamespaceDescriptor ns) throws IOException {
1233   }
1234 
1235   @Override
1236   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1237       throws IOException {
1238     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1239   }
1240 
1241   @Override
1242   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1243                                   String namespace) throws IOException {
1244     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1245         namespace);
1246     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1247   }
1248 
1249   @Override
1250   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1251       NamespaceDescriptor ns) throws IOException {
1252     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1253   }
1254 
1255   @Override
1256   public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1257                                   NamespaceDescriptor ns) throws IOException {
1258   }
1259 
1260   @Override
1261   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1262       final TableName tableName) throws IOException {
1263     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1264   }
1265 
1266   @Override
1267   public void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1268       final TableName tableName) throws IOException {
1269   }
1270 
1271   /* ---- RegionObserver implementation ---- */
1272 
1273   @Override
1274   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1275       throws IOException {
1276     RegionCoprocessorEnvironment env = e.getEnvironment();
1277     final HRegion region = env.getRegion();
1278     if (region == null) {
1279       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1280     } else {
1281       HRegionInfo regionInfo = region.getRegionInfo();
1282       if (regionInfo.getTable().isSystemTable()) {
1283         isSystemOrSuperUser(regionEnv.getConfiguration());
1284       } else {
1285         requirePermission("preOpen", Action.ADMIN);
1286       }
1287     }
1288   }
1289 
1290   @Override
1291   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1292     RegionCoprocessorEnvironment env = c.getEnvironment();
1293     final HRegion region = env.getRegion();
1294     if (region == null) {
1295       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1296       return;
1297     }
1298     if (AccessControlLists.isAclRegion(region)) {
1299       aclRegion = true;
1300       // When this region is under recovering state, initialize will be handled by postLogReplay
1301       if (!region.isRecovering()) {
1302         try {
1303           initialize(env);
1304         } catch (IOException ex) {
1305           // if we can't obtain permissions, it's better to fail
1306           // than perform checks incorrectly
1307           throw new RuntimeException("Failed to initialize permissions cache", ex);
1308         }
1309       }
1310     } else {
1311       initialized = true;
1312     }
1313   }
1314 
1315   @Override
1316   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1317     if (aclRegion) {
1318       try {
1319         initialize(c.getEnvironment());
1320       } catch (IOException ex) {
1321         // if we can't obtain permissions, it's better to fail
1322         // than perform checks incorrectly
1323         throw new RuntimeException("Failed to initialize permissions cache", ex);
1324       }
1325     }
1326   }
1327 
1328   @Override
1329   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1330     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1331         Action.CREATE);
1332   }
1333 
1334   @Override
1335   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1336     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1337   }
1338   
1339   @Override
1340   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1341       byte[] splitRow) throws IOException {
1342     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1343   }
1344 
1345   @Override
1346   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1347       final Store store, final InternalScanner scanner, final ScanType scanType)
1348           throws IOException {
1349     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1350         Action.CREATE);
1351     return scanner;
1352   }
1353 
1354   @Override
1355   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1356       final byte [] row, final byte [] family, final Result result)
1357       throws IOException {
1358     assert family != null;
1359     RegionCoprocessorEnvironment env = c.getEnvironment();
1360     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1361     User user = getActiveUser();
1362     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1363       Action.READ);
1364     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1365       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1366         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1367       authResult.setReason("Covering cell set");
1368     }
1369     logResult(authResult);
1370     if (!authResult.isAllowed()) {
1371       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1372     }
1373   }
1374 
1375   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1376       final Query query, OpType opType) throws IOException {
1377     Filter filter = query.getFilter();
1378     // Don't wrap an AccessControlFilter
1379     if (filter != null && filter instanceof AccessControlFilter) {
1380       return;
1381     }
1382     User user = getActiveUser();
1383     RegionCoprocessorEnvironment env = c.getEnvironment();
1384     Map<byte[],? extends Collection<byte[]>> families = null;
1385     switch (opType) {
1386     case GET:
1387     case EXISTS:
1388       families = ((Get)query).getFamilyMap();
1389       break;
1390     case SCAN:
1391       families = ((Scan)query).getFamilyMap();
1392       break;
1393     default:
1394       throw new RuntimeException("Unhandled operation " + opType);
1395     }
1396     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1397     HRegion region = getRegion(env);
1398     TableName table = getTableName(region);
1399     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1400     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1401       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1402     }
1403     if (!authResult.isAllowed()) {
1404       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1405         // Old behavior: Scan with only qualifier checks if we have partial
1406         // permission. Backwards compatible behavior is to throw an
1407         // AccessDeniedException immediately if there are no grants for table
1408         // or CF or CF+qual. Only proceed with an injected filter if there are
1409         // grants for qualifiers. Otherwise we will fall through below and log
1410         // the result and throw an ADE. We may end up checking qualifier
1411         // grants three times (permissionGranted above, here, and in the
1412         // filter) but that's the price of backwards compatibility. 
1413         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1414           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1415             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1416             cfVsMaxVersions);
1417           // wrap any existing filter
1418           if (filter != null) {
1419             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1420               Lists.newArrayList(ourFilter, filter));
1421           }
1422           authResult.setAllowed(true);;
1423           authResult.setReason("Access allowed with filter");
1424           switch (opType) {
1425           case GET:
1426           case EXISTS:
1427             ((Get)query).setFilter(ourFilter);
1428             break;
1429           case SCAN:
1430             ((Scan)query).setFilter(ourFilter);
1431             break;
1432           default:
1433             throw new RuntimeException("Unhandled operation " + opType);
1434           }
1435         }
1436       } else {
1437         // New behavior: Any access we might be granted is more fine-grained
1438         // than whole table or CF. Simply inject a filter and return what is
1439         // allowed. We will not throw an AccessDeniedException. This is a
1440         // behavioral change since 0.96. 
1441         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1442           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1443         // wrap any existing filter
1444         if (filter != null) {
1445           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1446             Lists.newArrayList(ourFilter, filter));
1447         }
1448         authResult.setAllowed(true);;
1449         authResult.setReason("Access allowed with filter");
1450         switch (opType) {
1451         case GET:
1452         case EXISTS:
1453           ((Get)query).setFilter(ourFilter);
1454           break;
1455         case SCAN:
1456           ((Scan)query).setFilter(ourFilter);
1457           break;
1458         default:
1459           throw new RuntimeException("Unhandled operation " + opType);
1460         }
1461       }
1462     }
1463 
1464     logResult(authResult);
1465     if (!authResult.isAllowed()) {
1466       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1467         ", action=READ)");
1468     }
1469   }
1470 
1471   @Override
1472   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1473       final Get get, final List<Cell> result) throws IOException {
1474     internalPreRead(c, get, OpType.GET);
1475   }
1476 
1477   @Override
1478   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1479       final Get get, final boolean exists) throws IOException {
1480     internalPreRead(c, get, OpType.EXISTS);
1481     return exists;
1482   }
1483 
1484   @Override
1485   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1486       final Put put, final WALEdit edit, final Durability durability)
1487       throws IOException {
1488     // Require WRITE permission to the table, CF, or top visible value, if any.
1489     // NOTE: We don't need to check the permissions for any earlier Puts
1490     // because we treat the ACLs in each Put as timestamped like any other
1491     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1492     // change the ACL of any previous Put. This allows simple evolution of
1493     // security policy over time without requiring expensive updates.
1494     User user = getActiveUser();
1495     checkForReservedTagPresence(user, put);
1496     RegionCoprocessorEnvironment env = c.getEnvironment();
1497     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1498     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1499     logResult(authResult);
1500     if (!authResult.isAllowed()) {
1501       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1502         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1503       } else {
1504         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1505       }
1506     }
1507     // Add cell ACLs from the operation to the cells themselves
1508     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1509     if (bytes != null) {
1510       if (cellFeaturesEnabled) {
1511         addCellPermissions(bytes, put.getFamilyCellMap());
1512       } else {
1513         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1514       }
1515     }
1516   }
1517 
1518   @Override
1519   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1520       final Put put, final WALEdit edit, final Durability durability) {
1521     if (aclRegion) {
1522       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1523     }
1524   }
1525 
1526   @Override
1527   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1528       final Delete delete, final WALEdit edit, final Durability durability)
1529       throws IOException {
1530     // An ACL on a delete is useless, we shouldn't allow it
1531     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1532       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1533     }
1534     // Require WRITE permissions on all cells covered by the delete. Unlike
1535     // for Puts we need to check all visible prior versions, because a major
1536     // compaction could remove them. If the user doesn't have permission to
1537     // overwrite any of the visible versions ('visible' defined as not covered
1538     // by a tombstone already) then we have to disallow this operation.
1539     RegionCoprocessorEnvironment env = c.getEnvironment();
1540     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1541     User user = getActiveUser();
1542     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1543     logResult(authResult);
1544     if (!authResult.isAllowed()) {
1545       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1546         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1547       } else {
1548         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1549       }
1550     }
1551   }
1552 
1553   @Override
1554   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1555       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1556     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1557       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1558       for (int i = 0; i < miniBatchOp.size(); i++) {
1559         Mutation m = miniBatchOp.getOperation(i);
1560         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1561           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1562           // perm check
1563           OpType opType;
1564           if (m instanceof Put) {
1565             checkForReservedTagPresence(getActiveUser(), m);
1566             opType = OpType.PUT;
1567           } else {
1568             opType = OpType.DELETE;
1569           }
1570           AuthResult authResult = null;
1571           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1572               m.getTimeStamp(), Action.WRITE)) {
1573             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1574                 Action.WRITE, table, m.getFamilyCellMap());
1575           } else {
1576             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1577                 Action.WRITE, table, m.getFamilyCellMap());
1578           }
1579           logResult(authResult);
1580           if (!authResult.isAllowed()) {
1581             throw new AccessDeniedException("Insufficient permissions "
1582                 + authResult.toContextString());
1583           }
1584         }
1585       }
1586     }
1587   }
1588 
1589   @Override
1590   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1591       final Delete delete, final WALEdit edit, final Durability durability)
1592       throws IOException {
1593     if (aclRegion) {
1594       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1595     }
1596   }
1597 
1598   @Override
1599   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1600       final byte [] row, final byte [] family, final byte [] qualifier,
1601       final CompareFilter.CompareOp compareOp,
1602       final ByteArrayComparable comparator, final Put put,
1603       final boolean result) throws IOException {
1604     // Require READ and WRITE permissions on the table, CF, and KV to update
1605     User user = getActiveUser();
1606     checkForReservedTagPresence(user, put);
1607     RegionCoprocessorEnvironment env = c.getEnvironment();
1608     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1609     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1610       Action.READ, Action.WRITE);
1611     logResult(authResult);
1612     if (!authResult.isAllowed()) {
1613       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1614         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1615       } else {
1616         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1617       }
1618     }
1619     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1620     if (bytes != null) {
1621       if (cellFeaturesEnabled) {
1622         addCellPermissions(bytes, put.getFamilyCellMap());
1623       } else {
1624         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1625       }
1626     }
1627     return result;
1628   }
1629 
1630   @Override
1631   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1632       final byte[] row, final byte[] family, final byte[] qualifier,
1633       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1634       final boolean result) throws IOException {
1635     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1636       // We had failure with table, cf and q perm checks and now giving a chance for cell
1637       // perm check
1638       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1639       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1640       AuthResult authResult = null;
1641       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1642           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1643         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1644             getActiveUser(), Action.READ, table, families);
1645       } else {
1646         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1647             getActiveUser(), Action.READ, table, families);
1648       }
1649       logResult(authResult);
1650       if (!authResult.isAllowed()) {
1651         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1652       }
1653     }
1654     return result;
1655   }
1656 
1657   @Override
1658   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1659       final byte [] row, final byte [] family, final byte [] qualifier,
1660       final CompareFilter.CompareOp compareOp,
1661       final ByteArrayComparable comparator, final Delete delete,
1662       final boolean result) throws IOException {
1663     // An ACL on a delete is useless, we shouldn't allow it
1664     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1665       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1666           delete.toString());
1667     }
1668     // Require READ and WRITE permissions on the table, CF, and the KV covered
1669     // by the delete
1670     RegionCoprocessorEnvironment env = c.getEnvironment();
1671     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1672     User user = getActiveUser();
1673     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1674       Action.READ, Action.WRITE);
1675     logResult(authResult);
1676     if (!authResult.isAllowed()) {
1677       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1678         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1679       } else {
1680         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1681       }
1682     }
1683     return result;
1684   }
1685 
1686   @Override
1687   public boolean preCheckAndDeleteAfterRowLock(
1688       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1689       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1690       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1691       throws IOException {
1692     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1693       // We had failure with table, cf and q perm checks and now giving a chance for cell
1694       // perm check
1695       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1696       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1697       AuthResult authResult = null;
1698       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1699           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1700         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1701             getActiveUser(), Action.READ, table, families);
1702       } else {
1703         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1704             getActiveUser(), Action.READ, table, families);
1705       }
1706       logResult(authResult);
1707       if (!authResult.isAllowed()) {
1708         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1709       }
1710     }
1711     return result;
1712   }
1713 
1714   @Override
1715   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1716       final byte [] row, final byte [] family, final byte [] qualifier,
1717       final long amount, final boolean writeToWAL)
1718       throws IOException {
1719     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1720     // incremented value
1721     RegionCoprocessorEnvironment env = c.getEnvironment();
1722     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1723     User user = getActiveUser();
1724     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1725       Action.WRITE);
1726     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1727       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1728         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1729       authResult.setReason("Covering cell set");
1730     }
1731     logResult(authResult);
1732     if (!authResult.isAllowed()) {
1733       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1734     }
1735     return -1;
1736   }
1737 
1738   @Override
1739   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1740       throws IOException {
1741     // Require WRITE permission to the table, CF, and the KV to be appended
1742     User user = getActiveUser();
1743     checkForReservedTagPresence(user, append);
1744     RegionCoprocessorEnvironment env = c.getEnvironment();
1745     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1746     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1747     logResult(authResult);
1748     if (!authResult.isAllowed()) {
1749       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1750         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1751       } else {
1752         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1753       }
1754     }
1755     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1756     if (bytes != null) {
1757       if (cellFeaturesEnabled) {
1758         addCellPermissions(bytes, append.getFamilyCellMap());
1759       } else {
1760         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1761       }
1762     }
1763     return null;
1764   }
1765 
1766   @Override
1767   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1768       final Append append) throws IOException {
1769     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1770       // We had failure with table, cf and q perm checks and now giving a chance for cell
1771       // perm check
1772       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1773       AuthResult authResult = null;
1774       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1775           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1776         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1777             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1778       } else {
1779         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1780             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1781       }
1782       logResult(authResult);
1783       if (!authResult.isAllowed()) {
1784         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1785       }
1786     }
1787     return null;
1788   }
1789 
1790   @Override
1791   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1792       final Increment increment)
1793       throws IOException {
1794     // Require WRITE permission to the table, CF, and the KV to be replaced by
1795     // the incremented value
1796     User user = getActiveUser();
1797     checkForReservedTagPresence(user, increment);
1798     RegionCoprocessorEnvironment env = c.getEnvironment();
1799     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1800     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1801       Action.WRITE);
1802     logResult(authResult);
1803     if (!authResult.isAllowed()) {
1804       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1805         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1806       } else {
1807         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1808       }
1809     }
1810     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1811     if (bytes != null) {
1812       if (cellFeaturesEnabled) {
1813         addCellPermissions(bytes, increment.getFamilyCellMap());
1814       } else {
1815         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1816       }
1817     }
1818     return null;
1819   }
1820 
1821   @Override
1822   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1823       final Increment increment) throws IOException {
1824     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1825       // We had failure with table, cf and q perm checks and now giving a chance for cell
1826       // perm check
1827       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1828       AuthResult authResult = null;
1829       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1830           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1831         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1832             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1833       } else {
1834         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1835             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1836       }
1837       logResult(authResult);
1838       if (!authResult.isAllowed()) {
1839         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1840       }
1841     }
1842     return null;
1843   }
1844 
1845   @Override
1846   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1847       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1848     // If the HFile version is insufficient to persist tags, we won't have any
1849     // work to do here
1850     if (!cellFeaturesEnabled) {
1851       return newCell;
1852     }
1853 
1854     // Collect any ACLs from the old cell
1855     List<Tag> tags = Lists.newArrayList();
1856     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1857     if (oldCell != null) {
1858       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1859           oldCell.getTagsOffset(), oldCell.getTagsLength());
1860       while (tagIterator.hasNext()) {
1861         Tag tag = tagIterator.next();
1862         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1863           if (LOG.isTraceEnabled()) {
1864             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1865               " length " + tag.getTagLength());
1866           }
1867           tags.add(tag);
1868         } else {
1869           ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1870             AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1871               tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1872           perms.putAll(kvPerms);
1873         }
1874       }
1875     }
1876 
1877     // Do we have an ACL on the operation?
1878     byte[] aclBytes = mutation.getACL();
1879     if (aclBytes != null) {
1880       // Yes, use it
1881       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1882     } else {
1883       // No, use what we carried forward
1884       if (perms != null) {
1885         // TODO: If we collected ACLs from more than one tag we may have a
1886         // List<Permission> of size > 1, this can be collapsed into a single
1887         // Permission
1888         if (LOG.isTraceEnabled()) {
1889           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1890         }
1891         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1892           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1893       }
1894     }
1895 
1896     // If we have no tags to add, just return
1897     if (tags.isEmpty()) {
1898       return newCell;
1899     }
1900 
1901     // We need to create another KV, unfortunately, because the current new KV
1902     // has no space for tags
1903     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1904     KeyValue rewriteKv = new KeyValue(newKv.getRowArray(), newKv.getRowOffset(), newKv.getRowLength(),
1905       newKv.getFamilyArray(), newKv.getFamilyOffset(), newKv.getFamilyLength(),
1906       newKv.getQualifierArray(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
1907       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1908       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
1909       tags);
1910     // Preserve mvcc data
1911     rewriteKv.setSequenceId(newKv.getMvccVersion());
1912     return rewriteKv;
1913   }
1914 
1915   @Override
1916   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1917       final Scan scan, final RegionScanner s) throws IOException {
1918     internalPreRead(c, scan, OpType.SCAN);
1919     return s;
1920   }
1921 
1922   @Override
1923   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1924       final Scan scan, final RegionScanner s) throws IOException {
1925     User user = getActiveUser();
1926     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1927       scannerOwners.put(s, user.getShortName());
1928     }
1929     return s;
1930   }
1931 
1932   @Override
1933   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1934       final InternalScanner s, final List<Result> result,
1935       final int limit, final boolean hasNext) throws IOException {
1936     requireScannerOwner(s);
1937     return hasNext;
1938   }
1939 
1940   @Override
1941   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1942       final InternalScanner s) throws IOException {
1943     requireScannerOwner(s);
1944   }
1945 
1946   @Override
1947   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1948       final InternalScanner s) throws IOException {
1949     // clean up any associated owner mapping
1950     scannerOwners.remove(s);
1951   }
1952 
1953   /**
1954    * Verify, when servicing an RPC, that the caller is the scanner owner.
1955    * If so, we assume that access control is correctly enforced based on
1956    * the checks performed in preScannerOpen()
1957    */
1958   private void requireScannerOwner(InternalScanner s)
1959       throws AccessDeniedException {
1960     if (RequestContext.isInRequestContext()) {
1961       String requestUserName = RequestContext.getRequestUserName();
1962       String owner = scannerOwners.get(s);
1963       if (owner != null && !owner.equals(requestUserName)) {
1964         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1965       }
1966     }
1967   }
1968 
1969   /**
1970    * Verifies user has WRITE privileges on
1971    * the Column Families involved in the bulkLoadHFile
1972    * request. Specific Column Write privileges are presently
1973    * ignored.
1974    */
1975   @Override
1976   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1977       List<Pair<byte[], String>> familyPaths) throws IOException {
1978     for(Pair<byte[],String> el : familyPaths) {
1979       requirePermission("preBulkLoadHFile",
1980           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1981           el.getFirst(),
1982           null,
1983           Action.CREATE);
1984     }
1985   }
1986 
1987   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1988     User requestUser = getActiveUser();
1989     TableName tableName = e.getRegion().getTableDesc().getTableName();
1990     AuthResult authResult = permissionGranted(method, requestUser,
1991         action, e, Collections.EMPTY_MAP);
1992     if (!authResult.isAllowed()) {
1993       for(UserPermission userPerm:
1994           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1995         for(Action userAction: userPerm.getActions()) {
1996           if(userAction.equals(action)) {
1997             return AuthResult.allow(method, "Access allowed", requestUser,
1998                 action, tableName, null, null);
1999           }
2000         }
2001       }
2002     }
2003     return authResult;
2004   }
2005 
2006   /**
2007    * Authorization check for
2008    * SecureBulkLoadProtocol.prepareBulkLoad()
2009    * @param e
2010    * @throws IOException
2011    */
2012   //TODO this should end up as a coprocessor hook
2013   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
2014     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
2015     logResult(authResult);
2016     if (!authResult.isAllowed()) {
2017       throw new AccessDeniedException("Insufficient permissions (table=" +
2018         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
2019     }
2020   }
2021 
2022   /**
2023    * Authorization security check for
2024    * SecureBulkLoadProtocol.cleanupBulkLoad()
2025    * @param e
2026    * @throws IOException
2027    */
2028   //TODO this should end up as a coprocessor hook
2029   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
2030     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
2031     logResult(authResult);
2032     if (!authResult.isAllowed()) {
2033       throw new AccessDeniedException("Insufficient permissions (table=" +
2034         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
2035     }
2036   }
2037 
2038   /* ---- EndpointObserver implementation ---- */
2039 
2040   @Override
2041   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2042       Service service, String methodName, Message request) throws IOException {
2043     // Don't intercept calls to our own AccessControlService, we check for
2044     // appropriate permissions in the service handlers
2045     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2046       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2047         methodName + ")",
2048         getTableName(ctx.getEnvironment()), null, null,
2049         Action.EXEC);
2050     }
2051     return request;
2052   }
2053 
2054   @Override
2055   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2056       Service service, String methodName, Message request, Message.Builder responseBuilder)
2057       throws IOException { }
2058 
2059   /* ---- Protobuf AccessControlService implementation ---- */
2060 
2061   @Override
2062   public void grant(RpcController controller,
2063                     AccessControlProtos.GrantRequest request,
2064                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2065     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2066     AccessControlProtos.GrantResponse response = null;
2067     try {
2068       // verify it's only running at .acl.
2069       if (aclRegion) {
2070         if (!initialized) {
2071           throw new CoprocessorException("AccessController not yet initialized");
2072         }
2073         if (LOG.isDebugEnabled()) {
2074           LOG.debug("Received request to grant access permission " + perm.toString());
2075         }
2076 
2077         switch(request.getUserPermission().getPermission().getType()) {
2078           case Global :
2079           case Table :
2080             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2081                 perm.getQualifier(), Action.ADMIN);
2082             break;
2083           case Namespace :
2084             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2085         }
2086 
2087         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2088         if (AUDITLOG.isTraceEnabled()) {
2089           // audit log should store permission changes in addition to auth results
2090           AUDITLOG.trace("Granted permission " + perm.toString());
2091         }
2092       } else {
2093         throw new CoprocessorException(AccessController.class, "This method "
2094             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2095       }
2096       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2097     } catch (IOException ioe) {
2098       // pass exception back up
2099       ResponseConverter.setControllerException(controller, ioe);
2100     }
2101     done.run(response);
2102   }
2103 
2104   @Override
2105   public void revoke(RpcController controller,
2106                      AccessControlProtos.RevokeRequest request,
2107                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2108     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2109     AccessControlProtos.RevokeResponse response = null;
2110     try {
2111       // only allowed to be called on _acl_ region
2112       if (aclRegion) {
2113         if (!initialized) {
2114           throw new CoprocessorException("AccessController not yet initialized");
2115         }
2116         if (LOG.isDebugEnabled()) {
2117           LOG.debug("Received request to revoke access permission " + perm.toString());
2118         }
2119 
2120         switch(request.getUserPermission().getPermission().getType()) {
2121           case Global :
2122           case Table :
2123             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2124                               perm.getQualifier(), Action.ADMIN);
2125             break;
2126           case Namespace :
2127             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2128         }
2129 
2130         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2131         if (AUDITLOG.isTraceEnabled()) {
2132           // audit log should record all permission changes
2133           AUDITLOG.trace("Revoked permission " + perm.toString());
2134         }
2135       } else {
2136         throw new CoprocessorException(AccessController.class, "This method "
2137             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2138       }
2139       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2140     } catch (IOException ioe) {
2141       // pass exception back up
2142       ResponseConverter.setControllerException(controller, ioe);
2143     }
2144     done.run(response);
2145   }
2146 
2147   @Override
2148   public void getUserPermissions(RpcController controller,
2149                                  AccessControlProtos.GetUserPermissionsRequest request,
2150                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2151     AccessControlProtos.GetUserPermissionsResponse response = null;
2152     try {
2153       // only allowed to be called on _acl_ region
2154       if (aclRegion) {
2155         if (!initialized) {
2156           throw new CoprocessorException("AccessController not yet initialized");
2157         }
2158         List<UserPermission> perms = null;
2159         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2160           TableName table = null;
2161           if (request.hasTableName()) {
2162             table = ProtobufUtil.toTableName(request.getTableName());
2163           }
2164           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2165 
2166           perms = AccessControlLists.getUserTablePermissions(
2167               regionEnv.getConfiguration(), table);
2168         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2169           perms = AccessControlLists.getUserNamespacePermissions(
2170               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2171         } else {
2172           perms = AccessControlLists.getUserPermissions(
2173               regionEnv.getConfiguration(), null);
2174         }
2175         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2176       } else {
2177         throw new CoprocessorException(AccessController.class, "This method "
2178             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2179       }
2180     } catch (IOException ioe) {
2181       // pass exception back up
2182       ResponseConverter.setControllerException(controller, ioe);
2183     }
2184     done.run(response);
2185   }
2186 
2187   @Override
2188   public void checkPermissions(RpcController controller,
2189                                AccessControlProtos.CheckPermissionsRequest request,
2190                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2191     Permission[] permissions = new Permission[request.getPermissionCount()];
2192     for (int i=0; i < request.getPermissionCount(); i++) {
2193       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2194     }
2195     AccessControlProtos.CheckPermissionsResponse response = null;
2196     try {
2197       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2198       for (Permission permission : permissions) {
2199         if (permission instanceof TablePermission) {
2200           TablePermission tperm = (TablePermission) permission;
2201           for (Action action : permission.getActions()) {
2202             if (!tperm.getTableName().equals(tableName)) {
2203               throw new CoprocessorException(AccessController.class, String.format("This method "
2204                   + "can only execute at the table specified in TablePermission. " +
2205                   "Table of the region:%s , requested table:%s", tableName,
2206                   tperm.getTableName()));
2207             }
2208 
2209             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2210             if (tperm.getFamily() != null) {
2211               if (tperm.getQualifier() != null) {
2212                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2213                 qualifiers.add(tperm.getQualifier());
2214                 familyMap.put(tperm.getFamily(), qualifiers);
2215               } else {
2216                 familyMap.put(tperm.getFamily(), null);
2217               }
2218             }
2219 
2220             requirePermission("checkPermissions", action, regionEnv, familyMap);
2221           }
2222 
2223         } else {
2224           for (Action action : permission.getActions()) {
2225             requirePermission("checkPermissions", action);
2226           }
2227         }
2228       }
2229       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2230     } catch (IOException ioe) {
2231       ResponseConverter.setControllerException(controller, ioe);
2232     }
2233     done.run(response);
2234   }
2235 
2236   @Override
2237   public Service getService() {
2238     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2239   }
2240 
2241   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2242     return e.getRegion();
2243   }
2244 
2245   private TableName getTableName(RegionCoprocessorEnvironment e) {
2246     HRegion region = e.getRegion();
2247     if (region != null) {
2248       return getTableName(region);
2249     }
2250     return null;
2251   }
2252 
2253   private TableName getTableName(HRegion region) {
2254     HRegionInfo regionInfo = region.getRegionInfo();
2255     if (regionInfo != null) {
2256       return regionInfo.getTable();
2257     }
2258     return null;
2259   }
2260 
2261   @Override
2262   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2263       throws IOException {
2264     requirePermission("preClose", Action.ADMIN);
2265   }
2266 
2267   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2268     User user = userProvider.getCurrent();
2269     if (user == null) {
2270       throw new IOException("Unable to obtain the current user, " +
2271         "authorization checks for internal operations will not work correctly!");
2272     }
2273     User activeUser = getActiveUser();
2274     if (!(superusers.contains(activeUser.getShortName()))) {
2275       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2276         "is not system or super user.");
2277     }
2278   }
2279 
2280   @Override
2281   public void preStopRegionServer(
2282       ObserverContext<RegionServerCoprocessorEnvironment> env)
2283       throws IOException {
2284     requirePermission("preStopRegionServer", Action.ADMIN);
2285   }
2286 
2287   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2288       byte[] qualifier) {
2289     if (family == null) {
2290       return null;
2291     }
2292 
2293     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2294     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2295     return familyMap;
2296   }
2297 
2298   @Override
2299   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2300       List<TableName> tableNamesList,
2301       List<HTableDescriptor> descriptors) throws IOException {
2302     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2303     // ADMIN privs.
2304     if (tableNamesList == null || tableNamesList.isEmpty()) {
2305       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2306     }
2307     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2308     // request can be granted.
2309     else {
2310       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2311       for (TableName tableName: tableNamesList) {
2312         // Do not deny if the table does not exist
2313         try {
2314           masterServices.checkTableModifiable(tableName);
2315         } catch (TableNotFoundException ex) {
2316           // Skip checks for a table that does not exist
2317           continue;
2318         } catch (TableNotDisabledException ex) {
2319           // We don't care about this
2320         }
2321         requirePermission("getTableDescriptors", tableName, null, null,
2322           Action.ADMIN, Action.CREATE);
2323       }
2324     }
2325   }
2326 
2327   @Override
2328   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2329       List<HTableDescriptor> descriptors) throws IOException {
2330   }
2331 
2332   @Override
2333   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2334       HRegion regionB) throws IOException {
2335     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2336       Action.ADMIN);
2337   }
2338 
2339   @Override
2340   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2341       HRegion regionB, HRegion mergedRegion) throws IOException { }
2342 
2343   @Override
2344   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2345       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2346 
2347   @Override
2348   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2349       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2350 
2351   @Override
2352   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2353       HRegion regionA, HRegion regionB) throws IOException { }
2354 
2355   @Override
2356   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2357       HRegion regionA, HRegion regionB) throws IOException { }
2358 }