1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.util.Arrays;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.LinkedList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.TreeSet;
27  
28  import com.google.protobuf.RpcCallback;
29  import com.google.protobuf.RpcController;
30  import com.google.protobuf.Service;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CoprocessorEnvironment;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.client.Append;
43  import org.apache.hadoop.hbase.client.Delete;
44  import org.apache.hadoop.hbase.client.Get;
45  import org.apache.hadoop.hbase.client.Increment;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.Scan;
49  import org.apache.hadoop.hbase.client.Durability;
50  import org.apache.hadoop.hbase.coprocessor.*;
51  import org.apache.hadoop.hbase.exceptions.CoprocessorException;
52  import org.apache.hadoop.hbase.filter.CompareFilter;
53  import org.apache.hadoop.hbase.filter.FilterList;
54  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
55  import org.apache.hadoop.hbase.ipc.RequestContext;
56  import org.apache.hadoop.hbase.master.RegionPlan;
57  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
58  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
59  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
60  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
61  import org.apache.hadoop.hbase.regionserver.HRegion;
62  import org.apache.hadoop.hbase.regionserver.InternalScanner;
63  import org.apache.hadoop.hbase.regionserver.RegionScanner;
64  import org.apache.hadoop.hbase.regionserver.Store;
65  import org.apache.hadoop.hbase.regionserver.ScanType;
66  import org.apache.hadoop.hbase.regionserver.StoreFile;
67  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
68  import org.apache.hadoop.hbase.exceptions.AccessDeniedException;
69  import org.apache.hadoop.hbase.security.User;
70  import org.apache.hadoop.hbase.security.access.Permission.Action;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
73  import org.apache.hadoop.hbase.util.Pair;
74  
75  import com.google.common.collect.ImmutableSet;
76  import com.google.common.collect.ListMultimap;
77  import com.google.common.collect.Lists;
78  import com.google.common.collect.MapMaker;
79  import com.google.common.collect.Maps;
80  import com.google.common.collect.Sets;
81  
82  import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
83  
84  /**
85   * Provides basic authorization checks for data access and administrative
86   * operations.
87   *
88   * <p>
89   * {@code AccessController} performs authorization checks for HBase operations
90   * based on:
91   * <ul>
92   *   <li>the identity of the user performing the operation</li>
93   *   <li>the scope over which the operation is performed, in increasing
94   *   specificity: global, table, column family, or qualifier</li>
95   *   <li>the type of action being performed (as mapped to
96   *   {@link Permission.Action} values)</li>
97   * </ul>
98   * If the authorization check fails, an {@link AccessDeniedException}
99   * will be thrown for the operation.
100  * </p>
101  *
102  * <p>
103  * To perform authorization checks, {@code AccessController} relies on the
104  * {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} being loaded to provide
105  * the user identities for remote requests.
106  * </p>
107  *
108  * <p>
109  * The access control lists used for authorization can be manipulated via the
110  * exposed {@link AccessControlService} Interface implementation, and the associated
111  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
112  * commands.
113  * </p>
114  */
115 public class AccessController extends BaseRegionObserver
116     implements MasterObserver, RegionServerObserver,
117       AccessControlService.Interface, CoprocessorService {
118 
  /** General-purpose logger for this class. */
  public static final Log LOG = LogFactory.getLog(AccessController.class);

  /**
   * Logger used for access audit records. Its name is {@code "SecurityLogger."}
   * followed by this class's fully-qualified name.
   */
  private static final Log AUDITLOG =
    LogFactory.getLog("SecurityLogger."+AccessController.class.getName());

  /** Answers the authorization questions asked by this class; assigned in {@code start}. */
  TableAuthManager authManager = null;

  // flags if we are running on a region of the _acl_ table
  boolean aclRegion = false;

  // Defined only for the Endpoint implementation, so that it has a way to
  // access region services. Assigned in start() only when the environment is
  // a RegionCoprocessorEnvironment.
  private RegionCoprocessorEnvironment regionEnv;

  /** Mapping of scanner instances to the user who created them (the map uses weak keys). */
  private Map<InternalScanner,String> scannerOwners =
      new MapMaker().weakKeys().makeMap();
136 
137   void initialize(RegionCoprocessorEnvironment e) throws IOException {
138     final HRegion region = e.getRegion();
139 
140     Map<byte[],ListMultimap<String,TablePermission>> tables =
141         AccessControlLists.loadAll(region);
142     // For each table, write out the table's permissions to the respective
143     // znode for that table.
144     for (Map.Entry<byte[],ListMultimap<String,TablePermission>> t:
145       tables.entrySet()) {
146       byte[] table = t.getKey();
147       ListMultimap<String,TablePermission> perms = t.getValue();
148       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, e.getConfiguration());
149       this.authManager.getZKPermissionWatcher().writeToZookeeper(table, serialized);
150     }
151   }
152 
153   /**
154    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
155    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
156    * table updates.
157    */
158   void updateACL(RegionCoprocessorEnvironment e,
159       final Map<byte[], List<? extends Cell>> familyMap) {
160     Set<byte[]> tableSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
161     for (Map.Entry<byte[], List<? extends Cell>> f : familyMap.entrySet()) {
162       List<? extends Cell> cells = f.getValue();
163       for (Cell cell: cells) {
164         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
165         if (Bytes.equals(kv.getBuffer(), kv.getFamilyOffset(),
166             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
167             AccessControlLists.ACL_LIST_FAMILY.length)) {
168           tableSet.add(kv.getRow());
169         }
170       }
171     }
172 
173     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
174     Configuration conf = regionEnv.getConfiguration();
175     for (byte[] tableName: tableSet) {
176       try {
177         ListMultimap<String,TablePermission> perms =
178           AccessControlLists.getTablePermissions(conf, tableName);
179         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
180         zkw.writeToZookeeper(tableName, serialized);
181       } catch (IOException ex) {
182         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(tableName) + "'", ex);
183       }
184     }
185   }
186 
  /**
   * Check the given user for authorization to perform a specific action
   * against the given set of row data.
   *
   * <p>Note: Ordering of the authorization checks
   * has been carefully optimized to short-circuit the most common requests
   * and minimize the amount of processing required.</p>
   *
   * <p>The checks run in this order: READ on a meta region is always allowed
   * (even without a user); a {@code null} user is denied; WRITE on a meta
   * region or the ACL table is allowed for users with global CREATE or ADMIN;
   * a table-level grant allows; otherwise every requested family must pass
   * either a family-level check or a check of each of its qualifiers.</p>
   *
   * @param request label of the operation, passed through to the returned
   * {@link AuthResult}
   * @param user the user to check; may be {@code null}
   * @param permRequest the action being requested
   * @param e the coprocessor environment
   * @param families the map of column families to qualifiers present in
   * the request; may be {@code null} or empty
   * @return an {@link AuthResult} that either allows or denies the request
   */
  AuthResult permissionGranted(String request, User user, Permission.Action permRequest,
      RegionCoprocessorEnvironment e,
      Map<byte [], ? extends Collection<?>> families) {
    HRegionInfo hri = e.getRegion().getRegionInfo();
    byte[] tableName = hri.getTableName();

    // 1. All users need read access to .META. table.
    // this is a very common operation, so deal with it quickly.
    if (hri.isMetaRegion()) {
      if (permRequest == Permission.Action.READ) {
        return AuthResult.allow(request, "All users allowed", user,
          permRequest, tableName, families);
      }
    }

    // Every check below needs a user, so a missing user is denied here.
    if (user == null) {
      return AuthResult.deny(request, "No user associated with request!", null,
        permRequest, tableName, families);
    }

    // Users with CREATE/ADMIN rights need to modify .META. and _acl_ table
    // e.g. When a new table is created a new entry in .META. is added,
    // so the user need to be allowed to write on it.
    // e.g. When a table is removed an entry is removed from .META. and _acl_
    // and the user need to be allowed to write on both tables.
    if (permRequest == Permission.Action.WRITE &&
       (hri.isMetaRegion() ||
        Bytes.equals(tableName, AccessControlLists.ACL_GLOBAL_NAME)) &&
       (authManager.authorize(user, Permission.Action.CREATE) ||
        authManager.authorize(user, Permission.Action.ADMIN)))
    {
       return AuthResult.allow(request, "Table permission granted", user,
        permRequest, tableName, families);
    }

    // 2. check for the table-level, if successful we can short-circuit
    if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
      return AuthResult.allow(request, "Table permission granted", user,
        permRequest, tableName, families);
    }

    // 3. check permissions against the requested families
    if (families != null && families.size() > 0) {
      // all families must pass
      for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
        // a) check for family level access
        if (authManager.authorize(user, tableName, family.getKey(),
            permRequest)) {
          continue;  // family-level permission overrides per-qualifier
        }

        // b) qualifier level access can still succeed
        if ((family.getValue() != null) && (family.getValue().size() > 0)) {
          if (family.getValue() instanceof Set) {
            // for each qualifier of the family; the first failing qualifier denies the request
            Set<byte[]> familySet = (Set<byte[]>)family.getValue();
            for (byte[] qualifier : familySet) {
              if (!authManager.authorize(user, tableName, family.getKey(),
                                         qualifier, permRequest)) {
                return AuthResult.deny(request, "Failed qualifier check", user,
                    permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
              }
            }
          } else if (family.getValue() instanceof List) { // List<KeyValue>
            // the qualifier of each KeyValue is checked; the first failure denies the request
            List<KeyValue> kvList = (List<KeyValue>)family.getValue();
            for (KeyValue kv : kvList) {
              if (!authManager.authorize(user, tableName, family.getKey(),
                      kv.getQualifier(), permRequest)) {
                return AuthResult.deny(request, "Failed qualifier check", user,
                    permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
              }
            }
          }
          // NOTE(review): a non-empty value that is neither a Set nor a List gets no
          // qualifier check and is treated as passed -- confirm callers never pass one.
        } else {
          // no qualifiers and family-level check already failed
          return AuthResult.deny(request, "Failed family check", user, permRequest,
              tableName, makeFamilyMap(family.getKey(), null));
        }
      }

      // all family checks passed
      return AuthResult.allow(request, "All family checks passed", user, permRequest,
          tableName, families);
    }

    // 4. no families to check and table level access failed
    return AuthResult.deny(request, "No families to check and table permission failed",
        user, permRequest, tableName, families);
  }
