View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellScanner;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CompoundConfiguration;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.MetaTableAccessor;
49  import org.apache.hadoop.hbase.NamespaceDescriptor;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.Tag;
53  import org.apache.hadoop.hbase.TagRewriteCell;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.Mutation;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Query;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
68  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
69  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.HRegion;
90  import org.apache.hadoop.hbase.regionserver.InternalScanner;
91  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
92  import org.apache.hadoop.hbase.regionserver.RegionScanner;
93  import org.apache.hadoop.hbase.regionserver.ScanType;
94  import org.apache.hadoop.hbase.regionserver.Store;
95  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
96  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
97  import org.apache.hadoop.hbase.security.AccessDeniedException;
98  import org.apache.hadoop.hbase.security.User;
99  import org.apache.hadoop.hbase.security.UserProvider;
100 import org.apache.hadoop.hbase.security.access.Permission.Action;
101 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
102 import org.apache.hadoop.hbase.util.ByteRange;
103 import org.apache.hadoop.hbase.util.Bytes;
104 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
107 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
108 
109 import com.google.common.collect.ArrayListMultimap;
110 import com.google.common.collect.ImmutableSet;
111 import com.google.common.collect.ListMultimap;
112 import com.google.common.collect.Lists;
113 import com.google.common.collect.MapMaker;
114 import com.google.common.collect.Maps;
115 import com.google.common.collect.Sets;
116 import com.google.protobuf.Message;
117 import com.google.protobuf.RpcCallback;
118 import com.google.protobuf.RpcController;
119 import com.google.protobuf.Service;
120 
121 /**
122  * Provides basic authorization checks for data access and administrative
123  * operations.
124  *
125  * <p>
126  * {@code AccessController} performs authorization checks for HBase operations
127  * based on:
128  * <ul>
129  *   <li>the identity of the user performing the operation</li>
130  *   <li>the scope over which the operation is performed, in increasing
131  *   specificity: global, table, column family, or qualifier</li>
132  *   <li>the type of action being performed (as mapped to
133  *   {@link Permission.Action} values)</li>
134  * </ul>
135  * If the authorization check fails, an {@link AccessDeniedException}
136  * will be thrown for the operation.
137  * </p>
138  *
139  * <p>
140  * To perform authorization checks, {@code AccessController} relies on the
141  * RpcServerEngine being loaded to provide
142  * the user identities for remote requests.
143  * </p>
144  *
145  * <p>
146  * The access control lists used for authorization can be manipulated via the
147  * exposed {@link AccessControlService} Interface implementation, and the associated
148  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
149  * commands.
150  * </p>
151  */
152 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
153 public class AccessController extends BaseMasterAndRegionObserver
154     implements RegionServerObserver,
155       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
156 
157   public static final Log LOG = LogFactory.getLog(AccessController.class);
158 
159   private static final Log AUDITLOG =
160     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
161   private static final String CHECK_COVERING_PERM = "check_covering_perm";
162   private static final String TAG_CHECK_PASSED = "tag_check_passed";
163   private static final byte[] TRUE = Bytes.toBytes(true);
164 
165   TableAuthManager authManager = null;
166 
167   // flags if we are running on a region of the _acl_ table
168   boolean aclRegion = false;
169 
170   // defined only for Endpoint implementation, so it can have way to
171   // access region services.
172   private RegionCoprocessorEnvironment regionEnv;
173 
174   /** Mapping of scanner instances to the user who created them */
175   private Map<InternalScanner,String> scannerOwners =
176       new MapMaker().weakKeys().makeMap();
177 
178   private Map<TableName, List<UserPermission>> tableAcls;
179 
180   // Provider for mapping principal names to Users
181   private UserProvider userProvider;
182 
183   // The list of users with superuser authority
184   private List<String> superusers;
185 
186   // if we are able to support cell ACLs
187   boolean cellFeaturesEnabled;
188 
189   // if we should check EXEC permissions
190   boolean shouldCheckExecPermission;
191 
192   // if we should terminate access checks early as soon as table or CF grants
193   // allow access; pre-0.98 compatible behavior
194   boolean compatibleEarlyTermination;
195 
196   private volatile boolean initialized = false;
197 
198   // This boolean having relevance only in the Master.
199   private volatile boolean aclTabAvailable = false;
200 
201   public HRegion getRegion() {
202     return regionEnv != null ? regionEnv.getRegion() : null;
203   }
204 
205   public TableAuthManager getAuthManager() {
206     return authManager;
207   }
208 
209   void initialize(RegionCoprocessorEnvironment e) throws IOException {
210     final HRegion region = e.getRegion();
211     Configuration conf = e.getConfiguration();
212     Map<byte[], ListMultimap<String,TablePermission>> tables =
213         AccessControlLists.loadAll(region);
214     // For each table, write out the table's permissions to the respective
215     // znode for that table.
216     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
217       tables.entrySet()) {
218       byte[] entry = t.getKey();
219       ListMultimap<String,TablePermission> perms = t.getValue();
220       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
221       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
222     }
223     initialized = true;
224   }
225 
226   /**
227    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
228    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
229    * table updates.
230    */
231   void updateACL(RegionCoprocessorEnvironment e,
232       final Map<byte[], List<Cell>> familyMap) {
233     Set<byte[]> entries =
234         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
235     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
236       List<Cell> cells = f.getValue();
237       for (Cell cell: cells) {
238         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
239             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
240             AccessControlLists.ACL_LIST_FAMILY.length)) {
241           entries.add(CellUtil.cloneRow(cell));
242         }
243       }
244     }
245     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
246     Configuration conf = regionEnv.getConfiguration();
247     for (byte[] entry: entries) {
248       try {
249         ListMultimap<String,TablePermission> perms =
250           AccessControlLists.getPermissions(conf, entry);
251         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
252         zkw.writeToZookeeper(entry, serialized);
253       } catch (IOException ex) {
254         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
255             ex);
256       }
257     }
258   }
259 
260   /**
261    * Check the current user for authorization to perform a specific action
262    * against the given set of row data.
263    *
264    * <p>Note: Ordering of the authorization checks
265    * has been carefully optimized to short-circuit the most common requests
266    * and minimize the amount of processing required.</p>
267    *
268    * @param permRequest the action being requested
269    * @param e the coprocessor environment
270    * @param families the map of column families to qualifiers present in
271    * the request
272    * @return an authorization result
273    */
274   AuthResult permissionGranted(String request, User user, Action permRequest,
275       RegionCoprocessorEnvironment e,
276       Map<byte [], ? extends Collection<?>> families) {
277     HRegionInfo hri = e.getRegion().getRegionInfo();
278     TableName tableName = hri.getTable();
279 
280     // 1. All users need read access to hbase:meta table.
281     // this is a very common operation, so deal with it quickly.
282     if (hri.isMetaRegion()) {
283       if (permRequest == Action.READ) {
284         return AuthResult.allow(request, "All users allowed", user,
285           permRequest, tableName, families);
286       }
287     }
288 
289     if (user == null) {
290       return AuthResult.deny(request, "No user associated with request!", null,
291         permRequest, tableName, families);
292     }
293 
294     // 2. check for the table-level, if successful we can short-circuit
295     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
296       return AuthResult.allow(request, "Table permission granted", user,
297         permRequest, tableName, families);
298     }
299 
300     // 3. check permissions against the requested families
301     if (families != null && families.size() > 0) {
302       // all families must pass
303       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
304         // a) check for family level access
305         if (authManager.authorize(user, tableName, family.getKey(),
306             permRequest)) {
307           continue;  // family-level permission overrides per-qualifier
308         }
309 
310         // b) qualifier level access can still succeed
311         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
312           if (family.getValue() instanceof Set) {
313             // for each qualifier of the family
314             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
315             for (byte[] qualifier : familySet) {
316               if (!authManager.authorize(user, tableName, family.getKey(),
317                                          qualifier, permRequest)) {
318                 return AuthResult.deny(request, "Failed qualifier check", user,
319                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
320               }
321             }
322           } else if (family.getValue() instanceof List) { // List<KeyValue>
323             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
324             for (KeyValue kv : kvList) {
325               if (!authManager.authorize(user, tableName, family.getKey(),
326                       kv.getQualifier(), permRequest)) {
327                 return AuthResult.deny(request, "Failed qualifier check", user,
328                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
329               }
330             }
331           }
332         } else {
333           // no qualifiers and family-level check already failed
334           return AuthResult.deny(request, "Failed family check", user, permRequest,
335               tableName, makeFamilyMap(family.getKey(), null));
336         }
337       }
338 
339       // all family checks passed
340       return AuthResult.allow(request, "All family checks passed", user, permRequest,
341           tableName, families);
342     }
343 
344     // 4. no families to check and table level access failed
345     return AuthResult.deny(request, "No families to check and table permission failed",
346         user, permRequest, tableName, families);
347   }
348 
349   /**
350    * Check the current user for authorization to perform a specific action
351    * against the given set of row data.
352    * @param opType the operation type
353    * @param user the user
354    * @param e the coprocessor environment
355    * @param families the map of column families to qualifiers present in
356    * the request
357    * @param actions the desired actions
358    * @return an authorization result
359    */
360   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
361       Map<byte [], ? extends Collection<?>> families, Action... actions) {
362     AuthResult result = null;
363     for (Action action: actions) {
364       result = permissionGranted(opType.toString(), user, action, e, families);
365       if (!result.isAllowed()) {
366         return result;
367       }
368     }
369     return result;
370   }
371 
372   private void logResult(AuthResult result) {
373     if (AUDITLOG.isTraceEnabled()) {
374       RequestContext ctx = RequestContext.get();
375       InetAddress remoteAddr = null;
376       if (ctx != null) {
377         remoteAddr = ctx.getRemoteAddress();
378       }
379       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
380           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
381           "; reason: " + result.getReason() +
382           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
383           "; request: " + result.getRequest() +
384           "; context: " + result.toContextString());
385     }
386   }
387 
388   /**
389    * Returns the active user to which authorization checks should be applied.
390    * If we are in the context of an RPC call, the remote user is used,
391    * otherwise the currently logged in user is used.
392    */
393   private User getActiveUser() throws IOException {
394     User user = RequestContext.getRequestUser();
395     if (!RequestContext.isInRequestContext()) {
396       // for non-rpc handling, fallback to system user
397       user = userProvider.getCurrent();
398     }
399     return user;
400   }
401 
402   /**
403    * Authorizes that the current user has any of the given permissions for the
404    * given table, column family and column qualifier.
405    * @param tableName Table requested
406    * @param family Column family requested
407    * @param qualifier Column qualifier requested
408    * @throws IOException if obtaining the current user fails
409    * @throws AccessDeniedException if user has no authorization
410    */
411   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
412       Action... permissions) throws IOException {
413     User user = getActiveUser();
414     AuthResult result = null;
415 
416     for (Action permission : permissions) {
417       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
418         result = AuthResult.allow(request, "Table permission granted", user,
419                                   permission, tableName, family, qualifier);
420         break;
421       } else {
422         // rest of the world
423         result = AuthResult.deny(request, "Insufficient permissions", user,
424                                  permission, tableName, family, qualifier);
425       }
426     }
427     logResult(result);
428     if (!result.isAllowed()) {
429       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
430     }
431   }
432 
433   /**
434    * Authorizes that the current user has any of the given permissions to access the table.
435    *
436    * @param tableName Table requested
437    * @param permissions Actions being requested
438    * @throws IOException if obtaining the current user fails
439    * @throws AccessDeniedException if user has no authorization
440    */
441   private void requireAccess(String request, TableName tableName,
442       Action... permissions) throws IOException {
443     User user = getActiveUser();
444     AuthResult result = null;
445 
446     for (Action permission : permissions) {
447       if (authManager.hasAccess(user, tableName, permission)) {
448         result = AuthResult.allow(request, "Table permission granted", user,
449                                   permission, tableName, null, null);
450         break;
451       } else {
452         // rest of the world
453         result = AuthResult.deny(request, "Insufficient permissions", user,
454                                  permission, tableName, null, null);
455       }
456     }
457     logResult(result);
458     if (!result.isAllowed()) {
459       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
460     }
461   }
462 
463   /**
464    * Authorizes that the current user has global privileges for the given action.
465    * @param perm The action being requested
466    * @throws IOException if obtaining the current user fails
467    * @throws AccessDeniedException if authorization is denied
468    */
469   private void requirePermission(String request, Action perm) throws IOException {
470     requireGlobalPermission(request, perm, null, null);
471   }
472 
473   /**
474    * Authorizes that the current user has permission to perform the given
475    * action on the set of table column families.
476    * @param perm Action that is required
477    * @param env The current coprocessor environment
478    * @param families The map of column families-qualifiers.
479    * @throws AccessDeniedException if the authorization check failed
480    */
481   private void requirePermission(String request, Action perm,
482         RegionCoprocessorEnvironment env,
483         Map<byte[], ? extends Collection<?>> families)
484       throws IOException {
485     User user = getActiveUser();
486     AuthResult result = permissionGranted(request, user, perm, env, families);
487     logResult(result);
488 
489     if (!result.isAllowed()) {
490       throw new AccessDeniedException("Insufficient permissions (table=" +
491         env.getRegion().getTableDesc().getTableName()+
492         ((families != null && families.size() > 0) ? ", family: " +
493         result.toFamilyString() : "") + ", action=" +
494         perm.toString() + ")");
495     }
496   }
497 
498   /**
499    * Checks that the user has the given global permission. The generated
500    * audit log message will contain context information for the operation
501    * being authorized, based on the given parameters.
502    * @param perm Action being requested
503    * @param tableName Affected table name.
504    * @param familyMap Affected column families.
505    */
506   private void requireGlobalPermission(String request, Action perm, TableName tableName,
507       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
508     User user = getActiveUser();
509     if (authManager.authorize(user, perm) || (tableName != null &&
510         authManager.authorize(user, tableName.getNamespaceAsString(), perm))) {
511       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
512     } else {
513       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
514       throw new AccessDeniedException("Insufficient permissions for user '" +
515           (user != null ? user.getShortName() : "null") +"' (global, action=" +
516           perm.toString() + ")");
517     }
518   }
519 
520   /**
521    * Checks that the user has the given global permission. The generated
522    * audit log message will contain context information for the operation
523    * being authorized, based on the given parameters.
524    * @param perm Action being requested
525    * @param namespace
526    */
527   private void requireGlobalPermission(String request, Action perm,
528                                        String namespace) throws IOException {
529     User user = getActiveUser();
530     if (authManager.authorize(user, perm)
531         || (namespace != null && authManager.authorize(user, namespace, perm))) {
532       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
533     } else {
534       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
535       throw new AccessDeniedException("Insufficient permissions for user '" +
536           (user != null ? user.getShortName() : "null") +"' (global, action=" +
537           perm.toString() + ")");
538     }
539   }
540 
541   /**
542    * Returns <code>true</code> if the current user is allowed the given action
543    * over at least one of the column qualifiers in the given column families.
544    */
545   private boolean hasFamilyQualifierPermission(User user,
546       Action perm,
547       RegionCoprocessorEnvironment env,
548       Map<byte[], ? extends Collection<byte[]>> familyMap)
549     throws IOException {
550     HRegionInfo hri = env.getRegion().getRegionInfo();
551     TableName tableName = hri.getTable();
552 
553     if (user == null) {
554       return false;
555     }
556 
557     if (familyMap != null && familyMap.size() > 0) {
558       // at least one family must be allowed
559       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
560           familyMap.entrySet()) {
561         if (family.getValue() != null && !family.getValue().isEmpty()) {
562           for (byte[] qualifier : family.getValue()) {
563             if (authManager.matchPermission(user, tableName,
564                 family.getKey(), qualifier, perm)) {
565               return true;
566             }
567           }
568         } else {
569           if (authManager.matchPermission(user, tableName, family.getKey(),
570               perm)) {
571             return true;
572           }
573         }
574       }
575     } else if (LOG.isDebugEnabled()) {
576       LOG.debug("Empty family map passed for permission check");
577     }
578 
579     return false;
580   }
581 
582   private enum OpType {
583     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
584     GET("get"),
585     EXISTS("exists"),
586     SCAN("scan"),
587     PUT("put"),
588     DELETE("delete"),
589     CHECK_AND_PUT("checkAndPut"),
590     CHECK_AND_DELETE("checkAndDelete"),
591     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
592     APPEND("append"),
593     INCREMENT("increment");
594 
595     private String type;
596 
597     private OpType(String type) {
598       this.type = type;
599     }
600 
601     @Override
602     public String toString() {
603       return type;
604     }
605   }
606 
607   /**
608    * Determine if cell ACLs covered by the operation grant access. This is expensive.
609    * @return false if cell ACLs failed to grant access, true otherwise
610    * @throws IOException
611    */
612   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
613       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
614       throws IOException {
615     if (!cellFeaturesEnabled) {
616       return false;
617     }
618     long cellGrants = 0;
619     User user = getActiveUser();
620     long latestCellTs = 0;
621     Get get = new Get(row);
622     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
623     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
624     // version. We have to get every cell version and check its TS against the TS asked for in
625     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
626     // consider only one such passing cell. In case of Delete we have to consider all the cell
627     // versions under this passing version. When Delete Mutation contains columns which are a
628     // version delete just consider only one version for those column cells.
629     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
630     if (considerCellTs) {
631       get.setMaxVersions();
632     } else {
633       get.setMaxVersions(1);
634     }
635     boolean diffCellTsFromOpTs = false;
636     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
637       byte[] col = entry.getKey();
638       // TODO: HBASE-7114 could possibly unify the collection type in family
639       // maps so we would not need to do this
640       if (entry.getValue() instanceof Set) {
641         Set<byte[]> set = (Set<byte[]>)entry.getValue();
642         if (set == null || set.isEmpty()) {
643           get.addFamily(col);
644         } else {
645           for (byte[] qual: set) {
646             get.addColumn(col, qual);
647           }
648         }
649       } else if (entry.getValue() instanceof List) {
650         List<Cell> list = (List<Cell>)entry.getValue();
651         if (list == null || list.isEmpty()) {
652           get.addFamily(col);
653         } else {
654           // In case of family delete, a Cell will be added into the list with Qualifier as null.
655           for (Cell cell : list) {
656             if (cell.getQualifierLength() == 0
657                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
658                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
659               get.addFamily(col);
660             } else {
661               get.addColumn(col, CellUtil.cloneQualifier(cell));
662             }
663             if (considerCellTs) {
664               long cellTs = cell.getTimestamp();
665               latestCellTs = Math.max(latestCellTs, cellTs);
666               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
667             }
668           }
669         }
670       } else {
671         throw new RuntimeException("Unhandled collection type " +
672           entry.getValue().getClass().getName());
673       }
674     }
675     // We want to avoid looking into the future. So, if the cells of the
676     // operation specify a timestamp, or the operation itself specifies a
677     // timestamp, then we use the maximum ts found. Otherwise, we bound
678     // the Get to the current server time. We add 1 to the timerange since
679     // the upper bound of a timerange is exclusive yet we need to examine
680     // any cells found there inclusively.
681     long latestTs = Math.max(opTs, latestCellTs);
682     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
683       latestTs = EnvironmentEdgeManager.currentTime();
684     }
685     get.setTimeRange(0, latestTs + 1);
686     // In case of Put operation we set to read all versions. This was done to consider the case
687     // where columns are added with TS other than the Mutation TS. But normally this wont be the
688     // case with Put. There no need to get all versions but get latest version only.
689     if (!diffCellTsFromOpTs && request == OpType.PUT) {
690       get.setMaxVersions(1);
691     }
692     if (LOG.isTraceEnabled()) {
693       LOG.trace("Scanning for cells with " + get);
694     }
695     // This Map is identical to familyMap. The key is a BR rather than byte[].
696     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
697     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
698     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
699     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
700       if (entry.getValue() instanceof List) {
701         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
702       }
703     }
704     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
705     List<Cell> cells = Lists.newArrayList();
706     Cell prevCell = null;
707     ByteRange curFam = new SimpleMutableByteRange();
708     boolean curColAllVersions = (request == OpType.DELETE);
709     long curColCheckTs = opTs;
710     boolean foundColumn = false;
711     try {
712       boolean more = false;
713       do {
714         cells.clear();
715         // scan with limit as 1 to hold down memory use on wide rows
716         more = scanner.next(cells, 1);
717         for (Cell cell: cells) {
718           if (LOG.isTraceEnabled()) {
719             LOG.trace("Found cell " + cell);
720           }
721           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
722           if (colChange) foundColumn = false;
723           prevCell = cell;
724           if (!curColAllVersions && foundColumn) {
725             continue;
726           }
727           if (colChange && considerCellTs) {
728             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
729             List<Cell> cols = familyMap1.get(curFam);
730             for (Cell col : cols) {
731               // null/empty qualifier is used to denote a Family delete. The TS and delete type
732               // associated with this is applicable for all columns within the family. That is
733               // why the below (col.getQualifierLength() == 0) check.
734               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
735                   || CellUtil.matchingQualifier(cell, col)) {
736                 byte type = col.getTypeByte();
737                 if (considerCellTs) {
738                   curColCheckTs = col.getTimestamp();
739                 }
740                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
741                 // a version delete for a column no need to check all the covering cells within
742                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
743                 // One version delete types are Delete/DeleteFamilyVersion
744                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
745                     || (KeyValue.Type.DeleteFamily.getCode() == type);
746                 break;
747               }
748             }
749           }
750           if (cell.getTimestamp() > curColCheckTs) {
751             // Just ignore this cell. This is not a covering cell.
752             continue;
753           }
754           foundColumn = true;
755           for (Action action: actions) {
756             // Are there permissions for this user for the cell?
757             if (!authManager.authorize(user, getTableName(e), cell, action)) {
758               // We can stop if the cell ACL denies access
759               return false;
760             }
761           }
762           cellGrants++;
763         }
764       } while (more);
765     } catch (AccessDeniedException ex) {
766       throw ex;
767     } catch (IOException ex) {
768       LOG.error("Exception while getting cells to calculate covering permission", ex);
769     } finally {
770       scanner.close();
771     }
772     // We should not authorize unless we have found one or more cell ACLs that
773     // grant access. This code is used to check for additional permissions
774     // after no table or CF grants are found.
775     return cellGrants > 0;
776   }
777 
778   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
779     // Iterate over the entries in the familyMap, replacing the cells therein
780     // with new cells including the ACL data
781     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
782       List<Cell> newCells = Lists.newArrayList();
783       for (Cell cell: e.getValue()) {
784         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
785         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
786         if (cell.getTagsLength() > 0) {
787           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
788             cell.getTagsOffset(), cell.getTagsLength());
789           while (tagIterator.hasNext()) {
790             tags.add(tagIterator.next());
791           }
792         }
793         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
794       }
795       // This is supposed to be safe, won't CME
796       e.setValue(newCells);
797     }
798   }
799 
800   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
801   // type is reserved and should not be explicitly set by user.
802   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
803     // Superusers are allowed to store cells unconditionally.
804     if (superusers.contains(user.getShortName())) {
805       return;
806     }
807     // We already checked (prePut vs preBatchMutation)
808     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
809       return;
810     }
811     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
812       Cell cell = cellScanner.current();
813       if (cell.getTagsLength() > 0) {
814         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
815           cell.getTagsLength());
816         while (tagsItr.hasNext()) {
817           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
818             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
819           }
820         }
821       }
822     }
823     m.setAttribute(TAG_CHECK_PASSED, TRUE);
824   }
825 
826   /* ---- MasterObserver implementation ---- */
827 
828   public void start(CoprocessorEnvironment env) throws IOException {
829     CompoundConfiguration conf = new CompoundConfiguration();
830     conf.add(env.getConfiguration());
831 
832     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
833       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
834 
835     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
836     if (!cellFeaturesEnabled) {
837       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
838           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
839           + " accordingly.");
840     }
841 
842     ZooKeeperWatcher zk = null;
843     if (env instanceof MasterCoprocessorEnvironment) {
844       // if running on HMaster
845       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
846       zk = mEnv.getMasterServices().getZooKeeper();
847     } else if (env instanceof RegionServerCoprocessorEnvironment) {
848       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
849       zk = rsEnv.getRegionServerServices().getZooKeeper();
850     } else if (env instanceof RegionCoprocessorEnvironment) {
851       // if running at region
852       regionEnv = (RegionCoprocessorEnvironment) env;
853       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
854       zk = regionEnv.getRegionServerServices().getZooKeeper();
855       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
856         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
857     }
858 
859     // set the user-provider.
860     this.userProvider = UserProvider.instantiate(env.getConfiguration());
861 
862     // set up the list of users with superuser privilege
863     User user = userProvider.getCurrent();
864     superusers = Lists.asList(user.getShortName(),
865       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
866 
867     // If zk is null or IOException while obtaining auth manager,
868     // throw RuntimeException so that the coprocessor is unloaded.
869     if (zk != null) {
870       try {
871         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
872       } catch (IOException ioe) {
873         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
874       }
875     } else {
876       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
877     }
878 
879     tableAcls = new MapMaker().weakValues().makeMap();
880   }
881 
882   public void stop(CoprocessorEnvironment env) {
883 
884   }
885 
886   @Override
887   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
888       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
889     Set<byte[]> families = desc.getFamiliesKeys();
890     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
891     for (byte[] family: families) {
892       familyMap.put(family, null);
893     }
894     requireGlobalPermission("createTable", Action.CREATE, desc.getTableName(), familyMap);
895   }
896 
897   @Override
898   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
899       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
900     // When AC is used, it should be configured as the 1st CP.
901     // In Master, the table operations like create, are handled by a Thread pool but the max size
902     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
903     // sequentially only.
904     // Related code in HMaster#startServiceThreads
905     // {code}
906     //   // We depend on there being only one instance of this executor running
907     //   // at a time. To do concurrency, would need fencing of enable/disable of
908     //   // tables.
909     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
910     // {code}
911     // In future if we change this pool to have more threads, then there is a chance for thread,
912     // creating acl table, getting delayed and by that time another table creation got over and
913     // this hook is getting called. In such a case, we will need a wait logic here which will
914     // wait till the acl table is created.
915     if (AccessControlLists.isAclTable(desc)) {
916       this.aclTabAvailable = true;
917     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
918       if (!aclTabAvailable) {
919         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
920             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
921             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
922       } else {
923         String owner = desc.getOwnerString();
924         // default the table owner to current user, if not specified.
925         if (owner == null)
926           owner = getActiveUser().getShortName();
927         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
928             desc.getTableName(), null, Action.values());
929         // switch to the real hbase master user for doing the RPC on the ACL table
930         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
931           @Override
932           public Void run() throws Exception {
933             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
934                 userperm);
935             return null;
936           }
937         });
938       }
939     }
940   }
941 
942   @Override
943   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
944       throws IOException {
945     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
946   }
947 
948   @Override
949   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
950       final TableName tableName) throws IOException {
951     final Configuration conf = c.getEnvironment().getConfiguration();
952     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
953       @Override
954       public Void run() throws Exception {
955         AccessControlLists.removeTablePermissions(conf, tableName);
956         return null;
957       }
958     });
959     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
960   }
961 
962   @Override
963   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
964       final TableName tableName) throws IOException {
965     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
966     final Configuration conf = c.getEnvironment().getConfiguration();
967     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
968       @Override
969       public Void run() throws Exception {
970         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
971         if (acls != null) {
972           tableAcls.put(tableName, acls);
973         }
974         return null;
975       }
976     });
977   }
978 
979   @Override
980   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
981       final TableName tableName) throws IOException {
982     final Configuration conf = ctx.getEnvironment().getConfiguration();
983     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
984       @Override
985       public Void run() throws Exception {
986         List<UserPermission> perms = tableAcls.get(tableName);
987         if (perms != null) {
988           for (UserPermission perm : perms) {
989             AccessControlLists.addUserPermission(conf, perm);
990           }
991         }
992         tableAcls.remove(tableName);
993         return null;
994       }
995     });
996   }
997 
998   @Override
999   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1000       HTableDescriptor htd) throws IOException {
1001     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1002   }
1003 
1004   @Override
1005   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1006       TableName tableName, final HTableDescriptor htd) throws IOException {
1007     final Configuration conf = c.getEnvironment().getConfiguration();
1008     // default the table owner to current user, if not specified.
1009     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1010       getActiveUser().getShortName();
1011     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1012       @Override
1013       public Void run() throws Exception {
1014         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1015           htd.getTableName(), null, Action.values());
1016         AccessControlLists.addUserPermission(conf, userperm);
1017         return null;
1018       }
1019     });
1020   }
1021 
1022   @Override
1023   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1024       HColumnDescriptor column) throws IOException {
1025     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1026   }
1027 
1028   @Override
1029   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1030       HColumnDescriptor descriptor) throws IOException {
1031     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1032       Action.CREATE);
1033   }
1034 
1035   @Override
1036   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1037       byte[] col) throws IOException {
1038     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1039   }
1040 
1041   @Override
1042   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1043       final TableName tableName, final byte[] col) throws IOException {
1044     final Configuration conf = c.getEnvironment().getConfiguration();
1045     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1046       @Override
1047       public Void run() throws Exception {
1048         AccessControlLists.removeTablePermissions(conf, tableName, col);
1049         return null;
1050       }
1051     });
1052   }
1053 
1054   @Override
1055   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1056       throws IOException {
1057     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1058   }
1059 
1060   @Override
1061   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1062       throws IOException {
1063     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1064       throw new AccessDeniedException("Not allowed to disable "
1065           + AccessControlLists.ACL_TABLE_NAME + " table.");
1066     }
1067     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1068   }
1069 
1070   @Override
1071   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1072       ServerName srcServer, ServerName destServer) throws IOException {
1073     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1074   }
1075 
1076   @Override
1077   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1078       throws IOException {
1079     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1080   }
1081 
1082   @Override
1083   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1084       boolean force) throws IOException {
1085     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1086   }
1087 
1088   @Override
1089   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1090       HRegionInfo regionInfo) throws IOException {
1091     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1092   }
1093 
1094   @Override
1095   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1096       throws IOException {
1097     requirePermission("balance", Action.ADMIN);
1098   }
1099 
1100   @Override
1101   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1102       boolean newValue) throws IOException {
1103     requirePermission("balanceSwitch", Action.ADMIN);
1104     return newValue;
1105   }
1106 
1107   @Override
1108   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1109       throws IOException {
1110     requirePermission("shutdown", Action.ADMIN);
1111   }
1112 
1113   @Override
1114   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1115       throws IOException {
1116     requirePermission("stopMaster", Action.ADMIN);
1117   }
1118 
1119   @Override
1120   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1121       throws IOException {
1122     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1123       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1124       // initialize the ACL storage table
1125       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1126     } else {
1127       aclTabAvailable = true;
1128     }
1129   }
1130 
1131   @Override
1132   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1133       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1134       throws IOException {
1135     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1136       Permission.Action.ADMIN);
1137   }
1138 
1139   @Override
1140   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1141       final SnapshotDescription snapshot) throws IOException {
1142     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1143       // list it, if user is the owner of snapshot
1144     } else {
1145       requirePermission("listSnapshot", Action.ADMIN);
1146     }
1147   }
1148 
1149   @Override
1150   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1151       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1152       throws IOException {
1153     requirePermission("clone", Action.ADMIN);
1154   }
1155 
1156   @Override
1157   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1158       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1159       throws IOException {
1160     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1161       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1162         Permission.Action.ADMIN);
1163     } else {
1164       requirePermission("restoreSnapshot", Action.ADMIN);
1165     }
1166   }
1167 
1168   @Override
1169   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1170       final SnapshotDescription snapshot) throws IOException {
1171     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1172       // Snapshot owner is allowed to delete the snapshot
1173     } else {
1174       requirePermission("deleteSnapshot", Action.ADMIN);
1175     }
1176   }
1177 
1178   @Override
1179   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1180       NamespaceDescriptor ns) throws IOException {
1181     requirePermission("createNamespace", Action.ADMIN);
1182   }
1183 
1184   @Override
1185   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1186       throws IOException {
1187     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1188   }
1189 
1190   @Override
1191   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1192       final String namespace) throws IOException {
1193     final Configuration conf = ctx.getEnvironment().getConfiguration();
1194     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1195       @Override
1196       public Void run() throws Exception {
1197         AccessControlLists.removeNamespacePermissions(conf, namespace);
1198         return null;
1199       }
1200     });
1201     LOG.info(namespace + "entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1202   }
1203 
1204   @Override
1205   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1206       NamespaceDescriptor ns) throws IOException {
1207     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1208   }
1209 
1210   @Override
1211   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1212       throws IOException {
1213     requireGlobalPermission("getNamespaceDescriptor", Action.ADMIN, namespace);
1214   }
1215 
1216   @Override
1217   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1218       List<NamespaceDescriptor> descriptors) throws IOException {
1219     // Retains only those which passes authorization checks, as the checks weren't done as part
1220     // of preGetTableDescriptors.
1221     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1222     while (itr.hasNext()) {
1223       NamespaceDescriptor desc = itr.next();
1224       try {
1225         requireGlobalPermission("listNamespaces", Action.ADMIN, desc.getName());
1226       } catch (AccessDeniedException e) {
1227         itr.remove();
1228       }
1229     }
1230   }
1231 
1232   @Override
1233   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1234       final TableName tableName) throws IOException {
1235     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1236   }
1237 
1238   /* ---- RegionObserver implementation ---- */
1239 
1240   @Override
1241   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1242       throws IOException {
1243     RegionCoprocessorEnvironment env = e.getEnvironment();
1244     final HRegion region = env.getRegion();
1245     if (region == null) {
1246       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1247     } else {
1248       HRegionInfo regionInfo = region.getRegionInfo();
1249       if (regionInfo.getTable().isSystemTable()) {
1250         isSystemOrSuperUser(regionEnv.getConfiguration());
1251       } else {
1252         requirePermission("preOpen", Action.ADMIN);
1253       }
1254     }
1255   }
1256 
1257   @Override
1258   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1259     RegionCoprocessorEnvironment env = c.getEnvironment();
1260     final HRegion region = env.getRegion();
1261     if (region == null) {
1262       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1263       return;
1264     }
1265     if (AccessControlLists.isAclRegion(region)) {
1266       aclRegion = true;
1267       // When this region is under recovering state, initialize will be handled by postLogReplay
1268       if (!region.isRecovering()) {
1269         try {
1270           initialize(env);
1271         } catch (IOException ex) {
1272           // if we can't obtain permissions, it's better to fail
1273           // than perform checks incorrectly
1274           throw new RuntimeException("Failed to initialize permissions cache", ex);
1275         }
1276       }
1277     } else {
1278       initialized = true;
1279     }
1280   }
1281 
1282   @Override
1283   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1284     if (aclRegion) {
1285       try {
1286         initialize(c.getEnvironment());
1287       } catch (IOException ex) {
1288         // if we can't obtain permissions, it's better to fail
1289         // than perform checks incorrectly
1290         throw new RuntimeException("Failed to initialize permissions cache", ex);
1291       }
1292     }
1293   }
1294 
1295   @Override
1296   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1297     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1298         Action.CREATE);
1299   }
1300 
1301   @Override
1302   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1303     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1304   }
1305 
1306   @Override
1307   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1308       byte[] splitRow) throws IOException {
1309     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1310   }
1311 
1312   @Override
1313   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1314       final Store store, final InternalScanner scanner, final ScanType scanType)
1315           throws IOException {
1316     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1317         Action.CREATE);
1318     return scanner;
1319   }
1320 
1321   @Override
1322   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1323       final byte [] row, final byte [] family, final Result result)
1324       throws IOException {
1325     assert family != null;
1326     RegionCoprocessorEnvironment env = c.getEnvironment();
1327     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1328     User user = getActiveUser();
1329     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1330       Action.READ);
1331     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1332       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1333         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1334       authResult.setReason("Covering cell set");
1335     }
1336     logResult(authResult);
1337     if (!authResult.isAllowed()) {
1338       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1339     }
1340   }
1341 
1342   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1343       final Query query, OpType opType) throws IOException {
1344     Filter filter = query.getFilter();
1345     // Don't wrap an AccessControlFilter
1346     if (filter != null && filter instanceof AccessControlFilter) {
1347       return;
1348     }
1349     User user = getActiveUser();
1350     RegionCoprocessorEnvironment env = c.getEnvironment();
1351     Map<byte[],? extends Collection<byte[]>> families = null;
1352     switch (opType) {
1353     case GET:
1354     case EXISTS:
1355       families = ((Get)query).getFamilyMap();
1356       break;
1357     case SCAN:
1358       families = ((Scan)query).getFamilyMap();
1359       break;
1360     default:
1361       throw new RuntimeException("Unhandled operation " + opType);
1362     }
1363     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1364     HRegion region = getRegion(env);
1365     TableName table = getTableName(region);
1366     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1367     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1368       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1369     }
1370     if (!authResult.isAllowed()) {
1371       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1372         // Old behavior: Scan with only qualifier checks if we have partial
1373         // permission. Backwards compatible behavior is to throw an
1374         // AccessDeniedException immediately if there are no grants for table
1375         // or CF or CF+qual. Only proceed with an injected filter if there are
1376         // grants for qualifiers. Otherwise we will fall through below and log
1377         // the result and throw an ADE. We may end up checking qualifier
1378         // grants three times (permissionGranted above, here, and in the
1379         // filter) but that's the price of backwards compatibility.
1380         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1381           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1382             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1383             cfVsMaxVersions);
1384           // wrap any existing filter
1385           if (filter != null) {
1386             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1387               Lists.newArrayList(ourFilter, filter));
1388           }
1389           authResult.setAllowed(true);
1390           authResult.setReason("Access allowed with filter");
1391           switch (opType) {
1392           case GET:
1393           case EXISTS:
1394             ((Get)query).setFilter(ourFilter);
1395             break;
1396           case SCAN:
1397             ((Scan)query).setFilter(ourFilter);
1398             break;
1399           default:
1400             throw new RuntimeException("Unhandled operation " + opType);
1401           }
1402         }
1403       } else {
1404         // New behavior: Any access we might be granted is more fine-grained
1405         // than whole table or CF. Simply inject a filter and return what is
1406         // allowed. We will not throw an AccessDeniedException. This is a
1407         // behavioral change since 0.96.
1408         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1409           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1410         // wrap any existing filter
1411         if (filter != null) {
1412           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1413             Lists.newArrayList(ourFilter, filter));
1414         }
1415         authResult.setAllowed(true);
1416         authResult.setReason("Access allowed with filter");
1417         switch (opType) {
1418         case GET:
1419         case EXISTS:
1420           ((Get)query).setFilter(ourFilter);
1421           break;
1422         case SCAN:
1423           ((Scan)query).setFilter(ourFilter);
1424           break;
1425         default:
1426           throw new RuntimeException("Unhandled operation " + opType);
1427         }
1428       }
1429     }
1430 
1431     logResult(authResult);
1432     if (!authResult.isAllowed()) {
1433       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1434         ", action=READ)");
1435     }
1436   }
1437 
1438   @Override
1439   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1440       final Get get, final List<Cell> result) throws IOException {
1441     internalPreRead(c, get, OpType.GET);
1442   }
1443 
1444   @Override
1445   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1446       final Get get, final boolean exists) throws IOException {
1447     internalPreRead(c, get, OpType.EXISTS);
1448     return exists;
1449   }
1450 
1451   @Override
1452   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1453       final Put put, final WALEdit edit, final Durability durability)
1454       throws IOException {
1455     // Require WRITE permission to the table, CF, or top visible value, if any.
1456     // NOTE: We don't need to check the permissions for any earlier Puts
1457     // because we treat the ACLs in each Put as timestamped like any other
1458     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1459     // change the ACL of any previous Put. This allows simple evolution of
1460     // security policy over time without requiring expensive updates.
1461     User user = getActiveUser();
1462     checkForReservedTagPresence(user, put);
1463     RegionCoprocessorEnvironment env = c.getEnvironment();
1464     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1465     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1466     logResult(authResult);
1467     if (!authResult.isAllowed()) {
1468       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1469         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1470       } else {
1471         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1472       }
1473     }
1474     // Add cell ACLs from the operation to the cells themselves
1475     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1476     if (bytes != null) {
1477       if (cellFeaturesEnabled) {
1478         addCellPermissions(bytes, put.getFamilyCellMap());
1479       } else {
1480         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1481       }
1482     }
1483   }
1484 
1485   @Override
1486   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1487       final Put put, final WALEdit edit, final Durability durability) {
1488     if (aclRegion) {
1489       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1490     }
1491   }
1492 
1493   @Override
1494   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1495       final Delete delete, final WALEdit edit, final Durability durability)
1496       throws IOException {
1497     // An ACL on a delete is useless, we shouldn't allow it
1498     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1499       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1500     }
1501     // Require WRITE permissions on all cells covered by the delete. Unlike
1502     // for Puts we need to check all visible prior versions, because a major
1503     // compaction could remove them. If the user doesn't have permission to
1504     // overwrite any of the visible versions ('visible' defined as not covered
1505     // by a tombstone already) then we have to disallow this operation.
1506     RegionCoprocessorEnvironment env = c.getEnvironment();
1507     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1508     User user = getActiveUser();
1509     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1510     logResult(authResult);
1511     if (!authResult.isAllowed()) {
1512       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1513         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1514       } else {
1515         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1516       }
1517     }
1518   }
1519 
1520   @Override
1521   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1522       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1523     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1524       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1525       for (int i = 0; i < miniBatchOp.size(); i++) {
1526         Mutation m = miniBatchOp.getOperation(i);
1527         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1528           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1529           // perm check
1530           OpType opType;
1531           if (m instanceof Put) {
1532             checkForReservedTagPresence(getActiveUser(), m);
1533             opType = OpType.PUT;
1534           } else {
1535             opType = OpType.DELETE;
1536           }
1537           AuthResult authResult = null;
1538           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1539               m.getTimeStamp(), Action.WRITE)) {
1540             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1541                 Action.WRITE, table, m.getFamilyCellMap());
1542           } else {
1543             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1544                 Action.WRITE, table, m.getFamilyCellMap());
1545           }
1546           logResult(authResult);
1547           if (!authResult.isAllowed()) {
1548             throw new AccessDeniedException("Insufficient permissions "
1549                 + authResult.toContextString());
1550           }
1551         }
1552       }
1553     }
1554   }
1555 
1556   @Override
1557   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1558       final Delete delete, final WALEdit edit, final Durability durability)
1559       throws IOException {
1560     if (aclRegion) {
1561       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1562     }
1563   }
1564 
1565   @Override
1566   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1567       final byte [] row, final byte [] family, final byte [] qualifier,
1568       final CompareFilter.CompareOp compareOp,
1569       final ByteArrayComparable comparator, final Put put,
1570       final boolean result) throws IOException {
1571     // Require READ and WRITE permissions on the table, CF, and KV to update
1572     User user = getActiveUser();
1573     checkForReservedTagPresence(user, put);
1574     RegionCoprocessorEnvironment env = c.getEnvironment();
1575     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1576     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1577       Action.READ, Action.WRITE);
1578     logResult(authResult);
1579     if (!authResult.isAllowed()) {
1580       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1581         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1582       } else {
1583         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1584       }
1585     }
1586     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1587     if (bytes != null) {
1588       if (cellFeaturesEnabled) {
1589         addCellPermissions(bytes, put.getFamilyCellMap());
1590       } else {
1591         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1592       }
1593     }
1594     return result;
1595   }
1596 
1597   @Override
1598   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1599       final byte[] row, final byte[] family, final byte[] qualifier,
1600       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1601       final boolean result) throws IOException {
1602     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1603       // We had failure with table, cf and q perm checks and now giving a chance for cell
1604       // perm check
1605       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1606       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1607       AuthResult authResult = null;
1608       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1609           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1610         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1611             getActiveUser(), Action.READ, table, families);
1612       } else {
1613         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1614             getActiveUser(), Action.READ, table, families);
1615       }
1616       logResult(authResult);
1617       if (!authResult.isAllowed()) {
1618         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1619       }
1620     }
1621     return result;
1622   }
1623 
1624   @Override
1625   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1626       final byte [] row, final byte [] family, final byte [] qualifier,
1627       final CompareFilter.CompareOp compareOp,
1628       final ByteArrayComparable comparator, final Delete delete,
1629       final boolean result) throws IOException {
1630     // An ACL on a delete is useless, we shouldn't allow it
1631     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1632       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1633           delete.toString());
1634     }
1635     // Require READ and WRITE permissions on the table, CF, and the KV covered
1636     // by the delete
1637     RegionCoprocessorEnvironment env = c.getEnvironment();
1638     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1639     User user = getActiveUser();
1640     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1641       Action.READ, Action.WRITE);
1642     logResult(authResult);
1643     if (!authResult.isAllowed()) {
1644       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1645         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1646       } else {
1647         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1648       }
1649     }
1650     return result;
1651   }
1652 
1653   @Override
1654   public boolean preCheckAndDeleteAfterRowLock(
1655       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1656       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1657       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1658       throws IOException {
1659     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1660       // We had failure with table, cf and q perm checks and now giving a chance for cell
1661       // perm check
1662       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1663       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1664       AuthResult authResult = null;
1665       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1666           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1667         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1668             getActiveUser(), Action.READ, table, families);
1669       } else {
1670         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1671             getActiveUser(), Action.READ, table, families);
1672       }
1673       logResult(authResult);
1674       if (!authResult.isAllowed()) {
1675         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1676       }
1677     }
1678     return result;
1679   }
1680 
1681   @Override
1682   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1683       final byte [] row, final byte [] family, final byte [] qualifier,
1684       final long amount, final boolean writeToWAL)
1685       throws IOException {
1686     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1687     // incremented value
1688     RegionCoprocessorEnvironment env = c.getEnvironment();
1689     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1690     User user = getActiveUser();
1691     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1692       Action.WRITE);
1693     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1694       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1695         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1696       authResult.setReason("Covering cell set");
1697     }
1698     logResult(authResult);
1699     if (!authResult.isAllowed()) {
1700       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1701     }
1702     return -1;
1703   }
1704 
1705   @Override
1706   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1707       throws IOException {
1708     // Require WRITE permission to the table, CF, and the KV to be appended
1709     User user = getActiveUser();
1710     checkForReservedTagPresence(user, append);
1711     RegionCoprocessorEnvironment env = c.getEnvironment();
1712     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1713     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1714     logResult(authResult);
1715     if (!authResult.isAllowed()) {
1716       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1717         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1718       } else {
1719         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1720       }
1721     }
1722     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1723     if (bytes != null) {
1724       if (cellFeaturesEnabled) {
1725         addCellPermissions(bytes, append.getFamilyCellMap());
1726       } else {
1727         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1728       }
1729     }
1730     return null;
1731   }
1732 
1733   @Override
1734   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1735       final Append append) throws IOException {
1736     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1737       // We had failure with table, cf and q perm checks and now giving a chance for cell
1738       // perm check
1739       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1740       AuthResult authResult = null;
1741       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1742           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1743         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1744             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1745       } else {
1746         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1747             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1748       }
1749       logResult(authResult);
1750       if (!authResult.isAllowed()) {
1751         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1752       }
1753     }
1754     return null;
1755   }
1756 
1757   @Override
1758   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1759       final Increment increment)
1760       throws IOException {
1761     // Require WRITE permission to the table, CF, and the KV to be replaced by
1762     // the incremented value
1763     User user = getActiveUser();
1764     checkForReservedTagPresence(user, increment);
1765     RegionCoprocessorEnvironment env = c.getEnvironment();
1766     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1767     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1768       Action.WRITE);
1769     logResult(authResult);
1770     if (!authResult.isAllowed()) {
1771       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1772         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1773       } else {
1774         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1775       }
1776     }
1777     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1778     if (bytes != null) {
1779       if (cellFeaturesEnabled) {
1780         addCellPermissions(bytes, increment.getFamilyCellMap());
1781       } else {
1782         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1783       }
1784     }
1785     return null;
1786   }
1787 
1788   @Override
1789   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1790       final Increment increment) throws IOException {
1791     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1792       // We had failure with table, cf and q perm checks and now giving a chance for cell
1793       // perm check
1794       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1795       AuthResult authResult = null;
1796       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1797           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1798         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1799             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1800       } else {
1801         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1802             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1803       }
1804       logResult(authResult);
1805       if (!authResult.isAllowed()) {
1806         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1807       }
1808     }
1809     return null;
1810   }
1811 
1812   @Override
1813   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1814       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1815     // If the HFile version is insufficient to persist tags, we won't have any
1816     // work to do here
1817     if (!cellFeaturesEnabled) {
1818       return newCell;
1819     }
1820 
1821     // Collect any ACLs from the old cell
1822     List<Tag> tags = Lists.newArrayList();
1823     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1824     if (oldCell != null) {
1825       // Save an object allocation where we can
1826       if (oldCell.getTagsLength() > 0) {
1827         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1828           oldCell.getTagsOffset(), oldCell.getTagsLength());
1829         while (tagIterator.hasNext()) {
1830           Tag tag = tagIterator.next();
1831           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1832             // Not an ACL tag, just carry it through
1833             if (LOG.isTraceEnabled()) {
1834               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1835                 " length " + tag.getTagLength());
1836             }
1837             tags.add(tag);
1838           } else {
1839             // Merge the perms from the older ACL into the current permission set
1840             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1841               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1842                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1843             perms.putAll(kvPerms);
1844           }
1845         }
1846       }
1847     }
1848 
1849     // Do we have an ACL on the operation?
1850     byte[] aclBytes = mutation.getACL();
1851     if (aclBytes != null) {
1852       // Yes, use it
1853       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1854     } else {
1855       // No, use what we carried forward
1856       if (perms != null) {
1857         // TODO: If we collected ACLs from more than one tag we may have a
1858         // List<Permission> of size > 1, this can be collapsed into a single
1859         // Permission
1860         if (LOG.isTraceEnabled()) {
1861           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1862         }
1863         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1864           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1865       }
1866     }
1867 
1868     // If we have no tags to add, just return
1869     if (tags.isEmpty()) {
1870       return newCell;
1871     }
1872 
1873     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
1874     return rewriteCell;
1875   }
1876 
1877   @Override
1878   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1879       final Scan scan, final RegionScanner s) throws IOException {
1880     internalPreRead(c, scan, OpType.SCAN);
1881     return s;
1882   }
1883 
1884   @Override
1885   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1886       final Scan scan, final RegionScanner s) throws IOException {
1887     User user = getActiveUser();
1888     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1889       scannerOwners.put(s, user.getShortName());
1890     }
1891     return s;
1892   }
1893 
1894   @Override
1895   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1896       final InternalScanner s, final List<Result> result,
1897       final int limit, final boolean hasNext) throws IOException {
1898     requireScannerOwner(s);
1899     return hasNext;
1900   }
1901 
1902   @Override
1903   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1904       final InternalScanner s) throws IOException {
1905     requireScannerOwner(s);
1906   }
1907 
1908   @Override
1909   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1910       final InternalScanner s) throws IOException {
1911     // clean up any associated owner mapping
1912     scannerOwners.remove(s);
1913   }
1914 
1915   /**
1916    * Verify, when servicing an RPC, that the caller is the scanner owner.
1917    * If so, we assume that access control is correctly enforced based on
1918    * the checks performed in preScannerOpen()
1919    */
1920   private void requireScannerOwner(InternalScanner s)
1921       throws AccessDeniedException {
1922     if (RequestContext.isInRequestContext()) {
1923       String requestUserName = RequestContext.getRequestUserName();
1924       String owner = scannerOwners.get(s);
1925       if (owner != null && !owner.equals(requestUserName)) {
1926         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1927       }
1928     }
1929   }
1930 
1931   /**
1932    * Verifies user has WRITE privileges on
1933    * the Column Families involved in the bulkLoadHFile
1934    * request. Specific Column Write privileges are presently
1935    * ignored.
1936    */
1937   @Override
1938   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1939       List<Pair<byte[], String>> familyPaths) throws IOException {
1940     for(Pair<byte[],String> el : familyPaths) {
1941       requirePermission("preBulkLoadHFile",
1942           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1943           el.getFirst(),
1944           null,
1945           Action.CREATE);
1946     }
1947   }
1948 
1949   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action)
1950       throws IOException {
1951     User requestUser = getActiveUser();
1952     final TableName tableName = e.getRegion().getTableDesc().getTableName();
1953     AuthResult authResult = permissionGranted(method, requestUser, action, e,
1954       Collections.EMPTY_MAP);
1955     if (!authResult.isAllowed()) {
1956       final Configuration conf = e.getConfiguration();
1957       // hasSomeAccess is called from bulkload pre hooks
1958       List<UserPermission> perms =
1959         User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
1960           @Override
1961           public List<UserPermission> run() throws Exception {
1962             return AccessControlLists.getUserTablePermissions(conf, tableName);
1963           }
1964         });
1965       for (UserPermission userPerm: perms) {
1966         for (Action userAction: userPerm.getActions()) {
1967           if (userAction.equals(action)) {
1968             return AuthResult.allow(method, "Access allowed", requestUser,
1969               action, tableName, null, null);
1970           }
1971         }
1972       }
1973     }
1974     return authResult;
1975   }
1976 
1977   /**
1978    * Authorization check for
1979    * SecureBulkLoadProtocol.prepareBulkLoad()
1980    * @param ctx the context
1981    * @param request the request
1982    * @throws IOException
1983    */
1984   @Override
1985   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
1986                                  PrepareBulkLoadRequest request) throws IOException {
1987     RegionCoprocessorEnvironment e = ctx.getEnvironment();
1988 
1989     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1990     logResult(authResult);
1991     if (!authResult.isAllowed()) {
1992       throw new AccessDeniedException("Insufficient permissions (table=" +
1993         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1994     }
1995   }
1996 
1997   /**
1998    * Authorization security check for
1999    * SecureBulkLoadProtocol.cleanupBulkLoad()
2000    * @param ctx the context
2001    * @param request the request
2002    * @throws IOException
2003    */
2004   @Override
2005   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2006                                  CleanupBulkLoadRequest request) throws IOException {
2007     RegionCoprocessorEnvironment e = ctx.getEnvironment();
2008 
2009     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
2010     logResult(authResult);
2011     if (!authResult.isAllowed()) {
2012       throw new AccessDeniedException("Insufficient permissions (table=" +
2013         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
2014     }
2015   }
2016 
2017   /* ---- EndpointObserver implementation ---- */
2018 
2019   @Override
2020   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2021       Service service, String methodName, Message request) throws IOException {
2022     // Don't intercept calls to our own AccessControlService, we check for
2023     // appropriate permissions in the service handlers
2024     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2025       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2026         methodName + ")",
2027         getTableName(ctx.getEnvironment()), null, null,
2028         Action.EXEC);
2029     }
2030     return request;
2031   }
2032 
2033   @Override
2034   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2035       Service service, String methodName, Message request, Message.Builder responseBuilder)
2036       throws IOException { }
2037 
2038   /* ---- Protobuf AccessControlService implementation ---- */
2039 
2040   @Override
2041   public void grant(RpcController controller,
2042                     AccessControlProtos.GrantRequest request,
2043                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2044     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2045     AccessControlProtos.GrantResponse response = null;
2046     try {
2047       // verify it's only running at .acl.
2048       if (aclRegion) {
2049         if (!initialized) {
2050           throw new CoprocessorException("AccessController not yet initialized");
2051         }
2052         if (LOG.isDebugEnabled()) {
2053           LOG.debug("Received request to grant access permission " + perm.toString());
2054         }
2055 
2056         switch(request.getUserPermission().getPermission().getType()) {
2057           case Global :
2058           case Table :
2059             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2060                 perm.getQualifier(), Action.ADMIN);
2061             break;
2062           case Namespace :
2063             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2064             break;
2065         }
2066 
2067         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2068           @Override
2069           public Void run() throws Exception {
2070             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2071             return null;
2072           }
2073         });
2074 
2075         if (AUDITLOG.isTraceEnabled()) {
2076           // audit log should store permission changes in addition to auth results
2077           AUDITLOG.trace("Granted permission " + perm.toString());
2078         }
2079       } else {
2080         throw new CoprocessorException(AccessController.class, "This method "
2081             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2082       }
2083       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2084     } catch (IOException ioe) {
2085       // pass exception back up
2086       ResponseConverter.setControllerException(controller, ioe);
2087     }
2088     done.run(response);
2089   }
2090 
2091   @Override
2092   public void revoke(RpcController controller,
2093                      AccessControlProtos.RevokeRequest request,
2094                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2095     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2096     AccessControlProtos.RevokeResponse response = null;
2097     try {
2098       // only allowed to be called on _acl_ region
2099       if (aclRegion) {
2100         if (!initialized) {
2101           throw new CoprocessorException("AccessController not yet initialized");
2102         }
2103         if (LOG.isDebugEnabled()) {
2104           LOG.debug("Received request to revoke access permission " + perm.toString());
2105         }
2106 
2107         switch(request.getUserPermission().getPermission().getType()) {
2108           case Global :
2109           case Table :
2110             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2111                               perm.getQualifier(), Action.ADMIN);
2112             break;
2113           case Namespace :
2114             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2115             break;
2116         }
2117 
2118         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2119           @Override
2120           public Void run() throws Exception {
2121             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2122             return null;
2123           }
2124         });
2125 
2126         if (AUDITLOG.isTraceEnabled()) {
2127           // audit log should record all permission changes
2128           AUDITLOG.trace("Revoked permission " + perm.toString());
2129         }
2130       } else {
2131         throw new CoprocessorException(AccessController.class, "This method "
2132             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2133       }
2134       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2135     } catch (IOException ioe) {
2136       // pass exception back up
2137       ResponseConverter.setControllerException(controller, ioe);
2138     }
2139     done.run(response);
2140   }
2141 
2142   @Override
2143   public void getUserPermissions(RpcController controller,
2144                                  AccessControlProtos.GetUserPermissionsRequest request,
2145                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2146     AccessControlProtos.GetUserPermissionsResponse response = null;
2147     try {
2148       // only allowed to be called on _acl_ region
2149       if (aclRegion) {
2150         if (!initialized) {
2151           throw new CoprocessorException("AccessController not yet initialized");
2152         }
2153         List<UserPermission> perms = null;
2154         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2155           final TableName table = request.hasTableName() ?
2156             ProtobufUtil.toTableName(request.getTableName()) : null;
2157           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2158           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2159             @Override
2160             public List<UserPermission> run() throws Exception {
2161               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2162             }
2163           });
2164         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2165           final String namespace = request.getNamespaceName().toStringUtf8();
2166           requireGlobalPermission("userPermissions", Action.ADMIN, namespace);
2167           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2168             @Override
2169             public List<UserPermission> run() throws Exception {
2170               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2171                 namespace);
2172             }
2173           });
2174         } else {
2175           requirePermission("userPermissions", Action.ADMIN);
2176           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2177             @Override
2178             public List<UserPermission> run() throws Exception {
2179               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2180             }
2181           });
2182         }
2183         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2184       } else {
2185         throw new CoprocessorException(AccessController.class, "This method "
2186             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2187       }
2188     } catch (IOException ioe) {
2189       // pass exception back up
2190       ResponseConverter.setControllerException(controller, ioe);
2191     }
2192     done.run(response);
2193   }
2194 
2195   @Override
2196   public void checkPermissions(RpcController controller,
2197                                AccessControlProtos.CheckPermissionsRequest request,
2198                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2199     Permission[] permissions = new Permission[request.getPermissionCount()];
2200     for (int i=0; i < request.getPermissionCount(); i++) {
2201       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2202     }
2203     AccessControlProtos.CheckPermissionsResponse response = null;
2204     try {
2205       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2206       for (Permission permission : permissions) {
2207         if (permission instanceof TablePermission) {
2208           TablePermission tperm = (TablePermission) permission;
2209           for (Action action : permission.getActions()) {
2210             if (!tperm.getTableName().equals(tableName)) {
2211               throw new CoprocessorException(AccessController.class, String.format("This method "
2212                   + "can only execute at the table specified in TablePermission. " +
2213                   "Table of the region:%s , requested table:%s", tableName,
2214                   tperm.getTableName()));
2215             }
2216 
2217             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2218             if (tperm.getFamily() != null) {
2219               if (tperm.getQualifier() != null) {
2220                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2221                 qualifiers.add(tperm.getQualifier());
2222                 familyMap.put(tperm.getFamily(), qualifiers);
2223               } else {
2224                 familyMap.put(tperm.getFamily(), null);
2225               }
2226             }
2227 
2228             requirePermission("checkPermissions", action, regionEnv, familyMap);
2229           }
2230 
2231         } else {
2232           for (Action action : permission.getActions()) {
2233             requirePermission("checkPermissions", action);
2234           }
2235         }
2236       }
2237       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2238     } catch (IOException ioe) {
2239       ResponseConverter.setControllerException(controller, ioe);
2240     }
2241     done.run(response);
2242   }
2243 
2244   @Override
2245   public Service getService() {
2246     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2247   }
2248 
2249   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2250     return e.getRegion();
2251   }
2252 
2253   private TableName getTableName(RegionCoprocessorEnvironment e) {
2254     HRegion region = e.getRegion();
2255     if (region != null) {
2256       return getTableName(region);
2257     }
2258     return null;
2259   }
2260 
2261   private TableName getTableName(HRegion region) {
2262     HRegionInfo regionInfo = region.getRegionInfo();
2263     if (regionInfo != null) {
2264       return regionInfo.getTable();
2265     }
2266     return null;
2267   }
2268 
2269   @Override
2270   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2271       throws IOException {
2272     requirePermission("preClose", Action.ADMIN);
2273   }
2274 
2275   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2276     User user = userProvider.getCurrent();
2277     if (user == null) {
2278       throw new IOException("Unable to obtain the current user, " +
2279         "authorization checks for internal operations will not work correctly!");
2280     }
2281     User activeUser = getActiveUser();
2282     if (!(superusers.contains(activeUser.getShortName()))) {
2283       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2284         "is not system or super user.");
2285     }
2286   }
2287 
2288   @Override
2289   public void preStopRegionServer(
2290       ObserverContext<RegionServerCoprocessorEnvironment> env)
2291       throws IOException {
2292     requirePermission("preStopRegionServer", Action.ADMIN);
2293   }
2294 
2295   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2296       byte[] qualifier) {
2297     if (family == null) {
2298       return null;
2299     }
2300 
2301     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2302     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2303     return familyMap;
2304   }
2305 
2306   @Override
2307   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2308        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2309        String regex) throws IOException {
2310     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2311     // any concrete set of table names when a regex is present or the full list is requested.
2312     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2313       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2314       // request can be granted.
2315       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2316       for (TableName tableName: tableNamesList) {
2317         // Skip checks for a table that does not exist
2318         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2319           continue;
2320         requirePermission("getTableDescriptors", tableName, null, null,
2321             Action.ADMIN, Action.CREATE);
2322       }
2323     }
2324   }
2325 
2326   @Override
2327   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2328       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2329       String regex) throws IOException {
2330     // Skipping as checks in this case are already done by preGetTableDescriptors.
2331     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2332       return;
2333     }
2334 
2335     // Retains only those which passes authorization checks, as the checks weren't done as part
2336     // of preGetTableDescriptors.
2337     Iterator<HTableDescriptor> itr = descriptors.iterator();
2338     while (itr.hasNext()) {
2339       HTableDescriptor htd = itr.next();
2340       try {
2341         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2342             Action.ADMIN, Action.CREATE);
2343       } catch (AccessDeniedException e) {
2344         itr.remove();
2345       }
2346     }
2347   }
2348 
2349   @Override
2350   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2351       List<HTableDescriptor> descriptors, String regex) throws IOException {
2352     // Retains only those which passes authorization checks.
2353     Iterator<HTableDescriptor> itr = descriptors.iterator();
2354     while (itr.hasNext()) {
2355       HTableDescriptor htd = itr.next();
2356       try {
2357         requireAccess("getTableNames", htd.getTableName(), Action.values());
2358       } catch (AccessDeniedException e) {
2359         itr.remove();
2360       }
2361     }
2362   }
2363 
2364   @Override
2365   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2366       HRegion regionB) throws IOException {
2367     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2368       Action.ADMIN);
2369   }
2370 
2371   @Override
2372   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2373       HRegion regionB, HRegion mergedRegion) throws IOException { }
2374 
2375   @Override
2376   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2377       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2378 
2379   @Override
2380   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2381       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2382 
2383   @Override
2384   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2385       HRegion regionA, HRegion regionB) throws IOException { }
2386 
2387   @Override
2388   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2389       HRegion regionA, HRegion regionB) throws IOException { }
2390 
2391   @Override
2392   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2393       throws IOException {
2394     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2395   }
2396 
2397   @Override
2398   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2399       throws IOException { }
2400 
2401   @Override
2402   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2403       final String userName, final Quotas quotas) throws IOException {
2404     requirePermission("setUserQuota", Action.ADMIN);
2405   }
2406 
2407   @Override
2408   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2409       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2410     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2411   }
2412 
2413   @Override
2414   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2415       final String userName, final String namespace, final Quotas quotas) throws IOException {
2416     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2417   }
2418 
2419   @Override
2420   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2421       final TableName tableName, final Quotas quotas) throws IOException {
2422     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2423   }
2424 
2425   @Override
2426   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2427       final String namespace, final Quotas quotas) throws IOException {
2428     requirePermission("setNamespaceQuota", Action.ADMIN);
2429   }
2430 
2431   @Override
2432   public ReplicationEndpoint postCreateReplicationEndPoint(
2433       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2434     return endpoint;
2435   }
2436 }