View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.NamespaceDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.Tag;
46  import org.apache.hadoop.hbase.TagType;
47  import org.apache.hadoop.hbase.client.Delete;
48  import org.apache.hadoop.hbase.client.Get;
49  import org.apache.hadoop.hbase.client.HTable;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Result;
52  import org.apache.hadoop.hbase.client.ResultScanner;
53  import org.apache.hadoop.hbase.client.Scan;
54  import org.apache.hadoop.hbase.client.Table;
55  import org.apache.hadoop.hbase.exceptions.DeserializationException;
56  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57  import org.apache.hadoop.hbase.filter.QualifierFilter;
58  import org.apache.hadoop.hbase.filter.RegexStringComparator;
59  import org.apache.hadoop.hbase.io.compress.Compression;
60  import org.apache.hadoop.hbase.master.MasterServices;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
63  import org.apache.hadoop.hbase.regionserver.BloomType;
64  import org.apache.hadoop.hbase.regionserver.HRegion;
65  import org.apache.hadoop.hbase.regionserver.InternalScanner;
66  import org.apache.hadoop.hbase.security.User;
67  import org.apache.hadoop.hbase.util.Bytes;
68  import org.apache.hadoop.hbase.util.Pair;
69  import org.apache.hadoop.io.Text;
70  
71  import com.google.common.collect.ArrayListMultimap;
72  import com.google.common.collect.ListMultimap;
73  import com.google.common.collect.Lists;
74  import com.google.protobuf.InvalidProtocolBufferException;
75  
76  /**
77   * Maintains lists of permission grants to users and groups to allow for
78   * authorization checks by {@link AccessController}.
79   *
80   * <p>
81   * Access control lists are stored in an "internal" metadata table named
82   * {@code _acl_}. Each table's permission grants are stored as a separate row,
83   * keyed by the table name. KeyValues for permissions assignments are stored
84   * in one of the formats:
85   * <pre>
86   * Key                      Desc
87   * --------                 --------
88   * user                     table level permissions for a user [R=read, W=write]
89   * group                    table level permissions for a group
90   * user,family              column family level permissions for a user
91   * group,family             column family level permissions for a group
92   * user,family,qualifier    column qualifier level permissions for a user
93   * group,family,qualifier   column qualifier level permissions for a group
94   * </pre>
95   * All values are encoded as byte arrays containing the codes from the
96   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
97   * </p>
98   */
99  @InterfaceAudience.Private
100 public class AccessControlLists {
101   /** Internal storage table for access control lists */
102   public static final TableName ACL_TABLE_NAME =
103       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
104   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
105   /** Column family used to store ACL grants */
106   public static final String ACL_LIST_FAMILY_STR = "l";
107   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
108   /** KV tag to store per cell access control lists */
109   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
110 
111   public static final char NAMESPACE_PREFIX = '@';
112 
113   /** Table descriptor for ACL internal table */
114   public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(ACL_TABLE_NAME);
115   static {
116     ACL_TABLEDESC.addFamily(
117         new HColumnDescriptor(ACL_LIST_FAMILY,
118             10, // Ten is arbitrary number.  Keep versions to help debugging.
119             Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
120             HConstants.FOREVER, BloomType.NONE.toString(),
121             HConstants.REPLICATION_SCOPE_LOCAL).
122             // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
123             // be the case if we are using CombinedBlockCache (Bucket Cache).
124             setCacheDataInL1(true));
125   }
126 
127   /**
128    * Delimiter to separate user, column family, and qualifier in
129    * _acl_ table info: column keys */
130   public static final char ACL_KEY_DELIMITER = ',';
131   /** Prefix character to denote group names */
132   public static final String GROUP_PREFIX = "@";
133   /** Configuration key for superusers */
134   public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
135 
136   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
137 
138   /**
139    * Check for existence of {@code _acl_} table and create it if it does not exist
140    * @param master reference to HMaster
141    */
142   static void init(MasterServices master) throws IOException {
143     master.createTable(ACL_TABLEDESC, null);
144   }
145 
146   /**
147    * Stores a new user permission grant in the access control lists table.
148    * @param conf the configuration
149    * @param userPerm the details of the permission to be granted
150    * @throws IOException in the case of an error accessing the metadata table
151    */
152   static void addUserPermission(Configuration conf, UserPermission userPerm)
153       throws IOException {
154     Permission.Action[] actions = userPerm.getActions();
155     byte[] rowKey = userPermissionRowKey(userPerm);
156     Put p = new Put(rowKey);
157     byte[] key = userPermissionKey(userPerm);
158 
159     if ((actions == null) || (actions.length == 0)) {
160       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
161       LOG.warn(msg);
162       throw new IOException(msg);
163     }
164 
165     byte[] value = new byte[actions.length];
166     for (int i = 0; i < actions.length; i++) {
167       value[i] = actions[i].code();
168     }
169     p.addImmutable(ACL_LIST_FAMILY, key, value);
170     if (LOG.isDebugEnabled()) {
171       LOG.debug("Writing permission with rowKey "+
172           Bytes.toString(rowKey)+" "+
173           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
174       );
175     }
176     Table acls = null;
177     try {
178       acls = new HTable(conf, ACL_TABLE_NAME);
179       acls.put(p);
180     } finally {
181       if (acls != null) acls.close();
182     }
183   }
184 
185   /**
186    * Removes a previously granted permission from the stored access control
187    * lists.  The {@link TablePermission} being removed must exactly match what
188    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
189    * been granted "READ" access to the "data" table, but only to column family
190    * plus qualifier "info:colA", then trying to call this method with only
191    * user "bob" and the table name "data" (but without specifying the
192    * column qualifier "info:colA") will have no effect.
193    *
194    * @param conf the configuration
195    * @param userPerm the details of the permission to be revoked
196    * @throws IOException if there is an error accessing the metadata table
197    */
198   static void removeUserPermission(Configuration conf, UserPermission userPerm)
199       throws IOException {
200     Delete d = new Delete(userPermissionRowKey(userPerm));
201     byte[] key = userPermissionKey(userPerm);
202 
203     if (LOG.isDebugEnabled()) {
204       LOG.debug("Removing permission "+ userPerm.toString());
205     }
206     d.deleteColumns(ACL_LIST_FAMILY, key);
207     Table acls = null;
208     try {
209       acls = new HTable(conf, ACL_TABLE_NAME);
210       acls.delete(d);
211     } finally {
212       if (acls != null) acls.close();
213     }
214   }
215 
216   /**
217    * Remove specified table from the _acl_ table.
218    */
219   static void removeTablePermissions(Configuration conf, TableName tableName)
220       throws IOException{
221     Delete d = new Delete(tableName.getName());
222 
223     if (LOG.isDebugEnabled()) {
224       LOG.debug("Removing permissions of removed table "+ tableName);
225     }
226 
227     Table acls = null;
228     try {
229       acls = new HTable(conf, ACL_TABLE_NAME);
230       acls.delete(d);
231     } finally {
232       if (acls != null) acls.close();
233     }
234   }
235 
236   /**
237    * Remove specified namespace from the acl table.
238    */
239   static void removeNamespacePermissions(Configuration conf, String namespace)
240       throws IOException{
241     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
242 
243     if (LOG.isDebugEnabled()) {
244       LOG.debug("Removing permissions of removed namespace "+ namespace);
245     }
246 
247     Table acls = null;
248     try {
249       acls = new HTable(conf, ACL_TABLE_NAME);
250       acls.delete(d);
251     } finally {
252       if (acls != null) acls.close();
253     }
254   }
255 
256   /**
257    * Remove specified table column from the acl table.
258    */
259   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
260       throws IOException{
261 
262     if (LOG.isDebugEnabled()) {
263       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
264                 " from table "+ tableName);
265     }
266 
267     Table acls = null;
268     try {
269       acls = new HTable(conf, ACL_TABLE_NAME);
270 
271       Scan scan = new Scan();
272       scan.addFamily(ACL_LIST_FAMILY);
273 
274       String columnName = Bytes.toString(column);
275       scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
276                      String.format("(%s%s%s)|(%s%s)$",
277                      ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
278                      ACL_KEY_DELIMITER, columnName))));
279 
280       Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
281       ResultScanner scanner = acls.getScanner(scan);
282       try {
283         for (Result res : scanner) {
284           for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
285             qualifierSet.add(q);
286           }
287         }
288       } finally {
289         scanner.close();
290       }
291 
292       if (qualifierSet.size() > 0) {
293         Delete d = new Delete(tableName.getName());
294         for (byte[] qualifier : qualifierSet) {
295           d.deleteColumns(ACL_LIST_FAMILY, qualifier);
296         }
297         acls.delete(d);
298       }
299     } finally {
300       if (acls != null) acls.close();
301     }
302   }
303 
304   static byte[] userPermissionRowKey(UserPermission userPerm) {
305     byte[] row;
306     if(userPerm.hasNamespace()) {
307       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
308     } else if(userPerm.isGlobal()) {
309       row = ACL_GLOBAL_NAME;
310     } else {
311       row = userPerm.getTableName().getName();
312     }
313     return row;
314   }
315 
316   /**
317    * Build qualifier key from user permission:
318    *  username
319    *  username,family
320    *  username,family,qualifier
321    */
322   static byte[] userPermissionKey(UserPermission userPerm) {
323     byte[] qualifier = userPerm.getQualifier();
324     byte[] family = userPerm.getFamily();
325     byte[] key = userPerm.getUser();
326 
327     if (family != null && family.length > 0) {
328       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
329       if (qualifier != null && qualifier.length > 0) {
330         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
331       }
332     }
333 
334     return key;
335   }
336 
337   /**
338    * Returns {@code true} if the given region is part of the {@code _acl_}
339    * metadata table.
340    */
341   static boolean isAclRegion(HRegion region) {
342     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
343   }
344 
345   /**
346    * Returns {@code true} if the given table is {@code _acl_} metadata table.
347    */
348   static boolean isAclTable(HTableDescriptor desc) {
349     return ACL_TABLE_NAME.equals(desc.getTableName());
350   }
351 
352   /**
353    * Loads all of the permission grants stored in a region of the {@code _acl_}
354    * table.
355    *
356    * @param aclRegion
357    * @return a map of the permissions for this table.
358    * @throws IOException
359    */
360   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
361       HRegion aclRegion)
362     throws IOException {
363 
364     if (!isAclRegion(aclRegion)) {
365       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
366     }
367 
368     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
369         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
370 
371     // do a full scan of _acl_ table
372 
373     Scan scan = new Scan();
374     scan.addFamily(ACL_LIST_FAMILY);
375 
376     InternalScanner iScanner = null;
377     try {
378       iScanner = aclRegion.getScanner(scan);
379 
380       while (true) {
381         List<Cell> row = new ArrayList<Cell>();
382 
383         boolean hasNext = iScanner.next(row);
384         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
385         byte[] entry = null;
386         for (Cell kv : row) {
387           if (entry == null) {
388             entry = CellUtil.cloneRow(kv);
389           }
390           Pair<String,TablePermission> permissionsOfUserOnTable =
391               parsePermissionRecord(entry, kv);
392           if (permissionsOfUserOnTable != null) {
393             String username = permissionsOfUserOnTable.getFirst();
394             TablePermission permissions = permissionsOfUserOnTable.getSecond();
395             perms.put(username, permissions);
396           }
397         }
398         if (entry != null) {
399           allPerms.put(entry, perms);
400         }
401         if (!hasNext) {
402           break;
403         }
404       }
405     } finally {
406       if (iScanner != null) {
407         iScanner.close();
408       }
409     }
410 
411     return allPerms;
412   }
413 
414   /**
415    * Load all permissions from the region server holding {@code _acl_},
416    * primarily intended for testing purposes.
417    */
418   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
419       Configuration conf) throws IOException {
420     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
421         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
422 
423     // do a full scan of _acl_, filtering on only first table region rows
424 
425     Scan scan = new Scan();
426     scan.addFamily(ACL_LIST_FAMILY);
427 
428     Table acls = null;
429     ResultScanner scanner = null;
430     try {
431       acls = new HTable(conf, ACL_TABLE_NAME);
432       scanner = acls.getScanner(scan);
433       for (Result row : scanner) {
434         ListMultimap<String,TablePermission> resultPerms =
435             parsePermissions(row.getRow(), row);
436         allPerms.put(row.getRow(), resultPerms);
437       }
438     } finally {
439       if (scanner != null) scanner.close();
440       if (acls != null) acls.close();
441     }
442 
443     return allPerms;
444   }
445 
446   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
447         TableName tableName) throws IOException {
448     return getPermissions(conf, tableName != null ? tableName.getName() : null);
449   }
450 
451   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
452         String namespace) throws IOException {
453     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
454   }
455 
456   /**
457    * Reads user permission assignments stored in the <code>l:</code> column
458    * family of the first table row in <code>_acl_</code>.
459    *
460    * <p>
461    * See {@link AccessControlLists class documentation} for the key structure
462    * used for storage.
463    * </p>
464    */
465   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
466       byte[] entryName) throws IOException {
467     if (entryName == null) entryName = ACL_TABLE_NAME.getName();
468 
469     // for normal user tables, we just read the table row from _acl_
470     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
471     Table acls = null;
472     try {
473       acls = new HTable(conf, ACL_TABLE_NAME);
474       Get get = new Get(entryName);
475       get.addFamily(ACL_LIST_FAMILY);
476       Result row = acls.get(get);
477       if (!row.isEmpty()) {
478         perms = parsePermissions(entryName, row);
479       } else {
480         LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
481             + Bytes.toString(entryName));
482       }
483     } finally {
484       if (acls != null) acls.close();
485     }
486 
487     return perms;
488   }
489 
490   /**
491    * Returns the currently granted permissions for a given table as a list of
492    * user plus associated permissions.
493    */
494   static List<UserPermission> getUserTablePermissions(
495       Configuration conf, TableName tableName) throws IOException {
496     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
497   }
498 
499   static List<UserPermission> getUserNamespacePermissions(
500       Configuration conf, String namespace) throws IOException {
501     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
502   }
503 
504   static List<UserPermission> getUserPermissions(
505       Configuration conf, byte[] entryName)
506   throws IOException {
507     ListMultimap<String,TablePermission> allPerms = getPermissions(
508       conf, entryName);
509 
510     List<UserPermission> perms = new ArrayList<UserPermission>();
511 
512     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
513       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
514           entry.getValue().getTableName(), entry.getValue().getFamily(),
515           entry.getValue().getQualifier(), entry.getValue().getActions());
516       perms.add(up);
517     }
518     return perms;
519   }
520 
521   private static ListMultimap<String, TablePermission> parsePermissions(
522       byte[] entryName, Result result) {
523     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
524     if (result != null && result.size() > 0) {
525       for (Cell kv : result.rawCells()) {
526 
527         Pair<String,TablePermission> permissionsOfUserOnTable =
528             parsePermissionRecord(entryName, kv);
529 
530         if (permissionsOfUserOnTable != null) {
531           String username = permissionsOfUserOnTable.getFirst();
532           TablePermission permissions = permissionsOfUserOnTable.getSecond();
533           perms.put(username, permissions);
534         }
535       }
536     }
537     return perms;
538   }
539 
540   private static Pair<String, TablePermission> parsePermissionRecord(
541       byte[] entryName, Cell kv) {
542     // return X given a set of permissions encoded in the permissionRecord kv.
543     byte[] family = CellUtil.cloneFamily(kv);
544 
545     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
546       return null;
547     }
548 
549     byte[] key = CellUtil.cloneQualifier(kv);
550     byte[] value = CellUtil.cloneValue(kv);
551     if (LOG.isDebugEnabled()) {
552       LOG.debug("Read acl: kv ["+
553                 Bytes.toStringBinary(key)+": "+
554                 Bytes.toStringBinary(value)+"]");
555     }
556 
557     // check for a column family appended to the key
558     // TODO: avoid the string conversion to make this more efficient
559     String username = Bytes.toString(key);
560 
561     //Handle namespace entry
562     if(isNamespaceEntry(entryName)) {
563       return new Pair<String, TablePermission>(username,
564           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
565     }
566 
567     //Handle table and global entry
568     //TODO global entry should be handled differently
569     int idx = username.indexOf(ACL_KEY_DELIMITER);
570     byte[] permFamily = null;
571     byte[] permQualifier = null;
572     if (idx > 0 && idx < username.length()-1) {
573       String remainder = username.substring(idx+1);
574       username = username.substring(0, idx);
575       idx = remainder.indexOf(ACL_KEY_DELIMITER);
576       if (idx > 0 && idx < remainder.length()-1) {
577         permFamily = Bytes.toBytes(remainder.substring(0, idx));
578         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
579       } else {
580         permFamily = Bytes.toBytes(remainder);
581       }
582     }
583 
584     return new Pair<String,TablePermission>(username,
585         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
586   }
587 
588   /**
589    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
590    * and returns the resulting byte array.
591    *
592    * Writes a set of permission [user: table permission]
593    */
594   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
595       Configuration conf) {
596     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
597   }
598 
599   /**
600    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
601    * from the input stream.
602    */
603   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
604       Configuration conf)
605   throws DeserializationException {
606     if (ProtobufUtil.isPBMagicPrefix(data)) {
607       int pblen = ProtobufUtil.lengthOfPBMagic();
608       try {
609         AccessControlProtos.UsersAndPermissions perms =
610           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
611             data, pblen, data.length - pblen).build();
612         return ProtobufUtil.toUserTablePermissions(perms);
613       } catch (InvalidProtocolBufferException e) {
614         throw new DeserializationException(e);
615       }
616     } else {
617       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
618       try {
619         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
620         int length = in.readInt();
621         for (int i=0; i<length; i++) {
622           String user = Text.readString(in);
623           List<TablePermission> userPerms =
624             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
625           perms.putAll(user, userPerms);
626         }
627       } catch (IOException e) {
628         throw new DeserializationException(e);
629       }
630       return perms;
631     }
632   }
633 
634   /**
635    * Returns whether or not the given name should be interpreted as a group
636    * principal.  Currently this simply checks if the name starts with the
637    * special group prefix character ("@").
638    */
639   public static boolean isGroupPrincipal(String name) {
640     return name != null && name.startsWith(GROUP_PREFIX);
641   }
642 
643   /**
644    * Returns the actual name for a group principal (stripped of the
645    * group prefix).
646    */
647   public static String getGroupName(String aclKey) {
648     if (!isGroupPrincipal(aclKey)) {
649       return aclKey;
650     }
651 
652     return aclKey.substring(GROUP_PREFIX.length());
653   }
654 
655   public static boolean isNamespaceEntry(String entryName) {
656     return entryName.charAt(0) == NAMESPACE_PREFIX;
657   }
658 
659   public static boolean isNamespaceEntry(byte[] entryName) {
660     return entryName[0] == NAMESPACE_PREFIX;
661   }
662   
663   public static String toNamespaceEntry(String namespace) {
664      return NAMESPACE_PREFIX + namespace;
665    }
666 
667    public static String fromNamespaceEntry(String namespace) {
668      if(namespace.charAt(0) != NAMESPACE_PREFIX)
669        throw new IllegalArgumentException("Argument is not a valid namespace entry");
670      return namespace.substring(1);
671    }
672 
673    public static byte[] toNamespaceEntry(byte[] namespace) {
674      byte[] ret = new byte[namespace.length+1];
675      ret[0] = NAMESPACE_PREFIX;
676      System.arraycopy(namespace, 0, ret, 1, namespace.length);
677      return ret;
678    }
679 
680    public static byte[] fromNamespaceEntry(byte[] namespace) {
681      if(namespace[0] != NAMESPACE_PREFIX) {
682        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
683            Bytes.toString(namespace));
684      }
685      return Arrays.copyOfRange(namespace, 1, namespace.length);
686    }
687 
688    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
689        throws IOException {
690      // Save an object allocation where we can
691      if (cell.getTagsLength() == 0) {
692        return null;
693      }
694      List<Permission> results = Lists.newArrayList();
695      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
696         cell.getTagsLength());
697      while (tagsIterator.hasNext()) {
698        Tag tag = tagsIterator.next();
699        if (tag.getType() == ACL_TAG_TYPE) {
700          // Deserialize the table permissions from the KV
701          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
702            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
703              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
704          // Are there permissions for this user?
705          List<Permission> userPerms = kvPerms.get(user.getShortName());
706          if (userPerms != null) {
707            results.addAll(userPerms);
708          }
709          // Are there permissions for any of the groups this user belongs to?
710          String groupNames[] = user.getGroupNames();
711          if (groupNames != null) {
712            for (String group : groupNames) {
713              List<Permission> groupPerms = kvPerms.get(GROUP_PREFIX + group);
714              if (results != null) {
715                results.addAll(groupPerms);
716              }
717            }
718          }
719        }
720      }
721      return results;
722    }
723 }