290 
291   private void logResult(AuthResult result) {
292     if (AUDITLOG.isTraceEnabled()) {
293       RequestContext ctx = RequestContext.get();
294       InetAddress remoteAddr = null;
295       if (ctx != null) {
296         remoteAddr = ctx.getRemoteAddress();
297       }
298       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
299           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
300           "; reason: " + result.getReason() +
301           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
302           "; request: " + result.getRequest() +
303           "; context: " + result.toContextString());
304     }
305   }
306 
307   /**
308    * Returns the active user to which authorization checks should be applied.
309    * If we are in the context of an RPC call, the remote user is used,
310    * otherwise the currently logged in user is used.
311    */
312   private User getActiveUser() throws IOException {
313     User user = RequestContext.getRequestUser();
314     if (!RequestContext.isInRequestContext()) {
315       // for non-rpc handling, fallback to system user
316       user = User.getCurrent();
317     }
318     return user;
319   }
320 
321   /**
322    * Authorizes that the current user has any of the given permissions for the
323    * given table, column family and column qualifier.
324    * @param tableName Table requested
325    * @param family Column family requested
326    * @param qualifier Column qualifier requested
327    * @throws IOException if obtaining the current user fails
328    * @throws AccessDeniedException if user has no authorization
329    */
330   private void requirePermission(String request, byte[] tableName, byte[] family, byte[] qualifier,
331       Action... permissions) throws IOException {
332     User user = getActiveUser();
333     AuthResult result = null;
334 
335     for (Action permission : permissions) {
336       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
337         result = AuthResult.allow(request, "Table permission granted", user,
338                                   permission, tableName, family, qualifier);
339         break;
340       } else {
341         // rest of the world
342         result = AuthResult.deny(request, "Insufficient permissions", user,
343                                  permission, tableName, family, qualifier);
344       }
345     }
346     logResult(result);
347     if (!result.isAllowed()) {
348       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
349     }
350   }
351 
352   /**
353    * Authorizes that the current user has global privileges for the given action.
354    * @param perm The action being requested
355    * @throws IOException if obtaining the current user fails
356    * @throws AccessDeniedException if authorization is denied
357    */
358   private void requirePermission(String request, Permission.Action perm) throws IOException {
359     requireGlobalPermission(request, perm, null, null);
360   }
361 
362   /**
363    * Authorizes that the current user has permission to perform the given
364    * action on the set of table column families.
365    * @param perm Action that is required
366    * @param env The current coprocessor environment
367    * @param families The map of column families-qualifiers.
368    * @throws AccessDeniedException if the authorization check failed
369    */
370   private void requirePermission(String request, Permission.Action perm,
371         RegionCoprocessorEnvironment env,
372         Map<byte[], ? extends Collection<?>> families)
373       throws IOException {
374     User user = getActiveUser();
375     AuthResult result = permissionGranted(request, user, perm, env, families);
376     logResult(result);
377 
378     if (!result.isAllowed()) {
379       throw new AccessDeniedException("Insufficient permissions (table=" +
380         env.getRegion().getTableDesc().getNameAsString()+
381         ((families != null && families.size() > 0) ? ", family: " +
382         result.toFamilyString() : "") + ", action=" +
383         perm.toString() + ")");
384     }
385   }
386 
387   /**
388    * Checks that the user has the given global permission. The generated
389    * audit log message will contain context information for the operation
390    * being authorized, based on the given parameters.
391    * @param perm Action being requested
392    * @param tableName Affected table name.
393    * @param familiMap Affected column families.
394    */
395   private void requireGlobalPermission(String request, Permission.Action perm, byte[] tableName,
396       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
397     User user = getActiveUser();
398     if (authManager.authorize(user, perm)) {
399       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
400     } else {
401       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
402       throw new AccessDeniedException("Insufficient permissions for user '" +
403           (user != null ? user.getShortName() : "null") +"' (global, action=" +
404           perm.toString() + ")");
405     }
406   }
407 
408   /**
409    * Returns <code>true</code> if the current user is allowed the given action
410    * over at least one of the column qualifiers in the given column families.
411    */
412   private boolean hasFamilyQualifierPermission(User user,
413       Permission.Action perm,
414       RegionCoprocessorEnvironment env,
415       Map<byte[], ? extends Set<byte[]>> familyMap)
416     throws IOException {
417     HRegionInfo hri = env.getRegion().getRegionInfo();
418     byte[] tableName = hri.getTableName();
419 
420     if (user == null) {
421       return false;
422     }
423 
424     if (familyMap != null && familyMap.size() > 0) {
425       // at least one family must be allowed
426       for (Map.Entry<byte[], ? extends Set<byte[]>> family :
427           familyMap.entrySet()) {
428         if (family.getValue() != null && !family.getValue().isEmpty()) {
429           for (byte[] qualifier : family.getValue()) {
430             if (authManager.matchPermission(user, tableName,
431                 family.getKey(), qualifier, perm)) {
432               return true;
433             }
434           }
435         } else {
436           if (authManager.matchPermission(user, tableName, family.getKey(),
437               perm)) {
438             return true;
439           }
440         }
441       }
442     } else if (LOG.isDebugEnabled()) {
443       LOG.debug("Empty family map passed for permission check");
444     }
445 
446     return false;
447   }
448 
449   /* ---- MasterObserver implementation ---- */
450   public void start(CoprocessorEnvironment env) throws IOException {
451 
452     ZooKeeperWatcher zk = null;
453     if (env instanceof MasterCoprocessorEnvironment) {
454       // if running on HMaster
455       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
456       zk = mEnv.getMasterServices().getZooKeeper();      
457     } else if (env instanceof RegionServerCoprocessorEnvironment) {      
458       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
459       zk = rsEnv.getRegionServerServices().getZooKeeper();      
460     } else if (env instanceof RegionCoprocessorEnvironment) {
461       // if running at region
462       regionEnv = (RegionCoprocessorEnvironment) env;
463       zk = regionEnv.getRegionServerServices().getZooKeeper();
464     }
465 
466     // If zk is null or IOException while obtaining auth manager,
467     // throw RuntimeException so that the coprocessor is unloaded.
468     if (zk != null) {
469       try {
470         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
471       } catch (IOException ioe) {
472         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
473       }
474     } else {
475       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
476     }
477   }
478 
  /**
   * No-op: the body of this method is empty.
   *
   * @param env the environment this coprocessor is stopped in (unused)
   */
  public void stop(CoprocessorEnvironment env) {

  }
