View Javadoc

1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  
15  package org.apache.hadoop.hbase.security.access;
16  
17  import java.io.IOException;
18  import java.net.InetAddress;
19  import java.util.Collection;
20  import java.util.Collections;
21  import java.util.Iterator;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.TreeMap;
26  import java.util.TreeSet;
27  
28  import com.google.protobuf.Message;
29  import com.google.protobuf.RpcCallback;
30  import com.google.protobuf.RpcController;
31  import com.google.protobuf.Service;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.CoprocessorEnvironment;
39  import org.apache.hadoop.hbase.DoNotRetryIOException;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.NamespaceDescriptor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableNotDisabledException;
50  import org.apache.hadoop.hbase.TableNotFoundException;
51  import org.apache.hadoop.hbase.Tag;
52  import org.apache.hadoop.hbase.client.Append;
53  import org.apache.hadoop.hbase.client.Delete;
54  import org.apache.hadoop.hbase.client.Get;
55  import org.apache.hadoop.hbase.client.Increment;
56  import org.apache.hadoop.hbase.client.Mutation;
57  import org.apache.hadoop.hbase.client.Put;
58  import org.apache.hadoop.hbase.client.Query;
59  import org.apache.hadoop.hbase.client.Result;
60  import org.apache.hadoop.hbase.client.Scan;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.coprocessor.*;
63  import org.apache.hadoop.hbase.filter.CompareFilter;
64  import org.apache.hadoop.hbase.filter.Filter;
65  import org.apache.hadoop.hbase.filter.FilterList;
66  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
67  import org.apache.hadoop.hbase.io.hfile.HFile;
68  import org.apache.hadoop.hbase.ipc.RequestContext;
69  import org.apache.hadoop.hbase.master.MasterServices;
70  import org.apache.hadoop.hbase.master.RegionPlan;
71  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
72  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
73  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
74  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
75  import org.apache.hadoop.hbase.regionserver.HRegion;
76  import org.apache.hadoop.hbase.regionserver.InternalScanner;
77  import org.apache.hadoop.hbase.regionserver.RegionScanner;
78  import org.apache.hadoop.hbase.regionserver.Store;
79  import org.apache.hadoop.hbase.regionserver.ScanType;
80  import org.apache.hadoop.hbase.regionserver.StoreFile;
81  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
82  import org.apache.hadoop.hbase.security.AccessDeniedException;
83  import org.apache.hadoop.hbase.security.User;
84  import org.apache.hadoop.hbase.security.UserProvider;
85  import org.apache.hadoop.hbase.security.access.Permission.Action;
86  import org.apache.hadoop.hbase.util.Bytes;
87  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
88  import org.apache.hadoop.hbase.util.Pair;
89  
90  import com.google.common.collect.ArrayListMultimap;
91  import com.google.common.collect.ImmutableSet;
92  import com.google.common.collect.ListMultimap;
93  import com.google.common.collect.Lists;
94  import com.google.common.collect.MapMaker;
95  import com.google.common.collect.Sets;
96  
97  import static org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
98  
99  /**
100  * Provides basic authorization checks for data access and administrative
101  * operations.
102  *
103  * <p>
104  * {@code AccessController} performs authorization checks for HBase operations
105  * based on:
106  * <ul>
107  *   <li>the identity of the user performing the operation</li>
108  *   <li>the scope over which the operation is performed, in increasing
109  *   specificity: global, table, column family, or qualifier</li>
110  *   <li>the type of action being performed (as mapped to
111  *   {@link Permission.Action} values)</li>
112  * </ul>
113  * If the authorization check fails, an {@link AccessDeniedException}
114  * will be thrown for the operation.
115  * </p>
116  *
117  * <p>
118  * To perform authorization checks, {@code AccessController} relies on the
119  * {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} being loaded to provide
120  * the user identities for remote requests.
121  * </p>
122  *
123  * <p>
124  * The access control lists used for authorization can be manipulated via the
125  * exposed {@link AccessControlService} Interface implementation, and the associated
126  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
127  * commands.
128  * </p>
129  */
130 public class AccessController extends BaseRegionObserver
131     implements MasterObserver, RegionServerObserver,
132       AccessControlService.Interface, CoprocessorService, EndpointObserver {
133 
134   public static final Log LOG = LogFactory.getLog(AccessController.class);
135 
136   private static final Log AUDITLOG =
137     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
138 
139   static final String EXEC_PERMISSION_CHECKS_KEY = "hbase.security.exec.permission.checks";
140   static final boolean DEFAULT_EXEC_PERMISSION_CHECKS = false;
141 
142   TableAuthManager authManager = null;
143 
144   // flags if we are running on a region of the _acl_ table
145   boolean aclRegion = false;
146 
147   // defined only for Endpoint implementation, so it can have way to
148   // access region services.
149   private RegionCoprocessorEnvironment regionEnv;
150 
151   /** Mapping of scanner instances to the user who created them */
152   private Map<InternalScanner,String> scannerOwners =
153       new MapMaker().weakKeys().makeMap();
154 
155   private UserProvider userProvider;
156 
157   // flags if we are able to support cell ACLs
158   boolean canPersistCellACLs;
159 
160   // flags if we should check EXEC permissions
161   boolean shouldCheckExecPermissions;
162 
163   private volatile boolean initialized = false;
164 
165   public HRegion getRegion() {
166     return regionEnv != null ? regionEnv.getRegion() : null;
167   }
168 
169   public TableAuthManager getAuthManager() {
170     return authManager;
171   }
172 
173   void initialize(RegionCoprocessorEnvironment e) throws IOException {
174     final HRegion region = e.getRegion();
175     Map<byte[], ListMultimap<String,TablePermission>> tables =
176         AccessControlLists.loadAll(region);
177     // For each table, write out the table's permissions to the respective
178     // znode for that table.
179     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
180       tables.entrySet()) {
181       byte[] entry = t.getKey();
182       ListMultimap<String,TablePermission> perms = t.getValue();
183       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, e.getConfiguration());
184       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
185     }
186     shouldCheckExecPermissions = e.getConfiguration().getBoolean(EXEC_PERMISSION_CHECKS_KEY,
187       DEFAULT_EXEC_PERMISSION_CHECKS);
188     initialized = true;
189   }
190 
191   /**
192    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
193    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
194    * table updates.
195    */
196   void updateACL(RegionCoprocessorEnvironment e,
197       final Map<byte[], List<Cell>> familyMap) {
198     Set<byte[]> entries =
199         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
200     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
201       List<Cell> cells = f.getValue();
202       for (Cell cell: cells) {
203         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
204         if (Bytes.equals(kv.getFamilyArray(), kv.getFamilyOffset(),
205             kv.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
206             AccessControlLists.ACL_LIST_FAMILY.length)) {
207           entries.add(kv.getRow());
208         }
209       }
210     }
211     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
212     Configuration conf = regionEnv.getConfiguration();
213     for (byte[] entry: entries) {
214       try {
215         ListMultimap<String,TablePermission> perms =
216           AccessControlLists.getPermissions(conf, entry);
217         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
218         zkw.writeToZookeeper(entry, serialized);
219       } catch (IOException ex) {
220         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
221             ex);
222       }
223     }
224   }
225 
226   /**
227    * Check the current user for authorization to perform a specific action
228    * against the given set of row data.
229    *
230    * <p>Note: Ordering of the authorization checks
231    * has been carefully optimized to short-circuit the most common requests
232    * and minimize the amount of processing required.</p>
233    *
234    * @param permRequest the action being requested
235    * @param e the coprocessor environment
236    * @param families the map of column families to qualifiers present in
237    * the request
238    * @return an authorization result
239    */
240   AuthResult permissionGranted(String request, User user, Permission.Action permRequest,
241       RegionCoprocessorEnvironment e,
242       Map<byte [], ? extends Collection<?>> families) {
243     HRegionInfo hri = e.getRegion().getRegionInfo();
244     TableName tableName = hri.getTable();
245 
246     // 1. All users need read access to hbase:meta table.
247     // this is a very common operation, so deal with it quickly.
248     if (hri.isMetaRegion()) {
249       if (permRequest == Permission.Action.READ) {
250         return AuthResult.allow(request, "All users allowed", user,
251           permRequest, tableName, families);
252       }
253     }
254 
255     if (user == null) {
256       return AuthResult.deny(request, "No user associated with request!", null,
257         permRequest, tableName, families);
258     }
259 
260     // Users with CREATE/ADMIN rights need to modify hbase:meta and _acl_ table
261     // e.g. When a new table is created a new entry in hbase:meta is added,
262     // so the user need to be allowed to write on it.
263     // e.g. When a table is removed an entry is removed from hbase:meta and _acl_
264     // and the user need to be allowed to write on both tables.
265     if (permRequest == Permission.Action.WRITE &&
266        (hri.isMetaRegion() ||
267         Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) &&
268        (authManager.authorize(user, Permission.Action.CREATE) ||
269         authManager.authorize(user, Permission.Action.ADMIN)))
270     {
271        return AuthResult.allow(request, "Table permission granted", user,
272         permRequest, tableName, families);
273     }
274 
275     // 2. check for the table-level, if successful we can short-circuit
276     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
277       return AuthResult.allow(request, "Table permission granted", user,
278         permRequest, tableName, families);
279     }
280 
281     // 3. check permissions against the requested families
282     if (families != null && families.size() > 0) {
283       // all families must pass
284       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
285         // a) check for family level access
286         if (authManager.authorize(user, tableName, family.getKey(),
287             permRequest)) {
288           continue;  // family-level permission overrides per-qualifier
289         }
290 
291         // b) qualifier level access can still succeed
292         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
293           if (family.getValue() instanceof Set) {
294             // for each qualifier of the family
295             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
296             for (byte[] qualifier : familySet) {
297               if (!authManager.authorize(user, tableName, family.getKey(),
298                                          qualifier, permRequest)) {
299                 return AuthResult.deny(request, "Failed qualifier check", user,
300                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
301               }
302             }
303           } else if (family.getValue() instanceof List) { // List<KeyValue>
304             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
305             for (KeyValue kv : kvList) {
306               if (!authManager.authorize(user, tableName, family.getKey(),
307                       kv.getQualifier(), permRequest)) {
308                 return AuthResult.deny(request, "Failed qualifier check", user,
309                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
310               }
311             }
312           }
313         } else {
314           // no qualifiers and family-level check already failed
315           return AuthResult.deny(request, "Failed family check", user, permRequest,
316               tableName, makeFamilyMap(family.getKey(), null));
317         }
318       }
319 
320       // all family checks passed
321       return AuthResult.allow(request, "All family checks passed", user, permRequest,
322           tableName, families);
323     }
324 
325     // 4. no families to check and table level access failed
326     return AuthResult.deny(request, "No families to check and table permission failed",
327         user, permRequest, tableName, families);
328   }
329 
330   private void logResult(AuthResult result) {
331     if (AUDITLOG.isTraceEnabled()) {
332       RequestContext ctx = RequestContext.get();
333       InetAddress remoteAddr = null;
334       if (ctx != null) {
335         remoteAddr = ctx.getRemoteAddress();
336       }
337       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
338           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
339           "; reason: " + result.getReason() +
340           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
341           "; request: " + result.getRequest() +
342           "; context: " + result.toContextString());
343     }
344   }
345 
346   /**
347    * Returns the active user to which authorization checks should be applied.
348    * If we are in the context of an RPC call, the remote user is used,
349    * otherwise the currently logged in user is used.
350    */
351   private User getActiveUser() throws IOException {
352     User user = RequestContext.getRequestUser();
353     if (!RequestContext.isInRequestContext()) {
354       // for non-rpc handling, fallback to system user
355       user = userProvider.getCurrent();
356     }
357     return user;
358   }
359 
360   /**
361    * Authorizes that the current user has any of the given permissions for the
362    * given table, column family and column qualifier.
363    * @param tableName Table requested
364    * @param family Column family requested
365    * @param qualifier Column qualifier requested
366    * @throws IOException if obtaining the current user fails
367    * @throws AccessDeniedException if user has no authorization
368    */
369   private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
370       Action... permissions) throws IOException {
371     User user = getActiveUser();
372     AuthResult result = null;
373 
374     for (Action permission : permissions) {
375       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
376         result = AuthResult.allow(request, "Table permission granted", user,
377                                   permission, tableName, family, qualifier);
378         break;
379       } else {
380         // rest of the world
381         result = AuthResult.deny(request, "Insufficient permissions", user,
382                                  permission, tableName, family, qualifier);
383       }
384     }
385     logResult(result);
386     if (!result.isAllowed()) {
387       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
388     }
389   }
390 
391   /**
392    * Authorizes that the current user has global privileges for the given action.
393    * @param perm The action being requested
394    * @throws IOException if obtaining the current user fails
395    * @throws AccessDeniedException if authorization is denied
396    */
397   private void requirePermission(String request, Permission.Action perm) throws IOException {
398     requireGlobalPermission(request, perm, null, null);
399   }
400 
401   /**
402    * Authorizes that the current user has permission to perform the given
403    * action on the set of table column families.
404    * @param perm Action that is required
405    * @param env The current coprocessor environment
406    * @param families The map of column families-qualifiers.
407    * @throws AccessDeniedException if the authorization check failed
408    */
409   private void requirePermission(String request, Permission.Action perm,
410         RegionCoprocessorEnvironment env,
411         Map<byte[], ? extends Collection<?>> families)
412       throws IOException {
413     User user = getActiveUser();
414     AuthResult result = permissionGranted(request, user, perm, env, families);
415     logResult(result);
416 
417     if (!result.isAllowed()) {
418       throw new AccessDeniedException("Insufficient permissions (table=" +
419         env.getRegion().getTableDesc().getTableName()+
420         ((families != null && families.size() > 0) ? ", family: " +
421         result.toFamilyString() : "") + ", action=" +
422         perm.toString() + ")");
423     }
424   }
425 
426   /**
427    * Checks that the user has the given global permission. The generated
428    * audit log message will contain context information for the operation
429    * being authorized, based on the given parameters.
430    * @param perm Action being requested
431    * @param tableName Affected table name.
432    * @param familyMap Affected column families.
433    */
434   private void requireGlobalPermission(String request, Permission.Action perm, TableName tableName,
435       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
436     User user = getActiveUser();
437     if (authManager.authorize(user, perm)) {
438       logResult(AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap));
439     } else {
440       logResult(AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap));
441       throw new AccessDeniedException("Insufficient permissions for user '" +
442           (user != null ? user.getShortName() : "null") +"' (global, action=" +
443           perm.toString() + ")");
444     }
445   }
446 
447   /**
448    * Checks that the user has the given global permission. The generated
449    * audit log message will contain context information for the operation
450    * being authorized, based on the given parameters.
451    * @param perm Action being requested
452    * @param namespace
453    */
454   private void requireGlobalPermission(String request, Permission.Action perm,
455                                        String namespace) throws IOException {
456     User user = getActiveUser();
457     if (authManager.authorize(user, perm)) {
458       logResult(AuthResult.allow(request, "Global check allowed", user, perm, namespace));
459     } else {
460       logResult(AuthResult.deny(request, "Global check failed", user, perm, namespace));
461       throw new AccessDeniedException("Insufficient permissions for user '" +
462           (user != null ? user.getShortName() : "null") +"' (global, action=" +
463           perm.toString() + ")");
464     }
465   }
466 
467   private void requireCoveringPermission(String request, RegionCoprocessorEnvironment e,
468       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long timestamp,
469       boolean allVersions, Action...actions) throws IOException {
470     User user = getActiveUser();
471 
472     // First check table or CF level permissions, if they grant access we can
473     // early out before needing to enumerate over per KV perms.
474 
475     List<Action> cellCheckActions = Lists.newArrayList();
476     // TODO: permissionGranted should support checking multiple actions or
477     // we should convert actions into a bitmap and pass that around. See
478     // HBASE-7123.
479     AuthResult results[] = new AuthResult[actions.length];
480     for (int i = 0; i < actions.length; i++) {
481       results[i] = permissionGranted(request, user, actions[i], e, familyMap);
482       if (!results[i].isAllowed()) {
483         if (LOG.isTraceEnabled()) {
484           LOG.trace("Got " + results[i] + ", added to cellCheckActions");
485         }
486         cellCheckActions.add(actions[i]);
487       }
488     }
489     // If all permissions checks passed, we can early out
490     if (cellCheckActions.isEmpty()) {
491       if (LOG.isTraceEnabled()) {
492         LOG.trace("All permissions checks passed, we can early out");
493       }
494       for (int i = 0; i < results.length; i++) {
495         logResult(results[i]);
496       }
497       return;
498     }
499 
500     // Table or CF permissions do not allow, enumerate the covered KVs. We
501     // can stop at the first which does not grant access.
502     int cellsChecked = 0;
503     if (canPersistCellACLs) {
504       Get get = new Get(row);
505       if (timestamp != HConstants.LATEST_TIMESTAMP) get.setTimeStamp(timestamp);
506       get.setMaxResultsPerColumnFamily(1); // Hold down memory use on wide rows
507       if (allVersions) {
508         get.setMaxVersions();
509       } else {
510         get.setMaxVersions(1);
511       }
512       for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
513         byte[] col = entry.getKey();
514         // TODO: HBASE-7114 could possibly unify the collection type in family
515         // maps so we would not need to do this
516         if (entry.getValue() instanceof Set) {
517           Set<byte[]> set = (Set<byte[]>)entry.getValue();
518           if (set == null || set.isEmpty()) {
519             get.addFamily(col);
520           } else {
521             for (byte[] qual: set) {
522               get.addColumn(col, qual);
523             }
524           }
525         } else if (entry.getValue() instanceof List) {
526           List<Cell> list = (List<Cell>)entry.getValue();
527           if (list == null || list.isEmpty()) {
528             get.addFamily(col);
529           } else {
530             for (Cell cell: list) {
531               get.addColumn(col, CellUtil.cloneQualifier(cell));
532             }
533           }
534         } else {
535           throw new RuntimeException("Unhandled collection type " +
536             entry.getValue().getClass().getName());
537         }
538       }
539       if (LOG.isTraceEnabled()) {
540         LOG.trace("Scanning for cells with " + get);
541       }
542       RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
543       List<Cell> cells = Lists.newArrayList();
544       try {
545         boolean more = false;
546         do {
547           cells.clear();
548           more = scanner.next(cells);
549           for (Cell cell: cells) {
550             if (LOG.isTraceEnabled()) {
551               LOG.trace("Found cell " + cell);
552             }
553             for (Action action: cellCheckActions) {
554               // Are there permissions for this user for the cell?
555               if (!authManager.authorize(user, getTableName(e), cell, false, action)) {
556                 AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
557                   user, action, getTableName(e), CellUtil.cloneFamily(cell),
558                   CellUtil.cloneQualifier(cell));
559                 logResult(authResult);
560                 throw new AccessDeniedException("Insufficient permissions " +
561                   authResult.toContextString());
562               }
563             }
564             cellsChecked++;
565           }
566         } while (more);
567       } catch (AccessDeniedException ex) {
568         throw ex;
569       } catch (IOException ex) {
570         LOG.error("Exception while getting cells to calculate covering permission", ex);
571       } finally {
572         scanner.close();
573       }
574     }
575 
576     // If there were no cells to check, throw the ADE
577     if (cellsChecked < 1) {
578       if (LOG.isTraceEnabled()) {
579         LOG.trace("No cells found with scan");
580       }
581       AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
582         user, cellCheckActions.get(0), getTableName(e), familyMap);
583       logResult(authResult);
584       throw new AccessDeniedException("Insufficient permissions " +
585         authResult.toContextString());
586     }
587 
588     // Log that authentication succeeded. We need to trade off logging maybe
589     // thousands of fine grained decisions with providing detail.
590     for (byte[] family: familyMap.keySet()) {
591       for (Action action: actions) {
592         logResult(AuthResult.allow(request, "Permission granted", user, action,
593           getTableName(e), family, null));
594       }
595     }
596   }
597 
598   private void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
599     // Iterate over the entries in the familyMap, replacing the cells therein
600     // with new cells including the ACL data
601     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
602       List<Cell> newCells = Lists.newArrayList();
603       for (Cell cell: e.getValue()) {
604         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
605         byte[] tagBytes = CellUtil.getTagArray(cell);
606         Iterator<Tag> tagIterator = CellUtil.tagsIterator(tagBytes, 0, tagBytes.length);
607         while (tagIterator.hasNext()) {
608           tags.add(tagIterator.next());
609         }
610         // Ensure KeyValue so we can do a scatter gather copy. This is only a win if the
611         // incoming cell type is actually KeyValue.
612         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
613         newCells.add(
614           new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
615             kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
616             kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
617             kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
618             kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
619             tags));
620       }
621       // This is supposed to be safe, won't CME
622       e.setValue(newCells);
623     }
624   }
625 
626   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
627       final Query query) throws IOException {
628     TableName tableName = getTableName(c.getEnvironment());
629     User activeUser = getActiveUser();
630     Filter filter = query.getFilter();
631     boolean cellFirstStrategy = query.getACLStrategy();
632     // Don't wrap an AccessControlFilter
633     if (filter != null && filter instanceof AccessControlFilter) {
634       return;
635     }
636     Filter newFilter = (filter != null)
637       ? new FilterList(FilterList.Operator.MUST_PASS_ALL,
638           Lists.newArrayList(
639             new AccessControlFilter(authManager, activeUser, tableName, cellFirstStrategy),
640             filter))
641       : new AccessControlFilter(authManager, activeUser, tableName, cellFirstStrategy);
642     query.setFilter(newFilter);
643   }
644 
645   /* ---- MasterObserver implementation ---- */
646 
647   public void start(CoprocessorEnvironment env) throws IOException {
648     canPersistCellACLs = HFile.getFormatVersion(env.getConfiguration()) >=
649       HFile.MIN_FORMAT_VERSION_WITH_TAGS;
650     if (!canPersistCellACLs) {
651       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
652           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
653           + " accordingly.");
654     }
655     ZooKeeperWatcher zk = null;
656     if (env instanceof MasterCoprocessorEnvironment) {
657       // if running on HMaster
658       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
659       zk = mEnv.getMasterServices().getZooKeeper();      
660     } else if (env instanceof RegionServerCoprocessorEnvironment) {      
661       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
662       zk = rsEnv.getRegionServerServices().getZooKeeper();      
663     } else if (env instanceof RegionCoprocessorEnvironment) {
664       // if running at region
665       regionEnv = (RegionCoprocessorEnvironment) env;
666       zk = regionEnv.getRegionServerServices().getZooKeeper();
667     }
668 
669     // set the user-provider.
670     this.userProvider = UserProvider.instantiate(env.getConfiguration());
671 
672     // If zk is null or IOException while obtaining auth manager,
673     // throw RuntimeException so that the coprocessor is unloaded.
674     if (zk != null) {
675       try {
676         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
677       } catch (IOException ioe) {
678         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
679       }
680     } else {
681       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
682     }
683   }
684 
685   public void stop(CoprocessorEnvironment env) {
686 
687   }
688 
689   @Override
690   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
691       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
692     Set<byte[]> families = desc.getFamiliesKeys();
693     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
694     for (byte[] family: families) {
695       familyMap.put(family, null);
696     }
697     requireGlobalPermission("createTable", Permission.Action.CREATE, desc.getTableName(), familyMap);
698   }
699 
700   @Override
701   public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
702       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
703 
704   @Override
705   public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
706       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
707     if (!AccessControlLists.isAclTable(desc)) {
708       String owner = desc.getOwnerString();
709       // default the table owner to current user, if not specified.
710       if (owner == null) owner = getActiveUser().getShortName();
711       UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getTableName(), null,
712           Action.values());
713       AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
714     }
715   }
716 
717   @Override
718   public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
719       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
720 
721   @Override
722   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
723       throws IOException {
724     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
725   }
726 
727   @Override
728   public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
729       TableName tableName) throws IOException {}
730 
731   @Override
732   public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
733       TableName tableName) throws IOException {
734     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(), tableName);
735   }
736 
737   @Override
738   public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
739       TableName tableName) throws IOException {}
740 
741   @Override
742   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
743       HTableDescriptor htd) throws IOException {
744     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
745   }
746 
747   @Override
748   public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
749       TableName tableName, HTableDescriptor htd) throws IOException {}
750 
751   @Override
752   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
753       TableName tableName, HTableDescriptor htd) throws IOException {
754     String owner = htd.getOwnerString();
755     // default the table owner to current user, if not specified.
756     if (owner == null) owner = getActiveUser().getShortName();
757     UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getTableName(), null,
758         Action.values());
759     AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
760   }
761 
762   @Override
763   public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
764       TableName tableName, HTableDescriptor htd) throws IOException {}
765 
766 
767   @Override
768   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
769       HColumnDescriptor column) throws IOException {
770     requirePermission("addColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
771   }
772 
773   @Override
774   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
775       TableName tableName, HColumnDescriptor column) throws IOException {}
776 
777   @Override
778   public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
779       TableName tableName, HColumnDescriptor column) throws IOException {}
780 
781   @Override
782   public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
783       TableName tableName, HColumnDescriptor column) throws IOException {}
784 
785   @Override
786   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
787       HColumnDescriptor descriptor) throws IOException {
788     requirePermission("modifyColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
789   }
790 
791   @Override
792   public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
793       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
794 
795   @Override
796   public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
797       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
798 
799   @Override
800   public void postModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
801       TableName tableName, HColumnDescriptor descriptor) throws IOException {}
802 
803   @Override
804   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
805       byte[] col) throws IOException {
806     requirePermission("deleteColumn", tableName, null, null, Action.ADMIN, Action.CREATE);
807   }
808 
809   @Override
810   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
811       TableName tableName, byte[] col) throws IOException {}
812 
813   @Override
814   public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
815       TableName tableName, byte[] col) throws IOException {
816     AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(),
817                                               tableName, col);
818   }
819 
820   @Override
821   public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
822       TableName tableName, byte[] col) throws IOException {}
823 
824   @Override
825   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
826       throws IOException {
827     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
828   }
829 
830   @Override
831   public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
832       TableName tableName) throws IOException {}
833 
834   @Override
835   public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
836       TableName tableName) throws IOException {}
837 
838   @Override
839   public void postEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
840       TableName tableName) throws IOException {}
841 
842   @Override
843   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
844       throws IOException {
845     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
846       throw new AccessDeniedException("Not allowed to disable "
847           + AccessControlLists.ACL_TABLE_NAME + " table.");
848     }
849     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
850   }
851 
852   @Override
853   public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
854       TableName tableName) throws IOException {}
855 
856   @Override
857   public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
858       TableName tableName) throws IOException {}
859 
860   @Override
861   public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
862       TableName tableName) throws IOException {}
863 
864   @Override
865   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
866       ServerName srcServer, ServerName destServer) throws IOException {
867     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
868   }
869 
870   @Override
871   public void postMove(ObserverContext<MasterCoprocessorEnvironment> c,
872       HRegionInfo region, ServerName srcServer, ServerName destServer)
873     throws IOException {}
874 
875   @Override
876   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
877       throws IOException {
878     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
879   }
880 
881   @Override
882   public void postAssign(ObserverContext<MasterCoprocessorEnvironment> c,
883       HRegionInfo regionInfo) throws IOException {}
884 
885   @Override
886   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
887       boolean force) throws IOException {
888     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
889   }
890 
891   @Override
892   public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> c,
893       HRegionInfo regionInfo, boolean force) throws IOException {}
894 
895   @Override
896   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
897       HRegionInfo regionInfo) throws IOException {
898     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
899   }
900 
901   @Override
902   public void postRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
903       HRegionInfo regionInfo) throws IOException {
904   }
905 
906   @Override
907   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
908       throws IOException {
909     requirePermission("balance", Permission.Action.ADMIN);
910   }
911   @Override
912   public void postBalance(ObserverContext<MasterCoprocessorEnvironment> c, List<RegionPlan> plans)
913       throws IOException {}
914 
915   @Override
916   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
917       boolean newValue) throws IOException {
918     requirePermission("balanceSwitch", Permission.Action.ADMIN);
919     return newValue;
920   }
921 
922   @Override
923   public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
924       boolean oldValue, boolean newValue) throws IOException {}
925 
926   @Override
927   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
928       throws IOException {
929     requirePermission("shutdown", Permission.Action.ADMIN);
930   }
931 
932   @Override
933   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
934       throws IOException {
935     requirePermission("stopMaster", Permission.Action.ADMIN);
936   }
937 
938   @Override
939   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
940       throws IOException {
941     // initialize the ACL storage table
942     AccessControlLists.init(ctx.getEnvironment().getMasterServices());
943   }
944 
945   @Override
946   public void preMasterInitialization(
947       ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
948   }
949 
950   @Override
951   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
952       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
953       throws IOException {
954     requirePermission("snapshot", Permission.Action.ADMIN);
955   }
956 
957   @Override
958   public void postSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
959       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
960       throws IOException {
961   }
962 
963   @Override
964   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
965       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
966       throws IOException {
967     requirePermission("clone", Permission.Action.ADMIN);
968   }
969 
970   @Override
971   public void postCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
972       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
973       throws IOException {
974   }
975 
976   @Override
977   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
978       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
979       throws IOException {
980     requirePermission("restore", Permission.Action.ADMIN);
981   }
982 
983   @Override
984   public void postRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
985       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
986       throws IOException {
987   }
988 
989   @Override
990   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
991       final SnapshotDescription snapshot) throws IOException {
992     requirePermission("deleteSnapshot", Permission.Action.ADMIN);
993   }
994 
995   @Override
996   public void postDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
997       final SnapshotDescription snapshot) throws IOException {
998   }
999 
1000   @Override
1001   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1002       NamespaceDescriptor ns) throws IOException {
1003     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1004   }
1005 
1006   @Override
1007   public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1008       NamespaceDescriptor ns) throws IOException {
1009   }
1010 
1011   @Override
1012   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1013       throws IOException {
1014     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1015   }
1016 
1017   @Override
1018   public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1019                                   String namespace) throws IOException {
1020     AccessControlLists.removeNamespacePermissions(ctx.getEnvironment().getConfiguration(),
1021         namespace);
1022     LOG.info(namespace + "entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1023   }
1024 
1025   @Override
1026   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1027       NamespaceDescriptor ns) throws IOException {
1028     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1029   }
1030 
1031   @Override
1032   public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1033                                   NamespaceDescriptor ns) throws IOException {
1034   }
1035 
1036   /* ---- RegionObserver implementation ---- */
1037 
1038   @Override
1039   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1040       throws IOException {
1041     RegionCoprocessorEnvironment env = e.getEnvironment();
1042     final HRegion region = env.getRegion();
1043     if (region == null) {
1044       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1045     } else {
1046       HRegionInfo regionInfo = region.getRegionInfo();
1047       if (regionInfo.getTable().isSystemTable()) {
1048         isSystemOrSuperUser(regionEnv.getConfiguration());
1049       } else {
1050         requirePermission("preOpen", Action.ADMIN);
1051       }
1052     }
1053   }
1054 
1055   @Override
1056   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1057     RegionCoprocessorEnvironment env = c.getEnvironment();
1058     final HRegion region = env.getRegion();
1059     if (region == null) {
1060       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1061       return;
1062     }
1063     if (AccessControlLists.isAclRegion(region)) {
1064       aclRegion = true;
1065       // When this region is under recovering state, initialize will be handled by postLogReplay
1066       if (!region.isRecovering()) {
1067         try {
1068           initialize(env);
1069         } catch (IOException ex) {
1070           // if we can't obtain permissions, it's better to fail
1071           // than perform checks incorrectly
1072           throw new RuntimeException("Failed to initialize permissions cache", ex);
1073         }
1074       }
1075     } else {
1076       initialized = true;
1077     }
1078   }
1079 
1080   @Override
1081   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1082     if (aclRegion) {
1083       try {
1084         initialize(c.getEnvironment());
1085       } catch (IOException ex) {
1086         // if we can't obtain permissions, it's better to fail
1087         // than perform checks incorrectly
1088         throw new RuntimeException("Failed to initialize permissions cache", ex);
1089       }
1090     }
1091   }
1092 
1093   @Override
1094   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1095     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1096   }
1097 
1098   @Override
1099   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1100     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1101   }
1102   
1103   @Override
1104   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1105       byte[] splitRow) throws IOException {
1106     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1107   }
1108 
1109   @Override
1110   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1111       final Store store, final InternalScanner scanner, final ScanType scanType)
1112           throws IOException {
1113     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1114     return scanner;
1115   }
1116 
1117   @Override
1118   public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> e,
1119       final Store store, final List<StoreFile> candidates) throws IOException {
1120     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1121   }
1122 
1123   @Override
1124   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1125       final byte [] row, final byte [] family, final Result result)
1126       throws IOException {
1127     assert family != null;
1128     requireCoveringPermission("getClosestRowBefore", c.getEnvironment(), row,
1129       makeFamilyMap(family, null), HConstants.LATEST_TIMESTAMP, false, Permission.Action.READ);
1130   }
1131 
1132   @Override
1133   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1134       final Get get, final List<Cell> result) throws IOException {
1135     internalPreRead(c, get);
1136   }
1137 
1138   @Override
1139   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1140       final Get get, final boolean exists) throws IOException {
1141     internalPreRead(c, get);
1142     return exists;
1143   }
1144 
1145   @Override
1146   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1147       final Put put, final WALEdit edit, final Durability durability)
1148       throws IOException {
1149     // Require WRITE permission to the table, CF, or top visible value, if any.
1150     // NOTE: We don't need to check the permissions for any earlier Puts
1151     // because we treat the ACLs in each Put as timestamped like any other
1152     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1153     // change the ACL of any previous Put. This allows simple evolution of
1154     // security policy over time without requiring expensive updates.
1155     requireCoveringPermission("put", c.getEnvironment(), put.getRow(),
1156       put.getFamilyCellMap(), put.getTimeStamp(), false, Permission.Action.WRITE);
1157     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1158     if (bytes != null) {
1159       if (canPersistCellACLs) {
1160         addCellPermissions(bytes, put.getFamilyCellMap());
1161       } else {
1162         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1163       }
1164     }
1165   }
1166 
1167   @Override
1168   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1169       final Put put, final WALEdit edit, final Durability durability) {
1170     if (aclRegion) {
1171       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1172     }
1173   }
1174 
1175   @Override
1176   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1177       final Delete delete, final WALEdit edit, final Durability durability)
1178       throws IOException {
1179     // An ACL on a delete is useless, we shouldn't allow it
1180     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1181       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1182     }
1183     // Require WRITE permissions on all cells covered by the delete. Unlike
1184     // for Puts we need to check all visible prior versions, because a major
1185     // compaction could remove them. If the user doesn't have permission to
1186     // overwrite any of the visible versions ('visible' defined as not covered
1187     // by a tombstone already) then we have to disallow this operation.
1188     requireCoveringPermission("delete", c.getEnvironment(), delete.getRow(),
1189       delete.getFamilyCellMap(), delete.getTimeStamp(), true, Action.WRITE);
1190   }
1191 
1192   @Override
1193   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1194       final Delete delete, final WALEdit edit, final Durability durability)
1195       throws IOException {
1196     if (aclRegion) {
1197       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1198     }
1199   }
1200 
1201   @Override
1202   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1203       final byte [] row, final byte [] family, final byte [] qualifier,
1204       final CompareFilter.CompareOp compareOp,
1205       final ByteArrayComparable comparator, final Put put,
1206       final boolean result) throws IOException {
1207     // Require READ and WRITE permissions on the table, CF, and KV to update
1208     requireCoveringPermission("checkAndPut", c.getEnvironment(), row,
1209       makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
1210       Action.READ, Action.WRITE);
1211     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1212     if (bytes != null) {
1213       if (canPersistCellACLs) {
1214         addCellPermissions(bytes, put.getFamilyCellMap());
1215       } else {
1216         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1217       }
1218     }
1219     return result;
1220   }
1221 
1222   @Override
1223   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1224       final byte [] row, final byte [] family, final byte [] qualifier,
1225       final CompareFilter.CompareOp compareOp,
1226       final ByteArrayComparable comparator, final Delete delete,
1227       final boolean result) throws IOException {
1228     // An ACL on a delete is useless, we shouldn't allow it
1229     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1230       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1231           delete.toString());
1232     }
1233     // Require READ and WRITE permissions on the table, CF, and the KV covered
1234     // by the delete
1235     requireCoveringPermission("checkAndDelete", c.getEnvironment(), row,
1236       makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
1237       Action.READ, Action.WRITE);
1238     return result;
1239   }
1240 
1241   @Override
1242   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1243       final byte [] row, final byte [] family, final byte [] qualifier,
1244       final long amount, final boolean writeToWAL)
1245       throws IOException {
1246     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1247     // incremented value
1248     requireCoveringPermission("incrementColumnValue", c.getEnvironment(), row,
1249       makeFamilyMap(family, qualifier), HConstants.LATEST_TIMESTAMP, false,
1250       Action.WRITE);
1251     return -1;
1252   }
1253 
1254   @Override
1255   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1256       throws IOException {
1257     // Require WRITE permission to the table, CF, and the KV to be appended
1258     requireCoveringPermission("append", c.getEnvironment(), append.getRow(),
1259       append.getFamilyCellMap(), append.getTimeStamp(), false,
1260       Action.WRITE);
1261     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1262     if (bytes != null) {
1263       if (canPersistCellACLs) {
1264         addCellPermissions(bytes, append.getFamilyCellMap());
1265       } else {
1266         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1267       }
1268     }
1269     return null;
1270   }
1271 
1272   @Override
1273   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1274       final Increment increment)
1275       throws IOException {
1276     // Require WRITE permission to the table, CF, and the KV to be replaced by
1277     // the incremented value
1278     requireCoveringPermission("increment", c.getEnvironment(), increment.getRow(),
1279       increment.getFamilyCellMap(), increment.getTimeRange().getMax(), false,
1280       Action.WRITE);
1281     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1282     if (bytes != null) {
1283       if (canPersistCellACLs) {
1284         addCellPermissions(bytes, increment.getFamilyCellMap());
1285       } else {
1286         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1287       }
1288     }
1289     return null;
1290   }
1291 
1292   @Override
1293   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
1294       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
1295     // If the HFile version is insufficient to persist tags, we won't have any
1296     // work to do here
1297     if (!canPersistCellACLs) {
1298       return newCell;
1299     }
1300 
1301     // Collect any ACLs from the old cell
1302     List<Tag> tags = Lists.newArrayList();
1303     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
1304     if (oldCell != null) {
1305       byte[] tagBytes = CellUtil.getTagArray(oldCell);
1306       Iterator<Tag> tagIterator = CellUtil.tagsIterator(tagBytes, 0, tagBytes.length);
1307       while (tagIterator.hasNext()) {
1308         Tag tag = tagIterator.next();
1309         if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
1310           if (LOG.isTraceEnabled()) {
1311             LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
1312               " length " + tag.getValue().length);
1313           }
1314           tags.add(tag);
1315         } else {
1316           ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
1317             AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
1318               tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
1319           perms.putAll(kvPerms);
1320         }
1321       }
1322     }
1323 
1324     // Do we have an ACL on the operation?
1325     byte[] aclBytes = mutation.getACL();
1326     if (aclBytes != null) {
1327       // Yes, use it
1328       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
1329     } else {
1330       // No, use what we carried forward
1331       if (perms != null) {
1332         // TODO: If we collected ACLs from more than one tag we may have a
1333         // List<Permission> of size > 1, this can be collapsed into a single
1334         // Permission
1335         if (LOG.isTraceEnabled()) {
1336           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
1337         }
1338         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
1339           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
1340       }
1341     }
1342 
1343     // If we have no tags to add, just return
1344     if (tags.isEmpty()) {
1345       return newCell;
1346     }
1347 
1348     // We need to create another KV, unfortunately, because the current new KV
1349     // has no space for tags
1350     KeyValue newKv = KeyValueUtil.ensureKeyValue(newCell);
1351     KeyValue rewriteKv = new KeyValue(newKv.getRowArray(), newKv.getRowOffset(), newKv.getRowLength(),
1352       newKv.getFamilyArray(), newKv.getFamilyOffset(), newKv.getFamilyLength(),
1353       newKv.getQualifierArray(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
1354       newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
1355       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
1356       tags);
1357     // Preserve mvcc data
1358     rewriteKv.setMvccVersion(newKv.getMvccVersion());
1359     return rewriteKv;
1360   }
1361 
1362   @Override
1363   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1364       final Scan scan, final RegionScanner s) throws IOException {
1365     internalPreRead(c, scan);
1366     return s;
1367   }
1368 
1369   @Override
1370   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
1371       final Scan scan, final RegionScanner s) throws IOException {
1372     User user = getActiveUser();
1373     if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
1374       scannerOwners.put(s, user.getShortName());
1375     }
1376     return s;
1377   }
1378 
1379   @Override
1380   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
1381       final InternalScanner s, final List<Result> result,
1382       final int limit, final boolean hasNext) throws IOException {
1383     requireScannerOwner(s);
1384     return hasNext;
1385   }
1386 
1387   @Override
1388   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1389       final InternalScanner s) throws IOException {
1390     requireScannerOwner(s);
1391   }
1392 
1393   @Override
1394   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
1395       final InternalScanner s) throws IOException {
1396     // clean up any associated owner mapping
1397     scannerOwners.remove(s);
1398   }
1399 
1400   /**
1401    * Verify, when servicing an RPC, that the caller is the scanner owner.
1402    * If so, we assume that access control is correctly enforced based on
1403    * the checks performed in preScannerOpen()
1404    */
1405   private void requireScannerOwner(InternalScanner s)
1406       throws AccessDeniedException {
1407     if (RequestContext.isInRequestContext()) {
1408       String requestUserName = RequestContext.getRequestUserName();
1409       String owner = scannerOwners.get(s);
1410       if (owner != null && !owner.equals(requestUserName)) {
1411         throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
1412       }
1413     }
1414   }
1415 
1416   /**
1417    * Verifies user has WRITE privileges on
1418    * the Column Families involved in the bulkLoadHFile
1419    * request. Specific Column Write privileges are presently
1420    * ignored.
1421    */
1422   @Override
1423   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
1424       List<Pair<byte[], String>> familyPaths) throws IOException {
1425     for(Pair<byte[],String> el : familyPaths) {
1426       requirePermission("preBulkLoadHFile",
1427           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
1428           el.getFirst(),
1429           null,
1430           Permission.Action.WRITE);
1431     }
1432   }
1433 
1434   private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, String method, Action action) throws IOException {
1435     User requestUser = getActiveUser();
1436     TableName tableName = e.getRegion().getTableDesc().getTableName();
1437     AuthResult authResult = permissionGranted(method, requestUser,
1438         action, e, Collections.EMPTY_MAP);
1439     if (!authResult.isAllowed()) {
1440       for(UserPermission userPerm:
1441           AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), tableName)) {
1442         for(Permission.Action userAction: userPerm.getActions()) {
1443           if(userAction.equals(action)) {
1444             return AuthResult.allow(method, "Access allowed", requestUser,
1445                 action, tableName, null, null);
1446           }
1447         }
1448       }
1449     }
1450     return authResult;
1451   }
1452 
1453   /**
1454    * Authorization check for
1455    * SecureBulkLoadProtocol.prepareBulkLoad()
1456    * @param e
1457    * @throws IOException
1458    */
1459   //TODO this should end up as a coprocessor hook
1460   public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1461     AuthResult authResult = hasSomeAccess(e, "prePrepareBulkLoad", Action.WRITE);
1462     logResult(authResult);
1463     if (!authResult.isAllowed()) {
1464       throw new AccessDeniedException("Insufficient permissions (table=" +
1465         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1466     }
1467   }
1468 
1469   /**
1470    * Authorization security check for
1471    * SecureBulkLoadProtocol.cleanupBulkLoad()
1472    * @param e
1473    * @throws IOException
1474    */
1475   //TODO this should end up as a coprocessor hook
1476   public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException {
1477     AuthResult authResult = hasSomeAccess(e, "preCleanupBulkLoad", Action.WRITE);
1478     logResult(authResult);
1479     if (!authResult.isAllowed()) {
1480       throw new AccessDeniedException("Insufficient permissions (table=" +
1481         e.getRegion().getTableDesc().getTableName() + ", action=WRITE)");
1482     }
1483   }
1484 
1485   /* ---- EndpointObserver implementation ---- */
1486 
1487   @Override
1488   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1489       Service service, String methodName, Message request) throws IOException {
1490     // Don't intercept calls to our own AccessControlService, we check for
1491     // appropriate permissions in the service handlers
1492     if (shouldCheckExecPermissions && !(service instanceof AccessControlService)) {
1493       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
1494         methodName + ")",
1495         getTableName(ctx.getEnvironment()), null, null,
1496         Action.EXEC);
1497     }
1498     return request;
1499   }
1500 
1501   @Override
1502   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
1503       Service service, String methodName, Message request, Message.Builder responseBuilder)
1504       throws IOException { }
1505 
1506   /* ---- Protobuf AccessControlService implementation ---- */
1507 
1508   @Override
1509   public void grant(RpcController controller,
1510                     AccessControlProtos.GrantRequest request,
1511                     RpcCallback<AccessControlProtos.GrantResponse> done) {
1512     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1513     AccessControlProtos.GrantResponse response = null;
1514     try {
1515       // verify it's only running at .acl.
1516       if (aclRegion) {
1517         if (!initialized) {
1518           throw new CoprocessorException("AccessController not yet initialized");
1519         }
1520         if (LOG.isDebugEnabled()) {
1521           LOG.debug("Received request to grant access permission " + perm.toString());
1522         }
1523 
1524         switch(request.getUserPermission().getPermission().getType()) {
1525           case Global :
1526           case Table :
1527             requirePermission("grant", perm.getTableName(), perm.getFamily(),
1528                 perm.getQualifier(), Action.ADMIN);
1529             break;
1530           case Namespace :
1531             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
1532         }
1533 
1534         AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
1535         if (AUDITLOG.isTraceEnabled()) {
1536           // audit log should store permission changes in addition to auth results
1537           AUDITLOG.trace("Granted permission " + perm.toString());
1538         }
1539       } else {
1540         throw new CoprocessorException(AccessController.class, "This method "
1541             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1542       }
1543       response = AccessControlProtos.GrantResponse.getDefaultInstance();
1544     } catch (IOException ioe) {
1545       // pass exception back up
1546       ResponseConverter.setControllerException(controller, ioe);
1547     }
1548     done.run(response);
1549   }
1550 
1551   @Override
1552   public void revoke(RpcController controller,
1553                      AccessControlProtos.RevokeRequest request,
1554                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
1555     UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
1556     AccessControlProtos.RevokeResponse response = null;
1557     try {
1558       // only allowed to be called on _acl_ region
1559       if (aclRegion) {
1560         if (!initialized) {
1561           throw new CoprocessorException("AccessController not yet initialized");
1562         }
1563         if (LOG.isDebugEnabled()) {
1564           LOG.debug("Received request to revoke access permission " + perm.toString());
1565         }
1566 
1567         switch(request.getUserPermission().getPermission().getType()) {
1568           case Global :
1569           case Table :
1570             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
1571                               perm.getQualifier(), Action.ADMIN);
1572             break;
1573           case Namespace :
1574             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
1575         }
1576 
1577         AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm);
1578         if (AUDITLOG.isTraceEnabled()) {
1579           // audit log should record all permission changes
1580           AUDITLOG.trace("Revoked permission " + perm.toString());
1581         }
1582       } else {
1583         throw new CoprocessorException(AccessController.class, "This method "
1584             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1585       }
1586       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
1587     } catch (IOException ioe) {
1588       // pass exception back up
1589       ResponseConverter.setControllerException(controller, ioe);
1590     }
1591     done.run(response);
1592   }
1593 
1594   @Override
1595   public void getUserPermissions(RpcController controller,
1596                                  AccessControlProtos.GetUserPermissionsRequest request,
1597                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
1598     AccessControlProtos.GetUserPermissionsResponse response = null;
1599     try {
1600       // only allowed to be called on _acl_ region
1601       if (aclRegion) {
1602         if (!initialized) {
1603           throw new CoprocessorException("AccessController not yet initialized");
1604         }
1605         List<UserPermission> perms = null;
1606         if(request.getType() == AccessControlProtos.Permission.Type.Table) {
1607           TableName table = null;
1608           if (request.hasTableName()) {
1609             table = ProtobufUtil.toTableName(request.getTableName());
1610           }
1611           requirePermission("userPermissions", table, null, null, Action.ADMIN);
1612 
1613           perms = AccessControlLists.getUserTablePermissions(
1614               regionEnv.getConfiguration(), table);
1615         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
1616           perms = AccessControlLists.getUserNamespacePermissions(
1617               regionEnv.getConfiguration(), request.getNamespaceName().toStringUtf8());
1618         } else {
1619           perms = AccessControlLists.getUserPermissions(
1620               regionEnv.getConfiguration(), null);
1621         }
1622         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
1623       } else {
1624         throw new CoprocessorException(AccessController.class, "This method "
1625             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
1626       }
1627     } catch (IOException ioe) {
1628       // pass exception back up
1629       ResponseConverter.setControllerException(controller, ioe);
1630     }
1631     done.run(response);
1632   }
1633 
1634   @Override
1635   public void checkPermissions(RpcController controller,
1636                                AccessControlProtos.CheckPermissionsRequest request,
1637                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
1638     Permission[] permissions = new Permission[request.getPermissionCount()];
1639     for (int i=0; i < request.getPermissionCount(); i++) {
1640       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
1641     }
1642     AccessControlProtos.CheckPermissionsResponse response = null;
1643     try {
1644       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
1645       for (Permission permission : permissions) {
1646         if (permission instanceof TablePermission) {
1647           TablePermission tperm = (TablePermission) permission;
1648           for (Permission.Action action : permission.getActions()) {
1649             if (!tperm.getTableName().equals(tableName)) {
1650               throw new CoprocessorException(AccessController.class, String.format("This method "
1651                   + "can only execute at the table specified in TablePermission. " +
1652                   "Table of the region:%s , requested table:%s", tableName,
1653                   tperm.getTableName()));
1654             }
1655 
1656             Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
1657             if (tperm.getFamily() != null) {
1658               if (tperm.getQualifier() != null) {
1659                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
1660                 qualifiers.add(tperm.getQualifier());
1661                 familyMap.put(tperm.getFamily(), qualifiers);
1662               } else {
1663                 familyMap.put(tperm.getFamily(), null);
1664               }
1665             }
1666 
1667             requirePermission("checkPermissions", action, regionEnv, familyMap);
1668           }
1669 
1670         } else {
1671           for (Permission.Action action : permission.getActions()) {
1672             requirePermission("checkPermissions", action);
1673           }
1674         }
1675       }
1676       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
1677     } catch (IOException ioe) {
1678       ResponseConverter.setControllerException(controller, ioe);
1679     }
1680     done.run(response);
1681   }
1682 
1683   @Override
1684   public Service getService() {
1685     return AccessControlProtos.AccessControlService.newReflectiveService(this);
1686   }
1687 
1688   private HRegion getRegion(RegionCoprocessorEnvironment e) {
1689     return e.getRegion();
1690   }
1691 
1692   private TableName getTableName(RegionCoprocessorEnvironment e) {
1693     HRegion region = e.getRegion();
1694     TableName tableName = null;
1695 
1696     if (region != null) {
1697       HRegionInfo regionInfo = region.getRegionInfo();
1698       if (regionInfo != null) {
1699         tableName = regionInfo.getTable();
1700       }
1701     }
1702     return tableName;
1703   }
1704 
1705 
1706   @Override
1707   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
1708       throws IOException {
1709     requirePermission("preClose", Action.ADMIN);
1710   }
1711 
1712   private void isSystemOrSuperUser(Configuration conf) throws IOException {
1713     User user = userProvider.getCurrent();
1714     if (user == null) {
1715       throw new IOException("Unable to obtain the current user, " +
1716         "authorization checks for internal operations will not work correctly!");
1717     }
1718 
1719     String currentUser = user.getShortName();
1720     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
1721       AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
1722 
1723     User activeUser = getActiveUser();
1724     if (!(superusers.contains(activeUser.getShortName()))) {
1725       throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") +
1726         "is not system or super user.");
1727     }
1728   }
1729 
1730   @Override
1731   public void preStopRegionServer(
1732       ObserverContext<RegionServerCoprocessorEnvironment> env)
1733       throws IOException {
1734     requirePermission("preStopRegionServer", Permission.Action.ADMIN);
1735   }
1736 
1737   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
1738       byte[] qualifier) {
1739     if (family == null) {
1740       return null;
1741     }
1742 
1743     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
1744     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
1745     return familyMap;
1746   }
1747 
1748   @Override
1749   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1750       List<TableName> tableNamesList,
1751       List<HTableDescriptor> descriptors) throws IOException {
1752     // If the list is empty, this is a request for all table descriptors and requires GLOBAL
1753     // ADMIN privs.
1754     if (tableNamesList == null || tableNamesList.isEmpty()) {
1755       requireGlobalPermission("getTableDescriptors", Permission.Action.ADMIN, null, null);
1756     }
1757     // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
1758     // request can be granted.
1759     else {
1760       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
1761       for (TableName tableName: tableNamesList) {
1762         // Do not deny if the table does not exist
1763         try {
1764           masterServices.checkTableModifiable(tableName);
1765         } catch (TableNotFoundException ex) {
1766           // Skip checks for a table that does not exist
1767           continue;
1768         } catch (TableNotDisabledException ex) {
1769           // We don't care about this
1770         }
1771         requirePermission("getTableDescriptors", tableName, null, null,
1772           Permission.Action.ADMIN, Permission.Action.CREATE);
1773       }
1774     }
1775   }
1776 
1777   @Override
1778   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1779       List<HTableDescriptor> descriptors) throws IOException {
1780   }
1781 
1782   @Override
1783   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA,
1784       HRegion regionB) throws IOException {
1785     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
1786       Action.ADMIN);
1787   }
1788 
1789   @Override
1790   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, HRegion regionA,
1791       HRegion regionB, HRegion mergedRegion) throws IOException { }
1792 
1793   @Override
1794   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
1795       HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { }
1796 
1797   @Override
1798   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
1799       HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { }
1800 
1801   @Override
1802   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
1803       HRegion regionA, HRegion regionB) throws IOException { }
1804 
1805   @Override
1806   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
1807       HRegion regionA, HRegion regionB) throws IOException { }
1808 }