001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.ByteArrayInputStream;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.TreeMap;
033import java.util.TreeSet;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.AuthUtil;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Cell.Type;
039import org.apache.hadoop.hbase.CellBuilderFactory;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.CompareOperator;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.PrivateCellUtil;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Tag;
047import org.apache.hadoop.hbase.TagType;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.filter.QualifierFilter;
060import org.apache.hadoop.hbase.filter.RegexStringComparator;
061import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
063import org.apache.hadoop.hbase.regionserver.InternalScanner;
064import org.apache.hadoop.hbase.regionserver.Region;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
067import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
068import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
069import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.Pair;
072import org.apache.hadoop.io.Text;
073import org.apache.hadoop.io.Writable;
074import org.apache.hadoop.io.WritableFactories;
075import org.apache.hadoop.io.WritableUtils;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * Maintains lists of permission grants to users and groups to allow for
082 * authorization checks by {@link AccessController}.
083 *
084 * <p>
085 * Access control lists are stored in an "internal" metadata table named
086 * {@code _acl_}. Each table's permission grants are stored as a separate row,
087 * keyed by the table name. KeyValues for permissions assignments are stored
088 * in one of the formats:
089 * <pre>
090 * Key                      Desc
091 * --------                 --------
092 * user                     table level permissions for a user [R=read, W=write]
093 * group                    table level permissions for a group
094 * user,family              column family level permissions for a user
095 * group,family             column family level permissions for a group
096 * user,family,qualifier    column qualifier level permissions for a user
097 * group,family,qualifier   column qualifier level permissions for a group
098 * </pre>
099 * <p>
100 * All values are encoded as byte arrays containing the codes from the
101 * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
102 * </p>
103 */
104@InterfaceAudience.Private
105public class AccessControlLists {
  /** Internal storage table for access control lists */
  public static final TableName ACL_TABLE_NAME =
      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
  /** Row key used for global grants: the name bytes of the ACL table itself. */
  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
  /** Column family used to store ACL grants */
  public static final String ACL_LIST_FAMILY_STR = "l";
  /** Byte form of {@link #ACL_LIST_FAMILY_STR}. */
  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
  /** KV tag to store per cell access control lists */
  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;

  /** First character of an ACL row key that denotes a namespace entry. */
  public static final char NAMESPACE_PREFIX = '@';

  /**
   * Delimiter that separates user, column family, and qualifier inside the
   * column keys (cell qualifiers) of the ACL table.
   */
  public static final char ACL_KEY_DELIMITER = ',';

  /** Class logger. */
  private static final Logger LOG = LoggerFactory.getLogger(AccessControlLists.class);
124
125  /**
126   * Stores a new user permission grant in the access control lists table.
127   * @param conf the configuration
128   * @param userPerm the details of the permission to be granted
129   * @param t acl table instance. It is closed upon method return.
130   * @throws IOException in the case of an error accessing the metadata table
131   */
132  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
133                                boolean mergeExistingPermissions) throws IOException {
134    Permission.Action[] actions = userPerm.getActions();
135    byte[] rowKey = userPermissionRowKey(userPerm);
136    Put p = new Put(rowKey);
137    byte[] key = userPermissionKey(userPerm);
138
139    if ((actions == null) || (actions.length == 0)) {
140      String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
141      LOG.warn(msg);
142      throw new IOException(msg);
143    }
144
145    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
146    if(mergeExistingPermissions){
147      List<UserPermission> perms = getUserPermissions(conf, rowKey);
148      UserPermission currentPerm = null;
149      for (UserPermission perm : perms) {
150        if (Bytes.equals(perm.getUser(), userPerm.getUser())
151                && ((userPerm.isGlobal() && ACL_TABLE_NAME.equals(perm.getTableName()))
152                || perm.tableFieldsEqual(userPerm))) {
153          currentPerm = perm;
154          break;
155        }
156      }
157
158      if(currentPerm != null && currentPerm.getActions() != null){
159        actionSet.addAll(Arrays.asList(currentPerm.getActions()));
160      }
161    }
162
163    // merge current action with new action.
164    actionSet.addAll(Arrays.asList(actions));
165
166    // serialize to byte array.
167    byte[] value = new byte[actionSet.size()];
168    int index = 0;
169    for (Permission.Action action : actionSet) {
170      value[index++] = action.code();
171    }
172    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
173        .setRow(p.getRow())
174        .setFamily(ACL_LIST_FAMILY)
175        .setQualifier(key)
176        .setTimestamp(p.getTimestamp())
177        .setType(Type.Put)
178        .setValue(value)
179        .build());
180    if (LOG.isDebugEnabled()) {
181      LOG.debug("Writing permission with rowKey "+
182          Bytes.toString(rowKey)+" "+
183          Bytes.toString(key)+": "+Bytes.toStringBinary(value)
184          );
185    }
186    try {
187      /**
188       * TODO: Use Table.put(Put) instead. This Table.put() happens within the RS. We are already in
189       * AccessController. Means already there was an RPC happened to server (Actual grant call from
190       * client side). At RpcServer we have a ThreadLocal where we keep the CallContext and inside
191       * that the current RPC called user info is set. The table on which put was called is created
192       * via the RegionCP env and that uses a special Connection. The normal RPC channel will be by
193       * passed here means there would have no further contact on to the RpcServer. So the
194       * ThreadLocal is never getting reset. We ran the new put as a super user (User.runAsLoginUser
195       * where the login user is the user who started RS process) but still as per the RPC context
196       * it is the old user. When AsyncProcess was used, the execute happen via another thread from
197       * pool and so old ThreadLocal variable is not accessible and so it looks as if no Rpc context
198       * and we were relying on the super user who starts the RS process.
199       */
200      t.put(Collections.singletonList(p));
201    } finally {
202      t.close();
203    }
204  }
205
206  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
207          throws IOException{
208    addUserPermission(conf, userPerm, t, false);
209  }
210
211  /**
212   * Removes a previously granted permission from the stored access control
213   * lists.  The {@link TablePermission} being removed must exactly match what
214   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
215   * been granted "READ" access to the "data" table, but only to column family
216   * plus qualifier "info:colA", then trying to call this method with only
217   * user "bob" and the table name "data" (but without specifying the
218   * column qualifier "info:colA") will have no effect.
219   *
220   * @param conf the configuration
221   * @param userPerm the details of the permission to be revoked
222   * @param t acl table
223   * @throws IOException if there is an error accessing the metadata table
224   */
225  static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
226      throws IOException {
227    if (null == userPerm.getActions()) {
228      removePermissionRecord(conf, userPerm, t);
229    } else {
230      // Get all the global user permissions from the acl table
231      List<UserPermission> permsList = getUserPermissions(conf, userPermissionRowKey(userPerm));
232      List<Permission.Action> remainingActions = new ArrayList<>();
233      List<Permission.Action> dropActions = Arrays.asList(userPerm.getActions());
234      for (UserPermission perm : permsList) {
235        // Find the user and remove only the requested permissions
236        if (Bytes.toString(perm.getUser()).equals(Bytes.toString(userPerm.getUser()))) {
237          for (Permission.Action oldAction : perm.getActions()) {
238            if (!dropActions.contains(oldAction)) {
239              remainingActions.add(oldAction);
240            }
241          }
242          if (!remainingActions.isEmpty()) {
243            perm.setActions(remainingActions.toArray(new Permission.Action[remainingActions.size()]));
244            addUserPermission(conf, perm, t);
245          } else {
246            removePermissionRecord(conf, userPerm, t);
247          }
248          break;
249        }
250      }
251    }
252    if (LOG.isDebugEnabled()) {
253      LOG.debug("Removed permission "+ userPerm.toString());
254    }
255  }
256
257  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
258      throws IOException {
259    Delete d = new Delete(userPermissionRowKey(userPerm));
260    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
261    try {
262      t.delete(d);
263    } finally {
264      t.close();
265    }
266  }
267
268  /**
269   * Remove specified table from the _acl_ table.
270   */
271  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
272      throws IOException{
273    Delete d = new Delete(tableName.getName());
274
275    if (LOG.isDebugEnabled()) {
276      LOG.debug("Removing permissions of removed table "+ tableName);
277    }
278    try {
279      t.delete(d);
280    } finally {
281      t.close();
282    }
283  }
284
285  /**
286   * Remove specified namespace from the acl table.
287   */
288  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
289      throws IOException{
290    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
291
292    if (LOG.isDebugEnabled()) {
293      LOG.debug("Removing permissions of removed namespace "+ namespace);
294    }
295
296    try {
297      t.delete(d);
298    } finally {
299      t.close();
300    }
301  }
302
303  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
304      boolean closeTable) throws IOException {
305    Scan scan = new Scan();
306    scan.addFamily(ACL_LIST_FAMILY);
307
308    String columnName = Bytes.toString(column);
309    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
310        String.format("(%s%s%s)|(%s%s)$",
311            ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
312            ACL_KEY_DELIMITER, columnName))));
313
314    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
315    ResultScanner scanner = null;
316    try {
317      scanner = table.getScanner(scan);
318      for (Result res : scanner) {
319        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
320          qualifierSet.add(q);
321        }
322      }
323
324      if (qualifierSet.size() > 0) {
325        Delete d = new Delete(tableName.getName());
326        for (byte[] qualifier : qualifierSet) {
327          d.addColumns(ACL_LIST_FAMILY, qualifier);
328        }
329        table.delete(d);
330      }
331    } finally {
332      if (scanner != null) scanner.close();
333      if (closeTable) table.close();
334    }
335  }
336
337  /**
338   * Remove specified table column from the acl table.
339   */
340  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
341      Table t) throws IOException {
342    if (LOG.isDebugEnabled()) {
343      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
344          " from table "+ tableName);
345    }
346    removeTablePermissions(tableName, column, t, true);
347  }
348
349  static byte[] userPermissionRowKey(UserPermission userPerm) {
350    byte[] row;
351    if(userPerm.hasNamespace()) {
352      row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
353    } else if(userPerm.isGlobal()) {
354      row = ACL_GLOBAL_NAME;
355    } else {
356      row = userPerm.getTableName().getName();
357    }
358    return row;
359  }
360
361  /**
362   * Build qualifier key from user permission:
363   *  username
364   *  username,family
365   *  username,family,qualifier
366   */
367  static byte[] userPermissionKey(UserPermission userPerm) {
368    byte[] qualifier = userPerm.getQualifier();
369    byte[] family = userPerm.getFamily();
370    byte[] key = userPerm.getUser();
371
372    if (family != null && family.length > 0) {
373      key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
374      if (qualifier != null && qualifier.length > 0) {
375        key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
376      }
377    }
378
379    return key;
380  }
381
382  /**
383   * Returns {@code true} if the given region is part of the {@code _acl_}
384   * metadata table.
385   */
386  static boolean isAclRegion(Region region) {
387    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
388  }
389
390  /**
391   * Returns {@code true} if the given table is {@code _acl_} metadata table.
392   */
393  static boolean isAclTable(TableDescriptor desc) {
394    return ACL_TABLE_NAME.equals(desc.getTableName());
395  }
396
397  /**
398   * Loads all of the permission grants stored in a region of the {@code _acl_}
399   * table.
400   *
401   * @param aclRegion
402   * @return a map of the permissions for this table.
403   * @throws IOException
404   */
405  static Map<byte[], ListMultimap<String,TablePermission>> loadAll(Region aclRegion)
406      throws IOException {
407
408    if (!isAclRegion(aclRegion)) {
409      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
410    }
411
412    Map<byte[], ListMultimap<String, TablePermission>> allPerms = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
413
414    // do a full scan of _acl_ table
415
416    Scan scan = new Scan();
417    scan.addFamily(ACL_LIST_FAMILY);
418
419    InternalScanner iScanner = null;
420    try {
421      iScanner = aclRegion.getScanner(scan);
422
423      while (true) {
424        List<Cell> row = new ArrayList<>();
425
426        boolean hasNext = iScanner.next(row);
427        ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
428        byte[] entry = null;
429        for (Cell kv : row) {
430          if (entry == null) {
431            entry = CellUtil.cloneRow(kv);
432          }
433          Pair<String,TablePermission> permissionsOfUserOnTable =
434              parsePermissionRecord(entry, kv);
435          if (permissionsOfUserOnTable != null) {
436            String username = permissionsOfUserOnTable.getFirst();
437            TablePermission permissions = permissionsOfUserOnTable.getSecond();
438            perms.put(username, permissions);
439          }
440        }
441        if (entry != null) {
442          allPerms.put(entry, perms);
443        }
444        if (!hasNext) {
445          break;
446        }
447      }
448    } finally {
449      if (iScanner != null) {
450        iScanner.close();
451      }
452    }
453
454    return allPerms;
455  }
456
457  /**
458   * Load all permissions from the region server holding {@code _acl_},
459   * primarily intended for testing purposes.
460   */
461  static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
462      Configuration conf) throws IOException {
463    Map<byte[], ListMultimap<String,TablePermission>> allPerms = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
464
465    // do a full scan of _acl_, filtering on only first table region rows
466
467    Scan scan = new Scan();
468    scan.addFamily(ACL_LIST_FAMILY);
469
470    ResultScanner scanner = null;
471    // TODO: Pass in a Connection rather than create one each time.
472    try (Connection connection = ConnectionFactory.createConnection(conf)) {
473      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
474        scanner = table.getScanner(scan);
475        try {
476          for (Result row : scanner) {
477            ListMultimap<String,TablePermission> resultPerms = parsePermissions(row.getRow(), row);
478            allPerms.put(row.getRow(), resultPerms);
479          }
480        } finally {
481          if (scanner != null) scanner.close();
482        }
483      }
484    }
485
486    return allPerms;
487  }
488
489  public static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
490      TableName tableName) throws IOException {
491    return getPermissions(conf, tableName != null ? tableName.getName() : null, null);
492  }
493
494  @VisibleForTesting
495  public static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
496      String namespace) throws IOException {
497    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null);
498  }
499
500  /**
501   * Reads user permission assignments stored in the <code>l:</code> column
502   * family of the first table row in <code>_acl_</code>.
503   *
504   * <p>
505   * See {@link AccessControlLists class documentation} for the key structure
506   * used for storage.
507   * </p>
508   */
509  static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
510      byte[] entryName, Table t) throws IOException {
511    if (entryName == null) entryName = ACL_GLOBAL_NAME;
512
513    // for normal user tables, we just read the table row from _acl_
514    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
515    Get get = new Get(entryName);
516    get.addFamily(ACL_LIST_FAMILY);
517    Result row = null;
518    if (t == null) {
519      try (Connection connection = ConnectionFactory.createConnection(conf)) {
520        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
521          row = table.get(get);
522        }
523      }
524    } else {
525      row = t.get(get);
526    }
527    if (!row.isEmpty()) {
528      perms = parsePermissions(entryName, row);
529    } else {
530      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
531          + Bytes.toString(entryName));
532    }
533
534    return perms;
535  }
536
537  /**
538   * Returns the currently granted permissions for a given table as a list of
539   * user plus associated permissions.
540   */
541  static List<UserPermission> getUserTablePermissions(
542      Configuration conf, TableName tableName) throws IOException {
543    return getUserPermissions(conf, tableName == null ? null : tableName.getName());
544  }
545
546  static List<UserPermission> getUserNamespacePermissions(
547      Configuration conf, String namespace) throws IOException {
548    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
549  }
550
551  static List<UserPermission> getUserPermissions(
552      Configuration conf, byte[] entryName)
553          throws IOException {
554    ListMultimap<String,TablePermission> allPerms = getPermissions(
555        conf, entryName, null);
556
557    List<UserPermission> perms = new ArrayList<>();
558
559    if(isNamespaceEntry(entryName)) {  // Namespace
560      for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
561        UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
562            entry.getValue().getNamespace(), entry.getValue().getActions());
563        perms.add(up);
564      }
565    } else {  // Table
566      for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
567        UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
568            entry.getValue().getTableName(), entry.getValue().getFamily(),
569            entry.getValue().getQualifier(), entry.getValue().getActions());
570        perms.add(up);
571      }
572    }
573    return perms;
574  }
575
576  private static ListMultimap<String, TablePermission> parsePermissions(
577      byte[] entryName, Result result) {
578    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
579    if (result != null && result.size() > 0) {
580      for (Cell kv : result.rawCells()) {
581
582        Pair<String,TablePermission> permissionsOfUserOnTable =
583            parsePermissionRecord(entryName, kv);
584
585        if (permissionsOfUserOnTable != null) {
586          String username = permissionsOfUserOnTable.getFirst();
587          TablePermission permissions = permissionsOfUserOnTable.getSecond();
588          perms.put(username, permissions);
589        }
590      }
591    }
592    return perms;
593  }
594
595  private static Pair<String, TablePermission> parsePermissionRecord(
596      byte[] entryName, Cell kv) {
597    // return X given a set of permissions encoded in the permissionRecord kv.
598    byte[] family = CellUtil.cloneFamily(kv);
599
600    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
601      return null;
602    }
603
604    byte[] key = CellUtil.cloneQualifier(kv);
605    byte[] value = CellUtil.cloneValue(kv);
606    if (LOG.isDebugEnabled()) {
607      LOG.debug("Read acl: kv ["+
608          Bytes.toStringBinary(key)+": "+
609          Bytes.toStringBinary(value)+"]");
610    }
611
612    // check for a column family appended to the key
613    // TODO: avoid the string conversion to make this more efficient
614    String username = Bytes.toString(key);
615
616    //Handle namespace entry
617    if(isNamespaceEntry(entryName)) {
618      return new Pair<>(username, new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
619    }
620
621    //Handle table and global entry
622    //TODO global entry should be handled differently
623    int idx = username.indexOf(ACL_KEY_DELIMITER);
624    byte[] permFamily = null;
625    byte[] permQualifier = null;
626    if (idx > 0 && idx < username.length()-1) {
627      String remainder = username.substring(idx+1);
628      username = username.substring(0, idx);
629      idx = remainder.indexOf(ACL_KEY_DELIMITER);
630      if (idx > 0 && idx < remainder.length()-1) {
631        permFamily = Bytes.toBytes(remainder.substring(0, idx));
632        permQualifier = Bytes.toBytes(remainder.substring(idx+1));
633      } else {
634        permFamily = Bytes.toBytes(remainder);
635      }
636    }
637
638    return new Pair<>(username, new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
639  }
640
641  /**
642   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
643   * and returns the resulting byte array.
644   *
645   * Writes a set of permission [user: table permission]
646   */
647  public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
648      Configuration conf) {
649    return ProtobufUtil.prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
650  }
651
  // This is part of the old HbaseObjectWritableFor96Migration.
  // The three codes below are the markers expected in front of the data when reading
  // permissions in the old Writable format (see readWritablePermissions).

  // Expected before the element count of a permission list.
  private static final int LIST_CODE = 61;

  // Expected first before each list element.
  private static final int WRITABLE_CODE = 14;

  // Expected second before each list element, ahead of the element's class name.
  // NOTE(review): presumably means "class not encoded as a code" -- confirm against the
  // old HbaseObjectWritableFor96Migration.
  private static final int WRITABLE_NOT_ENCODED = 0;