482 
483   @Override
484   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
485       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
486     Set<byte[]> families = desc.getFamiliesKeys();
487     Map<byte[], Set<byte[]>> familyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
488     for (byte[] family: families) {
489       familyMap.put(family, null);
490     }
491     requireGlobalPermission("createTable", Permission.Action.CREATE, desc.getName(), familyMap);
492   }
493 
  /** No-op: the body of the {@code preCreateTableHandler} hook is empty. */
  @Override
  public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
497 
498   @Override
499   public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
500       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
501     if (!AccessControlLists.isAclTable(desc)) {
502       String owner = desc.getOwnerString();
503       // default the table owner to current user, if not specified.
504       if (owner == null) owner = getActiveUser().getShortName();
505       UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getName(), null,
506           Action.values());
507       AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
508     }
509   }
510 
  /** No-op: the body of the {@code postCreateTableHandler} hook is empty. */
  @Override
  public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
514 
515   @Override
516   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName)
517       throws IOException {
518     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
519   }
520 
  /** No-op: the body of the {@code preDeleteTableHandler} hook is empty. */
  @Override
  public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
524   @Override
525   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
526       byte[] tableName) throws IOException {
527     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
528   }
  /** No-op: the body of the {@code postDeleteTableHandler} hook is empty. */
  @Override
  public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
