View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.NamespaceDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.Tag;
45  import org.apache.hadoop.hbase.TagType;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.client.Connection;
48  import org.apache.hadoop.hbase.client.ConnectionFactory;
49  import org.apache.hadoop.hbase.client.Delete;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.Put;
52  import org.apache.hadoop.hbase.client.Result;
53  import org.apache.hadoop.hbase.client.ResultScanner;
54  import org.apache.hadoop.hbase.client.Scan;
55  import org.apache.hadoop.hbase.client.Table;
56  import org.apache.hadoop.hbase.exceptions.DeserializationException;
57  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
58  import org.apache.hadoop.hbase.filter.QualifierFilter;
59  import org.apache.hadoop.hbase.filter.RegexStringComparator;
60  import org.apache.hadoop.hbase.master.MasterServices;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
63  import org.apache.hadoop.hbase.regionserver.BloomType;
64  import org.apache.hadoop.hbase.regionserver.HRegion;
65  import org.apache.hadoop.hbase.regionserver.InternalScanner;
66  import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
67  import org.apache.hadoop.hbase.security.User;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.Pair;
70  import org.apache.hadoop.io.Text;
71  
72  import com.google.common.collect.ArrayListMultimap;
73  import com.google.common.collect.ListMultimap;
74  import com.google.common.collect.Lists;
75  import com.google.protobuf.InvalidProtocolBufferException;
76  
77  /**
78   * Maintains lists of permission grants to users and groups to allow for
79   * authorization checks by {@link AccessController}.
80   *
81   * <p>
82   * Access control lists are stored in an "internal" metadata table named
83   * {@code _acl_}. Each table's permission grants are stored as a separate row,
84   * keyed by the table name. KeyValues for permissions assignments are stored
85   * in one of the formats:
86   * <pre>
87   * Key                      Desc
88   * --------                 --------
89   * user                     table level permissions for a user [R=read, W=write]
90   * group                    table level permissions for a group
91   * user,family              column family level permissions for a user
92   * group,family             column family level permissions for a group
93   * user,family,qualifier    column qualifier level permissions for a user
94   * group,family,qualifier   column qualifier level permissions for a group
95   * </pre>
96   * All values are encoded as byte arrays containing the codes from the
97   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
98   * </p>
99   */
100 @InterfaceAudience.Private
101 public class AccessControlLists {
102   /** Internal storage table for access control lists */
103   public static final TableName ACL_TABLE_NAME =
104       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
105   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
106   /** Column family used to store ACL grants */
107   public static final String ACL_LIST_FAMILY_STR = "l";
108   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
109   /** KV tag to store per cell access control lists */
110   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
111 
112   public static final char NAMESPACE_PREFIX = '@';
113 
114   /**
115    * Delimiter to separate user, column family, and qualifier in
116    * _acl_ table info: column keys */
117   public static final char ACL_KEY_DELIMITER = ',';
118   /** Prefix character to denote group names */
119   public static final String GROUP_PREFIX = "@";
120   /** Configuration key for superusers */
121   public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
122 
123   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
124 
125   /**
126    * Create the ACL table
127    * @param master
128    * @throws IOException
129    */
130   static void createACLTable(MasterServices master) throws IOException {
131     master.createTable(new HTableDescriptor(ACL_TABLE_NAME)
132       .addFamily(new HColumnDescriptor(ACL_LIST_FAMILY)
133         .setMaxVersions(1)
134         .setInMemory(true)
135         .setBlockCacheEnabled(true)
136         .setBlocksize(8 * 1024)
137         .setBloomFilterType(BloomType.NONE)
138         .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
139         // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
140         // be the case if we are using CombinedBlockCache (Bucket Cache).
141         .setCacheDataInL1(true)),
142     null);
143   }
144 
145   /**
146    * Stores a new user permission grant in the access control lists table.
147    * @param conf the configuration
148    * @param userPerm the details of the permission to be granted
149    * @throws IOException in the case of an error accessing the metadata table
150    */
151   static void addUserPermission(Configuration conf, UserPermission userPerm)
152       throws IOException {
153     Permission.Action[] actions = userPerm.getActions();
154     byte[] rowKey = userPermissionRowKey(userPerm);
155     Put p = new Put(rowKey);
156     byte[] key = userPermissionKey(userPerm);
157 
158     if ((actions == null) || (actions.length == 0)) {
159       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
160       LOG.warn(msg);
161       throw new IOException(msg);
162     }
163 
164     byte[] value = new byte[actions.length];
165     for (int i = 0; i < actions.length; i++) {
166       value[i] = actions[i].code();
167     }
168     p.addImmutable(ACL_LIST_FAMILY, key, value);
169     if (LOG.isDebugEnabled()) {
170       LOG.debug("Writing permission with rowKey "+
171           Bytes.toString(rowKey)+" "+
172           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
173       );
174     }
175     // TODO: Pass in a Connection rather than create one each time.
176     try (Connection connection = ConnectionFactory.createConnection(conf)) {
177       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
178         table.put(p);
179       }
180     }
181   }
182 
183   /**
184    * Removes a previously granted permission from the stored access control
185    * lists.  The {@link TablePermission} being removed must exactly match what
186    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
187    * been granted "READ" access to the "data" table, but only to column family
188    * plus qualifier "info:colA", then trying to call this method with only
189    * user "bob" and the table name "data" (but without specifying the
190    * column qualifier "info:colA") will have no effect.
191    *
192    * @param conf the configuration
193    * @param userPerm the details of the permission to be revoked
194    * @throws IOException if there is an error accessing the metadata table
195    */
196   static void removeUserPermission(Configuration conf, UserPermission userPerm)
197       throws IOException {
198     Delete d = new Delete(userPermissionRowKey(userPerm));
199     byte[] key = userPermissionKey(userPerm);
200 
201     if (LOG.isDebugEnabled()) {
202       LOG.debug("Removing permission "+ userPerm.toString());
203     }
204     d.addColumns(ACL_LIST_FAMILY, key);
205     // TODO: Pass in a Connection rather than create one each time.
206     try (Connection connection = ConnectionFactory.createConnection(conf)) {
207       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
208         table.delete(d);
209       }
210     }
211   }
212 
213   /**
214    * Remove specified table from the _acl_ table.
215    */
216   static void removeTablePermissions(Configuration conf, TableName tableName)
217       throws IOException{
218     Delete d = new Delete(tableName.getName());
219 
220     if (LOG.isDebugEnabled()) {
221       LOG.debug("Removing permissions of removed table "+ tableName);
222     }
223     // TODO: Pass in a Connection rather than create one each time.
224     try (Connection connection = ConnectionFactory.createConnection(conf)) {
225       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
226         table.delete(d);
227       }
228     }
229   }
230 
231   /**
232    * Remove specified namespace from the acl table.
233    */
234   static void removeNamespacePermissions(Configuration conf, String namespace)
235       throws IOException{
236     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
237 
238     if (LOG.isDebugEnabled()) {
239       LOG.debug("Removing permissions of removed namespace "+ namespace);
240     }
241 
242     try (Connection connection = ConnectionFactory.createConnection(conf)) {
243       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
244         table.delete(d);
245       }
246     }
247   }
248 
249   /**
250    * Remove specified table column from the acl table.
251    */
252   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
253       throws IOException{
254 
255     if (LOG.isDebugEnabled()) {
256       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
257                 " from table "+ tableName);
258     }
259     // TODO: Pass in a Connection rather than create one each time.
260     try (Connection connection = ConnectionFactory.createConnection(conf)) {
261       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
262         Scan scan = new Scan();
263         scan.addFamily(ACL_LIST_FAMILY);
264 
265         String columnName = Bytes.toString(column);
266         scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
267             String.format("(%s%s%s)|(%s%s)$",
268                 ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
269                 ACL_KEY_DELIMITER, columnName))));
270 
271         Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
272         ResultScanner scanner = table.getScanner(scan);
273         try {
274           for (Result res : scanner) {
275             for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
276               qualifierSet.add(q);
277             }
278           }
279         } finally {
280           scanner.close();
281         }
282 
283         if (qualifierSet.size() > 0) {
284           Delete d = new Delete(tableName.getName());
285           for (byte[] qualifier : qualifierSet) {
286             d.addColumns(ACL_LIST_FAMILY, qualifier);
287           }
288           table.delete(d);
289         }
290       }
291     }
292   }
293 
294   static byte[] userPermissionRowKey(UserPermission userPerm) {
295     byte[] row;
296     if(userPerm.hasNamespace()) {
297       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
298     } else if(userPerm.isGlobal()) {
299       row = ACL_GLOBAL_NAME;
300     } else {
301       row = userPerm.getTableName().getName();
302     }
303     return row;
304   }
305 
306   /**
307    * Build qualifier key from user permission:
308    *  username
309    *  username,family
310    *  username,family,qualifier
311    */
312   static byte[] userPermissionKey(UserPermission userPerm) {
313     byte[] qualifier = userPerm.getQualifier();
314     byte[] family = userPerm.getFamily();
315     byte[] key = userPerm.getUser();
316 
317     if (family != null && family.length > 0) {
318       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
319       if (qualifier != null && qualifier.length > 0) {
320         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
321       }
322     }
323 
324     return key;
325   }
326 
327   /**
328    * Returns {@code true} if the given region is part of the {@code _acl_}
329    * metadata table.
330    */
331   static boolean isAclRegion(HRegion region) {
332     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
333   }
334 
335   /**
336    * Returns {@code true} if the given table is {@code _acl_} metadata table.
337    */
338   static boolean isAclTable(HTableDescriptor desc) {
339     return ACL_TABLE_NAME.equals(desc.getTableName());
340   }
341 
342   /**
343    * Loads all of the permission grants stored in a region of the {@code _acl_}
344    * table.
345    *
346    * @param aclRegion
347    * @return a map of the permissions for this table.
348    * @throws IOException
349    */
350   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
351       HRegion aclRegion)
352     throws IOException {
353 
354     if (!isAclRegion(aclRegion)) {
355       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
356     }
357 
358     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
359         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
360 
361     // do a full scan of _acl_ table
362 
363     Scan scan = new Scan();
364     scan.addFamily(ACL_LIST_FAMILY);
365 
366     InternalScanner iScanner = null;
367     try {
368       iScanner = aclRegion.getScanner(scan);
369 
370       while (true) {
371         List<Cell> row = new ArrayList<Cell>();
372 
373         boolean hasNext = NextState.hasMoreValues(iScanner.next(row));
374         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
375         byte[] entry = null;
376         for (Cell kv : row) {
377           if (entry == null) {
378             entry = CellUtil.cloneRow(kv);
379           }
380           Pair<String,TablePermission> permissionsOfUserOnTable =
381               parsePermissionRecord(entry, kv);
382           if (permissionsOfUserOnTable != null) {
383             String username = permissionsOfUserOnTable.getFirst();
384             TablePermission permissions = permissionsOfUserOnTable.getSecond();
385             perms.put(username, permissions);
386           }
387         }
388         if (entry != null) {
389           allPerms.put(entry, perms);
390         }
391         if (!hasNext) {
392           break;
393         }
394       }
395     } finally {
396       if (iScanner != null) {
397         iScanner.close();
398       }
399     }
400 
401     return allPerms;
402   }
403 
404   /**
405    * Load all permissions from the region server holding {@code _acl_},
406    * primarily intended for testing purposes.
407    */
408   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
409       Configuration conf) throws IOException {
410     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
411         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
412 
413     // do a full scan of _acl_, filtering on only first table region rows
414 
415     Scan scan = new Scan();
416     scan.addFamily(ACL_LIST_FAMILY);
417 
418     ResultScanner scanner = null;
419     // TODO: Pass in a Connection rather than create one each time.
420     try (Connection connection = ConnectionFactory.createConnection(conf)) {
421       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
422         scanner = table.getScanner(scan);
423         try {
424           for (Result row : scanner) {
425             ListMultimap<String,TablePermission> resultPerms = parsePermissions(row.getRow(), row);
426             allPerms.put(row.getRow(), resultPerms);
427           }
428         } finally {
429           if (scanner != null) scanner.close();
430         }
431       }
432     }
433 
434     return allPerms;
435   }
436 
437   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
438         TableName tableName) throws IOException {
439     return getPermissions(conf, tableName != null ? tableName.getName() : null);
440   }
441 
442   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
443         String namespace) throws IOException {
444     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
445   }
446 
447   /**
448    * Reads user permission assignments stored in the <code>l:</code> column
449    * family of the first table row in <code>_acl_</code>.
450    *
451    * <p>
452    * See {@link AccessControlLists class documentation} for the key structure
453    * used for storage.
454    * </p>
455    */
456   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
457       byte[] entryName) throws IOException {
458     if (entryName == null) entryName = ACL_GLOBAL_NAME;
459 
460     // for normal user tables, we just read the table row from _acl_
461     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
462     // TODO: Pass in a Connection rather than create one each time.
463     try (Connection connection = ConnectionFactory.createConnection(conf)) {
464       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
465         Get get = new Get(entryName);
466         get.addFamily(ACL_LIST_FAMILY);
467         Result row = table.get(get);
468         if (!row.isEmpty()) {
469           perms = parsePermissions(entryName, row);
470         } else {
471           LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
472               + Bytes.toString(entryName));
473         }
474       }
475     }
476 
477     return perms;
478   }
479 
480   /**
481    * Returns the currently granted permissions for a given table as a list of
482    * user plus associated permissions.
483    */
484   static List<UserPermission> getUserTablePermissions(
485       Configuration conf, TableName tableName) throws IOException {
486     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
487   }
488 
489   static List<UserPermission> getUserNamespacePermissions(
490       Configuration conf, String namespace) throws IOException {
491     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
492   }
493 
494   static List<UserPermission> getUserPermissions(
495       Configuration conf, byte[] entryName)
496   throws IOException {
497     ListMultimap<String,TablePermission> allPerms = getPermissions(
498       conf, entryName);
499 
500     List<UserPermission> perms = new ArrayList<UserPermission>();
501 
502     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
503       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
504           entry.getValue().getTableName(), entry.getValue().getFamily(),
505           entry.getValue().getQualifier(), entry.getValue().getActions());
506       perms.add(up);
507     }
508     return perms;
509   }
510 
511   private static ListMultimap<String, TablePermission> parsePermissions(
512       byte[] entryName, Result result) {
513     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
514     if (result != null && result.size() > 0) {
515       for (Cell kv : result.rawCells()) {
516 
517         Pair<String,TablePermission> permissionsOfUserOnTable =
518             parsePermissionRecord(entryName, kv);
519 
520         if (permissionsOfUserOnTable != null) {
521           String username = permissionsOfUserOnTable.getFirst();
522           TablePermission permissions = permissionsOfUserOnTable.getSecond();
523           perms.put(username, permissions);
524         }
525       }
526     }
527     return perms;
528   }
529 
530   private static Pair<String, TablePermission> parsePermissionRecord(
531       byte[] entryName, Cell kv) {
532     // return X given a set of permissions encoded in the permissionRecord kv.
533     byte[] family = CellUtil.cloneFamily(kv);
534 
535     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
536       return null;
537     }
538 
539     byte[] key = CellUtil.cloneQualifier(kv);
540     byte[] value = CellUtil.cloneValue(kv);
541     if (LOG.isDebugEnabled()) {
542       LOG.debug("Read acl: kv ["+
543                 Bytes.toStringBinary(key)+": "+
544                 Bytes.toStringBinary(value)+"]");
545     }
546 
547     // check for a column family appended to the key
548     // TODO: avoid the string conversion to make this more efficient
549     String username = Bytes.toString(key);
550 
551     //Handle namespace entry
552     if(isNamespaceEntry(entryName)) {
553       return new Pair<String, TablePermission>(username,
554           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
555     }
556 
557     //Handle table and global entry
558     //TODO global entry should be handled differently
559     int idx = username.indexOf(ACL_KEY_DELIMITER);
560     byte[] permFamily = null;
561     byte[] permQualifier = null;
562     if (idx > 0 && idx < username.length()-1) {
563       String remainder = username.substring(idx+1);
564       username = username.substring(0, idx);
565       idx = remainder.indexOf(ACL_KEY_DELIMITER);
566       if (idx > 0 && idx < remainder.length()-1) {
567         permFamily = Bytes.toBytes(remainder.substring(0, idx));
568         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
569       } else {
570         permFamily = Bytes.toBytes(remainder);
571       }
572     }
573 
574     return new Pair<String,TablePermission>(username,
575         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
576   }
577 
578   /**
579    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
580    * and returns the resulting byte array.
581    *
582    * Writes a set of permission [user: table permission]
583    */
584   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
585       Configuration conf) {
586     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
587   }
588 
589   /**
590    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
591    * from the input stream.
592    */
593   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
594       Configuration conf)
595   throws DeserializationException {
596     if (ProtobufUtil.isPBMagicPrefix(data)) {
597       int pblen = ProtobufUtil.lengthOfPBMagic();
598       try {
599         AccessControlProtos.UsersAndPermissions perms =
600           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
601             data, pblen, data.length - pblen).build();
602         return ProtobufUtil.toUserTablePermissions(perms);
603       } catch (InvalidProtocolBufferException e) {
604         throw new DeserializationException(e);
605       }
606     } else {
607       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
608       try {
609         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
610         int length = in.readInt();
611         for (int i=0; i<length; i++) {
612           String user = Text.readString(in);
613           List<TablePermission> userPerms =
614             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
615           perms.putAll(user, userPerms);
616         }
617       } catch (IOException e) {
618         throw new DeserializationException(e);
619       }
620       return perms;
621     }
622   }
623 
624   /**
625    * Returns whether or not the given name should be interpreted as a group
626    * principal.  Currently this simply checks if the name starts with the
627    * special group prefix character ("@").
628    */
629   public static boolean isGroupPrincipal(String name) {
630     return name != null && name.startsWith(GROUP_PREFIX);
631   }
632 
633   /**
634    * Returns the actual name for a group principal (stripped of the
635    * group prefix).
636    */
637   public static String getGroupName(String aclKey) {
638     if (!isGroupPrincipal(aclKey)) {
639       return aclKey;
640     }
641 
642     return aclKey.substring(GROUP_PREFIX.length());
643   }
644 
645   /**
646    * Returns the group entry with the group prefix for a group principal.
647    */
648   public static String toGroupEntry(String name) {
649     return GROUP_PREFIX + name;
650   }
651 
652   public static boolean isNamespaceEntry(String entryName) {
653     return entryName.charAt(0) == NAMESPACE_PREFIX;
654   }
655 
656   public static boolean isNamespaceEntry(byte[] entryName) {
657     return entryName[0] == NAMESPACE_PREFIX;
658   }
659 
660   public static String toNamespaceEntry(String namespace) {
661      return NAMESPACE_PREFIX + namespace;
662    }
663 
664    public static String fromNamespaceEntry(String namespace) {
665      if(namespace.charAt(0) != NAMESPACE_PREFIX)
666        throw new IllegalArgumentException("Argument is not a valid namespace entry");
667      return namespace.substring(1);
668    }
669 
670    public static byte[] toNamespaceEntry(byte[] namespace) {
671      byte[] ret = new byte[namespace.length+1];
672      ret[0] = NAMESPACE_PREFIX;
673      System.arraycopy(namespace, 0, ret, 1, namespace.length);
674      return ret;
675    }
676 
677    public static byte[] fromNamespaceEntry(byte[] namespace) {
678      if(namespace[0] != NAMESPACE_PREFIX) {
679        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
680            Bytes.toString(namespace));
681      }
682      return Arrays.copyOfRange(namespace, 1, namespace.length);
683    }
684 
685    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
686        throws IOException {
687      // Save an object allocation where we can
688      if (cell.getTagsLength() == 0) {
689        return null;
690      }
691      List<Permission> results = Lists.newArrayList();
692      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
693         cell.getTagsLength());
694      while (tagsIterator.hasNext()) {
695        Tag tag = tagsIterator.next();
696        if (tag.getType() == ACL_TAG_TYPE) {
697          // Deserialize the table permissions from the KV
698          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
699            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
700              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
701          // Are there permissions for this user?
702          List<Permission> userPerms = kvPerms.get(user.getShortName());
703          if (userPerms != null) {
704            results.addAll(userPerms);
705          }
706          // Are there permissions for any of the groups this user belongs to?
707          String groupNames[] = user.getGroupNames();
708          if (groupNames != null) {
709            for (String group : groupNames) {
710              List<Permission> groupPerms = kvPerms.get(GROUP_PREFIX + group);
711              if (results != null) {
712                results.addAll(groupPerms);
713              }
714            }
715          }
716        }
717      }
718      return results;
719    }
720 }