View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.NamespaceDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.Tag;
46  import org.apache.hadoop.hbase.TagType;
47  import org.apache.hadoop.hbase.client.Delete;
48  import org.apache.hadoop.hbase.client.Get;
49  import org.apache.hadoop.hbase.client.HTable;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Result;
52  import org.apache.hadoop.hbase.client.ResultScanner;
53  import org.apache.hadoop.hbase.client.Scan;
54  import org.apache.hadoop.hbase.exceptions.DeserializationException;
55  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
56  import org.apache.hadoop.hbase.filter.QualifierFilter;
57  import org.apache.hadoop.hbase.filter.RegexStringComparator;
58  import org.apache.hadoop.hbase.io.compress.Compression;
59  import org.apache.hadoop.hbase.master.MasterServices;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
62  import org.apache.hadoop.hbase.regionserver.BloomType;
63  import org.apache.hadoop.hbase.regionserver.HRegion;
64  import org.apache.hadoop.hbase.regionserver.InternalScanner;
65  import org.apache.hadoop.hbase.security.User;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.Pair;
68  import org.apache.hadoop.io.Text;
69  
70  import com.google.common.collect.ArrayListMultimap;
71  import com.google.common.collect.ListMultimap;
72  import com.google.common.collect.Lists;
73  import com.google.protobuf.InvalidProtocolBufferException;
74  
75  /**
76   * Maintains lists of permission grants to users and groups to allow for
77   * authorization checks by {@link AccessController}.
78   *
79   * <p>
80   * Access control lists are stored in an "internal" metadata table named
81   * {@code _acl_}. Each table's permission grants are stored as a separate row,
82   * keyed by the table name. KeyValues for permissions assignments are stored
83   * in one of the formats:
84   * <pre>
85   * Key                      Desc
86   * --------                 --------
87   * user                     table level permissions for a user [R=read, W=write]
88   * group                    table level permissions for a group
89   * user,family              column family level permissions for a user
90   * group,family             column family level permissions for a group
91   * user,family,qualifier    column qualifier level permissions for a user
92   * group,family,qualifier   column qualifier level permissions for a group
93   * </pre>
94   * All values are encoded as byte arrays containing the codes from the
95   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
96   * </p>
97   */
98  @InterfaceAudience.Private
99  public class AccessControlLists {
100   /** Internal storage table for access control lists */
101   public static final TableName ACL_TABLE_NAME =
102       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
103   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
104   /** Column family used to store ACL grants */
105   public static final String ACL_LIST_FAMILY_STR = "l";
106   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
107   /** KV tag to store per cell access control lists */
108   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
109 
110   public static final char NAMESPACE_PREFIX = '@';
111 
112   /** Table descriptor for ACL internal table */
113   public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(ACL_TABLE_NAME);
114   static {
115     ACL_TABLEDESC.addFamily(
116         new HColumnDescriptor(ACL_LIST_FAMILY,
117             10, // Ten is arbitrary number.  Keep versions to help debugging.
118             Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
119             HConstants.FOREVER, BloomType.NONE.toString(),
120             HConstants.REPLICATION_SCOPE_LOCAL).
121             // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
122             // be the case if we are using CombinedBlockCache (Bucket Cache).
123             setCacheDataInL1(true));
124   }
125 
126   /**
127    * Delimiter to separate user, column family, and qualifier in
128    * _acl_ table info: column keys */
129   public static final char ACL_KEY_DELIMITER = ',';
130   /** Prefix character to denote group names */
131   public static final String GROUP_PREFIX = "@";
132   /** Configuration key for superusers */
133   public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
134 
135   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
136 
137   /**
138    * Check for existence of {@code _acl_} table and create it if it does not exist
139    * @param master reference to HMaster
140    */
141   static void init(MasterServices master) throws IOException {
142     master.createTable(ACL_TABLEDESC, null);
143   }
144 
145   /**
146    * Stores a new user permission grant in the access control lists table.
147    * @param conf the configuration
148    * @param userPerm the details of the permission to be granted
149    * @throws IOException in the case of an error accessing the metadata table
150    */
151   static void addUserPermission(Configuration conf, UserPermission userPerm)
152       throws IOException {
153     Permission.Action[] actions = userPerm.getActions();
154     byte[] rowKey = userPermissionRowKey(userPerm);
155     Put p = new Put(rowKey);
156     byte[] key = userPermissionKey(userPerm);
157 
158     if ((actions == null) || (actions.length == 0)) {
159       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
160       LOG.warn(msg);
161       throw new IOException(msg);
162     }
163 
164     byte[] value = new byte[actions.length];
165     for (int i = 0; i < actions.length; i++) {
166       value[i] = actions[i].code();
167     }
168     p.addImmutable(ACL_LIST_FAMILY, key, value);
169     if (LOG.isDebugEnabled()) {
170       LOG.debug("Writing permission with rowKey "+
171           Bytes.toString(rowKey)+" "+
172           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
173       );
174     }
175     HTable acls = null;
176     try {
177       acls = new HTable(conf, ACL_TABLE_NAME);
178       acls.put(p);
179     } finally {
180       if (acls != null) acls.close();
181     }
182   }
183 
184   /**
185    * Removes a previously granted permission from the stored access control
186    * lists.  The {@link TablePermission} being removed must exactly match what
187    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
188    * been granted "READ" access to the "data" table, but only to column family
189    * plus qualifier "info:colA", then trying to call this method with only
190    * user "bob" and the table name "data" (but without specifying the
191    * column qualifier "info:colA") will have no effect.
192    *
193    * @param conf the configuration
194    * @param userPerm the details of the permission to be revoked
195    * @throws IOException if there is an error accessing the metadata table
196    */
197   static void removeUserPermission(Configuration conf, UserPermission userPerm)
198       throws IOException {
199     Delete d = new Delete(userPermissionRowKey(userPerm));
200     byte[] key = userPermissionKey(userPerm);
201 
202     if (LOG.isDebugEnabled()) {
203       LOG.debug("Removing permission "+ userPerm.toString());
204     }
205     d.deleteColumns(ACL_LIST_FAMILY, key);
206     HTable acls = null;
207     try {
208       acls = new HTable(conf, ACL_TABLE_NAME);
209       acls.delete(d);
210     } finally {
211       if (acls != null) acls.close();
212     }
213   }
214 
215   /**
216    * Remove specified table from the _acl_ table.
217    */
218   static void removeTablePermissions(Configuration conf, TableName tableName)
219       throws IOException{
220     Delete d = new Delete(tableName.getName());
221 
222     if (LOG.isDebugEnabled()) {
223       LOG.debug("Removing permissions of removed table "+ tableName);
224     }
225 
226     HTable acls = null;
227     try {
228       acls = new HTable(conf, ACL_TABLE_NAME);
229       acls.delete(d);
230     } finally {
231       if (acls != null) acls.close();
232     }
233   }
234 
235   /**
236    * Remove specified namespace from the acl table.
237    */
238   static void removeNamespacePermissions(Configuration conf, String namespace)
239       throws IOException{
240     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
241 
242     if (LOG.isDebugEnabled()) {
243       LOG.debug("Removing permissions of removed namespace "+ namespace);
244     }
245 
246     HTable acls = null;
247     try {
248       acls = new HTable(conf, ACL_TABLE_NAME);
249       acls.delete(d);
250     } finally {
251       if (acls != null) acls.close();
252     }
253   }
254 
255   /**
256    * Remove specified table column from the acl table.
257    */
258   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
259       throws IOException{
260 
261     if (LOG.isDebugEnabled()) {
262       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
263                 " from table "+ tableName);
264     }
265 
266     HTable acls = null;
267     try {
268       acls = new HTable(conf, ACL_TABLE_NAME);
269 
270       Scan scan = new Scan();
271       scan.addFamily(ACL_LIST_FAMILY);
272 
273       String columnName = Bytes.toString(column);
274       scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
275                      String.format("(%s%s%s)|(%s%s)$",
276                      ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
277                      ACL_KEY_DELIMITER, columnName))));
278 
279       Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
280       ResultScanner scanner = acls.getScanner(scan);
281       try {
282         for (Result res : scanner) {
283           for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
284             qualifierSet.add(q);
285           }
286         }
287       } finally {
288         scanner.close();
289       }
290 
291       if (qualifierSet.size() > 0) {
292         Delete d = new Delete(tableName.getName());
293         for (byte[] qualifier : qualifierSet) {
294           d.deleteColumns(ACL_LIST_FAMILY, qualifier);
295         }
296         acls.delete(d);
297       }
298     } finally {
299       if (acls != null) acls.close();
300     }
301   }
302 
303   static byte[] userPermissionRowKey(UserPermission userPerm) {
304     byte[] row;
305     if(userPerm.hasNamespace()) {
306       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
307     } else if(userPerm.isGlobal()) {
308       row = ACL_GLOBAL_NAME;
309     } else {
310       row = userPerm.getTableName().getName();
311     }
312     return row;
313   }
314 
315   /**
316    * Build qualifier key from user permission:
317    *  username
318    *  username,family
319    *  username,family,qualifier
320    */
321   static byte[] userPermissionKey(UserPermission userPerm) {
322     byte[] qualifier = userPerm.getQualifier();
323     byte[] family = userPerm.getFamily();
324     byte[] key = userPerm.getUser();
325 
326     if (family != null && family.length > 0) {
327       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
328       if (qualifier != null && qualifier.length > 0) {
329         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
330       }
331     }
332 
333     return key;
334   }
335 
336   /**
337    * Returns {@code true} if the given region is part of the {@code _acl_}
338    * metadata table.
339    */
340   static boolean isAclRegion(HRegion region) {
341     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
342   }
343 
344   /**
345    * Returns {@code true} if the given table is {@code _acl_} metadata table.
346    */
347   static boolean isAclTable(HTableDescriptor desc) {
348     return ACL_TABLE_NAME.equals(desc.getTableName());
349   }
350 
351   /**
352    * Loads all of the permission grants stored in a region of the {@code _acl_}
353    * table.
354    *
355    * @param aclRegion
356    * @return a map of the permissions for this table.
357    * @throws IOException
358    */
359   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
360       HRegion aclRegion)
361     throws IOException {
362 
363     if (!isAclRegion(aclRegion)) {
364       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
365     }
366 
367     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
368         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
369 
370     // do a full scan of _acl_ table
371 
372     Scan scan = new Scan();
373     scan.addFamily(ACL_LIST_FAMILY);
374 
375     InternalScanner iScanner = null;
376     try {
377       iScanner = aclRegion.getScanner(scan);
378 
379       while (true) {
380         List<Cell> row = new ArrayList<Cell>();
381 
382         boolean hasNext = iScanner.next(row);
383         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
384         byte[] entry = null;
385         for (Cell kv : row) {
386           if (entry == null) {
387             entry = CellUtil.cloneRow(kv);
388           }
389           Pair<String,TablePermission> permissionsOfUserOnTable =
390               parsePermissionRecord(entry, kv);
391           if (permissionsOfUserOnTable != null) {
392             String username = permissionsOfUserOnTable.getFirst();
393             TablePermission permissions = permissionsOfUserOnTable.getSecond();
394             perms.put(username, permissions);
395           }
396         }
397         if (entry != null) {
398           allPerms.put(entry, perms);
399         }
400         if (!hasNext) {
401           break;
402         }
403       }
404     } finally {
405       if (iScanner != null) {
406         iScanner.close();
407       }
408     }
409 
410     return allPerms;
411   }
412 
413   /**
414    * Load all permissions from the region server holding {@code _acl_},
415    * primarily intended for testing purposes.
416    */
417   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
418       Configuration conf) throws IOException {
419     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
420         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
421 
422     // do a full scan of _acl_, filtering on only first table region rows
423 
424     Scan scan = new Scan();
425     scan.addFamily(ACL_LIST_FAMILY);
426 
427     HTable acls = null;
428     ResultScanner scanner = null;
429     try {
430       acls = new HTable(conf, ACL_TABLE_NAME);
431       scanner = acls.getScanner(scan);
432       for (Result row : scanner) {
433         ListMultimap<String,TablePermission> resultPerms =
434             parsePermissions(row.getRow(), row);
435         allPerms.put(row.getRow(), resultPerms);
436       }
437     } finally {
438       if (scanner != null) scanner.close();
439       if (acls != null) acls.close();
440     }
441 
442     return allPerms;
443   }
444 
445   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
446         TableName tableName) throws IOException {
447     return getPermissions(conf, tableName != null ? tableName.getName() : null);
448   }
449 
450   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
451         String namespace) throws IOException {
452     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
453   }
454 
455   /**
456    * Reads user permission assignments stored in the <code>l:</code> column
457    * family of the first table row in <code>_acl_</code>.
458    *
459    * <p>
460    * See {@link AccessControlLists class documentation} for the key structure
461    * used for storage.
462    * </p>
463    */
464   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
465       byte[] entryName) throws IOException {
466     if (entryName == null) entryName = ACL_TABLE_NAME.getName();
467 
468     // for normal user tables, we just read the table row from _acl_
469     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
470     HTable acls = null;
471     try {
472       acls = new HTable(conf, ACL_TABLE_NAME);
473       Get get = new Get(entryName);
474       get.addFamily(ACL_LIST_FAMILY);
475       Result row = acls.get(get);
476       if (!row.isEmpty()) {
477         perms = parsePermissions(entryName, row);
478       } else {
479         LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
480             + Bytes.toString(entryName));
481       }
482     } finally {
483       if (acls != null) acls.close();
484     }
485 
486     return perms;
487   }
488 
489   /**
490    * Returns the currently granted permissions for a given table as a list of
491    * user plus associated permissions.
492    */
493   static List<UserPermission> getUserTablePermissions(
494       Configuration conf, TableName tableName) throws IOException {
495     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
496   }
497 
498   static List<UserPermission> getUserNamespacePermissions(
499       Configuration conf, String namespace) throws IOException {
500     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
501   }
502 
503   static List<UserPermission> getUserPermissions(
504       Configuration conf, byte[] entryName)
505   throws IOException {
506     ListMultimap<String,TablePermission> allPerms = getPermissions(
507       conf, entryName);
508 
509     List<UserPermission> perms = new ArrayList<UserPermission>();
510 
511     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
512       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
513           entry.getValue().getTableName(), entry.getValue().getFamily(),
514           entry.getValue().getQualifier(), entry.getValue().getActions());
515       perms.add(up);
516     }
517     return perms;
518   }
519 
520   private static ListMultimap<String, TablePermission> parsePermissions(
521       byte[] entryName, Result result) {
522     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
523     if (result != null && result.size() > 0) {
524       for (Cell kv : result.rawCells()) {
525 
526         Pair<String,TablePermission> permissionsOfUserOnTable =
527             parsePermissionRecord(entryName, kv);
528 
529         if (permissionsOfUserOnTable != null) {
530           String username = permissionsOfUserOnTable.getFirst();
531           TablePermission permissions = permissionsOfUserOnTable.getSecond();
532           perms.put(username, permissions);
533         }
534       }
535     }
536     return perms;
537   }
538 
539   private static Pair<String, TablePermission> parsePermissionRecord(
540       byte[] entryName, Cell kv) {
541     // return X given a set of permissions encoded in the permissionRecord kv.
542     byte[] family = CellUtil.cloneFamily(kv);
543 
544     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
545       return null;
546     }
547 
548     byte[] key = CellUtil.cloneQualifier(kv);
549     byte[] value = CellUtil.cloneValue(kv);
550     if (LOG.isDebugEnabled()) {
551       LOG.debug("Read acl: kv ["+
552                 Bytes.toStringBinary(key)+": "+
553                 Bytes.toStringBinary(value)+"]");
554     }
555 
556     // check for a column family appended to the key
557     // TODO: avoid the string conversion to make this more efficient
558     String username = Bytes.toString(key);
559 
560     //Handle namespace entry
561     if(isNamespaceEntry(entryName)) {
562       return new Pair<String, TablePermission>(username,
563           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
564     }
565 
566     //Handle table and global entry
567     //TODO global entry should be handled differently
568     int idx = username.indexOf(ACL_KEY_DELIMITER);
569     byte[] permFamily = null;
570     byte[] permQualifier = null;
571     if (idx > 0 && idx < username.length()-1) {
572       String remainder = username.substring(idx+1);
573       username = username.substring(0, idx);
574       idx = remainder.indexOf(ACL_KEY_DELIMITER);
575       if (idx > 0 && idx < remainder.length()-1) {
576         permFamily = Bytes.toBytes(remainder.substring(0, idx));
577         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
578       } else {
579         permFamily = Bytes.toBytes(remainder);
580       }
581     }
582 
583     return new Pair<String,TablePermission>(username,
584         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
585   }
586 
587   /**
588    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
589    * and returns the resulting byte array.
590    *
591    * Writes a set of permission [user: table permission]
592    */
593   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
594       Configuration conf) {
595     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
596   }
597 
598   /**
599    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
600    * from the input stream.
601    */
602   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
603       Configuration conf)
604   throws DeserializationException {
605     if (ProtobufUtil.isPBMagicPrefix(data)) {
606       int pblen = ProtobufUtil.lengthOfPBMagic();
607       try {
608         AccessControlProtos.UsersAndPermissions perms =
609           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
610             data, pblen, data.length - pblen).build();
611         return ProtobufUtil.toUserTablePermissions(perms);
612       } catch (InvalidProtocolBufferException e) {
613         throw new DeserializationException(e);
614       }
615     } else {
616       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
617       try {
618         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
619         int length = in.readInt();
620         for (int i=0; i<length; i++) {
621           String user = Text.readString(in);
622           List<TablePermission> userPerms =
623             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
624           perms.putAll(user, userPerms);
625         }
626       } catch (IOException e) {
627         throw new DeserializationException(e);
628       }
629       return perms;
630     }
631   }
632 
633   /**
634    * Returns whether or not the given name should be interpreted as a group
635    * principal.  Currently this simply checks if the name starts with the
636    * special group prefix character ("@").
637    */
638   public static boolean isGroupPrincipal(String name) {
639     return name != null && name.startsWith(GROUP_PREFIX);
640   }
641 
642   /**
643    * Returns the actual name for a group principal (stripped of the
644    * group prefix).
645    */
646   public static String getGroupName(String aclKey) {
647     if (!isGroupPrincipal(aclKey)) {
648       return aclKey;
649     }
650 
651     return aclKey.substring(GROUP_PREFIX.length());
652   }
653 
654   public static boolean isNamespaceEntry(String entryName) {
655     return entryName.charAt(0) == NAMESPACE_PREFIX;
656   }
657 
658   public static boolean isNamespaceEntry(byte[] entryName) {
659     return entryName[0] == NAMESPACE_PREFIX;
660   }
661   
662   public static String toNamespaceEntry(String namespace) {
663      return NAMESPACE_PREFIX + namespace;
664    }
665 
666    public static String fromNamespaceEntry(String namespace) {
667      if(namespace.charAt(0) != NAMESPACE_PREFIX)
668        throw new IllegalArgumentException("Argument is not a valid namespace entry");
669      return namespace.substring(1);
670    }
671 
672    public static byte[] toNamespaceEntry(byte[] namespace) {
673      byte[] ret = new byte[namespace.length+1];
674      ret[0] = NAMESPACE_PREFIX;
675      System.arraycopy(namespace, 0, ret, 1, namespace.length);
676      return ret;
677    }
678 
679    public static byte[] fromNamespaceEntry(byte[] namespace) {
680      if(namespace[0] != NAMESPACE_PREFIX) {
681        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
682            Bytes.toString(namespace));
683      }
684      return Arrays.copyOfRange(namespace, 1, namespace.length);
685    }
686 
687    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
688        throws IOException {
689      // Save an object allocation where we can
690      if (cell.getTagsLength() == 0) {
691        return null;
692      }
693      List<Permission> results = Lists.newArrayList();
694      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
695         cell.getTagsLength());
696      while (tagsIterator.hasNext()) {
697        Tag tag = tagsIterator.next();
698        if (tag.getType() == ACL_TAG_TYPE) {
699          // Deserialize the table permissions from the KV
700          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
701            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
702              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
703          // Are there permissions for this user?
704          List<Permission> userPerms = kvPerms.get(user.getShortName());
705          if (userPerms != null) {
706            results.addAll(userPerms);
707          }
708          // Are there permissions for any of the groups this user belongs to?
709          String groupNames[] = user.getGroupNames();
710          if (groupNames != null) {
711            for (String group : groupNames) {
712              List<Permission> groupPerms = kvPerms.get(GROUP_PREFIX + group);
713              if (results != null) {
714                results.addAll(groupPerms);
715              }
716            }
717          }
718        }
719      }
720      return results;
721    }
722 }