View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.CoprocessorEnvironment;
38  import org.apache.hadoop.hbase.DoNotRetryIOException;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValue.Type;
46  import org.apache.hadoop.hbase.MetaTableAccessor;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.TagRewriteCell;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Query;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.Scan;
63  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
69  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
70  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
73  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
74  import org.apache.hadoop.hbase.filter.CompareFilter;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.FilterList;
77  import org.apache.hadoop.hbase.io.hfile.HFile;
78  import org.apache.hadoop.hbase.ipc.RequestContext;
79  import org.apache.hadoop.hbase.master.MasterServices;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
82  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
84  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.HRegion;
90  import org.apache.hadoop.hbase.regionserver.InternalScanner;
91  import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
92  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
93  import org.apache.hadoop.hbase.regionserver.RegionScanner;
94  import org.apache.hadoop.hbase.regionserver.ScanType;
95  import org.apache.hadoop.hbase.regionserver.Store;
96  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
97  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
98  import org.apache.hadoop.hbase.security.AccessDeniedException;
99  import org.apache.hadoop.hbase.security.User;
100 import org.apache.hadoop.hbase.security.UserProvider;
101 import org.apache.hadoop.hbase.security.access.Permission.Action;
102 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
103 import org.apache.hadoop.hbase.util.ByteRange;
104 import org.apache.hadoop.hbase.util.Bytes;
105 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
106 import org.apache.hadoop.hbase.util.Pair;
107 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
109 
110 import com.google.common.collect.ArrayListMultimap;
111 import com.google.common.collect.ImmutableSet;
112 import com.google.common.collect.ListMultimap;
113 import com.google.common.collect.Lists;
114 import com.google.common.collect.MapMaker;
115 import com.google.common.collect.Maps;
116 import com.google.common.collect.Sets;
117 import com.google.protobuf.Message;
118 import com.google.protobuf.RpcCallback;
119 import com.google.protobuf.RpcController;
120 import com.google.protobuf.Service;
121 
122 /**
123  * Provides basic authorization checks for data access and administrative
124  * operations.
125  *
126  * <p>
127  * {@code AccessController} performs authorization checks for HBase operations
128  * based on:
129  * <ul>
130  *   <li>the identity of the user performing the operation</li>
131  *   <li>the scope over which the operation is performed, in increasing
132  *   specificity: global, table, column family, or qualifier</li>
133  *   <li>the type of action being performed (as mapped to
134  *   {@link Permission.Action} values)</li>
135  * </ul>
136  * If the authorization check fails, an {@link AccessDeniedException}
137  * will be thrown for the operation.
138  * </p>
139  *
140  * <p>
141  * To perform authorization checks, {@code AccessController} relies on the
142  * RpcServerEngine being loaded to provide
143  * the user identities for remote requests.
144  * </p>
145  *
146  * <p>
147  * The access control lists used for authorization can be manipulated via the
148  * exposed {@link AccessControlService} Interface implementation, and the associated
149  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
150  * commands.
151  * </p>
152  */
153 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
154 public class AccessController extends BaseMasterAndRegionObserver
155     implements RegionServerObserver,
156       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
157 
158   public static final Log LOG = LogFactory.getLog(AccessController.class);
159 
160   private static final Log AUDITLOG =
161     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
162   private static final String CHECK_COVERING_PERM = "check_covering_perm";
163   private static final String TAG_CHECK_PASSED = "tag_check_passed";
164   private static final byte[] TRUE = Bytes.toBytes(true);
165 
166   TableAuthManager authManager = null;
167 
168   // flags if we are running on a region of the _acl_ table
169   boolean aclRegion = false;
170 
171   // defined only for Endpoint implementation, so it can have way to
172   // access region services.
173   private RegionCoprocessorEnvironment regionEnv;
174 
175   /** Mapping of scanner instances to the user who created them */
176   private Map<InternalScanner,String> scannerOwners =
177       new MapMaker().weakKeys().makeMap();
178 
179   private Map<TableName, List<UserPermission>> tableAcls;
180 
181   // Provider for mapping principal names to Users
182   private UserProvider userProvider;
183 
184   // The list of users with superuser authority
185   private List<String> superusers;
186 
187   // if we are able to support cell ACLs
188   boolean cellFeaturesEnabled;
189 
190   // if we should check EXEC permissions
191   boolean shouldCheckExecPermission;
192 
193   // if we should terminate access checks early as soon as table or CF grants
194   // allow access; pre-0.98 compatible behavior
195   boolean compatibleEarlyTermination;
196 
197   private volatile boolean initialized = false;
198 
199   // This boolean having relevance only in the Master.
200   private volatile boolean aclTabAvailable = false;
201 
202   public HRegion getRegion() {
203     return regionEnv != null ? regionEnv.getRegion() : null;
204   }
205 
206   public TableAuthManager getAuthManager() {
207     return authManager;
208   }
209 
210   void initialize(RegionCoprocessorEnvironment e) throws IOException {
211     final HRegion region = e.getRegion();
212     Configuration conf = e.getConfiguration();
213     Map<byte[], ListMultimap<String,TablePermission>> tables =
214         AccessControlLists.loadAll(region);
215     // For each table, write out the table's permissions to the respective
216     // znode for that table.
217     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
218       tables.entrySet()) {
219       byte[] entry = t.getKey();
220       ListMultimap<String,TablePermission> perms = t.getValue();
221       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
222       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
223     }
224     initialized = true;
225   }
226 
227   /**
228    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
229    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
230    * table updates.
231    */
232   void updateACL(RegionCoprocessorEnvironment e,
233       final Map<byte[], List<Cell>> familyMap) {
234     Set<byte[]> entries =
235         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
236     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
237       List<Cell> cells = f.getValue();
238       for (Cell cell: cells) {
239         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
240             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
241             AccessControlLists.ACL_LIST_FAMILY.length)) {
242           entries.add(CellUtil.cloneRow(cell));
243         }
244       }
245     }
246     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
247     Configuration conf = regionEnv.getConfiguration();
248     for (byte[] entry: entries) {
249       try {
250         ListMultimap<String,TablePermission> perms =
251           AccessControlLists.getPermissions(conf, entry);
252         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
253         zkw.writeToZookeeper(entry, serialized);
254       } catch (IOException ex) {
255         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
256             ex);
257       }
258     }
259   }
260 
261   /**
262    * Check the current user for authorization to perform a specific action
263    * against the given set of row data.
264    *
265    * <p>Note: Ordering of the authorization checks
266    * has been carefully optimized to short-circuit the most common requests
267    * and minimize the amount of processing required.</p>
268    *
269    * @param permRequest the action being requested
270    * @param e the coprocessor environment
271    * @param families the map of column families to qualifiers present in
272    * the request
273    * @return an authorization result
274    */
275   AuthResult permissionGranted(String request, User user, Action permRequest,
276       RegionCoprocessorEnvironment e,
277       Map<byte [], ? extends Collection<?>> families) {
278     HRegionInfo hri = e.getRegion().getRegionInfo();
279     TableName tableName = hri.getTable();
280 
281     // 1. All users need read access to hbase:meta table.
282     // this is a very common operation, so deal with it quickly.
283     if (hri.isMetaRegion()) {
284       if (permRequest == Action.READ) {
285         return AuthResult.allow(request, "All users allowed", user,
286           permRequest, tableName, families);
287       }
288     }
289 
290     if (user == null) {
291       return AuthResult.deny(request, "No user associated with request!", null,
292         permRequest, tableName, families);
293     }
294 
295     // 2. check for the table-level, if successful we can short-circuit
296     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
297       return AuthResult.allow(request, "Table permission granted", user,
298         permRequest, tableName, families);
299     }
300 
301     // 3. check permissions against the requested families
302     if (families != null && families.size() > 0) {
303       // all families must pass
304       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
305         // a) check for family level access
306         if (authManager.authorize(user, tableName, family.getKey(),
307             permRequest)) {
308           continue;  // family-level permission overrides per-qualifier
309         }
310 
311         // b) qualifier level access can still succeed
312         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
313           if (family.getValue() instanceof Set) {
314             // for each qualifier of the family
315             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
316             for (byte[] qualifier : familySet) {
317               if (!authManager.authorize(user, tableName, family.getKey(),
318                                          qualifier, permRequest)) {
319                 return AuthResult.deny(request, "Failed qualifier check", user,
320                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
321               }
322             }
323           } else if (family.getValue() instanceof List) { // List<KeyValue>
324             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
325             for (KeyValue kv : kvList) {
326               if (!authManager.authorize(user, tableName, family.getKey(),
327                       kv.getQualifier(), permRequest)) {
328                 return AuthResult.deny(request, "Failed qualifier check", user,
329                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
330               }
331             }
332           }
333         } else {
334           // no qualifiers and family-level check already failed
335           return AuthResult.deny(request, "Failed family check", user, permRequest,
336               tableName, makeFamilyMap(family.getKey(), null));
337         }
338       }
339 
340       // all family checks passed
341       return AuthResult.allow(request, "All family checks passed", user, permRequest,
342           tableName, families);
343     }
344 
345     // 4. no families to check and table level access failed
346     return AuthResult.deny(request, "No families to check and table permission failed",
347         user, permRequest, tableName, families);
348   }
349 
350   /**
351    * Check the current user for authorization to perform a specific action
352    * against the given set of row data.
353    * @param opType the operation type
354    * @param user the user
355    * @param e the coprocessor environment
356    * @param families the map of column families to qualifiers present in
357    * the request
358    * @param actions the desired actions
359    * @return an authorization result
360    */
361   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
362       Map<byte [], ? extends Collection<?>> families, Action... actions) {
363     AuthResult result = null;
364     for (Action action: actions) {
365       result = permissionGranted(opType.toString(), user, action, e, families);
366       if (!result.isAllowed()) {
367         return result;
368       }
369     }
370     return result;
371   }
372 
373   private void logResult(AuthResult result) {
374     if (AUDITLOG.isTraceEnabled()) {
375       RequestContext ctx = RequestContext.get();
376       InetAddress remoteAddr = null;
377       if (ctx != null) {
378         remoteAddr = ctx.getRemoteAddress();
379       }
380       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
381           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
382           "; reason: " + result.getReason() +
383           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
384           "; request: " + result.getRequest() +
385           "; context: " + result.toContextString());
386     }
387   }
388 
389   /**
390    * Returns the active user to which authorization checks should be applied.
391    * If we are in the context of an RPC call, the remote user is used,
392    * otherwise the currently logged in user is used.
393    */
394   private User getActiveUser() throws IOException {
395     User user = RequestContext.getRequestUser();
396     if (!RequestContext.isInRequestContext()) {
397       // for non-rpc handling, fallback to system user
398       user = userProvider.getCurrent();
399     }
400     return user;
401   }
402 
403   /**
404    * Authorizes that the current user has any of the given permissions for the
405    * given table, column family and column qualifier.
406    * @param tableName Table requested
407    * @param family Column family requested
408    * @param qualifier Column qualifier requested
409    * @throws IOException if obtaining the current user fails
410    * @throws AccessDeniedException if user has no authorization
411    */
412   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
413       Action... permissions) throws IOException {
414     User user = getActiveUser();
415     AuthResult result = null;
416 
417     for (Action permission : permissions) {
418       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
419         result = AuthResult.allow(request, "Table permission granted", user,
420                                   permission, tableName, family, qualifier);
421         break;
422       } else {
423         // rest of the world
424         result = AuthResult.deny(request, "Insufficient permissions", user,
425                                  permission, tableName, family, qualifier);
426       }
427     }
428     logResult(result);
429     if (!result.isAllowed()) {
430       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
431     }
432   }
433 
434   /**
435    * Authorizes that the current user has any of the given permissions to access the table.
436    *
437    * @param tableName Table requested
438    * @param permissions Actions being requested
439    * @throws IOException if obtaining the current user fails
440    * @throws AccessDeniedException if user has no authorization
441    */
442   private void requireAccess(String request, TableName tableName,
443       Action... permissions) throws IOException {
444     User user = getActiveUser();
445     AuthResult result = null;
446 
447     for (Action permission : permissions) {
448       if (authManager.hasAccess(user, tableName, permission)) {
449         result = AuthResult.allow(request, "Table permission granted", user,
450                                   permission, tableName, null, null);
451         break;
452       } else {
453         // rest of the world
454         result = AuthResult.deny(request, "Insufficient permissions", user,
455                                  permission, tableName, null, null);
456       }
457     }
458     logResult(result);
459     if (!result.isAllowed()) {
460       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
461     }
462   }
463 
464   /**
465    * Authorizes that the current user has global privileges for the given action.
466    * @param perm The action being requested
467    * @throws IOException if obtaining the current user fails
468    * @throws AccessDeniedException if authorization is denied
469    */
470   private void requirePermission(String request, Action perm) throws IOException {
471     requireGlobalPermission(request, perm, null, null);
472   }
473 
474   /**
475    * Authorizes that the current user has permission to perform the given
476    * action on the set of table column families.
477    * @param perm Action that is required
478    * @param env The current coprocessor environment
479    * @param families The map of column families-qualifiers.
480    * @throws AccessDeniedException if the authorization check failed
481    */
482   private void requirePermission(String request, Action perm,
483         RegionCoprocessorEnvironment env,
484         Map<byte[], ? extends Collection<?>> families)
485       throws IOException {
486     User user = getActiveUser();
487     AuthResult result = permissionGranted(request, user, perm, env, families);
488     logResult(result);
489 
490     if (!result.isAllowed()) {
491       throw new AccessDeniedException("Insufficient permissions (table=" +
492         env.getRegion().getTableDesc().getTableName()+
493         ((families != null && families.size() > 0) ? ", family: " +
494         result.toFamilyString() : "") + ", action=" +
495         perm.toString() + ")");
496     }
497   }
498 
499   /**
500    * Checks that the user has the given global permission. The generated
501    * audit log message will contain context information for the operation
502    * being authorized, based on the given parameters.
503    * @param perm Action being requested
504    * @param tableName Affected table name.
505    * @param familyMap Affected column families.
506    */
507   private void requireGlobalPermission(String request, Action perm, TableName tableName,
508       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
509     User user = getActiveUser();
510     if (authManager.authorize(user, perm)) {
511       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
512     } else {
513       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
514       throw new AccessDeniedException("Insufficient permissions for user '" +
515           (user != null ? user.getShortName() : "null") +"' (global, action=" +
516           perm.toString() + ")");
517     }
518   }
519 
520   /**
521    * Checks that the user has the given global permission. The generated
522    * audit log message will contain context information for the operation
523    * being authorized, based on the given parameters.
524    * @param perm Action being requested
525    * @param namespace
526    */
527   private void requireGlobalPermission(String request, Action perm,
528                                        String namespace) throws IOException {
529     User user = getActiveUser();
530     if (authManager.authorize(user, perm)) {
531       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
532     } else {
533       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
534       throw new AccessDeniedException("Insufficient permissions for user '" +
535           (user != null ? user.getShortName() : "null") +"' (global, action=" +
536           perm.toString() + ")");
537     }
538   }
539 
540   /**
541    * Checks that the user has the given global or namespace permission.
542    * @param namespace
543    * @param permissions Actions being requested
544    */
545   public void requireNamespacePermission(String request, String namespace,
546       Action... permissions) throws IOException {
547     User user = getActiveUser();
548     AuthResult result = null;
549 
550     for (Action permission : permissions) {
551       if (authManager.authorize(user, namespace, permission)) {
552         result = AuthResult.allow(request, "Namespace permission granted",
553             user, permission, namespace);
554         break;
555       } else {
556         // rest of the world
557         result = AuthResult.deny(request, "Insufficient permissions", user,
558             permission, namespace);
559       }
560     }
561     logResult(result);
562     if (!result.isAllowed()) {
563       throw new AccessDeniedException("Insufficient permissions "
564           + result.toContextString());
565     }
566   }
567 
568   /**
569    * Returns <code>true</code> if the current user is allowed the given action
570    * over at least one of the column qualifiers in the given column families.
571    */
572   private boolean hasFamilyQualifierPermission(User user,
573       Action perm,
574       RegionCoprocessorEnvironment env,
575       Map<byte[], ? extends Collection<byte[]>> familyMap)
576     throws IOException {
577     HRegionInfo hri = env.getRegion().getRegionInfo();
578     TableName tableName = hri.getTable();
579 
580     if (user == null) {
581       return false;
582     }
583 
584     if (familyMap != null && familyMap.size() > 0) {
585       // at least one family must be allowed
586       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
587           familyMap.entrySet()) {
588         if (family.getValue() != null && !family.getValue().isEmpty()) {
589           for (byte[] qualifier : family.getValue()) {
590             if (authManager.matchPermission(user, tableName,
591                 family.getKey(), qualifier, perm)) {
592               return true;
593             }
594           }
595         } else {
596           if (authManager.matchPermission(user, tableName, family.getKey(),
597               perm)) {
598             return true;
599           }
600         }
601       }
602     } else if (LOG.isDebugEnabled()) {
603       LOG.debug("Empty family map passed for permission check");
604     }
605 
606     return false;
607   }
608 
609   private enum OpType {
610     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
611     GET("get"),
612     EXISTS("exists"),
613     SCAN("scan"),
614     PUT("put"),
615     DELETE("delete"),
616     CHECK_AND_PUT("checkAndPut"),
617     CHECK_AND_DELETE("checkAndDelete"),
618     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
619     APPEND("append"),
620     INCREMENT("increment");
621 
622     private String type;
623 
624     private OpType(String type) {
625       this.type = type;
626     }
627 
628     @Override
629     public String toString() {
630       return type;
631     }
632   }
633 
634   /**
635    * Determine if cell ACLs covered by the operation grant access. This is expensive.
636    * @return false if cell ACLs failed to grant access, true otherwise
637    * @throws IOException
638    */
639   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
640       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
641       throws IOException {
642     if (!cellFeaturesEnabled) {
643       return false;
644     }
645     long cellGrants = 0;
646     User user = getActiveUser();
647     long latestCellTs = 0;
648     Get get = new Get(row);
649     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
650     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
651     // version. We have to get every cell version and check its TS against the TS asked for in
652     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
653     // consider only one such passing cell. In case of Delete we have to consider all the cell
654     // versions under this passing version. When Delete Mutation contains columns which are a
655     // version delete just consider only one version for those column cells.
656     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
657     if (considerCellTs) {
658       get.setMaxVersions();
659     } else {
660       get.setMaxVersions(1);
661     }
662     boolean diffCellTsFromOpTs = false;
663     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
664       byte[] col = entry.getKey();
665       // TODO: HBASE-7114 could possibly unify the collection type in family
666       // maps so we would not need to do this
667       if (entry.getValue() instanceof Set) {
668         Set<byte[]> set = (Set<byte[]>)entry.getValue();
669         if (set == null || set.isEmpty()) {
670           get.addFamily(col);
671         } else {
672           for (byte[] qual: set) {
673             get.addColumn(col, qual);
674           }
675         }
676       } else if (entry.getValue() instanceof List) {
677         List<Cell> list = (List<Cell>)entry.getValue();
678         if (list == null || list.isEmpty()) {
679           get.addFamily(col);
680         } else {
681           // In case of family delete, a Cell will be added into the list with Qualifier as null.
682           for (Cell cell : list) {
683             if (cell.getQualifierLength() == 0
684                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
685                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
686               get.addFamily(col);
687             } else {
688               get.addColumn(col, CellUtil.cloneQualifier(cell));
689             }
690             if (considerCellTs) {
691               long cellTs = cell.getTimestamp();
692               latestCellTs = Math.max(latestCellTs, cellTs);
693               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
694             }
695           }
696         }
697       } else {
698         throw new RuntimeException("Unhandled collection type " +
699           entry.getValue().getClass().getName());
700       }
701     }
702     // We want to avoid looking into the future. So, if the cells of the
703     // operation specify a timestamp, or the operation itself specifies a
704     // timestamp, then we use the maximum ts found. Otherwise, we bound
705     // the Get to the current server time. We add 1 to the timerange since
706     // the upper bound of a timerange is exclusive yet we need to examine
707     // any cells found there inclusively.
708     long latestTs = Math.max(opTs, latestCellTs);
709     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
710       latestTs = EnvironmentEdgeManager.currentTime();
711     }
712     get.setTimeRange(0, latestTs + 1);
713     // In case of Put operation we set to read all versions. This was done to consider the case
714     // where columns are added with TS other than the Mutation TS. But normally this wont be the
715     // case with Put. There no need to get all versions but get latest version only.
716     if (!diffCellTsFromOpTs && request == OpType.PUT) {
717       get.setMaxVersions(1);
718     }
719     if (LOG.isTraceEnabled()) {
720       LOG.trace("Scanning for cells with " + get);
721     }
722     // This Map is identical to familyMap. The key is a BR rather than byte[].
723     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
724     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
725     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
726     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
727       if (entry.getValue() instanceof List) {
728         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
729       }
730     }
731     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
732     List<Cell> cells = Lists.newArrayList();
733     Cell prevCell = null;
734     ByteRange curFam = new SimpleMutableByteRange();
735     boolean curColAllVersions = (request == OpType.DELETE);
736     long curColCheckTs = opTs;
737     boolean foundColumn = false;
738     try {
739       boolean more = false;
740       do {
741         cells.clear();
742         // scan with limit as 1 to hold down memory use on wide rows
743         more = NextState.hasMoreValues(scanner.next(cells, 1));
744         for (Cell cell: cells) {
745           if (LOG.isTraceEnabled()) {
746             LOG.trace("Found cell " + cell);
747           }
748           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
749           if (colChange) foundColumn = false;
750           prevCell = cell;
751           if (!curColAllVersions && foundColumn) {
752             continue;
753           }
754           if (colChange && considerCellTs) {
755             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
756             List<Cell> cols = familyMap1.get(curFam);
757             for (Cell col : cols) {
758               // null/empty qualifier is used to denote a Family delete. The TS and delete type
759               // associated with this is applicable for all columns within the family. That is
760               // why the below (col.getQualifierLength() == 0) check.
761               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
762                   || CellUtil.matchingQualifier(cell, col)) {
763                 byte type = col.getTypeByte();
764                 if (considerCellTs) {
765                   curColCheckTs = col.getTimestamp();
766                 }
767                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
768                 // a version delete for a column no need to check all the covering cells within
769                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
770                 // One version delete types are Delete/DeleteFamilyVersion
771                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
772                     || (KeyValue.Type.DeleteFamily.getCode() == type);
773                 break;
774               }
775             }
776           }
777           if (cell.getTimestamp() > curColCheckTs) {
778             // Just ignore this cell. This is not a covering cell.
779             continue;
780           }
781           foundColumn = true;
782           for (Action action: actions) {
783             // Are there permissions for this user for the cell?
784             if (!authManager.authorize(user, getTableName(e), cell, action)) {
785               // We can stop if the cell ACL denies access
786               return false;
787             }
788           }
789           cellGrants++;
790         }
791       } while (more);
792     } catch (AccessDeniedException ex) {
793       throw ex;
794     } catch (IOException ex) {
795       LOG.error("Exception while getting cells to calculate covering permission", ex);
796     } finally {
797       scanner.close();
798     }
799     // We should not authorize unless we have found one or more cell ACLs that
800     // grant access. This code is used to check for additional permissions
801     // after no table or CF grants are found.
802     return cellGrants > 0;
803   }
804 
805   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
806     // Iterate over the entries in the familyMap, replacing the cells therein
807     // with new cells including the ACL data
808     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
809       List<Cell> newCells = Lists.newArrayList();
810       for (Cell cell: e.getValue()) {
811         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
812         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
813         if (cell.getTagsLength() > 0) {
814           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
815             cell.getTagsOffset(), cell.getTagsLength());
816           while (tagIterator.hasNext()) {
817             tags.add(tagIterator.next());
818           }
819         }
820         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
821       }
822       // This is supposed to be safe, won't CME
823       e.setValue(newCells);
824     }
825   }
826 
827   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
828   // type is reserved and should not be explicitly set by user.
829   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
830     // Superusers are allowed to store cells unconditionally.
831     if (superusers.contains(user.getShortName())) {
832       return;
833     }
834     // We already checked (prePut vs preBatchMutation)
835     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
836       return;
837     }
838     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
839       Cell cell = cellScanner.current();
840       if (cell.getTagsLength() > 0) {
841         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
842           cell.getTagsLength());
843         while (tagsItr.hasNext()) {
844           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
845             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
846           }
847         }
848       }
849     }
850     m.setAttribute(TAG_CHECK_PASSED, TRUE);
851   }
852 
853   /* ---- MasterObserver implementation ---- */
854   @Override
855   public void start(CoprocessorEnvironment env) throws IOException {
856     CompoundConfiguration conf = new CompoundConfiguration();
857     conf.add(env.getConfiguration());
858 
859     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
860       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
861 
862     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
863     if (!cellFeaturesEnabled) {
864       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
865           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
866           + " accordingly.");
867     }
868 
869     ZooKeeperWatcher zk = null;
870     if (env instanceof MasterCoprocessorEnvironment) {
871       // if running on HMaster
872       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
873       zk = mEnv.getMasterServices().getZooKeeper();
874     } else if (env instanceof RegionServerCoprocessorEnvironment) {
875       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
876       zk = rsEnv.getRegionServerServices().getZooKeeper();
877     } else if (env instanceof RegionCoprocessorEnvironment) {
878       // if running at region
879       regionEnv = (RegionCoprocessorEnvironment) env;
880       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
881       zk = regionEnv.getRegionServerServices().getZooKeeper();
882       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
883         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
884     }
885 
886     // set the user-provider.
887     this.userProvider = UserProvider.instantiate(env.getConfiguration());
888 
889     // set up the list of users with superuser privilege
890     User user = userProvider.getCurrent();
891     superusers = Lists.asList(user.getShortName(),
892       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
893 
894     // If zk is null or IOException while obtaining auth manager,
895     // throw RuntimeException so that the coprocessor is unloaded.
896     if (zk != null) {
897       try {
898         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
899       } catch (IOException ioe) {
900         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
901       }
902     } else {
903       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
904     }
905 
906     tableAcls = new MapMaker().weakValues().makeMap();
907   }
908 
909   @Override
910   public void stop(CoprocessorEnvironment env) {
911 
912   }
913 
914   @Override
915   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
916       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
917     Set<byte[]> families = desc.getFamiliesKeys();
918     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
919     for (byte[] family: families) {
920       familyMap.put(family, null);
921     }
922     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(), Action.CREATE);
923   }
924 
925   @Override
926   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
927       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
928     // When AC is used, it should be configured as the 1st CP.
929     // In Master, the table operations like create, are handled by a Thread pool but the max size
930     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
931     // sequentially only.
932     // Related code in HMaster#startServiceThreads
933     // {code}
934     //   // We depend on there being only one instance of this executor running
935     //   // at a time. To do concurrency, would need fencing of enable/disable of
936     //   // tables.
937     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
938     // {code}
939     // In future if we change this pool to have more threads, then there is a chance for thread,
940     // creating acl table, getting delayed and by that time another table creation got over and
941     // this hook is getting called. In such a case, we will need a wait logic here which will
942     // wait till the acl table is created.
943     if (AccessControlLists.isAclTable(desc)) {
944       this.aclTabAvailable = true;
945     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
946       if (!aclTabAvailable) {
947         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
948             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
949             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
950       } else {
951         String owner = desc.getOwnerString();
952         // default the table owner to current user, if not specified.
953         if (owner == null)
954           owner = getActiveUser().getShortName();
955         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
956             desc.getTableName(), null, Action.values());
957         // switch to the real hbase master user for doing the RPC on the ACL table
958         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
959           @Override
960           public Void run() throws Exception {
961             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
962                 userperm);
963             return null;
964           }
965         });
966       }
967     }
968   }
969 
970   @Override
971   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
972       throws IOException {
973     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
974   }
975 
976   @Override
977   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
978       final TableName tableName) throws IOException {
979     final Configuration conf = c.getEnvironment().getConfiguration();
980     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
981       @Override
982       public Void run() throws Exception {
983         AccessControlLists.removeTablePermissions(conf, tableName);
984         return null;
985       }
986     });
987     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
988   }
989 
990   @Override
991   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
992       final TableName tableName) throws IOException {
993     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
994     final Configuration conf = c.getEnvironment().getConfiguration();
995     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
996       @Override
997       public Void run() throws Exception {
998         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
999         if (acls != null) {
1000           tableAcls.put(tableName, acls);
1001         }
1002         return null;
1003       }
1004     });
1005   }
1006 
1007   @Override
1008   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1009       final TableName tableName) throws IOException {
1010     final Configuration conf = ctx.getEnvironment().getConfiguration();
1011     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1012       @Override
1013       public Void run() throws Exception {
1014         List<UserPermission> perms = tableAcls.get(tableName);
1015         if (perms != null) {
1016           for (UserPermission perm : perms) {
1017             AccessControlLists.addUserPermission(conf, perm);
1018           }
1019         }
1020         tableAcls.remove(tableName);
1021         return null;
1022       }
1023     });
1024   }
1025 
1026   @Override
1027   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1028       HTableDescriptor htd) throws IOException {
1029     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1030   }
1031 
1032   @Override
1033   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1034       TableName tableName, final HTableDescriptor htd) throws IOException {
1035     final Configuration conf = c.getEnvironment().getConfiguration();
1036     // default the table owner to current user, if not specified.
1037     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1038       getActiveUser().getShortName();
1039     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1040       @Override
1041       public Void run() throws Exception {
1042         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1043           htd.getTableName(), null, Action.values());
1044         AccessControlLists.addUserPermission(conf, userperm);
1045         return null;
1046       }
1047     });
1048   }
1049 
1050   @Override
1051   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1052       HColumnDescriptor column) throws IOException {
1053     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1054   }
1055 
1056   @Override
1057   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1058       HColumnDescriptor descriptor) throws IOException {
1059     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1060       Action.CREATE);
1061   }
1062 
1063   @Override
1064   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1065       byte[] col) throws IOException {
1066     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1067   }
1068 
1069   @Override
1070   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1071       final TableName tableName, final byte[] col) throws IOException {
1072     final Configuration conf = c.getEnvironment().getConfiguration();
1073     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1074       @Override
1075       public Void run() throws Exception {
1076         AccessControlLists.removeTablePermissions(conf, tableName, col);
1077         return null;
1078       }
1079     });
1080   }
1081 
1082   @Override
1083   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1084       throws IOException {
1085     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1086   }
1087 
1088   @Override
1089   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1090       throws IOException {
1091     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1092       throw new AccessDeniedException("Not allowed to disable "
1093           + AccessControlLists.ACL_TABLE_NAME + " table.");
1094     }
1095     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1096   }
1097 
1098   @Override
1099   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1100       ServerName srcServer, ServerName destServer) throws IOException {
1101     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1102   }
1103 
1104   @Override
1105   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1106       throws IOException {
1107     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1108   }
1109 
1110   @Override
1111   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1112       boolean force) throws IOException {
1113     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1114   }
1115 
1116   @Override
1117   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1118       HRegionInfo regionInfo) throws IOException {
1119     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1120   }
1121 
1122   @Override
1123   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1124       throws IOException {
1125     requirePermission("balance", Action.ADMIN);
1126   }
1127 
1128   @Override
1129   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1130       boolean newValue) throws IOException {
1131     requirePermission("balanceSwitch", Action.ADMIN);
1132     return newValue;
1133   }
1134 
1135   @Override
1136   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1137       throws IOException {
1138     requirePermission("shutdown", Action.ADMIN);
1139   }
1140 
1141   @Override
1142   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1143       throws IOException {
1144     requirePermission("stopMaster", Action.ADMIN);
1145   }
1146 
1147   @Override
1148   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1149       throws IOException {
1150     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1151       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1152       // initialize the ACL storage table
1153       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1154     } else {
1155       aclTabAvailable = true;
1156     }
1157   }
1158 
1159   @Override
1160   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1161       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1162       throws IOException {
1163     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1164       Permission.Action.ADMIN);
1165   }
1166 
1167   @Override
1168   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1169       final SnapshotDescription snapshot) throws IOException {
1170     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1171       // list it, if user is the owner of snapshot
1172     } else {
1173       requirePermission("listSnapshot", Action.ADMIN);
1174     }
1175   }
1176 
1177   @Override
1178   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1179       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1180       throws IOException {
1181     requirePermission("clone", Action.ADMIN);
1182   }
1183 
1184   @Override
1185   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1186       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1187       throws IOException {
1188     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1189       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1190         Permission.Action.ADMIN);
1191     } else {
1192       requirePermission("restoreSnapshot", Action.ADMIN);
1193     }
1194   }
1195 
1196   @Override
1197   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1198       final SnapshotDescription snapshot) throws IOException {
1199     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1200       // Snapshot owner is allowed to delete the snapshot
1201     } else {
1202       requirePermission("deleteSnapshot", Action.ADMIN);
1203     }
1204   }
1205 
1206   @Override
1207   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1208       NamespaceDescriptor ns) throws IOException {
1209     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1210   }
1211 
1212   @Override
1213   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1214       throws IOException {
1215     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1216   }
1217 
1218   @Override
1219   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1220       final String namespace) throws IOException {
1221     final Configuration conf = ctx.getEnvironment().getConfiguration();
1222     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1223       @Override
1224       public Void run() throws Exception {
1225         AccessControlLists.removeNamespacePermissions(conf, namespace);
1226         return null;
1227       }
1228     });
1229     LOG.info(namespace + "entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1230   }
1231 
1232   @Override
1233   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1234       NamespaceDescriptor ns) throws IOException {
1235     // We require only global permission so that 
1236     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1237     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1238   }
1239 
1240   @Override
1241   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1242       throws IOException {
1243     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1244   }
1245 
1246   @Override
1247   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1248       List<NamespaceDescriptor> descriptors) throws IOException {
1249     // Retains only those which passes authorization checks, as the checks weren't done as part
1250     // of preGetTableDescriptors.
1251     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1252     while (itr.hasNext()) {
1253       NamespaceDescriptor desc = itr.next();
1254       try {
1255         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1256       } catch (AccessDeniedException e) {
1257         itr.remove();
1258       }
1259     }
1260   }
1261 
1262   @Override
1263   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1264       final TableName tableName) throws IOException {
1265     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1266   }
1267 
1268   /* ---- RegionObserver implementation ---- */
1269 
1270   @Override
1271   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1272       throws IOException {
1273     RegionCoprocessorEnvironment env = e.getEnvironment();
1274     final HRegion region = env.getRegion();
1275     if (region == null) {
1276       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1277     } else {
1278       HRegionInfo regionInfo = region.getRegionInfo();
1279       if (regionInfo.getTable().isSystemTable()) {
1280         isSystemOrSuperUser(regionEnv.getConfiguration());
1281       } else {
1282         requirePermission("preOpen", Action.ADMIN);
1283       }
1284     }
1285   }
1286 
1287   @Override
1288   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1289     RegionCoprocessorEnvironment env = c.getEnvironment();
1290     final HRegion region = env.getRegion();
1291     if (region == null) {
1292       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1293       return;
1294     }
1295     if (AccessControlLists.isAclRegion(region)) {
1296       aclRegion = true;
1297       // When this region is under recovering state, initialize will be handled by postLogReplay
1298       if (!region.isRecovering()) {
1299         try {
1300           initialize(env);
1301         } catch (IOException ex) {
1302           // if we can't obtain permissions, it's better to fail
1303           // than perform checks incorrectly
1304           throw new RuntimeException("Failed to initialize permissions cache", ex);
1305         }
1306       }
1307     } else {
1308       initialized = true;
1309     }
1310   }
1311 
1312   @Override
1313   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1314     if (aclRegion) {
1315       try {
1316         initialize(c.getEnvironment());
1317       } catch (IOException ex) {
1318         // if we can't obtain permissions, it's better to fail
1319         // than perform checks incorrectly
1320         throw new RuntimeException("Failed to initialize permissions cache", ex);
1321       }
1322     }
1323   }
1324 
1325   @Override
1326   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1327     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1328         Action.CREATE);
1329   }
1330 
1331   @Override
1332   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1333     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1334   }
1335 
1336   @Override
1337   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1338       byte[] splitRow) throws IOException {
1339     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1340   }
1341 
1342   @Override
1343   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1344       final Store store, final InternalScanner scanner, final ScanType scanType)
1345           throws IOException {
1346     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1347         Action.CREATE);
1348     return scanner;
1349   }
1350 
1351   @Override
1352   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1353       final byte [] row, final byte [] family, final Result result)
1354       throws IOException {
1355     assert family != null;
1356     RegionCoprocessorEnvironment env = c.getEnvironment();
1357     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1358     User user = getActiveUser();
1359     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1360       Action.READ);
1361     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1362       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1363         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1364       authResult.setReason("Covering cell set");
1365     }
1366     logResult(authResult);
1367     if (!authResult.isAllowed()) {
1368       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1369     }
1370   }
1371 
1372   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1373       final Query query, OpType opType) throws IOException {
1374     Filter filter = query.getFilter();
1375     // Don't wrap an AccessControlFilter
1376     if (filter != null && filter instanceof AccessControlFilter) {
1377       return;
1378     }
1379     User user = getActiveUser();
1380     RegionCoprocessorEnvironment env = c.getEnvironment();
1381     Map<byte[],? extends Collection<byte[]>> families = null;
1382     switch (opType) {
1383     case GET:
1384     case EXISTS:
1385       families = ((Get)query).getFamilyMap();
1386       break;
1387     case SCAN:
1388       families = ((Scan)query).getFamilyMap();
1389       break;
1390     default:
1391       throw new RuntimeException("Unhandled operation " + opType);
1392     }
1393     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1394     HRegion region = getRegion(env);
1395     TableName table = getTableName(region);
1396     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1397     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1398       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1399     }
1400     if (!authResult.isAllowed()) {
1401       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1402         // Old behavior: Scan with only qualifier checks if we have partial
1403         // permission. Backwards compatible behavior is to throw an
1404         // AccessDeniedException immediately if there are no grants for table
1405         // or CF or CF+qual. Only proceed with an injected filter if there are
1406         // grants for qualifiers. Otherwise we will fall through below and log
1407         // the result and throw an ADE. We may end up checking qualifier
1408         // grants three times (permissionGranted above, here, and in the
1409         // filter) but that's the price of backwards compatibility.
1410         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1411           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1412             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1413             cfVsMaxVersions);
1414           // wrap any existing filter
1415           if (filter != null) {
1416             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1417               Lists.newArrayList(ourFilter, filter));
1418           }
1419           authResult.setAllowed(true);
1420           authResult.setReason("Access allowed with filter");
1421           switch (opType) {
1422           case GET:
1423           case EXISTS:
1424             ((Get)query).setFilter(ourFilter);
1425             break;
1426           case SCAN:
1427             ((Scan)query).setFilter(ourFilter);
1428             break;
1429           default:
1430             throw new RuntimeException("Unhandled operation " + opType);
1431           }
1432         }
1433       } else {
1434         // New behavior: Any access we might be granted is more fine-grained
1435         // than whole table or CF. Simply inject a filter and return what is
1436         // allowed. We will not throw an AccessDeniedException. This is a
1437         // behavioral change since 0.96.
1438         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1439           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1440         // wrap any existing filter
1441         if (filter != null) {
1442           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1443             Lists.newArrayList(ourFilter, filter));
1444         }
1445         authResult.setAllowed(true);
1446         authResult.setReason("Access allowed with filter");
1447         switch (opType) {
1448         case GET:
1449         case EXISTS:
1450           ((Get)query).setFilter(ourFilter);
1451           break;
1452         case SCAN:
1453           ((Scan)query).setFilter(ourFilter);
1454           break;
1455         default:
1456           throw new RuntimeException("Unhandled operation " + opType);
1457         }
1458       }
1459     }
1460 
1461     logResult(authResult);
1462     if (!authResult.isAllowed()) {
1463       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1464         ", action=READ)");
1465     }
1466   }
1467 
1468   @Override
1469   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1470       final Get get, final List<Cell> result) throws IOException {
1471     internalPreRead(c, get, OpType.GET);
1472   }
1473 
1474   @Override
1475   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1476       final Get get, final boolean exists) throws IOException {
1477     internalPreRead(c, get, OpType.EXISTS);
1478     return exists;
1479   }
1480 
1481   @Override
1482   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1483       final Put put, final WALEdit edit, final Durability durability)
1484       throws IOException {
1485     // Require WRITE permission to the table, CF, or top visible value, if any.
1486     // NOTE: We don't need to check the permissions for any earlier Puts
1487     // because we treat the ACLs in each Put as timestamped like any other
1488     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1489     // change the ACL of any previous Put. This allows simple evolution of
1490     // security policy over time without requiring expensive updates.
1491     User user = getActiveUser();
1492     checkForReservedTagPresence(user, put);
1493     RegionCoprocessorEnvironment env = c.getEnvironment();
1494     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1495     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1496     logResult(authResult);
1497     if (!authResult.isAllowed()) {
1498       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1499         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1500       } else {
1501         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1502       }
1503     }
1504     // Add cell ACLs from the operation to the cells themselves
1505     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1506     if (bytes != null) {
1507       if (cellFeaturesEnabled) {
1508         addCellPermissions(bytes, put.getFamilyCellMap());
1509       } else {
1510         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1511       }
1512     }
1513   }
1514 
1515   @Override
1516   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1517       final Put put, final WALEdit edit, final Durability durability) {
1518     if (aclRegion) {
1519       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1520     }
1521   }
1522 
1523   @Override
1524   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1525       final Delete delete, final WALEdit edit, final Durability durability)
1526       throws IOException {
1527     // An ACL on a delete is useless, we shouldn't allow it
1528     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1529       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1530     }
1531     // Require WRITE permissions on all cells covered by the delete. Unlike
1532     // for Puts we need to check all visible prior versions, because a major
1533     // compaction could remove them. If the user doesn't have permission to
1534     // overwrite any of the visible versions ('visible' defined as not covered
1535     // by a tombstone already) then we have to disallow this operation.
1536     RegionCoprocessorEnvironment env = c.getEnvironment();
1537     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1538     User user = getActiveUser();
1539     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1540     logResult(authResult);
1541     if (!authResult.isAllowed()) {
1542       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1543         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1544       } else {
1545         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1546       }
1547     }
1548   }
1549 
1550   @Override
1551   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1552       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1553     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1554       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1555       for (int i = 0; i < miniBatchOp.size(); i++) {
1556         Mutation m = miniBatchOp.getOperation(i);
1557         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1558           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1559           // perm check
1560           OpType opType;
1561           if (m instanceof Put) {
1562             checkForReservedTagPresence(getActiveUser(), m);
1563             opType = OpType.PUT;
1564           } else {
1565             opType = OpType.DELETE;
1566           }
1567           AuthResult authResult = null;
1568           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1569               m.getTimeStamp(), Action.WRITE)) {
1570             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1571                 Action.WRITE, table, m.getFamilyCellMap());
1572           } else {
1573             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1574                 Action.WRITE, table, m.getFamilyCellMap());
1575           }
1576           logResult(authResult);
1577           if (!authResult.isAllowed()) {
1578             throw new AccessDeniedException("Insufficient permissions "
1579                 + authResult.toContextString());
1580           }
1581         }
1582       }
1583     }
1584   }
1585 
1586   @Override
1587   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1588       final Delete delete, final WALEdit edit, final Durability durability)
1589       throws IOException {
1590     if (aclRegion) {
1591       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1592     }
1593   }
1594 
1595   @Override
1596   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1597       final byte [] row, final byte [] family, final byte [] qualifier,
1598       final CompareFilter.CompareOp compareOp,
1599       final ByteArrayComparable comparator, final Put put,
1600       final boolean result) throws IOException {
1601     // Require READ and WRITE permissions on the table, CF, and KV to update
1602     User user = getActiveUser();
1603     checkForReservedTagPresence(user, put);
1604     RegionCoprocessorEnvironment env = c.getEnvironment();
1605     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1606     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1607       Action.READ, Action.WRITE);
1608     logResult(authResult);
1609     if (!authResult.isAllowed()) {
1610       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1611         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1612       } else {
1613         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1614       }
1615     }
1616     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1617     if (bytes != null) {
1618       if (cellFeaturesEnabled) {
1619         addCellPermissions(bytes, put.getFamilyCellMap());
1620       } else {
1621         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1622       }
1623     }
1624     return result;
1625   }
1626 
1627   @Override
1628   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1629       final byte[] row, final byte[] family, final byte[] qualifier,
1630       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1631       final boolean result) throws IOException {
1632     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1633       // We had failure with table, cf and q perm checks and now giving a chance for cell
1634       // perm check
1635       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1636       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1637       AuthResult authResult = null;
1638       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1639           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1640         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1641             getActiveUser(), Action.READ, table, families);
1642       } else {
1643         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1644             getActiveUser(), Action.READ, table, families);
1645       }
1646       logResult(authResult);
1647       if (!authResult.isAllowed()) {
1648         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1649       }
1650     }
1651     return result;
1652   }
1653 
1654   @Override
1655   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1656       final byte [] row, final byte [] family, final byte [] qualifier,
1657       final CompareFilter.CompareOp compareOp,
1658       final ByteArrayComparable comparator, final Delete delete,
1659       final boolean result) throws IOException {
1660     // An ACL on a delete is useless, we shouldn't allow it
1661     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1662       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1663           delete.toString());
1664     }
1665     // Require READ and WRITE permissions on the table, CF, and the KV covered
1666     // by the delete
1667     RegionCoprocessorEnvironment env = c.getEnvironment();
1668     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1669     User user = getActiveUser();
1670     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1671       Action.READ, Action.WRITE);
1672     logResult(authResult);
1673     if (!authResult.isAllowed()) {
1674       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1675         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1676       } else {
1677         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1678       }
1679     }
1680     return result;
1681   }
1682 
1683   @Override
1684   public boolean preCheckAndDeleteAfterRowLock(
1685       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1686       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1687       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1688       throws IOException {
1689     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1690       // We had failure with table, cf and q perm checks and now giving a chance for cell
1691       // perm check
1692       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1693       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1694       AuthResult authResult = null;
1695       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1696           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1697         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1698             getActiveUser(), Action.READ, table, families);
1699       } else {
1700         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1701             getActiveUser(), Action.READ, table, families);
1702       }
1703       logResult(authResult);
1704       if (!authResult.isAllowed()) {
1705         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1706       }
1707     }
1708     return result;
1709   }
1710 
1711   @Override
1712   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1713       final byte [] row, final byte [] family, final byte [] qualifier,
1714       final long amount, final boolean writeToWAL)
1715       throws IOException {
1716     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1717     // incremented value
1718     RegionCoprocessorEnvironment env = c.getEnvironment();
1719     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1720     User user = getActiveUser();
1721     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1722       Action.WRITE);
1723     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1724       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1725         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1726       authResult.setReason("Covering cell set");
1727     }
1728     logResult(authResult);
1729     if (!authResult.isAllowed()) {
1730       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1731     }
1732     return -1;
1733   }
1734 
1735   @Override
1736   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1737       throws IOException {
1738     // Require WRITE permission to the table, CF, and the KV to be appended
1739     User user = getActiveUser();
1740     checkForReservedTagPresence(user, append);
1741     RegionCoprocessorEnvironment env = c.getEnvironment();
1742     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1743     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1744     logResult(authResult);
1745     if (!authResult.isAllowed()) {
1746       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1747         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1748       } else {
1749         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1750       }
1751     }
1752     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1753     if (bytes != null) {
1754       if (cellFeaturesEnabled) {
1755         addCellPermissions(bytes, append.getFamilyCellMap());
1756       } else {
1757         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1758       }
1759     }
1760     return null;
1761   }
1762 
1763   @Override
1764   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1765       final Append append) throws IOException {
1766     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1767       // We had failure with table, cf and q perm checks and now giving a chance for cell
1768       // perm check
1769       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1770       AuthResult authResult = null;
1771       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1772           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1773         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1774             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1775       } else {
1776         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1777             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1778       }
1779       logResult(authResult);
1780       if (!authResult.isAllowed()) {
1781         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1782       }
1783     }
1784     return null;
1785   }
1786 
1787   @Override
1788   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1789       final Increment increment)
1790       throws IOException {
1791     // Require WRITE permission to the table, CF, and the KV to be replaced by
1792     // the incremented value
1793     User user = getActiveUser();
1794     checkForReservedTagPresence(user, increment);
1795     RegionCoprocessorEnvironment env = c.getEnvironment();
1796     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1797     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1798       Action.WRITE);
1799     logResult(authResult);
1800     if (!authResult.isAllowed()) {
1801       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1802         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1803       } else {
1804         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1805       }
1806     }
1807     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1808     if (bytes != null) {
1809       if (cellFeaturesEnabled) {
1810         addCellPermissions(bytes, increment.getFamilyCellMap());
1811       } else {
1812         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1813       }
1814     }
1815     return null;
1816   }
1817 
1818   @Override
1819   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1820       final Increment increment) throws IOException {
1821     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1822       // We had failure with table, cf and q perm checks and now giving a chance for cell
1823       // perm check
1824       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1825       AuthResult authResult = null;
1826       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1827           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1828         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1829             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1830       } else {
1831         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1832             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1833       }
1834       logResult(authResult);
1835       if (!authResult.isAllowed()) {
1836         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1837       }
1838     }
1839     return null;
1840   }
1841 
1842   @Override
1843   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1844       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1845     // If the HFile version is insufficient to persist tags, we won't have any
1846     // work to do here
1847     if (!cellFeaturesEnabled) {
1848       return newCell;
1849     }
1850 
1851     // Collect any ACLs from the old cell
1852     List<Tag> tags = Lists.newArrayList();
1853     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1854     if (oldCell != null) {
1855       // Save an object allocation where we can
1856       if (oldCell.getTagsLength() > 0) {
1857         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1858           oldCell.getTagsOffset(), oldCell.getTagsLength());
1859         while (tagIterator.hasNext()) {
1860           Tag tag = tagIterator.next();
1861           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1862             // Not an ACL tag, just carry it through
1863             if (LOG.isTraceEnabled()) {
1864               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1865                 " length " + tag.getTagLength());
1866             }
1867             tags.add(tag);
1868           } else {
1869             // Merge the perms from the older ACL into the current permission set
1870             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1871               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1872                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1873             perms.putAll(kvPerms);
1874           }
1875         }
1876       }
1877     }
1878 
1879     // Do we have an ACL on the operation?
1880     byte[] aclBytes = mutation.getACL();
1881     if (aclBytes != null) {
1882       // Yes, use it
1883       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1884     } else {
1885       // No, use what we carried forward
1886       if (perms != null) {
1887         // TODO: If we collected ACLs from more than one tag we may have a
1888         // List<Permission> of size > 1, this can be collapsed into a single
1889         // Permission
1890         if (LOG.isTraceEnabled()) {
1891           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1892         }
1893         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1894           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1895       }
1896     }
1897 
1898     // If we have no tags to add, just return
1899     if (tags.isEmpty()) {
1900       return newCell;
1901     }
1902 
1903     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
1904     return rewriteCell;
1905   }
1906 
1907   @Override
1908   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1909       final Scan scan, final RegionScanner s) throws IOException {
1910     internalPreRead(c, scan, OpType.SCAN);
1911     return s;
1912   }
1913 
1914   @Override
1915   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1916       final Scan scan, final RegionScanner s) throws IOException {
1917     User user = getActiveUser();
1918     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1919       scannerOwners.put(s, user.getShortName());
1920     }
1921     return s;
1922   }
1923 
1924   @Override
1925   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1926       final InternalScanner s, final List<Result> result,
1927       final int limit, final boolean hasNext) throws IOException {
1928     requireScannerOwner(s);
1929     return hasNext;
1930   }
1931 
1932   @Override
1933   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1934       final InternalScanner s) throws IOException {
1935     requireScannerOwner(s);
1936   }
1937 
1938   @Override
1939   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1940       final InternalScanner s) throws IOException {
1941     // clean up any associated owner mapping
1942     scannerOwners.remove(s);
1943   }
1944 
1945   /**
1946    * Verify, when servicing an RPC, that the caller is the scanner owner.
1947    * If so, we assume that access control is correctly enforced based on
1948    * the checks performed in preScannerOpen()
1949    */
1950   private void requireScannerOwner(InternalScanner s)
1951       throws AccessDeniedException {
1952     if (RequestContext.isInRequestContext()) {
1953       String requestUserName = RequestContext.getRequestUserName();
1954       String owner = scannerOwners.get(s);
1955       if (owner != null && !owner.equals(requestUserName)) {
1956         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1957       }
1958     }
1959   }
1960 
1961   /**
1962    * Verifies user has CREATE privileges on
1963    * the Column Families involved in the bulkLoadHFile
1964    * request. Specific Column Write privileges are presently
1965    * ignored.
1966    */
1967   @Override
1968   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1969       List<Pair<byte[], String>> familyPaths) throws IOException {
1970     for(Pair<byte[],String> el : familyPaths) {
1971       requirePermission("preBulkLoadHFile",
1972           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1973           el.getFirst(),
1974           null,
1975           Action.CREATE);
1976     }
1977   }
1978 
1979   /**
1980    * Authorization check for
1981    * SecureBulkLoadProtocol.prepareBulkLoad()
1982    * @param ctx the context
1983    * @param request the request
1984    * @throws IOException
1985    */
1986   @Override
1987   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
1988                                  PrepareBulkLoadRequest request) throws IOException {
1989     requireAccess("prePareBulkLoad",
1990         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
1991   }
1992 
1993   /**
1994    * Authorization security check for
1995    * SecureBulkLoadProtocol.cleanupBulkLoad()
1996    * @param ctx the context
1997    * @param request the request
1998    * @throws IOException
1999    */
2000   @Override
2001   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2002                                  CleanupBulkLoadRequest request) throws IOException {
2003     requireAccess("preCleanupBulkLoad",
2004         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2005   }
2006 
2007   /* ---- EndpointObserver implementation ---- */
2008 
2009   @Override
2010   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2011       Service service, String methodName, Message request) throws IOException {
2012     // Don't intercept calls to our own AccessControlService, we check for
2013     // appropriate permissions in the service handlers
2014     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2015       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2016         methodName + ")",
2017         getTableName(ctx.getEnvironment()), null, null,
2018         Action.EXEC);
2019     }
2020     return request;
2021   }
2022 
2023   @Override
2024   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2025       Service service, String methodName, Message request, Message.Builder responseBuilder)
2026       throws IOException { }
2027 
2028   /* ---- Protobuf AccessControlService implementation ---- */
2029 
2030   @Override
2031   public void grant(RpcController controller,
2032                     AccessControlProtos.GrantRequest request,
2033                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2034     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2035     AccessControlProtos.GrantResponse response = null;
2036     try {
2037       // verify it's only running at .acl.
2038       if (aclRegion) {
2039         if (!initialized) {
2040           throw new CoprocessorException("AccessController not yet initialized");
2041         }
2042         if (LOG.isDebugEnabled()) {
2043           LOG.debug("Received request to grant access permission " + perm.toString());
2044         }
2045 
2046         switch(request.getUserPermission().getPermission().getType()) {
2047           case Global :
2048           case Table :
2049             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2050                 perm.getQualifier(), Action.ADMIN);
2051             break;
2052           case Namespace :
2053             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2054             break;
2055         }
2056 
2057         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2058           @Override
2059           public Void run() throws Exception {
2060             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2061             return null;
2062           }
2063         });
2064 
2065         if (AUDITLOG.isTraceEnabled()) {
2066           // audit log should store permission changes in addition to auth results
2067           AUDITLOG.trace("Granted permission " + perm.toString());
2068         }
2069       } else {
2070         throw new CoprocessorException(AccessController.class, "This method "
2071             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2072       }
2073       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2074     } catch (IOException ioe) {
2075       // pass exception back up
2076       ResponseConverter.setControllerException(controller, ioe);
2077     }
2078     done.run(response);
2079   }
2080 
2081   @Override
2082   public void revoke(RpcController controller,
2083                      AccessControlProtos.RevokeRequest request,
2084                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2085     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2086     AccessControlProtos.RevokeResponse response = null;
2087     try {
2088       // only allowed to be called on _acl_ region
2089       if (aclRegion) {
2090         if (!initialized) {
2091           throw new CoprocessorException("AccessController not yet initialized");
2092         }
2093         if (LOG.isDebugEnabled()) {
2094           LOG.debug("Received request to revoke access permission " + perm.toString());
2095         }
2096 
2097         switch(request.getUserPermission().getPermission().getType()) {
2098           case Global :
2099           case Table :
2100             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2101                               perm.getQualifier(), Action.ADMIN);
2102             break;
2103           case Namespace :
2104             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2105             break;
2106         }
2107 
2108         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2109           @Override
2110           public Void run() throws Exception {
2111             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2112             return null;
2113           }
2114         });
2115 
2116         if (AUDITLOG.isTraceEnabled()) {
2117           // audit log should record all permission changes
2118           AUDITLOG.trace("Revoked permission " + perm.toString());
2119         }
2120       } else {
2121         throw new CoprocessorException(AccessController.class, "This method "
2122             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2123       }
2124       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2125     } catch (IOException ioe) {
2126       // pass exception back up
2127       ResponseConverter.setControllerException(controller, ioe);
2128     }
2129     done.run(response);
2130   }
2131 
2132   @Override
2133   public void getUserPermissions(RpcController controller,
2134                                  AccessControlProtos.GetUserPermissionsRequest request,
2135                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2136     AccessControlProtos.GetUserPermissionsResponse response = null;
2137     try {
2138       // only allowed to be called on _acl_ region
2139       if (aclRegion) {
2140         if (!initialized) {
2141           throw new CoprocessorException("AccessController not yet initialized");
2142         }
2143         List<UserPermission> perms = null;
2144         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2145           final TableName table = request.hasTableName() ?
2146             ProtobufUtil.toTableName(request.getTableName()) : null;
2147           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2148           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2149             @Override
2150             public List<UserPermission> run() throws Exception {
2151               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2152             }
2153           });
2154         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2155           final String namespace = request.getNamespaceName().toStringUtf8();
2156           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2157           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2158             @Override
2159             public List<UserPermission> run() throws Exception {
2160               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2161                 namespace);
2162             }
2163           });
2164         } else {
2165           requirePermission("userPermissions", Action.ADMIN);
2166           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2167             @Override
2168             public List<UserPermission> run() throws Exception {
2169               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2170             }
2171           });
2172         }
2173         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2174       } else {
2175         throw new CoprocessorException(AccessController.class, "This method "
2176             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2177       }
2178     } catch (IOException ioe) {
2179       // pass exception back up
2180       ResponseConverter.setControllerException(controller, ioe);
2181     }
2182     done.run(response);
2183   }
2184 
2185   @Override
2186   public void checkPermissions(RpcController controller,
2187                                AccessControlProtos.CheckPermissionsRequest request,
2188                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2189     Permission[] permissions = new Permission[request.getPermissionCount()];
2190     for (int i=0; i < request.getPermissionCount(); i++) {
2191       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2192     }
2193     AccessControlProtos.CheckPermissionsResponse response = null;
2194     try {
2195       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2196       for (Permission permission : permissions) {
2197         if (permission instanceof TablePermission) {
2198           TablePermission tperm = (TablePermission) permission;
2199           for (Action action : permission.getActions()) {
2200             if (!tperm.getTableName().equals(tableName)) {
2201               throw new CoprocessorException(AccessController.class, String.format("This method "
2202                   + "can only execute at the table specified in TablePermission. " +
2203                   "Table of the region:%s , requested table:%s", tableName,
2204                   tperm.getTableName()));
2205             }
2206 
2207             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2208             if (tperm.getFamily() != null) {
2209               if (tperm.getQualifier() != null) {
2210                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2211                 qualifiers.add(tperm.getQualifier());
2212                 familyMap.put(tperm.getFamily(), qualifiers);
2213               } else {
2214                 familyMap.put(tperm.getFamily(), null);
2215               }
2216             }
2217 
2218             requirePermission("checkPermissions", action, regionEnv, familyMap);
2219           }
2220 
2221         } else {
2222           for (Action action : permission.getActions()) {
2223             requirePermission("checkPermissions", action);
2224           }
2225         }
2226       }
2227       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2228     } catch (IOException ioe) {
2229       ResponseConverter.setControllerException(controller, ioe);
2230     }
2231     done.run(response);
2232   }
2233 
2234   @Override
2235   public Service getService() {
2236     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2237   }
2238 
2239   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2240     return e.getRegion();
2241   }
2242 
2243   private TableName getTableName(RegionCoprocessorEnvironment e) {
2244     HRegion region = e.getRegion();
2245     if (region != null) {
2246       return getTableName(region);
2247     }
2248     return null;
2249   }
2250 
2251   private TableName getTableName(HRegion region) {
2252     HRegionInfo regionInfo = region.getRegionInfo();
2253     if (regionInfo != null) {
2254       return regionInfo.getTable();
2255     }
2256     return null;
2257   }
2258 
2259   @Override
2260   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2261       throws IOException {
2262     requirePermission("preClose", Action.ADMIN);
2263   }
2264 
2265   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2266     User user = userProvider.getCurrent();
2267     if (user == null) {
2268       throw new IOException("Unable to obtain the current user, " +
2269         "authorization checks for internal operations will not work correctly!");
2270     }
2271     User activeUser = getActiveUser();
2272     if (!(superusers.contains(activeUser.getShortName()))) {
2273       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2274         "is not system or super user.");
2275     }
2276   }
2277 
2278   @Override
2279   public void preStopRegionServer(
2280       ObserverContext<RegionServerCoprocessorEnvironment> env)
2281       throws IOException {
2282     requirePermission("preStopRegionServer", Action.ADMIN);
2283   }
2284 
2285   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2286       byte[] qualifier) {
2287     if (family == null) {
2288       return null;
2289     }
2290 
2291     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2292     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2293     return familyMap;
2294   }
2295 
2296   @Override
2297   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2298        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2299        String regex) throws IOException {
2300     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2301     // any concrete set of table names when a regex is present or the full list is requested.
2302     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2303       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2304       // request can be granted.
2305       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2306       for (TableName tableName: tableNamesList) {
2307         // Skip checks for a table that does not exist
2308         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2309           continue;
2310         requirePermission("getTableDescriptors", tableName, null, null,
2311             Action.ADMIN, Action.CREATE);
2312       }
2313     }
2314   }
2315 
2316   @Override
2317   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2318       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2319       String regex) throws IOException {
2320     // Skipping as checks in this case are already done by preGetTableDescriptors.
2321     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2322       return;
2323     }
2324 
2325     // Retains only those which passes authorization checks, as the checks weren't done as part
2326     // of preGetTableDescriptors.
2327     Iterator<HTableDescriptor> itr = descriptors.iterator();
2328     while (itr.hasNext()) {
2329       HTableDescriptor htd = itr.next();
2330       try {
2331         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2332             Action.ADMIN, Action.CREATE);
2333       } catch (AccessDeniedException e) {
2334         itr.remove();
2335       }
2336     }
2337   }
2338 
2339   @Override
2340   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2341       List<HTableDescriptor> descriptors, String regex) throws IOException {
2342     // Retains only those which passes authorization checks.
2343     Iterator<HTableDescriptor> itr = descriptors.iterator();
2344     while (itr.hasNext()) {
2345       HTableDescriptor htd = itr.next();
2346       try {
2347         requireAccess("getTableNames", htd.getTableName(), Action.values());
2348       } catch (AccessDeniedException e) {
2349         itr.remove();
2350       }
2351     }
2352   }
2353 
2354   @Override
2355   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2356       HRegion regionB) throws IOException {
2357     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2358       Action.ADMIN);
2359   }
2360 
2361   @Override
2362   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2363       HRegion regionB, HRegion mergedRegion) throws IOException { }
2364 
2365   @Override
2366   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2367       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2368 
2369   @Override
2370   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2371       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2372 
2373   @Override
2374   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2375       HRegion regionA, HRegion regionB) throws IOException { }
2376 
2377   @Override
2378   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2379       HRegion regionA, HRegion regionB) throws IOException { }
2380 
2381   @Override
2382   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2383       throws IOException {
2384     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2385   }
2386 
2387   @Override
2388   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2389       throws IOException { }
2390 
2391   @Override
2392   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2393       final String userName, final Quotas quotas) throws IOException {
2394     requirePermission("setUserQuota", Action.ADMIN);
2395   }
2396 
2397   @Override
2398   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2399       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2400     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2401   }
2402 
2403   @Override
2404   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2405       final String userName, final String namespace, final Quotas quotas) throws IOException {
2406     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2407   }
2408 
2409   @Override
2410   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2411       final TableName tableName, final Quotas quotas) throws IOException {
2412     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2413   }
2414 
2415   @Override
2416   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2417       final String namespace, final Quotas quotas) throws IOException {
2418     requirePermission("setNamespaceQuota", Action.ADMIN);
2419   }
2420 
2421   @Override
2422   public ReplicationEndpoint postCreateReplicationEndPoint(
2423       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2424     return endpoint;
2425   }
2426 
2427   @Override
2428   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2429       List<WALEntry> entries, CellScanner cells) throws IOException {
2430     requirePermission("replicateLogEntries", Action.WRITE);
2431   }
2432 
2433   @Override
2434   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2435       List<WALEntry> entries, CellScanner cells) throws IOException {
2436   }
2437 }