View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.NamespaceDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.Tag;
45  import org.apache.hadoop.hbase.TagType;
46  import org.apache.hadoop.hbase.client.Delete;
47  import org.apache.hadoop.hbase.client.Get;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.Result;
51  import org.apache.hadoop.hbase.client.ResultScanner;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.exceptions.DeserializationException;
54  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
55  import org.apache.hadoop.hbase.filter.QualifierFilter;
56  import org.apache.hadoop.hbase.filter.RegexStringComparator;
57  import org.apache.hadoop.hbase.io.compress.Compression;
58  import org.apache.hadoop.hbase.master.MasterServices;
59  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
61  import org.apache.hadoop.hbase.regionserver.BloomType;
62  import org.apache.hadoop.hbase.regionserver.HRegion;
63  import org.apache.hadoop.hbase.regionserver.InternalScanner;
64  import org.apache.hadoop.hbase.security.User;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.io.Text;
68  
69  import com.google.common.collect.ArrayListMultimap;
70  import com.google.common.collect.ListMultimap;
71  import com.google.common.collect.Lists;
72  import com.google.protobuf.InvalidProtocolBufferException;
73  
74  /**
75   * Maintains lists of permission grants to users and groups to allow for
76   * authorization checks by {@link AccessController}.
77   *
78   * <p>
79   * Access control lists are stored in an "internal" metadata table named
80   * {@code _acl_}. Each table's permission grants are stored as a separate row,
81   * keyed by the table name. KeyValues for permissions assignments are stored
82   * in one of the formats:
83   * <pre>
84   * Key                      Desc
85   * --------                 --------
86   * user                     table level permissions for a user [R=read, W=write]
87   * group                    table level permissions for a group
88   * user,family              column family level permissions for a user
89   * group,family             column family level permissions for a group
90   * user,family,qualifier    column qualifier level permissions for a user
91   * group,family,qualifier   column qualifier level permissions for a group
92   * </pre>
93   * All values are encoded as byte arrays containing the codes from the
94   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
95   * </p>
96   */
97  public class AccessControlLists {
98    /** Internal storage table for access control lists */
99    public static final TableName ACL_TABLE_NAME =
100       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
101   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
102   /** Column family used to store ACL grants */
103   public static final String ACL_LIST_FAMILY_STR = "l";
104   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
105   /** KV tag to store per cell access control lists */
106   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
107 
108   public static final char NAMESPACE_PREFIX = '@';
109 
110   /** Table descriptor for ACL internal table */
111   public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(ACL_TABLE_NAME);
112   static {
113     ACL_TABLEDESC.addFamily(
114         new HColumnDescriptor(ACL_LIST_FAMILY,
115             10, // Ten is arbitrary number.  Keep versions to help debugging.
116             Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
117             HConstants.FOREVER, BloomType.NONE.toString(),
118             HConstants.REPLICATION_SCOPE_LOCAL).
119             // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
120             // be the case if we are using CombinedBlockCache (Bucket Cache).
121             setCacheDataInL1(true));
122   }
123 
124   /**
125    * Delimiter to separate user, column family, and qualifier in
126    * _acl_ table info: column keys */
127   public static final char ACL_KEY_DELIMITER = ',';
128   /** Prefix character to denote group names */
129   public static final String GROUP_PREFIX = "@";
130   /** Configuration key for superusers */
131   public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
132 
133   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
134 
135   /**
136    * Check for existence of {@code _acl_} table and create it if it does not exist
137    * @param master reference to HMaster
138    */
139   static void init(MasterServices master) throws IOException {
140     master.createTable(ACL_TABLEDESC, null);
141   }
142 
143   /**
144    * Stores a new user permission grant in the access control lists table.
145    * @param conf the configuration
146    * @param userPerm the details of the permission to be granted
147    * @throws IOException in the case of an error accessing the metadata table
148    */
149   static void addUserPermission(Configuration conf, UserPermission userPerm)
150       throws IOException {
151     Permission.Action[] actions = userPerm.getActions();
152     byte[] rowKey = userPermissionRowKey(userPerm);
153     Put p = new Put(rowKey);
154     byte[] key = userPermissionKey(userPerm);
155 
156     if ((actions == null) || (actions.length == 0)) {
157       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
158       LOG.warn(msg);
159       throw new IOException(msg);
160     }
161 
162     byte[] value = new byte[actions.length];
163     for (int i = 0; i < actions.length; i++) {
164       value[i] = actions[i].code();
165     }
166     p.addImmutable(ACL_LIST_FAMILY, key, value);
167     if (LOG.isDebugEnabled()) {
168       LOG.debug("Writing permission with rowKey "+
169           Bytes.toString(rowKey)+" "+
170           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
171       );
172     }
173     HTable acls = null;
174     try {
175       acls = new HTable(conf, ACL_TABLE_NAME);
176       acls.put(p);
177     } finally {
178       if (acls != null) acls.close();
179     }
180   }
181 
182   /**
183    * Removes a previously granted permission from the stored access control
184    * lists.  The {@link TablePermission} being removed must exactly match what
185    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
186    * been granted "READ" access to the "data" table, but only to column family
187    * plus qualifier "info:colA", then trying to call this method with only
188    * user "bob" and the table name "data" (but without specifying the
189    * column qualifier "info:colA") will have no effect.
190    *
191    * @param conf the configuration
192    * @param userPerm the details of the permission to be revoked
193    * @throws IOException if there is an error accessing the metadata table
194    */
195   static void removeUserPermission(Configuration conf, UserPermission userPerm)
196       throws IOException {
197     Delete d = new Delete(userPermissionRowKey(userPerm));
198     byte[] key = userPermissionKey(userPerm);
199 
200     if (LOG.isDebugEnabled()) {
201       LOG.debug("Removing permission "+ userPerm.toString());
202     }
203     d.deleteColumns(ACL_LIST_FAMILY, key);
204     HTable acls = null;
205     try {
206       acls = new HTable(conf, ACL_TABLE_NAME);
207       acls.delete(d);
208     } finally {
209       if (acls != null) acls.close();
210     }
211   }
212 
213   /**
214    * Remove specified table from the _acl_ table.
215    */
216   static void removeTablePermissions(Configuration conf, TableName tableName)
217       throws IOException{
218     Delete d = new Delete(tableName.getName());
219 
220     if (LOG.isDebugEnabled()) {
221       LOG.debug("Removing permissions of removed table "+ tableName);
222     }
223 
224     HTable acls = null;
225     try {
226       acls = new HTable(conf, ACL_TABLE_NAME);
227       acls.delete(d);
228     } finally {
229       if (acls != null) acls.close();
230     }
231   }
232 
233   /**
234    * Remove specified namespace from the acl table.
235    */
236   static void removeNamespacePermissions(Configuration conf, String namespace)
237       throws IOException{
238     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
239 
240     if (LOG.isDebugEnabled()) {
241       LOG.debug("Removing permissions of removed namespace "+ namespace);
242     }
243 
244     HTable acls = null;
245     try {
246       acls = new HTable(conf, ACL_TABLE_NAME);
247       acls.delete(d);
248     } finally {
249       if (acls != null) acls.close();
250     }
251   }
252 
253   /**
254    * Remove specified table column from the acl table.
255    */
256   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
257       throws IOException{
258 
259     if (LOG.isDebugEnabled()) {
260       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
261                 " from table "+ tableName);
262     }
263 
264     HTable acls = null;
265     try {
266       acls = new HTable(conf, ACL_TABLE_NAME);
267 
268       Scan scan = new Scan();
269       scan.addFamily(ACL_LIST_FAMILY);
270 
271       String columnName = Bytes.toString(column);
272       scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
273                      String.format("(%s%s%s)|(%s%s)$",
274                      ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
275                      ACL_KEY_DELIMITER, columnName))));
276 
277       Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
278       ResultScanner scanner = acls.getScanner(scan);
279       try {
280         for (Result res : scanner) {
281           for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
282             qualifierSet.add(q);
283           }
284         }
285       } finally {
286         scanner.close();
287       }
288 
289       if (qualifierSet.size() > 0) {
290         Delete d = new Delete(tableName.getName());
291         for (byte[] qualifier : qualifierSet) {
292           d.deleteColumns(ACL_LIST_FAMILY, qualifier);
293         }
294         acls.delete(d);
295       }
296     } finally {
297       if (acls != null) acls.close();
298     }
299   }
300 
301   static byte[] userPermissionRowKey(UserPermission userPerm) {
302     byte[] row;
303     if(userPerm.hasNamespace()) {
304       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
305     } else if(userPerm.isGlobal()) {
306       row = ACL_GLOBAL_NAME;
307     } else {
308       row = userPerm.getTableName().getName();
309     }
310     return row;
311   }
312 
313   /**
314    * Build qualifier key from user permission:
315    *  username
316    *  username,family
317    *  username,family,qualifier
318    */
319   static byte[] userPermissionKey(UserPermission userPerm) {
320     byte[] qualifier = userPerm.getQualifier();
321     byte[] family = userPerm.getFamily();
322     byte[] key = userPerm.getUser();
323 
324     if (family != null && family.length > 0) {
325       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
326       if (qualifier != null && qualifier.length > 0) {
327         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
328       }
329     }
330 
331     return key;
332   }
333 
334   /**
335    * Returns {@code true} if the given region is part of the {@code _acl_}
336    * metadata table.
337    */
338   static boolean isAclRegion(HRegion region) {
339     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
340   }
341 
342   /**
343    * Returns {@code true} if the given table is {@code _acl_} metadata table.
344    */
345   static boolean isAclTable(HTableDescriptor desc) {
346     return ACL_TABLE_NAME.equals(desc.getTableName());
347   }
348 
349   /**
350    * Loads all of the permission grants stored in a region of the {@code _acl_}
351    * table.
352    *
353    * @param aclRegion
354    * @return a map of the permissions for this table.
355    * @throws IOException
356    */
357   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
358       HRegion aclRegion)
359     throws IOException {
360 
361     if (!isAclRegion(aclRegion)) {
362       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
363     }
364 
365     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
366         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
367 
368     // do a full scan of _acl_ table
369 
370     Scan scan = new Scan();
371     scan.addFamily(ACL_LIST_FAMILY);
372 
373     InternalScanner iScanner = null;
374     try {
375       iScanner = aclRegion.getScanner(scan);
376 
377       while (true) {
378         List<Cell> row = new ArrayList<Cell>();
379 
380         boolean hasNext = iScanner.next(row);
381         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
382         byte[] entry = null;
383         for (Cell kv : row) {
384           if (entry == null) {
385             entry = CellUtil.cloneRow(kv);
386           }
387           Pair<String,TablePermission> permissionsOfUserOnTable =
388               parsePermissionRecord(entry, kv);
389           if (permissionsOfUserOnTable != null) {
390             String username = permissionsOfUserOnTable.getFirst();
391             TablePermission permissions = permissionsOfUserOnTable.getSecond();
392             perms.put(username, permissions);
393           }
394         }
395         if (entry != null) {
396           allPerms.put(entry, perms);
397         }
398         if (!hasNext) {
399           break;
400         }
401       }
402     } finally {
403       if (iScanner != null) {
404         iScanner.close();
405       }
406     }
407 
408     return allPerms;
409   }
410 
411   /**
412    * Load all permissions from the region server holding {@code _acl_},
413    * primarily intended for testing purposes.
414    */
415   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
416       Configuration conf) throws IOException {
417     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
418         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
419 
420     // do a full scan of _acl_, filtering on only first table region rows
421 
422     Scan scan = new Scan();
423     scan.addFamily(ACL_LIST_FAMILY);
424 
425     HTable acls = null;
426     ResultScanner scanner = null;
427     try {
428       acls = new HTable(conf, ACL_TABLE_NAME);
429       scanner = acls.getScanner(scan);
430       for (Result row : scanner) {
431         ListMultimap<String,TablePermission> resultPerms =
432             parsePermissions(row.getRow(), row);
433         allPerms.put(row.getRow(), resultPerms);
434       }
435     } finally {
436       if (scanner != null) scanner.close();
437       if (acls != null) acls.close();
438     }
439 
440     return allPerms;
441   }
442 
443   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
444         TableName tableName) throws IOException {
445     return getPermissions(conf, tableName != null ? tableName.getName() : null);
446   }
447 
448   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
449         String namespace) throws IOException {
450     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
451   }
452 
453   /**
454    * Reads user permission assignments stored in the <code>l:</code> column
455    * family of the first table row in <code>_acl_</code>.
456    *
457    * <p>
458    * See {@link AccessControlLists class documentation} for the key structure
459    * used for storage.
460    * </p>
461    */
462   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
463       byte[] entryName) throws IOException {
464     if (entryName == null) entryName = ACL_TABLE_NAME.getName();
465 
466     // for normal user tables, we just read the table row from _acl_
467     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
468     HTable acls = null;
469     try {
470       acls = new HTable(conf, ACL_TABLE_NAME);
471       Get get = new Get(entryName);
472       get.addFamily(ACL_LIST_FAMILY);
473       Result row = acls.get(get);
474       if (!row.isEmpty()) {
475         perms = parsePermissions(entryName, row);
476       } else {
477         LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
478             + Bytes.toString(entryName));
479       }
480     } finally {
481       if (acls != null) acls.close();
482     }
483 
484     return perms;
485   }
486 
487   /**
488    * Returns the currently granted permissions for a given table as a list of
489    * user plus associated permissions.
490    */
491   static List<UserPermission> getUserTablePermissions(
492       Configuration conf, TableName tableName) throws IOException {
493     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
494   }
495 
496   static List<UserPermission> getUserNamespacePermissions(
497       Configuration conf, String namespace) throws IOException {
498     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
499   }
500 
501   static List<UserPermission> getUserPermissions(
502       Configuration conf, byte[] entryName)
503   throws IOException {
504     ListMultimap<String,TablePermission> allPerms = getPermissions(
505       conf, entryName);
506 
507     List<UserPermission> perms = new ArrayList<UserPermission>();
508 
509     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
510       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
511           entry.getValue().getTableName(), entry.getValue().getFamily(),
512           entry.getValue().getQualifier(), entry.getValue().getActions());
513       perms.add(up);
514     }
515     return perms;
516   }
517 
518   private static ListMultimap<String, TablePermission> parsePermissions(
519       byte[] entryName, Result result) {
520     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
521     if (result != null && result.size() > 0) {
522       for (Cell kv : result.rawCells()) {
523 
524         Pair<String,TablePermission> permissionsOfUserOnTable =
525             parsePermissionRecord(entryName, kv);
526 
527         if (permissionsOfUserOnTable != null) {
528           String username = permissionsOfUserOnTable.getFirst();
529           TablePermission permissions = permissionsOfUserOnTable.getSecond();
530           perms.put(username, permissions);
531         }
532       }
533     }
534     return perms;
535   }
536 
537   private static Pair<String, TablePermission> parsePermissionRecord(
538       byte[] entryName, Cell kv) {
539     // return X given a set of permissions encoded in the permissionRecord kv.
540     byte[] family = CellUtil.cloneFamily(kv);
541 
542     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
543       return null;
544     }
545 
546     byte[] key = CellUtil.cloneQualifier(kv);
547     byte[] value = CellUtil.cloneValue(kv);
548     if (LOG.isDebugEnabled()) {
549       LOG.debug("Read acl: kv ["+
550                 Bytes.toStringBinary(key)+": "+
551                 Bytes.toStringBinary(value)+"]");
552     }
553 
554     // check for a column family appended to the key
555     // TODO: avoid the string conversion to make this more efficient
556     String username = Bytes.toString(key);
557 
558     //Handle namespace entry
559     if(isNamespaceEntry(entryName)) {
560       return new Pair<String, TablePermission>(username,
561           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
562     }
563 
564     //Handle table and global entry
565     //TODO global entry should be handled differently
566     int idx = username.indexOf(ACL_KEY_DELIMITER);
567     byte[] permFamily = null;
568     byte[] permQualifier = null;
569     if (idx > 0 && idx < username.length()-1) {
570       String remainder = username.substring(idx+1);
571       username = username.substring(0, idx);
572       idx = remainder.indexOf(ACL_KEY_DELIMITER);
573       if (idx > 0 && idx < remainder.length()-1) {
574         permFamily = Bytes.toBytes(remainder.substring(0, idx));
575         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
576       } else {
577         permFamily = Bytes.toBytes(remainder);
578       }
579     }
580 
581     return new Pair<String,TablePermission>(username,
582         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
583   }
584 
585   /**
586    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
587    * and returns the resulting byte array.
588    *
589    * Writes a set of permission [user: table permission]
590    */
591   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
592       Configuration conf) {
593     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
594   }
595 
596   /**
597    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
598    * from the input stream.
599    */
600   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
601       Configuration conf)
602   throws DeserializationException {
603     if (ProtobufUtil.isPBMagicPrefix(data)) {
604       int pblen = ProtobufUtil.lengthOfPBMagic();
605       try {
606         AccessControlProtos.UsersAndPermissions perms =
607           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
608             data, pblen, data.length - pblen).build();
609         return ProtobufUtil.toUserTablePermissions(perms);
610       } catch (InvalidProtocolBufferException e) {
611         throw new DeserializationException(e);
612       }
613     } else {
614       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
615       try {
616         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
617         int length = in.readInt();
618         for (int i=0; i<length; i++) {
619           String user = Text.readString(in);
620           List<TablePermission> userPerms =
621             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
622           perms.putAll(user, userPerms);
623         }
624       } catch (IOException e) {
625         throw new DeserializationException(e);
626       }
627       return perms;
628     }
629   }
630 
631   /**
632    * Returns whether or not the given name should be interpreted as a group
633    * principal.  Currently this simply checks if the name starts with the
634    * special group prefix character ("@").
635    */
636   public static boolean isGroupPrincipal(String name) {
637     return name != null && name.startsWith(GROUP_PREFIX);
638   }
639 
640   /**
641    * Returns the actual name for a group principal (stripped of the
642    * group prefix).
643    */
644   public static String getGroupName(String aclKey) {
645     if (!isGroupPrincipal(aclKey)) {
646       return aclKey;
647     }
648 
649     return aclKey.substring(GROUP_PREFIX.length());
650   }
651 
652   public static boolean isNamespaceEntry(String entryName) {
653     return entryName.charAt(0) == NAMESPACE_PREFIX;
654   }
655 
656   public static boolean isNamespaceEntry(byte[] entryName) {
657     return entryName[0] == NAMESPACE_PREFIX;
658   }
659   
660   public static String toNamespaceEntry(String namespace) {
661      return NAMESPACE_PREFIX + namespace;
662    }
663 
664    public static String fromNamespaceEntry(String namespace) {
665      if(namespace.charAt(0) != NAMESPACE_PREFIX)
666        throw new IllegalArgumentException("Argument is not a valid namespace entry");
667      return namespace.substring(1);
668    }
669 
670    public static byte[] toNamespaceEntry(byte[] namespace) {
671      byte[] ret = new byte[namespace.length+1];
672      ret[0] = NAMESPACE_PREFIX;
673      System.arraycopy(namespace, 0, ret, 1, namespace.length);
674      return ret;
675    }
676 
677    public static byte[] fromNamespaceEntry(byte[] namespace) {
678      if(namespace[0] != NAMESPACE_PREFIX) {
679        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
680            Bytes.toString(namespace));
681      }
682      return Arrays.copyOfRange(namespace, 1, namespace.length);
683    }
684 
685    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
686        throws IOException {
687      List<Permission> results = Lists.newArrayList();
688      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
689         cell.getTagsLength());
690      while (tagsIterator.hasNext()) {
691        Tag tag = tagsIterator.next();
692        if (tag.getType() == ACL_TAG_TYPE) {
693          // Deserialize the table permissions from the KV
694          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
695            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
696              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
697          // Are there permissions for this user?
698          List<Permission> userPerms = kvPerms.get(user.getShortName());
699          if (userPerms != null) {
700            results.addAll(userPerms);
701          }
702          // Are there permissions for any of the groups this user belongs to?
703          String groupNames[] = user.getGroupNames();
704          if (groupNames != null) {
705            for (String group : groupNames) {
706              List<Permission> groupPerms = kvPerms.get(GROUP_PREFIX + group);
707              if (results != null) {
708                results.addAll(groupPerms);
709              }
710            }
711          }
712        }
713      }
714      return results;
715    }
716 }