View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.security.PrivilegedExceptionAction;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellScanner;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CompoundConfiguration;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValue.Type;
48  import org.apache.hadoop.hbase.MetaTableAccessor;
49  import org.apache.hadoop.hbase.NamespaceDescriptor;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.Tag;
53  import org.apache.hadoop.hbase.TagRewriteCell;
54  import org.apache.hadoop.hbase.client.Append;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.Mutation;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Query;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
67  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
68  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
69  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
71  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
72  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
74  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
75  import org.apache.hadoop.hbase.filter.CompareFilter;
76  import org.apache.hadoop.hbase.filter.Filter;
77  import org.apache.hadoop.hbase.filter.FilterList;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.ipc.RequestContext;
80  import org.apache.hadoop.hbase.master.MasterServices;
81  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
82  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
83  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
84  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
85  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
86  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
87  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
88  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
89  import org.apache.hadoop.hbase.regionserver.HRegion;
90  import org.apache.hadoop.hbase.regionserver.InternalScanner;
91  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
92  import org.apache.hadoop.hbase.regionserver.RegionScanner;
93  import org.apache.hadoop.hbase.regionserver.ScanType;
94  import org.apache.hadoop.hbase.regionserver.Store;
95  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
96  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
97  import org.apache.hadoop.hbase.security.AccessDeniedException;
98  import org.apache.hadoop.hbase.security.User;
99  import org.apache.hadoop.hbase.security.UserProvider;
100 import org.apache.hadoop.hbase.security.access.Permission.Action;
101 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
102 import org.apache.hadoop.hbase.util.ByteRange;
103 import org.apache.hadoop.hbase.util.Bytes;
104 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
107 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
108 
109 import com.google.common.collect.ArrayListMultimap;
110 import com.google.common.collect.ImmutableSet;
111 import com.google.common.collect.ListMultimap;
112 import com.google.common.collect.Lists;
113 import com.google.common.collect.MapMaker;
114 import com.google.common.collect.Maps;
115 import com.google.common.collect.Sets;
116 import com.google.protobuf.Message;
117 import com.google.protobuf.RpcCallback;
118 import com.google.protobuf.RpcController;
119 import com.google.protobuf.Service;
120 
121 /**
122  * Provides basic authorization checks for data access and administrative
123  * operations.
124  *
125  * <p>
126  * {@code AccessController} performs authorization checks for HBase operations
127  * based on:
128  * <ul>
129  *   <li>the identity of the user performing the operation</li>
130  *   <li>the scope over which the operation is performed, in increasing
131  *   specificity: global, table, column family, or qualifier</li>
132  *   <li>the type of action being performed (as mapped to
133  *   {@link Permission.Action} values)</li>
134  * </ul>
135  * If the authorization check fails, an {@link AccessDeniedException}
136  * will be thrown for the operation.
137  * </p>
138  *
139  * <p>
140  * To perform authorization checks, {@code AccessController} relies on the
141  * RpcServerEngine being loaded to provide
142  * the user identities for remote requests.
143  * </p>
144  *
145  * <p>
146  * The access control lists used for authorization can be manipulated via the
147  * exposed {@link AccessControlService} Interface implementation, and the associated
148  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
149  * commands.
150  * </p>
151  */
152 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
153 public class AccessController extends BaseMasterAndRegionObserver
154     implements RegionServerObserver,
155       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
156 
  /** General-purpose logger for this class. */
  public static final Log LOG = LogFactory.getLog(AccessController.class);

  /** Audit logger; its name is "SecurityLogger." followed by this class's fully qualified name. */
  private static final Log AUDITLOG =
    LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
  private static final String CHECK_COVERING_PERM = "check_covering_perm";
  private static final String TAG_CHECK_PASSED = "tag_check_passed";
  /** Byte encoding of boolean {@code true} as produced by {@code Bytes.toBytes(true)}. */
  private static final byte[] TRUE = Bytes.toBytes(true);

  /** Authorization manager used for all permission checks; {@code null} until assigned. */
  TableAuthManager authManager = null;

  // Flags whether we are running on a region of the _acl_ table.
  boolean aclRegion = false;

  // Defined only for the Endpoint implementation, so that it has a way to
  // access region services.
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them (the map uses weak keys). */
  private Map<InternalScanner,String> scannerOwners =
      new MapMaker().weakKeys().makeMap();

  private Map<TableName, List<UserPermission>> tableAcls;

  // Provider for mapping principal names to Users.
  private UserProvider userProvider;

  // The list of users with superuser authority.
  private List<String> superusers;

  // Whether we are able to support cell ACLs.
  boolean cellFeaturesEnabled;

  // Whether we should check EXEC permissions.
  boolean shouldCheckExecPermission;

  // Whether we should terminate access checks early, as soon as table or CF grants
  // allow access; this is the pre-0.98 compatible behavior.
  boolean compatibleEarlyTermination;

  // Set to true at the end of initialize(RegionCoprocessorEnvironment).
  private volatile boolean initialized = false;

  // This flag is relevant only in the Master.
  private volatile boolean aclTabAvailable = false;
