View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import com.google.common.net.HostAndPort;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ArrayBackedTag;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.CompoundConfiguration;
45  import org.apache.hadoop.hbase.CoprocessorEnvironment;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValue.Type;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NamespaceDescriptor;
56  import org.apache.hadoop.hbase.ProcedureInfo;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.Tag;
60  import org.apache.hadoop.hbase.TagRewriteCell;
61  import org.apache.hadoop.hbase.TagUtil;
62  import org.apache.hadoop.hbase.classification.InterfaceAudience;
63  import org.apache.hadoop.hbase.client.Append;
64  import org.apache.hadoop.hbase.client.Delete;
65  import org.apache.hadoop.hbase.client.Durability;
66  import org.apache.hadoop.hbase.client.Get;
67  import org.apache.hadoop.hbase.client.Increment;
68  import org.apache.hadoop.hbase.client.MasterSwitchType;
69  import org.apache.hadoop.hbase.client.Mutation;
70  import org.apache.hadoop.hbase.client.Put;
71  import org.apache.hadoop.hbase.client.Query;
72  import org.apache.hadoop.hbase.client.Result;
73  import org.apache.hadoop.hbase.client.Scan;
74  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
75  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
76  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
77  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
78  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
79  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
80  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
81  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
82  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
83  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
84  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
85  import org.apache.hadoop.hbase.filter.CompareFilter;
86  import org.apache.hadoop.hbase.filter.Filter;
87  import org.apache.hadoop.hbase.filter.FilterList;
88  import org.apache.hadoop.hbase.io.hfile.HFile;
89  import org.apache.hadoop.hbase.ipc.RpcServer;
90  import org.apache.hadoop.hbase.master.MasterServices;
91  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
92  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
93  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
94  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
95  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
96  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
98  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
101 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
102 import org.apache.hadoop.hbase.regionserver.InternalScanner;
103 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
104 import org.apache.hadoop.hbase.regionserver.Region;
105 import org.apache.hadoop.hbase.regionserver.RegionScanner;
106 import org.apache.hadoop.hbase.regionserver.ScanType;
107 import org.apache.hadoop.hbase.regionserver.ScannerContext;
108 import org.apache.hadoop.hbase.regionserver.Store;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
110 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
111 import org.apache.hadoop.hbase.security.AccessDeniedException;
112 import org.apache.hadoop.hbase.security.Superusers;
113 import org.apache.hadoop.hbase.security.User;
114 import org.apache.hadoop.hbase.security.UserProvider;
115 import org.apache.hadoop.hbase.security.access.Permission.Action;
116 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
117 import org.apache.hadoop.hbase.util.ByteRange;
118 import org.apache.hadoop.hbase.util.Bytes;
119 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
120 import org.apache.hadoop.hbase.util.Pair;
121 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
122 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
123 
124 import com.google.common.collect.ArrayListMultimap;
125 import com.google.common.collect.ImmutableSet;
126 import com.google.common.collect.ListMultimap;
127 import com.google.common.collect.Lists;
128 import com.google.common.collect.MapMaker;
129 import com.google.common.collect.Maps;
130 import com.google.common.collect.Sets;
131 import com.google.protobuf.Message;
132 import com.google.protobuf.RpcCallback;
133 import com.google.protobuf.RpcController;
134 import com.google.protobuf.Service;
135 
136 /**
137  * Provides basic authorization checks for data access and administrative
138  * operations.
139  *
140  * <p>
141  * {@code AccessController} performs authorization checks for HBase operations
142  * based on:
143  * </p>
144  * <ul>
145  *   <li>the identity of the user performing the operation</li>
146  *   <li>the scope over which the operation is performed, in increasing
147  *   specificity: global, table, column family, or qualifier</li>
148  *   <li>the type of action being performed (as mapped to
149  *   {@link Permission.Action} values)</li>
150  * </ul>
151  * <p>
152  * If the authorization check fails, an {@link AccessDeniedException}
153  * will be thrown for the operation.
154  * </p>
155  *
156  * <p>
157  * To perform authorization checks, {@code AccessController} relies on the
158  * RpcServerEngine being loaded to provide
159  * the user identities for remote requests.
160  * </p>
161  *
162  * <p>
163  * The access control lists used for authorization can be manipulated via the
164  * exposed {@link AccessControlService} Interface implementation, and the associated
165  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
166  * commands.
167  * </p>
168  */
169 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
170 public class AccessController extends BaseMasterAndRegionObserver
171     implements RegionServerObserver,
172       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
173 
174   private static final Log LOG = LogFactory.getLog(AccessController.class);
175 
176   private static final Log AUDITLOG =
177     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
178   private static final String CHECK_COVERING_PERM = "check_covering_perm";
179   private static final String TAG_CHECK_PASSED = "tag_check_passed";
180   private static final byte[] TRUE = Bytes.toBytes(true);
181 
182   TableAuthManager authManager = null;
183 
184   /** flags if we are running on a region of the _acl_ table */
185   boolean aclRegion = false;
186 
187   /** defined only for Endpoint implementation, so it can have way to
188    access region services */
189   private RegionCoprocessorEnvironment regionEnv;
190 
191   /** Mapping of scanner instances to the user who created them */
192   private Map<InternalScanner,String> scannerOwners =
193       new MapMaker().weakKeys().makeMap();
194 
195   private Map<TableName, List<UserPermission>> tableAcls;
196 
197   /** Provider for mapping principal names to Users */
198   private UserProvider userProvider;
199 
200   /** if we are active, usually true, only not true if "hbase.security.authorization"
201    has been set to false in site configuration */
202   boolean authorizationEnabled;
203 
204   /** if we are able to support cell ACLs */
205   boolean cellFeaturesEnabled;
206 
207   /** if we should check EXEC permissions */
208   boolean shouldCheckExecPermission;
209 
210   /** if we should terminate access checks early as soon as table or CF grants
211     allow access; pre-0.98 compatible behavior */
212   boolean compatibleEarlyTermination;
213 
214   /** if we have been successfully initialized */
215   private volatile boolean initialized = false;
216 
217   /** if the ACL table is available, only relevant in the master */
218   private volatile boolean aclTabAvailable = false;
219 
220   public static boolean isAuthorizationSupported(Configuration conf) {
221     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
222   }
223 
224   public static boolean isCellAuthorizationSupported(Configuration conf) {
225     return isAuthorizationSupported(conf) &&
226         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
227   }
228 
229   public Region getRegion() {
230     return regionEnv != null ? regionEnv.getRegion() : null;
231   }
232 
233   public TableAuthManager getAuthManager() {
234     return authManager;
235   }
236 
237   void initialize(RegionCoprocessorEnvironment e) throws IOException {
238     final Region region = e.getRegion();
239     Configuration conf = e.getConfiguration();
240     Map<byte[], ListMultimap<String,TablePermission>> tables =
241         AccessControlLists.loadAll(region);
242     // For each table, write out the table's permissions to the respective
243     // znode for that table.
244     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
245       tables.entrySet()) {
246       byte[] entry = t.getKey();
247       ListMultimap<String,TablePermission> perms = t.getValue();
248       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
249       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
250     }
251     initialized = true;
252   }
253 
254   /**
255    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
256    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
257    * table updates.
258    */
259   void updateACL(RegionCoprocessorEnvironment e,
260       final Map<byte[], List<Cell>> familyMap) {
261     Set<byte[]> entries =
262         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
263     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
264       List<Cell> cells = f.getValue();
265       for (Cell cell: cells) {
266         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
267             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
268             AccessControlLists.ACL_LIST_FAMILY.length)) {
269           entries.add(CellUtil.cloneRow(cell));
270         }
271       }
272     }
273     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
274     Configuration conf = regionEnv.getConfiguration();
275     for (byte[] entry: entries) {
276       try {
277         ListMultimap<String,TablePermission> perms =
278           AccessControlLists.getPermissions(conf, entry);
279         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
280         zkw.writeToZookeeper(entry, serialized);
281       } catch (IOException ex) {
282         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
283             ex);
284       }
285     }
286   }
287 
288   /**
289    * Check the current user for authorization to perform a specific action
290    * against the given set of row data.
291    *
292    * <p>Note: Ordering of the authorization checks
293    * has been carefully optimized to short-circuit the most common requests
294    * and minimize the amount of processing required.</p>
295    *
296    * @param permRequest the action being requested
297    * @param e the coprocessor environment
298    * @param families the map of column families to qualifiers present in
299    * the request
300    * @return an authorization result
301    */
302   AuthResult permissionGranted(String request, User user, Action permRequest,
303       RegionCoprocessorEnvironment e,
304       Map<byte [], ? extends Collection<?>> families) {
305     HRegionInfo hri = e.getRegion().getRegionInfo();
306     TableName tableName = hri.getTable();
307 
308     // 1. All users need read access to hbase:meta table.
309     // this is a very common operation, so deal with it quickly.
310     if (hri.isMetaRegion()) {
311       if (permRequest == Action.READ) {
312         return AuthResult.allow(request, "All users allowed", user,
313           permRequest, tableName, families);
314       }
315     }
316 
317     if (user == null) {
318       return AuthResult.deny(request, "No user associated with request!", null,
319         permRequest, tableName, families);
320     }
321 
322     // 2. check for the table-level, if successful we can short-circuit
323     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
324       return AuthResult.allow(request, "Table permission granted", user,
325         permRequest, tableName, families);
326     }
327 
328     // 3. check permissions against the requested families
329     if (families != null && families.size() > 0) {
330       // all families must pass
331       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
332         // a) check for family level access
333         if (authManager.authorize(user, tableName, family.getKey(),
334             permRequest)) {
335           continue;  // family-level permission overrides per-qualifier
336         }
337 
338         // b) qualifier level access can still succeed
339         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
340           if (family.getValue() instanceof Set) {
341             // for each qualifier of the family
342             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
343             for (byte[] qualifier : familySet) {
344               if (!authManager.authorize(user, tableName, family.getKey(),
345                                          qualifier, permRequest)) {
346                 return AuthResult.deny(request, "Failed qualifier check", user,
347                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
348               }
349             }
350           } else if (family.getValue() instanceof List) { // List<KeyValue>
351             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
352             for (KeyValue kv : kvList) {
353               if (!authManager.authorize(user, tableName, family.getKey(),
354                 CellUtil.cloneQualifier(kv), permRequest)) {
355                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
356                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(kv)));
357               }
358             }
359           }
360         } else {
361           // no qualifiers and family-level check already failed
362           return AuthResult.deny(request, "Failed family check", user, permRequest,
363               tableName, makeFamilyMap(family.getKey(), null));
364         }
365       }
366 
367       // all family checks passed
368       return AuthResult.allow(request, "All family checks passed", user, permRequest,
369           tableName, families);
370     }
371 
372     // 4. no families to check and table level access failed
373     return AuthResult.deny(request, "No families to check and table permission failed",
374         user, permRequest, tableName, families);
375   }
376 
377   /**
378    * Check the current user for authorization to perform a specific action
379    * against the given set of row data.
380    * @param opType the operation type
381    * @param user the user
382    * @param e the coprocessor environment
383    * @param families the map of column families to qualifiers present in
384    * the request
385    * @param actions the desired actions
386    * @return an authorization result
387    */
388   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
389       Map<byte [], ? extends Collection<?>> families, Action... actions) {
390     AuthResult result = null;
391     for (Action action: actions) {
392       result = permissionGranted(opType.toString(), user, action, e, families);
393       if (!result.isAllowed()) {
394         return result;
395       }
396     }
397     return result;
398   }
399 
400   private void logResult(AuthResult result) {
401     if (AUDITLOG.isTraceEnabled()) {
402       InetAddress remoteAddr = RpcServer.getRemoteAddress();
403       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
404           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
405           "; reason: " + result.getReason() +
406           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
407           "; request: " + result.getRequest() +
408           "; context: " + result.toContextString());
409     }
410   }
411 
412   /**
413    * Returns the active user to which authorization checks should be applied.
414    * If we are in the context of an RPC call, the remote user is used,
415    * otherwise the currently logged in user is used.
416    */
417   private User getActiveUser(ObserverContext ctx) throws IOException {
418     User user = ctx.getCaller();
419     if (user == null) {
420       // for non-rpc handling, fallback to system user
421       user = userProvider.getCurrent();
422     }
423     return user;
424   }
425 
426   /**
427    * Authorizes that the current user has any of the given permissions for the
428    * given table, column family and column qualifier.
429    * @param tableName Table requested
430    * @param family Column family requested
431    * @param qualifier Column qualifier requested
432    * @throws IOException if obtaining the current user fails
433    * @throws AccessDeniedException if user has no authorization
434    */
435   private void requirePermission(User user, String request, TableName tableName, byte[] family,
436       byte[] qualifier, Action... permissions) throws IOException {
437     AuthResult result = null;
438
439     for (Action permission : permissions) {
440       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
441         result = AuthResult.allow(request, "Table permission granted", user,
442                                   permission, tableName, family, qualifier);
443         break;
444       } else {
445         // rest of the world
446         result = AuthResult.deny(request, "Insufficient permissions", user,
447                                  permission, tableName, family, qualifier);
448       }
449     }
450     logResult(result);
451     if (authorizationEnabled && !result.isAllowed()) {
452       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
453     }
454   }
455
456   /**
457    * Authorizes that the current user has any of the given permissions for the
458    * given table, column family and column qualifier.
459    * @param tableName Table requested
460    * @param family Column family param
461    * @param qualifier Column qualifier param
462    * @throws IOException if obtaining the current user fails
463    * @throws AccessDeniedException if user has no authorization
464    */
465   private void requireTablePermission(User user, String request, TableName tableName, byte[] family,
466       byte[] qualifier, Action... permissions) throws IOException {
467     AuthResult result = null;
468
469     for (Action permission : permissions) {
470       if (authManager.authorize(user, tableName, null, null, permission)) {
471         result = AuthResult.allow(request, "Table permission granted", user,
472             permission, tableName, null, null);
473         result.getParams().setFamily(family).setQualifier(qualifier);
474         break;
475       } else {
476         // rest of the world
477         result = AuthResult.deny(request, "Insufficient permissions", user,
478             permission, tableName, family, qualifier);
479         result.getParams().setFamily(family).setQualifier(qualifier);
480       }
481     }
482     logResult(result);
483     if (authorizationEnabled && !result.isAllowed()) {
484       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
485     }
486   }
487
488   /**
489    * Authorizes that the current user has any of the given permissions to access the table.
490    *
491    * @param tableName Table requested
492    * @param permissions Actions being requested
493    * @throws IOException if obtaining the current user fails
494    * @throws AccessDeniedException if user has no authorization
495    */
496   private void requireAccess(User user, String request, TableName tableName,
497       Action... permissions) throws IOException {
498     AuthResult result = null;
499
500     for (Action permission : permissions) {
501       if (authManager.hasAccess(user, tableName, permission)) {
502         result = AuthResult.allow(request, "Table permission granted", user,
503                                   permission, tableName, null, null);
504         break;
505       } else {
506         // rest of the world
507         result = AuthResult.deny(request, "Insufficient permissions", user,
508                                  permission, tableName, null, null);
509       }
510     }
511     logResult(result);
512     if (authorizationEnabled && !result.isAllowed()) {
513       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
514     }
515   }
516
517   /**
518    * Authorizes that the current user has global privileges for the given action.
519    * @param perm The action being requested
520    * @throws IOException if obtaining the current user fails
521    * @throws AccessDeniedException if authorization is denied
522    */
523   private void requirePermission(User user, String request, Action perm) throws IOException {
524     requireGlobalPermission(user, request, perm, null, null);
525   }
526
527   /**
528    * Checks that the user has the given global permission. The generated
529    * audit log message will contain context information for the operation
530    * being authorized, based on the given parameters.
531    * @param perm Action being requested
532    * @param tableName Affected table name.
533    * @param familyMap Affected column families.
534    */
535   private void requireGlobalPermission(User user, String request, Action perm, TableName tableName,
536       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
537     AuthResult result = null;
538     if (authManager.authorize(user, perm)) {
539       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
540       result.getParams().setTableName(tableName).setFamilies(familyMap);
541       logResult(result);
542     } else {
543       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
544       result.getParams().setTableName(tableName).setFamilies(familyMap);
545       logResult(result);
546       if (authorizationEnabled) {
547         throw new AccessDeniedException("Insufficient permissions for user '" +
548           (user != null ? user.getShortName() : "null") +"' (global, action=" +
549           perm.toString() + ")");
550       }
551     }
552   }
553
554   /**
555    * Checks that the user has the given global permission. The generated
556    * audit log message will contain context information for the operation
557    * being authorized, based on the given parameters.
558    * @param perm Action being requested
559    * @param namespace
560    */
561   private void requireGlobalPermission(User user, String request, Action perm,
562                                        String namespace) throws IOException {
563     AuthResult authResult = null;
564     if (authManager.authorize(user, perm)) {
565       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
566       authResult.getParams().setNamespace(namespace);
567       logResult(authResult);
568     } else {
569       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
570       authResult.getParams().setNamespace(namespace);
571       logResult(authResult);
572       if (authorizationEnabled) {
573         throw new AccessDeniedException("Insufficient permissions for user '" +
574           (user != null ? user.getShortName() : "null") +"' (global, action=" +
575           perm.toString() + ")");
576       }
577     }
578   }
579
580   /**
581    * Checks that the user has the given global or namespace permission.
582    * @param namespace
583    * @param permissions Actions being requested
584    */
585   public void requireNamespacePermission(User user, String request, String namespace,
586       Action... permissions) throws IOException {
587     AuthResult result = null;
588
589     for (Action permission : permissions) {
590       if (authManager.authorize(user, namespace, permission)) {
591         result = AuthResult.allow(request, "Namespace permission granted",
592             user, permission, namespace);
593         break;
594       } else {
595         // rest of the world
596         result = AuthResult.deny(request, "Insufficient permissions", user,
597             permission, namespace);
598       }
599     }
600     logResult(result);
601     if (authorizationEnabled && !result.isAllowed()) {
602       throw new AccessDeniedException("Insufficient permissions "
603           + result.toContextString());
604     }
605   }
606
607   /**
608    * Checks that the user has the given global or namespace permission.
609    * @param namespace
610    * @param permissions Actions being requested
611    */
612   public void requireNamespacePermission(User user, String request, String namespace,
613       TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
614       Action... permissions)
615       throws IOException {
616     AuthResult result = null;
617
618     for (Action permission : permissions) {
619       if (authManager.authorize(user, namespace, permission)) {
620         result = AuthResult.allow(request, "Namespace permission granted",
621             user, permission, namespace);
622         result.getParams().setTableName(tableName).setFamilies(familyMap);
623         break;
624       } else {
625         // rest of the world
626         result = AuthResult.deny(request, "Insufficient permissions", user,
627             permission, namespace);
628         result.getParams().setTableName(tableName).setFamilies(familyMap);
629       }
630     }
631     logResult(result);
632     if (authorizationEnabled && !result.isAllowed()) {
633       throw new AccessDeniedException("Insufficient permissions "
634           + result.toContextString());
635     }
636   }
637
638   /**
639    * Returns <code>true</code> if the current user is allowed the given action
640    * over at least one of the column qualifiers in the given column families.
641    */
642   private boolean hasFamilyQualifierPermission(User user,
643       Action perm,
644       RegionCoprocessorEnvironment env,
645       Map<byte[], ? extends Collection<byte[]>> familyMap)
646     throws IOException {
647     HRegionInfo hri = env.getRegion().getRegionInfo();
648     TableName tableName = hri.getTable();
649
650     if (user == null) {
651       return false;
652     }
653
654     if (familyMap != null && familyMap.size() > 0) {
655       // at least one family must be allowed
656       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
657           familyMap.entrySet()) {
658         if (family.getValue() != null && !family.getValue().isEmpty()) {
659           for (byte[] qualifier : family.getValue()) {
660             if (authManager.matchPermission(user, tableName,
661                 family.getKey(), qualifier, perm)) {
662               return true;
663             }
664           }
665         } else {
666           if (authManager.matchPermission(user, tableName, family.getKey(),
667               perm)) {
668             return true;
669           }
670         }
671       }
672     } else if (LOG.isDebugEnabled()) {
673       LOG.debug("Empty family map passed for permission check");
674     }
675
676     return false;
677   }
678
679   private enum OpType {
680     GET("get"),
681     EXISTS("exists"),
682     SCAN("scan"),
683     PUT("put"),
684     DELETE("delete"),
685     CHECK_AND_PUT("checkAndPut"),
686     CHECK_AND_DELETE("checkAndDelete"),
687     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
688     APPEND("append"),
689     INCREMENT("increment");
690
691     private String type;
692
693     private OpType(String type) {
694       this.type = type;
695     }
696 
697     @Override
698     public String toString() {
699       return type;
700     }
701   }
702 
703   /**
704    * Determine if cell ACLs covered by the operation grant access. This is expensive.
705    * @return false if cell ACLs failed to grant access, true otherwise
706    * @throws IOException
707    */
708   private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
709       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
710       throws IOException {
711     if (!cellFeaturesEnabled) {
712       return false;
713     }
714     long cellGrants = 0;
715     long latestCellTs = 0;
716     Get get = new Get(row);
717     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
718     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
719     // version. We have to get every cell version and check its TS against the TS asked for in
720     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
721     // consider only one such passing cell. In case of Delete we have to consider all the cell
722     // versions under this passing version. When Delete Mutation contains columns which are a
723     // version delete just consider only one version for those column cells.
724     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
725     if (considerCellTs) {
726       get.setMaxVersions();
727     } else {
728       get.setMaxVersions(1);
729     }
730     boolean diffCellTsFromOpTs = false;
731     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
732       byte[] col = entry.getKey();
733       // TODO: HBASE-7114 could possibly unify the collection type in family
734       // maps so we would not need to do this
735       if (entry.getValue() instanceof Set) {
736         Set<byte[]> set = (Set<byte[]>)entry.getValue();
737         if (set == null || set.isEmpty()) {
738           get.addFamily(col);
739         } else {
740           for (byte[] qual: set) {
741             get.addColumn(col, qual);
742           }
743         }
744       } else if (entry.getValue() instanceof List) {
745         List<Cell> list = (List<Cell>)entry.getValue();
746         if (list == null || list.isEmpty()) {
747           get.addFamily(col);
748         } else {
749           // In case of family delete, a Cell will be added into the list with Qualifier as null.
750           for (Cell cell : list) {
751             if (cell.getQualifierLength() == 0
752                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
753                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
754               get.addFamily(col);
755             } else {
756               get.addColumn(col, CellUtil.cloneQualifier(cell));
757             }
758             if (considerCellTs) {
759               long cellTs = cell.getTimestamp();
760               latestCellTs = Math.max(latestCellTs, cellTs);
761               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
762             }
763           }
764         }
765       } else if (entry.getValue() == null) {
766         get.addFamily(col);
767       } else {
768         throw new RuntimeException("Unhandled collection type " +
769           entry.getValue().getClass().getName());
770       }
771     }
772     // We want to avoid looking into the future. So, if the cells of the
773     // operation specify a timestamp, or the operation itself specifies a
774     // timestamp, then we use the maximum ts found. Otherwise, we bound
775     // the Get to the current server time. We add 1 to the timerange since
776     // the upper bound of a timerange is exclusive yet we need to examine
777     // any cells found there inclusively.
778     long latestTs = Math.max(opTs, latestCellTs);
779     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
780       latestTs = EnvironmentEdgeManager.currentTime();
781     }
782     get.setTimeRange(0, latestTs + 1);
783     // In case of Put operation we set to read all versions. This was done to consider the case
784     // where columns are added with TS other than the Mutation TS. But normally this wont be the
785     // case with Put. There no need to get all versions but get latest version only.
786     if (!diffCellTsFromOpTs && request == OpType.PUT) {
787       get.setMaxVersions(1);
788     }
789     if (LOG.isTraceEnabled()) {
790       LOG.trace("Scanning for cells with " + get);
791     }
792     // This Map is identical to familyMap. The key is a BR rather than byte[].
793     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
794     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
795     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
796     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
797       if (entry.getValue() instanceof List) {
798         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
799       }
800     }
801     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
802     List<Cell> cells = Lists.newArrayList();
803     Cell prevCell = null;
804     ByteRange curFam = new SimpleMutableByteRange();
805     boolean curColAllVersions = (request == OpType.DELETE);
806     long curColCheckTs = opTs;
807     boolean foundColumn = false;
808     try {
809       boolean more = false;
810       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
811
812       do {
813         cells.clear();
814         // scan with limit as 1 to hold down memory use on wide rows
815         more = scanner.next(cells, scannerContext);
816         for (Cell cell: cells) {
817           if (LOG.isTraceEnabled()) {
818             LOG.trace("Found cell " + cell);
819           }
820           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
821           if (colChange) foundColumn = false;
822           prevCell = cell;
823           if (!curColAllVersions && foundColumn) {
824             continue;
825           }
826           if (colChange && considerCellTs) {
827             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
828             List<Cell> cols = familyMap1.get(curFam);
829             for (Cell col : cols) {
830               // null/empty qualifier is used to denote a Family delete. The TS and delete type
831               // associated with this is applicable for all columns within the family. That is
832               // why the below (col.getQualifierLength() == 0) check.
833               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
834                   || CellUtil.matchingQualifier(cell, col)) {
835                 byte type = col.getTypeByte();
836                 if (considerCellTs) {
837                   curColCheckTs = col.getTimestamp();
838                 }
839                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
840                 // a version delete for a column no need to check all the covering cells within
841                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
842                 // One version delete types are Delete/DeleteFamilyVersion
843                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
844                     || (KeyValue.Type.DeleteFamily.getCode() == type);
845                 break;
846               }
847             }
848           }
849           if (cell.getTimestamp() > curColCheckTs) {
850             // Just ignore this cell. This is not a covering cell.
851             continue;
852           }
853           foundColumn = true;
854           for (Action action: actions) {
855             // Are there permissions for this user for the cell?
856             if (!authManager.authorize(user, getTableName(e), cell, action)) {
857               // We can stop if the cell ACL denies access
858               return false;
859             }
860           }
861           cellGrants++;
862         }
863       } while (more);
864     } catch (AccessDeniedException ex) {
865       throw ex;
866     } catch (IOException ex) {
867       LOG.error("Exception while getting cells to calculate covering permission", ex);
868     } finally {
869       scanner.close();
870     }
871     // We should not authorize unless we have found one or more cell ACLs that
872     // grant access. This code is used to check for additional permissions
873     // after no table or CF grants are found.
874     return cellGrants > 0;
875   }
876
877   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
878     // Iterate over the entries in the familyMap, replacing the cells therein
879     // with new cells including the ACL data
880     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
881       List<Cell> newCells = Lists.newArrayList();
882       for (Cell cell: e.getValue()) {
883         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
884         List<Tag> tags = new ArrayList<Tag>();
885         tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
886         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
887         while (tagIterator.hasNext()) {
888           tags.add(tagIterator.next());
889         }
890         newCells.add(new TagRewriteCell(cell, TagUtil.fromList(tags)));
891       }
892       // This is supposed to be safe, won't CME
893       e.setValue(newCells);
894     }
895   }
896
897   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
898   // type is reserved and should not be explicitly set by user.
899   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
900     // No need to check if we're not going to throw
901     if (!authorizationEnabled) {
902       m.setAttribute(TAG_CHECK_PASSED, TRUE);
903       return;
904     }
905     // Superusers are allowed to store cells unconditionally.
906     if (Superusers.isSuperUser(user)) {
907       m.setAttribute(TAG_CHECK_PASSED, TRUE);
908       return;
909     }
910     // We already checked (prePut vs preBatchMutation)
911     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
912       return;
913     }
914     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
915       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cellScanner.current());
916       while (tagsItr.hasNext()) {
917         if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
918           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
919         }
920       }
921     }
922     m.setAttribute(TAG_CHECK_PASSED, TRUE);
923   }
924
925   /* ---- MasterObserver implementation ---- */
926   @Override
927   public void start(CoprocessorEnvironment env) throws IOException {
928     CompoundConfiguration conf = new CompoundConfiguration();
929     conf.add(env.getConfiguration());
930
931     authorizationEnabled = isAuthorizationSupported(conf);
932     if (!authorizationEnabled) {
933       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
934     }
935
936     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
937       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
938
939     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
940     if (!cellFeaturesEnabled) {
941       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
942           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
943           + " accordingly.");
944     }
945 
946     ZooKeeperWatcher zk = null;
947     if (env instanceof MasterCoprocessorEnvironment) {
948       // if running on HMaster
949       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
950       zk = mEnv.getMasterServices().getZooKeeper();
951     } else if (env instanceof RegionServerCoprocessorEnvironment) {
952       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
953       zk = rsEnv.getRegionServerServices().getZooKeeper();
954     } else if (env instanceof RegionCoprocessorEnvironment) {
955       // if running at region
956       regionEnv = (RegionCoprocessorEnvironment) env;
957       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
958       zk = regionEnv.getRegionServerServices().getZooKeeper();
959       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
960         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
961     }
962
963     // set the user-provider.
964     this.userProvider = UserProvider.instantiate(env.getConfiguration());
965
966     // If zk is null or IOException while obtaining auth manager,
967     // throw RuntimeException so that the coprocessor is unloaded.
968     if (zk != null) {
969       try {
970         this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
971       } catch (IOException ioe) {
972         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
973       }
974     } else {
975       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
976     }
977
978     tableAcls = new MapMaker().weakValues().makeMap();
979   }
980
981   @Override
982   public void stop(CoprocessorEnvironment env) {
983     if (this.authManager != null) {
984       TableAuthManager.release(authManager);
985     }
986   }
987 
988   @Override
989   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
990       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
991     Set<byte[]> families = desc.getFamiliesKeys();
992     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
993     for (byte[] family: families) {
994       familyMap.put(family, null);
995     }
996     requireNamespacePermission(getActiveUser(c), "createTable",
997         desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.CREATE);
998   }
999
1000   @Override
1001   public void postCompletedCreateTableAction(
1002       final ObserverContext<MasterCoprocessorEnvironment> c,
1003       final HTableDescriptor desc,
1004       final HRegionInfo[] regions) throws IOException {
1005     // When AC is used, it should be configured as the 1st CP.
1006     // In Master, the table operations like create, are handled by a Thread pool but the max size
1007     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1008     // sequentially only.
1009     // Related code in HMaster#startServiceThreads
1010     // {code}
1011     //   // We depend on there being only one instance of this executor running
1012     //   // at a time. To do concurrency, would need fencing of enable/disable of
1013     //   // tables.
1014     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1015     // {code}
1016     // In future if we change this pool to have more threads, then there is a chance for thread,
1017     // creating acl table, getting delayed and by that time another table creation got over and
1018     // this hook is getting called. In such a case, we will need a wait logic here which will
1019     // wait till the acl table is created.
1020     if (AccessControlLists.isAclTable(desc)) {
1021       this.aclTabAvailable = true;
1022     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1023       if (!aclTabAvailable) {
1024         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1025             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1026             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1027       } else {
1028         String owner = desc.getOwnerString();
1029         // default the table owner to current user, if not specified.
1030         if (owner == null)
1031           owner = getActiveUser(c).getShortName();
1032         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1033             desc.getTableName(), null, Action.values());
1034         // switch to the real hbase master user for doing the RPC on the ACL table
1035         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1036           @Override
1037           public Void run() throws Exception {
1038             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1039                 userperm);
1040             return null;
1041           }
1042         });
1043       }
1044     }
1045   }
1046
1047   @Override
1048   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1049       throws IOException {
1050     requirePermission(getActiveUser(c), "deleteTable", tableName, null, null,
1051         Action.ADMIN, Action.CREATE);
1052   }
1053
1054   @Override
1055   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1056       final TableName tableName) throws IOException {
1057     final Configuration conf = c.getEnvironment().getConfiguration();
1058     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1059       @Override
1060       public Void run() throws Exception {
1061         AccessControlLists.removeTablePermissions(conf, tableName);
1062         return null;
1063       }
1064     });
1065     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1066   }
1067
1068   @Override
1069   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1070       final TableName tableName) throws IOException {
1071     requirePermission(getActiveUser(c), "truncateTable", tableName, null, null,
1072         Action.ADMIN, Action.CREATE);
1073
1074     final Configuration conf = c.getEnvironment().getConfiguration();
1075     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1076       @Override
1077       public Void run() throws Exception {
1078         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1079         if (acls != null) {
1080           tableAcls.put(tableName, acls);
1081         }
1082         return null;
1083       }
1084     });
1085   }
1086
1087   @Override
1088   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1089       final TableName tableName) throws IOException {
1090     final Configuration conf = ctx.getEnvironment().getConfiguration();
1091     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1092       @Override
1093       public Void run() throws Exception {
1094         List<UserPermission> perms = tableAcls.get(tableName);
1095         if (perms != null) {
1096           for (UserPermission perm : perms) {
1097             AccessControlLists.addUserPermission(conf, perm);
1098           }
1099         }
1100         tableAcls.remove(tableName);
1101         return null;
1102       }
1103     });
1104   }
1105
1106   @Override
1107   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1108       HTableDescriptor htd) throws IOException {
1109     requirePermission(getActiveUser(c), "modifyTable", tableName, null, null,
1110         Action.ADMIN, Action.CREATE);
1111   }
1112
1113   @Override
1114   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1115       TableName tableName, final HTableDescriptor htd) throws IOException {
1116     final Configuration conf = c.getEnvironment().getConfiguration();
1117     // default the table owner to current user, if not specified.
1118     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1119       getActiveUser(c).getShortName();
1120     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1121       @Override
1122       public Void run() throws Exception {
1123         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1124             htd.getTableName(), null, Action.values());
1125         AccessControlLists.addUserPermission(conf, userperm);
1126         return null;
1127       }
1128     });
1129   }
1130
1131   @Override
1132   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1133                                  TableName tableName, HColumnDescriptor columnFamily)
1134       throws IOException {
1135     requireTablePermission(getActiveUser(ctx), "addColumn", tableName, columnFamily.getName(), null,
1136         Action.ADMIN, Action.CREATE);
1137   }
1138
1139   @Override
1140   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1141                                     TableName tableName, HColumnDescriptor columnFamily)
1142       throws IOException {
1143     requirePermission(getActiveUser(ctx), "modifyColumn", tableName, columnFamily.getName(), null,
1144         Action.ADMIN, Action.CREATE);
1145   }
1146
1147   @Override
1148   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1149                                     TableName tableName, byte[] columnFamily) throws IOException {
1150     requirePermission(getActiveUser(ctx), "deleteColumn", tableName, columnFamily, null,
1151         Action.ADMIN, Action.CREATE);
1152   }
1153
1154   @Override
1155   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1156       final TableName tableName, final byte[] columnFamily) throws IOException {
1157     final Configuration conf = ctx.getEnvironment().getConfiguration();
1158     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1159       @Override
1160       public Void run() throws Exception {
1161         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1162         return null;
1163       }
1164     });
1165   }
1166
1167   @Override
1168   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1169       throws IOException {
1170     requirePermission(getActiveUser(c), "enableTable", tableName, null, null,
1171         Action.ADMIN, Action.CREATE);
1172   }
1173
1174   @Override
1175   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1176       throws IOException {
1177     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1178       // We have to unconditionally disallow disable of the ACL table when we are installed,
1179       // even if not enforcing authorizations. We are still allowing grants and revocations,
1180       // checking permissions and logging audit messages, etc. If the ACL table is not
1181       // available we will fail random actions all over the place.
1182       throw new AccessDeniedException("Not allowed to disable "
1183           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1184     }
1185     requirePermission(getActiveUser(c), "disableTable", tableName, null, null,
1186         Action.ADMIN, Action.CREATE);
1187   }
1188
1189   @Override
1190   public void preAbortProcedure(
1191       ObserverContext<MasterCoprocessorEnvironment> ctx,
1192       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1193       final long procId) throws IOException {
1194     if (!procEnv.isProcedureOwner(procId, getActiveUser(ctx))) {
1195       // If the user is not the procedure owner, then we should further probe whether
1196       // he can abort the procedure.
1197       requirePermission(getActiveUser(ctx), "abortProcedure", Action.ADMIN);
1198     }
1199   }
1200
1201   @Override
1202   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1203       throws IOException {
1204     // There is nothing to do at this time after the procedure abort request was sent.
1205   }
1206
1207   @Override
1208   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1209       throws IOException {
1210     // We are delegating the authorization check to postListProcedures as we don't have
1211     // any concrete set of procedures to work with
1212   }
1213
1214   @Override
1215   public void postListProcedures(
1216       ObserverContext<MasterCoprocessorEnvironment> ctx,
1217       List<ProcedureInfo> procInfoList) throws IOException {
1218     if (procInfoList.isEmpty()) {
1219       return;
1220     }
1221
1222     // Retains only those which passes authorization checks, as the checks weren't done as part
1223     // of preListProcedures.
1224     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1225     User user = getActiveUser(ctx);
1226     while (itr.hasNext()) {
1227       ProcedureInfo procInfo = itr.next();
1228       try {
1229         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1230           // If the user is not the procedure owner, then we should further probe whether
1231           // he can see the procedure.
1232           requirePermission(user, "listProcedures", Action.ADMIN);
1233         }
1234       } catch (AccessDeniedException e) {
1235         itr.remove();
1236       }
1237     }
1238   }
1239
1240   @Override
1241   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1242       ServerName srcServer, ServerName destServer) throws IOException {
1243     requirePermission(getActiveUser(c), "move", region.getTable(), null, null, Action.ADMIN);
1244   }
1245
1246   @Override
1247   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1248       throws IOException {
1249     requirePermission(getActiveUser(c), "assign", regionInfo.getTable(), null, null, Action.ADMIN);
1250   }
1251
1252   @Override
1253   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1254       boolean force) throws IOException {
1255     requirePermission(getActiveUser(c), "unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1256   }
1257
1258   @Override
1259   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1260       HRegionInfo regionInfo) throws IOException {
1261     requirePermission(getActiveUser(c), "regionOffline", regionInfo.getTable(), null, null,
1262         Action.ADMIN);
1263   }
1264
1265   @Override
1266   public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1267       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1268     requirePermission(getActiveUser(ctx), "setSplitOrMergeEnabled", Action.ADMIN);
1269     return false;
1270   }
1271
1272   @Override
1273   public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1274       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1275   }
1276
1277   @Override
1278   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1279       throws IOException {
1280     requirePermission(getActiveUser(c), "balance", Action.ADMIN);
1281   }
1282
1283   @Override
1284   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1285       boolean newValue) throws IOException {
1286     requirePermission(getActiveUser(c), "balanceSwitch", Action.ADMIN);
1287     return newValue;
1288   }
1289
1290   @Override
1291   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1292       throws IOException {
1293     requirePermission(getActiveUser(c), "shutdown", Action.ADMIN);
1294   }
1295
1296   @Override
1297   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1298       throws IOException {
1299     requirePermission(getActiveUser(c), "stopMaster", Action.ADMIN);
1300   }
1301
1302   @Override
1303   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1304       throws IOException {
1305     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1306       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1307       // initialize the ACL storage table
1308       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1309     } else {
1310       aclTabAvailable = true;
1311     }
1312   }
1313
1314   @Override
1315   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1316       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1317       throws IOException {
1318     requirePermission(getActiveUser(ctx), "snapshot", hTableDescriptor.getTableName(), null, null,
1319       Permission.Action.ADMIN);
1320   }
1321
1322   @Override
1323   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1324       final SnapshotDescription snapshot) throws IOException {
1325     User user = getActiveUser(ctx);
1326     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1327       // list it, if user is the owner of snapshot
1328       // TODO: We are not logging this for audit
1329     } else {
1330       requirePermission(user, "listSnapshot", Action.ADMIN);
1331     }
1332   }
1333
1334   @Override
1335   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1336       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1337       throws IOException {
1338     requirePermission(getActiveUser(ctx), "clone", Action.ADMIN);
1339   }
1340
1341   @Override
1342   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1343       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1344       throws IOException {
1345     User user = getActiveUser(ctx);
1346     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1347       requirePermission(user, "restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1348         Permission.Action.ADMIN);
1349     } else {
1350       requirePermission(user, "restoreSnapshot", Action.ADMIN);
1351     }
1352   }
1353
1354   @Override
1355   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1356       final SnapshotDescription snapshot) throws IOException {
1357     User user = getActiveUser(ctx);
1358     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1359       // Snapshot owner is allowed to delete the snapshot
1360       // TODO: We are not logging this for audit
1361     } else {
1362       requirePermission(user, "deleteSnapshot", Action.ADMIN);
1363     }
1364   }
1365
1366   @Override
1367   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1368       NamespaceDescriptor ns) throws IOException {
1369     requireGlobalPermission(getActiveUser(ctx), "createNamespace", Action.ADMIN, ns.getName());
1370   }
1371
1372   @Override
1373   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1374       throws IOException {
1375     requireGlobalPermission(getActiveUser(ctx), "deleteNamespace", Action.ADMIN, namespace);
1376   }
1377
1378   @Override
1379   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1380       final String namespace) throws IOException {
1381     final Configuration conf = ctx.getEnvironment().getConfiguration();
1382     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1383       @Override
1384       public Void run() throws Exception {
1385         AccessControlLists.removeNamespacePermissions(conf, namespace);
1386         return null;
1387       }
1388     });
1389     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1390     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1391   }
1392
1393   @Override
1394   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1395       NamespaceDescriptor ns) throws IOException {
1396     // We require only global permission so that
1397     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1398     requireGlobalPermission(getActiveUser(ctx), "modifyNamespace", Action.ADMIN, ns.getName());
1399   }
1400
1401   @Override
1402   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1403       throws IOException {
1404     requireNamespacePermission(getActiveUser(ctx), "getNamespaceDescriptor", namespace, Action.ADMIN);
1405   }
1406
1407   @Override
1408   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1409       List<NamespaceDescriptor> descriptors) throws IOException {
1410     // Retains only those which passes authorization checks, as the checks weren't done as part
1411     // of preGetTableDescriptors.
1412     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1413     User user = getActiveUser(ctx);
1414     while (itr.hasNext()) {
1415       NamespaceDescriptor desc = itr.next();
1416       try {
1417         requireNamespacePermission(user, "listNamespaces", desc.getName(), Action.ADMIN);
1418       } catch (AccessDeniedException e) {
1419         itr.remove();
1420       }
1421     }
1422   }
1423
1424   @Override
1425   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1426       final TableName tableName) throws IOException {
1427     requirePermission(getActiveUser(ctx), "flushTable", tableName, null, null,
1428         Action.ADMIN, Action.CREATE);
1429   }
1430
1431   /* ---- RegionObserver implementation ---- */
1432
1433   @Override
1434   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
1435       throws IOException {
1436     RegionCoprocessorEnvironment env = c.getEnvironment();
1437     final Region region = env.getRegion();
1438     if (region == null) {
1439       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1440     } else {
1441       HRegionInfo regionInfo = region.getRegionInfo();
1442       if (regionInfo.getTable().isSystemTable()) {
1443         checkSystemOrSuperUser(getActiveUser(c));
1444       } else {
1445         requirePermission(getActiveUser(c), "preOpen", Action.ADMIN);
1446       }
1447     }
1448   }
1449
1450   @Override
1451   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1452     RegionCoprocessorEnvironment env = c.getEnvironment();
1453     final Region region = env.getRegion();
1454     if (region == null) {
1455       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1456       return;
1457     }
1458     if (AccessControlLists.isAclRegion(region)) {
1459       aclRegion = true;
1460       // When this region is under recovering state, initialize will be handled by postLogReplay
1461       if (!region.isRecovering()) {
1462         try {
1463           initialize(env);
1464         } catch (IOException ex) {
1465           // if we can't obtain permissions, it's better to fail
1466           // than perform checks incorrectly
1467           throw new RuntimeException("Failed to initialize permissions cache", ex);
1468         }
1469       }
1470     } else {
1471       initialized = true;
1472     }
1473   }
1474
1475   @Override
1476   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1477     if (aclRegion) {
1478       try {
1479         initialize(c.getEnvironment());
1480       } catch (IOException ex) {
1481         // if we can't obtain permissions, it's better to fail
1482         // than perform checks incorrectly
1483         throw new RuntimeException("Failed to initialize permissions cache", ex);
1484       }
1485     }
1486   }
1487
1488   @Override
1489   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1490     requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null,
1491         Action.ADMIN, Action.CREATE);
1492   }
1493
1494   @Override
1495   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1496     requirePermission(getActiveUser(c), "split", getTableName(c.getEnvironment()), null, null,
1497         Action.ADMIN);
1498   }
1499
1500   @Override
1501   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
1502       byte[] splitRow) throws IOException {
1503     requirePermission(getActiveUser(c), "split", getTableName(c.getEnvironment()), null, null,
1504         Action.ADMIN);
1505   }
1506
1507   @Override
1508   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
1509       final Store store, final InternalScanner scanner, final ScanType scanType)
1510           throws IOException {
1511     requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
1512         Action.ADMIN, Action.CREATE);
1513     return scanner;
1514   }
1515
1516   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1517       final Query query, OpType opType) throws IOException {
1518     Filter filter = query.getFilter();
1519     // Don't wrap an AccessControlFilter
1520     if (filter != null && filter instanceof AccessControlFilter) {
1521       return;
1522     }
1523     User user = getActiveUser(c);
1524     RegionCoprocessorEnvironment env = c.getEnvironment();
1525     Map<byte[],? extends Collection<byte[]>> families = null;
1526     switch (opType) {
1527     case GET:
1528     case EXISTS:
1529       families = ((Get)query).getFamilyMap();
1530       break;
1531     case SCAN:
1532       families = ((Scan)query).getFamilyMap();
1533       break;
1534     default:
1535       throw new RuntimeException("Unhandled operation " + opType);
1536     }
1537     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1538     Region region = getRegion(env);
1539     TableName table = getTableName(region);
1540     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1541     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1542       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1543     }
1544     if (!authResult.isAllowed()) {
1545       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1546         // Old behavior: Scan with only qualifier checks if we have partial
1547         // permission. Backwards compatible behavior is to throw an
1548         // AccessDeniedException immediately if there are no grants for table
1549         // or CF or CF+qual. Only proceed with an injected filter if there are
1550         // grants for qualifiers. Otherwise we will fall through below and log
1551         // the result and throw an ADE. We may end up checking qualifier
1552         // grants three times (permissionGranted above, here, and in the
1553         // filter) but that's the price of backwards compatibility.
1554         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1555           authResult.setAllowed(true);
1556           authResult.setReason("Access allowed with filter");
1557           // Only wrap the filter if we are enforcing authorizations
1558           if (authorizationEnabled) {
1559             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1560               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1561               cfVsMaxVersions);
1562             // wrap any existing filter
1563             if (filter != null) {
1564               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1565                 Lists.newArrayList(ourFilter, filter));
1566             }
1567             switch (opType) {
1568               case GET:
1569               case EXISTS:
1570                 ((Get)query).setFilter(ourFilter);
1571                 break;
1572               case SCAN:
1573                 ((Scan)query).setFilter(ourFilter);
1574                 break;
1575               default:
1576                 throw new RuntimeException("Unhandled operation " + opType);
1577             }
1578           }
1579         }
1580       } else {
1581         // New behavior: Any access we might be granted is more fine-grained
1582         // than whole table or CF. Simply inject a filter and return what is
1583         // allowed. We will not throw an AccessDeniedException. This is a
1584         // behavioral change since 0.96.
1585         authResult.setAllowed(true);
1586         authResult.setReason("Access allowed with filter");
1587         // Only wrap the filter if we are enforcing authorizations
1588         if (authorizationEnabled) {
1589           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1590             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1591           // wrap any existing filter
1592           if (filter != null) {
1593             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1594               Lists.newArrayList(ourFilter, filter));
1595           }
1596           switch (opType) {
1597             case GET:
1598             case EXISTS:
1599               ((Get)query).setFilter(ourFilter);
1600               break;
1601             case SCAN:
1602               ((Scan)query).setFilter(ourFilter);
1603               break;
1604             default:
1605               throw new RuntimeException("Unhandled operation " + opType);
1606           }
1607         }
1608       }
1609     }
1610
1611     logResult(authResult);
1612     if (authorizationEnabled && !authResult.isAllowed()) {
1613       throw new AccessDeniedException("Insufficient permissions for user '"
1614           + (user != null ? user.getShortName() : "null")
1615           + "' (table=" + table + ", action=READ)");
1616     }
1617   }
1618
1619   @Override
1620   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1621       final Get get, final List<Cell> result) throws IOException {
1622     internalPreRead(c, get, OpType.GET);
1623   }
1624
1625   @Override
1626   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1627       final Get get, final boolean exists) throws IOException {
1628     internalPreRead(c, get, OpType.EXISTS);
1629     return exists;
1630   }
1631
1632   @Override
1633   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1634       final Put put, final WALEdit edit, final Durability durability)
1635       throws IOException {
1636     User user = getActiveUser(c);
1637     checkForReservedTagPresence(user, put);
1638
1639     // Require WRITE permission to the table, CF, or top visible value, if any.
1640     // NOTE: We don't need to check the permissions for any earlier Puts
1641     // because we treat the ACLs in each Put as timestamped like any other
1642     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1643     // change the ACL of any previous Put. This allows simple evolution of
1644     // security policy over time without requiring expensive updates.
1645     RegionCoprocessorEnvironment env = c.getEnvironment();
1646     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1647     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1648     logResult(authResult);
1649     if (!authResult.isAllowed()) {
1650       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1651         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1652       } else if (authorizationEnabled) {
1653         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1654       }
1655     }
1656
1657     // Add cell ACLs from the operation to the cells themselves
1658     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1659     if (bytes != null) {
1660       if (cellFeaturesEnabled) {
1661         addCellPermissions(bytes, put.getFamilyCellMap());
1662       } else {
1663         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1664       }
1665     }
1666   }
1667
1668   @Override
1669   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1670       final Put put, final WALEdit edit, final Durability durability) {
1671     if (aclRegion) {
1672       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1673     }
1674   }
1675
1676   @Override
1677   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1678       final Delete delete, final WALEdit edit, final Durability durability)
1679       throws IOException {
1680     // An ACL on a delete is useless, we shouldn't allow it
1681     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1682       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1683     }
1684     // Require WRITE permissions on all cells covered by the delete. Unlike
1685     // for Puts we need to check all visible prior versions, because a major
1686     // compaction could remove them. If the user doesn't have permission to
1687     // overwrite any of the visible versions ('visible' defined as not covered
1688     // by a tombstone already) then we have to disallow this operation.
1689     RegionCoprocessorEnvironment env = c.getEnvironment();
1690     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1691     User user = getActiveUser(c);
1692     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1693     logResult(authResult);
1694     if (!authResult.isAllowed()) {
1695       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1696         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1697       } else if (authorizationEnabled) {
1698         throw new AccessDeniedException("Insufficient permissions " +
1699           authResult.toContextString());
1700       }
1701     }
1702   }
1703
1704   @Override
1705   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1706       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1707     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1708       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1709       User user = getActiveUser(c);
1710       for (int i = 0; i < miniBatchOp.size(); i++) {
1711         Mutation m = miniBatchOp.getOperation(i);
1712         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1713           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1714           // perm check
1715           OpType opType;
1716           if (m instanceof Put) {
1717             checkForReservedTagPresence(user, m);
1718             opType = OpType.PUT;
1719           } else {
1720             opType = OpType.DELETE;
1721           }
1722           AuthResult authResult = null;
1723           if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1724             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1725             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1726               user, Action.WRITE, table, m.getFamilyCellMap());
1727           } else {
1728             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1729               user, Action.WRITE, table, m.getFamilyCellMap());
1730           }
1731           logResult(authResult);
1732           if (authorizationEnabled && !authResult.isAllowed()) {
1733             throw new AccessDeniedException("Insufficient permissions "
1734               + authResult.toContextString());
1735           }
1736         }
1737       }
1738     }
1739   }
1740
1741   @Override
1742   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1743       final Delete delete, final WALEdit edit, final Durability durability)
1744       throws IOException {
1745     if (aclRegion) {
1746       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1747     }
1748   }
1749
1750   @Override
1751   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1752       final byte [] row, final byte [] family, final byte [] qualifier,
1753       final CompareFilter.CompareOp compareOp,
1754       final ByteArrayComparable comparator, final Put put,
1755       final boolean result) throws IOException {
1756     User user = getActiveUser(c);
1757     checkForReservedTagPresence(user, put);
1758
1759     // Require READ and WRITE permissions on the table, CF, and KV to update
1760     RegionCoprocessorEnvironment env = c.getEnvironment();
1761     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1762     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1763       Action.READ, Action.WRITE);
1764     logResult(authResult);
1765     if (!authResult.isAllowed()) {
1766       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1767         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1768       } else if (authorizationEnabled) {
1769         throw new AccessDeniedException("Insufficient permissions " +
1770           authResult.toContextString());
1771       }
1772     }
1773
1774     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1775     if (bytes != null) {
1776       if (cellFeaturesEnabled) {
1777         addCellPermissions(bytes, put.getFamilyCellMap());
1778       } else {
1779         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1780       }
1781     }
1782     return result;
1783   }
1784
1785   @Override
1786   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1787       final byte[] row, final byte[] family, final byte[] qualifier,
1788       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1789       final boolean result) throws IOException {
1790     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1791       // We had failure with table, cf and q perm checks and now giving a chance for cell
1792       // perm check
1793       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1794       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1795       AuthResult authResult = null;
1796       User user = getActiveUser(c);
1797       if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1798           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1799         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1800             user, Action.READ, table, families);
1801       } else {
1802         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1803             user, Action.READ, table, families);
1804       }
1805       logResult(authResult);
1806       if (authorizationEnabled && !authResult.isAllowed()) {
1807         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1808       }
1809     }
1810     return result;
1811   }
1812
1813   @Override
1814   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1815       final byte [] row, final byte [] family, final byte [] qualifier,
1816       final CompareFilter.CompareOp compareOp,
1817       final ByteArrayComparable comparator, final Delete delete,
1818       final boolean result) throws IOException {
1819     // An ACL on a delete is useless, we shouldn't allow it
1820     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1821       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1822           delete.toString());
1823     }
1824     // Require READ and WRITE permissions on the table, CF, and the KV covered
1825     // by the delete
1826     RegionCoprocessorEnvironment env = c.getEnvironment();
1827     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1828     User user = getActiveUser(c);
1829     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1830         Action.READ, Action.WRITE);
1831     logResult(authResult);
1832     if (!authResult.isAllowed()) {
1833       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1834         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1835       } else if (authorizationEnabled) {
1836         throw new AccessDeniedException("Insufficient permissions " +
1837           authResult.toContextString());
1838       }
1839     }
1840     return result;
1841   }
1842
1843   @Override
1844   public boolean preCheckAndDeleteAfterRowLock(
1845       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1846       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1847       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1848       throws IOException {
1849     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1850       // We had failure with table, cf and q perm checks and now giving a chance for cell
1851       // perm check
1852       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1853       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1854       AuthResult authResult = null;
1855       User user = getActiveUser(c);
1856       if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1857           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1858         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1859             user, Action.READ, table, families);
1860       } else {
1861         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1862             user, Action.READ, table, families);
1863       }
1864       logResult(authResult);
1865       if (authorizationEnabled && !authResult.isAllowed()) {
1866         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1867       }
1868     }
1869     return result;
1870   }
1871
1872   @Override
1873   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1874       final byte [] row, final byte [] family, final byte [] qualifier,
1875       final long amount, final boolean writeToWAL)
1876       throws IOException {
1877     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1878     // incremented value
1879     RegionCoprocessorEnvironment env = c.getEnvironment();
1880     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1881     User user = getActiveUser(c);
1882     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1883         Action.WRITE);
1884     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1885       authResult.setAllowed(checkCoveringPermission(user, OpType.INCREMENT_COLUMN_VALUE, env, row,
1886         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1887       authResult.setReason("Covering cell set");
1888     }
1889     logResult(authResult);
1890     if (authorizationEnabled && !authResult.isAllowed()) {
1891       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1892     }
1893     return -1;
1894   }
1895
1896   @Override
1897   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1898       throws IOException {
1899     User user = getActiveUser(c);
1900     checkForReservedTagPresence(user, append);
1901
1902     // Require WRITE permission to the table, CF, and the KV to be appended
1903     RegionCoprocessorEnvironment env = c.getEnvironment();
1904     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1905     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1906     logResult(authResult);
1907     if (!authResult.isAllowed()) {
1908       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1909         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1910       } else if (authorizationEnabled)  {
1911         throw new AccessDeniedException("Insufficient permissions " +
1912           authResult.toContextString());
1913       }
1914     }
1915
1916     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1917     if (bytes != null) {
1918       if (cellFeaturesEnabled) {
1919         addCellPermissions(bytes, append.getFamilyCellMap());
1920       } else {
1921         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1922       }
1923     }
1924
1925     return null;
1926   }
1927
1928   @Override
1929   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1930       final Append append) throws IOException {
1931     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1932       // We had failure with table, cf and q perm checks and now giving a chance for cell
1933       // perm check
1934       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1935       AuthResult authResult = null;
1936       User user = getActiveUser(c);
1937       if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
1938           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1939         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1940             user, Action.WRITE, table, append.getFamilyCellMap());
1941       } else {
1942         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1943             user, Action.WRITE, table, append.getFamilyCellMap());
1944       }
1945       logResult(authResult);
1946       if (authorizationEnabled && !authResult.isAllowed()) {
1947         throw new AccessDeniedException("Insufficient permissions " +
1948           authResult.toContextString());
1949       }
1950     }
1951     return null;
1952   }
1953
1954   @Override
1955   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1956       final Increment increment)
1957       throws IOException {
1958     User user = getActiveUser(c);
1959     checkForReservedTagPresence(user, increment);
1960
1961     // Require WRITE permission to the table, CF, and the KV to be replaced by
1962     // the incremented value
1963     RegionCoprocessorEnvironment env = c.getEnvironment();
1964     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1965     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1966       Action.WRITE);
1967     logResult(authResult);
1968     if (!authResult.isAllowed()) {
1969       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1970         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1971       } else if (authorizationEnabled) {
1972         throw new AccessDeniedException("Insufficient permissions " +
1973           authResult.toContextString());
1974       }
1975     }
1976
1977     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1978     if (bytes != null) {
1979       if (cellFeaturesEnabled) {
1980         addCellPermissions(bytes, increment.getFamilyCellMap());
1981       } else {
1982         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1983       }
1984     }
1985
1986     return null;
1987   }
1988
1989   @Override
1990   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1991       final Increment increment) throws IOException {
1992     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1993       // We had failure with table, cf and q perm checks and now giving a chance for cell
1994       // perm check
1995       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1996       AuthResult authResult = null;
1997       User user = getActiveUser(c);
1998       if (checkCoveringPermission(user, OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1999           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
2000         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
2001             user, Action.WRITE, table, increment.getFamilyCellMap());
2002       } else {
2003         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
2004             user, Action.WRITE, table, increment.getFamilyCellMap());
2005       }
2006       logResult(authResult);
2007       if (authorizationEnabled && !authResult.isAllowed()) {
2008         throw new AccessDeniedException("Insufficient permissions " +
2009           authResult.toContextString());
2010       }
2011     }
2012     return null;
2013   }
2014
2015   @Override
2016   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2017       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2018     // If the HFile version is insufficient to persist tags, we won't have any
2019     // work to do here
2020     if (!cellFeaturesEnabled) {
2021       return newCell;
2022     }
2023
2024     // Collect any ACLs from the old cell
2025     List<Tag> tags = Lists.newArrayList();
2026     List<Tag> aclTags = Lists.newArrayList();
2027     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2028     if (oldCell != null) {
2029       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell);
2030       while (tagIterator.hasNext()) {
2031         Tag tag = tagIterator.next();
2032         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2033           // Not an ACL tag, just carry it through
2034           if (LOG.isTraceEnabled()) {
2035             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
2036                 + " length " + tag.getValueLength());
2037           }
2038           tags.add(tag);
2039         } else {
2040           aclTags.add(tag);
2041         }
2042       }
2043     }
2044
2045     // Do we have an ACL on the operation?
2046     byte[] aclBytes = mutation.getACL();
2047     if (aclBytes != null) {
2048       // Yes, use it
2049       tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2050     } else {
2051       // No, use what we carried forward
2052       if (perms != null) {
2053         // TODO: If we collected ACLs from more than one tag we may have a
2054         // List<Permission> of size > 1, this can be collapsed into a single
2055         // Permission
2056         if (LOG.isTraceEnabled()) {
2057           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2058         }
2059         tags.addAll(aclTags);
2060       }
2061     }
2062
2063     // If we have no tags to add, just return
2064     if (tags.isEmpty()) {
2065       return newCell;
2066     }
2067
2068     Cell rewriteCell = new TagRewriteCell(newCell, TagUtil.fromList(tags));
2069     return rewriteCell;
2070   }
2071
2072   @Override
2073   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2074       final Scan scan, final RegionScanner s) throws IOException {
2075     internalPreRead(c, scan, OpType.SCAN);
2076     return s;
2077   }
2078
2079   @Override
2080   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2081       final Scan scan, final RegionScanner s) throws IOException {
2082     User user = getActiveUser(c);
2083     if (user != null && user.getShortName() != null) {
2084       // store reference to scanner owner for later checks
2085       scannerOwners.put(s, user.getShortName());
2086     }
2087     return s;
2088   }
2089
2090   @Override
2091   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2092       final InternalScanner s, final List<Result> result,
2093       final int limit, final boolean hasNext) throws IOException {
2094     requireScannerOwner(s);
2095     return hasNext;
2096   }
2097
2098   @Override
2099   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2100       final InternalScanner s) throws IOException {
2101     requireScannerOwner(s);
2102   }
2103
2104   @Override
2105   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2106       final InternalScanner s) throws IOException {
2107     // clean up any associated owner mapping
2108     scannerOwners.remove(s);
2109   }
2110
2111   @Override
2112   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2113       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2114     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2115     return hasMore;
2116   }
2117
2118   /**
2119    * Verify, when servicing an RPC, that the caller is the scanner owner.
2120    * If so, we assume that access control is correctly enforced based on
2121    * the checks performed in preScannerOpen()
2122    */
2123   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2124     if (!RpcServer.isInRpcCallContext())
2125       return;
2126     String requestUserName = RpcServer.getRequestUserName();
2127     String owner = scannerOwners.get(s);
2128     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2129       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2130     }
2131   }
2132
2133   /**
2134    * Verifies user has CREATE privileges on
2135    * the Column Families involved in the bulkLoadHFile
2136    * request. Specific Column Write privileges are presently
2137    * ignored.
2138    */
2139   @Override
2140   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2141       List<Pair<byte[], String>> familyPaths) throws IOException {
2142     User user = getActiveUser(ctx);
2143     for(Pair<byte[],String> el : familyPaths) {
2144       requirePermission(user, "preBulkLoadHFile",
2145           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2146           el.getFirst(),
2147           null,
2148           Action.CREATE);
2149     }
2150   }
2151
2152   /**
2153    * Authorization check for
2154    * SecureBulkLoadProtocol.prepareBulkLoad()
2155    * @param ctx the context
2156    * @param request the request
2157    * @throws IOException
2158    */
2159   @Override
2160   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2161       PrepareBulkLoadRequest request) throws IOException {
2162     requireAccess(getActiveUser(ctx), "prePrepareBulkLoad",
2163         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2164   }
2165
2166   /**
2167    * Authorization security check for
2168    * SecureBulkLoadProtocol.cleanupBulkLoad()
2169    * @param ctx the context
2170    * @param request the request
2171    * @throws IOException
2172    */
2173   @Override
2174   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2175       CleanupBulkLoadRequest request) throws IOException {
2176     requireAccess(getActiveUser(ctx), "preCleanupBulkLoad",
2177         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2178   }
2179
2180   /* ---- EndpointObserver implementation ---- */
2181
2182   @Override
2183   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2184       Service service, String methodName, Message request) throws IOException {
2185     // Don't intercept calls to our own AccessControlService, we check for
2186     // appropriate permissions in the service handlers
2187     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2188       requirePermission(getActiveUser(ctx),
2189           "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
2190           getTableName(ctx.getEnvironment()), null, null,
2191           Action.EXEC);
2192     }
2193     return request;
2194   }
2195
2196   @Override
2197   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2198       Service service, String methodName, Message request, Message.Builder responseBuilder)
2199       throws IOException { }
2200
2201   /* ---- Protobuf AccessControlService implementation ---- */
2202
2203   @Override
2204   public void grant(RpcController controller,
2205                     AccessControlProtos.GrantRequest request,
2206                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2207     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2208     AccessControlProtos.GrantResponse response = null;
2209     try {
2210       // verify it's only running at .acl.
2211       if (aclRegion) {
2212         if (!initialized) {
2213           throw new CoprocessorException("AccessController not yet initialized");
2214         }
2215         if (LOG.isDebugEnabled()) {
2216           LOG.debug("Received request to grant access permission " + perm.toString());
2217         }
2218         User caller = RpcServer.getRequestUser();
2219
2220         switch(request.getUserPermission().getPermission().getType()) {
2221           case Global :
2222           case Table :
2223             requirePermission(caller, "grant", perm.getTableName(),
2224                 perm.getFamily(), perm.getQualifier(), Action.ADMIN);
2225             break;
2226           case Namespace :
2227             requireNamespacePermission(caller, "grant", perm.getNamespace(), Action.ADMIN);
2228            break;
2229         }
2230
2231         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2232           @Override
2233           public Void run() throws Exception {
2234             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2235             return null;
2236           }
2237         });
2238
2239         if (AUDITLOG.isTraceEnabled()) {
2240           // audit log should store permission changes in addition to auth results
2241           AUDITLOG.trace("Granted permission " + perm.toString());
2242         }
2243       } else {
2244         throw new CoprocessorException(AccessController.class, "This method "
2245             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2246       }
2247       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2248     } catch (IOException ioe) {
2249       // pass exception back up
2250       ResponseConverter.setControllerException(controller, ioe);
2251     }
2252     done.run(response);
2253   }
2254
2255   @Override
2256   public void revoke(RpcController controller,
2257                      AccessControlProtos.RevokeRequest request,
2258                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2259     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2260     AccessControlProtos.RevokeResponse response = null;
2261     try {
2262       // only allowed to be called on _acl_ region
2263       if (aclRegion) {
2264         if (!initialized) {
2265           throw new CoprocessorException("AccessController not yet initialized");
2266         }
2267         if (LOG.isDebugEnabled()) {
2268           LOG.debug("Received request to revoke access permission " + perm.toString());
2269         }
2270         User caller = RpcServer.getRequestUser();
2271
2272         switch(request.getUserPermission().getPermission().getType()) {
2273           case Global :
2274           case Table :
2275             requirePermission(caller, "revoke", perm.getTableName(), perm.getFamily(),
2276               perm.getQualifier(), Action.ADMIN);
2277             break;
2278           case Namespace :
2279             requireNamespacePermission(caller, "revoke", perm.getNamespace(), Action.ADMIN);
2280             break;
2281         }
2282
2283         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2284           @Override
2285           public Void run() throws Exception {
2286             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2287             return null;
2288           }
2289         });
2290
2291         if (AUDITLOG.isTraceEnabled()) {
2292           // audit log should record all permission changes
2293           AUDITLOG.trace("Revoked permission " + perm.toString());
2294         }
2295       } else {
2296         throw new CoprocessorException(AccessController.class, "This method "
2297             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2298       }
2299       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2300     } catch (IOException ioe) {
2301       // pass exception back up
2302       ResponseConverter.setControllerException(controller, ioe);
2303     }
2304     done.run(response);
2305   }
2306
2307   @Override
2308   public void getUserPermissions(RpcController controller,
2309                                  AccessControlProtos.GetUserPermissionsRequest request,
2310                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2311     AccessControlProtos.GetUserPermissionsResponse response = null;
2312     try {
2313       // only allowed to be called on _acl_ region
2314       if (aclRegion) {
2315         if (!initialized) {
2316           throw new CoprocessorException("AccessController not yet initialized");
2317         }
2318         User caller = RpcServer.getRequestUser();
2319
2320         List<UserPermission> perms = null;
2321         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2322           final TableName table = request.hasTableName() ?
2323             ProtobufUtil.toTableName(request.getTableName()) : null;
2324           requirePermission(caller, "userPermissions", table, null, null, Action.ADMIN);
2325           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2326             @Override
2327             public List<UserPermission> run() throws Exception {
2328               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2329             }
2330           });
2331         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2332           final String namespace = request.getNamespaceName().toStringUtf8();
2333           requireNamespacePermission(caller, "userPermissions", namespace, Action.ADMIN);
2334           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2335             @Override
2336             public List<UserPermission> run() throws Exception {
2337               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2338                 namespace);
2339             }
2340           });
2341         } else {
2342           requirePermission(caller, "userPermissions", Action.ADMIN);
2343           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2344             @Override
2345             public List<UserPermission> run() throws Exception {
2346               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2347             }
2348           });
2349           // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2350           // Also using acl as table name to be inline  with the results of global admin and will
2351           // help in avoiding any leakage of information about being superusers.
2352           for (String user: Superusers.getSuperUsers()) {
2353             perms.add(new UserPermission(user.getBytes(), AccessControlLists.ACL_TABLE_NAME, null,
2354                 Action.values()));
2355           }
2356         }
2357         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2358       } else {
2359         throw new CoprocessorException(AccessController.class, "This method "
2360             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2361       }
2362     } catch (IOException ioe) {
2363       // pass exception back up
2364       ResponseConverter.setControllerException(controller, ioe);
2365     }
2366     done.run(response);
2367   }
2368
2369   @Override
2370   public void checkPermissions(RpcController controller,
2371                                AccessControlProtos.CheckPermissionsRequest request,
2372                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2373     Permission[] permissions = new Permission[request.getPermissionCount()];
2374     for (int i=0; i < request.getPermissionCount(); i++) {
2375       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2376     }
2377     AccessControlProtos.CheckPermissionsResponse response = null;
2378     try {
2379       User user = RpcServer.getRequestUser();
2380       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2381       for (Permission permission : permissions) {
2382         if (permission instanceof TablePermission) {
2383           // Check table permissions
2384
2385           TablePermission tperm = (TablePermission) permission;
2386           for (Action action : permission.getActions()) {
2387             if (!tperm.getTableName().equals(tableName)) {
2388               throw new CoprocessorException(AccessController.class, String.format("This method "
2389                   + "can only execute at the table specified in TablePermission. " +
2390                   "Table of the region:%s , requested table:%s", tableName,
2391                   tperm.getTableName()));
2392             }
2393
2394             Map<byte[], Set<byte[]>> familyMap =
2395                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2396             if (tperm.getFamily() != null) {
2397               if (tperm.getQualifier() != null) {
2398                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2399                 qualifiers.add(tperm.getQualifier());
2400                 familyMap.put(tperm.getFamily(), qualifiers);
2401               } else {
2402                 familyMap.put(tperm.getFamily(), null);
2403               }
2404             }
2405
2406             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2407               familyMap);
2408             logResult(result);
2409             if (!result.isAllowed()) {
2410               // Even if passive we need to throw an exception here, we support checking
2411               // effective permissions, so throw unconditionally
2412               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2413                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2414                 ", action=" + action.toString() + ")");
2415             }
2416           }
2417
2418         } else {
2419           // Check global permissions
2420
2421           for (Action action : permission.getActions()) {
2422             AuthResult result;
2423             if (authManager.authorize(user, action)) {
2424               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2425                 action, null, null);
2426             } else {
2427               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2428                 null, null);
2429             }
2430             logResult(result);
2431             if (!result.isAllowed()) {
2432               // Even if passive we need to throw an exception here, we support checking
2433               // effective permissions, so throw unconditionally
2434               throw new AccessDeniedException("Insufficient permissions (action=" +
2435                 action.toString() + ")");
2436             }
2437           }
2438         }
2439       }
2440       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2441     } catch (IOException ioe) {
2442       ResponseConverter.setControllerException(controller, ioe);
2443     }
2444     done.run(response);
2445   }
2446
2447   @Override
2448   public Service getService() {
2449     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2450   }
2451
2452   private Region getRegion(RegionCoprocessorEnvironment e) {
2453     return e.getRegion();
2454   }
2455
2456   private TableName getTableName(RegionCoprocessorEnvironment e) {
2457     Region region = e.getRegion();
2458     if (region != null) {
2459       return getTableName(region);
2460     }
2461     return null;
2462   }
2463
2464   private TableName getTableName(Region region) {
2465     HRegionInfo regionInfo = region.getRegionInfo();
2466     if (regionInfo != null) {
2467       return regionInfo.getTable();
2468     }
2469     return null;
2470   }
2471
2472   @Override
2473   public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2474       throws IOException {
2475     requirePermission(getActiveUser(c), "preClose", Action.ADMIN);
2476   }
2477
2478   private void checkSystemOrSuperUser(User activeUser) throws IOException {
2479     // No need to check if we're not going to throw
2480     if (!authorizationEnabled) {
2481       return;
2482     }
2483     if (!Superusers.isSuperUser(activeUser)) {
2484       throw new AccessDeniedException("User '" + (activeUser != null ?
2485         activeUser.getShortName() : "null") + "' is not system or super user.");
2486     }
2487   }
2488
2489   @Override
2490   public void preStopRegionServer(
2491       ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2492       throws IOException {
2493     requirePermission(getActiveUser(ctx), "preStopRegionServer", Action.ADMIN);
2494   }
2495
2496   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2497       byte[] qualifier) {
2498     if (family == null) {
2499       return null;
2500     }
2501
2502     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2503     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2504     return familyMap;
2505   }
2506
2507   @Override
2508   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2509        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2510        String regex) throws IOException {
2511     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2512     // any concrete set of table names when a regex is present or the full list is requested.
2513     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2514       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2515       // request can be granted.
2516       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2517       for (TableName tableName: tableNamesList) {
2518         // Skip checks for a table that does not exist
2519         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2520           continue;
2521         requirePermission(getActiveUser(ctx), "getTableDescriptors", tableName, null, null,
2522             Action.ADMIN, Action.CREATE);
2523       }
2524     }
2525   }
2526
2527   @Override
2528   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2529       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2530       String regex) throws IOException {
2531     // Skipping as checks in this case are already done by preGetTableDescriptors.
2532     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2533       return;
2534     }
2535
2536     // Retains only those which passes authorization checks, as the checks weren't done as part
2537     // of preGetTableDescriptors.
2538     Iterator<HTableDescriptor> itr = descriptors.iterator();
2539     while (itr.hasNext()) {
2540       HTableDescriptor htd = itr.next();
2541       try {
2542         requirePermission(getActiveUser(ctx), "getTableDescriptors", htd.getTableName(), null, null,
2543             Action.ADMIN, Action.CREATE);
2544       } catch (AccessDeniedException e) {
2545         itr.remove();
2546       }
2547     }
2548   }
2549
2550   @Override
2551   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2552       List<HTableDescriptor> descriptors, String regex) throws IOException {
2553     // Retains only those which passes authorization checks.
2554     Iterator<HTableDescriptor> itr = descriptors.iterator();
2555     while (itr.hasNext()) {
2556       HTableDescriptor htd = itr.next();
2557       try {
2558         requireAccess(getActiveUser(ctx), "getTableNames", htd.getTableName(), Action.values());
2559       } catch (AccessDeniedException e) {
2560         itr.remove();
2561       }
2562     }
2563   }
2564
2565   @Override
2566   public void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2567       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
2568     requirePermission(getActiveUser(ctx), "mergeRegions", regionA.getTable(), null, null,
2569       Action.ADMIN);
2570   }
2571
2572   @Override
2573   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2574       Region regionB) throws IOException {
2575     requirePermission(getActiveUser(ctx), "mergeRegions", regionA.getTableDesc().getTableName(),
2576         null, null, Action.ADMIN);
2577   }
2578
2579   @Override
2580   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2581       Region regionB, Region mergedRegion) throws IOException { }
2582
2583   @Override
2584   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2585       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2586
2587   @Override
2588   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2589       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2590
2591   @Override
2592   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2593       Region regionA, Region regionB) throws IOException { }
2594
2595   @Override
2596   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2597       Region regionA, Region regionB) throws IOException { }
2598
2599   @Override
2600   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2601       throws IOException {
2602     requirePermission(getActiveUser(ctx), "preRollLogWriterRequest", Permission.Action.ADMIN);
2603   }
2604
2605   @Override
2606   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2607       throws IOException { }
2608
2609   @Override
2610   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2611       final String userName, final Quotas quotas) throws IOException {
2612     requirePermission(getActiveUser(ctx), "setUserQuota", Action.ADMIN);
2613   }
2614
2615   @Override
2616   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2617       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2618     requirePermission(getActiveUser(ctx), "setUserTableQuota", tableName, null, null, Action.ADMIN);
2619   }
2620
2621   @Override
2622   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2623       final String userName, final String namespace, final Quotas quotas) throws IOException {
2624     requirePermission(getActiveUser(ctx), "setUserNamespaceQuota", Action.ADMIN);
2625   }
2626
2627   @Override
2628   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2629       final TableName tableName, final Quotas quotas) throws IOException {
2630     requirePermission(getActiveUser(ctx), "setTableQuota", tableName, null, null, Action.ADMIN);
2631   }
2632
2633   @Override
2634   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2635       final String namespace, final Quotas quotas) throws IOException {
2636     requirePermission(getActiveUser(ctx), "setNamespaceQuota", Action.ADMIN);
2637   }
2638
2639   @Override
2640   public ReplicationEndpoint postCreateReplicationEndPoint(
2641       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2642     return endpoint;
2643   }
2644
2645   @Override
2646   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2647       List<WALEntry> entries, CellScanner cells) throws IOException {
2648     requirePermission(getActiveUser(ctx), "replicateLogEntries", Action.WRITE);
2649   }
2650
2651   @Override
2652   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2653       List<WALEntry> entries, CellScanner cells) throws IOException {
2654   }
2655
2656   @Override
2657   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2658                              Set<HostAndPort> servers, String targetGroup) throws IOException {
2659     requirePermission(getActiveUser(ctx), "moveServers", Action.ADMIN);
2660   }
2661
2662   @Override
2663   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2664                             Set<TableName> tables, String targetGroup) throws IOException {
2665     requirePermission(getActiveUser(ctx), "moveTables", Action.ADMIN);
2666   }
2667
2668   @Override
2669   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2670                             String name) throws IOException {
2671     requirePermission(getActiveUser(ctx), "addRSGroup", Action.ADMIN);
2672   }
2673
2674   @Override
2675   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2676                                String name) throws IOException {
2677     requirePermission(getActiveUser(ctx), "removeRSGroup", Action.ADMIN);
2678   }
2679
2680   @Override
2681   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2682                                 String groupName) throws IOException {
2683     requirePermission(getActiveUser(ctx), "balanceRSGroup", Action.ADMIN);
2684   }
2685 }