658
659  private static List<TablePermission> readWritablePermissions(DataInput in, Configuration conf)
660      throws IOException, ClassNotFoundException {
661    assert WritableUtils.readVInt(in) == LIST_CODE;
662    int length = in.readInt();
663    List<TablePermission> list = new ArrayList<>(length);
664    for (int i = 0; i < length; i++) {
665      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
666      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
667      String className = Text.readString(in);
668      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
669      Writable instance = WritableFactories.newInstance(clazz, conf);
670      instance.readFields(in);
671      list.add((TablePermission) instance);
672    }
673    return list;
674  }
675
676  /**
677   * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances from the input
678   * stream.
679   */
680  public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
681      Configuration conf) throws DeserializationException {
682    if (ProtobufUtil.isPBMagicPrefix(data)) {
683      int pblen = ProtobufUtil.lengthOfPBMagic();
684      try {
685        AccessControlProtos.UsersAndPermissions.Builder builder =
686            AccessControlProtos.UsersAndPermissions.newBuilder();
687        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
688        return AccessControlUtil.toUserTablePermissions(builder.build());
689      } catch (IOException e) {
690        throw new DeserializationException(e);
691      }
692    } else {
693      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
694      // forever (here and a couple of other places).
695      ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
696      try {
697        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
698        int length = in.readInt();
699        for (int i = 0; i < length; i++) {
700          String user = Text.readString(in);
701          List<TablePermission> userPerms = readWritablePermissions(in, conf);
702          perms.putAll(user, userPerms);
703        }
704      } catch (IOException | ClassNotFoundException e) {
705        throw new DeserializationException(e);
706      }
707      return perms;
708    }
709  }
710
711  public static boolean isNamespaceEntry(String entryName) {
712    return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
713  }
714
715  public static boolean isNamespaceEntry(byte[] entryName) {
716    return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
717  }
718
719  public static String toNamespaceEntry(String namespace) {
720    return NAMESPACE_PREFIX + namespace;
721  }
722
723  public static String fromNamespaceEntry(String namespace) {
724    if(namespace.charAt(0) != NAMESPACE_PREFIX)
725      throw new IllegalArgumentException("Argument is not a valid namespace entry");
726    return namespace.substring(1);
727  }
728
729  public static byte[] toNamespaceEntry(byte[] namespace) {
730    byte[] ret = new byte[namespace.length+1];
731    ret[0] = NAMESPACE_PREFIX;
732    System.arraycopy(namespace, 0, ret, 1, namespace.length);
733    return ret;
734  }
735
736  public static byte[] fromNamespaceEntry(byte[] namespace) {
737    if(namespace[0] != NAMESPACE_PREFIX) {
738      throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
739          Bytes.toString(namespace));
740    }
741    return Arrays.copyOfRange(namespace, 1, namespace.length);
742  }
743
744  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
745      throws IOException {
746    // Save an object allocation where we can
747    if (cell.getTagsLength() == 0) {
748      return null;
749    }
750    List<Permission> results = Lists.newArrayList();
751    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
752    while (tagsIterator.hasNext()) {
753      Tag tag = tagsIterator.next();
754      if (tag.getType() == ACL_TAG_TYPE) {
755        // Deserialize the table permissions from the KV
756        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
757        // use the builder
758        AccessControlProtos.UsersAndPermissions.Builder builder =
759            AccessControlProtos.UsersAndPermissions.newBuilder();
760        if (tag.hasArray()) {
761          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(), tag.getValueLength());
762        } else {
763          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
764        }
765        ListMultimap<String,Permission> kvPerms =
766            AccessControlUtil.toUsersAndPermissions(builder.build());
767        // Are there permissions for this user?
768        List<Permission> userPerms = kvPerms.get(user.getShortName());
769        if (userPerms != null) {
770          results.addAll(userPerms);
771        }
772        // Are there permissions for any of the groups this user belongs to?
773        String groupNames[] = user.getGroupNames();
774        if (groupNames != null) {
775          for (String group : groupNames) {
776            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
777            if (results != null) {
778              results.addAll(groupPerms);
779            }
780          }
781        }
782      }
783    }
784    return results;
785  }
786}