001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.ByteArrayInputStream;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.TreeSet;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.AuthUtil;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Cell.Type;
039import org.apache.hadoop.hbase.CellBuilderFactory;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.CompareOperator;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.PrivateCellUtil;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Tag;
047import org.apache.hadoop.hbase.TagType;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.filter.QualifierFilter;
060import org.apache.hadoop.hbase.filter.RegexStringComparator;
061import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
063import org.apache.hadoop.hbase.regionserver.InternalScanner;
064import org.apache.hadoop.hbase.regionserver.Region;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
067import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
068import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
069import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.Pair;
072import org.apache.hadoop.io.Text;
073import org.apache.hadoop.io.Writable;
074import org.apache.hadoop.io.WritableFactories;
075import org.apache.hadoop.io.WritableUtils;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * Maintains lists of permission grants to users and groups to allow for
082 * authorization checks by {@link AccessController}.
083 *
084 * <p>
085 * Access control lists are stored in an "internal" metadata table named
086 * {@code _acl_}. Each table's permission grants are stored as a separate row,
087 * keyed by the table name. KeyValues for permissions assignments are stored
088 * in one of the formats:
089 * <pre>
090 * Key                      Desc
091 * --------                 --------
092 * user                     table level permissions for a user [R=read, W=write]
093 * group                    table level permissions for a group
094 * user,family              column family level permissions for a user
095 * group,family             column family level permissions for a group
096 * user,family,qualifier    column qualifier level permissions for a user
097 * group,family,qualifier   column qualifier level permissions for a group
098 * </pre>
099 * <p>
100 * All values are encoded as byte arrays containing the codes from the
101 * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
102 * </p>
103 */
104@InterfaceAudience.Private
105public class AccessControlLists {
106  /** Internal storage table for access control lists */
107  public static final TableName ACL_TABLE_NAME =
108      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
109  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
110  /** Column family used to store ACL grants */
111  public static final String ACL_LIST_FAMILY_STR = "l";
112  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
113  /** KV tag to store per cell access control lists */
114  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
115
116  public static final char NAMESPACE_PREFIX = '@';
117
118  /**
119   * Delimiter to separate user, column family, and qualifier in
120   * _acl_ table info: column keys */
121  public static final char ACL_KEY_DELIMITER = ',';
122
123  private static final Logger LOG = LoggerFactory.getLogger(AccessControlLists.class);
124
125  /**
126   * Stores a new user permission grant in the access control lists table.
127   * @param conf the configuration
128   * @param userPerm the details of the permission to be granted
129   * @param t acl table instance. It is closed upon method return.
130   * @throws IOException in the case of an error accessing the metadata table
131   */
132  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
133                                boolean mergeExistingPermissions) throws IOException {
134    Permission.Action[] actions = userPerm.getActions();
135    byte[] rowKey = userPermissionRowKey(userPerm);
136    Put p = new Put(rowKey);
137    byte[] key = userPermissionKey(userPerm);
138
139    if ((actions == null) || (actions.length == 0)) {
140      String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
141      LOG.warn(msg);
142      throw new IOException(msg);
143    }
144
145    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
146    if(mergeExistingPermissions){
147      List<UserPermission> perms = getUserPermissions(conf, rowKey);
148      UserPermission currentPerm = null;
149      for (UserPermission perm : perms) {
150        if (Bytes.equals(perm.getUser(), userPerm.getUser())
151                && ((userPerm.isGlobal() && ACL_TABLE_NAME.equals(perm.getTableName()))
152                || perm.tableFieldsEqual(userPerm))) {
153          currentPerm = perm;
154          break;
155        }
156      }
157
158      if(currentPerm != null && currentPerm.getActions() != null){
159        actionSet.addAll(Arrays.asList(currentPerm.getActions()));
160      }
161    }
162
163    // merge current action with new action.
164    actionSet.addAll(Arrays.asList(actions));
165
166    // serialize to byte array.
167    byte[] value = new byte[actionSet.size()];
168    int index = 0;
169    for (Permission.Action action : actionSet) {
170      value[index++] = action.code();
171    }
172    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
173        .setRow(p.getRow())
174        .setFamily(ACL_LIST_FAMILY)
175        .setQualifier(key)
176        .setTimestamp(p.getTimestamp())
177        .setType(Type.Put)
178        .setValue(value)
179        .build());
180    if (LOG.isDebugEnabled()) {
181      LOG.debug("Writing permission with rowKey "+
182          Bytes.toString(rowKey)+" "+
183          Bytes.toString(key)+": "+Bytes.toStringBinary(value)
184          );
185    }
186    try {
187      /**
188       * TODO: Use Table.put(Put) instead. This Table.put() happens within the RS. We are already in
189       * AccessController. Means already there was an RPC happened to server (Actual grant call from
190       * client side). At RpcServer we have a ThreadLocal where we keep the CallContext and inside
191       * that the current RPC called user info is set. The table on which put was called is created
192       * via the RegionCP env and that uses a special Connection. The normal RPC channel will be by
193       * passed here means there would have no further contact on to the RpcServer. So the
194       * ThreadLocal is never getting reset. We ran the new put as a super user (User.runAsLoginUser
195       * where the login user is the user who started RS process) but still as per the RPC context
196       * it is the old user. When AsyncProcess was used, the execute happen via another thread from
197       * pool and so old ThreadLocal variable is not accessible and so it looks as if no Rpc context
198       * and we were relying on the super user who starts the RS process.
199       */
200      t.put(Collections.singletonList(p));
201    } finally {
202      t.close();
203    }
204  }
205
206  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
207          throws IOException{
208    addUserPermission(conf, userPerm, t, false);
209  }
210
211  /**
212   * Removes a previously granted permission from the stored access control
213   * lists.  The {@link TablePermission} being removed must exactly match what
214   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
215   * been granted "READ" access to the "data" table, but only to column family
216   * plus qualifier "info:colA", then trying to call this method with only
217   * user "bob" and the table name "data" (but without specifying the
218   * column qualifier "info:colA") will have no effect.
219   *
220   * @param conf the configuration
221   * @param userPerm the details of the permission to be revoked
222   * @param t acl table
223   * @throws IOException if there is an error accessing the metadata table
224   */
225  static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
226      throws IOException {
227    if (null == userPerm.getActions()) {
228      removePermissionRecord(conf, userPerm, t);
229    } else {
230      // Get all the global user permissions from the acl table
231      List<UserPermission> permsList = getUserPermissions(conf, userPermissionRowKey(userPerm));
232      List<Permission.Action> remainingActions = new ArrayList<>();
233      List<Permission.Action> dropActions = Arrays.asList(userPerm.getActions());
234      for (UserPermission perm : permsList) {
235        // Find the user and remove only the requested permissions
236        if (Bytes.toString(perm.getUser()).equals(Bytes.toString(userPerm.getUser()))) {
237          for (Permission.Action oldAction : perm.getActions()) {
238            if (!dropActions.contains(oldAction)) {
239              remainingActions.add(oldAction);
240            }
241          }
242          if (!remainingActions.isEmpty()) {
243            perm.setActions(remainingActions.toArray(new Permission.Action[remainingActions.size()]));
244            addUserPermission(conf, perm, t);
245          } else {
246            removePermissionRecord(conf, userPerm, t);
247          }
248          break;
249        }
250      }
251    }
252    if (LOG.isDebugEnabled()) {
253      LOG.debug("Removed permission "+ userPerm.toString());
254    }
255  }
256
257  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
258      throws IOException {
259    Delete d = new Delete(userPermissionRowKey(userPerm));
260    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
261    try {
262      /**
263       * We need to run the ACL delete in superuser context, to have
264       * similar authorization logic to addUserPermission().
265       * This ensures behaviour is consistent with pre 2.1.1 and 2.2+.
266       * The permission authorization has already happened here.
267       * See the TODO comment in addUserPermission for details
268       */
269      t.delete(new ArrayList<>(Arrays.asList(d)));
270    } finally {
271      t.close();
272    }
273  }
274
275  /**
276   * Remove specified table from the _acl_ table.
277   */
278  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
279      throws IOException{
280    Delete d = new Delete(tableName.getName());
281
282    if (LOG.isDebugEnabled()) {
283      LOG.debug("Removing permissions of removed table "+ tableName);
284    }
285    try {
286      t.delete(d);
287    } finally {
288      t.close();
289    }
290  }
291
292  /**
293   * Remove specified namespace from the acl table.
294   */
295  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
296      throws IOException{
297    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
298
299    if (LOG.isDebugEnabled()) {
300      LOG.debug("Removing permissions of removed namespace "+ namespace);
301    }
302
303    try {
304      t.delete(d);
305    } finally {
306      t.close();
307    }
308  }
309
310  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
311      boolean closeTable) throws IOException {
312    Scan scan = new Scan();
313    scan.addFamily(ACL_LIST_FAMILY);
314
315    String columnName = Bytes.toString(column);
316    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
317        String.format("(%s%s%s)|(%s%s)$",
318            ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
319            ACL_KEY_DELIMITER, columnName))));
320
321    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
322    ResultScanner scanner = null;
323    try {
324      scanner = table.getScanner(scan);
325      for (Result res : scanner) {
326        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
327          qualifierSet.add(q);
328        }
329      }
330
331      if (qualifierSet.size() > 0) {
332        Delete d = new Delete(tableName.getName());
333        for (byte[] qualifier : qualifierSet) {
334          d.addColumns(ACL_LIST_FAMILY, qualifier);
335        }
336        table.delete(d);
337      }
338    } finally {
339      if (scanner != null) scanner.close();
340      if (closeTable) table.close();
341    }
342  }
343
344  /**
345   * Remove specified table column from the acl table.
346   */
347  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
348      Table t) throws IOException {
349    if (LOG.isDebugEnabled()) {
350      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
351          " from table "+ tableName);
352    }
353    removeTablePermissions(tableName, column, t, true);
354  }
355
356  static byte[] userPermissionRowKey(UserPermission userPerm) {
357    byte[] row;
358    if(userPerm.hasNamespace()) {
359      row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
360    } else if(userPerm.isGlobal()) {
361      row = ACL_GLOBAL_NAME;
362    } else {
363      row = userPerm.getTableName().getName();
364    }
365    return row;
366  }
367
368  /**
369   * Build qualifier key from user permission:
370   *  username
371   *  username,family
372   *  username,family,qualifier
373   */
374  static byte[] userPermissionKey(UserPermission userPerm) {
375    byte[] qualifier = userPerm.getQualifier();
376    byte[] family = userPerm.getFamily();
377    byte[] key = userPerm.getUser();
378
379    if (family != null && family.length > 0) {
380      key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
381      if (qualifier != null && qualifier.length > 0) {
382        key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
383      }
384    }
385
386    return key;
387  }
388
389  /**
390   * Returns {@code true} if the given region is part of the {@code _acl_}
391   * metadata table.
392   */
393  static boolean isAclRegion(Region region) {
394    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
395  }
396
397  /**
398   * Returns {@code true} if the given table is {@code _acl_} metadata table.
399   */
400  static boolean isAclTable(TableDescriptor desc) {
401    return ACL_TABLE_NAME.equals(desc.getTableName());
402  }
403
404  /**
405   * Loads all of the permission grants stored in a region of the {@code _acl_}
406   * table.
407   *
408   * @param aclRegion
409   * @return a map of the permissions for this table.
410   * @throws IOException
411   */
412  static Map<byte[], ListMultimap<String,TablePermission>> loadAll(Region aclRegion)
413      throws IOException {
414
415    if (!isAclRegion(aclRegion)) {
416      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
417    }
418
419    Map<byte[], ListMultimap<String, TablePermission>> allPerms = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
420
421    // do a full scan of _acl_ table
422
423    Scan scan = new Scan();
424    scan.addFamily(ACL_LIST_FAMILY);
425
426    InternalScanner iScanner = null;
427    try {
428      iScanner = aclRegion.getScanner(scan);
429
430      while (true) {
431        List<Cell> row = new ArrayList<>();
432
433        boolean hasNext = iScanner.next(row);
434        ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
435        byte[] entry = null;
436        for (Cell kv : row) {
437          if (entry == null) {
438            entry = CellUtil.cloneRow(kv);
439          }
440          Pair<String,TablePermission> permissionsOfUserOnTable =
441              parsePermissionRecord(entry, kv);
442          if (permissionsOfUserOnTable != null) {
443            String username = permissionsOfUserOnTable.getFirst();
444            TablePermission permissions = permissionsOfUserOnTable.getSecond();
445            perms.put(username, permissions);
446          }
447        }
448        if (entry != null) {
449          allPerms.put(entry, perms);
450        }
451        if (!hasNext) {
452          break;
453        }
454      }
455    } finally {
456      if (iScanner != null) {
457        iScanner.close();
458      }
459    }
460
461    return allPerms;
462  }
463
464  /**
465   * Load all permissions from the region server holding {@code _acl_},
466   * primarily intended for testing purposes.
467   */
468  static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
469      Configuration conf) throws IOException {
470    Map<byte[], ListMultimap<String,TablePermission>> allPerms = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
471
472    // do a full scan of _acl_, filtering on only first table region rows
473
474    Scan scan = new Scan();
475    scan.addFamily(ACL_LIST_FAMILY);
476
477    ResultScanner scanner = null;
478    // TODO: Pass in a Connection rather than create one each time.
479    try (Connection connection = ConnectionFactory.createConnection(conf)) {
480      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
481        scanner = table.getScanner(scan);
482        try {
483          for (Result row : scanner) {
484            ListMultimap<String,TablePermission> resultPerms = parsePermissions(row.getRow(), row);
485            allPerms.put(row.getRow(), resultPerms);
486          }
487        } finally {
488          if (scanner != null) scanner.close();
489        }
490      }
491    }
492
493    return allPerms;
494  }
495
496  public static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
497      TableName tableName) throws IOException {
498    return getPermissions(conf, tableName != null ? tableName.getName() : null, null);
499  }
500
501  @VisibleForTesting
502  public static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
503      String namespace) throws IOException {
504    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null);
505  }
506
507  /**
508   * Reads user permission assignments stored in the <code>l:</code> column
509   * family of the first table row in <code>_acl_</code>.
510   *
511   * <p>
512   * See {@link AccessControlLists class documentation} for the key structure
513   * used for storage.
514   * </p>
515   */
516  static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
517      byte[] entryName, Table t) throws IOException {
518    if (entryName == null) entryName = ACL_GLOBAL_NAME;
519
520    // for normal user tables, we just read the table row from _acl_
521    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
522    Get get = new Get(entryName);
523    get.addFamily(ACL_LIST_FAMILY);
524    Result row = null;
525    if (t == null) {
526      try (Connection connection = ConnectionFactory.createConnection(conf)) {
527        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
528          row = table.get(get);
529        }
530      }
531    } else {
532      row = t.get(get);
533    }
534    if (!row.isEmpty()) {
535      perms = parsePermissions(entryName, row);
536    } else {
537      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
538          + Bytes.toString(entryName));
539    }
540
541    return perms;
542  }
543
544  /**
545   * Returns the currently granted permissions for a given table as a list of
546   * user plus associated permissions.
547   */
548  static List<UserPermission> getUserTablePermissions(
549      Configuration conf, TableName tableName) throws IOException {
550    return getUserPermissions(conf, tableName == null ? null : tableName.getName());
551  }
552
553  static List<UserPermission> getUserNamespacePermissions(
554      Configuration conf, String namespace) throws IOException {
555    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
556  }
557
558  static List<UserPermission> getUserPermissions(
559      Configuration conf, byte[] entryName)
560          throws IOException {
561    ListMultimap<String,TablePermission> allPerms = getPermissions(
562        conf, entryName, null);
563
564    List<UserPermission> perms = new ArrayList<>();
565
566    if(isNamespaceEntry(entryName)) {  // Namespace
567      for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
568        UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
569            entry.getValue().getNamespace(), entry.getValue().getActions());
570        perms.add(up);
571      }
572    } else {  // Table
573      for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
574        UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
575            entry.getValue().getTableName(), entry.getValue().getFamily(),
576            entry.getValue().getQualifier(), entry.getValue().getActions());
577        perms.add(up);
578      }
579    }
580    return perms;
581  }
582
583  private static ListMultimap<String, TablePermission> parsePermissions(
584      byte[] entryName, Result result) {
585    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
586    if (result != null && result.size() > 0) {
587      for (Cell kv : result.rawCells()) {
588
589        Pair<String,TablePermission> permissionsOfUserOnTable =
590            parsePermissionRecord(entryName, kv);
591
592        if (permissionsOfUserOnTable != null) {
593          String username = permissionsOfUserOnTable.getFirst();
594          TablePermission permissions = permissionsOfUserOnTable.getSecond();
595          perms.put(username, permissions);
596        }
597      }
598    }
599    return perms;
600  }
601
602  private static Pair<String, TablePermission> parsePermissionRecord(
603      byte[] entryName, Cell kv) {
604    // return X given a set of permissions encoded in the permissionRecord kv.
605    byte[] family = CellUtil.cloneFamily(kv);
606
607    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
608      return null;
609    }
610
611    byte[] key = CellUtil.cloneQualifier(kv);
612    byte[] value = CellUtil.cloneValue(kv);
613    if (LOG.isDebugEnabled()) {
614      LOG.debug("Read acl: kv ["+
615          Bytes.toStringBinary(key)+": "+
616          Bytes.toStringBinary(value)+"]");
617    }
618
619    // check for a column family appended to the key
620    // TODO: avoid the string conversion to make this more efficient
621    String username = Bytes.toString(key);
622
623    //Handle namespace entry
624    if(isNamespaceEntry(entryName)) {
625      return new Pair<>(username, new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
626    }
627
628    //Handle table and global entry
629    //TODO global entry should be handled differently
630    int idx = username.indexOf(ACL_KEY_DELIMITER);
631    byte[] permFamily = null;
632    byte[] permQualifier = null;
633    if (idx > 0 && idx < username.length()-1) {
634      String remainder = username.substring(idx+1);
635      username = username.substring(0, idx);
636      idx = remainder.indexOf(ACL_KEY_DELIMITER);
637      if (idx > 0 && idx < remainder.length()-1) {
638        permFamily = Bytes.toBytes(remainder.substring(0, idx));
639        permQualifier = Bytes.toBytes(remainder.substring(idx+1));
640      } else {
641        permFamily = Bytes.toBytes(remainder);
642      }
643    }
644
645    return new Pair<>(username, new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
646  }
647
648  /**
649   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
650   * and returns the resulting byte array.
651   *
652   * Writes a set of permission [user: table permission]
653   */
654  public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
655      Configuration conf) {
656    return ProtobufUtil.prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
657  }
658
659  // This is part of the old HbaseObjectWritableFor96Migration.
660  private static final int LIST_CODE = 61;
661
662  private static final int WRITABLE_CODE = 14;
663
664  private static final int WRITABLE_NOT_ENCODED = 0;
665
666  private static List<TablePermission> readWritablePermissions(DataInput in, Configuration conf)
667      throws IOException, ClassNotFoundException {
668    assert WritableUtils.readVInt(in) == LIST_CODE;
669    int length = in.readInt();
670    List<TablePermission> list = new ArrayList<>(length);
671    for (int i = 0; i < length; i++) {
672      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
673      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
674      String className = Text.readString(in);
675      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
676      Writable instance = WritableFactories.newInstance(clazz, conf);
677      instance.readFields(in);
678      list.add((TablePermission) instance);
679    }
680    return list;
681  }
682
683  /**
684   * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances from the input
685   * stream.
686   */
687  public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
688      Configuration conf) throws DeserializationException {
689    if (ProtobufUtil.isPBMagicPrefix(data)) {
690      int pblen = ProtobufUtil.lengthOfPBMagic();
691      try {
692        AccessControlProtos.UsersAndPermissions.Builder builder =
693            AccessControlProtos.UsersAndPermissions.newBuilder();
694        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
695        return AccessControlUtil.toUserTablePermissions(builder.build());
696      } catch (IOException e) {
697        throw new DeserializationException(e);
698      }
699    } else {
700      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
701      // forever (here and a couple of other places).
702      ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
703      try {
704        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
705        int length = in.readInt();
706        for (int i = 0; i < length; i++) {
707          String user = Text.readString(in);
708          List<TablePermission> userPerms = readWritablePermissions(in, conf);
709          perms.putAll(user, userPerms);
710        }
711      } catch (IOException | ClassNotFoundException e) {
712        throw new DeserializationException(e);
713      }
714      return perms;
715    }
716  }
717
718  public static boolean isNamespaceEntry(String entryName) {
719    return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
720  }
721
722  public static boolean isNamespaceEntry(byte[] entryName) {
723    return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
724  }
725
726  public static String toNamespaceEntry(String namespace) {
727    return NAMESPACE_PREFIX + namespace;
728  }
729
730  public static String fromNamespaceEntry(String namespace) {
731    if(namespace.charAt(0) != NAMESPACE_PREFIX)
732      throw new IllegalArgumentException("Argument is not a valid namespace entry");
733    return namespace.substring(1);
734  }
735
736  public static byte[] toNamespaceEntry(byte[] namespace) {
737    byte[] ret = new byte[namespace.length+1];
738    ret[0] = NAMESPACE_PREFIX;
739    System.arraycopy(namespace, 0, ret, 1, namespace.length);
740    return ret;
741  }
742
743  public static byte[] fromNamespaceEntry(byte[] namespace) {
744    if(namespace[0] != NAMESPACE_PREFIX) {
745      throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
746          Bytes.toString(namespace));
747    }
748    return Arrays.copyOfRange(namespace, 1, namespace.length);
749  }
750
751  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
752      throws IOException {
753    // Save an object allocation where we can
754    if (cell.getTagsLength() == 0) {
755      return null;
756    }
757    List<Permission> results = Lists.newArrayList();
758    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
759    while (tagsIterator.hasNext()) {
760      Tag tag = tagsIterator.next();
761      if (tag.getType() == ACL_TAG_TYPE) {
762        // Deserialize the table permissions from the KV
763        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
764        // use the builder
765        AccessControlProtos.UsersAndPermissions.Builder builder =
766            AccessControlProtos.UsersAndPermissions.newBuilder();
767        if (tag.hasArray()) {
768          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(), tag.getValueLength());
769        } else {
770          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
771        }
772        ListMultimap<String,Permission> kvPerms =
773            AccessControlUtil.toUsersAndPermissions(builder.build());
774        // Are there permissions for this user?
775        List<Permission> userPerms = kvPerms.get(user.getShortName());
776        if (userPerms != null) {
777          results.addAll(userPerms);
778        }
779        // Are there permissions for any of the groups this user belongs to?
780        String groupNames[] = user.getGroupNames();
781        if (groupNames != null) {
782          for (String group : groupNames) {
783            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
784            if (results != null) {
785              results.addAll(groupPerms);
786            }
787          }
788        }
789      }
790    }
791    return results;
792  }
793}