200 
201   public HRegion getRegion() {
202     return regionEnv != null ? regionEnv.getRegion() : null;
203   }
204 
205   public TableAuthManager getAuthManager() {
206     return authManager;
207   }
208 
209   void initialize(RegionCoprocessorEnvironment e) throws IOException {
210     final HRegion region = e.getRegion();
211     Configuration conf = e.getConfiguration();
212     Map<byte[], ListMultimap<String,TablePermission>> tables =
213         AccessControlLists.loadAll(region);
214     // For each table, write out the table's permissions to the respective
215     // znode for that table.
216     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
217       tables.entrySet()) {
218       byte[] entry = t.getKey();
219       ListMultimap<String,TablePermission> perms = t.getValue();
220       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
221       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
222     }
223     initialized = true;
224   }
225 
226   /**
227    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
228    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
229    * table updates.
230    */
231   void updateACL(RegionCoprocessorEnvironment e,
232       final Map<byte[], List<Cell>> familyMap) {
233     Set<byte[]> entries =
234         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
235     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
236       List<Cell> cells = f.getValue();
237       for (Cell cell: cells) {
238         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
239             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
240             AccessControlLists.ACL_LIST_FAMILY.length)) {
241           entries.add(CellUtil.cloneRow(cell));
242         }
243       }
244     }
245     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
246     Configuration conf = regionEnv.getConfiguration();
247     for (byte[] entry: entries) {
248       try {
249         ListMultimap<String,TablePermission> perms =
250           AccessControlLists.getPermissions(conf, entry);
251         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
252         zkw.writeToZookeeper(entry, serialized);
253       } catch (IOException ex) {
254         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
255             ex);
256       }
257     }
258   }
259 
260   /**
261    * Check the current user for authorization to perform a specific action
262    * against the given set of row data.
263    *
264    * <p>Note: Ordering of the authorization checks
265    * has been carefully optimized to short-circuit the most common requests
266    * and minimize the amount of processing required.</p>
267    *
268    * @param permRequest the action being requested
269    * @param e the coprocessor environment
270    * @param families the map of column families to qualifiers present in
271    * the request
272    * @return an authorization result
273    */
274   AuthResult permissionGranted(String request, User user, Action permRequest,
275       RegionCoprocessorEnvironment e,
276       Map<byte [], ? extends Collection<?>> families) {
277     HRegionInfo hri = e.getRegion().getRegionInfo();
278     TableName tableName = hri.getTable();
279 
280     // 1. All users need read access to hbase:meta table.
281     // this is a very common operation, so deal with it quickly.
282     if (hri.isMetaRegion()) {
283       if (permRequest == Action.READ) {
284         return AuthResult.allow(request, "All users allowed", user,
285           permRequest, tableName, families);
286       }
287     }
288 
289     if (user == null) {
290       return AuthResult.deny(request, "No user associated with request!", null,
291         permRequest, tableName, families);
292     }
293 
294     // 2. check for the table-level, if successful we can short-circuit
295     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
296       return AuthResult.allow(request, "Table permission granted", user,
297         permRequest, tableName, families);
298     }
299 
300     // 3. check permissions against the requested families
301     if (families != null && families.size() > 0) {
302       // all families must pass
303       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
304         // a) check for family level access
305         if (authManager.authorize(user, tableName, family.getKey(),
306             permRequest)) {
307           continue;  // family-level permission overrides per-qualifier
308         }
309 
310         // b) qualifier level access can still succeed
311         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
312           if (family.getValue() instanceof Set) {
313             // for each qualifier of the family
314             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
315             for (byte[] qualifier : familySet) {
316               if (!authManager.authorize(user, tableName, family.getKey(),
317                                          qualifier, permRequest)) {
318                 return AuthResult.deny(request, "Failed qualifier check", user,
319                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
320               }
321             }
322           } else if (family.getValue() instanceof List) { // List<KeyValue>
323             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
324             for (KeyValue kv : kvList) {
325               if (!authManager.authorize(user, tableName, family.getKey(),
326                       kv.getQualifier(), permRequest)) {
327                 return AuthResult.deny(request, "Failed qualifier check", user,
328                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
329               }
330             }
331           }
332         } else {
333           // no qualifiers and family-level check already failed
334           return AuthResult.deny(request, "Failed family check", user, permRequest,
335               tableName, makeFamilyMap(family.getKey(), null));
336         }
337       }
338 
339       // all family checks passed
340       return AuthResult.allow(request, "All family checks passed", user, permRequest,
341           tableName, families);
342     }
343 
344     // 4. no families to check and table level access failed
345     return AuthResult.deny(request, "No families to check and table permission failed",
346         user, permRequest, tableName, families);
347   }
348 
349   /**
350    * Check the current user for authorization to perform a specific action
351    * against the given set of row data.
352    * @param opType the operation type
353    * @param user the user
354    * @param e the coprocessor environment
355    * @param families the map of column families to qualifiers present in
356    * the request
357    * @param actions the desired actions
358    * @return an authorization result
359    */
360   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
361       Map<byte [], ? extends Collection<?>> families, Action... actions) {
362     AuthResult result = null;
363     for (Action action: actions) {
364       result = permissionGranted(opType.toString(), user, action, e, families);
365       if (!result.isAllowed()) {
366         return result;
367       }
368     }
369     return result;
370   }
371 
372   private void logResult(AuthResult result) {
373     if (AUDITLOG.isTraceEnabled()) {
374       RequestContext ctx = RequestContext.get();
375       InetAddress remoteAddr = null;
376       if (ctx != null) {
377         remoteAddr = ctx.getRemoteAddress();
378       }
379       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
380           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
381           "; reason: " + result.getReason() +
382           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
383           "; request: " + result.getRequest() +
384           "; context: " + result.toContextString());
385     }
386   }
387 
388   /**
389    * Returns the active user to which authorization checks should be applied.
390    * If we are in the context of an RPC call, the remote user is used,
391    * otherwise the currently logged in user is used.
392    */
393   private User getActiveUser() throws IOException {
394     User user = RequestContext.getRequestUser();
395     if (!RequestContext.isInRequestContext()) {
396       // for non-rpc handling, fallback to system user
397       user = userProvider.getCurrent();
398     }
399     return user;
400   }
401 
402   /**
403    * Authorizes that the current user has any of the given permissions for the
404    * given table, column family and column qualifier.
405    * @param tableName Table requested
406    * @param family Column family requested
407    * @param qualifier Column qualifier requested
408    * @throws IOException if obtaining the current user fails
409    * @throws AccessDeniedException if user has no authorization
410    */
411   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
412       Action... permissions) throws IOException {
413     User user = getActiveUser();
414     AuthResult result = null;
415 
416     for (Action permission : permissions) {
417       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
418         result = AuthResult.allow(request, "Table permission granted", user,
419                                   permission, tableName, family, qualifier);
420         break;
421       } else {
422         // rest of the world
423         result = AuthResult.deny(request, "Insufficient permissions", user,
424                                  permission, tableName, family, qualifier);
425       }
426     }
427     logResult(result);
428     if (!result.isAllowed()) {
429       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
430     }
431   }
432 
433   /**
434    * Authorizes that the current user has any of the given permissions to access the table.
435    *
436    * @param tableName Table requested
437    * @param permissions Actions being requested
438    * @throws IOException if obtaining the current user fails
439    * @throws AccessDeniedException if user has no authorization
440    */
441   private void requireAccess(String request, TableName tableName,
442       Action... permissions) throws IOException {
443     User user = getActiveUser();
444     AuthResult result = null;
445 
446     for (Action permission : permissions) {
447       if (authManager.hasAccess(user, tableName, permission)) {
448         result = AuthResult.allow(request, "Table permission granted", user,
449                                   permission, tableName, null, null);
450         break;
451       } else {
452         // rest of the world
453         result = AuthResult.deny(request, "Insufficient permissions", user,
454                                  permission, tableName, null, null);
455       }
456     }
457     logResult(result);
458     if (!result.isAllowed()) {
459       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
460     }
461   }
462 
463   /**
464    * Authorizes that the current user has global privileges for the given action.
465    * @param perm The action being requested
466    * @throws IOException if obtaining the current user fails
467    * @throws AccessDeniedException if authorization is denied
468    */
469   private void requirePermission(String request, Action perm) throws IOException {
470     requireGlobalPermission(request, perm, null, null);
471   }
472 
473   /**
474    * Authorizes that the current user has permission to perform the given
475    * action on the set of table column families.
476    * @param perm Action that is required
477    * @param env The current coprocessor environment
478    * @param families The map of column families-qualifiers.
479    * @throws AccessDeniedException if the authorization check failed
480    */
481   private void requirePermission(String request, Action perm,
482         RegionCoprocessorEnvironment env,
483         Map<byte[], ? extends Collection<?>> families)
484       throws IOException {
485     User user = getActiveUser();
486     AuthResult result = permissionGranted(request, user, perm, env, families);
487     logResult(result);
488 
489     if (!result.isAllowed()) {
490       throw new AccessDeniedException("Insufficient permissions (table=" +
491         env.getRegion().getTableDesc().getTableName()+
492         ((families != null && families.size() > 0) ? ", family: " +
493         result.toFamilyString() : "") + ", action=" +
494         perm.toString() + ")");
495     }
496   }
497 
498   /**
499    * Checks that the user has the given global permission. The generated
500    * audit log message will contain context information for the operation
501    * being authorized, based on the given parameters.
502    * @param perm Action being requested
503    * @param tableName Affected table name.
504    * @param familyMap Affected column families.
505    */
506   private void requireGlobalPermission(String request, Action perm, TableName tableName,
507       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
508     User user = getActiveUser();
509     if (authManager.authorize(user, perm)) {
510       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
511     } else {
512       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
513       throw new AccessDeniedException("Insufficient permissions for user '" +
514           (user != null ? user.getShortName() : "null") +"' (global, action=" +
515           perm.toString() + ")");
516     }
517   }
518 
519   /**
520    * Checks that the user has the given global permission. The generated
521    * audit log message will contain context information for the operation
522    * being authorized, based on the given parameters.
523    * @param perm Action being requested
524    * @param namespace
525    */
526   private void requireGlobalPermission(String request, Action perm,
527                                        String namespace) throws IOException {
528     User user = getActiveUser();
529     if (authManager.authorize(user, perm)) {
530       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
531     } else {
532       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
533       throw new AccessDeniedException("Insufficient permissions for user '" +
534           (user != null ? user.getShortName() : "null") +"' (global, action=" +
535           perm.toString() + ")");
536     }
537   }
538 
539   /**
540    * Checks that the user has the given global or namespace permission.
541    * @param namespace
542    * @param permissions Actions being requested
543    */
544   public void requireNamespacePermission(String request, String namespace,
545       Action... permissions) throws IOException {
546     User user = getActiveUser();
547     AuthResult result = null;
548 
549     for (Action permission : permissions) {
550       if (authManager.authorize(user, namespace, permission)) {
551         result = AuthResult.allow(request, "Namespace permission granted",
552             user, permission, namespace);
553         break;
554       } else {
555         // rest of the world
556         result = AuthResult.deny(request, "Insufficient permissions", user,
557             permission, namespace);
558       }
559     }
560     logResult(result);
561     if (!result.isAllowed()) {
562       throw new AccessDeniedException("Insufficient permissions "
563           + result.toContextString());
564     }
565   }
566 
567   /**
568    * Returns <code>true</code> if the current user is allowed the given action
569    * over at least one of the column qualifiers in the given column families.
570    */
571   private boolean hasFamilyQualifierPermission(User user,
572       Action perm,
573       RegionCoprocessorEnvironment env,
574       Map<byte[], ? extends Collection<byte[]>> familyMap)
575     throws IOException {
576     HRegionInfo hri = env.getRegion().getRegionInfo();
577     TableName tableName = hri.getTable();
578 
579     if (user == null) {
580       return false;
581     }
582 
583     if (familyMap != null && familyMap.size() > 0) {
584       // at least one family must be allowed
585       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
586           familyMap.entrySet()) {
587         if (family.getValue() != null && !family.getValue().isEmpty()) {
588           for (byte[] qualifier : family.getValue()) {
589             if (authManager.matchPermission(user, tableName,
590                 family.getKey(), qualifier, perm)) {
591               return true;
592             }
593           }
594         } else {
595           if (authManager.matchPermission(user, tableName, family.getKey(),
596               perm)) {
597             return true;
598           }
599         }
600       }
601     } else if (LOG.isDebugEnabled()) {
602       LOG.debug("Empty family map passed for permission check");
603     }
604 
605     return false;
606   }
607 
608   private enum OpType {
609     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
610     GET("get"),
611     EXISTS("exists"),
612     SCAN("scan"),
613     PUT("put"),
614     DELETE("delete"),
615     CHECK_AND_PUT("checkAndPut"),
616     CHECK_AND_DELETE("checkAndDelete"),
617     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
618     APPEND("append"),
619     INCREMENT("increment");
620 
621     private String type;
622 
623     private OpType(String type) {
624       this.type = type;
625     }
626 
627     @Override
628     public String toString() {
629       return type;
630     }
631   }
632 
633   /**
634    * Determine if cell ACLs covered by the operation grant access. This is expensive.
635    * @return false if cell ACLs failed to grant access, true otherwise
636    * @throws IOException
637    */
638   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
639       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
640       throws IOException {
641     if (!cellFeaturesEnabled) {
642       return false;
643     }
644     long cellGrants = 0;
645     User user = getActiveUser();
646     long latestCellTs = 0;
647     Get get = new Get(row);
648     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
649     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
650     // version. We have to get every cell version and check its TS against the TS asked for in
651     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
652     // consider only one such passing cell. In case of Delete we have to consider all the cell
653     // versions under this passing version. When Delete Mutation contains columns which are a
654     // version delete just consider only one version for those column cells.
655     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
656     if (considerCellTs) {
657       get.setMaxVersions();
658     } else {
659       get.setMaxVersions(1);
660     }
661     boolean diffCellTsFromOpTs = false;
662     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
663       byte[] col = entry.getKey();
664       // TODO: HBASE-7114 could possibly unify the collection type in family
665       // maps so we would not need to do this
666       if (entry.getValue() instanceof Set) {
667         Set<byte[]> set = (Set<byte[]>)entry.getValue();
668         if (set == null || set.isEmpty()) {
669           get.addFamily(col);
670         } else {
671           for (byte[] qual: set) {
672             get.addColumn(col, qual);
673           }
674         }
675       } else if (entry.getValue() instanceof List) {
676         List<Cell> list = (List<Cell>)entry.getValue();
677         if (list == null || list.isEmpty()) {
678           get.addFamily(col);
679         } else {
680           // In case of family delete, a Cell will be added into the list with Qualifier as null.
681           for (Cell cell : list) {
682             if (cell.getQualifierLength() == 0
683                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
684                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
685               get.addFamily(col);
686             } else {
687               get.addColumn(col, CellUtil.cloneQualifier(cell));
688             }
689             if (considerCellTs) {
690               long cellTs = cell.getTimestamp();
691               latestCellTs = Math.max(latestCellTs, cellTs);
692               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
693             }
694           }
695         }
696       } else {
697         throw new RuntimeException("Unhandled collection type " +
698           entry.getValue().getClass().getName());
699       }
700     }
701     // We want to avoid looking into the future. So, if the cells of the
702     // operation specify a timestamp, or the operation itself specifies a
703     // timestamp, then we use the maximum ts found. Otherwise, we bound
704     // the Get to the current server time. We add 1 to the timerange since
705     // the upper bound of a timerange is exclusive yet we need to examine
706     // any cells found there inclusively.
707     long latestTs = Math.max(opTs, latestCellTs);
708     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
709       latestTs = EnvironmentEdgeManager.currentTime();
710     }
711     get.setTimeRange(0, latestTs + 1);
712     // In case of Put operation we set to read all versions. This was done to consider the case
713     // where columns are added with TS other than the Mutation TS. But normally this wont be the
714     // case with Put. There no need to get all versions but get latest version only.
715     if (!diffCellTsFromOpTs && request == OpType.PUT) {
716       get.setMaxVersions(1);
717     }
718     if (LOG.isTraceEnabled()) {
719       LOG.trace("Scanning for cells with " + get);
720     }
721     // This Map is identical to familyMap. The key is a BR rather than byte[].
722     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
723     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
724     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
725     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
726       if (entry.getValue() instanceof List) {
727         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
728       }
729     }
730     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
731     List<Cell> cells = Lists.newArrayList();
732     Cell prevCell = null;
733     ByteRange curFam = new SimpleMutableByteRange();
734     boolean curColAllVersions = (request == OpType.DELETE);
735     long curColCheckTs = opTs;
736     boolean foundColumn = false;
737     try {
738       boolean more = false;
739       do {
740         cells.clear();
741         // scan with limit as 1 to hold down memory use on wide rows
742         more = scanner.next(cells, 1);
743         for (Cell cell: cells) {
744           if (LOG.isTraceEnabled()) {
745             LOG.trace("Found cell " + cell);
746           }
747           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
748           if (colChange) foundColumn = false;
749           prevCell = cell;
750           if (!curColAllVersions && foundColumn) {
751             continue;
752           }
753           if (colChange && considerCellTs) {
754             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
755             List<Cell> cols = familyMap1.get(curFam);
756             for (Cell col : cols) {
757               // null/empty qualifier is used to denote a Family delete. The TS and delete type
758               // associated with this is applicable for all columns within the family. That is
759               // why the below (col.getQualifierLength() == 0) check.
760               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
761                   || CellUtil.matchingQualifier(cell, col)) {
762                 byte type = col.getTypeByte();
763                 if (considerCellTs) {
764                   curColCheckTs = col.getTimestamp();
765                 }
766                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
767                 // a version delete for a column no need to check all the covering cells within
768                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
769                 // One version delete types are Delete/DeleteFamilyVersion
770                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
771                     || (KeyValue.Type.DeleteFamily.getCode() == type);
772                 break;
773               }
774             }
775           }
776           if (cell.getTimestamp() > curColCheckTs) {
777             // Just ignore this cell. This is not a covering cell.
778             continue;
779           }
780           foundColumn = true;
781           for (Action action: actions) {
782             // Are there permissions for this user for the cell?
783             if (!authManager.authorize(user, getTableName(e), cell, action)) {
784               // We can stop if the cell ACL denies access
785               return false;
786             }
787           }
788           cellGrants++;
789         }
790       } while (more);
791     } catch (AccessDeniedException ex) {
792       throw ex;
793     } catch (IOException ex) {
794       LOG.error("Exception while getting cells to calculate covering permission", ex);
795     } finally {
796       scanner.close();
797     }
798     // We should not authorize unless we have found one or more cell ACLs that
799     // grant access. This code is used to check for additional permissions
800     // after no table or CF grants are found.
801     return cellGrants > 0;
802   }
803 
804   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
805     // Swap each family's cell list for a list of rewritten cells whose tags begin with an
806     // ACL tag built from perms, followed by whatever tags the cell carried already.
807     for (Map.Entry<byte[], List<Cell>> familyEntry : familyMap.entrySet()) {
808       List<Cell> rewritten = Lists.newArrayList();
809       for (Cell source : familyEntry.getValue()) {
810         // The new ACL tag is placed first; the cell's own tags follow in iteration order.
811         List<Tag> allTags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
812         if (source.getTagsLength() > 0) {
813           for (Iterator<Tag> own = CellUtil.tagsIterator(source.getTagsArray(),
814               source.getTagsOffset(), source.getTagsLength()); own.hasNext();) {
815             allTags.add(own.next());
816           }
817         }
818         rewritten.add(new TagRewriteCell(source, Tag.fromList(allTags)));
819       }
820       // Replacing the value through the entry is not a structural change to the map, so
821       // this is expected not to raise a ConcurrentModificationException.
822       familyEntry.setValue(rewritten);
823     }
824   }
825 
826   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
827   // type is reserved and should not be explicitly set by user.
828   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
829     // Skip the scan for superusers, who may store cells unconditionally, and for a mutation
830     // that already carries the pass marker (prePut vs preBatchMutation).
831     if (superusers.contains(user.getShortName()) || m.getAttribute(TAG_CHECK_PASSED) != null) {
832       return;
833     }
834     CellScanner scanner = m.cellScanner();
835     while (scanner.advance()) {
836       Cell current = scanner.current();
837       if (current.getTagsLength() <= 0) {
838         continue;
839       }
840       Iterator<Tag> tags = CellUtil.tagsIterator(current.getTagsArray(),
841         current.getTagsOffset(), current.getTagsLength());
842       while (tags.hasNext()) {
843         if (tags.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
844           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
845         }
846       }
847     }
848     // Mark the mutation so that a repeated check on it returns early.
849     m.setAttribute(TAG_CHECK_PASSED, TRUE);
850   }
851 
852   /* ---- MasterObserver implementation ---- */
853   @Override
854   public void start(CoprocessorEnvironment env) throws IOException {
855     // Compound view over the environment configuration; the region branch adds to it.
856     CompoundConfiguration conf = new CompoundConfiguration();
857     conf.add(env.getConfiguration());
858 
859     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
860       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
861 
862     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
863     if (!cellFeaturesEnabled) {
864       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
865           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
866           + " accordingly.");
867     }
868 
869     // Pick up the ZooKeeper watcher from whichever kind of environment hosts this instance.
870     final ZooKeeperWatcher zk;
871     if (env instanceof MasterCoprocessorEnvironment) {
872       zk = ((MasterCoprocessorEnvironment) env).getMasterServices().getZooKeeper();
873     } else if (env instanceof RegionServerCoprocessorEnvironment) {
874       zk = ((RegionServerCoprocessorEnvironment) env).getRegionServerServices().getZooKeeper();
875     } else if (env instanceof RegionCoprocessorEnvironment) {
876       // Region scope: remember the environment and layer the table descriptor's settings
877       // onto conf before the early-out flag is read from it.
878       regionEnv = (RegionCoprocessorEnvironment) env;
879       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
880       zk = regionEnv.getRegionServerServices().getZooKeeper();
881       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
882         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
883     } else {
884       zk = null;
885     }
886 
887     // set the user-provider.
888     this.userProvider = UserProvider.instantiate(env.getConfiguration());
889 
890     // The current user heads the superuser list, followed by any configured names.
891     User self = userProvider.getCurrent();
892     superusers = Lists.asList(self.getShortName(),
893       conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
894 
895     // Without a watcher, or when the auth manager cannot be obtained, fail with a
896     // RuntimeException so that the coprocessor is unloaded.
897     if (zk == null) {
898       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
899     }
900     try {
901       this.authManager = TableAuthManager.get(zk, env.getConfiguration());
902     } catch (IOException ioe) {
903       throw new RuntimeException("Error obtaining TableAuthManager", ioe);
904     }
905 
906     tableAcls = new MapMaker().weakValues().makeMap();
907   }
907 
908   @Override
909   public void stop(CoprocessorEnvironment env) {
910     // Intentionally empty: this hook performs no work.
911   }
912 
913   @Override
914   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
915       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
916     Set<byte[]> families = desc.getFamiliesKeys();
917     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
918     for (byte[] family: families) {
919       familyMap.put(family, null);
920     }
921     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(), Action.CREATE);
922   }
923 
924   @Override
925   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
926       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
927     // When AC is used, it should be configured as the 1st CP.
928     // The Master runs table operations such as create on a thread pool whose max size is 1,
929     // so tables created by several CPs at startup are still created one after another.
930     // Related code in HMaster#startServiceThreads
931     // {code}
932     //   // We depend on there being only one instance of this executor running
933     //   // at a time. To do concurrency, would need fencing of enable/disable of
934     //   // tables.
935     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
936     // {code}
937     // Should that pool ever get more threads, the thread creating the acl table could be
938     // delayed long enough for another table creation to finish and reach this hook first.
939     // In that case wait logic would be needed here to hold until the acl table exists.
940     if (AccessControlLists.isAclTable(desc)) {
941       this.aclTabAvailable = true;
942       return;
943     }
944     if (TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName())) {
945       return;
946     }
947     if (!aclTabAvailable) {
948       LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
949           + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
950           + getClass().getSimpleName() + " should be configured as the first Coprocessor");
951       return;
952     }
953     // default the table owner to current user, if not specified.
954     String declaredOwner = desc.getOwnerString();
955     String owner = (declaredOwner != null) ? declaredOwner : getActiveUser().getShortName();
956     final UserPermission ownerGrant = new UserPermission(Bytes.toBytes(owner),
957         desc.getTableName(), null, Action.values());
958     // switch to the real hbase master user for doing the RPC on the ACL table
959     PrivilegedExceptionAction<Void> storeGrant = new PrivilegedExceptionAction<Void>() {
960       @Override
961       public Void run() throws Exception {
962         AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), ownerGrant);
963         return null;
964       }
965     };
966     User.runAsLoginUser(storeGrant);
967   }
968 
969   @Override
970   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
971       throws IOException {
972     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
973   }
974 
975   @Override
976   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
977       final TableName tableName) throws IOException {
978     final Configuration conf = c.getEnvironment().getConfiguration();
        // Remove the table's stored permissions while running as the login user.
979     PrivilegedExceptionAction<Void> dropAcls = new PrivilegedExceptionAction<Void>() {
980       @Override
981       public Void run() throws Exception {
982         AccessControlLists.removeTablePermissions(conf, tableName);
983         return null;
984       }
985     };
        User.runAsLoginUser(dropAcls);
        // Afterwards delete the table's ACL node through the ZooKeeper permission watcher.
986     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
987   }
988 
989   @Override
990   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
991       final TableName tableName) throws IOException {
992     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
993     final Configuration conf = c.getEnvironment().getConfiguration();
        // After the check, cache the table's current permissions in tableAcls under the
        // table name; postTruncateTable reads them back from there.
994     PrivilegedExceptionAction<Void> cacheAcls = new PrivilegedExceptionAction<Void>() {
995       @Override
996       public Void run() throws Exception {
997         List<UserPermission> saved = AccessControlLists.getUserTablePermissions(conf, tableName);
998         if (saved != null) {
999           tableAcls.put(tableName, saved);
1000         }
1001         return null;
1002       }
1003     };
         User.runAsLoginUser(cacheAcls);
1004   }
1005 
1006   @Override
1007   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1008       final TableName tableName) throws IOException {
1009     final Configuration conf = ctx.getEnvironment().getConfiguration();
         // Re-add every permission cached for this table in tableAcls, then drop the entry.
1010     PrivilegedExceptionAction<Void> restoreAcls = new PrivilegedExceptionAction<Void>() {
1011       @Override
1012       public Void run() throws Exception {
1013         List<UserPermission> cached = tableAcls.get(tableName);
1014         if (cached != null) {
1015           for (UserPermission grant : cached) {
1016             AccessControlLists.addUserPermission(conf, grant);
1017           }
1018         }
1019         tableAcls.remove(tableName);
1020         return null;
1021       }
1022     };
         User.runAsLoginUser(restoreAcls);
1023   }
1024 
1025   @Override
1026   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1027       TableName tableName, HTableDescriptor htd) throws IOException {
         // The new descriptor is not consulted; the check is made against the table name.
1028     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1029   }
1030 
1031   @Override
1032   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1033       TableName tableName, final HTableDescriptor htd) throws IOException {
1034     final Configuration conf = c.getEnvironment().getConfiguration();
1035     // default the table owner to current user, if not specified.
1036     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1037       getActiveUser().getShortName();
1038     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1039       @Override
1040       public Void run() throws Exception {
1041         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1042           htd.getTableName(), null, Action.values());
1043         AccessControlLists.addUserPermission(conf, userperm);
1044         return null;
1045       }
1046     });
1047   }
1048 
1049   @Override
1050   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1051       TableName tableName, HColumnDescriptor column) throws IOException {
         // The column descriptor is not consulted; the check is made against the table name.
1052     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
1053   }
1054 
1055   @Override
1056   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1057       TableName tableName, HColumnDescriptor descriptor) throws IOException {
         // The descriptor's name is passed along so the check is made for that column family.
1058     byte[] family = descriptor.getName();
1059     requirePermission("modifyColumn", tableName, family, null, Action.ADMIN, Action.CREATE);
1060   }
1061 
1062   @Override
1063   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1064       TableName tableName, byte[] col) throws IOException {
         // col is forwarded to the check together with the table name.
1065     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1066   }
1067 
1068   @Override
1069   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
1070       final TableName tableName, final byte[] col) throws IOException {
1071     final Configuration conf = c.getEnvironment().getConfiguration();
         // Remove the permissions stored for this table/column while running as the login user.
1072     PrivilegedExceptionAction<Void> dropColumnAcls = new PrivilegedExceptionAction<Void>() {
1073       @Override
1074       public Void run() throws Exception {
1075         AccessControlLists.removeTablePermissions(conf, tableName, col);
1076         return null;
1077       }
1078     };
         User.runAsLoginUser(dropColumnAcls);
1079   }
1080 
1081   @Override
1082   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1083       throws IOException {
1084     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1085   }
1086 
1087   @Override
1088   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1089       throws IOException {
1090     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1091       throw new AccessDeniedException("Not allowed to disable "
1092           + AccessControlLists.ACL_TABLE_NAME + " table.");
1093     }
1094     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1095   }
1096 
1097   @Override
1098   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1099       ServerName srcServer, ServerName destServer) throws IOException {
         // Neither server is consulted; the check is made on the region's table for ADMIN.
1100     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1101   }
1102 
1103   @Override
1104   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1105       throws IOException {
1106     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1107   }
1108 
1109   @Override
1110   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
1111       HRegionInfo regionInfo, boolean force) throws IOException {
         // The force flag is not consulted; the check is on the region's table, for ADMIN.
1112     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1113   }
1114 
1115   @Override
1116   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1117       HRegionInfo regionInfo) throws IOException {
         // The check is made on the table that owns the region, for ADMIN.
1118     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1119   }
1120 
1121   @Override
1122   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1123     // The check is made without a table argument, for the ADMIN action.
1124     requirePermission("balance", Action.ADMIN);
1125   }
1126 
1127   @Override
1128   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1129       boolean newValue) throws IOException {
1130     requirePermission("balanceSwitch", Action.ADMIN);
         // Once the check has run, the requested value is handed back unchanged.
1131     return newValue;
1132   }
1133 
1134   @Override
1135   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1136     // The check is made without a table argument, for the ADMIN action.
1137     requirePermission("shutdown", Action.ADMIN);
1138   }
1139 
1140   @Override
1141   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c) throws IOException {
1142     // The check is made without a table argument, for the ADMIN action.
1143     requirePermission("stopMaster", Action.ADMIN);
1144   }
1145 
1146   @Override
1147   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1148       throws IOException {
1149     if (MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1150       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1151       aclTabAvailable = true;
1152     } else {
1153       // initialize the ACL storage table
1154       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1155     }
1156   }
1157 
1158   @Override
1159   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1160       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1161       throws IOException {
1162     TableName table = hTableDescriptor.getTableName();
1163     requirePermission("snapshot", table, null, null, Permission.Action.ADMIN);
1164   }
1165 
1166   @Override
1167   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1168       final SnapshotDescription snapshot) throws IOException {
1169     // The snapshot's owner may list it without a further check; any other user goes
1170     // through the ADMIN check.
1171     if (!SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1172       requirePermission("listSnapshot", Action.ADMIN);
1173     }
1174   }
1175 
1176   @Override
1177   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1178       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1179       throws IOException {
         // Snapshot and descriptor are not consulted; the check has no table argument.
1180     requirePermission("clone", Action.ADMIN);
1181   }
1182 
1183   @Override
1184   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1185       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1186       throws IOException {
1187     if (!SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
           // Not the snapshot owner: the check is made without a table argument.
1188       requirePermission("restoreSnapshot", Action.ADMIN);
1189     } else {
           // Snapshot owner: the check is made against the descriptor's table.
1190       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1191         Permission.Action.ADMIN);
1192     }
1193   }
1194 
1195   @Override
1196   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1197       final SnapshotDescription snapshot) throws IOException {
1198     // Snapshot owner is allowed to delete the snapshot; any other user goes through
1199     // the ADMIN check.
1200     if (!SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1201       requirePermission("deleteSnapshot", Action.ADMIN);
1202     }
1203   }
1204 
1205   @Override
1206   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1207       NamespaceDescriptor ns) throws IOException {
         // Calls the global check for ADMIN, passing along the descriptor's name.
1208     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1209   }
1210 
1211   @Override
1212   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1213       throws IOException {
1214     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1215   }
1216 
1217   @Override
1218   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1219       final String namespace) throws IOException {
1220     final Configuration conf = ctx.getEnvironment().getConfiguration();
         // Remove the namespace's stored permissions while running as the login user.
1221     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1222       @Override
1223       public Void run() throws Exception {
1224         AccessControlLists.removeNamespacePermissions(conf, namespace);
1225         return null;
1226       }
1227     });
         // The space before "entry" keeps the namespace name from running into the message.
