View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellScanner;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CompoundConfiguration;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.NamespaceDescriptor;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.TableNotDisabledException;
53  import org.apache.hadoop.hbase.TableNotFoundException;
54  import org.apache.hadoop.hbase.Tag;
55  import org.apache.hadoop.hbase.MetaTableAccessor;
56  import org.apache.hadoop.hbase.client.Append;
57  import org.apache.hadoop.hbase.client.Delete;
58  import org.apache.hadoop.hbase.client.Durability;
59  import org.apache.hadoop.hbase.client.Get;
60  import org.apache.hadoop.hbase.client.Increment;
61  import org.apache.hadoop.hbase.client.Mutation;
62  import org.apache.hadoop.hbase.client.Put;
63  import org.apache.hadoop.hbase.client.Query;
64  import org.apache.hadoop.hbase.client.Result;
65  import org.apache.hadoop.hbase.client.Scan;
66  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
68  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
69  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
70  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.MasterObserver;
72  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
73  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
74  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
75  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
76  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
77  import org.apache.hadoop.hbase.filter.CompareFilter;
78  import org.apache.hadoop.hbase.filter.Filter;
79  import org.apache.hadoop.hbase.filter.FilterList;
80  import org.apache.hadoop.hbase.io.hfile.HFile;
81  import org.apache.hadoop.hbase.ipc.RequestContext;
82  import org.apache.hadoop.hbase.master.MasterServices;
83  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
84  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
85  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
86  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
87  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
88  import org.apache.hadoop.hbase.regionserver.HRegion;
89  import org.apache.hadoop.hbase.regionserver.InternalScanner;
90  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
91  import org.apache.hadoop.hbase.regionserver.RegionScanner;
92  import org.apache.hadoop.hbase.regionserver.ScanType;
93  import org.apache.hadoop.hbase.regionserver.Store;
94  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
95  import org.apache.hadoop.hbase.security.AccessDeniedException;
96  import org.apache.hadoop.hbase.security.User;
97  import org.apache.hadoop.hbase.security.UserProvider;
98  import org.apache.hadoop.hbase.security.access.Permission.Action;
99  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
100 import org.apache.hadoop.hbase.util.ByteRange;
101 import org.apache.hadoop.hbase.util.Bytes;
102 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
103 import org.apache.hadoop.hbase.util.Pair;
104 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
105 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
106 
107 import com.google.common.collect.ArrayListMultimap;
108 import com.google.common.collect.ImmutableSet;
109 import com.google.common.collect.ListMultimap;
110 import com.google.common.collect.Lists;
111 import com.google.common.collect.MapMaker;
112 import com.google.common.collect.Maps;
113 import com.google.common.collect.Sets;
114 import com.google.protobuf.Message;
115 import com.google.protobuf.RpcCallback;
116 import com.google.protobuf.RpcController;
117 import com.google.protobuf.Service;
118 
119 /**
120  * Provides basic authorization checks for data access and administrative
121  * operations.
122  *
123  * <p>
124  * {@code AccessController} performs authorization checks for HBase operations
125  * based on:
126  * <ul>
127  *   <li>the identity of the user performing the operation</li>
128  *   <li>the scope over which the operation is performed, in increasing
129  *   specificity: global, table, column family, or qualifier</li>
130  *   <li>the type of action being performed (as mapped to
131  *   {@link Permission.Action} values)</li>
132  * </ul>
133  * If the authorization check fails, an {@link AccessDeniedException}
134  * will be thrown for the operation.
135  * </p>
136  *
137  * <p>
138  * To perform authorization checks, {@code AccessController} relies on the
139  * RpcServerEngine being loaded to provide
140  * the user identities for remote requests.
141  * </p>
142  *
143  * <p>
144  * The access control lists used for authorization can be manipulated via the
145  * exposed {@link AccessControlService} Interface implementation, and the associated
146  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
147  * commands.
148  * </p>
149  */
150 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
151 public class AccessController extends BaseMasterAndRegionObserver
152     implements RegionServerObserver,
153       AccessControlService.Interface, CoprocessorService, EndpointObserver {
154 
155   public static final Log LOG = LogFactory.getLog(AccessController.class);
156 
157   private static final Log AUDITLOG =
158     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
159   private static final String CHECK_COVERING_PERM = "check_covering_perm";
160   private static final String TAG_CHECK_PASSED = "tag_check_passed";
161   private static final byte[] TRUE = Bytes.toBytes(true);
162 
163   TableAuthManager authManager = null;
164 
165   // flags if we are running on a region of the _acl_ table
166   boolean aclRegion = false;
167 
168   // defined only for Endpoint implementation, so it can have way to
169   // access region services.
170   private RegionCoprocessorEnvironment regionEnv;
171 
172   /** Mapping of scanner instances to the user who created them */
173   private Map<InternalScanner,String> scannerOwners =
174       new MapMaker().weakKeys().makeMap();
175 
176   // Provider for mapping principal names to Users
177   private UserProvider userProvider;
178 
179   // The list of users with superuser authority
180   private List<String> superusers;
181 
182   // if we are able to support cell ACLs
183   boolean cellFeaturesEnabled;
184 
185   // if we should check EXEC permissions
186   boolean shouldCheckExecPermission;
187 
188   // if we should terminate access checks early as soon as table or CF grants
189   // allow access; pre-0.98 compatible behavior
190   boolean compatibleEarlyTermination;
191 
192   private volatile boolean initialized = false;
193 
194   // This boolean having relevance only in the Master.
195   private volatile boolean aclTabAvailable = false;
196 
197   public HRegion getRegion() {
198     return regionEnv != null ? regionEnv.getRegion() : null;
199   }
200 
201   public TableAuthManager getAuthManager() {
202     return authManager;
203   }
204 
205   void initialize(RegionCoprocessorEnvironment e) throws IOException {
206     final HRegion region = e.getRegion();
207     Configuration conf = e.getConfiguration();
208     Map<byte[], ListMultimap<String,TablePermission>> tables =
209         AccessControlLists.loadAll(region);
210     // For each table, write out the table's permissions to the respective
211     // znode for that table.
212     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
213       tables.entrySet()) {
214       byte[] entry = t.getKey();
215       ListMultimap<String,TablePermission> perms = t.getValue();
216       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
217       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
218     }
219     initialized = true;
220   }
221 
222   /**
223    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
224    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
225    * table updates.
226    */
227   void updateACL(RegionCoprocessorEnvironment e,
228       final Map<byte[], List<Cell>> familyMap) {
229     Set<byte[]> entries =
230         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
231     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
232       List<Cell> cells = f.getValue();
233       for (Cell cell: cells) {
234         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
235         if (Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(),
236             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
237             AccessControlLists.ACL_LIST_FAMILY.length)) {
238           entries.add(kv.getRow());
239         }
240       }
241     }
242     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
243     Configuration conf = regionEnv.getConfiguration();
244     for (byte[] entry: entries) {
245       try {
246         ListMultimap<String,TablePermission> perms =
247           AccessControlLists.getPermissions(conf, entry);
248         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
249         zkw.writeToZookeeper(entry, serialized);
250       } catch (IOException ex) {
251         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
252             ex);
253       }
254     }
255   }
256 
257   /**
258    * Check the current user for authorization to perform a specific action
259    * against the given set of row data.
260    *
261    * <p>Note: Ordering of the authorization checks
262    * has been carefully optimized to short-circuit the most common requests
263    * and minimize the amount of processing required.</p>
264    *
265    * @param permRequest the action being requested
266    * @param e the coprocessor environment
267    * @param families the map of column families to qualifiers present in
268    * the request
269    * @return an authorization result
270    */
271   AuthResult permissionGranted(String request, User user, Action permRequest,
272       RegionCoprocessorEnvironment e,
273       Map<byte [], ? extends Collection<?>> families) {
274     HRegionInfo hri = e.getRegion().getRegionInfo();
275     TableName tableName = hri.getTable();
276 
277     // 1. All users need read access to hbase:meta table.
278     // this is a very common operation, so deal with it quickly.
279     if (hri.isMetaRegion()) {
280       if (permRequest == Action.READ) {
281         return AuthResult.allow(request, "All users allowed", user,
282           permRequest, tableName, families);
283       }
284     }
285 
286     if (user == null) {
287       return AuthResult.deny(request, "No user associated with request!", null,
288         permRequest, tableName, families);
289     }
290 
291     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
292     // e.g. When a new table is created a new entry in hbase:meta is added,
293     // so the user need to be allowed to write on it.
294     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
295     // and the user need to be allowed to write on both tables.
296     if (permRequest == Action.WRITE &&
297        (hri.isMetaRegion() ||
298         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
299        (authManager.authorize(user, Action.CREATE) ||
300         authManager.authorize(user, Action.ADMIN)))
301     {
302        return AuthResult.allow(request, "Table permission granted", user,
303         permRequest, tableName, families);
304     }
305 
306     // 2. check for the table-level, if successful we can short-circuit
307     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
308       return AuthResult.allow(request, "Table permission granted", user,
309         permRequest, tableName, families);
310     }
311 
312     // 3. check permissions against the requested families
313     if (families != null && families.size() > 0) {
314       // all families must pass
315       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
316         // a) check for family level access
317         if (authManager.authorize(user, tableName, family.getKey(),
318             permRequest)) {
319           continue;  // family-level permission overrides per-qualifier
320         }
321 
322         // b) qualifier level access can still succeed
323         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
324           if (family.getValue() instanceof Set) {
325             // for each qualifier of the family
326             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
327             for (byte[] qualifier : familySet) {
328               if (!authManager.authorize(user, tableName, family.getKey(),
329                                          qualifier, permRequest)) {
330                 return AuthResult.deny(request, "Failed qualifier check", user,
331                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
332               }
333             }
334           } else if (family.getValue() instanceof List) { // List<KeyValue>
335             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
336             for (KeyValue kv : kvList) {
337               if (!authManager.authorize(user, tableName, family.getKey(),
338                       kv.getQualifier(), permRequest)) {
339                 return AuthResult.deny(request, "Failed qualifier check", user,
340                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
341               }
342             }
343           }
344         } else {
345           // no qualifiers and family-level check already failed
346           return AuthResult.deny(request, "Failed family check", user, permRequest,
347               tableName, makeFamilyMap(family.getKey(), null));
348         }
349       }
350 
351       // all family checks passed
352       return AuthResult.allow(request, "All family checks passed", user, permRequest,
353           tableName, families);
354     }
355 
356     // 4. no families to check and table level access failed
357     return AuthResult.deny(request, "No families to check and table permission failed",
358         user, permRequest, tableName, families);
359   }
360 
361   /**
362    * Check the current user for authorization to perform a specific action
363    * against the given set of row data.
364    * @param opType the operation type
365    * @param user the user
366    * @param e the coprocessor environment
367    * @param families the map of column families to qualifiers present in
368    * the request
369    * @param actions the desired actions
370    * @return an authorization result
371    */
372   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
373       Map<byte [], ? extends Collection<?>> families, Action... actions) {
374     AuthResult result = null;
375     for (Action action: actions) {
376       result = permissionGranted(opType.toString(), user, action, e, families);
377       if (!result.isAllowed()) {
378         return result;
379       }
380     }
381     return result;
382   }
383 
384   private void logResult(AuthResult result) {
385     if (AUDITLOG.isTraceEnabled()) {
386       RequestContext ctx = RequestContext.get();
387       InetAddress remoteAddr = null;
388       if (ctx != null) {
389         remoteAddr = ctx.getRemoteAddress();
390       }
391       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
392           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
393           "; reason: " + result.getReason() +
394           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
395           "; request: " + result.getRequest() +
396           "; context: " + result.toContextString());
397     }
398   }
399 
400   /**
401    * Returns the active user to which authorization checks should be applied.
402    * If we are in the context of an RPC call, the remote user is used,
403    * otherwise the currently logged in user is used.
404    */
405   private User getActiveUser() throws IOException {
406     User user = RequestContext.getRequestUser();
407     if (!RequestContext.isInRequestContext()) {
408       // for non-rpc handling, fallback to system user
409       user = userProvider.getCurrent();
410     }
411     return user;
412   }
413 
414   /**
415    * Authorizes that the current user has any of the given permissions for the
416    * given table, column family and column qualifier.
417    * @param tableName Table requested
418    * @param family Column family requested
419    * @param qualifier Column qualifier requested
420    * @throws IOException if obtaining the current user fails
421    * @throws AccessDeniedException if user has no authorization
422    */
423   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
424       Action... permissions) throws IOException {
425     User user = getActiveUser();
426     AuthResult result = null;
427 
428     for (Action permission : permissions) {
429       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
430         result = AuthResult.allow(request, "Table permission granted", user,
431                                   permission, tableName, family, qualifier);
432         break;
433       } else {
434         // rest of the world
435         result = AuthResult.deny(request, "Insufficient permissions", user,
436                                  permission, tableName, family, qualifier);
437       }
438     }
439     logResult(result);
440     if (!result.isAllowed()) {
441       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
442     }
443   }
444 
445   /**
446    * Authorizes that the current user has global privileges for the given action.
447    * @param perm The action being requested
448    * @throws IOException if obtaining the current user fails
449    * @throws AccessDeniedException if authorization is denied
450    */
451   private void requirePermission(String request, Action perm) throws IOException {
452     requireGlobalPermission(request, perm, null, null);
453   }
454 
455   /**
456    * Authorizes that the current user has permission to perform the given
457    * action on the set of table column families.
458    * @param perm Action that is required
459    * @param env The current coprocessor environment
460    * @param families The map of column families-qualifiers.
461    * @throws AccessDeniedException if the authorization check failed
462    */
463   private void requirePermission(String request, Action perm,
464         RegionCoprocessorEnvironment env,
465         Map<byte[], ? extends Collection<?>> families)
466       throws IOException {
467     User user = getActiveUser();
468     AuthResult result = permissionGranted(request, user, perm, env, families);
469     logResult(result);
470 
471     if (!result.isAllowed()) {
472       throw new AccessDeniedException("Insufficient permissions (table=" +
473         env.getRegion().getTableDesc().getTableName()+
474         ((families != null && families.size() > 0) ? ", family: " +
475         result.toFamilyString() : "") + ", action=" +
476         perm.toString() + ")");
477     }
478   }
479 
480   /**
481    * Checks that the user has the given global permission. The generated
482    * audit log message will contain context information for the operation
483    * being authorized, based on the given parameters.
484    * @param perm Action being requested
485    * @param tableName Affected table name.
486    * @param familyMap Affected column families.
487    */
488   private void requireGlobalPermission(String request, Action perm, TableName tableName,
489       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
490     User user = getActiveUser();
491     if (authManager.authorize(user, perm)) {
492       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
493     } else {
494       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
495       throw new AccessDeniedException("Insufficient permissions for user '" +
496           (user != null ? user.getShortName() : "null") +"' (global, action=" +
497           perm.toString() + ")");
498     }
499   }
500 
501   /**
502    * Checks that the user has the given global permission. The generated
503    * audit log message will contain context information for the operation
504    * being authorized, based on the given parameters.
505    * @param perm Action being requested
506    * @param namespace
507    */
508   private void requireGlobalPermission(String request, Action perm,
509                                        String namespace) throws IOException {
510     User user = getActiveUser();
511     if (authManager.authorize(user, perm)) {
512       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
513     } else {
514       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
515       throw new AccessDeniedException("Insufficient permissions for user '" +
516           (user != null ? user.getShortName() : "null") +"' (global, action=" +
517           perm.toString() + ")");
518     }
519   }
520 
521   /**
522    * Returns <code>true</code> if the current user is allowed the given action
523    * over at least one of the column qualifiers in the given column families.
524    */
525   private boolean hasFamilyQualifierPermission(User user,
526       Action perm,
527       RegionCoprocessorEnvironment env,
528       Map<byte[], ? extends Collection<byte[]>> familyMap)
529     throws IOException {
530     HRegionInfo hri = env.getRegion().getRegionInfo();
531     TableName tableName = hri.getTable();
532 
533     if (user == null) {
534       return false;
535     }
536 
537     if (familyMap != null && familyMap.size() > 0) {
538       // at least one family must be allowed
539       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
540           familyMap.entrySet()) {
541         if (family.getValue() != null && !family.getValue().isEmpty()) {
542           for (byte[] qualifier : family.getValue()) {
543             if (authManager.matchPermission(user, tableName,
544                 family.getKey(), qualifier, perm)) {
545               return true;
546             }
547           }
548         } else {
549           if (authManager.matchPermission(user, tableName, family.getKey(),
550               perm)) {
551             return true;
552           }
553         }
554       }
555     } else if (LOG.isDebugEnabled()) {
556       LOG.debug("Empty family map passed for permission check");
557     }
558 
559     return false;
560   }
561 
562   private enum OpType {
563     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
564     GET("get"),
565     EXISTS("exists"),
566     SCAN("scan"),
567     PUT("put"),
568     DELETE("delete"),
569     CHECK_AND_PUT("checkAndPut"),
570     CHECK_AND_DELETE("checkAndDelete"),
571     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
572     APPEND("append"),
573     INCREMENT("increment");
574 
575     private String type;
576 
577     private OpType(String type) {
578       this.type = type;
579     }
580 
581     @Override
582     public String toString() {
583       return type;
584     }
585   }
586 
587   /**
588    * Determine if cell ACLs covered by the operation grant access. This is expensive.
589    * @return false if cell ACLs failed to grant access, true otherwise
590    * @throws IOException
591    */
592   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
593       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
594       throws IOException {
595     if (!cellFeaturesEnabled) {
596       return false;
597     }
598     long cellGrants = 0;
599     User user = getActiveUser();
600     long latestCellTs = 0;
601     Get get = new Get(row);
602     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
603     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
604     // version. We have to get every cell version and check its TS against the TS asked for in
605     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
606     // consider only one such passing cell. In case of Delete we have to consider all the cell
607     // versions under this passing version. When Delete Mutation contains columns which are a
608     // version delete just consider only one version for those column cells.
609     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
610     if (considerCellTs) {
611       get.setMaxVersions();
612     } else {
613       get.setMaxVersions(1);
614     }
615     boolean diffCellTsFromOpTs = false;
616     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
617       byte[] col = entry.getKey();
618       // TODO: HBASE-7114 could possibly unify the collection type in family
619       // maps so we would not need to do this
620       if (entry.getValue() instanceof Set) {
621         Set<byte[]> set = (Set<byte[]>)entry.getValue();
622         if (set == null || set.isEmpty()) {
623           get.addFamily(col);
624         } else {
625           for (byte[] qual: set) {
626             get.addColumn(col, qual);
627           }
628         }
629       } else if (entry.getValue() instanceof List) {
630         List<Cell> list = (List<Cell>)entry.getValue();
631         if (list == null || list.isEmpty()) {
632           get.addFamily(col);
633         } else {
634           // In case of family delete, a Cell will be added into the list with Qualifier as null.
635           for (Cell cell : list) {
636             if (cell.getQualifierLength() == 0
637                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
638                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
639               get.addFamily(col);
640             } else {
641               get.addColumn(col, CellUtil.cloneQualifier(cell));
642             }
643             if (considerCellTs) {
644               long cellTs = cell.getTimestamp();
645               latestCellTs = Math.max(latestCellTs, cellTs);
646               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
647             }
648           }
649         }
650       } else {
651         throw new RuntimeException("Unhandled collection type " +
652           entry.getValue().getClass().getName());
653       }
654     }
655     // We want to avoid looking into the future. So, if the cells of the
656     // operation specify a timestamp, or the operation itself specifies a
657     // timestamp, then we use the maximum ts found. Otherwise, we bound
658     // the Get to the current server time. We add 1 to the timerange since
659     // the upper bound of a timerange is exclusive yet we need to examine
660     // any cells found there inclusively.
661     long latestTs = Math.max(opTs, latestCellTs);
662     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
663       latestTs = EnvironmentEdgeManager.currentTime();
664     }
665     get.setTimeRange(0, latestTs + 1);
666     // In case of Put operation we set to read all versions. This was done to consider the case
667     // where columns are added with TS other than the Mutation TS. But normally this wont be the
668     // case with Put. There no need to get all versions but get latest version only.
669     if (!diffCellTsFromOpTs && request == OpType.PUT) {
670       get.setMaxVersions(1);
671     }
672     if (LOG.isTraceEnabled()) {
673       LOG.trace("Scanning for cells with " + get);
674     }
675     // This Map is identical to familyMap. The key is a BR rather than byte[].
676     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
677     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
678     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
679     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
680       if (entry.getValue() instanceof List) {
681         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
682       }
683     }
684     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
685     List<Cell> cells = Lists.newArrayList();
686     Cell prevCell = null;
687     ByteRange curFam = new SimpleMutableByteRange();
688     boolean curColAllVersions = (request == OpType.DELETE);
689     long curColCheckTs = opTs;
690     boolean foundColumn = false;
691     try {
692       boolean more = false;
693       do {
694         cells.clear();
695         // scan with limit as 1 to hold down memory use on wide rows
696         more = scanner.next(cells, 1);
697         for (Cell cell: cells) {
698           if (LOG.isTraceEnabled()) {
699             LOG.trace("Found cell " + cell);
700           }
701           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
702           if (colChange) foundColumn = false;
703           prevCell = cell;
704           if (!curColAllVersions && foundColumn) {
705             continue;
706           }
707           if (colChange && considerCellTs) {
708             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
709             List<Cell> cols = familyMap1.get(curFam);
710             for (Cell col : cols) {
711               // null/empty qualifier is used to denote a Family delete. The TS and delete type
712               // associated with this is applicable for all columns within the family. That is
713               // why the below (col.getQualifierLength() == 0) check.
714               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
715                   || CellUtil.matchingQualifier(cell, col)) {
716                 byte type = col.getTypeByte();
717                 if (considerCellTs) {
718                   curColCheckTs = col.getTimestamp();
719                 }
720                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
721                 // a version delete for a column no need to check all the covering cells within
722                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
723                 // One version delete types are Delete/DeleteFamilyVersion
724                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
725                     || (KeyValue.Type.DeleteFamily.getCode() == type);
726                 break;
727               }
728             }
729           }
730           if (cell.getTimestamp() > curColCheckTs) {
731             // Just ignore this cell. This is not a covering cell.
732             continue;
733           }
734           foundColumn = true;
735           for (Action action: actions) {
736             // Are there permissions for this user for the cell?
737             if (!authManager.authorize(user, getTableName(e), cell, action)) {
738               // We can stop if the cell ACL denies access
739               return false;
740             }
741           }
742           cellGrants++;
743         }
744       } while (more);
745     } catch (AccessDeniedException ex) {
746       throw ex;
747     } catch (IOException ex) {
748       LOG.error("Exception while getting cells to calculate covering permission", ex);
749     } finally {
750       scanner.close();
751     }
752     // We should not authorize unless we have found one or more cell ACLs that
753     // grant access. This code is used to check for additional permissions
754     // after no table or CF grants are found.
755     return cellGrants > 0;
756   }
757 
758   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
759     // Iterate over the entries in the familyMap, replacing the cells therein
760     // with new cells including the ACL data
761     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
762       List<Cell> newCells = Lists.newArrayList();
763       for (Cell cell: e.getValue()) {
764         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
765         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
766         if (cell.getTagsLength() > 0) {
767           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
768             cell.getTagsOffset(), cell.getTagsLength());
769           while (tagIterator.hasNext()) {
770             tags.add(tagIterator.next());
771           }
772         }
773         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
774         // incoming cell type is actually KeyValue.
775         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
776         newCells.add(
777           new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
778             kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
779             kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
780             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
781             kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
782             tags));
783       }
784       // This is supposed to be safe, won't CME
785       e.setValue(newCells);
786     }
787   }
788 
789   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
790   // type is reserved and should not be explicitly set by user.
791   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
792     // Superusers are allowed to store cells unconditionally.
793     if (superusers.contains(user.getShortName())) {
794       return;
795     }
796     // We already checked (prePut vs preBatchMutation)
797     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
798       return;
799     }
800     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
801       Cell cell = cellScanner.current();
802       if (cell.getTagsLength() > 0) {
803         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
804           cell.getTagsLength());
805         while (tagsItr.hasNext()) {
806           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
807             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
808           }
809         }
810       }
811     }
812     m.setAttribute(TAG_CHECK_PASSED, TRUE);
813   }
814 
815   /* ---- MasterObserver implementation ---- */
816 
817   public void start(CoprocessorEnvironment env) throws IOException {
818     CompoundConfiguration conf = new CompoundConfiguration();
819     conf.add(env.getConfiguration());
820 
821     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
822       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
823 
824     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
825     if (!cellFeaturesEnabled) {
826       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
827           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
828           + " accordingly.");
829     }
830 
831     ZooKeeperWatcher zk = null;
832     if (env instanceof MasterCoprocessorEnvironment) {
833       // if running on HMaster
834       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
835       zk = mEnv.getMasterServices().getZooKeeper();
836     } else if (env instanceof RegionServerCoprocessorEnvironment) {
837       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
838       zk = rsEnv.getRegionServerServices().getZooKeeper();
839     } else if (env instanceof RegionCoprocessorEnvironment) {
840       // if running at region
841       regionEnv = (RegionCoprocessorEnvironment) env;
842       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
843       zk = regionEnv.getRegionServerServices().getZooKeeper();
844       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
845         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
846     }
847 
848     // set the user-provider.
849     this.userProvider = UserProvider.instantiate(env.getConfiguration());
850 
851     // set up the list of users with superuser privilege
852     User user = userProvider.getCurrent();
853     superusers = Lists.asList(user.getShortName(),
854       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
855 
856     // If zk is null or IOException while obtaining auth manager,
857     // throw RuntimeException so that the coprocessor is unloaded.
858     if (zk != null) {
859       try {
860         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
861       } catch (IOException ioe) {
862         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
863       }
864     } else {
865       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
866     }
867   }
868 
869   public void stop(CoprocessorEnvironment env) {
870 
871   }
872 
873   @Override
874   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
875       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
876     Set<byte[]> families = desc.getFamiliesKeys();
877     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
878     for (byte[] family: families) {
879       familyMap.put(family, null);
880     }
881     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
882   }
883 
884   @Override
885   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
886       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
887     // When AC is used, it should be configured as the 1st CP.
888     // In Master, the table operations like create, are handled by a Thread pool but the max size
889     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
890     // sequentially only.
891     // Related code in HMaster#startServiceThreads
892     // {code}
893     //   // We depend on there being only one instance of this executor running
894     //   // at a time. To do concurrency, would need fencing of enable/disable of
895     //   // tables.
896     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
897     // {code}
898     // In future if we change this pool to have more threads, then there is a chance for thread,
899     // creating acl table, getting delayed and by that time another table creation got over and
900     // this hook is getting called. In such a case, we will need a wait logic here which will
901     // wait till the acl table is created.
902     if (AccessControlLists.isAclTable(desc)) {
903       this.aclTabAvailable = true;
904     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
905       if (!aclTabAvailable) {
906         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
907             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
908             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
909       } else {
910         String owner = desc.getOwnerString();
911         // default the table owner to current user, if not specified.
912         if (owner == null)
913           owner = getActiveUser().getShortName();
914         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
915             desc.getTableName(), null, Action.values());
916         // switch to the real hbase master user for doing the RPC on the ACL table
917         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
918           @Override
919           public Void run() throws Exception {
920             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
921                 userperm);
922             return null;
923           }
924         });
925       }
926     }
927   }
928 
929   @Override
930   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
931       throws IOException {
932     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
933   }
934 
935   @Override
936   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
937       TableName tableName) throws IOException {
938     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
939   }
940 
941   @Override
942   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
943       throws IOException {
944     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
945   }
946 
947   @Override
948   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
949       HTableDescriptor htd) throws IOException {
950     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
951   }
952 
953   @Override
954   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
955       TableName tableName, HTableDescriptor htd) throws IOException {
956     String owner = htd.getOwnerString();
957     // default the table owner to current user, if not specified.
958     if (owner == null) owner = getActiveUser().getShortName();
959     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
960         Action.values());
961     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
962   }
963 
964   @Override
965   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
966       HColumnDescriptor column) throws IOException {
967     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
968   }
969 
970   @Override
971   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
972       HColumnDescriptor descriptor) throws IOException {
973     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
974   }
975 
976   @Override
977   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
978       byte[] col) throws IOException {
979     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
980   }
981 
982   @Override
983   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
984       TableName tableName, byte[] col) throws IOException {
985     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName, col);
986   }
987 
988   @Override
989   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
990       throws IOException {
991     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
992   }
993 
994   @Override
995   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
996       throws IOException {
997     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
998       throw new AccessDeniedException("Not allowed to disable "
999           + AccessControlLists.ACL_TABLE_NAME + " table.");
1000     }
1001     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1002   }
1003 
1004   @Override
1005   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1006       ServerName srcServer, ServerName destServer) throws IOException {
1007     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1008   }
1009 
1010   @Override
1011   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1012       throws IOException {
1013     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1014   }
1015 
1016   @Override
1017   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1018       boolean force) throws IOException {
1019     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1020   }
1021 
1022   @Override
1023   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1024       HRegionInfo regionInfo) throws IOException {
1025     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1026   }
1027 
1028   @Override
1029   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1030       throws IOException {
1031     requirePermission("balance", Action.ADMIN);
1032   }
1033 
1034   @Override
1035   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1036       boolean newValue) throws IOException {
1037     requirePermission("balanceSwitch", Action.ADMIN);
1038     return newValue;
1039   }
1040 
1041   @Override
1042   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1043       throws IOException {
1044     requirePermission("shutdown", Action.ADMIN);
1045   }
1046 
1047   @Override
1048   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1049       throws IOException {
1050     requirePermission("stopMaster", Action.ADMIN);
1051   }
1052 
1053   @Override
1054   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1055       throws IOException {
1056     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1057       .getShortCircuitConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1058       // initialize the ACL storage table
1059       AccessControlLists.init(ctx.getEnvironment().getMasterServices());
1060     } else {
1061       aclTabAvailable = true;
1062     }
1063   }
1064 
1065   @Override
1066   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1067       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1068       throws IOException {
1069     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1070       Permission.Action.ADMIN);
1071   }
1072 
1073   @Override
1074   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1075       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1076       throws IOException {
1077     requirePermission("clone", Action.ADMIN);
1078   }
1079 
1080   @Override
1081   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1082       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1083       throws IOException {
1084     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1085       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1086         Permission.Action.ADMIN);
1087     } else {
1088       requirePermission("restore", Action.ADMIN);
1089     }
1090   }
1091 
1092   @Override
1093   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1094       final SnapshotDescription snapshot) throws IOException {
1095     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1096       // Snapshot owner is allowed to delete the snapshot
1097     } else {
1098       requirePermission("deleteSnapshot", Action.ADMIN);
1099     }
1100   }
1101 
1102   @Override
1103   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1104       NamespaceDescriptor ns) throws IOException {
1105     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1106   }
1107 
1108   @Override
1109   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1110       throws IOException {
1111     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1112   }
1113 
1114   @Override
1115   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1116                                   String namespace) throws IOException {
1117     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1118         namespace);
1119     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1120   }
1121 
1122   @Override
1123   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1124       NamespaceDescriptor ns) throws IOException {
1125     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1126   }
1127 
1128   @Override
1129   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1130       final TableName tableName) throws IOException {
1131     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1132   }
1133 
1134   /* ---- RegionObserver implementation ---- */
1135 
1136   @Override
1137   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1138       throws IOException {
1139     RegionCoprocessorEnvironment env = e.getEnvironment();
1140     final HRegion region = env.getRegion();
1141     if (region == null) {
1142       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1143     } else {
1144       HRegionInfo regionInfo = region.getRegionInfo();
1145       if (regionInfo.getTable().isSystemTable()) {
1146         isSystemOrSuperUser(regionEnv.getConfiguration());
1147       } else {
1148         requirePermission("preOpen", Action.ADMIN);
1149       }
1150     }
1151   }
1152 
1153   @Override
1154   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1155     RegionCoprocessorEnvironment env = c.getEnvironment();
1156     final HRegion region = env.getRegion();
1157     if (region == null) {
1158       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1159       return;
1160     }
1161     if (AccessControlLists.isAclRegion(region)) {
1162       aclRegion = true;
1163       // When this region is under recovering state, initialize will be handled by postLogReplay
1164       if (!region.isRecovering()) {
1165         try {
1166           initialize(env);
1167         } catch (IOException ex) {
1168           // if we can't obtain permissions, it's better to fail
1169           // than perform checks incorrectly
1170           throw new RuntimeException("Failed to initialize permissions cache", ex);
1171         }
1172       }
1173     } else {
1174       initialized = true;
1175     }
1176   }
1177 
1178   @Override
1179   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1180     if (aclRegion) {
1181       try {
1182         initialize(c.getEnvironment());
1183       } catch (IOException ex) {
1184         // if we can't obtain permissions, it's better to fail
1185         // than perform checks incorrectly
1186         throw new RuntimeException("Failed to initialize permissions cache", ex);
1187       }
1188     }
1189   }
1190 
1191   @Override
1192   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1193     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1194         Action.CREATE);
1195   }
1196 
1197   @Override
1198   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1199     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1200   }
1201 
1202   @Override
1203   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1204       byte[] splitRow) throws IOException {
1205     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1206   }
1207 
1208   @Override
1209   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1210       final Store store, final InternalScanner scanner, final ScanType scanType)
1211           throws IOException {
1212     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1213         Action.CREATE);
1214     return scanner;
1215   }
1216 
1217   @Override
1218   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1219       final byte [] row, final byte [] family, final Result result)
1220       throws IOException {
1221     assert family != null;
1222     RegionCoprocessorEnvironment env = c.getEnvironment();
1223     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1224     User user = getActiveUser();
1225     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1226       Action.READ);
1227     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1228       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1229         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1230       authResult.setReason("Covering cell set");
1231     }
1232     logResult(authResult);
1233     if (!authResult.isAllowed()) {
1234       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1235     }
1236   }
1237 
1238   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1239       final Query query, OpType opType) throws IOException {
1240     Filter filter = query.getFilter();
1241     // Don't wrap an AccessControlFilter
1242     if (filter != null && filter instanceof AccessControlFilter) {
1243       return;
1244     }
1245     User user = getActiveUser();
1246     RegionCoprocessorEnvironment env = c.getEnvironment();
1247     Map<byte[],? extends Collection<byte[]>> families = null;
1248     switch (opType) {
1249     case GET:
1250     case EXISTS:
1251       families = ((Get)query).getFamilyMap();
1252       break;
1253     case SCAN:
1254       families = ((Scan)query).getFamilyMap();
1255       break;
1256     default:
1257       throw new RuntimeException("Unhandled operation " + opType);
1258     }
1259     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1260     HRegion region = getRegion(env);
1261     TableName table = getTableName(region);
1262     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1263     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1264       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1265     }
1266     if (!authResult.isAllowed()) {
1267       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1268         // Old behavior: Scan with only qualifier checks if we have partial
1269         // permission. Backwards compatible behavior is to throw an
1270         // AccessDeniedException immediately if there are no grants for table
1271         // or CF or CF+qual. Only proceed with an injected filter if there are
1272         // grants for qualifiers. Otherwise we will fall through below and log
1273         // the result and throw an ADE. We may end up checking qualifier
1274         // grants three times (permissionGranted above, here, and in the
1275         // filter) but that's the price of backwards compatibility.
1276         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1277           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1278             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1279             cfVsMaxVersions);
1280           // wrap any existing filter
1281           if (filter != null) {
1282             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1283               Lists.newArrayList(ourFilter, filter));
1284           }
1285           authResult.setAllowed(true);;
1286           authResult.setReason("Access allowed with filter");
1287           switch (opType) {
1288           case GET:
1289           case EXISTS:
1290             ((Get)query).setFilter(ourFilter);
1291             break;
1292           case SCAN:
1293             ((Scan)query).setFilter(ourFilter);
1294             break;
1295           default:
1296             throw new RuntimeException("Unhandled operation " + opType);
1297           }
1298         }
1299       } else {
1300         // New behavior: Any access we might be granted is more fine-grained
1301         // than whole table or CF. Simply inject a filter and return what is
1302         // allowed. We will not throw an AccessDeniedException. This is a
1303         // behavioral change since 0.96.
1304         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1305           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1306         // wrap any existing filter
1307         if (filter != null) {
1308           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1309             Lists.newArrayList(ourFilter, filter));
1310         }
1311         authResult.setAllowed(true);;
1312         authResult.setReason("Access allowed with filter");
1313         switch (opType) {
1314         case GET:
1315         case EXISTS:
1316           ((Get)query).setFilter(ourFilter);
1317           break;
1318         case SCAN:
1319           ((Scan)query).setFilter(ourFilter);
1320           break;
1321         default:
1322           throw new RuntimeException("Unhandled operation " + opType);
1323         }
1324       }
1325     }
1326 
1327     logResult(authResult);
1328     if (!authResult.isAllowed()) {
1329       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1330         ", action=READ)");
1331     }
1332   }
1333 
1334   @Override
1335   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1336       final Get get, final List<Cell> result) throws IOException {
1337     internalPreRead(c, get, OpType.GET);
1338   }
1339 
1340   @Override
1341   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1342       final Get get, final boolean exists) throws IOException {
1343     internalPreRead(c, get, OpType.EXISTS);
1344     return exists;
1345   }
1346 
1347   @Override
1348   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1349       final Put put, final WALEdit edit, final Durability durability)
1350       throws IOException {
1351     // Require WRITE permission to the table, CF, or top visible value, if any.
1352     // NOTE: We don't need to check the permissions for any earlier Puts
1353     // because we treat the ACLs in each Put as timestamped like any other
1354     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1355     // change the ACL of any previous Put. This allows simple evolution of
1356     // security policy over time without requiring expensive updates.
1357     User user = getActiveUser();
1358     checkForReservedTagPresence(user, put);
1359     RegionCoprocessorEnvironment env = c.getEnvironment();
1360     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1361     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1362     logResult(authResult);
1363     if (!authResult.isAllowed()) {
1364       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1365         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1366       } else {
1367         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1368       }
1369     }
1370     // Add cell ACLs from the operation to the cells themselves
1371     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1372     if (bytes != null) {
1373       if (cellFeaturesEnabled) {
1374         addCellPermissions(bytes, put.getFamilyCellMap());
1375       } else {
1376         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1377       }
1378     }
1379   }
1380 
1381   @Override
1382   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1383       final Put put, final WALEdit edit, final Durability durability) {
1384     if (aclRegion) {
1385       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1386     }
1387   }
1388 
1389   @Override
1390   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1391       final Delete delete, final WALEdit edit, final Durability durability)
1392       throws IOException {
1393     // An ACL on a delete is useless, we shouldn't allow it
1394     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1395       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1396     }
1397     // Require WRITE permissions on all cells covered by the delete. Unlike
1398     // for Puts we need to check all visible prior versions, because a major
1399     // compaction could remove them. If the user doesn't have permission to
1400     // overwrite any of the visible versions ('visible' defined as not covered
1401     // by a tombstone already) then we have to disallow this operation.
1402     RegionCoprocessorEnvironment env = c.getEnvironment();
1403     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1404     User user = getActiveUser();
1405     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1406     logResult(authResult);
1407     if (!authResult.isAllowed()) {
1408       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1409         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1410       } else {
1411         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1412       }
1413     }
1414   }
1415 
1416   @Override
1417   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1418       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1419     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1420       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1421       for (int i = 0; i < miniBatchOp.size(); i++) {
1422         Mutation m = miniBatchOp.getOperation(i);
1423         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1424           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1425           // perm check
1426           OpType opType;
1427           if (m instanceof Put) {
1428             checkForReservedTagPresence(getActiveUser(), m);
1429             opType = OpType.PUT;
1430           } else {
1431             opType = OpType.DELETE;
1432           }
1433           AuthResult authResult = null;
1434           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1435               m.getTimeStamp(), Action.WRITE)) {
1436             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1437                 Action.WRITE, table, m.getFamilyCellMap());
1438           } else {
1439             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1440                 Action.WRITE, table, m.getFamilyCellMap());
1441           }
1442           logResult(authResult);
1443           if (!authResult.isAllowed()) {
1444             throw new AccessDeniedException("Insufficient permissions "
1445                 + authResult.toContextString());
1446           }
1447         }
1448       }
1449     }
1450   }
1451 
1452   @Override
1453   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1454       final Delete delete, final WALEdit edit, final Durability durability)
1455       throws IOException {
1456     if (aclRegion) {
1457       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1458     }
1459   }
1460 
1461   @Override
1462   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1463       final byte [] row, final byte [] family, final byte [] qualifier,
1464       final CompareFilter.CompareOp compareOp,
1465       final ByteArrayComparable comparator, final Put put,
1466       final boolean result) throws IOException {
1467     // Require READ and WRITE permissions on the table, CF, and KV to update
1468     User user = getActiveUser();
1469     checkForReservedTagPresence(user, put);
1470     RegionCoprocessorEnvironment env = c.getEnvironment();
1471     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1472     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1473       Action.READ, Action.WRITE);
1474     logResult(authResult);
1475     if (!authResult.isAllowed()) {
1476       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1477         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1478       } else {
1479         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1480       }
1481     }
1482     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1483     if (bytes != null) {
1484       if (cellFeaturesEnabled) {
1485         addCellPermissions(bytes, put.getFamilyCellMap());
1486       } else {
1487         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1488       }
1489     }
1490     return result;
1491   }
1492 
1493   @Override
1494   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1495       final byte[] row, final byte[] family, final byte[] qualifier,
1496       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1497       final boolean result) throws IOException {
1498     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1499       // We had failure with table, cf and q perm checks and now giving a chance for cell
1500       // perm check
1501       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1502       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1503       AuthResult authResult = null;
1504       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1505           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1506         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1507             getActiveUser(), Action.READ, table, families);
1508       } else {
1509         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1510             getActiveUser(), Action.READ, table, families);
1511       }
1512       logResult(authResult);
1513       if (!authResult.isAllowed()) {
1514         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1515       }
1516     }
1517     return result;
1518   }
1519 
1520   @Override
1521   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1522       final byte [] row, final byte [] family, final byte [] qualifier,
1523       final CompareFilter.CompareOp compareOp,
1524       final ByteArrayComparable comparator, final Delete delete,
1525       final boolean result) throws IOException {
1526     // An ACL on a delete is useless, we shouldn't allow it
1527     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1528       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1529           delete.toString());
1530     }
1531     // Require READ and WRITE permissions on the table, CF, and the KV covered
1532     // by the delete
1533     RegionCoprocessorEnvironment env = c.getEnvironment();
1534     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1535     User user = getActiveUser();
1536     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1537       Action.READ, Action.WRITE);
1538     logResult(authResult);
1539     if (!authResult.isAllowed()) {
1540       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1541         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1542       } else {
1543         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1544       }
1545     }
1546     return result;
1547   }
1548 
1549   @Override
1550   public boolean preCheckAndDeleteAfterRowLock(
1551       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1552       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1553       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1554       throws IOException {
1555     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1556       // We had failure with table, cf and q perm checks and now giving a chance for cell
1557       // perm check
1558       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1559       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1560       AuthResult authResult = null;
1561       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1562           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1563         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1564             getActiveUser(), Action.READ, table, families);
1565       } else {
1566         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1567             getActiveUser(), Action.READ, table, families);
1568       }
1569       logResult(authResult);
1570       if (!authResult.isAllowed()) {
1571         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1572       }
1573     }
1574     return result;
1575   }
1576 
1577   @Override
1578   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1579       final byte [] row, final byte [] family, final byte [] qualifier,
1580       final long amount, final boolean writeToWAL)
1581       throws IOException {
1582     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1583     // incremented value
1584     RegionCoprocessorEnvironment env = c.getEnvironment();
1585     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1586     User user = getActiveUser();
1587     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1588       Action.WRITE);
1589     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1590       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1591         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1592       authResult.setReason("Covering cell set");
1593     }
1594     logResult(authResult);
1595     if (!authResult.isAllowed()) {
1596       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1597     }
1598     return -1;
1599   }
1600 
1601   @Override
1602   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1603       throws IOException {
1604     // Require WRITE permission to the table, CF, and the KV to be appended
1605     User user = getActiveUser();
1606     checkForReservedTagPresence(user, append);
1607     RegionCoprocessorEnvironment env = c.getEnvironment();
1608     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1609     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1610     logResult(authResult);
1611     if (!authResult.isAllowed()) {
1612       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1613         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1614       } else {
1615         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1616       }
1617     }
1618     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1619     if (bytes != null) {
1620       if (cellFeaturesEnabled) {
1621         addCellPermissions(bytes, append.getFamilyCellMap());
1622       } else {
1623         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1624       }
1625     }
1626     return null;
1627   }
1628 
1629   @Override
1630   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1631       final Append append) throws IOException {
1632     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1633       // We had failure with table, cf and q perm checks and now giving a chance for cell
1634       // perm check
1635       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1636       AuthResult authResult = null;
1637       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1638           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1639         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1640             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1641       } else {
1642         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1643             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1644       }
1645       logResult(authResult);
1646       if (!authResult.isAllowed()) {
1647         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1648       }
1649     }
1650     return null;
1651   }
1652 
1653   @Override
1654   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1655       final Increment increment)
1656       throws IOException {
1657     // Require WRITE permission to the table, CF, and the KV to be replaced by
1658     // the incremented value
1659     User user = getActiveUser();
1660     checkForReservedTagPresence(user, increment);
1661     RegionCoprocessorEnvironment env = c.getEnvironment();
1662     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1663     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1664       Action.WRITE);
1665     logResult(authResult);
1666     if (!authResult.isAllowed()) {
1667       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1668         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1669       } else {
1670         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1671       }
1672     }
1673     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1674     if (bytes != null) {
1675       if (cellFeaturesEnabled) {
1676         addCellPermissions(bytes, increment.getFamilyCellMap());
1677       } else {
1678         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1679       }
1680     }
1681     return null;
1682   }
1683 
1684   @Override
1685   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1686       final Increment increment) throws IOException {
1687     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1688       // We had failure with table, cf and q perm checks and now giving a chance for cell
1689       // perm check
1690       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1691       AuthResult authResult = null;
1692       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1693           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1694         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1695             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1696       } else {
1697         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1698             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1699       }
1700       logResult(authResult);
1701       if (!authResult.isAllowed()) {
1702         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1703       }
1704     }
1705     return null;
1706   }
1707 
1708   @Override
1709   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1710       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1711     // If the HFile version is insufficient to persist tags, we won't have any
1712     // work to do here
1713     if (!cellFeaturesEnabled) {
1714       return newCell;
1715     }
1716 
1717     // Collect any ACLs from the old cell
1718     List<Tag> tags = Lists.newArrayList();
1719     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1720     if (oldCell != null) {
1721       // Save an object allocation where we can
1722       if (oldCell.getTagsLength() > 0) {
1723         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1724           oldCell.getTagsOffset(), oldCell.getTagsLength());
1725         while (tagIterator.hasNext()) {
1726           Tag tag = tagIterator.next();
1727           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1728             // Not an ACL tag, just carry it through
1729             if (LOG.isTraceEnabled()) {
1730               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1731                 " length " + tag.getTagLength());
1732             }
1733             tags.add(tag);
1734           } else {
1735             // Merge the perms from the older ACL into the current permission set
1736             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1737               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1738                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1739             perms.putAll(kvPerms);
1740           }
1741         }
1742       }
1743     }
1744 
1745     // Do we have an ACL on the operation?
1746     byte[] aclBytes = mutation.getACL();
1747     if (aclBytes != null) {
1748       // Yes, use it
1749       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1750     } else {
1751       // No, use what we carried forward
1752       if (perms != null) {
1753         // TODO: If we collected ACLs from more than one tag we may have a
1754         // List<Permission> of size > 1, this can be collapsed into a single
1755         // Permission
1756         if (LOG.isTraceEnabled()) {
1757           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1758         }
1759         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1760           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1761       }
1762     }
1763 
1764     // If we have no tags to add, just return
1765     if (tags.isEmpty()) {
1766       return newCell;
1767     }
1768 
1769     // We need to create another KV, unfortunately, because the current new KV
1770     // has no space for tags
1771     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1772     KeyValue rewriteKv = new KeyValue(newKv.getRowArray(), newKv.getRowOffset(), newKv.getRowLength(),
1773       newKv.getFamilyArray(), newKv.getFamilyOffset(), newKv.getFamilyLength(),
1774       newKv.getQualifierArray(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
1775       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1776       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
1777       tags);
1778     // Preserve mvcc data
1779     rewriteKv.setSequenceId(newKv.getMvccVersion());
1780     return rewriteKv;
1781   }
1782 
1783   @Override
1784   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1785       final Scan scan, final RegionScanner s) throws IOException {
1786     internalPreRead(c, scan, OpType.SCAN);
1787     return s;
1788   }
1789 
1790   @Override
1791   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1792       final Scan scan, final RegionScanner s) throws IOException {
1793     User user = getActiveUser();
1794     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1795       scannerOwners.put(s, user.getShortName());
1796     }
1797     return s;
1798   }
1799 
1800   @Override
1801   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1802       final InternalScanner s, final List<Result> result,
1803       final int limit, final boolean hasNext) throws IOException {
1804     requireScannerOwner(s);
1805     return hasNext;
1806   }
1807 
1808   @Override
1809   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1810       final InternalScanner s) throws IOException {
1811     requireScannerOwner(s);
1812   }
1813 
1814   @Override
1815   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1816       final InternalScanner s) throws IOException {
1817     // clean up any associated owner mapping
1818     scannerOwners.remove(s);
1819   }
1820 
1821   /**
1822    * Verify, when servicing an RPC, that the caller is the scanner owner.
1823    * If so, we assume that access control is correctly enforced based on
1824    * the checks performed in preScannerOpen()
1825    */
1826   private void requireScannerOwner(InternalScanner s)
1827       throws AccessDeniedException {
1828     if (RequestContext.isInRequestContext()) {
1829       String requestUserName = RequestContext.getRequestUserName();
1830       String owner = scannerOwners.get(s);
1831       if (owner != null && !owner.equals(requestUserName)) {
1832         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1833       }
1834     }
1835   }
1836 
1837   /**
1838    * Verifies user has WRITE privileges on
1839    * the Column Families involved in the bulkLoadHFile
1840    * request. Specific Column Write privileges are presently
1841    * ignored.
1842    */
1843   @Override
1844   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1845       List<Pair<byte[], String>> familyPaths) throws IOException {
1846     for(Pair<byte[],String> el : familyPaths) {
1847       requirePermission("preBulkLoadHFile",
1848           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1849           el.getFirst(),
1850           null,
1851           Action.CREATE);
1852     }
1853   }
1854 
1855   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1856     User requestUser = getActiveUser();
1857     TableName tableName = e.getRegion().getTableDesc().getTableName();
1858     AuthResult authResult = permissionGranted(method, requestUser,
1859         action, e, Collections.EMPTY_MAP);
1860     if (!authResult.isAllowed()) {
1861       for(UserPermission userPerm:
1862           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1863         for(Action userAction: userPerm.getActions()) {
1864           if(userAction.equals(action)) {
1865             return AuthResult.allow(method, "Access allowed", requestUser,
1866                 action, tableName, null, null);
1867           }
1868         }
1869       }
1870     }
1871     return authResult;
1872   }
1873 
1874   /**
1875    * Authorization check for
1876    * SecureBulkLoadProtocol.prepareBulkLoad()
1877    * @param e
1878    * @throws IOException
1879    */
1880   //TODO this should end up as a coprocessor hook
1881   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1882     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1883     logResult(authResult);
1884     if (!authResult.isAllowed()) {
1885       throw new AccessDeniedException("Insufficient permissions (table=" +
1886         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1887     }
1888   }
1889 
1890   /**
1891    * Authorization security check for
1892    * SecureBulkLoadProtocol.cleanupBulkLoad()
1893    * @param e
1894    * @throws IOException
1895    */
1896   //TODO this should end up as a coprocessor hook
1897   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1898     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1899     logResult(authResult);
1900     if (!authResult.isAllowed()) {
1901       throw new AccessDeniedException("Insufficient permissions (table=" +
1902         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1903     }
1904   }
1905 
1906   /* ---- EndpointObserver implementation ---- */
1907 
1908   @Override
1909   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1910       Service service, String methodName, Message request) throws IOException {
1911     // Don't intercept calls to our own AccessControlService, we check for
1912     // appropriate permissions in the service handlers
1913     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
1914       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
1915         methodName + ")",
1916         getTableName(ctx.getEnvironment()), null, null,
1917         Action.EXEC);
1918     }
1919     return request;
1920   }
1921 
1922   @Override
1923   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1924       Service service, String methodName, Message request, Message.Builder responseBuilder)
1925       throws IOException { }
1926 
1927   /* ---- Protobuf AccessControlService implementation ---- */
1928 
1929   @Override
1930   public void grant(RpcController controller,
1931                     AccessControlProtos.GrantRequest request,
1932                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1933     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1934     AccessControlProtos.GrantResponse response = null;
1935     try {
1936       // verify it's only running at .acl.
1937       if (aclRegion) {
1938         if (!initialized) {
1939           throw new CoprocessorException("AccessController not yet initialized");
1940         }
1941         if (LOG.isDebugEnabled()) {
1942           LOG.debug("Received request to grant access permission " + perm.toString());
1943         }
1944 
1945         switch(request.getUserPermission().getPermission().getType()) {
1946           case Global :
1947           case Table :
1948             requirePermission("grant", perm.getTableName(), perm.getFamily(),
1949                 perm.getQualifier(), Action.ADMIN);
1950             break;
1951           case Namespace :
1952             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
1953         }
1954 
1955         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
1956         if (AUDITLOG.isTraceEnabled()) {
1957           // audit log should store permission changes in addition to auth results
1958           AUDITLOG.trace("Granted permission " + perm.toString());
1959         }
1960       } else {
1961         throw new CoprocessorException(AccessController.class, "This method "
1962             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1963       }
1964       response = AccessControlProtos.GrantResponse.getDefaultInstance();
1965     } catch (IOException ioe) {
1966       // pass exception back up
1967       ResponseConverter.setControllerException(controller, ioe);
1968     }
1969     done.run(response);
1970   }
1971 
1972   @Override
1973   public void revoke(RpcController controller,
1974                      AccessControlProtos.RevokeRequest request,
1975                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
1976     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1977     AccessControlProtos.RevokeResponse response = null;
1978     try {
1979       // only allowed to be called on _acl_ region
1980       if (aclRegion) {
1981         if (!initialized) {
1982           throw new CoprocessorException("AccessController not yet initialized");
1983         }
1984         if (LOG.isDebugEnabled()) {
1985           LOG.debug("Received request to revoke access permission " + perm.toString());
1986         }
1987 
1988         switch(request.getUserPermission().getPermission().getType()) {
1989           case Global :
1990           case Table :
1991             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
1992                               perm.getQualifier(), Action.ADMIN);
1993             break;
1994           case Namespace :
1995             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
1996         }
1997 
1998         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
1999         if (AUDITLOG.isTraceEnabled()) {
2000           // audit log should record all permission changes
2001           AUDITLOG.trace("Revoked permission " + perm.toString());
2002         }
2003       } else {
2004         throw new CoprocessorException(AccessController.class, "This method "
2005             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2006       }
2007       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2008     } catch (IOException ioe) {
2009       // pass exception back up
2010       ResponseConverter.setControllerException(controller, ioe);
2011     }
2012     done.run(response);
2013   }
2014 
2015   @Override
2016   public void getUserPermissions(RpcController controller,
2017                                  AccessControlProtos.GetUserPermissionsRequest request,
2018                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2019     AccessControlProtos.GetUserPermissionsResponse response = null;
2020     try {
2021       // only allowed to be called on _acl_ region
2022       if (aclRegion) {
2023         if (!initialized) {
2024           throw new CoprocessorException("AccessController not yet initialized");
2025         }
2026         List<UserPermission> perms = null;
2027         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
2028           TableName table = null;
2029           if (request.hasTableName()) {
2030             table = ProtobufUtil.toTableName(request.getTableName());
2031           }
2032           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2033 
2034           perms = AccessControlLists.getUserTablePermissions(
2035               regionEnv.getConfiguration(), table);
2036         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2037           perms = AccessControlLists.getUserNamespacePermissions(
2038               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
2039         } else {
2040           perms = AccessControlLists.getUserPermissions(
2041               regionEnv.getConfiguration(), null);
2042         }
2043         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2044       } else {
2045         throw new CoprocessorException(AccessController.class, "This method "
2046             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2047       }
2048     } catch (IOException ioe) {
2049       // pass exception back up
2050       ResponseConverter.setControllerException(controller, ioe);
2051     }
2052     done.run(response);
2053   }
2054 
2055   @Override
2056   public void checkPermissions(RpcController controller,
2057                                AccessControlProtos.CheckPermissionsRequest request,
2058                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2059     Permission[] permissions = new Permission[request.getPermissionCount()];
2060     for (int i=0; i < request.getPermissionCount(); i++) {
2061       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2062     }
2063     AccessControlProtos.CheckPermissionsResponse response = null;
2064     try {
2065       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2066       for (Permission permission : permissions) {
2067         if (permission instanceof TablePermission) {
2068           TablePermission tperm = (TablePermission) permission;
2069           for (Action action : permission.getActions()) {
2070             if (!tperm.getTableName().equals(tableName)) {
2071               throw new CoprocessorException(AccessController.class, String.format("This method "
2072                   + "can only execute at the table specified in TablePermission. " +
2073                   "Table of the region:%s , requested table:%s", tableName,
2074                   tperm.getTableName()));
2075             }
2076 
2077             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2078             if (tperm.getFamily() != null) {
2079               if (tperm.getQualifier() != null) {
2080                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2081                 qualifiers.add(tperm.getQualifier());
2082                 familyMap.put(tperm.getFamily(), qualifiers);
2083               } else {
2084                 familyMap.put(tperm.getFamily(), null);
2085               }
2086             }
2087 
2088             requirePermission("checkPermissions", action, regionEnv, familyMap);
2089           }
2090 
2091         } else {
2092           for (Action action : permission.getActions()) {
2093             requirePermission("checkPermissions", action);
2094           }
2095         }
2096       }
2097       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2098     } catch (IOException ioe) {
2099       ResponseConverter.setControllerException(controller, ioe);
2100     }
2101     done.run(response);
2102   }
2103 
2104   @Override
2105   public Service getService() {
2106     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2107   }
2108 
2109   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2110     return e.getRegion();
2111   }
2112 
2113   private TableName getTableName(RegionCoprocessorEnvironment e) {
2114     HRegion region = e.getRegion();
2115     if (region != null) {
2116       return getTableName(region);
2117     }
2118     return null;
2119   }
2120 
2121   private TableName getTableName(HRegion region) {
2122     HRegionInfo regionInfo = region.getRegionInfo();
2123     if (regionInfo != null) {
2124       return regionInfo.getTable();
2125     }
2126     return null;
2127   }
2128 
2129   @Override
2130   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2131       throws IOException {
2132     requirePermission("preClose", Action.ADMIN);
2133   }
2134 
2135   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2136     User user = userProvider.getCurrent();
2137     if (user == null) {
2138       throw new IOException("Unable to obtain the current user, " +
2139         "authorization checks for internal operations will not work correctly!");
2140     }
2141     User activeUser = getActiveUser();
2142     if (!(superusers.contains(activeUser.getShortName()))) {
2143       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2144         "is not system or super user.");
2145     }
2146   }
2147 
2148   @Override
2149   public void preStopRegionServer(
2150       ObserverContext<RegionServerCoprocessorEnvironment> env)
2151       throws IOException {
2152     requirePermission("preStopRegionServer", Action.ADMIN);
2153   }
2154 
2155   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2156       byte[] qualifier) {
2157     if (family == null) {
2158       return null;
2159     }
2160 
2161     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2162     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2163     return familyMap;
2164   }
2165 
2166   @Override
2167   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2168       List<TableName> tableNamesList,
2169       List<HTableDescriptor> descriptors) throws IOException {
2170     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
2171     // ADMIN privs.
2172     if (tableNamesList == null || tableNamesList.isEmpty()) {
2173       requireGlobalPermission("getTableDescriptors", Action.ADMIN, null, null);
2174     }
2175     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2176     // request can be granted.
2177     else {
2178       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2179       for (TableName tableName: tableNamesList) {
2180         // Do not deny if the table does not exist
2181         try {
2182           masterServices.checkTableModifiable(tableName);
2183         } catch (TableNotFoundException ex) {
2184           // Skip checks for a table that does not exist
2185           continue;
2186         } catch (TableNotDisabledException ex) {
2187           // We don't care about this
2188         }
2189         requirePermission("getTableDescriptors", tableName, null, null,
2190           Action.ADMIN, Action.CREATE);
2191       }
2192     }
2193   }
2194 
2195   @Override
2196   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2197       HRegion regionB) throws IOException {
2198     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2199       Action.ADMIN);
2200   }
2201 
2202   @Override
2203   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2204       HRegion regionB, HRegion mergedRegion) throws IOException { }
2205 
2206   @Override
2207   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2208       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2209 
2210   @Override
2211   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2212       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2213 
2214   @Override
2215   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2216       HRegion regionA, HRegion regionB) throws IOException { }
2217 
2218   @Override
2219   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2220       HRegion regionA, HRegion regionB) throws IOException { }
2221 
2222   @Override
2223   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2224       throws IOException {
2225     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2226   }
2227 
2228   @Override
2229   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2230       throws IOException { }
2231 }