View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.AuthUtil;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.NamespaceDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.Tag;
46  import org.apache.hadoop.hbase.TagType;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.client.Connection;
49  import org.apache.hadoop.hbase.client.ConnectionFactory;
50  import org.apache.hadoop.hbase.client.Delete;
51  import org.apache.hadoop.hbase.client.Get;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.client.Table;
57  import org.apache.hadoop.hbase.exceptions.DeserializationException;
58  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59  import org.apache.hadoop.hbase.filter.QualifierFilter;
60  import org.apache.hadoop.hbase.filter.RegexStringComparator;
61  import org.apache.hadoop.hbase.master.MasterServices;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
64  import org.apache.hadoop.hbase.regionserver.BloomType;
65  import org.apache.hadoop.hbase.regionserver.InternalScanner;
66  import org.apache.hadoop.hbase.regionserver.Region;
67  import org.apache.hadoop.hbase.security.User;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.Pair;
70  import org.apache.hadoop.io.Text;
71  
72  import com.google.common.collect.ArrayListMultimap;
73  import com.google.common.collect.ListMultimap;
74  import com.google.common.collect.Lists;
75  import com.google.protobuf.InvalidProtocolBufferException;
76  
77  /**
78   * Maintains lists of permission grants to users and groups to allow for
79   * authorization checks by {@link AccessController}.
80   *
81   * <p>
82   * Access control lists are stored in an "internal" metadata table named
83   * {@code _acl_}. Each table's permission grants are stored as a separate row,
84   * keyed by the table name. KeyValues for permissions assignments are stored
85   * in one of the formats:
86   * <pre>
87   * Key                      Desc
88   * --------                 --------
89   * user                     table level permissions for a user [R=read, W=write]
90   * group                    table level permissions for a group
91   * user,family              column family level permissions for a user
92   * group,family             column family level permissions for a group
93   * user,family,qualifier    column qualifier level permissions for a user
94   * group,family,qualifier   column qualifier level permissions for a group
95   * </pre>
96   * <p>
97   * All values are encoded as byte arrays containing the codes from the
98   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
99   * </p>
100  */
101 @InterfaceAudience.Private
102 public class AccessControlLists {
103   /** Internal storage table for access control lists */
104   public static final TableName ACL_TABLE_NAME =
105       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
106   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
107   /** Column family used to store ACL grants */
108   public static final String ACL_LIST_FAMILY_STR = "l";
109   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
110   /** KV tag to store per cell access control lists */
111   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
112 
113   public static final char NAMESPACE_PREFIX = '@';
114 
115   /**
116    * Delimiter to separate user, column family, and qualifier in
117    * _acl_ table info: column keys */
118   public static final char ACL_KEY_DELIMITER = ',';
119 
120   private static final Log LOG = LogFactory.getLog(AccessControlLists.class);
121 
122   /**
123    * Create the ACL table
124    * @param master
125    * @throws IOException
126    */
127   static void createACLTable(MasterServices master) throws IOException {
128     master.createTable(new HTableDescriptor(ACL_TABLE_NAME)
129       .addFamily(new HColumnDescriptor(ACL_LIST_FAMILY)
130         .setMaxVersions(1)
131         .setInMemory(true)
132         .setBlockCacheEnabled(true)
133         .setBlocksize(8 * 1024)
134         .setBloomFilterType(BloomType.NONE)
135         .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
136         // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
137         // be the case if we are using CombinedBlockCache (Bucket Cache).
138         .setCacheDataInL1(true)),
139     null);
140   }
141 
142   /**
143    * Stores a new user permission grant in the access control lists table.
144    * @param conf the configuration
145    * @param userPerm the details of the permission to be granted
146    * @throws IOException in the case of an error accessing the metadata table
147    */
148   static void addUserPermission(Configuration conf, UserPermission userPerm)
149       throws IOException {
150     Permission.Action[] actions = userPerm.getActions();
151     byte[] rowKey = userPermissionRowKey(userPerm);
152     Put p = new Put(rowKey);
153     byte[] key = userPermissionKey(userPerm);
154 
155     if ((actions == null) || (actions.length == 0)) {
156       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
157       LOG.warn(msg);
158       throw new IOException(msg);
159     }
160 
161     byte[] value = new byte[actions.length];
162     for (int i = 0; i < actions.length; i++) {
163       value[i] = actions[i].code();
164     }
165     p.addImmutable(ACL_LIST_FAMILY, key, value);
166     if (LOG.isDebugEnabled()) {
167       LOG.debug("Writing permission with rowKey "+
168           Bytes.toString(rowKey)+" "+
169           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
170       );
171     }
172     // TODO: Pass in a Connection rather than create one each time.
173     try (Connection connection = ConnectionFactory.createConnection(conf)) {
174       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
175         table.put(p);
176       }
177     }
178   }
179 
180   /**
181    * Removes a previously granted permission from the stored access control
182    * lists.  The {@link TablePermission} being removed must exactly match what
183    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
184    * been granted "READ" access to the "data" table, but only to column family
185    * plus qualifier "info:colA", then trying to call this method with only
186    * user "bob" and the table name "data" (but without specifying the
187    * column qualifier "info:colA") will have no effect.
188    *
189    * @param conf the configuration
190    * @param userPerm the details of the permission to be revoked
191    * @throws IOException if there is an error accessing the metadata table
192    */
193   static void removeUserPermission(Configuration conf, UserPermission userPerm)
194       throws IOException {
195     Delete d = new Delete(userPermissionRowKey(userPerm));
196     byte[] key = userPermissionKey(userPerm);
197 
198     if (LOG.isDebugEnabled()) {
199       LOG.debug("Removing permission "+ userPerm.toString());
200     }
201     d.addColumns(ACL_LIST_FAMILY, key);
202     // TODO: Pass in a Connection rather than create one each time.
203     try (Connection connection = ConnectionFactory.createConnection(conf)) {
204       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
205         table.delete(d);
206       }
207     }
208   }
209 
210   /**
211    * Remove specified table from the _acl_ table.
212    */
213   static void removeTablePermissions(Configuration conf, TableName tableName)
214       throws IOException{
215     Delete d = new Delete(tableName.getName());
216 
217     if (LOG.isDebugEnabled()) {
218       LOG.debug("Removing permissions of removed table "+ tableName);
219     }
220     // TODO: Pass in a Connection rather than create one each time.
221     try (Connection connection = ConnectionFactory.createConnection(conf)) {
222       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
223         table.delete(d);
224       }
225     }
226   }
227 
228   /**
229    * Remove specified namespace from the acl table.
230    */
231   static void removeNamespacePermissions(Configuration conf, String namespace)
232       throws IOException{
233     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
234 
235     if (LOG.isDebugEnabled()) {
236       LOG.debug("Removing permissions of removed namespace "+ namespace);
237     }
238 
239     try (Connection connection = ConnectionFactory.createConnection(conf)) {
240       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
241         table.delete(d);
242       }
243     }
244   }
245 
246   /**
247    * Remove specified table column from the acl table.
248    */
249   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
250       throws IOException{
251 
252     if (LOG.isDebugEnabled()) {
253       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
254                 " from table "+ tableName);
255     }
256     // TODO: Pass in a Connection rather than create one each time.
257     try (Connection connection = ConnectionFactory.createConnection(conf)) {
258       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
259         Scan scan = new Scan();
260         scan.addFamily(ACL_LIST_FAMILY);
261 
262         String columnName = Bytes.toString(column);
263         scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
264             String.format("(%s%s%s)|(%s%s)$",
265                 ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
266                 ACL_KEY_DELIMITER, columnName))));
267 
268         Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
269         ResultScanner scanner = table.getScanner(scan);
270         try {
271           for (Result res : scanner) {
272             for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
273               qualifierSet.add(q);
274             }
275           }
276         } finally {
277           scanner.close();
278         }
279 
280         if (qualifierSet.size() > 0) {
281           Delete d = new Delete(tableName.getName());
282           for (byte[] qualifier : qualifierSet) {
283             d.addColumns(ACL_LIST_FAMILY, qualifier);
284           }
285           table.delete(d);
286         }
287       }
288     }
289   }
290 
291   static byte[] userPermissionRowKey(UserPermission userPerm) {
292     byte[] row;
293     if(userPerm.hasNamespace()) {
294       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
295     } else if(userPerm.isGlobal()) {
296       row = ACL_GLOBAL_NAME;
297     } else {
298       row = userPerm.getTableName().getName();
299     }
300     return row;
301   }
302 
303   /**
304    * Build qualifier key from user permission:
305    *  username
306    *  username,family
307    *  username,family,qualifier
308    */
309   static byte[] userPermissionKey(UserPermission userPerm) {
310     byte[] qualifier = userPerm.getQualifier();
311     byte[] family = userPerm.getFamily();
312     byte[] key = userPerm.getUser();
313 
314     if (family != null && family.length > 0) {
315       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
316       if (qualifier != null && qualifier.length > 0) {
317         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
318       }
319     }
320 
321     return key;
322   }
323 
324   /**
325    * Returns {@code true} if the given region is part of the {@code _acl_}
326    * metadata table.
327    */
328   static boolean isAclRegion(Region region) {
329     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
330   }
331 
332   /**
333    * Returns {@code true} if the given table is {@code _acl_} metadata table.
334    */
335   static boolean isAclTable(HTableDescriptor desc) {
336     return ACL_TABLE_NAME.equals(desc.getTableName());
337   }
338 
339   /**
340    * Loads all of the permission grants stored in a region of the {@code _acl_}
341    * table.
342    *
343    * @param aclRegion
344    * @return a map of the permissions for this table.
345    * @throws IOException
346    */
347   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(Region aclRegion)
348     throws IOException {
349 
350     if (!isAclRegion(aclRegion)) {
351       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
352     }
353 
354     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
355         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
356 
357     // do a full scan of _acl_ table
358 
359     Scan scan = new Scan();
360     scan.addFamily(ACL_LIST_FAMILY);
361 
362     InternalScanner iScanner = null;
363     try {
364       iScanner = aclRegion.getScanner(scan);
365 
366       while (true) {
367         List<Cell> row = new ArrayList<Cell>();
368 
369         boolean hasNext = iScanner.next(row);
370         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
371         byte[] entry = null;
372         for (Cell kv : row) {
373           if (entry == null) {
374             entry = CellUtil.cloneRow(kv);
375           }
376           Pair<String,TablePermission> permissionsOfUserOnTable =
377               parsePermissionRecord(entry, kv);
378           if (permissionsOfUserOnTable != null) {
379             String username = permissionsOfUserOnTable.getFirst();
380             TablePermission permissions = permissionsOfUserOnTable.getSecond();
381             perms.put(username, permissions);
382           }
383         }
384         if (entry != null) {
385           allPerms.put(entry, perms);
386         }
387         if (!hasNext) {
388           break;
389         }
390       }
391     } finally {
392       if (iScanner != null) {
393         iScanner.close();
394       }
395     }
396 
397     return allPerms;
398   }
399 
400   /**
401    * Load all permissions from the region server holding {@code _acl_},
402    * primarily intended for testing purposes.
403    */
404   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
405       Configuration conf) throws IOException {
406     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
407         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
408 
409     // do a full scan of _acl_, filtering on only first table region rows
410 
411     Scan scan = new Scan();
412     scan.addFamily(ACL_LIST_FAMILY);
413 
414     ResultScanner scanner = null;
415     // TODO: Pass in a Connection rather than create one each time.
416     try (Connection connection = ConnectionFactory.createConnection(conf)) {
417       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
418         scanner = table.getScanner(scan);
419         try {
420           for (Result row : scanner) {
421             ListMultimap<String,TablePermission> resultPerms = parsePermissions(row.getRow(), row);
422             allPerms.put(row.getRow(), resultPerms);
423           }
424         } finally {
425           if (scanner != null) scanner.close();
426         }
427       }
428     }
429 
430     return allPerms;
431   }
432 
433   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
434         TableName tableName) throws IOException {
435     return getPermissions(conf, tableName != null ? tableName.getName() : null);
436   }
437 
438   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
439         String namespace) throws IOException {
440     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
441   }
442 
443   /**
444    * Reads user permission assignments stored in the <code>l:</code> column
445    * family of the first table row in <code>_acl_</code>.
446    *
447    * <p>
448    * See {@link AccessControlLists class documentation} for the key structure
449    * used for storage.
450    * </p>
451    */
452   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
453       byte[] entryName) throws IOException {
454     if (entryName == null) entryName = ACL_GLOBAL_NAME;
455 
456     // for normal user tables, we just read the table row from _acl_
457     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
458     // TODO: Pass in a Connection rather than create one each time.
459     try (Connection connection = ConnectionFactory.createConnection(conf)) {
460       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
461         Get get = new Get(entryName);
462         get.addFamily(ACL_LIST_FAMILY);
463         Result row = table.get(get);
464         if (!row.isEmpty()) {
465           perms = parsePermissions(entryName, row);
466         } else {
467           LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
468               + Bytes.toString(entryName));
469         }
470       }
471     }
472 
473     return perms;
474   }
475 
476   /**
477    * Returns the currently granted permissions for a given table as a list of
478    * user plus associated permissions.
479    */
480   static List<UserPermission> getUserTablePermissions(
481       Configuration conf, TableName tableName) throws IOException {
482     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
483   }
484 
485   static List<UserPermission> getUserNamespacePermissions(
486       Configuration conf, String namespace) throws IOException {
487     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
488   }
489 
490   static List<UserPermission> getUserPermissions(
491       Configuration conf, byte[] entryName)
492   throws IOException {
493     ListMultimap<String,TablePermission> allPerms = getPermissions(
494       conf, entryName);
495 
496     List<UserPermission> perms = new ArrayList<UserPermission>();
497 
498     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
499       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
500           entry.getValue().getTableName(), entry.getValue().getFamily(),
501           entry.getValue().getQualifier(), entry.getValue().getActions());
502       perms.add(up);
503     }
504     return perms;
505   }
506 
507   private static ListMultimap<String, TablePermission> parsePermissions(
508       byte[] entryName, Result result) {
509     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
510     if (result != null && result.size() > 0) {
511       for (Cell kv : result.rawCells()) {
512 
513         Pair<String,TablePermission> permissionsOfUserOnTable =
514             parsePermissionRecord(entryName, kv);
515 
516         if (permissionsOfUserOnTable != null) {
517           String username = permissionsOfUserOnTable.getFirst();
518           TablePermission permissions = permissionsOfUserOnTable.getSecond();
519           perms.put(username, permissions);
520         }
521       }
522     }
523     return perms;
524   }
525 
526   private static Pair<String, TablePermission> parsePermissionRecord(
527       byte[] entryName, Cell kv) {
528     // return X given a set of permissions encoded in the permissionRecord kv.
529     byte[] family = CellUtil.cloneFamily(kv);
530 
531     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
532       return null;
533     }
534 
535     byte[] key = CellUtil.cloneQualifier(kv);
536     byte[] value = CellUtil.cloneValue(kv);
537     if (LOG.isDebugEnabled()) {
538       LOG.debug("Read acl: kv ["+
539                 Bytes.toStringBinary(key)+": "+
540                 Bytes.toStringBinary(value)+"]");
541     }
542 
543     // check for a column family appended to the key
544     // TODO: avoid the string conversion to make this more efficient
545     String username = Bytes.toString(key);
546 
547     //Handle namespace entry
548     if(isNamespaceEntry(entryName)) {
549       return new Pair<String, TablePermission>(username,
550           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
551     }
552 
553     //Handle table and global entry
554     //TODO global entry should be handled differently
555     int idx = username.indexOf(ACL_KEY_DELIMITER);
556     byte[] permFamily = null;
557     byte[] permQualifier = null;
558     if (idx > 0 && idx < username.length()-1) {
559       String remainder = username.substring(idx+1);
560       username = username.substring(0, idx);
561       idx = remainder.indexOf(ACL_KEY_DELIMITER);
562       if (idx > 0 && idx < remainder.length()-1) {
563         permFamily = Bytes.toBytes(remainder.substring(0, idx));
564         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
565       } else {
566         permFamily = Bytes.toBytes(remainder);
567       }
568     }
569 
570     return new Pair<String,TablePermission>(username,
571         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
572   }
573 
574   /**
575    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
576    * and returns the resulting byte array.
577    *
578    * Writes a set of permission [user: table permission]
579    */
580   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
581       Configuration conf) {
582     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
583   }
584 
585   /**
586    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
587    * from the input stream.
588    */
589   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
590       Configuration conf)
591   throws DeserializationException {
592     if (ProtobufUtil.isPBMagicPrefix(data)) {
593       int pblen = ProtobufUtil.lengthOfPBMagic();
594       try {
595         AccessControlProtos.UsersAndPermissions perms =
596           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
597             data, pblen, data.length - pblen).build();
598         return ProtobufUtil.toUserTablePermissions(perms);
599       } catch (InvalidProtocolBufferException e) {
600         throw new DeserializationException(e);
601       }
602     } else {
603       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
604       try {
605         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
606         int length = in.readInt();
607         for (int i=0; i<length; i++) {
608           String user = Text.readString(in);
609           List<TablePermission> userPerms =
610             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
611           perms.putAll(user, userPerms);
612         }
613       } catch (IOException e) {
614         throw new DeserializationException(e);
615       }
616       return perms;
617     }
618   }
619 
620   public static boolean isNamespaceEntry(String entryName) {
621     return entryName.charAt(0) == NAMESPACE_PREFIX;
622   }
623 
624   public static boolean isNamespaceEntry(byte[] entryName) {
625     return entryName[0] == NAMESPACE_PREFIX;
626   }
627 
628   public static String toNamespaceEntry(String namespace) {
629      return NAMESPACE_PREFIX + namespace;
630    }
631 
632    public static String fromNamespaceEntry(String namespace) {
633      if(namespace.charAt(0) != NAMESPACE_PREFIX)
634        throw new IllegalArgumentException("Argument is not a valid namespace entry");
635      return namespace.substring(1);
636    }
637 
638    public static byte[] toNamespaceEntry(byte[] namespace) {
639      byte[] ret = new byte[namespace.length+1];
640      ret[0] = NAMESPACE_PREFIX;
641      System.arraycopy(namespace, 0, ret, 1, namespace.length);
642      return ret;
643    }
644 
645    public static byte[] fromNamespaceEntry(byte[] namespace) {
646      if(namespace[0] != NAMESPACE_PREFIX) {
647        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
648            Bytes.toString(namespace));
649      }
650      return Arrays.copyOfRange(namespace, 1, namespace.length);
651    }
652 
653    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
654        throws IOException {
655      // Save an object allocation where we can
656      if (cell.getTagsLength() == 0) {
657        return null;
658      }
659      List<Permission> results = Lists.newArrayList();
660      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
661         cell.getTagsLength());
662      while (tagsIterator.hasNext()) {
663        Tag tag = tagsIterator.next();
664        if (tag.getType() == ACL_TAG_TYPE) {
665          // Deserialize the table permissions from the KV
666          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
667            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
668              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
669          // Are there permissions for this user?
670          List<Permission> userPerms = kvPerms.get(user.getShortName());
671          if (userPerms != null) {
672            results.addAll(userPerms);
673          }
674          // Are there permissions for any of the groups this user belongs to?
675          String groupNames[] = user.getGroupNames();
676          if (groupNames != null) {
677            for (String group : groupNames) {
678              List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
679              if (results != null) {
680                results.addAll(groupPerms);
681              }
682            }
683          }
684        }
685      }
686      return results;
687    }
688 }