1228     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1229   }
1230 
1231   @Override
1232   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1233       NamespaceDescriptor ns) throws IOException {
1234     // Only a global permission is accepted here, so that a user holding just namespace-level
1235     // admin cannot alter namespace configuration (the namespace quota, for instance).
1236     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1237   }
1238 
1239   @Override
1240   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1241       throws IOException {
1242     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1243   }
1244 
1245   @Override
1246   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1247       List<NamespaceDescriptor> descriptors) throws IOException {
1248     // Keep only the descriptors that pass the per-namespace ADMIN check; a denied
1249     // descriptor is dropped from the list rather than failing the whole call. The
1250     // filtering happens here because the checks were not made before the list was built.
1251     for (Iterator<NamespaceDescriptor> it = descriptors.iterator(); it.hasNext();) {
1252       NamespaceDescriptor candidate = it.next();
1253       try {
1254         requireNamespacePermission("listNamespaces", candidate.getName(), Action.ADMIN);
1255       } catch (AccessDeniedException denied) {
1256         it.remove();
1257       }
1258     }
1259   }
1260 
1261   @Override
1262   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1263       final TableName tableName) throws IOException {
         // Delegates to requirePermission for this table with the ADMIN and CREATE actions.
1264     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1265   }
1266 
1267   /* ---- RegionObserver implementation ---- */
1268 
1269   @Override
1270   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1271     final HRegion region = e.getEnvironment().getRegion();
1272     if (region == null) {
1273       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1274       return;
1275     }
1276     // Regions of system tables go through isSystemOrSuperUser, which is handed the
1277     // configuration of the regionEnv field (not of the context passed in); regions of
1278     // all other tables go through the ADMIN check.
1279     if (region.getRegionInfo().getTable().isSystemTable()) {
1280       isSystemOrSuperUser(regionEnv.getConfiguration());
1281     } else {
1282       requirePermission("preOpen", Action.ADMIN);
1283     }
1284   }
1285 
1286   @Override
1287   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1288     RegionCoprocessorEnvironment env = c.getEnvironment();
1289     final HRegion opened = env.getRegion();
1290     if (opened == null) {
1291       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1292       return;
1293     }
1294     if (!AccessControlLists.isAclRegion(opened)) {
           // Any region other than the ACL region is marked initialized right away.
1295       initialized = true;
1296       return;
1297     }
1298     aclRegion = true;
1299     if (opened.isRecovering()) {
1300       // When this region is under recovering state, initialize will be handled by postLogReplay
1301       return;
1302     }
1303     try {
1304       initialize(env);
1305     } catch (IOException ex) {
1306       // if we can't obtain permissions, it's better to fail than perform checks incorrectly
1307       throw new RuntimeException("Failed to initialize permissions cache", ex);
1308     }
1309   }
1310 
1311   @Override
1312   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1313     if (!aclRegion) {
           // Only an instance flagged as the ACL region (see postOpen) initializes here.
1314       return;
1315     }
1316     try {
1317       initialize(c.getEnvironment());
1318     } catch (IOException ex) {
1319       // if we can't obtain permissions, it's better to fail than perform checks incorrectly
1320       throw new RuntimeException("Failed to initialize permissions cache", ex);
1321     }
1322   }
1323 
1324   @Override
1325   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1326     TableName table = getTableName(e.getEnvironment());
1327     requirePermission("flush", table, null, null, Action.ADMIN, Action.CREATE);
1328   }
1329 
1330   @Override
1331   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
         // The check is made on the table resolved from the region environment, for ADMIN.