532 
533   @Override
534   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName,
535       HTableDescriptor htd) throws IOException {
536     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
537   }
538 
  /** No-op: the body of the {@code preModifyTableHandler} hook is empty. */
  @Override
  public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HTableDescriptor htd) throws IOException {}
542 
543   @Override
544   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
545       byte[] tableName, HTableDescriptor htd) throws IOException {
546     String owner = htd.getOwnerString();
547     // default the table owner to current user, if not specified.
548     if (owner == null) owner = getActiveUser().getShortName();
549     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getName(), null,
550         Action.values());
551     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
552   }
553 
  /** No-op: the body of the {@code postModifyTableHandler} hook is empty. */
  @Override
  public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HTableDescriptor htd) throws IOException {}
557 
558 
559   @Override
560   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName,
561       HColumnDescriptor column) throws IOException {
562     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
563   }
564 
  /** No-op: the body of the {@code preAddColumnHandler} hook is empty. */
  @Override
  public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HColumnDescriptor column) throws IOException {}
  /** No-op: the body of the {@code postAddColumn} hook is empty. */
  @Override
  public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HColumnDescriptor column) throws IOException {}
  /** No-op: the body of the {@code postAddColumnHandler} hook is empty. */
  @Override
  public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HColumnDescriptor column) throws IOException {}
574 
575   @Override
576   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName,
577       HColumnDescriptor descriptor) throws IOException {
578     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
579   }
580 
  /** No-op: the body of the {@code preModifyColumnHandler} hook is empty. */
  @Override
  public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HColumnDescriptor descriptor) throws IOException {}
  /** No-op: the body of the {@code postModifyColumn} hook is empty. */
  @Override
  public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HColumnDescriptor descriptor) throws IOException {}
  /** No-op: the body of the {@code postModifyColumnHandler} hook is empty. */
  @Override
  public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, HColumnDescriptor descriptor) throws IOException {}
590 
591 
592   @Override
593   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName,
594       byte[] col) throws IOException {
595     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
596   }
597 
  /** No-op: the body of the {@code preDeleteColumnHandler} hook is empty. */
  @Override
  public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, byte[] col) throws IOException {}
601   @Override
602   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
603       byte[] tableName, byte[] col) throws IOException {
604     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(),
605                                               tableName, col);
606   }
  /** No-op: the body of the {@code postDeleteColumnHandler} hook is empty. */
  @Override
  public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName, byte[] col) throws IOException {}
610 
611   @Override
612   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName)
613       throws IOException {
614     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
615   }
616 
  /** No-op: the body of the {@code preEnableTableHandler} hook is empty. */
  @Override
  public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
  /** No-op: the body of the {@code postEnableTable} hook is empty. */
  @Override
  public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
  /** No-op: the body of the {@code postEnableTableHandler} hook is empty. */
  @Override
  public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
626 
627   @Override
628   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, byte[] tableName)
629       throws IOException {
630     if (Bytes.equals(tableName, AccessControlLists.ACL_GLOBAL_NAME)) {
631       throw new AccessDeniedException("Not allowed to disable "
632           + AccessControlLists.ACL_TABLE_NAME_STR + " table.");
633     }
634     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
635   }
636 
  /** No-op: the body of the {@code preDisableTableHandler} hook is empty. */
  @Override
  public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
  /** No-op: the body of the {@code postDisableTable} hook is empty. */
  @Override
  public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
  /** No-op: the body of the {@code postDisableTableHandler} hook is empty. */
  @Override
  public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
      byte[] tableName) throws IOException {}
646 
647   @Override
648   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
649       ServerName srcServer, ServerName destServer) throws IOException {
650     requirePermission("move", region.getTableName(), null, null, Action.ADMIN);
651   }
652 
  /** No-op: the body of the {@code postMove} hook is empty. */
  @Override
  public void postMove(ObserverContext<MasterCoprocessorEnvironment> c,
      HRegionInfo region, ServerName srcServer, ServerName destServer)
    throws IOException {}
657 
658   @Override
659   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
660       throws IOException {
661     requirePermission("assign", regionInfo.getTableName(), null, null, Action.ADMIN);
662   }
663 
  /** No-op: the body of the {@code postAssign} hook is empty. */
  @Override
  public void postAssign(ObserverContext<MasterCoprocessorEnvironment> c,
      HRegionInfo regionInfo) throws IOException {}
667 
668   @Override
669   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
670       boolean force) throws IOException {
671     requirePermission("unassign", regionInfo.getTableName(), null, null, Action.ADMIN);
672   }
673 
  /** No-op: the body of the {@code postUnassign} hook is empty. */
  @Override
  public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
      HRegionInfo regionInfo, boolean force) throws IOException {}
