View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import com.google.common.net.HostAndPort;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ArrayBackedTag;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.CompoundConfiguration;
45  import org.apache.hadoop.hbase.CoprocessorEnvironment;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValue.Type;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NamespaceDescriptor;
56  import org.apache.hadoop.hbase.ProcedureInfo;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.Tag;
60  import org.apache.hadoop.hbase.TagRewriteCell;
61  import org.apache.hadoop.hbase.TagUtil;
62  import org.apache.hadoop.hbase.classification.InterfaceAudience;
63  import org.apache.hadoop.hbase.client.Admin;
64  import org.apache.hadoop.hbase.client.Append;
65  import org.apache.hadoop.hbase.client.Delete;
66  import org.apache.hadoop.hbase.client.Durability;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.Increment;
69  import org.apache.hadoop.hbase.client.Mutation;
70  import org.apache.hadoop.hbase.client.Put;
71  import org.apache.hadoop.hbase.client.Query;
72  import org.apache.hadoop.hbase.client.Result;
73  import org.apache.hadoop.hbase.client.Scan;
74  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
75  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
76  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
77  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
78  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
79  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
80  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
81  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
82  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
83  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
84  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
85  import org.apache.hadoop.hbase.filter.CompareFilter;
86  import org.apache.hadoop.hbase.filter.Filter;
87  import org.apache.hadoop.hbase.filter.FilterList;
88  import org.apache.hadoop.hbase.io.hfile.HFile;
89  import org.apache.hadoop.hbase.ipc.RpcServer;
90  import org.apache.hadoop.hbase.master.MasterServices;
91  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
92  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
93  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
94  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
95  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
96  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
98  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
99  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
100 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
102 import org.apache.hadoop.hbase.regionserver.InternalScanner;
103 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
104 import org.apache.hadoop.hbase.regionserver.Region;
105 import org.apache.hadoop.hbase.regionserver.RegionScanner;
106 import org.apache.hadoop.hbase.regionserver.ScanType;
107 import org.apache.hadoop.hbase.regionserver.ScannerContext;
108 import org.apache.hadoop.hbase.regionserver.Store;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
110 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
111 import org.apache.hadoop.hbase.security.AccessDeniedException;
112 import org.apache.hadoop.hbase.security.Superusers;
113 import org.apache.hadoop.hbase.security.User;
114 import org.apache.hadoop.hbase.security.UserProvider;
115 import org.apache.hadoop.hbase.security.access.Permission.Action;
116 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
117 import org.apache.hadoop.hbase.util.ByteRange;
118 import org.apache.hadoop.hbase.util.Bytes;
119 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
120 import org.apache.hadoop.hbase.util.Pair;
121 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
122 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
123 
124 import com.google.common.collect.ArrayListMultimap;
125 import com.google.common.collect.ImmutableSet;
126 import com.google.common.collect.ListMultimap;
127 import com.google.common.collect.Lists;
128 import com.google.common.collect.MapMaker;
129 import com.google.common.collect.Maps;
130 import com.google.common.collect.Sets;
131 import com.google.protobuf.Message;
132 import com.google.protobuf.RpcCallback;
133 import com.google.protobuf.RpcController;
134 import com.google.protobuf.Service;
135 
136 /**
137  * Provides basic authorization checks for data access and administrative
138  * operations.
139  *
140  * <p>
141  * {@code AccessController} performs authorization checks for HBase operations
142  * based on:
143  * </p>
144  * <ul>
145  *   <li>the identity of the user performing the operation</li>
146  *   <li>the scope over which the operation is performed, in increasing
147  *   specificity: global, table, column family, or qualifier</li>
148  *   <li>the type of action being performed (as mapped to
149  *   {@link Permission.Action} values)</li>
150  * </ul>
151  * <p>
152  * If the authorization check fails, an {@link AccessDeniedException}
153  * will be thrown for the operation.
154  * </p>
155  *
156  * <p>
157  * To perform authorization checks, {@code AccessController} relies on the
158  * RpcServerEngine being loaded to provide
159  * the user identities for remote requests.
160  * </p>
161  *
162  * <p>
163  * The access control lists used for authorization can be manipulated via the
164  * exposed {@link AccessControlService} Interface implementation, and the associated
165  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
166  * commands.
167  * </p>
168  */
169 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
170 public class AccessController extends BaseMasterAndRegionObserver
171     implements RegionServerObserver,
172       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
173 
174   private static final Log LOG = LogFactory.getLog(AccessController.class);
175 
176   private static final Log AUDITLOG =
177     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
178   private static final String CHECK_COVERING_PERM = "check_covering_perm";
179   private static final String TAG_CHECK_PASSED = "tag_check_passed";
180   private static final byte[] TRUE = Bytes.toBytes(true);
181 
182   TableAuthManager authManager = null;
183 
184   /** flags if we are running on a region of the _acl_ table */
185   boolean aclRegion = false;
186 
187   /** defined only for Endpoint implementation, so it can have way to
188    access region services */
189   private RegionCoprocessorEnvironment regionEnv;
190 
191   /** Mapping of scanner instances to the user who created them */
192   private Map<InternalScanner,String> scannerOwners =
193       new MapMaker().weakKeys().makeMap();
194 
195   private Map<TableName, List<UserPermission>> tableAcls;
196 
197   /** Provider for mapping principal names to Users */
198   private UserProvider userProvider;
199 
200   /** if we are active, usually true, only not true if "hbase.security.authorization"
201    has been set to false in site configuration */
202   boolean authorizationEnabled;
203 
204   /** if we are able to support cell ACLs */
205   boolean cellFeaturesEnabled;
206 
207   /** if we should check EXEC permissions */
208   boolean shouldCheckExecPermission;
209 
210   /** if we should terminate access checks early as soon as table or CF grants
211     allow access; pre-0.98 compatible behavior */
212   boolean compatibleEarlyTermination;
213 
214   /** if we have been successfully initialized */
215   private volatile boolean initialized = false;
216 
217   /** if the ACL table is available, only relevant in the master */
218   private volatile boolean aclTabAvailable = false;
219 
220   public static boolean isAuthorizationSupported(Configuration conf) {
221     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
222   }
223 
224   public static boolean isCellAuthorizationSupported(Configuration conf) {
225     return isAuthorizationSupported(conf) &&
226         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
227   }
228 
229   public Region getRegion() {
230     return regionEnv != null ? regionEnv.getRegion() : null;
231   }
232 
233   public TableAuthManager getAuthManager() {
234     return authManager;
235   }
236 
237   void initialize(RegionCoprocessorEnvironment e) throws IOException {
238     final Region region = e.getRegion();
239     Configuration conf = e.getConfiguration();
240     Map<byte[], ListMultimap<String,TablePermission>> tables =
241         AccessControlLists.loadAll(region);
242     // For each table, write out the table's permissions to the respective
243     // znode for that table.
244     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
245       tables.entrySet()) {
246       byte[] entry = t.getKey();
247       ListMultimap<String,TablePermission> perms = t.getValue();
248       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
249       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
250     }
251     initialized = true;
252   }
253 
254   /**
255    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
256    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
257    * table updates.
258    */
259   void updateACL(RegionCoprocessorEnvironment e,
260       final Map<byte[], List<Cell>> familyMap) {
261     Set<byte[]> entries =
262         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
263     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
264       List<Cell> cells = f.getValue();
265       for (Cell cell: cells) {
266         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
267             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
268             AccessControlLists.ACL_LIST_FAMILY.length)) {
269           entries.add(CellUtil.cloneRow(cell));
270         }
271       }
272     }
273     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
274     Configuration conf = regionEnv.getConfiguration();
275     for (byte[] entry: entries) {
276       try {
277         ListMultimap<String,TablePermission> perms =
278           AccessControlLists.getPermissions(conf, entry);
279         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
280         zkw.writeToZookeeper(entry, serialized);
281       } catch (IOException ex) {
282         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
283             ex);
284       }
285     }
286   }
287 
288   /**
289    * Check the current user for authorization to perform a specific action
290    * against the given set of row data.
291    *
292    * <p>Note: Ordering of the authorization checks
293    * has been carefully optimized to short-circuit the most common requests
294    * and minimize the amount of processing required.</p>
295    *
296    * @param permRequest the action being requested
297    * @param e the coprocessor environment
298    * @param families the map of column families to qualifiers present in
299    * the request
300    * @return an authorization result
301    */
302   AuthResult permissionGranted(String request, User user, Action permRequest,
303       RegionCoprocessorEnvironment e,
304       Map<byte [], ? extends Collection<?>> families) {
305     HRegionInfo hri = e.getRegion().getRegionInfo();
306     TableName tableName = hri.getTable();
307 
308     // 1. All users need read access to hbase:meta table.
309     // this is a very common operation, so deal with it quickly.
310     if (hri.isMetaRegion()) {
311       if (permRequest == Action.READ) {
312         return AuthResult.allow(request, "All users allowed", user,
313           permRequest, tableName, families);
314       }
315     }
316 
317     if (user == null) {
318       return AuthResult.deny(request, "No user associated with request!", null,
319         permRequest, tableName, families);
320     }
321 
322     // 2. check for the table-level, if successful we can short-circuit
323     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
324       return AuthResult.allow(request, "Table permission granted", user,
325         permRequest, tableName, families);
326     }
327 
328     // 3. check permissions against the requested families
329     if (families != null && families.size() > 0) {
330       // all families must pass
331       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
332         // a) check for family level access
333         if (authManager.authorize(user, tableName, family.getKey(),
334             permRequest)) {
335           continue;  // family-level permission overrides per-qualifier
336         }
337 
338         // b) qualifier level access can still succeed
339         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
340           if (family.getValue() instanceof Set) {
341             // for each qualifier of the family
342             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
343             for (byte[] qualifier : familySet) {
344               if (!authManager.authorize(user, tableName, family.getKey(),
345                                          qualifier, permRequest)) {
346                 return AuthResult.deny(request, "Failed qualifier check", user,
347                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
348               }
349             }
350           } else if (family.getValue() instanceof List) { // List<KeyValue>
351             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
352             for (KeyValue kv : kvList) {
353               if (!authManager.authorize(user, tableName, family.getKey(),
354                 CellUtil.cloneQualifier(kv), permRequest)) {
355                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
356                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(kv)));
357               }
358             }
359           }
360         } else {
361           // no qualifiers and family-level check already failed
362           return AuthResult.deny(request, "Failed family check", user, permRequest,
363               tableName, makeFamilyMap(family.getKey(), null));
364         }
365       }
366 
367       // all family checks passed
368       return AuthResult.allow(request, "All family checks passed", user, permRequest,
369           tableName, families);
370     }
371 
372     // 4. no families to check and table level access failed
373     return AuthResult.deny(request, "No families to check and table permission failed",
374         user, permRequest, tableName, families);
375   }
376 
377   /**
378    * Check the current user for authorization to perform a specific action
379    * against the given set of row data.
380    * @param opType the operation type
381    * @param user the user
382    * @param e the coprocessor environment
383    * @param families the map of column families to qualifiers present in
384    * the request
385    * @param actions the desired actions
386    * @return an authorization result
387    */
388   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
389       Map<byte [], ? extends Collection<?>> families, Action... actions) {
390     AuthResult result = null;
391     for (Action action: actions) {
392       result = permissionGranted(opType.toString(), user, action, e, families);
393       if (!result.isAllowed()) {
394         return result;
395       }
396     }
397     return result;
398   }
399 
400   private void logResult(AuthResult result) {
401     if (AUDITLOG.isTraceEnabled()) {
402       InetAddress remoteAddr = RpcServer.getRemoteAddress();
403       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
404           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
405           "; reason: " + result.getReason() +
406           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
407           "; request: " + result.getRequest() +
408           "; context: " + result.toContextString());
409     }
410   }
411 
412   /**
413    * Returns the active user to which authorization checks should be applied.
414    * If we are in the context of an RPC call, the remote user is used,
415    * otherwise the currently logged in user is used.
416    */
417   private User getActiveUser() throws IOException {
418     User user = RpcServer.getRequestUser();
419     if (user == null) {
420       // for non-rpc handling, fallback to system user
421       user = userProvider.getCurrent();
422     }
423     return user;
424   }
425 
426   /**
427    * Authorizes that the current user has any of the given permissions for the
428    * given table, column family and column qualifier.
429    * @param tableName Table requested
430    * @param family Column family requested
431    * @param qualifier Column qualifier requested
432    * @throws IOException if obtaining the current user fails
433    * @throws AccessDeniedException if user has no authorization
434    */
435   private void requirePermission(String request, TableName tableName, byte[] family,
436       byte[] qualifier, Action... permissions) throws IOException {
437     User user = getActiveUser();
438     AuthResult result = null;
439 
440     for (Action permission : permissions) {
441       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
442         result = AuthResult.allow(request, "Table permission granted", user,
443                                   permission, tableName, family, qualifier);
444         break;
445       } else {
446         // rest of the world
447         result = AuthResult.deny(request, "Insufficient permissions", user,
448                                  permission, tableName, family, qualifier);
449       }
450     }
451     logResult(result);
452     if (authorizationEnabled && !result.isAllowed()) {
453       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
454     }
455   }
456 
457   /**
458    * Authorizes that the current user has any of the given permissions for the
459    * given table, column family and column qualifier.
460    * @param tableName Table requested
461    * @param family Column family param
462    * @param qualifier Column qualifier param
463    * @throws IOException if obtaining the current user fails
464    * @throws AccessDeniedException if user has no authorization
465    */
466   private void requireTablePermission(String request, TableName tableName, byte[] family,
467       byte[] qualifier, Action... permissions) throws IOException {
468     User user = getActiveUser();
469     AuthResult result = null;
470 
471     for (Action permission : permissions) {
472       if (authManager.authorize(user, tableName, null, null, permission)) {
473         result = AuthResult.allow(request, "Table permission granted", user,
474             permission, tableName, null, null);
475         result.getParams().setFamily(family).setQualifier(qualifier);
476         break;
477       } else {
478         // rest of the world
479         result = AuthResult.deny(request, "Insufficient permissions", user,
480             permission, tableName, family, qualifier);
481         result.getParams().setFamily(family).setQualifier(qualifier);
482       }
483     }
484     logResult(result);
485     if (authorizationEnabled && !result.isAllowed()) {
486       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
487     }
488   }
489 
490   /**
491    * Authorizes that the current user has any of the given permissions to access the table.
492    *
493    * @param tableName Table requested
494    * @param permissions Actions being requested
495    * @throws IOException if obtaining the current user fails
496    * @throws AccessDeniedException if user has no authorization
497    */
498   private void requireAccess(String request, TableName tableName,
499       Action... permissions) throws IOException {
500     User user = getActiveUser();
501     AuthResult result = null;
502 
503     for (Action permission : permissions) {
504       if (authManager.hasAccess(user, tableName, permission)) {
505         result = AuthResult.allow(request, "Table permission granted", user,
506                                   permission, tableName, null, null);
507         break;
508       } else {
509         // rest of the world
510         result = AuthResult.deny(request, "Insufficient permissions", user,
511                                  permission, tableName, null, null);
512       }
513     }
514     logResult(result);
515     if (authorizationEnabled && !result.isAllowed()) {
516       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
517     }
518   }
519 
520   /**
521    * Authorizes that the current user has global privileges for the given action.
522    * @param perm The action being requested
523    * @throws IOException if obtaining the current user fails
524    * @throws AccessDeniedException if authorization is denied
525    */
526   private void requirePermission(String request, Action perm) throws IOException {
527     requireGlobalPermission(request, perm, null, null);
528   }
529 
530   /**
531    * Checks that the user has the given global permission. The generated
532    * audit log message will contain context information for the operation
533    * being authorized, based on the given parameters.
534    * @param perm Action being requested
535    * @param tableName Affected table name.
536    * @param familyMap Affected column families.
537    */
538   private void requireGlobalPermission(String request, Action perm, TableName tableName,
539       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
540     User user = getActiveUser();
541     AuthResult result = null;
542     if (authManager.authorize(user, perm)) {
543       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
544       result.getParams().setTableName(tableName).setFamilies(familyMap);
545       logResult(result);
546     } else {
547       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
548       result.getParams().setTableName(tableName).setFamilies(familyMap);
549       logResult(result);
550       if (authorizationEnabled) {
551         throw new AccessDeniedException("Insufficient permissions for user '" +
552           (user != null ? user.getShortName() : "null") +"' (global, action=" +
553           perm.toString() + ")");
554       }
555     }
556   }
557 
558   /**
559    * Checks that the user has the given global permission. The generated
560    * audit log message will contain context information for the operation
561    * being authorized, based on the given parameters.
562    * @param perm Action being requested
563    * @param namespace
564    */
565   private void requireGlobalPermission(String request, Action perm,
566                                        String namespace) throws IOException {
567     User user = getActiveUser();
568     AuthResult authResult = null;
569     if (authManager.authorize(user, perm)) {
570       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
571       authResult.getParams().setNamespace(namespace);
572       logResult(authResult);
573     } else {
574       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
575       authResult.getParams().setNamespace(namespace);
576       logResult(authResult);
577       if (authorizationEnabled) {
578         throw new AccessDeniedException("Insufficient permissions for user '" +
579           (user != null ? user.getShortName() : "null") +"' (global, action=" +
580           perm.toString() + ")");
581       }
582     }
583   }
584 
585   /**
586    * Checks that the user has the given global or namespace permission.
587    * @param namespace
588    * @param permissions Actions being requested
589    */
590   public void requireNamespacePermission(String request, String namespace,
591       Action... permissions) throws IOException {
592     User user = getActiveUser();
593     AuthResult result = null;
594 
595     for (Action permission : permissions) {
596       if (authManager.authorize(user, namespace, permission)) {
597         result = AuthResult.allow(request, "Namespace permission granted",
598             user, permission, namespace);
599         break;
600       } else {
601         // rest of the world
602         result = AuthResult.deny(request, "Insufficient permissions", user,
603             permission, namespace);
604       }
605     }
606     logResult(result);
607     if (authorizationEnabled && !result.isAllowed()) {
608       throw new AccessDeniedException("Insufficient permissions "
609           + result.toContextString());
610     }
611   }
612 
613   /**
614    * Checks that the user has the given global or namespace permission.
615    * @param namespace
616    * @param permissions Actions being requested
617    */
618   public void requireNamespacePermission(String request, String namespace, TableName tableName,
619       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
620       throws IOException {
621     User user = getActiveUser();
622     AuthResult result = null;
623 
624     for (Action permission : permissions) {
625       if (authManager.authorize(user, namespace, permission)) {
626         result = AuthResult.allow(request, "Namespace permission granted",
627             user, permission, namespace);
628         result.getParams().setTableName(tableName).setFamilies(familyMap);
629         break;
630       } else {
631         // rest of the world
632         result = AuthResult.deny(request, "Insufficient permissions", user,
633             permission, namespace);
634         result.getParams().setTableName(tableName).setFamilies(familyMap);
635       }
636     }
637     logResult(result);
638     if (authorizationEnabled && !result.isAllowed()) {
639       throw new AccessDeniedException("Insufficient permissions "
640           + result.toContextString());
641     }
642   }
643 
644   /**
645    * Returns <code>true</code> if the current user is allowed the given action
646    * over at least one of the column qualifiers in the given column families.
647    */
648   private boolean hasFamilyQualifierPermission(User user,
649       Action perm,
650       RegionCoprocessorEnvironment env,
651       Map<byte[], ? extends Collection<byte[]>> familyMap)
652     throws IOException {
653     HRegionInfo hri = env.getRegion().getRegionInfo();
654     TableName tableName = hri.getTable();
655 
656     if (user == null) {
657       return false;
658     }
659 
660     if (familyMap != null && familyMap.size() > 0) {
661       // at least one family must be allowed
662       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
663           familyMap.entrySet()) {
664         if (family.getValue() != null && !family.getValue().isEmpty()) {
665           for (byte[] qualifier : family.getValue()) {
666             if (authManager.matchPermission(user, tableName,
667                 family.getKey(), qualifier, perm)) {
668               return true;
669             }
670           }
671         } else {
672           if (authManager.matchPermission(user, tableName, family.getKey(),
673               perm)) {
674             return true;
675           }
676         }
677       }
678     } else if (LOG.isDebugEnabled()) {
679       LOG.debug("Empty family map passed for permission check");
680     }
681 
682     return false;
683   }
684 
685   private enum OpType {
686     GET("get"),
687     EXISTS("exists"),
688     SCAN("scan"),
689     PUT("put"),
690     DELETE("delete"),
691     CHECK_AND_PUT("checkAndPut"),
692     CHECK_AND_DELETE("checkAndDelete"),
693     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
694     APPEND("append"),
695     INCREMENT("increment");
696 
697     private String type;
698 
699     private OpType(String type) {
700       this.type = type;
701     }
702 
703     @Override
704     public String toString() {
705       return type;
706     }
707   }
708 
709   /**
710    * Determine if cell ACLs covered by the operation grant access. This is expensive.
711    * @return false if cell ACLs failed to grant access, true otherwise
712    * @throws IOException
713    */
714   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
715       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
716       throws IOException {
717     if (!cellFeaturesEnabled) {
718       return false;
719     }
720     long cellGrants = 0;
721     User user = getActiveUser();
722     long latestCellTs = 0;
723     Get get = new Get(row);
724     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
725     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
726     // version. We have to get every cell version and check its TS against the TS asked for in
727     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
728     // consider only one such passing cell. In case of Delete we have to consider all the cell
729     // versions under this passing version. When Delete Mutation contains columns which are a
730     // version delete just consider only one version for those column cells.
731     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
732     if (considerCellTs) {
733       get.setMaxVersions();
734     } else {
735       get.setMaxVersions(1);
736     }
737     boolean diffCellTsFromOpTs = false;
738     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
739       byte[] col = entry.getKey();
740       // TODO: HBASE-7114 could possibly unify the collection type in family
741       // maps so we would not need to do this
742       if (entry.getValue() instanceof Set) {
743         Set<byte[]> set = (Set<byte[]>)entry.getValue();
744         if (set == null || set.isEmpty()) {
745           get.addFamily(col);
746         } else {
747           for (byte[] qual: set) {
748             get.addColumn(col, qual);
749           }
750         }
751       } else if (entry.getValue() instanceof List) {
752         List<Cell> list = (List<Cell>)entry.getValue();
753         if (list == null || list.isEmpty()) {
754           get.addFamily(col);
755         } else {
756           // In case of family delete, a Cell will be added into the list with Qualifier as null.
757           for (Cell cell : list) {
758             if (cell.getQualifierLength() == 0
759                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
760                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
761               get.addFamily(col);
762             } else {
763               get.addColumn(col, CellUtil.cloneQualifier(cell));
764             }
765             if (considerCellTs) {
766               long cellTs = cell.getTimestamp();
767               latestCellTs = Math.max(latestCellTs, cellTs);
768               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
769             }
770           }
771         }
772       } else if (entry.getValue() == null) {
773         get.addFamily(col);
774       } else {
775         throw new RuntimeException("Unhandled collection type " +
776           entry.getValue().getClass().getName());
777       }
778     }
779     // We want to avoid looking into the future. So, if the cells of the
780     // operation specify a timestamp, or the operation itself specifies a
781     // timestamp, then we use the maximum ts found. Otherwise, we bound
782     // the Get to the current server time. We add 1 to the timerange since
783     // the upper bound of a timerange is exclusive yet we need to examine
784     // any cells found there inclusively.
785     long latestTs = Math.max(opTs, latestCellTs);
786     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
787       latestTs = EnvironmentEdgeManager.currentTime();
788     }
789     get.setTimeRange(0, latestTs + 1);
790     // In case of Put operation we set to read all versions. This was done to consider the case
791     // where columns are added with TS other than the Mutation TS. But normally this wont be the
792     // case with Put. There no need to get all versions but get latest version only.
793     if (!diffCellTsFromOpTs && request == OpType.PUT) {
794       get.setMaxVersions(1);
795     }
796     if (LOG.isTraceEnabled()) {
797       LOG.trace("Scanning for cells with " + get);
798     }
799     // This Map is identical to familyMap. The key is a BR rather than byte[].
800     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
801     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
802     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
803     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
804       if (entry.getValue() instanceof List) {
805         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
806       }
807     }
808     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
809     List<Cell> cells = Lists.newArrayList();
810     Cell prevCell = null;
811     ByteRange curFam = new SimpleMutableByteRange();
812     boolean curColAllVersions = (request == OpType.DELETE);
813     long curColCheckTs = opTs;
814     boolean foundColumn = false;
815     try {
816       boolean more = false;
817       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
818 
819       do {
820         cells.clear();
821         // scan with limit as 1 to hold down memory use on wide rows
822         more = scanner.next(cells, scannerContext);
823         for (Cell cell: cells) {
824           if (LOG.isTraceEnabled()) {
825             LOG.trace("Found cell " + cell);
826           }
827           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
828           if (colChange) foundColumn = false;
829           prevCell = cell;
830           if (!curColAllVersions && foundColumn) {
831             continue;
832           }
833           if (colChange && considerCellTs) {
834             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
835             List<Cell> cols = familyMap1.get(curFam);
836             for (Cell col : cols) {
837               // null/empty qualifier is used to denote a Family delete. The TS and delete type
838               // associated with this is applicable for all columns within the family. That is
839               // why the below (col.getQualifierLength() == 0) check.
840               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
841                   || CellUtil.matchingQualifier(cell, col)) {
842                 byte type = col.getTypeByte();
843                 if (considerCellTs) {
844                   curColCheckTs = col.getTimestamp();
845                 }
846                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
847                 // a version delete for a column no need to check all the covering cells within
848                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
849                 // One version delete types are Delete/DeleteFamilyVersion
850                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
851                     || (KeyValue.Type.DeleteFamily.getCode() == type);
852                 break;
853               }
854             }
855           }
856           if (cell.getTimestamp() > curColCheckTs) {
857             // Just ignore this cell. This is not a covering cell.
858             continue;
859           }
860           foundColumn = true;
861           for (Action action: actions) {
862             // Are there permissions for this user for the cell?
863             if (!authManager.authorize(user, getTableName(e), cell, action)) {
864               // We can stop if the cell ACL denies access
865               return false;
866             }
867           }
868           cellGrants++;
869         }
870       } while (more);
871     } catch (AccessDeniedException ex) {
872       throw ex;
873     } catch (IOException ex) {
874       LOG.error("Exception while getting cells to calculate covering permission", ex);
875     } finally {
876       scanner.close();
877     }
878     // We should not authorize unless we have found one or more cell ACLs that
879     // grant access. This code is used to check for additional permissions
880     // after no table or CF grants are found.
881     return cellGrants > 0;
882   }
883 
884   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
885     // Iterate over the entries in the familyMap, replacing the cells therein
886     // with new cells including the ACL data
887     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
888       List<Cell> newCells = Lists.newArrayList();
889       for (Cell cell: e.getValue()) {
890         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
891         List<Tag> tags = new ArrayList<Tag>();
892         tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
893         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
894         while (tagIterator.hasNext()) {
895           tags.add(tagIterator.next());
896         }
897         newCells.add(new TagRewriteCell(cell, TagUtil.fromList(tags)));
898       }
899       // This is supposed to be safe, won't CME
900       e.setValue(newCells);
901     }
902   }
903 
904   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
905   // type is reserved and should not be explicitly set by user.
906   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
907     // No need to check if we're not going to throw
908     if (!authorizationEnabled) {
909       m.setAttribute(TAG_CHECK_PASSED, TRUE);
910       return;
911     }
912     // Superusers are allowed to store cells unconditionally.
913     if (Superusers.isSuperUser(user)) {
914       m.setAttribute(TAG_CHECK_PASSED, TRUE);
915       return;
916     }
917     // We already checked (prePut vs preBatchMutation)
918     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
919       return;
920     }
921     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
922       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cellScanner.current());
923       while (tagsItr.hasNext()) {
924         if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
925           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
926         }
927       }
928     }
929     m.setAttribute(TAG_CHECK_PASSED, TRUE);
930   }
931 
932   /* ---- MasterObserver implementation ---- */
933   @Override
934   public void start(CoprocessorEnvironment env) throws IOException {
935     CompoundConfiguration conf = new CompoundConfiguration();
936     conf.add(env.getConfiguration());
937 
938     authorizationEnabled = isAuthorizationSupported(conf);
939     if (!authorizationEnabled) {
940       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
941     }
942 
943     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
944       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
945 
946     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
947     if (!cellFeaturesEnabled) {
948       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
949           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
950           + " accordingly.");
951     }
952 
953     ZooKeeperWatcher zk = null;
954     if (env instanceof MasterCoprocessorEnvironment) {
955       // if running on HMaster
956       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
957       zk = mEnv.getMasterServices().getZooKeeper();
958     } else if (env instanceof RegionServerCoprocessorEnvironment) {
959       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
960       zk = rsEnv.getRegionServerServices().getZooKeeper();
961     } else if (env instanceof RegionCoprocessorEnvironment) {
962       // if running at region
963       regionEnv = (RegionCoprocessorEnvironment) env;
964       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
965       zk = regionEnv.getRegionServerServices().getZooKeeper();
966       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
967         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
968     }
969 
970     // set the user-provider.
971     this.userProvider = UserProvider.instantiate(env.getConfiguration());
972 
973     // If zk is null or IOException while obtaining auth manager,
974     // throw RuntimeException so that the coprocessor is unloaded.
975     if (zk != null) {
976       try {
977         this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
978       } catch (IOException ioe) {
979         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
980       }
981     } else {
982       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
983     }
984 
985     tableAcls = new MapMaker().weakValues().makeMap();
986   }
987 
988   @Override
989   public void stop(CoprocessorEnvironment env) {
990     if (this.authManager != null) {
991       TableAuthManager.release(authManager);
992     }
993   }
994 
995   @Override
996   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
997       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
998     Set<byte[]> families = desc.getFamiliesKeys();
999     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
1000     for (byte[] family: families) {
1001       familyMap.put(family, null);
1002     }
1003     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
1004         desc.getTableName(), familyMap, Action.CREATE);
1005   }
1006 
1007   @Override
1008   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
1009       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
1010     // When AC is used, it should be configured as the 1st CP.
1011     // In Master, the table operations like create, are handled by a Thread pool but the max size
1012     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1013     // sequentially only.
1014     // Related code in HMaster#startServiceThreads
1015     // {code}
1016     //   // We depend on there being only one instance of this executor running
1017     //   // at a time. To do concurrency, would need fencing of enable/disable of
1018     //   // tables.
1019     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1020     // {code}
1021     // In future if we change this pool to have more threads, then there is a chance for thread,
1022     // creating acl table, getting delayed and by that time another table creation got over and
1023     // this hook is getting called. In such a case, we will need a wait logic here which will
1024     // wait till the acl table is created.
1025     if (AccessControlLists.isAclTable(desc)) {
1026       this.aclTabAvailable = true;
1027     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1028       if (!aclTabAvailable) {
1029         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1030             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1031             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1032       } else {
1033         String owner = desc.getOwnerString();
1034         // default the table owner to current user, if not specified.
1035         if (owner == null)
1036           owner = getActiveUser().getShortName();
1037         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1038             desc.getTableName(), null, Action.values());
1039         // switch to the real hbase master user for doing the RPC on the ACL table
1040         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1041           @Override
1042           public Void run() throws Exception {
1043             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1044                 userperm);
1045             return null;
1046           }
1047         });
1048       }
1049     }
1050   }
1051 
1052   @Override
1053   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1054       throws IOException {
1055     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1056   }
1057 
1058   @Override
1059   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1060       final TableName tableName) throws IOException {
1061     final Configuration conf = c.getEnvironment().getConfiguration();
1062     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1063       @Override
1064       public Void run() throws Exception {
1065         AccessControlLists.removeTablePermissions(conf, tableName);
1066         return null;
1067       }
1068     });
1069     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1070   }
1071 
1072   @Override
1073   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1074       final TableName tableName) throws IOException {
1075     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1076 
1077     final Configuration conf = c.getEnvironment().getConfiguration();
1078     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1079       @Override
1080       public Void run() throws Exception {
1081         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1082         if (acls != null) {
1083           tableAcls.put(tableName, acls);
1084         }
1085         return null;
1086       }
1087     });
1088   }
1089 
1090   @Override
1091   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1092       final TableName tableName) throws IOException {
1093     final Configuration conf = ctx.getEnvironment().getConfiguration();
1094     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1095       @Override
1096       public Void run() throws Exception {
1097         List<UserPermission> perms = tableAcls.get(tableName);
1098         if (perms != null) {
1099           for (UserPermission perm : perms) {
1100             AccessControlLists.addUserPermission(conf, perm);
1101           }
1102         }
1103         tableAcls.remove(tableName);
1104         return null;
1105       }
1106     });
1107   }
1108 
1109   @Override
1110   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1111       HTableDescriptor htd) throws IOException {
1112     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1113   }
1114 
1115   @Override
1116   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1117       TableName tableName, final HTableDescriptor htd) throws IOException {
1118     final Configuration conf = c.getEnvironment().getConfiguration();
1119     // default the table owner to current user, if not specified.
1120     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1121       getActiveUser().getShortName();
1122     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1123       @Override
1124       public Void run() throws Exception {
1125         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1126             htd.getTableName(), null, Action.values());
1127         AccessControlLists.addUserPermission(conf, userperm);
1128         return null;
1129       }
1130     });
1131   }
1132 
1133   @Override
1134   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1135                                  TableName tableName, HColumnDescriptor columnFamily)
1136       throws IOException {
1137     requireTablePermission("addColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1138                            Action.CREATE);
1139   }
1140 
1141   @Override
1142   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1143                                     TableName tableName, HColumnDescriptor columnFamily)
1144       throws IOException {
1145     requirePermission("modifyColumn", tableName, columnFamily.getName(), null, Action.ADMIN,
1146       Action.CREATE);
1147   }
1148 
1149   @Override
1150   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1151                                     TableName tableName, byte[] columnFamily) throws IOException {
1152     requirePermission("deleteColumn", tableName, columnFamily, null, Action.ADMIN, Action.CREATE);
1153   }
1154 
1155   @Override
1156   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1157       final TableName tableName, final byte[] columnFamily) throws IOException {
1158     final Configuration conf = ctx.getEnvironment().getConfiguration();
1159     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1160       @Override
1161       public Void run() throws Exception {
1162         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1163         return null;
1164       }
1165     });
1166   }
1167 
1168   @Override
1169   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1170       throws IOException {
1171     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1172   }
1173 
1174   @Override
1175   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1176       throws IOException {
1177     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1178       // We have to unconditionally disallow disable of the ACL table when we are installed,
1179       // even if not enforcing authorizations. We are still allowing grants and revocations,
1180       // checking permissions and logging audit messages, etc. If the ACL table is not
1181       // available we will fail random actions all over the place.
1182       throw new AccessDeniedException("Not allowed to disable "
1183           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1184     }
1185     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1186   }
1187 
1188   @Override
1189   public void preAbortProcedure(
1190       ObserverContext<MasterCoprocessorEnvironment> ctx,
1191       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1192       final long procId) throws IOException {
1193     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
1194       // If the user is not the procedure owner, then we should further probe whether
1195       // he can abort the procedure.
1196       requirePermission("abortProcedure", Action.ADMIN);
1197     }
1198   }
1199 
1200   @Override
1201   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1202       throws IOException {
1203     // There is nothing to do at this time after the procedure abort request was sent.
1204   }
1205 
1206   @Override
1207   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1208       throws IOException {
1209     // We are delegating the authorization check to postListProcedures as we don't have
1210     // any concrete set of procedures to work with
1211   }
1212 
1213   @Override
1214   public void postListProcedures(
1215       ObserverContext<MasterCoprocessorEnvironment> ctx,
1216       List<ProcedureInfo> procInfoList) throws IOException {
1217     if (procInfoList.isEmpty()) {
1218       return;
1219     }
1220 
1221     // Retains only those which passes authorization checks, as the checks weren't done as part
1222     // of preListProcedures.
1223     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1224     User user = getActiveUser();
1225     while (itr.hasNext()) {
1226       ProcedureInfo procInfo = itr.next();
1227       try {
1228         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1229           // If the user is not the procedure owner, then we should further probe whether
1230           // he can see the procedure.
1231           requirePermission("listProcedures", Action.ADMIN);
1232         }
1233       } catch (AccessDeniedException e) {
1234         itr.remove();
1235       }
1236     }
1237   }
1238 
1239   @Override
1240   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1241       ServerName srcServer, ServerName destServer) throws IOException {
1242     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1243   }
1244 
1245   @Override
1246   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1247       throws IOException {
1248     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1249   }
1250 
1251   @Override
1252   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1253       boolean force) throws IOException {
1254     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1255   }
1256 
1257   @Override
1258   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1259       HRegionInfo regionInfo) throws IOException {
1260     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1261   }
1262 
1263   @Override
1264   public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1265       final boolean newValue, final Admin.MasterSwitchType switchType) throws IOException {
1266     requirePermission("setSplitOrMergeEnabled", Action.ADMIN);
1267     return false;
1268   }
1269 
1270   @Override
1271   public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1272       final boolean newValue, final Admin.MasterSwitchType switchType) throws IOException {
1273   }
1274 
1275   @Override
1276   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1277       throws IOException {
1278     requirePermission("balance", Action.ADMIN);
1279   }
1280 
1281   @Override
1282   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1283       boolean newValue) throws IOException {
1284     requirePermission("balanceSwitch", Action.ADMIN);
1285     return newValue;
1286   }
1287 
1288   @Override
1289   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1290       throws IOException {
1291     requirePermission("shutdown", Action.ADMIN);
1292   }
1293 
1294   @Override
1295   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1296       throws IOException {
1297     requirePermission("stopMaster", Action.ADMIN);
1298   }
1299 
1300   @Override
1301   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1302       throws IOException {
1303     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1304       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1305       // initialize the ACL storage table
1306       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1307     } else {
1308       aclTabAvailable = true;
1309     }
1310   }
1311 
1312   @Override
1313   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1314       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1315       throws IOException {
1316     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1317       Permission.Action.ADMIN);
1318   }
1319 
1320   @Override
1321   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1322       final SnapshotDescription snapshot) throws IOException {
1323     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1324       // list it, if user is the owner of snapshot
1325       // TODO: We are not logging this for audit
1326     } else {
1327       requirePermission("listSnapshot", Action.ADMIN);
1328     }
1329   }
1330 
1331   @Override
1332   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1333       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1334       throws IOException {
1335     requirePermission("clone", Action.ADMIN);
1336   }
1337 
1338   @Override
1339   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1340       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1341       throws IOException {
1342     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1343       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1344         Permission.Action.ADMIN);
1345     } else {
1346       requirePermission("restoreSnapshot", Action.ADMIN);
1347     }
1348   }
1349 
1350   @Override
1351   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1352       final SnapshotDescription snapshot) throws IOException {
1353     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1354       // Snapshot owner is allowed to delete the snapshot
1355       // TODO: We are not logging this for audit
1356     } else {
1357       requirePermission("deleteSnapshot", Action.ADMIN);
1358     }
1359   }
1360 
1361   @Override
1362   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1363       NamespaceDescriptor ns) throws IOException {
1364     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1365   }
1366 
1367   @Override
1368   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1369       throws IOException {
1370     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1371   }
1372 
1373   @Override
1374   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1375       final String namespace) throws IOException {
1376     final Configuration conf = ctx.getEnvironment().getConfiguration();
1377     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1378       @Override
1379       public Void run() throws Exception {
1380         AccessControlLists.removeNamespacePermissions(conf, namespace);
1381         return null;
1382       }
1383     });
1384     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1385     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1386   }
1387 
1388   @Override
1389   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1390       NamespaceDescriptor ns) throws IOException {
1391     // We require only global permission so that
1392     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1393     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1394   }
1395 
1396   @Override
1397   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1398       throws IOException {
1399     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1400   }
1401 
1402   @Override
1403   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1404       List<NamespaceDescriptor> descriptors) throws IOException {
1405     // Retains only those which passes authorization checks, as the checks weren't done as part
1406     // of preGetTableDescriptors.
1407     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1408     while (itr.hasNext()) {
1409       NamespaceDescriptor desc = itr.next();
1410       try {
1411         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1412       } catch (AccessDeniedException e) {
1413         itr.remove();
1414       }
1415     }
1416   }
1417 
1418   @Override
1419   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1420       final TableName tableName) throws IOException {
1421     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1422   }
1423 
1424   /* ---- RegionObserver implementation ---- */
1425 
1426   @Override
1427   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1428       throws IOException {
1429     RegionCoprocessorEnvironment env = e.getEnvironment();
1430     final Region region = env.getRegion();
1431     if (region == null) {
1432       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1433     } else {
1434       HRegionInfo regionInfo = region.getRegionInfo();
1435       if (regionInfo.getTable().isSystemTable()) {
1436         checkSystemOrSuperUser();
1437       } else {
1438         requirePermission("preOpen", Action.ADMIN);
1439       }
1440     }
1441   }
1442 
1443   @Override
1444   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1445     RegionCoprocessorEnvironment env = c.getEnvironment();
1446     final Region region = env.getRegion();
1447     if (region == null) {
1448       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1449       return;
1450     }
1451     if (AccessControlLists.isAclRegion(region)) {
1452       aclRegion = true;
1453       // When this region is under recovering state, initialize will be handled by postLogReplay
1454       if (!region.isRecovering()) {
1455         try {
1456           initialize(env);
1457         } catch (IOException ex) {
1458           // if we can't obtain permissions, it's better to fail
1459           // than perform checks incorrectly
1460           throw new RuntimeException("Failed to initialize permissions cache", ex);
1461         }
1462       }
1463     } else {
1464       initialized = true;
1465     }
1466   }
1467 
1468   @Override
1469   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1470     if (aclRegion) {
1471       try {
1472         initialize(c.getEnvironment());
1473       } catch (IOException ex) {
1474         // if we can't obtain permissions, it's better to fail
1475         // than perform checks incorrectly
1476         throw new RuntimeException("Failed to initialize permissions cache", ex);
1477       }
1478     }
1479   }
1480 
1481   @Override
1482   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1483     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1484         Action.CREATE);
1485   }
1486 
1487   @Override
1488   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1489     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1490   }
1491 
1492   @Override
1493   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1494       byte[] splitRow) throws IOException {
1495     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1496   }
1497 
1498   @Override
1499   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1500       final Store store, final InternalScanner scanner, final ScanType scanType)
1501           throws IOException {
1502     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1503         Action.CREATE);
1504     return scanner;
1505   }
1506 
1507   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1508       final Query query, OpType opType) throws IOException {
1509     Filter filter = query.getFilter();
1510     // Don't wrap an AccessControlFilter
1511     if (filter != null && filter instanceof AccessControlFilter) {
1512       return;
1513     }
1514     User user = getActiveUser();
1515     RegionCoprocessorEnvironment env = c.getEnvironment();
1516     Map<byte[],? extends Collection<byte[]>> families = null;
1517     switch (opType) {
1518     case GET:
1519     case EXISTS:
1520       families = ((Get)query).getFamilyMap();
1521       break;
1522     case SCAN:
1523       families = ((Scan)query).getFamilyMap();
1524       break;
1525     default:
1526       throw new RuntimeException("Unhandled operation " + opType);
1527     }
1528     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1529     Region region = getRegion(env);
1530     TableName table = getTableName(region);
1531     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1532     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1533       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1534     }
1535     if (!authResult.isAllowed()) {
1536       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1537         // Old behavior: Scan with only qualifier checks if we have partial
1538         // permission. Backwards compatible behavior is to throw an
1539         // AccessDeniedException immediately if there are no grants for table
1540         // or CF or CF+qual. Only proceed with an injected filter if there are
1541         // grants for qualifiers. Otherwise we will fall through below and log
1542         // the result and throw an ADE. We may end up checking qualifier
1543         // grants three times (permissionGranted above, here, and in the
1544         // filter) but that's the price of backwards compatibility.
1545         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1546           authResult.setAllowed(true);
1547           authResult.setReason("Access allowed with filter");
1548           // Only wrap the filter if we are enforcing authorizations
1549           if (authorizationEnabled) {
1550             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1551               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1552               cfVsMaxVersions);
1553             // wrap any existing filter
1554             if (filter != null) {
1555               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1556                 Lists.newArrayList(ourFilter, filter));
1557             }
1558             switch (opType) {
1559               case GET:
1560               case EXISTS:
1561                 ((Get)query).setFilter(ourFilter);
1562                 break;
1563               case SCAN:
1564                 ((Scan)query).setFilter(ourFilter);
1565                 break;
1566               default:
1567                 throw new RuntimeException("Unhandled operation " + opType);
1568             }
1569           }
1570         }
1571       } else {
1572         // New behavior: Any access we might be granted is more fine-grained
1573         // than whole table or CF. Simply inject a filter and return what is
1574         // allowed. We will not throw an AccessDeniedException. This is a
1575         // behavioral change since 0.96.
1576         authResult.setAllowed(true);
1577         authResult.setReason("Access allowed with filter");
1578         // Only wrap the filter if we are enforcing authorizations
1579         if (authorizationEnabled) {
1580           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1581             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1582           // wrap any existing filter
1583           if (filter != null) {
1584             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1585               Lists.newArrayList(ourFilter, filter));
1586           }
1587           switch (opType) {
1588             case GET:
1589             case EXISTS:
1590               ((Get)query).setFilter(ourFilter);
1591               break;
1592             case SCAN:
1593               ((Scan)query).setFilter(ourFilter);
1594               break;
1595             default:
1596               throw new RuntimeException("Unhandled operation " + opType);
1597           }
1598         }
1599       }
1600     }
1601 
1602     logResult(authResult);
1603     if (authorizationEnabled && !authResult.isAllowed()) {
1604       throw new AccessDeniedException("Insufficient permissions for user '"
1605           + (user != null ? user.getShortName() : "null")
1606           + "' (table=" + table + ", action=READ)");
1607     }
1608   }
1609 
1610   @Override
1611   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1612       final Get get, final List<Cell> result) throws IOException {
1613     internalPreRead(c, get, OpType.GET);
1614   }
1615 
1616   @Override
1617   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1618       final Get get, final boolean exists) throws IOException {
1619     internalPreRead(c, get, OpType.EXISTS);
1620     return exists;
1621   }
1622 
1623   @Override
1624   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1625       final Put put, final WALEdit edit, final Durability durability)
1626       throws IOException {
1627     User user = getActiveUser();
1628     checkForReservedTagPresence(user, put);
1629 
1630     // Require WRITE permission to the table, CF, or top visible value, if any.
1631     // NOTE: We don't need to check the permissions for any earlier Puts
1632     // because we treat the ACLs in each Put as timestamped like any other
1633     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1634     // change the ACL of any previous Put. This allows simple evolution of
1635     // security policy over time without requiring expensive updates.
1636     RegionCoprocessorEnvironment env = c.getEnvironment();
1637     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1638     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1639     logResult(authResult);
1640     if (!authResult.isAllowed()) {
1641       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1642         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1643       } else if (authorizationEnabled) {
1644         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1645       }
1646     }
1647 
1648     // Add cell ACLs from the operation to the cells themselves
1649     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1650     if (bytes != null) {
1651       if (cellFeaturesEnabled) {
1652         addCellPermissions(bytes, put.getFamilyCellMap());
1653       } else {
1654         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1655       }
1656     }
1657   }
1658 
1659   @Override
1660   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1661       final Put put, final WALEdit edit, final Durability durability) {
1662     if (aclRegion) {
1663       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1664     }
1665   }
1666 
1667   @Override
1668   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1669       final Delete delete, final WALEdit edit, final Durability durability)
1670       throws IOException {
1671     // An ACL on a delete is useless, we shouldn't allow it
1672     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1673       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1674     }
1675     // Require WRITE permissions on all cells covered by the delete. Unlike
1676     // for Puts we need to check all visible prior versions, because a major
1677     // compaction could remove them. If the user doesn't have permission to
1678     // overwrite any of the visible versions ('visible' defined as not covered
1679     // by a tombstone already) then we have to disallow this operation.
1680     RegionCoprocessorEnvironment env = c.getEnvironment();
1681     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1682     User user = getActiveUser();
1683     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1684     logResult(authResult);
1685     if (!authResult.isAllowed()) {
1686       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1687         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1688       } else if (authorizationEnabled) {
1689         throw new AccessDeniedException("Insufficient permissions " +
1690           authResult.toContextString());
1691       }
1692     }
1693   }
1694 
1695   @Override
1696   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1697       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1698     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1699       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1700       for (int i = 0; i < miniBatchOp.size(); i++) {
1701         Mutation m = miniBatchOp.getOperation(i);
1702         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1703           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1704           // perm check
1705           OpType opType;
1706           if (m instanceof Put) {
1707             checkForReservedTagPresence(getActiveUser(), m);
1708             opType = OpType.PUT;
1709           } else {
1710             opType = OpType.DELETE;
1711           }
1712           AuthResult authResult = null;
1713           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1714             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1715             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1716               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1717           } else {
1718             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1719               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1720           }
1721           logResult(authResult);
1722           if (authorizationEnabled && !authResult.isAllowed()) {
1723             throw new AccessDeniedException("Insufficient permissions "
1724               + authResult.toContextString());
1725           }
1726         }
1727       }
1728     }
1729   }
1730 
1731   @Override
1732   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1733       final Delete delete, final WALEdit edit, final Durability durability)
1734       throws IOException {
1735     if (aclRegion) {
1736       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1737     }
1738   }
1739 
1740   @Override
1741   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1742       final byte [] row, final byte [] family, final byte [] qualifier,
1743       final CompareFilter.CompareOp compareOp,
1744       final ByteArrayComparable comparator, final Put put,
1745       final boolean result) throws IOException {
1746     User user = getActiveUser();
1747     checkForReservedTagPresence(user, put);
1748 
1749     // Require READ and WRITE permissions on the table, CF, and KV to update
1750     RegionCoprocessorEnvironment env = c.getEnvironment();
1751     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1752     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1753       Action.READ, Action.WRITE);
1754     logResult(authResult);
1755     if (!authResult.isAllowed()) {
1756       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1757         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1758       } else if (authorizationEnabled) {
1759         throw new AccessDeniedException("Insufficient permissions " +
1760           authResult.toContextString());
1761       }
1762     }
1763 
1764     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1765     if (bytes != null) {
1766       if (cellFeaturesEnabled) {
1767         addCellPermissions(bytes, put.getFamilyCellMap());
1768       } else {
1769         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1770       }
1771     }
1772     return result;
1773   }
1774 
1775   @Override
1776   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1777       final byte[] row, final byte[] family, final byte[] qualifier,
1778       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1779       final boolean result) throws IOException {
1780     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1781       // We had failure with table, cf and q perm checks and now giving a chance for cell
1782       // perm check
1783       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1784       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1785       AuthResult authResult = null;
1786       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1787           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1788         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1789             getActiveUser(), Action.READ, table, families);
1790       } else {
1791         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1792             getActiveUser(), Action.READ, table, families);
1793       }
1794       logResult(authResult);
1795       if (authorizationEnabled && !authResult.isAllowed()) {
1796         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1797       }
1798     }
1799     return result;
1800   }
1801 
1802   @Override
1803   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1804       final byte [] row, final byte [] family, final byte [] qualifier,
1805       final CompareFilter.CompareOp compareOp,
1806       final ByteArrayComparable comparator, final Delete delete,
1807       final boolean result) throws IOException {
1808     // An ACL on a delete is useless, we shouldn't allow it
1809     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1810       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1811           delete.toString());
1812     }
1813     // Require READ and WRITE permissions on the table, CF, and the KV covered
1814     // by the delete
1815     RegionCoprocessorEnvironment env = c.getEnvironment();
1816     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1817     User user = getActiveUser();
1818     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1819         Action.READ, Action.WRITE);
1820     logResult(authResult);
1821     if (!authResult.isAllowed()) {
1822       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1823         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1824       } else if (authorizationEnabled) {
1825         throw new AccessDeniedException("Insufficient permissions " +
1826           authResult.toContextString());
1827       }
1828     }
1829     return result;
1830   }
1831 
1832   @Override
1833   public boolean preCheckAndDeleteAfterRowLock(
1834       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1835       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1836       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1837       throws IOException {
1838     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1839       // We had failure with table, cf and q perm checks and now giving a chance for cell
1840       // perm check
1841       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1842       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1843       AuthResult authResult = null;
1844       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1845           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1846         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1847             getActiveUser(), Action.READ, table, families);
1848       } else {
1849         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1850             getActiveUser(), Action.READ, table, families);
1851       }
1852       logResult(authResult);
1853       if (authorizationEnabled && !authResult.isAllowed()) {
1854         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1855       }
1856     }
1857     return result;
1858   }
1859 
1860   @Override
1861   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1862       final byte [] row, final byte [] family, final byte [] qualifier,
1863       final long amount, final boolean writeToWAL)
1864       throws IOException {
1865     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1866     // incremented value
1867     RegionCoprocessorEnvironment env = c.getEnvironment();
1868     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1869     User user = getActiveUser();
1870     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1871         Action.WRITE);
1872     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1873       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1874         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1875       authResult.setReason("Covering cell set");
1876     }
1877     logResult(authResult);
1878     if (authorizationEnabled && !authResult.isAllowed()) {
1879       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1880     }
1881     return -1;
1882   }
1883 
1884   @Override
1885   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1886       throws IOException {
1887     User user = getActiveUser();
1888     checkForReservedTagPresence(user, append);
1889 
1890     // Require WRITE permission to the table, CF, and the KV to be appended
1891     RegionCoprocessorEnvironment env = c.getEnvironment();
1892     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1893     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1894     logResult(authResult);
1895     if (!authResult.isAllowed()) {
1896       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1897         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1898       } else if (authorizationEnabled)  {
1899         throw new AccessDeniedException("Insufficient permissions " +
1900           authResult.toContextString());
1901       }
1902     }
1903 
1904     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1905     if (bytes != null) {
1906       if (cellFeaturesEnabled) {
1907         addCellPermissions(bytes, append.getFamilyCellMap());
1908       } else {
1909         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1910       }
1911     }
1912 
1913     return null;
1914   }
1915 
1916   @Override
1917   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1918       final Append append) throws IOException {
1919     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1920       // We had failure with table, cf and q perm checks and now giving a chance for cell
1921       // perm check
1922       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1923       AuthResult authResult = null;
1924       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1925           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1926         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1927             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1928       } else {
1929         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1930             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1931       }
1932       logResult(authResult);
1933       if (authorizationEnabled && !authResult.isAllowed()) {
1934         throw new AccessDeniedException("Insufficient permissions " +
1935           authResult.toContextString());
1936       }
1937     }
1938     return null;
1939   }
1940 
1941   @Override
1942   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1943       final Increment increment)
1944       throws IOException {
1945     User user = getActiveUser();
1946     checkForReservedTagPresence(user, increment);
1947 
1948     // Require WRITE permission to the table, CF, and the KV to be replaced by
1949     // the incremented value
1950     RegionCoprocessorEnvironment env = c.getEnvironment();
1951     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1952     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1953       Action.WRITE);
1954     logResult(authResult);
1955     if (!authResult.isAllowed()) {
1956       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1957         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1958       } else if (authorizationEnabled) {
1959         throw new AccessDeniedException("Insufficient permissions " +
1960           authResult.toContextString());
1961       }
1962     }
1963 
1964     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1965     if (bytes != null) {
1966       if (cellFeaturesEnabled) {
1967         addCellPermissions(bytes, increment.getFamilyCellMap());
1968       } else {
1969         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1970       }
1971     }
1972 
1973     return null;
1974   }
1975 
1976   @Override
1977   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1978       final Increment increment) throws IOException {
1979     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1980       // We had failure with table, cf and q perm checks and now giving a chance for cell
1981       // perm check
1982       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1983       AuthResult authResult = null;
1984       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1985           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1986         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1987             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1988       } else {
1989         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
1990             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1991       }
1992       logResult(authResult);
1993       if (authorizationEnabled && !authResult.isAllowed()) {
1994         throw new AccessDeniedException("Insufficient permissions " +
1995           authResult.toContextString());
1996       }
1997     }
1998     return null;
1999   }
2000 
2001   @Override
2002   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2003       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2004     // If the HFile version is insufficient to persist tags, we won't have any
2005     // work to do here
2006     if (!cellFeaturesEnabled) {
2007       return newCell;
2008     }
2009 
2010     // Collect any ACLs from the old cell
2011     List<Tag> tags = Lists.newArrayList();
2012     List<Tag> aclTags = Lists.newArrayList();
2013     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2014     if (oldCell != null) {
2015       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell);
2016       while (tagIterator.hasNext()) {
2017         Tag tag = tagIterator.next();
2018         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2019           // Not an ACL tag, just carry it through
2020           if (LOG.isTraceEnabled()) {
2021             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
2022                 + " length " + tag.getValueLength());
2023           }
2024           tags.add(tag);
2025         } else {
2026           aclTags.add(tag);
2027         }
2028       }
2029     }
2030 
2031     // Do we have an ACL on the operation?
2032     byte[] aclBytes = mutation.getACL();
2033     if (aclBytes != null) {
2034       // Yes, use it
2035       tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2036     } else {
2037       // No, use what we carried forward
2038       if (perms != null) {
2039         // TODO: If we collected ACLs from more than one tag we may have a
2040         // List<Permission> of size > 1, this can be collapsed into a single
2041         // Permission
2042         if (LOG.isTraceEnabled()) {
2043           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2044         }
2045         tags.addAll(aclTags);
2046       }
2047     }
2048 
2049     // If we have no tags to add, just return
2050     if (tags.isEmpty()) {
2051       return newCell;
2052     }
2053 
2054     Cell rewriteCell = new TagRewriteCell(newCell, TagUtil.fromList(tags));
2055     return rewriteCell;
2056   }
2057 
2058   @Override
2059   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2060       final Scan scan, final RegionScanner s) throws IOException {
2061     internalPreRead(c, scan, OpType.SCAN);
2062     return s;
2063   }
2064 
2065   @Override
2066   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2067       final Scan scan, final RegionScanner s) throws IOException {
2068     User user = getActiveUser();
2069     if (user != null && user.getShortName() != null) {
2070       // store reference to scanner owner for later checks
2071       scannerOwners.put(s, user.getShortName());
2072     }
2073     return s;
2074   }
2075 
2076   @Override
2077   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2078       final InternalScanner s, final List<Result> result,
2079       final int limit, final boolean hasNext) throws IOException {
2080     requireScannerOwner(s);
2081     return hasNext;
2082   }
2083 
2084   @Override
2085   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2086       final InternalScanner s) throws IOException {
2087     requireScannerOwner(s);
2088   }
2089 
2090   @Override
2091   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2092       final InternalScanner s) throws IOException {
2093     // clean up any associated owner mapping
2094     scannerOwners.remove(s);
2095   }
2096 
2097   @Override
2098   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2099       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2100     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2101     return hasMore;
2102   }
2103 
2104   /**
2105    * Verify, when servicing an RPC, that the caller is the scanner owner.
2106    * If so, we assume that access control is correctly enforced based on
2107    * the checks performed in preScannerOpen()
2108    */
2109   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2110     if (!RpcServer.isInRpcCallContext())
2111       return;
2112     String requestUserName = RpcServer.getRequestUserName();
2113     String owner = scannerOwners.get(s);
2114     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2115       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2116     }
2117   }
2118 
2119   /**
2120    * Verifies user has CREATE privileges on
2121    * the Column Families involved in the bulkLoadHFile
2122    * request. Specific Column Write privileges are presently
2123    * ignored.
2124    */
2125   @Override
2126   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2127       List<Pair<byte[], String>> familyPaths) throws IOException {
2128     for(Pair<byte[],String> el : familyPaths) {
2129       requirePermission("preBulkLoadHFile",
2130           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2131           el.getFirst(),
2132           null,
2133           Action.CREATE);
2134     }
2135   }
2136 
2137   /**
2138    * Authorization check for
2139    * SecureBulkLoadProtocol.prepareBulkLoad()
2140    * @param ctx the context
2141    * @param request the request
2142    * @throws IOException
2143    */
2144   @Override
2145   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2146                                  PrepareBulkLoadRequest request) throws IOException {
2147     requireAccess("prePareBulkLoad",
2148         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2149   }
2150 
2151   /**
2152    * Authorization security check for
2153    * SecureBulkLoadProtocol.cleanupBulkLoad()
2154    * @param ctx the context
2155    * @param request the request
2156    * @throws IOException
2157    */
2158   @Override
2159   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2160                                  CleanupBulkLoadRequest request) throws IOException {
2161     requireAccess("preCleanupBulkLoad",
2162         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2163   }
2164 
2165   /* ---- EndpointObserver implementation ---- */
2166 
2167   @Override
2168   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2169       Service service, String methodName, Message request) throws IOException {
2170     // Don't intercept calls to our own AccessControlService, we check for
2171     // appropriate permissions in the service handlers
2172     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2173       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2174         methodName + ")",
2175         getTableName(ctx.getEnvironment()), null, null,
2176         Action.EXEC);
2177     }
2178     return request;
2179   }
2180 
2181   @Override
2182   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2183       Service service, String methodName, Message request, Message.Builder responseBuilder)
2184       throws IOException { }
2185 
2186   /* ---- Protobuf AccessControlService implementation ---- */
2187 
2188   @Override
2189   public void grant(RpcController controller,
2190                     AccessControlProtos.GrantRequest request,
2191                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2192     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2193     AccessControlProtos.GrantResponse response = null;
2194     try {
2195       // verify it's only running at .acl.
2196       if (aclRegion) {
2197         if (!initialized) {
2198           throw new CoprocessorException("AccessController not yet initialized");
2199         }
2200         if (LOG.isDebugEnabled()) {
2201           LOG.debug("Received request to grant access permission " + perm.toString());
2202         }
2203 
2204         switch(request.getUserPermission().getPermission().getType()) {
2205           case Global :
2206           case Table :
2207             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2208               perm.getQualifier(), Action.ADMIN);
2209             break;
2210           case Namespace :
2211             requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
2212            break;
2213         }
2214 
2215         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2216           @Override
2217           public Void run() throws Exception {
2218             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2219             return null;
2220           }
2221         });
2222 
2223         if (AUDITLOG.isTraceEnabled()) {
2224           // audit log should store permission changes in addition to auth results
2225           AUDITLOG.trace("Granted permission " + perm.toString());
2226         }
2227       } else {
2228         throw new CoprocessorException(AccessController.class, "This method "
2229             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2230       }
2231       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2232     } catch (IOException ioe) {
2233       // pass exception back up
2234       ResponseConverter.setControllerException(controller, ioe);
2235     }
2236     done.run(response);
2237   }
2238 
2239   @Override
2240   public void revoke(RpcController controller,
2241                      AccessControlProtos.RevokeRequest request,
2242                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2243     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2244     AccessControlProtos.RevokeResponse response = null;
2245     try {
2246       // only allowed to be called on _acl_ region
2247       if (aclRegion) {
2248         if (!initialized) {
2249           throw new CoprocessorException("AccessController not yet initialized");
2250         }
2251         if (LOG.isDebugEnabled()) {
2252           LOG.debug("Received request to revoke access permission " + perm.toString());
2253         }
2254 
2255         switch(request.getUserPermission().getPermission().getType()) {
2256           case Global :
2257           case Table :
2258             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2259               perm.getQualifier(), Action.ADMIN);
2260             break;
2261           case Namespace :
2262             requireNamespacePermission("revoke", perm.getNamespace(), Action.ADMIN);
2263             break;
2264         }
2265 
2266         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2267           @Override
2268           public Void run() throws Exception {
2269             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2270             return null;
2271           }
2272         });
2273 
2274         if (AUDITLOG.isTraceEnabled()) {
2275           // audit log should record all permission changes
2276           AUDITLOG.trace("Revoked permission " + perm.toString());
2277         }
2278       } else {
2279         throw new CoprocessorException(AccessController.class, "This method "
2280             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2281       }
2282       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2283     } catch (IOException ioe) {
2284       // pass exception back up
2285       ResponseConverter.setControllerException(controller, ioe);
2286     }
2287     done.run(response);
2288   }
2289 
2290   @Override
2291   public void getUserPermissions(RpcController controller,
2292                                  AccessControlProtos.GetUserPermissionsRequest request,
2293                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2294     AccessControlProtos.GetUserPermissionsResponse response = null;
2295     try {
2296       // only allowed to be called on _acl_ region
2297       if (aclRegion) {
2298         if (!initialized) {
2299           throw new CoprocessorException("AccessController not yet initialized");
2300         }
2301         List<UserPermission> perms = null;
2302         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2303           final TableName table = request.hasTableName() ?
2304             ProtobufUtil.toTableName(request.getTableName()) : null;
2305           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2306           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2307             @Override
2308             public List<UserPermission> run() throws Exception {
2309               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2310             }
2311           });
2312         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2313           final String namespace = request.getNamespaceName().toStringUtf8();
2314           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2315           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2316             @Override
2317             public List<UserPermission> run() throws Exception {
2318               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2319                 namespace);
2320             }
2321           });
2322         } else {
2323           requirePermission("userPermissions", Action.ADMIN);
2324           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2325             @Override
2326             public List<UserPermission> run() throws Exception {
2327               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2328             }
2329           });
2330           // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2331           // Also using acl as table name to be inline  with the results of global admin and will
2332           // help in avoiding any leakage of information about being superusers.
2333           for (String user: Superusers.getSuperUsers()) {
2334             perms.add(new UserPermission(user.getBytes(), AccessControlLists.ACL_TABLE_NAME, null,
2335                 Action.values()));
2336           }
2337         }
2338         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2339       } else {
2340         throw new CoprocessorException(AccessController.class, "This method "
2341             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2342       }
2343     } catch (IOException ioe) {
2344       // pass exception back up
2345       ResponseConverter.setControllerException(controller, ioe);
2346     }
2347     done.run(response);
2348   }
2349 
2350   @Override
2351   public void checkPermissions(RpcController controller,
2352                                AccessControlProtos.CheckPermissionsRequest request,
2353                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2354     Permission[] permissions = new Permission[request.getPermissionCount()];
2355     for (int i=0; i < request.getPermissionCount(); i++) {
2356       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2357     }
2358     AccessControlProtos.CheckPermissionsResponse response = null;
2359     try {
2360       User user = getActiveUser();
2361       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2362       for (Permission permission : permissions) {
2363         if (permission instanceof TablePermission) {
2364           // Check table permissions
2365 
2366           TablePermission tperm = (TablePermission) permission;
2367           for (Action action : permission.getActions()) {
2368             if (!tperm.getTableName().equals(tableName)) {
2369               throw new CoprocessorException(AccessController.class, String.format("This method "
2370                   + "can only execute at the table specified in TablePermission. " +
2371                   "Table of the region:%s , requested table:%s", tableName,
2372                   tperm.getTableName()));
2373             }
2374 
2375             Map<byte[], Set<byte[]>> familyMap =
2376                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2377             if (tperm.getFamily() != null) {
2378               if (tperm.getQualifier() != null) {
2379                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2380                 qualifiers.add(tperm.getQualifier());
2381                 familyMap.put(tperm.getFamily(), qualifiers);
2382               } else {
2383                 familyMap.put(tperm.getFamily(), null);
2384               }
2385             }
2386 
2387             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2388               familyMap);
2389             logResult(result);
2390             if (!result.isAllowed()) {
2391               // Even if passive we need to throw an exception here, we support checking
2392               // effective permissions, so throw unconditionally
2393               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2394                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2395                 ", action=" + action.toString() + ")");
2396             }
2397           }
2398 
2399         } else {
2400           // Check global permissions
2401 
2402           for (Action action : permission.getActions()) {
2403             AuthResult result;
2404             if (authManager.authorize(user, action)) {
2405               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2406                 action, null, null);
2407             } else {
2408               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2409                 null, null);
2410             }
2411             logResult(result);
2412             if (!result.isAllowed()) {
2413               // Even if passive we need to throw an exception here, we support checking
2414               // effective permissions, so throw unconditionally
2415               throw new AccessDeniedException("Insufficient permissions (action=" +
2416                 action.toString() + ")");
2417             }
2418           }
2419         }
2420       }
2421       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2422     } catch (IOException ioe) {
2423       ResponseConverter.setControllerException(controller, ioe);
2424     }
2425     done.run(response);
2426   }
2427 
2428   @Override
2429   public Service getService() {
2430     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2431   }
2432 
2433   private Region getRegion(RegionCoprocessorEnvironment e) {
2434     return e.getRegion();
2435   }
2436 
2437   private TableName getTableName(RegionCoprocessorEnvironment e) {
2438     Region region = e.getRegion();
2439     if (region != null) {
2440       return getTableName(region);
2441     }
2442     return null;
2443   }
2444 
2445   private TableName getTableName(Region region) {
2446     HRegionInfo regionInfo = region.getRegionInfo();
2447     if (regionInfo != null) {
2448       return regionInfo.getTable();
2449     }
2450     return null;
2451   }
2452 
2453   @Override
2454   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2455       throws IOException {
2456     requirePermission("preClose", Action.ADMIN);
2457   }
2458 
2459   private void checkSystemOrSuperUser() throws IOException {
2460     // No need to check if we're not going to throw
2461     if (!authorizationEnabled) {
2462       return;
2463     }
2464     User activeUser = getActiveUser();
2465     if (!Superusers.isSuperUser(activeUser)) {
2466       throw new AccessDeniedException("User '" + (activeUser != null ?
2467         activeUser.getShortName() : "null") + "' is not system or super user.");
2468     }
2469   }
2470 
2471   @Override
2472   public void preStopRegionServer(
2473       ObserverContext<RegionServerCoprocessorEnvironment> env)
2474       throws IOException {
2475     requirePermission("preStopRegionServer", Action.ADMIN);
2476   }
2477 
2478   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2479       byte[] qualifier) {
2480     if (family == null) {
2481       return null;
2482     }
2483 
2484     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2485     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2486     return familyMap;
2487   }
2488 
2489   @Override
2490   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2491        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2492        String regex) throws IOException {
2493     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2494     // any concrete set of table names when a regex is present or the full list is requested.
2495     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2496       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2497       // request can be granted.
2498       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2499       for (TableName tableName: tableNamesList) {
2500         // Skip checks for a table that does not exist
2501         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2502           continue;
2503         requirePermission("getTableDescriptors", tableName, null, null,
2504             Action.ADMIN, Action.CREATE);
2505       }
2506     }
2507   }
2508 
2509   @Override
2510   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2511       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2512       String regex) throws IOException {
2513     // Skipping as checks in this case are already done by preGetTableDescriptors.
2514     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2515       return;
2516     }
2517 
2518     // Retains only those which passes authorization checks, as the checks weren't done as part
2519     // of preGetTableDescriptors.
2520     Iterator<HTableDescriptor> itr = descriptors.iterator();
2521     while (itr.hasNext()) {
2522       HTableDescriptor htd = itr.next();
2523       try {
2524         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2525             Action.ADMIN, Action.CREATE);
2526       } catch (AccessDeniedException e) {
2527         itr.remove();
2528       }
2529     }
2530   }
2531 
2532   @Override
2533   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2534       List<HTableDescriptor> descriptors, String regex) throws IOException {
2535     // Retains only those which passes authorization checks.
2536     Iterator<HTableDescriptor> itr = descriptors.iterator();
2537     while (itr.hasNext()) {
2538       HTableDescriptor htd = itr.next();
2539       try {
2540         requireAccess("getTableNames", htd.getTableName(), Action.values());
2541       } catch (AccessDeniedException e) {
2542         itr.remove();
2543       }
2544     }
2545   }
2546 
2547   @Override
2548   public void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2549       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
2550     requirePermission("mergeRegions", regionA.getTable(), null, null,
2551       Action.ADMIN);
2552   }
2553 
2554   @Override
2555   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2556       Region regionB) throws IOException {
2557     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2558       Action.ADMIN);
2559   }
2560 
2561   @Override
2562   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2563       Region regionB, Region mergedRegion) throws IOException { }
2564 
2565   @Override
2566   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2567       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2568 
2569   @Override
2570   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2571       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2572 
2573   @Override
2574   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2575       Region regionA, Region regionB) throws IOException { }
2576 
2577   @Override
2578   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2579       Region regionA, Region regionB) throws IOException { }
2580 
2581   @Override
2582   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2583       throws IOException {
2584     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2585   }
2586 
2587   @Override
2588   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2589       throws IOException { }
2590 
2591   @Override
2592   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2593       final String userName, final Quotas quotas) throws IOException {
2594     requirePermission("setUserQuota", Action.ADMIN);
2595   }
2596 
2597   @Override
2598   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2599       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2600     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2601   }
2602 
2603   @Override
2604   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2605       final String userName, final String namespace, final Quotas quotas) throws IOException {
2606     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2607   }
2608 
2609   @Override
2610   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2611       final TableName tableName, final Quotas quotas) throws IOException {
2612     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2613   }
2614 
2615   @Override
2616   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2617       final String namespace, final Quotas quotas) throws IOException {
2618     requirePermission("setNamespaceQuota", Action.ADMIN);
2619   }
2620 
2621   @Override
2622   public ReplicationEndpoint postCreateReplicationEndPoint(
2623       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2624     return endpoint;
2625   }
2626 
2627   @Override
2628   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2629       List<WALEntry> entries, CellScanner cells) throws IOException {
2630     requirePermission("replicateLogEntries", Action.WRITE);
2631   }
2632 
2633   @Override
2634   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2635       List<WALEntry> entries, CellScanner cells) throws IOException {
2636   }
2637 
2638   @Override
2639   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2640                              Set<HostAndPort> servers, String targetGroup) throws IOException {
2641     requirePermission("moveServers", Action.ADMIN);
2642   }
2643 
2644   @Override
2645   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2646                             Set<TableName> tables, String targetGroup) throws IOException {
2647     requirePermission("moveTables", Action.ADMIN);
2648   }
2649 
2650   @Override
2651   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2652                             String name) throws IOException {
2653     requirePermission("addRSGroup", Action.ADMIN);
2654   }
2655 
2656   @Override
2657   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2658                                String name) throws IOException {
2659     requirePermission("removeRSGroup", Action.ADMIN);
2660   }
2661 
2662   @Override
2663   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2664                                 String groupName) throws IOException {
2665     requirePermission("balanceRSGroup", Action.ADMIN);
2666   }
2667 }