View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import com.google.common.net.HostAndPort;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ArrayBackedTag;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.CompoundConfiguration;
45  import org.apache.hadoop.hbase.CoprocessorEnvironment;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
48  import org.apache.hadoop.hbase.HColumnDescriptor;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValue.Type;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NamespaceDescriptor;
56  import org.apache.hadoop.hbase.ProcedureInfo;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.Tag;
60  import org.apache.hadoop.hbase.TagUtil;
61  import org.apache.hadoop.hbase.classification.InterfaceAudience;
62  import org.apache.hadoop.hbase.client.Append;
63  import org.apache.hadoop.hbase.client.Delete;
64  import org.apache.hadoop.hbase.client.Durability;
65  import org.apache.hadoop.hbase.client.Get;
66  import org.apache.hadoop.hbase.client.Increment;
67  import org.apache.hadoop.hbase.client.MasterSwitchType;
68  import org.apache.hadoop.hbase.client.Mutation;
69  import org.apache.hadoop.hbase.client.Put;
70  import org.apache.hadoop.hbase.client.Query;
71  import org.apache.hadoop.hbase.client.Result;
72  import org.apache.hadoop.hbase.client.Scan;
73  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
74  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
75  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
76  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
77  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
78  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
79  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
80  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
81  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
82  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
83  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
84  import org.apache.hadoop.hbase.filter.CompareFilter;
85  import org.apache.hadoop.hbase.filter.Filter;
86  import org.apache.hadoop.hbase.filter.FilterList;
87  import org.apache.hadoop.hbase.io.hfile.HFile;
88  import org.apache.hadoop.hbase.ipc.RpcServer;
89  import org.apache.hadoop.hbase.master.MasterServices;
90  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
91  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
94  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
95  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
96  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
97  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
100 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
101 import org.apache.hadoop.hbase.regionserver.InternalScanner;
102 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
103 import org.apache.hadoop.hbase.regionserver.Region;
104 import org.apache.hadoop.hbase.regionserver.RegionScanner;
105 import org.apache.hadoop.hbase.regionserver.ScanType;
106 import org.apache.hadoop.hbase.regionserver.ScannerContext;
107 import org.apache.hadoop.hbase.regionserver.Store;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
110 import org.apache.hadoop.hbase.security.AccessDeniedException;
111 import org.apache.hadoop.hbase.security.Superusers;
112 import org.apache.hadoop.hbase.security.User;
113 import org.apache.hadoop.hbase.security.UserProvider;
114 import org.apache.hadoop.hbase.security.access.Permission.Action;
115 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
116 import org.apache.hadoop.hbase.util.ByteRange;
117 import org.apache.hadoop.hbase.util.Bytes;
118 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
119 import org.apache.hadoop.hbase.util.Pair;
120 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
121 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
122
123 import com.google.common.collect.ArrayListMultimap;
124 import com.google.common.collect.ImmutableSet;
125 import com.google.common.collect.ListMultimap;
126 import com.google.common.collect.Lists;
127 import com.google.common.collect.MapMaker;
128 import com.google.common.collect.Maps;
129 import com.google.common.collect.Sets;
130 import com.google.protobuf.Message;
131 import com.google.protobuf.RpcCallback;
132 import com.google.protobuf.RpcController;
133 import com.google.protobuf.Service;
134
135 /**
136  * Provides basic authorization checks for data access and administrative
137  * operations.
138  *
139  * <p>
140  * {@code AccessController} performs authorization checks for HBase operations
141  * based on:
142  * </p>
143  * <ul>
144  *   <li>the identity of the user performing the operation</li>
145  *   <li>the scope over which the operation is performed, in increasing
146  *   specificity: global, table, column family, or qualifier</li>
147  *   <li>the type of action being performed (as mapped to
148  *   {@link Permission.Action} values)</li>
149  * </ul>
150  * <p>
151  * If the authorization check fails, an {@link AccessDeniedException}
152  * will be thrown for the operation.
153  * </p>
154  *
155  * <p>
156  * To perform authorization checks, {@code AccessController} relies on the
157  * RpcServerEngine being loaded to provide
158  * the user identities for remote requests.
159  * </p>
160  *
161  * <p>
162  * The access control lists used for authorization can be manipulated via the
163  * exposed {@link AccessControlService} Interface implementation, and the associated
164  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
165  * commands.
166  * </p>
167  */
168 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
169 public class AccessController extends BaseMasterAndRegionObserver
170     implements RegionServerObserver,
171       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
172
173   private static final Log LOG = LogFactory.getLog(AccessController.class);
174
175   private static final Log AUDITLOG =
176     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
177   private static final String CHECK_COVERING_PERM = "check_covering_perm";
178   private static final String TAG_CHECK_PASSED = "tag_check_passed";
179   private static final byte[] TRUE = Bytes.toBytes(true);
180
181   TableAuthManager authManager = null;
182
183   /** flags if we are running on a region of the _acl_ table */
184   boolean aclRegion = false;
185
186   /** defined only for Endpoint implementation, so it can have way to
187    access region services */
188   private RegionCoprocessorEnvironment regionEnv;
189
190   /** Mapping of scanner instances to the user who created them */
191   private Map<InternalScanner,String> scannerOwners =
192       new MapMaker().weakKeys().makeMap();
193
194   private Map<TableName, List<UserPermission>> tableAcls;
195
196   /** Provider for mapping principal names to Users */
197   private UserProvider userProvider;
198
199   /** if we are active, usually true, only not true if "hbase.security.authorization"
200    has been set to false in site configuration */
201   boolean authorizationEnabled;
202
203   /** if we are able to support cell ACLs */
204   boolean cellFeaturesEnabled;
205
206   /** if we should check EXEC permissions */
207   boolean shouldCheckExecPermission;
208
209   /** if we should terminate access checks early as soon as table or CF grants
210     allow access; pre-0.98 compatible behavior */
211   boolean compatibleEarlyTermination;
212
213   /** if we have been successfully initialized */
214   private volatile boolean initialized = false;
215
216   /** if the ACL table is available, only relevant in the master */
217   private volatile boolean aclTabAvailable = false;
218
219   public static boolean isAuthorizationSupported(Configuration conf) {
220     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
221   }
222
223   public static boolean isCellAuthorizationSupported(Configuration conf) {
224     return isAuthorizationSupported(conf) &&
225         (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
226   }
227
228   public Region getRegion() {
229     return regionEnv != null ? regionEnv.getRegion() : null;
230   }
231
232   public TableAuthManager getAuthManager() {
233     return authManager;
234   }
235
236   void initialize(RegionCoprocessorEnvironment e) throws IOException {
237     final Region region = e.getRegion();
238     Configuration conf = e.getConfiguration();
239     Map<byte[], ListMultimap<String,TablePermission>> tables =
240         AccessControlLists.loadAll(region);
241     // For each table, write out the table's permissions to the respective
242     // znode for that table.
243     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
244       tables.entrySet()) {
245       byte[] entry = t.getKey();
246       ListMultimap<String,TablePermission> perms = t.getValue();
247       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
248       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
249     }
250     initialized = true;
251   }
252
253   /**
254    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
255    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
256    * table updates.
257    */
258   void updateACL(RegionCoprocessorEnvironment e,
259       final Map<byte[], List<Cell>> familyMap) {
260     Set<byte[]> entries =
261         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
262     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
263       List<Cell> cells = f.getValue();
264       for (Cell cell: cells) {
265         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
266             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
267             AccessControlLists.ACL_LIST_FAMILY.length)) {
268           entries.add(CellUtil.cloneRow(cell));
269         }
270       }
271     }
272     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
273     Configuration conf = regionEnv.getConfiguration();
274     for (byte[] entry: entries) {
275       try {
276         ListMultimap<String,TablePermission> perms =
277           AccessControlLists.getPermissions(conf, entry);
278         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
279         zkw.writeToZookeeper(entry, serialized);
280       } catch (IOException ex) {
281         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
282             ex);
283       }
284     }
285   }
286
287   /**
288    * Check the current user for authorization to perform a specific action
289    * against the given set of row data.
290    *
291    * <p>Note: Ordering of the authorization checks
292    * has been carefully optimized to short-circuit the most common requests
293    * and minimize the amount of processing required.</p>
294    *
295    * @param permRequest the action being requested
296    * @param e the coprocessor environment
297    * @param families the map of column families to qualifiers present in
298    * the request
299    * @return an authorization result
300    */
301   AuthResult permissionGranted(String request, User user, Action permRequest,
302       RegionCoprocessorEnvironment e,
303       Map<byte [], ? extends Collection<?>> families) {
304     HRegionInfo hri = e.getRegion().getRegionInfo();
305     TableName tableName = hri.getTable();
306
307     // 1. All users need read access to hbase:meta table.
308     // this is a very common operation, so deal with it quickly.
309     if (hri.isMetaRegion()) {
310       if (permRequest == Action.READ) {
311         return AuthResult.allow(request, "All users allowed", user,
312           permRequest, tableName, families);
313       }
314     }
315
316     if (user == null) {
317       return AuthResult.deny(request, "No user associated with request!", null,
318         permRequest, tableName, families);
319     }
320
321     // 2. check for the table-level, if successful we can short-circuit
322     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
323       return AuthResult.allow(request, "Table permission granted", user,
324         permRequest, tableName, families);
325     }
326
327     // 3. check permissions against the requested families
328     if (families != null && families.size() > 0) {
329       // all families must pass
330       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
331         // a) check for family level access
332         if (authManager.authorize(user, tableName, family.getKey(),
333             permRequest)) {
334           continue;  // family-level permission overrides per-qualifier
335         }
336
337         // b) qualifier level access can still succeed
338         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
339           if (family.getValue() instanceof Set) {
340             // for each qualifier of the family
341             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
342             for (byte[] qualifier : familySet) {
343               if (!authManager.authorize(user, tableName, family.getKey(),
344                                          qualifier, permRequest)) {
345                 return AuthResult.deny(request, "Failed qualifier check", user,
346                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
347               }
348             }
349           } else if (family.getValue() instanceof List) { // List<KeyValue>
350             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
351             for (KeyValue kv : kvList) {
352               if (!authManager.authorize(user, tableName, family.getKey(),
353                 CellUtil.cloneQualifier(kv), permRequest)) {
354                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
355                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(kv)));
356               }
357             }
358           }
359         } else {
360           // no qualifiers and family-level check already failed
361           return AuthResult.deny(request, "Failed family check", user, permRequest,
362               tableName, makeFamilyMap(family.getKey(), null));
363         }
364       }
365
366       // all family checks passed
367       return AuthResult.allow(request, "All family checks passed", user, permRequest,
368           tableName, families);
369     }
370
371     // 4. no families to check and table level access failed
372     return AuthResult.deny(request, "No families to check and table permission failed",
373         user, permRequest, tableName, families);
374   }
375
376   /**
377    * Check the current user for authorization to perform a specific action
378    * against the given set of row data.
379    * @param opType the operation type
380    * @param user the user
381    * @param e the coprocessor environment
382    * @param families the map of column families to qualifiers present in
383    * the request
384    * @param actions the desired actions
385    * @return an authorization result
386    */
387   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
388       Map<byte [], ? extends Collection<?>> families, Action... actions) {
389     AuthResult result = null;
390     for (Action action: actions) {
391       result = permissionGranted(opType.toString(), user, action, e, families);
392       if (!result.isAllowed()) {
393         return result;
394       }
395     }
396     return result;
397   }
398
399   private void logResult(AuthResult result) {
400     if (AUDITLOG.isTraceEnabled()) {
401       InetAddress remoteAddr = RpcServer.getRemoteAddress();
402       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
403           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
404           "; reason: " + result.getReason() +
405           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
406           "; request: " + result.getRequest() +
407           "; context: " + result.toContextString());
408     }
409   }
410
411   /**
412    * Returns the active user to which authorization checks should be applied.
413    * If we are in the context of an RPC call, the remote user is used,
414    * otherwise the currently logged in user is used.
415    */
416   private User getActiveUser(ObserverContext ctx) throws IOException {
417     User user = ctx.getCaller();
418     if (user == null) {
419       // for non-rpc handling, fallback to system user
420       user = userProvider.getCurrent();
421     }
422     return user;
423   }
424
425   /**
426    * Authorizes that the current user has any of the given permissions for the
427    * given table, column family and column qualifier.
428    * @param tableName Table requested
429    * @param family Column family requested
430    * @param qualifier Column qualifier requested
431    * @throws IOException if obtaining the current user fails
432    * @throws AccessDeniedException if user has no authorization
433    */
434   private void requirePermission(User user, String request, TableName tableName, byte[] family,
435       byte[] qualifier, Action... permissions) throws IOException {
436     AuthResult result = null;
437
438     for (Action permission : permissions) {
439       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
440         result = AuthResult.allow(request, "Table permission granted", user,
441                                   permission, tableName, family, qualifier);
442         break;
443       } else {
444         // rest of the world
445         result = AuthResult.deny(request, "Insufficient permissions", user,
446                                  permission, tableName, family, qualifier);
447       }
448     }
449     logResult(result);
450     if (authorizationEnabled && !result.isAllowed()) {
451       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
452     }
453   }
454
455   /**
456    * Authorizes that the current user has any of the given permissions for the
457    * given table, column family and column qualifier.
458    * @param tableName Table requested
459    * @param family Column family param
460    * @param qualifier Column qualifier param
461    * @throws IOException if obtaining the current user fails
462    * @throws AccessDeniedException if user has no authorization
463    */
464   private void requireTablePermission(User user, String request, TableName tableName, byte[] family,
465       byte[] qualifier, Action... permissions) throws IOException {
466     AuthResult result = null;
467
468     for (Action permission : permissions) {
469       if (authManager.authorize(user, tableName, null, null, permission)) {
470         result = AuthResult.allow(request, "Table permission granted", user,
471             permission, tableName, null, null);
472         result.getParams().setFamily(family).setQualifier(qualifier);
473         break;
474       } else {
475         // rest of the world
476         result = AuthResult.deny(request, "Insufficient permissions", user,
477             permission, tableName, family, qualifier);
478         result.getParams().setFamily(family).setQualifier(qualifier);
479       }
480     }
481     logResult(result);
482     if (authorizationEnabled && !result.isAllowed()) {
483       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
484     }
485   }
486
487   /**
488    * Authorizes that the current user has any of the given permissions to access the table.
489    *
490    * @param tableName Table requested
491    * @param permissions Actions being requested
492    * @throws IOException if obtaining the current user fails
493    * @throws AccessDeniedException if user has no authorization
494    */
495   private void requireAccess(User user, String request, TableName tableName,
496       Action... permissions) throws IOException {
497     AuthResult result = null;
498
499     for (Action permission : permissions) {
500       if (authManager.hasAccess(user, tableName, permission)) {
501         result = AuthResult.allow(request, "Table permission granted", user,
502                                   permission, tableName, null, null);
503         break;
504       } else {
505         // rest of the world
506         result = AuthResult.deny(request, "Insufficient permissions", user,
507                                  permission, tableName, null, null);
508       }
509     }
510     logResult(result);
511     if (authorizationEnabled && !result.isAllowed()) {
512       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
513     }
514   }
515
516   /**
517    * Authorizes that the current user has global privileges for the given action.
518    * @param perm The action being requested
519    * @throws IOException if obtaining the current user fails
520    * @throws AccessDeniedException if authorization is denied
521    */
522   private void requirePermission(User user, String request, Action perm) throws IOException {
523     requireGlobalPermission(user, request, perm, null, null);
524   }
525
526   /**
527    * Checks that the user has the given global permission. The generated
528    * audit log message will contain context information for the operation
529    * being authorized, based on the given parameters.
530    * @param perm Action being requested
531    * @param tableName Affected table name.
532    * @param familyMap Affected column families.
533    */
534   private void requireGlobalPermission(User user, String request, Action perm, TableName tableName,
535       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
536     AuthResult result = null;
537     if (authManager.authorize(user, perm)) {
538       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
539       result.getParams().setTableName(tableName).setFamilies(familyMap);
540       logResult(result);
541     } else {
542       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
543       result.getParams().setTableName(tableName).setFamilies(familyMap);
544       logResult(result);
545       if (authorizationEnabled) {
546         throw new AccessDeniedException("Insufficient permissions for user '" +
547           (user != null ? user.getShortName() : "null") +"' (global, action=" +
548           perm.toString() + ")");
549       }
550     }
551   }
552
553   /**
554    * Checks that the user has the given global permission. The generated
555    * audit log message will contain context information for the operation
556    * being authorized, based on the given parameters.
557    * @param perm Action being requested
558    * @param namespace
559    */
560   private void requireGlobalPermission(User user, String request, Action perm,
561                                        String namespace) throws IOException {
562     AuthResult authResult = null;
563     if (authManager.authorize(user, perm)) {
564       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
565       authResult.getParams().setNamespace(namespace);
566       logResult(authResult);
567     } else {
568       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
569       authResult.getParams().setNamespace(namespace);
570       logResult(authResult);
571       if (authorizationEnabled) {
572         throw new AccessDeniedException("Insufficient permissions for user '" +
573           (user != null ? user.getShortName() : "null") +"' (global, action=" +
574           perm.toString() + ")");
575       }
576     }
577   }
578
579   /**
580    * Checks that the user has the given global or namespace permission.
581    * @param namespace
582    * @param permissions Actions being requested
583    */
584   public void requireNamespacePermission(User user, String request, String namespace,
585       Action... permissions) throws IOException {
586     AuthResult result = null;
587
588     for (Action permission : permissions) {
589       if (authManager.authorize(user, namespace, permission)) {
590         result = AuthResult.allow(request, "Namespace permission granted",
591             user, permission, namespace);
592         break;
593       } else {
594         // rest of the world
595         result = AuthResult.deny(request, "Insufficient permissions", user,
596             permission, namespace);
597       }
598     }
599     logResult(result);
600     if (authorizationEnabled && !result.isAllowed()) {
601       throw new AccessDeniedException("Insufficient permissions "
602           + result.toContextString());
603     }
604   }
605
606   /**
607    * Checks that the user has the given global or namespace permission.
608    * @param namespace
609    * @param permissions Actions being requested
610    */
611   public void requireNamespacePermission(User user, String request, String namespace,
612       TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap,
613       Action... permissions)
614       throws IOException {
615     AuthResult result = null;
616
617     for (Action permission : permissions) {
618       if (authManager.authorize(user, namespace, permission)) {
619         result = AuthResult.allow(request, "Namespace permission granted",
620             user, permission, namespace);
621         result.getParams().setTableName(tableName).setFamilies(familyMap);
622         break;
623       } else {
624         // rest of the world
625         result = AuthResult.deny(request, "Insufficient permissions", user,
626             permission, namespace);
627         result.getParams().setTableName(tableName).setFamilies(familyMap);
628       }
629     }
630     logResult(result);
631     if (authorizationEnabled && !result.isAllowed()) {
632       throw new AccessDeniedException("Insufficient permissions "
633           + result.toContextString());
634     }
635   }
636
637   /**
638    * Returns <code>true</code> if the current user is allowed the given action
639    * over at least one of the column qualifiers in the given column families.
640    */
641   private boolean hasFamilyQualifierPermission(User user,
642       Action perm,
643       RegionCoprocessorEnvironment env,
644       Map<byte[], ? extends Collection<byte[]>> familyMap)
645     throws IOException {
646     HRegionInfo hri = env.getRegion().getRegionInfo();
647     TableName tableName = hri.getTable();
648
649     if (user == null) {
650       return false;
651     }
652
653     if (familyMap != null && familyMap.size() > 0) {
654       // at least one family must be allowed
655       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
656           familyMap.entrySet()) {
657         if (family.getValue() != null && !family.getValue().isEmpty()) {
658           for (byte[] qualifier : family.getValue()) {
659             if (authManager.matchPermission(user, tableName,
660                 family.getKey(), qualifier, perm)) {
661               return true;
662             }
663           }
664         } else {
665           if (authManager.matchPermission(user, tableName, family.getKey(),
666               perm)) {
667             return true;
668           }
669         }
670       }
671     } else if (LOG.isDebugEnabled()) {
672       LOG.debug("Empty family map passed for permission check");
673     }
674
675     return false;
676   }
677
678   private enum OpType {
679     GET("get"),
680     EXISTS("exists"),
681     SCAN("scan"),
682     PUT("put"),
683     DELETE("delete"),
684     CHECK_AND_PUT("checkAndPut"),
685     CHECK_AND_DELETE("checkAndDelete"),
686     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
687     APPEND("append"),
688     INCREMENT("increment");
689
690     private String type;
691
692     private OpType(String type) {
693       this.type = type;
694     }
695
696     @Override
697     public String toString() {
698       return type;
699     }
700   }
701
702   /**
703    * Determine if cell ACLs covered by the operation grant access. This is expensive.
704    * @return false if cell ACLs failed to grant access, true otherwise
705    * @throws IOException
706    */
707   private boolean checkCoveringPermission(User user, OpType request, RegionCoprocessorEnvironment e,
708       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
709       throws IOException {
710     if (!cellFeaturesEnabled) {
711       return false;
712     }
713     long cellGrants = 0;
714     long latestCellTs = 0;
715     Get get = new Get(row);
716     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
717     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
718     // version. We have to get every cell version and check its TS against the TS asked for in
719     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
720     // consider only one such passing cell. In case of Delete we have to consider all the cell
721     // versions under this passing version. When Delete Mutation contains columns which are a
722     // version delete just consider only one version for those column cells.
723     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
724     if (considerCellTs) {
725       get.setMaxVersions();
726     } else {
727       get.setMaxVersions(1);
728     }
729     boolean diffCellTsFromOpTs = false;
730     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
731       byte[] col = entry.getKey();
732       // TODO: HBASE-7114 could possibly unify the collection type in family
733       // maps so we would not need to do this
734       if (entry.getValue() instanceof Set) {
735         Set<byte[]> set = (Set<byte[]>)entry.getValue();
736         if (set == null || set.isEmpty()) {
737           get.addFamily(col);
738         } else {
739           for (byte[] qual: set) {
740             get.addColumn(col, qual);
741           }
742         }
743       } else if (entry.getValue() instanceof List) {
744         List<Cell> list = (List<Cell>)entry.getValue();
745         if (list == null || list.isEmpty()) {
746           get.addFamily(col);
747         } else {
748           // In case of family delete, a Cell will be added into the list with Qualifier as null.
749           for (Cell cell : list) {
750             if (cell.getQualifierLength() == 0
751                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
752                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
753               get.addFamily(col);
754             } else {
755               get.addColumn(col, CellUtil.cloneQualifier(cell));
756             }
757             if (considerCellTs) {
758               long cellTs = cell.getTimestamp();
759               latestCellTs = Math.max(latestCellTs, cellTs);
760               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
761             }
762           }
763         }
764       } else if (entry.getValue() == null) {
765         get.addFamily(col);
766       } else {
767         throw new RuntimeException("Unhandled collection type " +
768           entry.getValue().getClass().getName());
769       }
770     }
771     // We want to avoid looking into the future. So, if the cells of the
772     // operation specify a timestamp, or the operation itself specifies a
773     // timestamp, then we use the maximum ts found. Otherwise, we bound
774     // the Get to the current server time. We add 1 to the timerange since
775     // the upper bound of a timerange is exclusive yet we need to examine
776     // any cells found there inclusively.
777     long latestTs = Math.max(opTs, latestCellTs);
778     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
779       latestTs = EnvironmentEdgeManager.currentTime();
780     }
781     get.setTimeRange(0, latestTs + 1);
782     // In case of Put operation we set to read all versions. This was done to consider the case
783     // where columns are added with TS other than the Mutation TS. But normally this wont be the
784     // case with Put. There no need to get all versions but get latest version only.
785     if (!diffCellTsFromOpTs && request == OpType.PUT) {
786       get.setMaxVersions(1);
787     }
788     if (LOG.isTraceEnabled()) {
789       LOG.trace("Scanning for cells with " + get);
790     }
791     // This Map is identical to familyMap. The key is a BR rather than byte[].
792     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
793     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
794     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
795     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
796       if (entry.getValue() instanceof List) {
797         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
798       }
799     }
800     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
801     List<Cell> cells = Lists.newArrayList();
802     Cell prevCell = null;
803     ByteRange curFam = new SimpleMutableByteRange();
804     boolean curColAllVersions = (request == OpType.DELETE);
805     long curColCheckTs = opTs;
806     boolean foundColumn = false;
807     try {
808       boolean more = false;
809       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
810
811       do {
812         cells.clear();
813         // scan with limit as 1 to hold down memory use on wide rows
814         more = scanner.next(cells, scannerContext);
815         for (Cell cell: cells) {
816           if (LOG.isTraceEnabled()) {
817             LOG.trace("Found cell " + cell);
818           }
819           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
820           if (colChange) foundColumn = false;
821           prevCell = cell;
822           if (!curColAllVersions && foundColumn) {
823             continue;
824           }
825           if (colChange && considerCellTs) {
826             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
827             List<Cell> cols = familyMap1.get(curFam);
828             for (Cell col : cols) {
829               // null/empty qualifier is used to denote a Family delete. The TS and delete type
830               // associated with this is applicable for all columns within the family. That is
831               // why the below (col.getQualifierLength() == 0) check.
832               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
833                   || CellUtil.matchingQualifier(cell, col)) {
834                 byte type = col.getTypeByte();
835                 if (considerCellTs) {
836                   curColCheckTs = col.getTimestamp();
837                 }
838                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
839                 // a version delete for a column no need to check all the covering cells within
840                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
841                 // One version delete types are Delete/DeleteFamilyVersion
842                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
843                     || (KeyValue.Type.DeleteFamily.getCode() == type);
844                 break;
845               }
846             }
847           }
848           if (cell.getTimestamp() > curColCheckTs) {
849             // Just ignore this cell. This is not a covering cell.
850             continue;
851           }
852           foundColumn = true;
853           for (Action action: actions) {
854             // Are there permissions for this user for the cell?
855             if (!authManager.authorize(user, getTableName(e), cell, action)) {
856               // We can stop if the cell ACL denies access
857               return false;
858             }
859           }
860           cellGrants++;
861         }
862       } while (more);
863     } catch (AccessDeniedException ex) {
864       throw ex;
865     } catch (IOException ex) {
866       LOG.error("Exception while getting cells to calculate covering permission", ex);
867     } finally {
868       scanner.close();
869     }
870     // We should not authorize unless we have found one or more cell ACLs that
871     // grant access. This code is used to check for additional permissions
872     // after no table or CF grants are found.
873     return cellGrants > 0;
874   }
875
876   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
877     // Iterate over the entries in the familyMap, replacing the cells therein
878     // with new cells including the ACL data
879     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
880       List<Cell> newCells = Lists.newArrayList();
881       for (Cell cell: e.getValue()) {
882         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
883         List<Tag> tags = new ArrayList<Tag>();
884         tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, perms));
885         Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
886         while (tagIterator.hasNext()) {
887           tags.add(tagIterator.next());
888         }
889         newCells.add(CellUtil.createCell(cell, tags));
890       }
891       // This is supposed to be safe, won't CME
892       e.setValue(newCells);
893     }
894   }
895
896   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
897   // type is reserved and should not be explicitly set by user.
898   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
899     // No need to check if we're not going to throw
900     if (!authorizationEnabled) {
901       m.setAttribute(TAG_CHECK_PASSED, TRUE);
902       return;
903     }
904     // Superusers are allowed to store cells unconditionally.
905     if (Superusers.isSuperUser(user)) {
906       m.setAttribute(TAG_CHECK_PASSED, TRUE);
907       return;
908     }
909     // We already checked (prePut vs preBatchMutation)
910     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
911       return;
912     }
913     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
914       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cellScanner.current());
915       while (tagsItr.hasNext()) {
916         if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
917           throw new AccessDeniedException("Mutation contains cell with reserved type tag");
918         }
919       }
920     }
921     m.setAttribute(TAG_CHECK_PASSED, TRUE);
922   }
923
924   /* ---- MasterObserver implementation ---- */
925   @Override
926   public void start(CoprocessorEnvironment env) throws IOException {
927     CompoundConfiguration conf = new CompoundConfiguration();
928     conf.add(env.getConfiguration());
929
930     authorizationEnabled = isAuthorizationSupported(conf);
931     if (!authorizationEnabled) {
932       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
933     }
934
935     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
936       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
937
938     cellFeaturesEnabled = (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS);
939     if (!cellFeaturesEnabled) {
940       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
941           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
942           + " accordingly.");
943     }
944
945     ZooKeeperWatcher zk = null;
946     if (env instanceof MasterCoprocessorEnvironment) {
947       // if running on HMaster
948       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
949       zk = mEnv.getMasterServices().getZooKeeper();
950     } else if (env instanceof RegionServerCoprocessorEnvironment) {
951       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
952       zk = rsEnv.getRegionServerServices().getZooKeeper();
953     } else if (env instanceof RegionCoprocessorEnvironment) {
954       // if running at region
955       regionEnv = (RegionCoprocessorEnvironment) env;
956       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
957       zk = regionEnv.getRegionServerServices().getZooKeeper();
958       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
959         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
960     }
961
962     // set the user-provider.
963     this.userProvider = UserProvider.instantiate(env.getConfiguration());
964
965     // If zk is null or IOException while obtaining auth manager,
966     // throw RuntimeException so that the coprocessor is unloaded.
967     if (zk != null) {
968       try {
969         this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
970       } catch (IOException ioe) {
971         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
972       }
973     } else {
974       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
975     }
976
977     tableAcls = new MapMaker().weakValues().makeMap();
978   }
979
980   @Override
981   public void stop(CoprocessorEnvironment env) {
982     if (this.authManager != null) {
983       TableAuthManager.release(authManager);
984     }
985   }
986
987   @Override
988   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
989       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
990     Set<byte[]> families = desc.getFamiliesKeys();
991     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
992     for (byte[] family: families) {
993       familyMap.put(family, null);
994     }
995     requireNamespacePermission(getActiveUser(c), "createTable",
996         desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.CREATE);
997   }
998
999   @Override
1000   public void postCompletedCreateTableAction(
1001       final ObserverContext<MasterCoprocessorEnvironment> c,
1002       final HTableDescriptor desc,
1003       final HRegionInfo[] regions) throws IOException {
1004     // When AC is used, it should be configured as the 1st CP.
1005     // In Master, the table operations like create, are handled by a Thread pool but the max size
1006     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1007     // sequentially only.
1008     // Related code in HMaster#startServiceThreads
1009     // {code}
1010     //   // We depend on there being only one instance of this executor running
1011     //   // at a time. To do concurrency, would need fencing of enable/disable of
1012     //   // tables.
1013     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1014     // {code}
1015     // In future if we change this pool to have more threads, then there is a chance for thread,
1016     // creating acl table, getting delayed and by that time another table creation got over and
1017     // this hook is getting called. In such a case, we will need a wait logic here which will
1018     // wait till the acl table is created.
1019     if (AccessControlLists.isAclTable(desc)) {
1020       this.aclTabAvailable = true;
1021     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1022       if (!aclTabAvailable) {
1023         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1024             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1025             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1026       } else {
1027         String owner = desc.getOwnerString();
1028         // default the table owner to current user, if not specified.
1029         if (owner == null)
1030           owner = getActiveUser(c).getShortName();
1031         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1032             desc.getTableName(), null, Action.values());
1033         // switch to the real hbase master user for doing the RPC on the ACL table
1034         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1035           @Override
1036           public Void run() throws Exception {
1037             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1038                 userperm);
1039             return null;
1040           }
1041         });
1042       }
1043     }
1044   }
1045
1046   @Override
1047   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1048       throws IOException {
1049     requirePermission(getActiveUser(c), "deleteTable", tableName, null, null,
1050         Action.ADMIN, Action.CREATE);
1051   }
1052
1053   @Override
1054   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
1055       final TableName tableName) throws IOException {
1056     final Configuration conf = c.getEnvironment().getConfiguration();
1057     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1058       @Override
1059       public Void run() throws Exception {
1060         AccessControlLists.removeTablePermissions(conf, tableName);
1061         return null;
1062       }
1063     });
1064     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1065   }
1066
1067   @Override
1068   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1069       final TableName tableName) throws IOException {
1070     requirePermission(getActiveUser(c), "truncateTable", tableName, null, null,
1071         Action.ADMIN, Action.CREATE);
1072
1073     final Configuration conf = c.getEnvironment().getConfiguration();
1074     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1075       @Override
1076       public Void run() throws Exception {
1077         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1078         if (acls != null) {
1079           tableAcls.put(tableName, acls);
1080         }
1081         return null;
1082       }
1083     });
1084   }
1085
1086   @Override
1087   public void postTruncateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
1088       final TableName tableName) throws IOException {
1089     final Configuration conf = ctx.getEnvironment().getConfiguration();
1090     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1091       @Override
1092       public Void run() throws Exception {
1093         List<UserPermission> perms = tableAcls.get(tableName);
1094         if (perms != null) {
1095           for (UserPermission perm : perms) {
1096             AccessControlLists.addUserPermission(conf, perm);
1097           }
1098         }
1099         tableAcls.remove(tableName);
1100         return null;
1101       }
1102     });
1103   }
1104
1105   @Override
1106   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1107       HTableDescriptor htd) throws IOException {
1108     requirePermission(getActiveUser(c), "modifyTable", tableName, null, null,
1109         Action.ADMIN, Action.CREATE);
1110   }
1111
1112   @Override
1113   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
1114       TableName tableName, final HTableDescriptor htd) throws IOException {
1115     final Configuration conf = c.getEnvironment().getConfiguration();
1116     // default the table owner to current user, if not specified.
1117     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1118       getActiveUser(c).getShortName();
1119     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1120       @Override
1121       public Void run() throws Exception {
1122         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1123             htd.getTableName(), null, Action.values());
1124         AccessControlLists.addUserPermission(conf, userperm);
1125         return null;
1126       }
1127     });
1128   }
1129
1130   @Override
1131   public void preAddColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1132                                  TableName tableName, HColumnDescriptor columnFamily)
1133       throws IOException {
1134     requireTablePermission(getActiveUser(ctx), "addColumn", tableName, columnFamily.getName(), null,
1135         Action.ADMIN, Action.CREATE);
1136   }
1137
1138   @Override
1139   public void preModifyColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1140                                     TableName tableName, HColumnDescriptor columnFamily)
1141       throws IOException {
1142     requirePermission(getActiveUser(ctx), "modifyColumn", tableName, columnFamily.getName(), null,
1143         Action.ADMIN, Action.CREATE);
1144   }
1145
1146   @Override
1147   public void preDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1148                                     TableName tableName, byte[] columnFamily) throws IOException {
1149     requirePermission(getActiveUser(ctx), "deleteColumn", tableName, columnFamily, null,
1150         Action.ADMIN, Action.CREATE);
1151   }
1152
1153   @Override
1154   public void postDeleteColumnFamily(ObserverContext<MasterCoprocessorEnvironment> ctx,
1155       final TableName tableName, final byte[] columnFamily) throws IOException {
1156     final Configuration conf = ctx.getEnvironment().getConfiguration();
1157     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1158       @Override
1159       public Void run() throws Exception {
1160         AccessControlLists.removeTablePermissions(conf, tableName, columnFamily);
1161         return null;
1162       }
1163     });
1164   }
1165
1166   @Override
1167   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1168       throws IOException {
1169     requirePermission(getActiveUser(c), "enableTable", tableName, null, null,
1170         Action.ADMIN, Action.CREATE);
1171   }
1172
1173   @Override
1174   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1175       throws IOException {
1176     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1177       // We have to unconditionally disallow disable of the ACL table when we are installed,
1178       // even if not enforcing authorizations. We are still allowing grants and revocations,
1179       // checking permissions and logging audit messages, etc. If the ACL table is not
1180       // available we will fail random actions all over the place.
1181       throw new AccessDeniedException("Not allowed to disable "
1182           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1183     }
1184     requirePermission(getActiveUser(c), "disableTable", tableName, null, null,
1185         Action.ADMIN, Action.CREATE);
1186   }
1187
1188   @Override
1189   public void preAbortProcedure(
1190       ObserverContext<MasterCoprocessorEnvironment> ctx,
1191       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1192       final long procId) throws IOException {
1193     if (!procEnv.isProcedureOwner(procId, getActiveUser(ctx))) {
1194       // If the user is not the procedure owner, then we should further probe whether
1195       // he can abort the procedure.
1196       requirePermission(getActiveUser(ctx), "abortProcedure", Action.ADMIN);
1197     }
1198   }
1199
1200   @Override
1201   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1202       throws IOException {
1203     // There is nothing to do at this time after the procedure abort request was sent.
1204   }
1205
1206   @Override
1207   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1208       throws IOException {
1209     // We are delegating the authorization check to postListProcedures as we don't have
1210     // any concrete set of procedures to work with
1211   }
1212
1213   @Override
1214   public void postListProcedures(
1215       ObserverContext<MasterCoprocessorEnvironment> ctx,
1216       List<ProcedureInfo> procInfoList) throws IOException {
1217     if (procInfoList.isEmpty()) {
1218       return;
1219     }
1220
1221     // Retains only those which passes authorization checks, as the checks weren't done as part
1222     // of preListProcedures.
1223     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1224     User user = getActiveUser(ctx);
1225     while (itr.hasNext()) {
1226       ProcedureInfo procInfo = itr.next();
1227       try {
1228         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1229           // If the user is not the procedure owner, then we should further probe whether
1230           // he can see the procedure.
1231           requirePermission(user, "listProcedures", Action.ADMIN);
1232         }
1233       } catch (AccessDeniedException e) {
1234         itr.remove();
1235       }
1236     }
1237   }
1238
1239   @Override
1240   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1241       ServerName srcServer, ServerName destServer) throws IOException {
1242     requirePermission(getActiveUser(c), "move", region.getTable(), null, null, Action.ADMIN);
1243   }
1244
1245   @Override
1246   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1247       throws IOException {
1248     requirePermission(getActiveUser(c), "assign", regionInfo.getTable(), null, null, Action.ADMIN);
1249   }
1250
1251   @Override
1252   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1253       boolean force) throws IOException {
1254     requirePermission(getActiveUser(c), "unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1255   }
1256
1257   @Override
1258   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1259       HRegionInfo regionInfo) throws IOException {
1260     requirePermission(getActiveUser(c), "regionOffline", regionInfo.getTable(), null, null,
1261         Action.ADMIN);
1262   }
1263
1264   @Override
1265   public boolean preSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1266       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1267     requirePermission(getActiveUser(ctx), "setSplitOrMergeEnabled", Action.ADMIN);
1268     return false;
1269   }
1270
1271   @Override
1272   public void postSetSplitOrMergeEnabled(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1273       final boolean newValue, final MasterSwitchType switchType) throws IOException {
1274   }
1275
1276   @Override
1277   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1278       throws IOException {
1279     requirePermission(getActiveUser(c), "balance", Action.ADMIN);
1280   }
1281
1282   @Override
1283   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1284       boolean newValue) throws IOException {
1285     requirePermission(getActiveUser(c), "balanceSwitch", Action.ADMIN);
1286     return newValue;
1287   }
1288
1289   @Override
1290   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1291       throws IOException {
1292     requirePermission(getActiveUser(c), "shutdown", Action.ADMIN);
1293   }
1294
1295   @Override
1296   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1297       throws IOException {
1298     requirePermission(getActiveUser(c), "stopMaster", Action.ADMIN);
1299   }
1300
1301   @Override
1302   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1303       throws IOException {
1304     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1305       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1306       // initialize the ACL storage table
1307       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1308     } else {
1309       aclTabAvailable = true;
1310     }
1311   }
1312
1313   @Override
1314   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1315       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1316       throws IOException {
1317     requirePermission(getActiveUser(ctx), "snapshot " + snapshot.getName(), hTableDescriptor.getTableName(), null, null,
1318       Permission.Action.ADMIN);
1319   }
1320
1321   @Override
1322   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1323       final SnapshotDescription snapshot) throws IOException {
1324     User user = getActiveUser(ctx);
1325     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1326       // list it, if user is the owner of snapshot
1327       AuthResult result = AuthResult.allow("listSnapshot " + snapshot.getName(),
1328           "Snapshot owner check allowed", user, null, null, null);
1329       logResult(result);
1330     } else {
1331       requirePermission(user, "listSnapshot " + snapshot.getName(), Action.ADMIN);
1332     }
1333   }
1334
1335   @Override
1336   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1337       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1338       throws IOException {
1339     requirePermission(getActiveUser(ctx), "cloneSnapshot " + snapshot.getName(), Action.ADMIN);
1340   }
1341
1342   @Override
1343   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1344       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1345       throws IOException {
1346     User user = getActiveUser(ctx);
1347     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1348       requirePermission(user, "restoreSnapshot " + snapshot.getName(), hTableDescriptor.getTableName(), null, null,
1349         Permission.Action.ADMIN);
1350     } else {
1351       requirePermission(user, "restoreSnapshot " + snapshot.getName(), Action.ADMIN);
1352     }
1353   }
1354
1355   @Override
1356   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1357       final SnapshotDescription snapshot) throws IOException {
1358     User user = getActiveUser(ctx);
1359     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
1360       // Snapshot owner is allowed to delete the snapshot
1361       AuthResult result = AuthResult.allow("deleteSnapshot " + snapshot.getName(),
1362           "Snapshot owner check allowed", user, null, null, null);
1363       logResult(result);
1364     } else {
1365       requirePermission(user, "deleteSnapshot " + snapshot.getName(), Action.ADMIN);
1366     }
1367   }
1368
1369   @Override
1370   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1371       NamespaceDescriptor ns) throws IOException {
1372     requireGlobalPermission(getActiveUser(ctx), "createNamespace", Action.ADMIN, ns.getName());
1373   }
1374
1375   @Override
1376   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1377       throws IOException {
1378     requireGlobalPermission(getActiveUser(ctx), "deleteNamespace", Action.ADMIN, namespace);
1379   }
1380
1381   @Override
1382   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1383       final String namespace) throws IOException {
1384     final Configuration conf = ctx.getEnvironment().getConfiguration();
1385     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1386       @Override
1387       public Void run() throws Exception {
1388         AccessControlLists.removeNamespacePermissions(conf, namespace);
1389         return null;
1390       }
1391     });
1392     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1393     LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
1394   }
1395
1396   @Override
1397   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1398       NamespaceDescriptor ns) throws IOException {
1399     // We require only global permission so that
1400     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1401     requireGlobalPermission(getActiveUser(ctx), "modifyNamespace", Action.ADMIN, ns.getName());
1402   }
1403
1404   @Override
1405   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1406       throws IOException {
1407     requireNamespacePermission(getActiveUser(ctx), "getNamespaceDescriptor", namespace, Action.ADMIN);
1408   }
1409
1410   @Override
1411   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1412       List<NamespaceDescriptor> descriptors) throws IOException {
1413     // Retains only those which passes authorization checks, as the checks weren't done as part
1414     // of preGetTableDescriptors.
1415     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1416     User user = getActiveUser(ctx);
1417     while (itr.hasNext()) {
1418       NamespaceDescriptor desc = itr.next();
1419       try {
1420         requireNamespacePermission(user, "listNamespaces", desc.getName(), Action.ADMIN);
1421       } catch (AccessDeniedException e) {
1422         itr.remove();
1423       }
1424     }
1425   }
1426
1427   @Override
1428   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1429       final TableName tableName) throws IOException {
1430     requirePermission(getActiveUser(ctx), "flushTable", tableName, null, null,
1431         Action.ADMIN, Action.CREATE);
1432   }
1433
1434   /* ---- RegionObserver implementation ---- */
1435
1436   @Override
1437   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
1438       throws IOException {
1439     RegionCoprocessorEnvironment env = c.getEnvironment();
1440     final Region region = env.getRegion();
1441     if (region == null) {
1442       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1443     } else {
1444       HRegionInfo regionInfo = region.getRegionInfo();
1445       if (regionInfo.getTable().isSystemTable()) {
1446         checkSystemOrSuperUser(getActiveUser(c));
1447       } else {
1448         requirePermission(getActiveUser(c), "preOpen", Action.ADMIN);
1449       }
1450     }
1451   }
1452
1453   @Override
1454   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1455     RegionCoprocessorEnvironment env = c.getEnvironment();
1456     final Region region = env.getRegion();
1457     if (region == null) {
1458       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1459       return;
1460     }
1461     if (AccessControlLists.isAclRegion(region)) {
1462       aclRegion = true;
1463       // When this region is under recovering state, initialize will be handled by postLogReplay
1464       if (!region.isRecovering()) {
1465         try {
1466           initialize(env);
1467         } catch (IOException ex) {
1468           // if we can't obtain permissions, it's better to fail
1469           // than perform checks incorrectly
1470           throw new RuntimeException("Failed to initialize permissions cache", ex);
1471         }
1472       }
1473     } else {
1474       initialized = true;
1475     }
1476   }
1477
1478   @Override
1479   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1480     if (aclRegion) {
1481       try {
1482         initialize(c.getEnvironment());
1483       } catch (IOException ex) {
1484         // if we can't obtain permissions, it's better to fail
1485         // than perform checks incorrectly
1486         throw new RuntimeException("Failed to initialize permissions cache", ex);
1487       }
1488     }
1489   }
1490
1491   @Override
1492   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1493     requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null,
1494         Action.ADMIN, Action.CREATE);
1495   }
1496
1497   @Override
1498   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
1499     requirePermission(getActiveUser(c), "split", getTableName(c.getEnvironment()), null, null,
1500         Action.ADMIN);
1501   }
1502
1503   @Override
1504   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
1505       byte[] splitRow) throws IOException {
1506     requirePermission(getActiveUser(c), "split", getTableName(c.getEnvironment()), null, null,
1507         Action.ADMIN);
1508   }
1509
1510   @Override
1511   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
1512       final Store store, final InternalScanner scanner, final ScanType scanType)
1513           throws IOException {
1514     requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
1515         Action.ADMIN, Action.CREATE);
1516     return scanner;
1517   }
1518
1519   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1520       final Query query, OpType opType) throws IOException {
1521     Filter filter = query.getFilter();
1522     // Don't wrap an AccessControlFilter
1523     if (filter != null && filter instanceof AccessControlFilter) {
1524       return;
1525     }
1526     User user = getActiveUser(c);
1527     RegionCoprocessorEnvironment env = c.getEnvironment();
1528     Map<byte[],? extends Collection<byte[]>> families = null;
1529     switch (opType) {
1530     case GET:
1531     case EXISTS:
1532       families = ((Get)query).getFamilyMap();
1533       break;
1534     case SCAN:
1535       families = ((Scan)query).getFamilyMap();
1536       break;
1537     default:
1538       throw new RuntimeException("Unhandled operation " + opType);
1539     }
1540     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1541     Region region = getRegion(env);
1542     TableName table = getTableName(region);
1543     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1544     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1545       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1546     }
1547     if (!authResult.isAllowed()) {
1548       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1549         // Old behavior: Scan with only qualifier checks if we have partial
1550         // permission. Backwards compatible behavior is to throw an
1551         // AccessDeniedException immediately if there are no grants for table
1552         // or CF or CF+qual. Only proceed with an injected filter if there are
1553         // grants for qualifiers. Otherwise we will fall through below and log
1554         // the result and throw an ADE. We may end up checking qualifier
1555         // grants three times (permissionGranted above, here, and in the
1556         // filter) but that's the price of backwards compatibility.
1557         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1558           authResult.setAllowed(true);
1559           authResult.setReason("Access allowed with filter");
1560           // Only wrap the filter if we are enforcing authorizations
1561           if (authorizationEnabled) {
1562             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1563               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1564               cfVsMaxVersions);
1565             // wrap any existing filter
1566             if (filter != null) {
1567               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1568                 Lists.newArrayList(ourFilter, filter));
1569             }
1570             switch (opType) {
1571               case GET:
1572               case EXISTS:
1573                 ((Get)query).setFilter(ourFilter);
1574                 break;
1575               case SCAN:
1576                 ((Scan)query).setFilter(ourFilter);
1577                 break;
1578               default:
1579                 throw new RuntimeException("Unhandled operation " + opType);
1580             }
1581           }
1582         }
1583       } else {
1584         // New behavior: Any access we might be granted is more fine-grained
1585         // than whole table or CF. Simply inject a filter and return what is
1586         // allowed. We will not throw an AccessDeniedException. This is a
1587         // behavioral change since 0.96.
1588         authResult.setAllowed(true);
1589         authResult.setReason("Access allowed with filter");
1590         // Only wrap the filter if we are enforcing authorizations
1591         if (authorizationEnabled) {
1592           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1593             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1594           // wrap any existing filter
1595           if (filter != null) {
1596             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1597               Lists.newArrayList(ourFilter, filter));
1598           }
1599           switch (opType) {
1600             case GET:
1601             case EXISTS:
1602               ((Get)query).setFilter(ourFilter);
1603               break;
1604             case SCAN:
1605               ((Scan)query).setFilter(ourFilter);
1606               break;
1607             default:
1608               throw new RuntimeException("Unhandled operation " + opType);
1609           }
1610         }
1611       }
1612     }
1613
1614     logResult(authResult);
1615     if (authorizationEnabled && !authResult.isAllowed()) {
1616       throw new AccessDeniedException("Insufficient permissions for user '"
1617           + (user != null ? user.getShortName() : "null")
1618           + "' (table=" + table + ", action=READ)");
1619     }
1620   }
1621
1622   @Override
1623   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1624       final Get get, final List<Cell> result) throws IOException {
1625     internalPreRead(c, get, OpType.GET);
1626   }
1627
1628   @Override
1629   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1630       final Get get, final boolean exists) throws IOException {
1631     internalPreRead(c, get, OpType.EXISTS);
1632     return exists;
1633   }
1634
1635   @Override
1636   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1637       final Put put, final WALEdit edit, final Durability durability)
1638       throws IOException {
1639     User user = getActiveUser(c);
1640     checkForReservedTagPresence(user, put);
1641
1642     // Require WRITE permission to the table, CF, or top visible value, if any.
1643     // NOTE: We don't need to check the permissions for any earlier Puts
1644     // because we treat the ACLs in each Put as timestamped like any other
1645     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1646     // change the ACL of any previous Put. This allows simple evolution of
1647     // security policy over time without requiring expensive updates.
1648     RegionCoprocessorEnvironment env = c.getEnvironment();
1649     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1650     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1651     logResult(authResult);
1652     if (!authResult.isAllowed()) {
1653       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1654         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1655       } else if (authorizationEnabled) {
1656         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1657       }
1658     }
1659
1660     // Add cell ACLs from the operation to the cells themselves
1661     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1662     if (bytes != null) {
1663       if (cellFeaturesEnabled) {
1664         addCellPermissions(bytes, put.getFamilyCellMap());
1665       } else {
1666         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1667       }
1668     }
1669   }
1670
1671   @Override
1672   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1673       final Put put, final WALEdit edit, final Durability durability) {
1674     if (aclRegion) {
1675       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1676     }
1677   }
1678
1679   @Override
1680   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1681       final Delete delete, final WALEdit edit, final Durability durability)
1682       throws IOException {
1683     // An ACL on a delete is useless, we shouldn't allow it
1684     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1685       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1686     }
1687     // Require WRITE permissions on all cells covered by the delete. Unlike
1688     // for Puts we need to check all visible prior versions, because a major
1689     // compaction could remove them. If the user doesn't have permission to
1690     // overwrite any of the visible versions ('visible' defined as not covered
1691     // by a tombstone already) then we have to disallow this operation.
1692     RegionCoprocessorEnvironment env = c.getEnvironment();
1693     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1694     User user = getActiveUser(c);
1695     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1696     logResult(authResult);
1697     if (!authResult.isAllowed()) {
1698       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1699         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1700       } else if (authorizationEnabled) {
1701         throw new AccessDeniedException("Insufficient permissions " +
1702           authResult.toContextString());
1703       }
1704     }
1705   }
1706
1707   @Override
1708   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1709       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1710     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1711       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1712       User user = getActiveUser(c);
1713       for (int i = 0; i < miniBatchOp.size(); i++) {
1714         Mutation m = miniBatchOp.getOperation(i);
1715         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1716           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1717           // perm check
1718           OpType opType;
1719           if (m instanceof Put) {
1720             checkForReservedTagPresence(user, m);
1721             opType = OpType.PUT;
1722           } else {
1723             opType = OpType.DELETE;
1724           }
1725           AuthResult authResult = null;
1726           if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
1727             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1728             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1729               user, Action.WRITE, table, m.getFamilyCellMap());
1730           } else {
1731             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1732               user, Action.WRITE, table, m.getFamilyCellMap());
1733           }
1734           logResult(authResult);
1735           if (authorizationEnabled && !authResult.isAllowed()) {
1736             throw new AccessDeniedException("Insufficient permissions "
1737               + authResult.toContextString());
1738           }
1739         }
1740       }
1741     }
1742   }
1743
1744   @Override
1745   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1746       final Delete delete, final WALEdit edit, final Durability durability)
1747       throws IOException {
1748     if (aclRegion) {
1749       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1750     }
1751   }
1752
1753   @Override
1754   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1755       final byte [] row, final byte [] family, final byte [] qualifier,
1756       final CompareFilter.CompareOp compareOp,
1757       final ByteArrayComparable comparator, final Put put,
1758       final boolean result) throws IOException {
1759     User user = getActiveUser(c);
1760     checkForReservedTagPresence(user, put);
1761
1762     // Require READ and WRITE permissions on the table, CF, and KV to update
1763     RegionCoprocessorEnvironment env = c.getEnvironment();
1764     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1765     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1766       Action.READ, Action.WRITE);
1767     logResult(authResult);
1768     if (!authResult.isAllowed()) {
1769       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1770         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1771       } else if (authorizationEnabled) {
1772         throw new AccessDeniedException("Insufficient permissions " +
1773           authResult.toContextString());
1774       }
1775     }
1776
1777     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1778     if (bytes != null) {
1779       if (cellFeaturesEnabled) {
1780         addCellPermissions(bytes, put.getFamilyCellMap());
1781       } else {
1782         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1783       }
1784     }
1785     return result;
1786   }
1787
1788   @Override
1789   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1790       final byte[] row, final byte[] family, final byte[] qualifier,
1791       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1792       final boolean result) throws IOException {
1793     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1794       // We had failure with table, cf and q perm checks and now giving a chance for cell
1795       // perm check
1796       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1797       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1798       AuthResult authResult = null;
1799       User user = getActiveUser(c);
1800       if (checkCoveringPermission(user, OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1801           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1802         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1803             user, Action.READ, table, families);
1804       } else {
1805         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1806             user, Action.READ, table, families);
1807       }
1808       logResult(authResult);
1809       if (authorizationEnabled && !authResult.isAllowed()) {
1810         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1811       }
1812     }
1813     return result;
1814   }
1815
1816   @Override
1817   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1818       final byte [] row, final byte [] family, final byte [] qualifier,
1819       final CompareFilter.CompareOp compareOp,
1820       final ByteArrayComparable comparator, final Delete delete,
1821       final boolean result) throws IOException {
1822     // An ACL on a delete is useless, we shouldn't allow it
1823     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1824       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1825           delete.toString());
1826     }
1827     // Require READ and WRITE permissions on the table, CF, and the KV covered
1828     // by the delete
1829     RegionCoprocessorEnvironment env = c.getEnvironment();
1830     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1831     User user = getActiveUser(c);
1832     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1833         Action.READ, Action.WRITE);
1834     logResult(authResult);
1835     if (!authResult.isAllowed()) {
1836       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1837         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1838       } else if (authorizationEnabled) {
1839         throw new AccessDeniedException("Insufficient permissions " +
1840           authResult.toContextString());
1841       }
1842     }
1843     return result;
1844   }
1845
1846   @Override
1847   public boolean preCheckAndDeleteAfterRowLock(
1848       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1849       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1850       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1851       throws IOException {
1852     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1853       // We had failure with table, cf and q perm checks and now giving a chance for cell
1854       // perm check
1855       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1856       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1857       AuthResult authResult = null;
1858       User user = getActiveUser(c);
1859       if (checkCoveringPermission(user, OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1860           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1861         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1862             user, Action.READ, table, families);
1863       } else {
1864         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1865             user, Action.READ, table, families);
1866       }
1867       logResult(authResult);
1868       if (authorizationEnabled && !authResult.isAllowed()) {
1869         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1870       }
1871     }
1872     return result;
1873   }
1874
1875   @Override
1876   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1877       final byte [] row, final byte [] family, final byte [] qualifier,
1878       final long amount, final boolean writeToWAL)
1879       throws IOException {
1880     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1881     // incremented value
1882     RegionCoprocessorEnvironment env = c.getEnvironment();
1883     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1884     User user = getActiveUser(c);
1885     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1886         Action.WRITE);
1887     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1888       authResult.setAllowed(checkCoveringPermission(user, OpType.INCREMENT_COLUMN_VALUE, env, row,
1889         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1890       authResult.setReason("Covering cell set");
1891     }
1892     logResult(authResult);
1893     if (authorizationEnabled && !authResult.isAllowed()) {
1894       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1895     }
1896     return -1;
1897   }
1898
1899   @Override
1900   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1901       throws IOException {
1902     User user = getActiveUser(c);
1903     checkForReservedTagPresence(user, append);
1904
1905     // Require WRITE permission to the table, CF, and the KV to be appended
1906     RegionCoprocessorEnvironment env = c.getEnvironment();
1907     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1908     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1909     logResult(authResult);
1910     if (!authResult.isAllowed()) {
1911       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1912         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1913       } else if (authorizationEnabled)  {
1914         throw new AccessDeniedException("Insufficient permissions " +
1915           authResult.toContextString());
1916       }
1917     }
1918
1919     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1920     if (bytes != null) {
1921       if (cellFeaturesEnabled) {
1922         addCellPermissions(bytes, append.getFamilyCellMap());
1923       } else {
1924         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1925       }
1926     }
1927
1928     return null;
1929   }
1930
1931   @Override
1932   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1933       final Append append) throws IOException {
1934     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1935       // We had failure with table, cf and q perm checks and now giving a chance for cell
1936       // perm check
1937       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1938       AuthResult authResult = null;
1939       User user = getActiveUser(c);
1940       if (checkCoveringPermission(user, OpType.APPEND, c.getEnvironment(), append.getRow(),
1941           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1942         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1943             user, Action.WRITE, table, append.getFamilyCellMap());
1944       } else {
1945         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1946             user, Action.WRITE, table, append.getFamilyCellMap());
1947       }
1948       logResult(authResult);
1949       if (authorizationEnabled && !authResult.isAllowed()) {
1950         throw new AccessDeniedException("Insufficient permissions " +
1951           authResult.toContextString());
1952       }
1953     }
1954     return null;
1955   }
1956
1957   @Override
1958   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1959       final Increment increment)
1960       throws IOException {
1961     User user = getActiveUser(c);
1962     checkForReservedTagPresence(user, increment);
1963
1964     // Require WRITE permission to the table, CF, and the KV to be replaced by
1965     // the incremented value
1966     RegionCoprocessorEnvironment env = c.getEnvironment();
1967     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1968     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1969       Action.WRITE);
1970     logResult(authResult);
1971     if (!authResult.isAllowed()) {
1972       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1973         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1974       } else if (authorizationEnabled) {
1975         throw new AccessDeniedException("Insufficient permissions " +
1976           authResult.toContextString());
1977       }
1978     }
1979
1980     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1981     if (bytes != null) {
1982       if (cellFeaturesEnabled) {
1983         addCellPermissions(bytes, increment.getFamilyCellMap());
1984       } else {
1985         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1986       }
1987     }
1988
1989     return null;
1990   }
1991
1992   @Override
1993   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1994       final Increment increment) throws IOException {
1995     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1996       // We had failure with table, cf and q perm checks and now giving a chance for cell
1997       // perm check
1998       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1999       AuthResult authResult = null;
2000       User user = getActiveUser(c);
2001       if (checkCoveringPermission(user, OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
2002           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
2003         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
2004             user, Action.WRITE, table, increment.getFamilyCellMap());
2005       } else {
2006         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
2007             user, Action.WRITE, table, increment.getFamilyCellMap());
2008       }
2009       logResult(authResult);
2010       if (authorizationEnabled && !authResult.isAllowed()) {
2011         throw new AccessDeniedException("Insufficient permissions " +
2012           authResult.toContextString());
2013       }
2014     }
2015     return null;
2016   }
2017
2018   @Override
2019   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2020       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2021     // If the HFile version is insufficient to persist tags, we won't have any
2022     // work to do here
2023     if (!cellFeaturesEnabled) {
2024       return newCell;
2025     }
2026
2027     // Collect any ACLs from the old cell
2028     List<Tag> tags = Lists.newArrayList();
2029     List<Tag> aclTags = Lists.newArrayList();
2030     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2031     if (oldCell != null) {
2032       Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell);
2033       while (tagIterator.hasNext()) {
2034         Tag tag = tagIterator.next();
2035         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2036           // Not an ACL tag, just carry it through
2037           if (LOG.isTraceEnabled()) {
2038             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType()
2039                 + " length " + tag.getValueLength());
2040           }
2041           tags.add(tag);
2042         } else {
2043           aclTags.add(tag);
2044         }
2045       }
2046     }
2047
2048     // Do we have an ACL on the operation?
2049     byte[] aclBytes = mutation.getACL();
2050     if (aclBytes != null) {
2051       // Yes, use it
2052       tags.add(new ArrayBackedTag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2053     } else {
2054       // No, use what we carried forward
2055       if (perms != null) {
2056         // TODO: If we collected ACLs from more than one tag we may have a
2057         // List<Permission> of size > 1, this can be collapsed into a single
2058         // Permission
2059         if (LOG.isTraceEnabled()) {
2060           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2061         }
2062         tags.addAll(aclTags);
2063       }
2064     }
2065
2066     // If we have no tags to add, just return
2067     if (tags.isEmpty()) {
2068       return newCell;
2069     }
2070
2071     Cell rewriteCell = CellUtil.createCell(newCell, tags);
2072     return rewriteCell;
2073   }
2074
2075   @Override
2076   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2077       final Scan scan, final RegionScanner s) throws IOException {
2078     internalPreRead(c, scan, OpType.SCAN);
2079     return s;
2080   }
2081
2082   @Override
2083   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2084       final Scan scan, final RegionScanner s) throws IOException {
2085     User user = getActiveUser(c);
2086     if (user != null && user.getShortName() != null) {
2087       // store reference to scanner owner for later checks
2088       scannerOwners.put(s, user.getShortName());
2089     }
2090     return s;
2091   }
2092
2093   @Override
2094   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2095       final InternalScanner s, final List<Result> result,
2096       final int limit, final boolean hasNext) throws IOException {
2097     requireScannerOwner(s);
2098     return hasNext;
2099   }
2100
2101   @Override
2102   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2103       final InternalScanner s) throws IOException {
2104     requireScannerOwner(s);
2105   }
2106
2107   @Override
2108   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2109       final InternalScanner s) throws IOException {
2110     // clean up any associated owner mapping
2111     scannerOwners.remove(s);
2112   }
2113
2114   @Override
2115   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
2116       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
2117     // Impl in BaseRegionObserver might do unnecessary copy for Off heap backed Cells.
2118     return hasMore;
2119   }
2120
2121   /**
2122    * Verify, when servicing an RPC, that the caller is the scanner owner.
2123    * If so, we assume that access control is correctly enforced based on
2124    * the checks performed in preScannerOpen()
2125    */
2126   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2127     if (!RpcServer.isInRpcCallContext())
2128       return;
2129     String requestUserName = RpcServer.getRequestUserName();
2130     String owner = scannerOwners.get(s);
2131     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2132       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
2133     }
2134   }
2135
2136   /**
2137    * Verifies user has CREATE privileges on
2138    * the Column Families involved in the bulkLoadHFile
2139    * request. Specific Column Write privileges are presently
2140    * ignored.
2141    */
2142   @Override
2143   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2144       List<Pair<byte[], String>> familyPaths) throws IOException {
2145     User user = getActiveUser(ctx);
2146     for(Pair<byte[],String> el : familyPaths) {
2147       requirePermission(user, "preBulkLoadHFile",
2148           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2149           el.getFirst(),
2150           null,
2151           Action.CREATE);
2152     }
2153   }
2154
2155   /**
2156    * Authorization check for
2157    * SecureBulkLoadProtocol.prepareBulkLoad()
2158    * @param ctx the context
2159    * @param request the request
2160    * @throws IOException
2161    */
2162   @Override
2163   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2164       PrepareBulkLoadRequest request) throws IOException {
2165     requireAccess(getActiveUser(ctx), "prePrepareBulkLoad",
2166         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2167   }
2168
2169   /**
2170    * Authorization security check for
2171    * SecureBulkLoadProtocol.cleanupBulkLoad()
2172    * @param ctx the context
2173    * @param request the request
2174    * @throws IOException
2175    */
2176   @Override
2177   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2178       CleanupBulkLoadRequest request) throws IOException {
2179     requireAccess(getActiveUser(ctx), "preCleanupBulkLoad",
2180         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2181   }
2182
2183   /* ---- EndpointObserver implementation ---- */
2184
2185   @Override
2186   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2187       Service service, String methodName, Message request) throws IOException {
2188     // Don't intercept calls to our own AccessControlService, we check for
2189     // appropriate permissions in the service handlers
2190     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2191       requirePermission(getActiveUser(ctx),
2192           "invoke(" + service.getDescriptorForType().getName() + "." + methodName + ")",
2193           getTableName(ctx.getEnvironment()), null, null,
2194           Action.EXEC);
2195     }
2196     return request;
2197   }
2198
2199   @Override
2200   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2201       Service service, String methodName, Message request, Message.Builder responseBuilder)
2202       throws IOException { }
2203
2204   /* ---- Protobuf AccessControlService implementation ---- */
2205
2206   @Override
2207   public void grant(RpcController controller,
2208                     AccessControlProtos.GrantRequest request,
2209                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2210     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2211     AccessControlProtos.GrantResponse response = null;
2212     try {
2213       // verify it's only running at .acl.
2214       if (aclRegion) {
2215         if (!initialized) {
2216           throw new CoprocessorException("AccessController not yet initialized");
2217         }
2218         if (LOG.isDebugEnabled()) {
2219           LOG.debug("Received request to grant access permission " + perm.toString());
2220         }
2221         User caller = RpcServer.getRequestUser();
2222
2223         switch(request.getUserPermission().getPermission().getType()) {
2224           case Global :
2225           case Table :
2226             requirePermission(caller, "grant", perm.getTableName(),
2227                 perm.getFamily(), perm.getQualifier(), Action.ADMIN);
2228             break;
2229           case Namespace :
2230             requireNamespacePermission(caller, "grant", perm.getNamespace(), Action.ADMIN);
2231            break;
2232         }
2233
2234         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2235           @Override
2236           public Void run() throws Exception {
2237             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
2238             return null;
2239           }
2240         });
2241
2242         if (AUDITLOG.isTraceEnabled()) {
2243           // audit log should store permission changes in addition to auth results
2244           AUDITLOG.trace("Granted permission " + perm.toString());
2245         }
2246       } else {
2247         throw new CoprocessorException(AccessController.class, "This method "
2248             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2249       }
2250       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2251     } catch (IOException ioe) {
2252       // pass exception back up
2253       ResponseConverter.setControllerException(controller, ioe);
2254     }
2255     done.run(response);
2256   }
2257
2258   @Override
2259   public void revoke(RpcController controller,
2260                      AccessControlProtos.RevokeRequest request,
2261                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2262     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2263     AccessControlProtos.RevokeResponse response = null;
2264     try {
2265       // only allowed to be called on _acl_ region
2266       if (aclRegion) {
2267         if (!initialized) {
2268           throw new CoprocessorException("AccessController not yet initialized");
2269         }
2270         if (LOG.isDebugEnabled()) {
2271           LOG.debug("Received request to revoke access permission " + perm.toString());
2272         }
2273         User caller = RpcServer.getRequestUser();
2274
2275         switch(request.getUserPermission().getPermission().getType()) {
2276           case Global :
2277           case Table :
2278             requirePermission(caller, "revoke", perm.getTableName(), perm.getFamily(),
2279               perm.getQualifier(), Action.ADMIN);
2280             break;
2281           case Namespace :
2282             requireNamespacePermission(caller, "revoke", perm.getNamespace(), Action.ADMIN);
2283             break;
2284         }
2285
2286         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2287           @Override
2288           public Void run() throws Exception {
2289             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
2290             return null;
2291           }
2292         });
2293
2294         if (AUDITLOG.isTraceEnabled()) {
2295           // audit log should record all permission changes
2296           AUDITLOG.trace("Revoked permission " + perm.toString());
2297         }
2298       } else {
2299         throw new CoprocessorException(AccessController.class, "This method "
2300             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2301       }
2302       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2303     } catch (IOException ioe) {
2304       // pass exception back up
2305       ResponseConverter.setControllerException(controller, ioe);
2306     }
2307     done.run(response);
2308   }
2309
2310   @Override
2311   public void getUserPermissions(RpcController controller,
2312                                  AccessControlProtos.GetUserPermissionsRequest request,
2313                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2314     AccessControlProtos.GetUserPermissionsResponse response = null;
2315     try {
2316       // only allowed to be called on _acl_ region
2317       if (aclRegion) {
2318         if (!initialized) {
2319           throw new CoprocessorException("AccessController not yet initialized");
2320         }
2321         User caller = RpcServer.getRequestUser();
2322
2323         List<UserPermission> perms = null;
2324         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2325           final TableName table = request.hasTableName() ?
2326             ProtobufUtil.toTableName(request.getTableName()) : null;
2327           requirePermission(caller, "userPermissions", table, null, null, Action.ADMIN);
2328           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2329             @Override
2330             public List<UserPermission> run() throws Exception {
2331               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2332             }
2333           });
2334         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2335           final String namespace = request.getNamespaceName().toStringUtf8();
2336           requireNamespacePermission(caller, "userPermissions", namespace, Action.ADMIN);
2337           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2338             @Override
2339             public List<UserPermission> run() throws Exception {
2340               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2341                 namespace);
2342             }
2343           });
2344         } else {
2345           requirePermission(caller, "userPermissions", Action.ADMIN);
2346           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2347             @Override
2348             public List<UserPermission> run() throws Exception {
2349               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2350             }
2351           });
2352           // Adding superusers explicitly to the result set as AccessControlLists do not store them.
2353           // Also using acl as table name to be inline  with the results of global admin and will
2354           // help in avoiding any leakage of information about being superusers.
2355           for (String user: Superusers.getSuperUsers()) {
2356             perms.add(new UserPermission(user.getBytes(), AccessControlLists.ACL_TABLE_NAME, null,
2357                 Action.values()));
2358           }
2359         }
2360         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2361       } else {
2362         throw new CoprocessorException(AccessController.class, "This method "
2363             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2364       }
2365     } catch (IOException ioe) {
2366       // pass exception back up
2367       ResponseConverter.setControllerException(controller, ioe);
2368     }
2369     done.run(response);
2370   }
2371
2372   @Override
2373   public void checkPermissions(RpcController controller,
2374                                AccessControlProtos.CheckPermissionsRequest request,
2375                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2376     Permission[] permissions = new Permission[request.getPermissionCount()];
2377     for (int i=0; i < request.getPermissionCount(); i++) {
2378       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2379     }
2380     AccessControlProtos.CheckPermissionsResponse response = null;
2381     try {
2382       User user = RpcServer.getRequestUser();
2383       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2384       for (Permission permission : permissions) {
2385         if (permission instanceof TablePermission) {
2386           // Check table permissions
2387
2388           TablePermission tperm = (TablePermission) permission;
2389           for (Action action : permission.getActions()) {
2390             if (!tperm.getTableName().equals(tableName)) {
2391               throw new CoprocessorException(AccessController.class, String.format("This method "
2392                   + "can only execute at the table specified in TablePermission. " +
2393                   "Table of the region:%s , requested table:%s", tableName,
2394                   tperm.getTableName()));
2395             }
2396
2397             Map<byte[], Set<byte[]>> familyMap =
2398                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2399             if (tperm.getFamily() != null) {
2400               if (tperm.getQualifier() != null) {
2401                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2402                 qualifiers.add(tperm.getQualifier());
2403                 familyMap.put(tperm.getFamily(), qualifiers);
2404               } else {
2405                 familyMap.put(tperm.getFamily(), null);
2406               }
2407             }
2408
2409             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2410               familyMap);
2411             logResult(result);
2412             if (!result.isAllowed()) {
2413               // Even if passive we need to throw an exception here, we support checking
2414               // effective permissions, so throw unconditionally
2415               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2416                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2417                 ", action=" + action.toString() + ")");
2418             }
2419           }
2420
2421         } else {
2422           // Check global permissions
2423
2424           for (Action action : permission.getActions()) {
2425             AuthResult result;
2426             if (authManager.authorize(user, action)) {
2427               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2428                 action, null, null);
2429             } else {
2430               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2431                 null, null);
2432             }
2433             logResult(result);
2434             if (!result.isAllowed()) {
2435               // Even if passive we need to throw an exception here, we support checking
2436               // effective permissions, so throw unconditionally
2437               throw new AccessDeniedException("Insufficient permissions (action=" +
2438                 action.toString() + ")");
2439             }
2440           }
2441         }
2442       }
2443       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2444     } catch (IOException ioe) {
2445       ResponseConverter.setControllerException(controller, ioe);
2446     }
2447     done.run(response);
2448   }
2449
2450   @Override
2451   public Service getService() {
2452     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2453   }
2454
2455   private Region getRegion(RegionCoprocessorEnvironment e) {
2456     return e.getRegion();
2457   }
2458
2459   private TableName getTableName(RegionCoprocessorEnvironment e) {
2460     Region region = e.getRegion();
2461     if (region != null) {
2462       return getTableName(region);
2463     }
2464     return null;
2465   }
2466
2467   private TableName getTableName(Region region) {
2468     HRegionInfo regionInfo = region.getRegionInfo();
2469     if (regionInfo != null) {
2470       return regionInfo.getTable();
2471     }
2472     return null;
2473   }
2474
2475   @Override
2476   public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
2477       throws IOException {
2478     requirePermission(getActiveUser(c), "preClose", Action.ADMIN);
2479   }
2480
2481   private void checkSystemOrSuperUser(User activeUser) throws IOException {
2482     // No need to check if we're not going to throw
2483     if (!authorizationEnabled) {
2484       return;
2485     }
2486     if (!Superusers.isSuperUser(activeUser)) {
2487       throw new AccessDeniedException("User '" + (activeUser != null ?
2488         activeUser.getShortName() : "null") + "' is not system or super user.");
2489     }
2490   }
2491
2492   @Override
2493   public void preStopRegionServer(
2494       ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2495       throws IOException {
2496     requirePermission(getActiveUser(ctx), "preStopRegionServer", Action.ADMIN);
2497   }
2498
2499   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2500       byte[] qualifier) {
2501     if (family == null) {
2502       return null;
2503     }
2504
2505     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2506     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2507     return familyMap;
2508   }
2509
2510   @Override
2511   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2512        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2513        String regex) throws IOException {
2514     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2515     // any concrete set of table names when a regex is present or the full list is requested.
2516     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2517       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2518       // request can be granted.
2519       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2520       for (TableName tableName: tableNamesList) {
2521         // Skip checks for a table that does not exist
2522         if (!masterServices.getTableStateManager().isTablePresent(tableName))
2523           continue;
2524         requirePermission(getActiveUser(ctx), "getTableDescriptors", tableName, null, null,
2525             Action.ADMIN, Action.CREATE);
2526       }
2527     }
2528   }
2529
2530   @Override
2531   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2532       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2533       String regex) throws IOException {
2534     // Skipping as checks in this case are already done by preGetTableDescriptors.
2535     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2536       return;
2537     }
2538
2539     // Retains only those which passes authorization checks, as the checks weren't done as part
2540     // of preGetTableDescriptors.
2541     Iterator<HTableDescriptor> itr = descriptors.iterator();
2542     while (itr.hasNext()) {
2543       HTableDescriptor htd = itr.next();
2544       try {
2545         requirePermission(getActiveUser(ctx), "getTableDescriptors", htd.getTableName(), null, null,
2546             Action.ADMIN, Action.CREATE);
2547       } catch (AccessDeniedException e) {
2548         itr.remove();
2549       }
2550     }
2551   }
2552
2553   @Override
2554   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2555       List<HTableDescriptor> descriptors, String regex) throws IOException {
2556     // Retains only those which passes authorization checks.
2557     Iterator<HTableDescriptor> itr = descriptors.iterator();
2558     while (itr.hasNext()) {
2559       HTableDescriptor htd = itr.next();
2560       try {
2561         requireAccess(getActiveUser(ctx), "getTableNames", htd.getTableName(), Action.values());
2562       } catch (AccessDeniedException e) {
2563         itr.remove();
2564       }
2565     }
2566   }
2567
2568   @Override
2569   public void preDispatchMerge(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2570       HRegionInfo regionA, HRegionInfo regionB) throws IOException {
2571     requirePermission(getActiveUser(ctx), "mergeRegions", regionA.getTable(), null, null,
2572       Action.ADMIN);
2573   }
2574
2575   @Override
2576   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2577       Region regionB) throws IOException {
2578     requirePermission(getActiveUser(ctx), "mergeRegions", regionA.getTableDesc().getTableName(),
2579         null, null, Action.ADMIN);
2580   }
2581
2582   @Override
2583   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2584       Region regionB, Region mergedRegion) throws IOException { }
2585
2586   @Override
2587   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2588       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2589
2590   @Override
2591   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2592       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2593
2594   @Override
2595   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2596       Region regionA, Region regionB) throws IOException { }
2597
2598   @Override
2599   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2600       Region regionA, Region regionB) throws IOException { }
2601
2602   @Override
2603   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2604       throws IOException {
2605     requirePermission(getActiveUser(ctx), "preRollLogWriterRequest", Permission.Action.ADMIN);
2606   }
2607
2608   @Override
2609   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2610       throws IOException { }
2611
2612   @Override
2613   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2614       final String userName, final Quotas quotas) throws IOException {
2615     requirePermission(getActiveUser(ctx), "setUserQuota", Action.ADMIN);
2616   }
2617
2618   @Override
2619   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2620       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2621     requirePermission(getActiveUser(ctx), "setUserTableQuota", tableName, null, null, Action.ADMIN);
2622   }
2623
2624   @Override
2625   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2626       final String userName, final String namespace, final Quotas quotas) throws IOException {
2627     requirePermission(getActiveUser(ctx), "setUserNamespaceQuota", Action.ADMIN);
2628   }
2629
2630   @Override
2631   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2632       final TableName tableName, final Quotas quotas) throws IOException {
2633     requirePermission(getActiveUser(ctx), "setTableQuota", tableName, null, null, Action.ADMIN);
2634   }
2635
2636   @Override
2637   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2638       final String namespace, final Quotas quotas) throws IOException {
2639     requirePermission(getActiveUser(ctx), "setNamespaceQuota", Action.ADMIN);
2640   }
2641
2642   @Override
2643   public ReplicationEndpoint postCreateReplicationEndPoint(
2644       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2645     return endpoint;
2646   }
2647
2648   @Override
2649   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2650       List<WALEntry> entries, CellScanner cells) throws IOException {
2651     requirePermission(getActiveUser(ctx), "replicateLogEntries", Action.WRITE);
2652   }
2653
2654   @Override
2655   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2656       List<WALEntry> entries, CellScanner cells) throws IOException {
2657   }
2658
2659   @Override
2660   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2661                              Set<HostAndPort> servers, String targetGroup) throws IOException {
2662     requirePermission(getActiveUser(ctx), "moveServers", Action.ADMIN);
2663   }
2664
2665   @Override
2666   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2667                             Set<TableName> tables, String targetGroup) throws IOException {
2668     requirePermission(getActiveUser(ctx), "moveTables", Action.ADMIN);
2669   }
2670
2671   @Override
2672   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2673                             String name) throws IOException {
2674     requirePermission(getActiveUser(ctx), "addRSGroup", Action.ADMIN);
2675   }
2676
2677   @Override
2678   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2679                                String name) throws IOException {
2680     requirePermission(getActiveUser(ctx), "removeRSGroup", Action.ADMIN);
2681   }
2682
2683   @Override
2684   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2685                                 String groupName) throws IOException {
2686     requirePermission(getActiveUser(ctx), "balanceRSGroup", Action.ADMIN);
2687   }
2688 }