677 
678   @Override
679   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
680       HRegionInfo regionInfo) throws IOException {
681     requirePermission("regionOffline", regionInfo.getTableName(), null, null, Action.ADMIN);
682   }
683 
  /** No-op: the body of the {@code postRegionOffline} hook is empty. */
  @Override
  public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
      HRegionInfo regionInfo) throws IOException {
  }
688 
689   @Override
690   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
691       throws IOException {
692     requirePermission("balance", Permission.Action.ADMIN);
693   }
  /** No-op: the body of the {@code postBalance} hook is empty. */
  @Override
  public void postBalance(ObserverContext<MasterCoprocessorEnvironment> c, List<RegionPlan> plans)
      throws IOException {}
697 
698   @Override
699   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
700       boolean newValue) throws IOException {
701     requirePermission("balanceSwitch", Permission.Action.ADMIN);
702     return newValue;
703   }
  /** No-op: the body of the {@code postBalanceSwitch} hook is empty. */
  @Override
  public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
      boolean oldValue, boolean newValue) throws IOException {}
707 
708   @Override
709   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
710       throws IOException {
711     requirePermission("shutdown", Permission.Action.ADMIN);
712   }
713 
714   @Override
715   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
716       throws IOException {
717     requirePermission("stopMaster", Permission.Action.ADMIN);
718   }
719 
720   @Override
721   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
722       throws IOException {
723     // initialize the ACL storage table
724     AccessControlLists.init(ctx.getEnvironment().getMasterServices());
725   }
726 
727   @Override
728   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
729       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
730       throws IOException {
731     requirePermission("snapshot", Permission.Action.ADMIN);
732   }
733 
734   @Override
735   public void postSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
736       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
737       throws IOException {
738   }
739 
740   @Override
741   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
742       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
743       throws IOException {
744     requirePermission("clone", Permission.Action.ADMIN);
745   }
746 
747   @Override
748   public void postCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
749       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
750       throws IOException {
751   }
752 
753   @Override
754   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
755       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
756       throws IOException {
757     requirePermission("restore", Permission.Action.ADMIN);
758   }
759 
760   @Override
761   public void postRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
762       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
763       throws IOException {
764   }
765 
766   @Override
767   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
768       final SnapshotDescription snapshot) throws IOException {
769     requirePermission("deleteSnapshot", Permission.Action.ADMIN);
770   }
771 
772   @Override
773   public void postDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
774       final SnapshotDescription snapshot) throws IOException {
775   }
776 
777   /* ---- RegionObserver implementation ---- */
778 
779   @Override
780   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
781       throws IOException {
782     RegionCoprocessorEnvironment env = e.getEnvironment();
783     final HRegion region = env.getRegion();
784     if (region == null) {
785       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
786     } else {
787       HRegionInfo regionInfo = region.getRegionInfo();
788       if (isSpecialTable(regionInfo)) {
789         isSystemOrSuperUser(regionEnv.getConfiguration());
790       } else {
791         requirePermission("preOpen", Action.ADMIN);
792       }
793     }
794   }
795 
796   @Override
797   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
798     RegionCoprocessorEnvironment env = c.getEnvironment();
799     final HRegion region = env.getRegion();
800     if (region == null) {
801       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
802       return;
803     }
804     if (AccessControlLists.isAclRegion(region)) {
805       aclRegion = true;
806       try {
807         initialize(env);
808       } catch (IOException ex) {
809         // if we can't obtain permissions, it's better to fail
810         // than perform checks incorrectly
811         throw new RuntimeException("Failed to initialize permissions cache", ex);
812       }
813     }
814   }
815 
816   @Override
817   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
818     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
819   }
820 
821   @Override
822   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
823     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
824   }
825   
826   @Override
827   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
828       byte[] splitRow) throws IOException {
829     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
830   }
831 
832   @Override
833   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
834       final Store store, final InternalScanner scanner, final ScanType scanType)
835           throws IOException {
836     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
837     return scanner;
838   }
839 
840   @Override
841   public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> e,
842       final Store store, final List<StoreFile> candidates) throws IOException {
843     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
844   }
845 
846   @Override
847   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
848       final byte [] row, final byte [] family, final Result result)
849       throws IOException {
850     assert family != null;
851     //noinspection PrimitiveArrayArgumentToVariableArgMethod
852     requirePermission("getClosestRowBefore", Permission.Action.READ, c.getEnvironment(),
853         makeFamilyMap(family, null));
854   }
855 
856   @Override
857   public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
858       final Get get, final List<KeyValue> result) throws IOException {
859     /*
860      if column family level checks fail, check for a qualifier level permission
861      in one of the families.  If it is present, then continue with the AccessControlFilter.
862       */
863     RegionCoprocessorEnvironment e = c.getEnvironment();
864     User requestUser = getActiveUser();
865     AuthResult authResult = permissionGranted("get", requestUser,
866         Permission.Action.READ, e, get.getFamilyMap());
867     if (!authResult.isAllowed()) {
868       if (hasFamilyQualifierPermission(requestUser,
869           Permission.Action.READ, e, get.getFamilyMap())) {
870         byte[] table = getTableName(e);
871         AccessControlFilter filter = new AccessControlFilter(authManager,
872             requestUser, table);
873 
874         // wrap any existing filter
875         if (get.getFilter() != null) {
876           FilterList wrapper = new FilterList(FilterList.Operator.MUST_PASS_ALL,
877               Lists.newArrayList(filter, get.getFilter()));
878           get.setFilter(wrapper);
879         } else {
880           get.setFilter(filter);
881         }
882         logResult(AuthResult.allow("get", "Access allowed with filter", requestUser,
883             Permission.Action.READ, authResult.getTable(), get.getFamilyMap()));
884       } else {
885         logResult(authResult);
886         throw new AccessDeniedException("Insufficient permissions (table=" +
887           e.getRegion().getTableDesc().getNameAsString() + ", action=READ)");
888       }
889     } else {
890       // log auth success
891       logResult(authResult);
892     }
893   }
894 
895   @Override
896   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
897       final Get get, final boolean exists) throws IOException {
898     requirePermission("exists", Permission.Action.READ, c.getEnvironment(),
899         get.getFamilyMap());
900     return exists;
901   }
902 
903   @Override
904   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
905       final Put put, final WALEdit edit, final Durability durability)
906       throws IOException {
907     requirePermission("put", Permission.Action.WRITE, c.getEnvironment(),
908         put.getFamilyMap());
909   }
910 
911   @Override
912   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
913       final Put put, final WALEdit edit, final Durability durability) {
914     if (aclRegion) {
915       updateACL(c.getEnvironment(), put.getFamilyMap());
916     }
917   }
918 
919   @Override
920   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
921       final Delete delete, final WALEdit edit, final Durability durability)
922       throws IOException {
923     requirePermission("delete", Permission.Action.WRITE, c.getEnvironment(),
924         delete.getFamilyMap());
925   }
926 
927   @Override
928   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
929       final Delete delete, final WALEdit edit, final Durability durability)
930       throws IOException {
931     if (aclRegion) {
932       updateACL(c.getEnvironment(), delete.getFamilyMap());
933     }
934   }
935 
936   @Override
937   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
938       final byte [] row, final byte [] family, final byte [] qualifier,
939       final CompareFilter.CompareOp compareOp,
940       final ByteArrayComparable comparator, final Put put,
941       final boolean result) throws IOException {
942     Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
943     requirePermission("checkAndPut", Permission.Action.READ, c.getEnvironment(), familyMap);
944     requirePermission("checkAndPut", Permission.Action.WRITE, c.getEnvironment(), familyMap);
945     return result;
946   }
947 
948   @Override
949   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
950       final byte [] row, final byte [] family, final byte [] qualifier,
951       final CompareFilter.CompareOp compareOp,
952       final ByteArrayComparable comparator, final Delete delete,
953       final boolean result) throws IOException {
954     Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
955     requirePermission("checkAndDelete", Permission.Action.READ, c.getEnvironment(), familyMap);
956     requirePermission("checkAndDelete", Permission.Action.WRITE, c.getEnvironment(), familyMap);
957     return result;
958   }
959 
960   @Override
961   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
962       final byte [] row, final byte [] family, final byte [] qualifier,
963       final long amount, final boolean writeToWAL)
964       throws IOException {
965     Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
966     requirePermission("incrementColumnValue", Permission.Action.WRITE, c.getEnvironment(), familyMap);
967     return -1;
968   }
969 
970   @Override
971   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
972       throws IOException {
973     requirePermission("append", Permission.Action.WRITE, c.getEnvironment(), append.getFamilyMap());
974     return null;
975   }
976 
977   @Override
978   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
979       final Increment increment)
980       throws IOException {
981     // Create a map of family to qualifiers.
982     Map<byte[], Set<byte[]>> familyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
983     for (Map.Entry<byte [], List<? extends Cell>> entry: increment.getFamilyMap().entrySet()) {
984       Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
985       for (Cell cell: entry.getValue()) {
986         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
987         qualifiers.add(kv.getQualifier());
988       }
989       familyMap.put(entry.getKey(), qualifiers);
990     }
991     requirePermission("increment", Permission.Action.WRITE, c.getEnvironment(), familyMap);
992     return null;
993   }
994 
995   @Override
996   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
997       final Scan scan, final RegionScanner s) throws IOException {
998     /*
999      if column family level checks fail, check for a qualifier level permission
1000      in one of the families.  If it is present, then continue with the AccessControlFilter.
1001       */
1002     RegionCoprocessorEnvironment e = c.getEnvironment();
1003     User user = getActiveUser();
1004     AuthResult authResult = permissionGranted("scannerOpen", user, Permission.Action.READ, e,
1005         scan.getFamilyMap());
1006     if (!authResult.isAllowed()) {
1007       if (hasFamilyQualifierPermission(user, Permission.Action.READ, e,
1008           scan.getFamilyMap())) {
1009         byte[] table = getTableName(e);
1010         AccessControlFilter filter = new AccessControlFilter(authManager,
1011             user, table);
1012 
1013         // wrap any existing filter
1014         if (scan.hasFilter()) {
1015           FilterList wrapper = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1016               Lists.newArrayList(filter, scan.getFilter()));
1017           scan.setFilter(wrapper);
1018         } else {
1019           scan.setFilter(filter);
1020         }
1021         logResult(AuthResult.allow("scannerOpen", "Access allowed with filter", user,
1022             Permission.Action.READ, authResult.getTable(), scan.getFamilyMap()));
1023       } else {
1024         // no table/family level perms and no qualifier level perms, reject
1025         logResult(authResult);
1026         throw new AccessDeniedException("Insufficient permissions for user '"+
1027             (user != null ? user.getShortName() : "null")+"' "+
1028             "for scanner open on table " + Bytes.toString(getTableName(e)));
1029       }
1030     } else {
1031       // log success
1032       logResult(authResult);
1033     }
1034     return s;
1035   }
1036 
1037   @Override
1038   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1039       final Scan scan, final RegionScanner s) throws IOException {
1040     User user = getActiveUser();
1041     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1042       scannerOwners.put(s, user.getShortName());
1043     }
1044     return s;
1045   }
1046 
1047   @Override
1048   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1049       final InternalScanner s, final List<Result> result,
1050       final int limit, final boolean hasNext) throws IOException {
1051     requireScannerOwner(s);
1052     return hasNext;
1053   }
1054 
1055   @Override
1056   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1057       final InternalScanner s) throws IOException {
1058     requireScannerOwner(s);
1059   }
1060 
1061   @Override
1062   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1063       final InternalScanner s) throws IOException {
1064     // clean up any associated owner mapping
1065     scannerOwners.remove(s);
1066   }
1067 
1068   /**
1069    * Verify, when servicing an RPC, that the caller is the scanner owner.
1070    * If so, we assume that access control is correctly enforced based on
1071    * the checks performed in preScannerOpen()
1072    */
1073   private void requireScannerOwner(InternalScanner s)
1074       throws AccessDeniedException {
1075     if (RequestContext.isInRequestContext()) {
1076       String requestUserName = RequestContext.getRequestUserName();
1077       String owner = scannerOwners.get(s);
1078       if (owner != null && !owner.equals(requestUserName)) {
1079         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1080       }
1081     }
1082   }
1083 
1084   /**
1085    * Verifies user has WRITE privileges on
1086    * the Column Families involved in the bulkLoadHFile
1087    * request. Specific Column Write privileges are presently
1088    * ignored.
1089    */
1090   @Override
1091   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1092       List<Pair<byte[], String>> familyPaths) throws IOException {
1093     List<byte[]> cfs = new LinkedList<byte[]>();
1094     for(Pair<byte[],String> el : familyPaths) {
1095       requirePermission("preBulkLoadHFile",
1096           ctx.getEnvironment().getRegion().getTableDesc().getName(),
1097           el.getFirst(),
1098           null,
1099           Permission.Action.WRITE);
1100     }
1101   }
1102 
1103   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1104     User requestUser = getActiveUser();
1105     byte[] tableName = e.getRegion().getTableDesc().getName();
1106     AuthResult authResult = permissionGranted(method, requestUser,
1107         action, e, Collections.EMPTY_MAP);
1108     if (!authResult.isAllowed()) {
1109       for(UserPermission userPerm:
1110           AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), tableName)) {
1111         for(Permission.Action userAction: userPerm.getActions()) {
1112           if(userAction.equals(action)) {
1113             return AuthResult.allow(method, "Access allowed", requestUser,
1114                 action, tableName, null, null);
1115           }
1116         }
1117       }
1118     }
1119     return authResult;
1120   }
1121 
1122   /**
1123    * Authorization check for
1124    * SecureBulkLoadProtocol.prepareBulkLoad()
1125    * @param e
1126    * @throws IOException
1127    */
1128   //TODO this should end up as a coprocessor hook
1129   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1130     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1131     logResult(authResult);
1132     if (!authResult.isAllowed()) {
1133       throw new AccessDeniedException("Insufficient permissions (table=" +
1134         e.getRegion().getTableDesc().getNameAsString() + ", action=WRITE)");
1135     }
1136   }
1137 
1138   /**
1139    * Authorization security check for
1140    * SecureBulkLoadProtocol.cleanupBulkLoad()
1141    * @param e
1142    * @throws IOException
1143    */
1144   //TODO this should end up as a coprocessor hook
1145   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1146     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1147     logResult(authResult);
1148     if (!authResult.isAllowed()) {
1149       throw new AccessDeniedException("Insufficient permissions (table=" +
1150         e.getRegion().getTableDesc().getNameAsString() + ", action=WRITE)");
1151     }
1152   }
1153 
1154   /* ---- Protobuf AccessControlService implementation ---- */
1155   @Override
1156   public void grant(RpcController controller,
1157                     AccessControlProtos.GrantRequest request,
1158                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1159     UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
1160     AccessControlProtos.GrantResponse response = null;
1161     try {
1162       // verify it's only running at .acl.
1163       if (aclRegion) {
1164         if (LOG.isDebugEnabled()) {
1165           LOG.debug("Received request to grant access permission " + perm.toString());
1166         }
1167 
1168         requirePermission("grant", perm.getTable(), perm.getFamily(), perm.getQualifier(), Action.ADMIN);
1169 
1170         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
1171         if (AUDITLOG.isTraceEnabled()) {
1172           // audit log should store permission changes in addition to auth results
1173           AUDITLOG.trace("Granted permission " + perm.toString());
1174         }
1175       } else {
1176         throw new CoprocessorException(AccessController.class, "This method "
1177             + "can only execute at " + Bytes.toString(AccessControlLists.ACL_TABLE_NAME) + " table.");
1178       }
1179       response = AccessControlProtos.GrantResponse.getDefaultInstance();
1180     } catch (IOException ioe) {
1181       // pass exception back up
1182       ResponseConverter.setControllerException(controller, ioe);
1183     }
1184     done.run(response);
1185   }
1186 
1187   @Override
1188   public void revoke(RpcController controller,
1189                      AccessControlProtos.RevokeRequest request,
1190                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
1191     UserPermission perm = ProtobufUtil.toUserPermission(request.getPermission());
1192     AccessControlProtos.RevokeResponse response = null;
1193     try {
1194       // only allowed to be called on _acl_ region
1195       if (aclRegion) {
1196         if (LOG.isDebugEnabled()) {
1197           LOG.debug("Received request to revoke access permission " + perm.toString());
1198         }
1199 
1200         requirePermission("revoke", perm.getTable(), perm.getFamily(),
1201                           perm.getQualifier(), Action.ADMIN);
1202 
1203         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
1204         if (AUDITLOG.isTraceEnabled()) {
1205           // audit log should record all permission changes
1206           AUDITLOG.trace("Revoked permission " + perm.toString());
1207         }
1208       } else {
1209         throw new CoprocessorException(AccessController.class, "This method "
1210             + "can only execute at " + Bytes.toString(AccessControlLists.ACL_TABLE_NAME) + " table.");
1211       }
1212       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
1213     } catch (IOException ioe) {
1214       // pass exception back up
1215       ResponseConverter.setControllerException(controller, ioe);
1216     }
1217     done.run(response);
1218   }
1219 
1220   @Override
1221   public void getUserPermissions(RpcController controller,
1222                                  AccessControlProtos.UserPermissionsRequest request,
1223                                  RpcCallback<AccessControlProtos.UserPermissionsResponse> done) {
1224     AccessControlProtos.UserPermissionsResponse response = null;
1225     byte[] table = null;
1226     if (request.hasTable()) {
1227       table = request.getTable().toByteArray();
1228     }
1229     try {
1230       // only allowed to be called on _acl_ region
1231       if (aclRegion) {
1232         requirePermission("userPermissions", table, null, null, Action.ADMIN);
1233 
1234         List<UserPermission> perms = AccessControlLists.getUserPermissions(
1235           regionEnv.getConfiguration(), table);
1236         response = ResponseConverter.buildUserPermissionsResponse(perms);
1237       } else {
1238         throw new CoprocessorException(AccessController.class, "This method "
1239             + "can only execute at " + Bytes.toString(AccessControlLists.ACL_TABLE_NAME) + " table.");
1240       }
1241     } catch (IOException ioe) {
1242       // pass exception back up
1243       ResponseConverter.setControllerException(controller, ioe);
1244     }
1245     done.run(response);
1246   }
1247 
1248   @Override
1249   public void checkPermissions(RpcController controller,
1250                                AccessControlProtos.CheckPermissionsRequest request,
1251                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
1252     Permission[] permissions = new Permission[request.getPermissionCount()];
1253     for (int i=0; i < request.getPermissionCount(); i++) {
1254       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
1255     }
1256     AccessControlProtos.CheckPermissionsResponse response = null;
1257     try {
1258       byte[] tableName = regionEnv.getRegion().getTableDesc().getName();
1259       for (Permission permission : permissions) {
1260         if (permission instanceof TablePermission) {
1261           TablePermission tperm = (TablePermission) permission;
1262           for (Permission.Action action : permission.getActions()) {
1263             if (!Arrays.equals(tperm.getTable(), tableName)) {
1264               throw new CoprocessorException(AccessController.class, String.format("This method "
1265                   + "can only execute at the table specified in TablePermission. " +
1266                   "Table of the region:%s , requested table:%s", Bytes.toString(tableName),
1267                   Bytes.toString(tperm.getTable())));
1268             }
1269 
1270             Map<byte[], Set<byte[]>> familyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
1271             if (tperm.getFamily() != null) {
1272               if (tperm.getQualifier() != null) {
1273                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
1274                 qualifiers.add(tperm.getQualifier());
1275                 familyMap.put(tperm.getFamily(), qualifiers);
1276               } else {
1277                 familyMap.put(tperm.getFamily(), null);
1278               }
1279             }
1280 
1281             requirePermission("checkPermissions", action, regionEnv, familyMap);
1282           }
1283 
1284         } else {
1285           for (Permission.Action action : permission.getActions()) {
1286             requirePermission("checkPermissions", action);
1287           }
1288         }
1289       }
1290       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
1291     } catch (IOException ioe) {
1292       ResponseConverter.setControllerException(controller, ioe);
1293     }
1294     done.run(response);
1295   }
1296 
1297   @Override
1298   public Service getService() {
1299     return AccessControlProtos.AccessControlService.newReflectiveService(this);
1300   }
1301 
1302   private byte[] getTableName(RegionCoprocessorEnvironment e) {
1303     HRegion region = e.getRegion();
1304     byte[] tableName = null;
1305 
1306     if (region != null) {
1307       HRegionInfo regionInfo = region.getRegionInfo();
1308       if (regionInfo != null) {
1309         tableName = regionInfo.getTableName();
1310       }
1311     }
1312     return tableName;
1313   }
1314 
1315 
1316   @Override
1317   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
1318       throws IOException {
1319     requirePermission("preClose", Action.ADMIN);
1320   }
1321 
1322   private void isSystemOrSuperUser(Configuration conf) throws IOException {
1323     User user = User.getCurrent();
1324     if (user == null) {
1325       throw new IOException("Unable to obtain the current user, " +
1326         "authorization checks for internal operations will not work correctly!");
1327     }
1328 
1329     String currentUser = user.getShortName();
1330     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
1331       AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
1332 
1333     User activeUser = getActiveUser();
1334     if (!(superusers.contains(activeUser.getShortName()))) {
1335       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
1336         "is not system or super user.");
1337     }
1338   }
1339 
1340   private boolean isSpecialTable(HRegionInfo regionInfo) {
1341     byte[] tableName = regionInfo.getTableName();
1342     return Arrays.equals(tableName, AccessControlLists.ACL_TABLE_NAME)
1343         || Arrays.equals(tableName, Bytes.toBytes("-ROOT-"))
1344         || Arrays.equals(tableName, Bytes.toBytes(".META."));
1345   }
1346 
1347   @Override
1348   public void preStopRegionServer(
1349       ObserverContext<RegionServerCoprocessorEnvironment> env)
1350       throws IOException {
1351     requirePermission("preStopRegionServer", Permission.Action.ADMIN);
1352   }
1353 
1354   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
1355       byte[] qualifier) {
1356     if (family == null) {
1357       return null;
1358     }
1359 
1360     Map<byte[], Collection<byte[]>> familyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
1361     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
1362     return familyMap;
1363   }
1364 }