View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.AuthUtil;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.NamespaceDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.Tag;
46  import org.apache.hadoop.hbase.TagType;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.client.Connection;
49  import org.apache.hadoop.hbase.client.ConnectionFactory;
50  import org.apache.hadoop.hbase.client.Delete;
51  import org.apache.hadoop.hbase.client.Get;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.client.Table;
57  import org.apache.hadoop.hbase.exceptions.DeserializationException;
58  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59  import org.apache.hadoop.hbase.filter.QualifierFilter;
60  import org.apache.hadoop.hbase.filter.RegexStringComparator;
61  import org.apache.hadoop.hbase.master.MasterServices;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
64  import org.apache.hadoop.hbase.regionserver.BloomType;
65  import org.apache.hadoop.hbase.regionserver.InternalScanner;
66  import org.apache.hadoop.hbase.regionserver.Region;
67  import org.apache.hadoop.hbase.security.User;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.Pair;
70  import org.apache.hadoop.io.Text;
71  
72  import com.google.common.collect.ArrayListMultimap;
73  import com.google.common.collect.ListMultimap;
74  import com.google.common.collect.Lists;
75  
76  /**
77   * Maintains lists of permission grants to users and groups to allow for
78   * authorization checks by {@link AccessController}.
79   *
80   * <p>
81   * Access control lists are stored in an "internal" metadata table named
82   * {@code _acl_}. Each table's permission grants are stored as a separate row,
83   * keyed by the table name. KeyValues for permissions assignments are stored
84   * in one of the formats:
85   * <pre>
86   * Key                      Desc
87   * --------                 --------
88   * user                     table level permissions for a user [R=read, W=write]
89   * group                    table level permissions for a group
90   * user,family              column family level permissions for a user
91   * group,family             column family level permissions for a group
92   * user,family,qualifier    column qualifier level permissions for a user
93   * group,family,qualifier   column qualifier level permissions for a group
94   * </pre>
95   * <p>
 * All values are encoded as byte arrays containing the codes from the
 * {@link Permission.Action} enum.
98   * </p>
99   */
100 @InterfaceAudience.Private
101 public class AccessControlLists {
  /** Internal storage table for access control lists */
  public static final TableName ACL_TABLE_NAME =
      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
  /** Name of the ACL table as bytes; also used as the row key for global grants. */
  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
  /** Column family used to store ACL grants */
  public static final String ACL_LIST_FAMILY_STR = "l";
  /** Byte form of {@link #ACL_LIST_FAMILY_STR}. */
  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
  /** KV tag to store per cell access control lists */
  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;

  /** First character of the row keys that hold namespace-level grants. */
  public static final char NAMESPACE_PREFIX = '@';

  /**
   * Delimiter separating user, column family, and qualifier in the column
   * keys of the ACL table.
   */
  public static final char ACL_KEY_DELIMITER = ',';

  /** Logger for this class. */
  private static final Log LOG = LogFactory.getLog(AccessControlLists.class);
120 
121   /**
122    * Create the ACL table
123    * @param master
124    * @throws IOException
125    */
126   static void createACLTable(MasterServices master) throws IOException {
127     master.createTable(new HTableDescriptor(ACL_TABLE_NAME)
128       .addFamily(new HColumnDescriptor(ACL_LIST_FAMILY)
129         .setMaxVersions(1)
130         .setInMemory(true)
131         .setBlockCacheEnabled(true)
132         .setBlocksize(8 * 1024)
133         .setBloomFilterType(BloomType.NONE)
134         .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
135         // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
136         // be the case if we are using CombinedBlockCache (Bucket Cache).
137         .setCacheDataInL1(true)),
138     null,
139     HConstants.NO_NONCE,
140     HConstants.NO_NONCE);
141   }
142 
143   /**
144    * Stores a new user permission grant in the access control lists table.
145    * @param conf the configuration
146    * @param userPerm the details of the permission to be granted
147    * @throws IOException in the case of an error accessing the metadata table
148    */
149   static void addUserPermission(Configuration conf, UserPermission userPerm)
150       throws IOException {
151     Permission.Action[] actions = userPerm.getActions();
152     byte[] rowKey = userPermissionRowKey(userPerm);
153     Put p = new Put(rowKey);
154     byte[] key = userPermissionKey(userPerm);
155 
156     if ((actions == null) || (actions.length == 0)) {
157       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
158       LOG.warn(msg);
159       throw new IOException(msg);
160     }
161 
162     byte[] value = new byte[actions.length];
163     for (int i = 0; i < actions.length; i++) {
164       value[i] = actions[i].code();
165     }
166     p.addImmutable(ACL_LIST_FAMILY, key, value);
167     if (LOG.isDebugEnabled()) {
168       LOG.debug("Writing permission with rowKey "+
169           Bytes.toString(rowKey)+" "+
170           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
171       );
172     }
173     // TODO: Pass in a Connection rather than create one each time.
174     try (Connection connection = ConnectionFactory.createConnection(conf)) {
175       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
176         table.put(p);
177       }
178     }
179   }
180 
181   /**
182    * Removes a previously granted permission from the stored access control
183    * lists.  The {@link TablePermission} being removed must exactly match what
184    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
185    * been granted "READ" access to the "data" table, but only to column family
186    * plus qualifier "info:colA", then trying to call this method with only
187    * user "bob" and the table name "data" (but without specifying the
188    * column qualifier "info:colA") will have no effect.
189    *
190    * @param conf the configuration
191    * @param userPerm the details of the permission to be revoked
192    * @throws IOException if there is an error accessing the metadata table
193    */
194   static void removeUserPermission(Configuration conf, UserPermission userPerm)
195       throws IOException {
196     Delete d = new Delete(userPermissionRowKey(userPerm));
197     byte[] key = userPermissionKey(userPerm);
198 
199     if (LOG.isDebugEnabled()) {
200       LOG.debug("Removing permission "+ userPerm.toString());
201     }
202     d.addColumns(ACL_LIST_FAMILY, key);
203     // TODO: Pass in a Connection rather than create one each time.
204     try (Connection connection = ConnectionFactory.createConnection(conf)) {
205       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
206         table.delete(d);
207       }
208     }
209   }
210 
211   /**
212    * Remove specified table from the _acl_ table.
213    */
214   static void removeTablePermissions(Configuration conf, TableName tableName)
215       throws IOException{
216     Delete d = new Delete(tableName.getName());
217 
218     if (LOG.isDebugEnabled()) {
219       LOG.debug("Removing permissions of removed table "+ tableName);
220     }
221     // TODO: Pass in a Connection rather than create one each time.
222     try (Connection connection = ConnectionFactory.createConnection(conf)) {
223       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
224         table.delete(d);
225       }
226     }
227   }
228 
229   /**
230    * Remove specified namespace from the acl table.
231    */
232   static void removeNamespacePermissions(Configuration conf, String namespace)
233       throws IOException{
234     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
235 
236     if (LOG.isDebugEnabled()) {
237       LOG.debug("Removing permissions of removed namespace "+ namespace);
238     }
239 
240     try (Connection connection = ConnectionFactory.createConnection(conf)) {
241       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
242         table.delete(d);
243       }
244     }
245   }
246 
247   /**
248    * Remove specified table column from the acl table.
249    */
250   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
251       throws IOException{
252 
253     if (LOG.isDebugEnabled()) {
254       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
255                 " from table "+ tableName);
256     }
257     // TODO: Pass in a Connection rather than create one each time.
258     try (Connection connection = ConnectionFactory.createConnection(conf)) {
259       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
260         Scan scan = new Scan();
261         scan.addFamily(ACL_LIST_FAMILY);
262 
263         String columnName = Bytes.toString(column);
264         scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
265             String.format("(%s%s%s)|(%s%s)$",
266                 ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
267                 ACL_KEY_DELIMITER, columnName))));
268 
269         Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
270         ResultScanner scanner = table.getScanner(scan);
271         try {
272           for (Result res : scanner) {
273             for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
274               qualifierSet.add(q);
275             }
276           }
277         } finally {
278           scanner.close();
279         }
280 
281         if (qualifierSet.size() > 0) {
282           Delete d = new Delete(tableName.getName());
283           for (byte[] qualifier : qualifierSet) {
284             d.addColumns(ACL_LIST_FAMILY, qualifier);
285           }
286           table.delete(d);
287         }
288       }
289     }
290   }
291 
292   static byte[] userPermissionRowKey(UserPermission userPerm) {
293     byte[] row;
294     if(userPerm.hasNamespace()) {
295       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
296     } else if(userPerm.isGlobal()) {
297       row = ACL_GLOBAL_NAME;
298     } else {
299       row = userPerm.getTableName().getName();
300     }
301     return row;
302   }
303 
304   /**
305    * Build qualifier key from user permission:
306    *  username
307    *  username,family
308    *  username,family,qualifier
309    */
310   static byte[] userPermissionKey(UserPermission userPerm) {
311     byte[] qualifier = userPerm.getQualifier();
312     byte[] family = userPerm.getFamily();
313     byte[] key = userPerm.getUser();
314 
315     if (family != null && family.length > 0) {
316       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
317       if (qualifier != null && qualifier.length > 0) {
318         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
319       }
320     }
321 
322     return key;
323   }
324 
325   /**
326    * Returns {@code true} if the given region is part of the {@code _acl_}
327    * metadata table.
328    */
329   static boolean isAclRegion(Region region) {
330     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
331   }
332 
333   /**
334    * Returns {@code true} if the given table is {@code _acl_} metadata table.
335    */
336   static boolean isAclTable(HTableDescriptor desc) {
337     return ACL_TABLE_NAME.equals(desc.getTableName());
338   }
339 
340   /**
341    * Loads all of the permission grants stored in a region of the {@code _acl_}
342    * table.
343    *
344    * @param aclRegion
345    * @return a map of the permissions for this table.
346    * @throws IOException
347    */
348   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(Region aclRegion)
349     throws IOException {
350 
351     if (!isAclRegion(aclRegion)) {
352       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
353     }
354 
355     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
356         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
357 
358     // do a full scan of _acl_ table
359 
360     Scan scan = new Scan();
361     scan.addFamily(ACL_LIST_FAMILY);
362 
363     InternalScanner iScanner = null;
364     try {
365       iScanner = aclRegion.getScanner(scan);
366 
367       while (true) {
368         List<Cell> row = new ArrayList<Cell>();
369 
370         boolean hasNext = iScanner.next(row);
371         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
372         byte[] entry = null;
373         for (Cell kv : row) {
374           if (entry == null) {
375             entry = CellUtil.cloneRow(kv);
376           }
377           Pair<String,TablePermission> permissionsOfUserOnTable =
378               parsePermissionRecord(entry, kv);
379           if (permissionsOfUserOnTable != null) {
380             String username = permissionsOfUserOnTable.getFirst();
381             TablePermission permissions = permissionsOfUserOnTable.getSecond();
382             perms.put(username, permissions);
383           }
384         }
385         if (entry != null) {
386           allPerms.put(entry, perms);
387         }
388         if (!hasNext) {
389           break;
390         }
391       }
392     } finally {
393       if (iScanner != null) {
394         iScanner.close();
395       }
396     }
397 
398     return allPerms;
399   }
400 
401   /**
402    * Load all permissions from the region server holding {@code _acl_},
403    * primarily intended for testing purposes.
404    */
405   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
406       Configuration conf) throws IOException {
407     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
408         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
409 
410     // do a full scan of _acl_, filtering on only first table region rows
411 
412     Scan scan = new Scan();
413     scan.addFamily(ACL_LIST_FAMILY);
414 
415     ResultScanner scanner = null;
416     // TODO: Pass in a Connection rather than create one each time.
417     try (Connection connection = ConnectionFactory.createConnection(conf)) {
418       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
419         scanner = table.getScanner(scan);
420         try {
421           for (Result row : scanner) {
422             ListMultimap<String,TablePermission> resultPerms = parsePermissions(row.getRow(), row);
423             allPerms.put(row.getRow(), resultPerms);
424           }
425         } finally {
426           if (scanner != null) scanner.close();
427         }
428       }
429     }
430 
431     return allPerms;
432   }
433 
434   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
435         TableName tableName) throws IOException {
436     return getPermissions(conf, tableName != null ? tableName.getName() : null);
437   }
438 
439   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
440         String namespace) throws IOException {
441     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
442   }
443 
444   /**
445    * Reads user permission assignments stored in the <code>l:</code> column
446    * family of the first table row in <code>_acl_</code>.
447    *
448    * <p>
449    * See {@link AccessControlLists class documentation} for the key structure
450    * used for storage.
451    * </p>
452    */
453   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
454       byte[] entryName) throws IOException {
455     if (entryName == null) entryName = ACL_GLOBAL_NAME;
456 
457     // for normal user tables, we just read the table row from _acl_
458     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
459     // TODO: Pass in a Connection rather than create one each time.
460     try (Connection connection = ConnectionFactory.createConnection(conf)) {
461       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
462         Get get = new Get(entryName);
463         get.addFamily(ACL_LIST_FAMILY);
464         Result row = table.get(get);
465         if (!row.isEmpty()) {
466           perms = parsePermissions(entryName, row);
467         } else {
468           LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
469               + Bytes.toString(entryName));
470         }
471       }
472     }
473 
474     return perms;
475   }
476 
477   /**
478    * Returns the currently granted permissions for a given table as a list of
479    * user plus associated permissions.
480    */
481   static List<UserPermission> getUserTablePermissions(
482       Configuration conf, TableName tableName) throws IOException {
483     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
484   }
485 
486   static List<UserPermission> getUserNamespacePermissions(
487       Configuration conf, String namespace) throws IOException {
488     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
489   }
490 
491   static List<UserPermission> getUserPermissions(
492       Configuration conf, byte[] entryName)
493   throws IOException {
494     ListMultimap<String,TablePermission> allPerms = getPermissions(
495       conf, entryName);
496 
497     List<UserPermission> perms = new ArrayList<UserPermission>();
498 
499     if(isNamespaceEntry(entryName)) {  // Namespace
500       for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
501         UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
502           entry.getValue().getNamespace(), entry.getValue().getActions());
503         perms.add(up);
504       }
505     } else {  // Table
506       for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
507         UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
508             entry.getValue().getTableName(), entry.getValue().getFamily(),
509             entry.getValue().getQualifier(), entry.getValue().getActions());
510         perms.add(up);
511       }
512     }
513     return perms;
514   }
515 
516   private static ListMultimap<String, TablePermission> parsePermissions(
517       byte[] entryName, Result result) {
518     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
519     if (result != null && result.size() > 0) {
520       for (Cell kv : result.rawCells()) {
521 
522         Pair<String,TablePermission> permissionsOfUserOnTable =
523             parsePermissionRecord(entryName, kv);
524 
525         if (permissionsOfUserOnTable != null) {
526           String username = permissionsOfUserOnTable.getFirst();
527           TablePermission permissions = permissionsOfUserOnTable.getSecond();
528           perms.put(username, permissions);
529         }
530       }
531     }
532     return perms;
533   }
534 
535   private static Pair<String, TablePermission> parsePermissionRecord(
536       byte[] entryName, Cell kv) {
537     // return X given a set of permissions encoded in the permissionRecord kv.
538     byte[] family = CellUtil.cloneFamily(kv);
539 
540     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
541       return null;
542     }
543 
544     byte[] key = CellUtil.cloneQualifier(kv);
545     byte[] value = CellUtil.cloneValue(kv);
546     if (LOG.isDebugEnabled()) {
547       LOG.debug("Read acl: kv ["+
548                 Bytes.toStringBinary(key)+": "+
549                 Bytes.toStringBinary(value)+"]");
550     }
551 
552     // check for a column family appended to the key
553     // TODO: avoid the string conversion to make this more efficient
554     String username = Bytes.toString(key);
555 
556     //Handle namespace entry
557     if(isNamespaceEntry(entryName)) {
558       return new Pair<String, TablePermission>(username,
559           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
560     }
561 
562     //Handle table and global entry
563     //TODO global entry should be handled differently
564     int idx = username.indexOf(ACL_KEY_DELIMITER);
565     byte[] permFamily = null;
566     byte[] permQualifier = null;
567     if (idx > 0 && idx < username.length()-1) {
568       String remainder = username.substring(idx+1);
569       username = username.substring(0, idx);
570       idx = remainder.indexOf(ACL_KEY_DELIMITER);
571       if (idx > 0 && idx < remainder.length()-1) {
572         permFamily = Bytes.toBytes(remainder.substring(0, idx));
573         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
574       } else {
575         permFamily = Bytes.toBytes(remainder);
576       }
577     }
578 
579     return new Pair<String,TablePermission>(username,
580         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
581   }
582 
583   /**
584    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
585    * and returns the resulting byte array.
586    *
587    * Writes a set of permission [user: table permission]
588    */
589   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
590       Configuration conf) {
591     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
592   }
593 
594   /**
595    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
596    * from the input stream.
597    */
598   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
599       Configuration conf)
600   throws DeserializationException {
601     if (ProtobufUtil.isPBMagicPrefix(data)) {
602       int pblen = ProtobufUtil.lengthOfPBMagic();
603       try {
604         AccessControlProtos.UsersAndPermissions.Builder builder =
605           AccessControlProtos.UsersAndPermissions.newBuilder();
606         ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
607         return ProtobufUtil.toUserTablePermissions(builder.build());
608       } catch (IOException e) {
609         throw new DeserializationException(e);
610       }
611     } else {
612       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
613       try {
614         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
615         int length = in.readInt();
616         for (int i=0; i<length; i++) {
617           String user = Text.readString(in);
618           List<TablePermission> userPerms =
619             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
620           perms.putAll(user, userPerms);
621         }
622       } catch (IOException e) {
623         throw new DeserializationException(e);
624       }
625       return perms;
626     }
627   }
628 
629   public static boolean isNamespaceEntry(String entryName) {
630     return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
631   }
632 
633   public static boolean isNamespaceEntry(byte[] entryName) {
634     return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
635   }
636 
637   public static String toNamespaceEntry(String namespace) {
638      return NAMESPACE_PREFIX + namespace;
639    }
640 
641    public static String fromNamespaceEntry(String namespace) {
642      if(namespace.charAt(0) != NAMESPACE_PREFIX)
643        throw new IllegalArgumentException("Argument is not a valid namespace entry");
644      return namespace.substring(1);
645    }
646 
647    public static byte[] toNamespaceEntry(byte[] namespace) {
648      byte[] ret = new byte[namespace.length+1];
649      ret[0] = NAMESPACE_PREFIX;
650      System.arraycopy(namespace, 0, ret, 1, namespace.length);
651      return ret;
652    }
653 
654    public static byte[] fromNamespaceEntry(byte[] namespace) {
655      if(namespace[0] != NAMESPACE_PREFIX) {
656        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
657            Bytes.toString(namespace));
658      }
659      return Arrays.copyOfRange(namespace, 1, namespace.length);
660    }
661 
662    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
663        throws IOException {
664      // Save an object allocation where we can
665      if (cell.getTagsLength() == 0) {
666        return null;
667      }
668      List<Permission> results = Lists.newArrayList();
669      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
670         cell.getTagsLength());
671      while (tagsIterator.hasNext()) {
672        Tag tag = tagsIterator.next();
673        if (tag.getType() == ACL_TAG_TYPE) {
674          // Deserialize the table permissions from the KV
675          // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
676          // use the builder
677          AccessControlProtos.UsersAndPermissions.Builder builder = 
678            AccessControlProtos.UsersAndPermissions.newBuilder();
679          ProtobufUtil.mergeFrom(builder, tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
680          ListMultimap<String,Permission> kvPerms =
681            ProtobufUtil.toUsersAndPermissions(builder.build());
682          // Are there permissions for this user?
683          List<Permission> userPerms = kvPerms.get(user.getShortName());
684          if (userPerms != null) {
685            results.addAll(userPerms);
686          }
687          // Are there permissions for any of the groups this user belongs to?
688          String groupNames[] = user.getGroupNames();
689          if (groupNames != null) {
690            for (String group : groupNames) {
691              List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
692              if (results != null) {
693                results.addAll(groupPerms);
694              }
695            }
696          }
697        }
698      }
699      return results;
700    }
701 }