1332     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1333   }
1334 
1335   @Override
1336   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1337       byte[] splitRow) throws IOException {
1338     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1339   }
1340 
1341   @Override
1342   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1343       final Store store, final InternalScanner scanner, final ScanType scanType)
1344           throws IOException {
1345     TableName table = getTableName(e.getEnvironment());
1346     requirePermission("compact", table, null, null, Action.ADMIN, Action.CREATE);
         // Once the check has run, the scanner that was passed in is returned as is.
1347     return scanner;
1348   }
1349 
1350   @Override
1351   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1352       final byte [] row, final byte [] family, final Result result) throws IOException {
1353     assert family != null;
1354     final OpType op = OpType.GET_CLOSEST_ROW_BEFORE;
1355     RegionCoprocessorEnvironment env = c.getEnvironment();
1356     Map<byte[],? extends Collection<byte[]>> familyMap = makeFamilyMap(family, null);
1357     User caller = getActiveUser();
1358     AuthResult verdict = permissionGranted(op, caller, env, familyMap, Action.READ);
         // Still denied: when cell features are on and early termination is off, the
         // covering-cell check on this row decides instead.
1359     if (!verdict.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1360       boolean covered = checkCoveringPermission(op, env, row, familyMap,
1361         HConstants.LATEST_TIMESTAMP, Action.READ);
1362       verdict.setAllowed(covered);
1363       verdict.setReason("Covering cell set");
1364     }
1365     logResult(verdict);
1366     if (!verdict.isAllowed()) {
1367       throw new AccessDeniedException("Insufficient permissions " + verdict.toContextString());
1368     }
1369   }
1370 
1371   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1372       final Query query, OpType opType) throws IOException {
1373     Filter filter = query.getFilter();
1374     // Don't wrap an AccessControlFilter
1375     if (filter != null && filter instanceof AccessControlFilter) {
1376       return;
1377     }
1378     User user = getActiveUser();
1379     RegionCoprocessorEnvironment env = c.getEnvironment();
1380     Map<byte[],? extends Collection<byte[]>> families = null;
1381     switch (opType) {
1382     case GET:
1383     case EXISTS:
1384       families = ((Get)query).getFamilyMap();
1385       break;
1386     case SCAN:
1387       families = ((Scan)query).getFamilyMap();
1388       break;
1389     default:
1390       throw new RuntimeException("Unhandled operation " + opType);
1391     }
1392     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1393     HRegion region = getRegion(env);
1394     TableName table = getTableName(region);
1395     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1396     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1397       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1398     }
1399     if (!authResult.isAllowed()) {
1400       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1401         // Old behavior: Scan with only qualifier checks if we have partial
1402         // permission. Backwards compatible behavior is to throw an
1403         // AccessDeniedException immediately if there are no grants for table
1404         // or CF or CF+qual. Only proceed with an injected filter if there are
1405         // grants for qualifiers. Otherwise we will fall through below and log
1406         // the result and throw an ADE. We may end up checking qualifier
1407         // grants three times (permissionGranted above, here, and in the
1408         // filter) but that's the price of backwards compatibility.
1409         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1410           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1411             AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1412             cfVsMaxVersions);
1413           // wrap any existing filter
1414           if (filter != null) {
1415             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1416               Lists.newArrayList(ourFilter, filter));
1417           }
1418           authResult.setAllowed(true);
1419           authResult.setReason("Access allowed with filter");
1420           switch (opType) {
1421           case GET:
1422           case EXISTS:
1423             ((Get)query).setFilter(ourFilter);
1424             break;
1425           case SCAN:
1426             ((Scan)query).setFilter(ourFilter);
1427             break;
1428           default:
1429             throw new RuntimeException("Unhandled operation " + opType);
1430           }
1431         }
1432       } else {
1433         // New behavior: Any access we might be granted is more fine-grained
1434         // than whole table or CF. Simply inject a filter and return what is
1435         // allowed. We will not throw an AccessDeniedException. This is a
1436         // behavioral change since 0.96.
1437         Filter ourFilter = new AccessControlFilter(authManager, user, table,
1438           AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1439         // wrap any existing filter
1440         if (filter != null) {
1441           ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1442             Lists.newArrayList(ourFilter, filter));
1443         }
1444         authResult.setAllowed(true);
1445         authResult.setReason("Access allowed with filter");
1446         switch (opType) {
1447         case GET:
1448         case EXISTS:
1449           ((Get)query).setFilter(ourFilter);
1450           break;
1451         case SCAN:
1452           ((Scan)query).setFilter(ourFilter);
1453           break;
1454         default:
1455           throw new RuntimeException("Unhandled operation " + opType);
1456         }
1457       }
1458     }
1459 
1460     logResult(authResult);
1461     if (!authResult.isAllowed()) {
1462       throw new AccessDeniedException("Insufficient permissions (table=" + table +
1463         ", action=READ)");
1464     }
1465   }
1466 
1467   @Override
1468   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1469       final Get get, final List<Cell> result) throws IOException {
1470     internalPreRead(c, get, OpType.GET);
1471   }
1472 
1473   @Override
1474   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1475       final Get get, final boolean exists) throws IOException {
1476     internalPreRead(c, get, OpType.EXISTS);
1477     return exists;
1478   }
1479 
1480   @Override
1481   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1482       final Put put, final WALEdit edit, final Durability durability)
1483       throws IOException {
1484     // Require WRITE permission to the table, CF, or top visible value, if any.
1485     // NOTE: We don't need to check the permissions for any earlier Puts
1486     // because we treat the ACLs in each Put as timestamped like any other
1487     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1488     // change the ACL of any previous Put. This allows simple evolution of
1489     // security policy over time without requiring expensive updates.
1490     User user = getActiveUser();
1491     checkForReservedTagPresence(user, put);
1492     RegionCoprocessorEnvironment env = c.getEnvironment();
1493     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1494     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1495     logResult(authResult);
1496     if (!authResult.isAllowed()) {
1497       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1498         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1499       } else {
1500         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1501       }
1502     }
1503     // Add cell ACLs from the operation to the cells themselves
1504     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1505     if (bytes != null) {
1506       if (cellFeaturesEnabled) {
1507         addCellPermissions(bytes, put.getFamilyCellMap());
1508       } else {
1509         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1510       }
1511     }
1512   }
1513 
1514   @Override
1515   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1516       final Put put, final WALEdit edit, final Durability durability) {
1517     if (aclRegion) {
1518       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1519     }
1520   }
1521 
1522   @Override
1523   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1524       final Delete delete, final WALEdit edit, final Durability durability)
1525       throws IOException {
1526     // An ACL on a delete is useless, we shouldn't allow it
1527     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1528       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1529     }
1530     // Require WRITE permissions on all cells covered by the delete. Unlike
1531     // for Puts we need to check all visible prior versions, because a major
1532     // compaction could remove them. If the user doesn't have permission to
1533     // overwrite any of the visible versions ('visible' defined as not covered
1534     // by a tombstone already) then we have to disallow this operation.
1535     RegionCoprocessorEnvironment env = c.getEnvironment();
1536     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1537     User user = getActiveUser();
1538     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1539     logResult(authResult);
1540     if (!authResult.isAllowed()) {
1541       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1542         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1543       } else {
1544         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1545       }
1546     }
1547   }
1548 
1549   @Override
1550   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1551       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1552     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1553       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1554       for (int i = 0; i < miniBatchOp.size(); i++) {
1555         Mutation m = miniBatchOp.getOperation(i);
1556         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1557           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1558           // perm check
1559           OpType opType;
1560           if (m instanceof Put) {
1561             checkForReservedTagPresence(getActiveUser(), m);
1562             opType = OpType.PUT;
1563           } else {
1564             opType = OpType.DELETE;
1565           }
1566           AuthResult authResult = null;
1567           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
1568               m.getTimeStamp(), Action.WRITE)) {
1569             authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
1570                 Action.WRITE, table, m.getFamilyCellMap());
1571           } else {
1572             authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
1573                 Action.WRITE, table, m.getFamilyCellMap());
1574           }
1575           logResult(authResult);
1576           if (!authResult.isAllowed()) {
1577             throw new AccessDeniedException("Insufficient permissions "
1578                 + authResult.toContextString());
1579           }
1580         }
1581       }
1582     }
1583   }
1584 
1585   @Override
1586   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1587       final Delete delete, final WALEdit edit, final Durability durability)
1588       throws IOException {
1589     if (aclRegion) {
1590       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1591     }
1592   }
1593 
1594   @Override
1595   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1596       final byte [] row, final byte [] family, final byte [] qualifier,
1597       final CompareFilter.CompareOp compareOp,
1598       final ByteArrayComparable comparator, final Put put,
1599       final boolean result) throws IOException {
1600     // Require READ and WRITE permissions on the table, CF, and KV to update
1601     User user = getActiveUser();
1602     checkForReservedTagPresence(user, put);
1603     RegionCoprocessorEnvironment env = c.getEnvironment();
1604     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1605     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1606       Action.READ, Action.WRITE);
1607     logResult(authResult);
1608     if (!authResult.isAllowed()) {
1609       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1610         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1611       } else {
1612         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1613       }
1614     }
1615     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1616     if (bytes != null) {
1617       if (cellFeaturesEnabled) {
1618         addCellPermissions(bytes, put.getFamilyCellMap());
1619       } else {
1620         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1621       }
1622     }
1623     return result;
1624   }
1625 
1626   @Override
1627   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1628       final byte[] row, final byte[] family, final byte[] qualifier,
1629       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1630       final boolean result) throws IOException {
1631     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1632       // We had failure with table, cf and q perm checks and now giving a chance for cell
1633       // perm check
1634       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1635       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1636       AuthResult authResult = null;
1637       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1638           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1639         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1640             getActiveUser(), Action.READ, table, families);
1641       } else {
1642         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1643             getActiveUser(), Action.READ, table, families);
1644       }
1645       logResult(authResult);
1646       if (!authResult.isAllowed()) {
1647         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1648       }
1649     }
1650     return result;
1651   }
1652 
1653   @Override
1654   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1655       final byte [] row, final byte [] family, final byte [] qualifier,
1656       final CompareFilter.CompareOp compareOp,
1657       final ByteArrayComparable comparator, final Delete delete,
1658       final boolean result) throws IOException {
1659     // An ACL on a delete is useless, we shouldn't allow it
1660     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1661       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1662           delete.toString());
1663     }
1664     // Require READ and WRITE permissions on the table, CF, and the KV covered
1665     // by the delete
1666     RegionCoprocessorEnvironment env = c.getEnvironment();
1667     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1668     User user = getActiveUser();
1669     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1670       Action.READ, Action.WRITE);
1671     logResult(authResult);
1672     if (!authResult.isAllowed()) {
1673       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1674         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1675       } else {
1676         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1677       }
1678     }
1679     return result;
1680   }
1681 
1682   @Override
1683   public boolean preCheckAndDeleteAfterRowLock(
1684       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1685       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1686       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1687       throws IOException {
1688     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1689       // We had failure with table, cf and q perm checks and now giving a chance for cell
1690       // perm check
1691       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1692       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1693       AuthResult authResult = null;
1694       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1695           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1696         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1697             getActiveUser(), Action.READ, table, families);
1698       } else {
1699         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1700             getActiveUser(), Action.READ, table, families);
1701       }
1702       logResult(authResult);
1703       if (!authResult.isAllowed()) {
1704         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1705       }
1706     }
1707     return result;
1708   }
1709 
1710   @Override
1711   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1712       final byte [] row, final byte [] family, final byte [] qualifier,
1713       final long amount, final boolean writeToWAL)
1714       throws IOException {
1715     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1716     // incremented value
1717     RegionCoprocessorEnvironment env = c.getEnvironment();
1718     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1719     User user = getActiveUser();
1720     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1721       Action.WRITE);
1722     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1723       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1724         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1725       authResult.setReason("Covering cell set");
1726     }
1727     logResult(authResult);
1728     if (!authResult.isAllowed()) {
1729       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1730     }
1731     return -1;
1732   }
1733 
1734   @Override
1735   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1736       throws IOException {
1737     // Require WRITE permission to the table, CF, and the KV to be appended
1738     User user = getActiveUser();
1739     checkForReservedTagPresence(user, append);
1740     RegionCoprocessorEnvironment env = c.getEnvironment();
1741     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1742     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1743     logResult(authResult);
1744     if (!authResult.isAllowed()) {
1745       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1746         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1747       } else {
1748         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1749       }
1750     }
1751     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1752     if (bytes != null) {
1753       if (cellFeaturesEnabled) {
1754         addCellPermissions(bytes, append.getFamilyCellMap());
1755       } else {
1756         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1757       }
1758     }
1759     return null;
1760   }
1761 
1762   @Override
1763   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1764       final Append append) throws IOException {
1765     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1766       // We had failure with table, cf and q perm checks and now giving a chance for cell
1767       // perm check
1768       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1769       AuthResult authResult = null;
1770       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1771           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1772         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1773             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1774       } else {
1775         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1776             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1777       }
1778       logResult(authResult);
1779       if (!authResult.isAllowed()) {
1780         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1781       }
1782     }
1783     return null;
1784   }
1785 
1786   @Override
1787   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1788       final Increment increment)
1789       throws IOException {
1790     // Require WRITE permission to the table, CF, and the KV to be replaced by
1791     // the incremented value
1792     User user = getActiveUser();
1793     checkForReservedTagPresence(user, increment);
1794     RegionCoprocessorEnvironment env = c.getEnvironment();
1795     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1796     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1797       Action.WRITE);
1798     logResult(authResult);
1799     if (!authResult.isAllowed()) {
1800       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1801         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1802       } else {
1803         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1804       }
1805     }
1806     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1807     if (bytes != null) {
1808       if (cellFeaturesEnabled) {
1809         addCellPermissions(bytes, increment.getFamilyCellMap());
1810       } else {
1811         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1812       }
1813     }
1814     return null;
1815   }
1816 
1817   @Override
1818   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1819       final Increment increment) throws IOException {
1820     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1821       // We had failure with table, cf and q perm checks and now giving a chance for cell
1822       // perm check
1823       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1824       AuthResult authResult = null;
1825       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1826           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1827         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1828             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1829       } else {
1830         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1831             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1832       }
1833       logResult(authResult);
1834       if (!authResult.isAllowed()) {
1835         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1836       }
1837     }
1838     return null;
1839   }
1840 
1841   @Override
1842   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1843       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1844     // If the HFile version is insufficient to persist tags, we won't have any
1845     // work to do here
1846     if (!cellFeaturesEnabled) {
1847       return newCell;
1848     }
1849 
1850     // Collect any ACLs from the old cell
1851     List<Tag> tags = Lists.newArrayList();
1852     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1853     if (oldCell != null) {
1854       // Save an object allocation where we can
1855       if (oldCell.getTagsLength() > 0) {
1856         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
1857           oldCell.getTagsOffset(), oldCell.getTagsLength());
1858         while (tagIterator.hasNext()) {
1859           Tag tag = tagIterator.next();
1860           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1861             // Not an ACL tag, just carry it through
1862             if (LOG.isTraceEnabled()) {
1863               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1864                 " length " + tag.getTagLength());
1865             }
1866             tags.add(tag);
1867           } else {
1868             // Merge the perms from the older ACL into the current permission set
1869             ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1870               AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1871                 tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1872             perms.putAll(kvPerms);
1873           }
1874         }
1875       }
1876     }
1877 
1878     // Do we have an ACL on the operation?
1879     byte[] aclBytes = mutation.getACL();
1880     if (aclBytes != null) {
1881       // Yes, use it
1882       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1883     } else {
1884       // No, use what we carried forward
1885       if (perms != null) {
1886         // TODO: If we collected ACLs from more than one tag we may have a
1887         // List<Permission> of size > 1, this can be collapsed into a single
1888         // Permission
1889         if (LOG.isTraceEnabled()) {
1890           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1891         }
1892         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1893           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1894       }
1895     }
1896 
1897     // If we have no tags to add, just return
1898     if (tags.isEmpty()) {
1899       return newCell;
1900     }
1901 
1902     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
1903     return rewriteCell;
1904   }
1905 
1906   @Override
1907   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1908       final Scan scan, final RegionScanner s) throws IOException {
1909     internalPreRead(c, scan, OpType.SCAN);
1910     return s;
1911   }
1912 
1913   @Override
1914   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1915       final Scan scan, final RegionScanner s) throws IOException {
1916     User user = getActiveUser();
1917     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1918       scannerOwners.put(s, user.getShortName());
1919     }
1920     return s;
1921   }
1922 
1923   @Override
1924   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1925       final InternalScanner s, final List<Result> result,
1926       final int limit, final boolean hasNext) throws IOException {
1927     requireScannerOwner(s);
1928     return hasNext;
1929   }
1930 
1931   @Override
1932   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1933       final InternalScanner s) throws IOException {
1934     requireScannerOwner(s);
1935   }
1936 
1937   @Override
1938   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1939       final InternalScanner s) throws IOException {
1940     // clean up any associated owner mapping
1941     scannerOwners.remove(s);
1942   }
1943 
1944   /**
1945    * Verify, when servicing an RPC, that the caller is the scanner owner.
1946    * If so, we assume that access control is correctly enforced based on
1947    * the checks performed in preScannerOpen()
1948    */
1949   private void requireScannerOwner(InternalScanner s)
1950       throws AccessDeniedException {
1951     if (RequestContext.isInRequestContext()) {
1952       String requestUserName = RequestContext.getRequestUserName();
1953       String owner = scannerOwners.get(s);
1954       if (owner != null && !owner.equals(requestUserName)) {
1955         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1956       }
1957     }
1958   }
1959 
1960   /**
1961    * Verifies user has CREATE privileges on
1962    * the Column Families involved in the bulkLoadHFile
1963    * request. Specific Column Write privileges are presently
1964    * ignored.
1965    */
1966   @Override
1967   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1968       List<Pair<byte[], String>> familyPaths) throws IOException {
1969     for(Pair<byte[],String> el : familyPaths) {
1970       requirePermission("preBulkLoadHFile",
1971           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1972           el.getFirst(),
1973           null,
1974           Action.CREATE);
1975     }
1976   }
1977 
1978   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action)
1979       throws IOException {
1980     User requestUser = getActiveUser();
1981     final TableName tableName = e.getRegion().getTableDesc().getTableName();
1982     AuthResult authResult = permissionGranted(method, requestUser, action, e,
1983       Collections.EMPTY_MAP);
1984     if (!authResult.isAllowed()) {
1985       final Configuration conf = e.getConfiguration();
1986       // hasSomeAccess is called from bulkload pre hooks
1987       List<UserPermission> perms =
1988         User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
1989           @Override
1990           public List<UserPermission> run() throws Exception {
1991             return AccessControlLists.getUserTablePermissions(conf, tableName);
1992           }
1993         });
1994       for (UserPermission userPerm: perms) {
1995         for (Action userAction: userPerm.getActions()) {
1996           if (userAction.equals(action)) {
1997             return AuthResult.allow(method, "Access allowed", requestUser,
1998               action, tableName, null, null);
1999           }
2000         }
2001       }
2002     }
2003     return authResult;
2004   }
2005 
2006   /**
2007    * Authorization check for
2008    * SecureBulkLoadProtocol.prepareBulkLoad()
2009    * @param ctx the context
2010    * @param request the request
2011    * @throws IOException
2012    */
2013   @Override
2014   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2015                                  PrepareBulkLoadRequest request) throws IOException {
2016     RegionCoprocessorEnvironment e = ctx.getEnvironment();
2017 
2018     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.CREATE);
2019     logResult(authResult);
2020     if (!authResult.isAllowed()) {
2021       throw new AccessDeniedException("Insufficient permissions (table=" +
2022         e.getRegion().getTableDesc().getTableName() + ", action=CREATE)");
2023     }
2024   }
2025 
2026   /**
2027    * Authorization security check for
2028    * SecureBulkLoadProtocol.cleanupBulkLoad()
2029    * @param ctx the context
2030    * @param request the request
2031    * @throws IOException
2032    */
2033   @Override
2034   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2035                                  CleanupBulkLoadRequest request) throws IOException {
2036     RegionCoprocessorEnvironment e = ctx.getEnvironment();
2037 
2038     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.CREATE);
2039     logResult(authResult);
2040     if (!authResult.isAllowed()) {
2041       throw new AccessDeniedException("Insufficient permissions (table=" +
2042         e.getRegion().getTableDesc().getTableName() + ", action=CREATE)");
2043     }
2044   }
2045 
2046   /* ---- EndpointObserver implementation ---- */
2047 
2048   @Override
2049   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2050       Service service, String methodName, Message request) throws IOException {
2051     // Don't intercept calls to our own AccessControlService, we check for
2052     // appropriate permissions in the service handlers
2053     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2054       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2055         methodName + ")",
2056         getTableName(ctx.getEnvironment()), null, null,
2057         Action.EXEC);
2058     }
2059     return request;
2060   }
2061 
2062   @Override
2063   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2064       Service service, String methodName, Message request, Message.Builder responseBuilder)
2065       throws IOException { }
2066 
2067   /* ---- Protobuf AccessControlService implementation ---- */
2068 
2069   @Override
2070   public void grant(RpcController controller,
2071                     AccessControlProtos.GrantRequest request,
2072                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2073     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2074     AccessControlProtos.GrantResponse response = null;
2075     try {
2076       // verify it's only running at .acl.
2077       if (aclRegion) {
2078         if (!initialized) {
2079           throw new CoprocessorException("AccessController not yet initialized");
2080         }
2081         if (LOG.isDebugEnabled()) {
2082           LOG.debug("Received request to grant access permission " + perm.toString());
2083         }
2084 
2085         switch(request.getUserPermission().getPermission().getType()) {
2086           case Global :
2087           case Table :
2088             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2089                 perm.getQualifier(), Action.ADMIN);
2090             break;
2091           case Namespace :
2092             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
2093             break;
2094         }
2095 
2096         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2097           @Override
2098           public Void run() throws Exception {
2099             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2100             return null;
2101           }
2102         });
2103 
2104         if (AUDITLOG.isTraceEnabled()) {
2105           // audit log should store permission changes in addition to auth results
2106           AUDITLOG.trace("Granted permission " + perm.toString());
2107         }
2108       } else {
2109         throw new CoprocessorException(AccessController.class, "This method "
2110             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2111       }
2112       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2113     } catch (IOException ioe) {
2114       // pass exception back up
2115       ResponseConverter.setControllerException(controller, ioe);
2116     }
2117     done.run(response);
2118   }
2119 
2120   @Override
2121   public void revoke(RpcController controller,
2122                      AccessControlProtos.RevokeRequest request,
2123                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2124     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2125     AccessControlProtos.RevokeResponse response = null;
2126     try {
2127       // only allowed to be called on _acl_ region
2128       if (aclRegion) {
2129         if (!initialized) {
2130           throw new CoprocessorException("AccessController not yet initialized");
2131         }
2132         if (LOG.isDebugEnabled()) {
2133           LOG.debug("Received request to revoke access permission " + perm.toString());
2134         }
2135 
2136         switch(request.getUserPermission().getPermission().getType()) {
2137           case Global :
2138           case Table :
2139             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2140                               perm.getQualifier(), Action.ADMIN);
2141             break;
2142           case Namespace :
2143             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
2144             break;
2145         }
2146 
2147         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2148           @Override
2149           public Void run() throws Exception {
2150             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2151             return null;
2152           }
2153         });
2154 
2155         if (AUDITLOG.isTraceEnabled()) {
2156           // audit log should record all permission changes
2157           AUDITLOG.trace("Revoked permission " + perm.toString());
2158         }
2159       } else {
2160         throw new CoprocessorException(AccessController.class, "This method "
2161             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2162       }
2163       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2164     } catch (IOException ioe) {
2165       // pass exception back up
2166       ResponseConverter.setControllerException(controller, ioe);
2167     }
2168     done.run(response);
2169   }
2170 
2171   @Override
2172   public void getUserPermissions(RpcController controller,
2173                                  AccessControlProtos.GetUserPermissionsRequest request,
2174                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2175     AccessControlProtos.GetUserPermissionsResponse response = null;
2176     try {
2177       // only allowed to be called on _acl_ region
2178       if (aclRegion) {
2179         if (!initialized) {
2180           throw new CoprocessorException("AccessController not yet initialized");
2181         }
2182         List<UserPermission> perms = null;
2183         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2184           final TableName table = request.hasTableName() ?
2185             ProtobufUtil.toTableName(request.getTableName()) : null;
2186           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2187           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2188             @Override
2189             public List<UserPermission> run() throws Exception {
2190               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2191             }
2192           });
2193         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2194           final String namespace = request.getNamespaceName().toStringUtf8();
2195           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2196           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2197             @Override
2198             public List<UserPermission> run() throws Exception {
2199               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2200                 namespace);
2201             }
2202           });
2203         } else {
2204           requirePermission("userPermissions", Action.ADMIN);
2205           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2206             @Override
2207             public List<UserPermission> run() throws Exception {
2208               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2209             }
2210           });
2211         }
2212         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2213       } else {
2214         throw new CoprocessorException(AccessController.class, "This method "
2215             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2216       }
2217     } catch (IOException ioe) {
2218       // pass exception back up
2219       ResponseConverter.setControllerException(controller, ioe);
2220     }
2221     done.run(response);
2222   }
2223 
2224   @Override
2225   public void checkPermissions(RpcController controller,
2226                                AccessControlProtos.CheckPermissionsRequest request,
2227                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2228     Permission[] permissions = new Permission[request.getPermissionCount()];
2229     for (int i=0; i < request.getPermissionCount(); i++) {
2230       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2231     }
2232     AccessControlProtos.CheckPermissionsResponse response = null;
2233     try {
2234       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2235       for (Permission permission : permissions) {
2236         if (permission instanceof TablePermission) {
2237           TablePermission tperm = (TablePermission) permission;
2238           for (Action action : permission.getActions()) {
2239             if (!tperm.getTableName().equals(tableName)) {
2240               throw new CoprocessorException(AccessController.class, String.format("This method "
2241                   + "can only execute at the table specified in TablePermission. " +
2242                   "Table of the region:%s , requested table:%s", tableName,
2243                   tperm.getTableName()));
2244             }
2245 
2246             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2247             if (tperm.getFamily() != null) {
2248               if (tperm.getQualifier() != null) {
2249                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2250                 qualifiers.add(tperm.getQualifier());
2251                 familyMap.put(tperm.getFamily(), qualifiers);
2252               } else {
2253                 familyMap.put(tperm.getFamily(), null);
2254               }
2255             }
2256 
2257             requirePermission("checkPermissions", action, regionEnv, familyMap);
2258           }
2259 
2260         } else {
2261           for (Action action : permission.getActions()) {
2262             requirePermission("checkPermissions", action);
2263           }
2264         }
2265       }
2266       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2267     } catch (IOException ioe) {
2268       ResponseConverter.setControllerException(controller, ioe);
2269     }
2270     done.run(response);
2271   }
2272 
2273   @Override
2274   public Service getService() {
2275     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2276   }
2277 
2278   private HRegion getRegion(RegionCoprocessorEnvironment e) {
2279     return e.getRegion();
2280   }
2281 
2282   private TableName getTableName(RegionCoprocessorEnvironment e) {
2283     HRegion region = e.getRegion();
2284     if (region != null) {
2285       return getTableName(region);
2286     }
2287     return null;
2288   }
2289 
2290   private TableName getTableName(HRegion region) {
2291     HRegionInfo regionInfo = region.getRegionInfo();
2292     if (regionInfo != null) {
2293       return regionInfo.getTable();
2294     }
2295     return null;
2296   }
2297 
2298   @Override
2299   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2300       throws IOException {
2301     requirePermission("preClose", Action.ADMIN);
2302   }
2303 
2304   private void isSystemOrSuperUser(Configuration conf) throws IOException {
2305     User user = userProvider.getCurrent();
2306     if (user == null) {
2307       throw new IOException("Unable to obtain the current user, " +
2308         "authorization checks for internal operations will not work correctly!");
2309     }
2310     User activeUser = getActiveUser();
2311     if (!(superusers.contains(activeUser.getShortName()))) {
2312       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
2313         "is not system or super user.");
2314     }
2315   }
2316 
2317   @Override
2318   public void preStopRegionServer(
2319       ObserverContext<RegionServerCoprocessorEnvironment> env)
2320       throws IOException {
2321     requirePermission("preStopRegionServer", Action.ADMIN);
2322   }
2323 
2324   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2325       byte[] qualifier) {
2326     if (family == null) {
2327       return null;
2328     }
2329 
2330     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2331     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2332     return familyMap;
2333   }
2334 
2335   @Override
2336   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2337        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2338        String regex) throws IOException {
2339     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2340     // any concrete set of table names when a regex is present or the full list is requested.
2341     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2342       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2343       // request can be granted.
2344       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2345       for (TableName tableName: tableNamesList) {
2346         // Skip checks for a table that does not exist
2347         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2348           continue;
2349         requirePermission("getTableDescriptors", tableName, null, null,
2350             Action.ADMIN, Action.CREATE);
2351       }
2352     }
2353   }
2354 
2355   @Override
2356   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2357       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2358       String regex) throws IOException {
2359     // Skipping as checks in this case are already done by preGetTableDescriptors.
2360     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2361       return;
2362     }
2363 
2364     // Retains only those which passes authorization checks, as the checks weren't done as part
2365     // of preGetTableDescriptors.
2366     Iterator<HTableDescriptor> itr = descriptors.iterator();
2367     while (itr.hasNext()) {
2368       HTableDescriptor htd = itr.next();
2369       try {
2370         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2371             Action.ADMIN, Action.CREATE);
2372       } catch (AccessDeniedException e) {
2373         itr.remove();
2374       }
2375     }
2376   }
2377 
2378   @Override
2379   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2380       List<HTableDescriptor> descriptors, String regex) throws IOException {
2381     // Retains only those which passes authorization checks.
2382     Iterator<HTableDescriptor> itr = descriptors.iterator();
2383     while (itr.hasNext()) {
2384       HTableDescriptor htd = itr.next();
2385       try {
2386         requireAccess("getTableNames", htd.getTableName(), Action.values());
2387       } catch (AccessDeniedException e) {
2388         itr.remove();
2389       }
2390     }
2391   }
2392 
2393   @Override
2394   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
2395       HRegion regionB) throws IOException {
2396     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2397       Action.ADMIN);
2398   }
2399 
2400   @Override
2401   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
2402       HRegion regionB, HRegion mergedRegion) throws IOException { }
2403 
2404   @Override
2405   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2406       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
2407 
2408   @Override
2409   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2410       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
2411 
2412   @Override
2413   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2414       HRegion regionA, HRegion regionB) throws IOException { }
2415 
2416   @Override
2417   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2418       HRegion regionA, HRegion regionB) throws IOException { }
2419 
2420   @Override
2421   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2422       throws IOException {
2423     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2424   }
2425 
2426   @Override
2427   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2428       throws IOException { }
2429 
2430   @Override
2431   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2432       final String userName, final Quotas quotas) throws IOException {
2433     requirePermission("setUserQuota", Action.ADMIN);
2434   }
2435 
2436   @Override
2437   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2438       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2439     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2440   }
2441 
2442   @Override
2443   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2444       final String userName, final String namespace, final Quotas quotas) throws IOException {
2445     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2446   }
2447 
2448   @Override
2449   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2450       final TableName tableName, final Quotas quotas) throws IOException {
2451     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2452   }
2453 
2454   @Override
2455   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2456       final String namespace, final Quotas quotas) throws IOException {
2457     requirePermission("setNamespaceQuota", Action.ADMIN);
2458   }
2459 
2460   @Override
2461   public ReplicationEndpoint postCreateReplicationEndPoint(
2462       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2463     return endpoint;
2464   }
2465 }