001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.ByteArrayInputStream;
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031import java.util.TreeSet;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.AuthUtil;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.Cell.Type;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.NamespaceDescriptor;
042import org.apache.hadoop.hbase.PrivateCellUtil;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.Tag;
045import org.apache.hadoop.hbase.TagType;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.exceptions.DeserializationException;
057import org.apache.hadoop.hbase.filter.QualifierFilter;
058import org.apache.hadoop.hbase.filter.RegexStringComparator;
059import org.apache.hadoop.hbase.regionserver.InternalScanner;
060import org.apache.hadoop.hbase.regionserver.Region;
061import org.apache.hadoop.hbase.security.User;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.hadoop.io.Text;
065import org.apache.hadoop.io.Writable;
066import org.apache.hadoop.io.WritableFactories;
067import org.apache.hadoop.io.WritableUtils;
068import org.apache.yetus.audience.InterfaceAudience;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
073import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075
076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
078
079/**
080 * Maintains lists of permission grants to users and groups to allow for authorization checks by
081 * {@link AccessController}.
082 * <p>
083 * Access control lists are stored in an "internal" metadata table named {@code _acl_}. Each table's
084 * permission grants are stored as a separate row, keyed by the table name. KeyValues for
085 * permissions assignments are stored in one of the formats:
086 *
087 * <pre>
088 * Key                      Desc
089 * --------                 --------
090 * user                     table level permissions for a user [R=read, W=write]
091 * group                    table level permissions for a group
092 * user,family              column family level permissions for a user
093 * group,family             column family level permissions for a group
094 * user,family,qualifier    column qualifier level permissions for a user
095 * group,family,qualifier   column qualifier level permissions for a group
096 * </pre>
097 * <p>
098 * All values are encoded as byte arrays containing the codes from the
099 * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
100 * </p>
101 */
102@InterfaceAudience.Private
103public final class PermissionStorage {
104  /** Internal storage table for access control lists */
105  public static final TableName ACL_TABLE_NAME =
106    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
107  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
108  /** Column family used to store ACL grants */
109  public static final String ACL_LIST_FAMILY_STR = "l";
110  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
111  /** KV tag to store per cell access control lists */
112  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
113
114  public static final char NAMESPACE_PREFIX = '@';
115
116  /**
117   * Delimiter to separate user, column family, and qualifier in _acl_ table info: column keys
118   */
119  public static final char ACL_KEY_DELIMITER = ',';
120
121  private static final Logger LOG = LoggerFactory.getLogger(PermissionStorage.class);
122
123  private PermissionStorage() {
124  }
125
126  /**
127   * Stores a new user permission grant in the access control lists table.
128   * @param conf     the configuration
129   * @param userPerm the details of the permission to be granted
130   * @param t        acl table instance. It is closed upon method return.
131   * @throws IOException in the case of an error accessing the metadata table
132   */
133  public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
134    boolean mergeExistingPermissions) throws IOException {
135    Permission permission = userPerm.getPermission();
136    Permission.Action[] actions = permission.getActions();
137    byte[] rowKey = userPermissionRowKey(permission);
138    Put p = new Put(rowKey);
139    byte[] key = userPermissionKey(userPerm);
140
141    if ((actions == null) || (actions.length == 0)) {
142      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
143      LOG.warn(msg);
144      throw new IOException(msg);
145    }
146
147    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
148    if (mergeExistingPermissions) {
149      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
150      UserPermission currentPerm = null;
151      for (UserPermission perm : perms) {
152        if (userPerm.equalsExceptActions(perm)) {
153          currentPerm = perm;
154          break;
155        }
156      }
157
158      if (currentPerm != null && currentPerm.getPermission().getActions() != null) {
159        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
160      }
161    }
162
163    // merge current action with new action.
164    actionSet.addAll(Arrays.asList(actions));
165
166    // serialize to byte array.
167    byte[] value = new byte[actionSet.size()];
168    int index = 0;
169    for (Permission.Action action : actionSet) {
170      value[index++] = action.code();
171    }
172    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(p.getRow())
173      .setFamily(ACL_LIST_FAMILY).setQualifier(key).setTimestamp(p.getTimestamp()).setType(Type.Put)
174      .setValue(value).build());
175    if (LOG.isDebugEnabled()) {
176      LOG.debug("Writing permission with rowKey " + Bytes.toString(rowKey) + " "
177        + Bytes.toString(key) + ": " + Bytes.toStringBinary(value));
178    }
179    try {
180      t.put(p);
181    } finally {
182      t.close();
183    }
184  }
185
186  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
187    throws IOException {
188    addUserPermission(conf, userPerm, t, false);
189  }
190
191  /**
192   * Removes a previously granted permission from the stored access control lists. The
193   * {@link TablePermission} being removed must exactly match what is stored -- no wildcard matching
194   * is attempted. Ie, if user "bob" has been granted "READ" access to the "data" table, but only to
195   * column family plus qualifier "info:colA", then trying to call this method with only user "bob"
196   * and the table name "data" (but without specifying the column qualifier "info:colA") will have
197   * no effect.
198   * @param conf     the configuration
199   * @param userPerm the details of the permission to be revoked
200   * @param t        acl table
201   * @throws IOException if there is an error accessing the metadata table
202   */
203  public static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
204    throws IOException {
205    if (
206      null == userPerm.getPermission().getActions()
207        || userPerm.getPermission().getActions().length == 0
208    ) {
209      removePermissionRecord(conf, userPerm, t);
210    } else {
211      // Get all the global user permissions from the acl table
212      List<UserPermission> permsList = getUserPermissions(conf,
213        userPermissionRowKey(userPerm.getPermission()), null, null, null, false);
214      List<Permission.Action> remainingActions = new ArrayList<>();
215      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
216      for (UserPermission perm : permsList) {
217        // Find the user and remove only the requested permissions
218        if (perm.getUser().equals(userPerm.getUser())) {
219          for (Permission.Action oldAction : perm.getPermission().getActions()) {
220            if (!dropActions.contains(oldAction)) {
221              remainingActions.add(oldAction);
222            }
223          }
224          if (!remainingActions.isEmpty()) {
225            perm.getPermission()
226              .setActions(remainingActions.toArray(new Permission.Action[remainingActions.size()]));
227            addUserPermission(conf, perm, t);
228          } else {
229            removePermissionRecord(conf, userPerm, t);
230          }
231          break;
232        }
233      }
234    }
235    if (LOG.isDebugEnabled()) {
236      LOG.debug("Removed permission " + userPerm.toString());
237    }
238  }
239
240  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
241    throws IOException {
242    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
243    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
244    try {
245      t.delete(d);
246    } finally {
247      t.close();
248    }
249  }
250
251  /**
252   * Remove specified table from the _acl_ table.
253   */
254  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
255    throws IOException {
256    Delete d = new Delete(tableName.getName());
257    d.addFamily(ACL_LIST_FAMILY);
258
259    if (LOG.isDebugEnabled()) {
260      LOG.debug("Removing permissions of removed table " + tableName);
261    }
262    try {
263      t.delete(d);
264    } finally {
265      t.close();
266    }
267  }
268
269  /**
270   * Remove specified namespace from the acl table.
271   */
272  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
273    throws IOException {
274    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
275    d.addFamily(ACL_LIST_FAMILY);
276    if (LOG.isDebugEnabled()) {
277      LOG.debug("Removing permissions of removed namespace " + namespace);
278    }
279
280    try {
281      t.delete(d);
282    } finally {
283      t.close();
284    }
285  }
286
287  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
288    boolean closeTable) throws IOException {
289    Scan scan = new Scan();
290    scan.addFamily(ACL_LIST_FAMILY);
291
292    String columnName = Bytes.toString(column);
293    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL,
294      new RegexStringComparator(String.format("(%s%s%s)|(%s%s)$", ACL_KEY_DELIMITER, columnName,
295        ACL_KEY_DELIMITER, ACL_KEY_DELIMITER, columnName))));
296
297    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
298    ResultScanner scanner = null;
299    try {
300      scanner = table.getScanner(scan);
301      for (Result res : scanner) {
302        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
303          qualifierSet.add(q);
304        }
305      }
306
307      if (qualifierSet.size() > 0) {
308        Delete d = new Delete(tableName.getName());
309        for (byte[] qualifier : qualifierSet) {
310          d.addColumns(ACL_LIST_FAMILY, qualifier);
311        }
312        table.delete(d);
313      }
314    } finally {
315      if (scanner != null) {
316        scanner.close();
317      }
318      if (closeTable) {
319        table.close();
320      }
321    }
322  }
323
324  /**
325   * Remove specified table column from the acl table.
326   */
327  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
328    Table t) throws IOException {
329    if (LOG.isDebugEnabled()) {
330      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) + " from table "
331        + tableName);
332    }
333    removeTablePermissions(tableName, column, t, true);
334  }
335
336  static byte[] userPermissionRowKey(Permission permission) {
337    byte[] row;
338    if (permission instanceof TablePermission) {
339      TablePermission tablePerm = (TablePermission) permission;
340      row = tablePerm.getTableName().getName();
341    } else if (permission instanceof NamespacePermission) {
342      NamespacePermission nsPerm = (NamespacePermission) permission;
343      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
344    } else {
345      // permission instanceof TablePermission
346      row = ACL_GLOBAL_NAME;
347    }
348    return row;
349  }
350
351  /**
352   * Build qualifier key from user permission: username username,family username,family,qualifier
353   */
354  static byte[] userPermissionKey(UserPermission permission) {
355    byte[] key = Bytes.toBytes(permission.getUser());
356    byte[] qualifier = null;
357    byte[] family = null;
358    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
359      TablePermission tablePermission = (TablePermission) permission.getPermission();
360      family = tablePermission.getFamily();
361      qualifier = tablePermission.getQualifier();
362    }
363
364    if (family != null && family.length > 0) {
365      key = Bytes.add(key, Bytes.add(new byte[] { ACL_KEY_DELIMITER }, family));
366      if (qualifier != null && qualifier.length > 0) {
367        key = Bytes.add(key, Bytes.add(new byte[] { ACL_KEY_DELIMITER }, qualifier));
368      }
369    }
370
371    return key;
372  }
373
374  /**
375   * Returns {@code true} if the given region is part of the {@code _acl_} metadata table.
376   */
377  static boolean isAclRegion(Region region) {
378    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
379  }
380
381  /**
382   * Returns {@code true} if the given table is {@code _acl_} metadata table.
383   */
384  static boolean isAclTable(TableDescriptor desc) {
385    return ACL_TABLE_NAME.equals(desc.getTableName());
386  }
387
388  /**
389   * Loads all of the permission grants stored in a region of the {@code _acl_} table.
390   * @param aclRegion the acl region
391   * @return a map of the permissions for this table.
392   * @throws IOException if an error occurs
393   */
394  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
395    throws IOException {
396    if (!isAclRegion(aclRegion)) {
397      throw new IOException("Can only load permissions from " + ACL_TABLE_NAME);
398    }
399
400    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
401      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
402
403    // do a full scan of _acl_ table
404
405    Scan scan = new Scan();
406    scan.addFamily(ACL_LIST_FAMILY);
407
408    InternalScanner iScanner = null;
409    try {
410      iScanner = aclRegion.getScanner(scan);
411
412      while (true) {
413        List<Cell> row = new ArrayList<>();
414
415        boolean hasNext = iScanner.next(row);
416        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
417        byte[] entry = null;
418        for (Cell kv : row) {
419          if (entry == null) {
420            entry = CellUtil.cloneRow(kv);
421          }
422          Pair<String, Permission> permissionsOfUserOnTable =
423            parsePermissionRecord(entry, kv, null, null, false, null);
424          if (permissionsOfUserOnTable != null) {
425            String username = permissionsOfUserOnTable.getFirst();
426            Permission permission = permissionsOfUserOnTable.getSecond();
427            perms.put(username, new UserPermission(username, permission));
428          }
429        }
430        if (entry != null) {
431          allPerms.put(entry, perms);
432        }
433        if (!hasNext) {
434          break;
435        }
436      }
437    } finally {
438      if (iScanner != null) {
439        iScanner.close();
440      }
441    }
442
443    return allPerms;
444  }
445
446  /**
447   * Load all permissions from the region server holding {@code _acl_}, primarily intended for
448   * testing purposes.
449   */
450  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Configuration conf)
451    throws IOException {
452    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
453      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
454
455    // do a full scan of _acl_, filtering on only first table region rows
456
457    Scan scan = new Scan();
458    scan.addFamily(ACL_LIST_FAMILY);
459
460    ResultScanner scanner = null;
461    // TODO: Pass in a Connection rather than create one each time.
462    try (Connection connection = ConnectionFactory.createConnection(conf)) {
463      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
464        scanner = table.getScanner(scan);
465        try {
466          for (Result row : scanner) {
467            ListMultimap<String, UserPermission> resultPerms =
468              parsePermissions(row.getRow(), row, null, null, null, false);
469            allPerms.put(row.getRow(), resultPerms);
470          }
471        } finally {
472          if (scanner != null) {
473            scanner.close();
474          }
475        }
476      }
477    }
478
479    return allPerms;
480  }
481
482  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
483    TableName tableName) throws IOException {
484    return getPermissions(conf, tableName != null ? tableName.getName() : null, null, null, null,
485      null, false);
486  }
487
488  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
489    String namespace) throws IOException {
490    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
491      false);
492  }
493
494  public static ListMultimap<String, UserPermission> getGlobalPermissions(Configuration conf)
495    throws IOException {
496    return getPermissions(conf, null, null, null, null, null, false);
497  }
498
499  /**
500   * Reads user permission assignments stored in the <code>l:</code> column family of the first
501   * table row in <code>_acl_</code>.
502   * <p>
503   * See {@link PermissionStorage class documentation} for the key structure used for storage.
504   * </p>
505   */
506  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
507    Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
508    if (entryName == null) {
509      entryName = ACL_GLOBAL_NAME;
510    }
511    // for normal user tables, we just read the table row from _acl_
512    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
513    Get get = new Get(entryName);
514    get.addFamily(ACL_LIST_FAMILY);
515    Result row = null;
516    if (t == null) {
517      try (Connection connection = ConnectionFactory.createConnection(conf)) {
518        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
519          row = table.get(get);
520        }
521      }
522    } else {
523      row = t.get(get);
524    }
525    if (!row.isEmpty()) {
526      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
527    } else {
528      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
529        + Bytes.toString(entryName));
530    }
531
532    return perms;
533  }
534
535  /**
536   * Returns the currently granted permissions for a given table as the specified user plus
537   * associated permissions.
538   */
539  public static List<UserPermission> getUserTablePermissions(Configuration conf,
540    TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
541    throws IOException {
542    return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
543      userName, hasFilterUser);
544  }
545
546  /**
547   * Returns the currently granted permissions for a given namespace as the specified user plus
548   * associated permissions.
549   */
550  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
551    String namespace, String user, boolean hasFilterUser) throws IOException {
552    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
553      hasFilterUser);
554  }
555
556  /**
557   * Returns the currently granted permissions for a given table/namespace with associated
558   * permissions based on the specified column family, column qualifier and user name.
559   * @param conf          the configuration
560   * @param entryName     Table name or the namespace
561   * @param cf            Column family
562   * @param cq            Column qualifier
563   * @param user          User name to be filtered from permission as requested
564   * @param hasFilterUser true if filter user is provided, otherwise false.
565   * @return List of UserPermissions
566   * @throws IOException on failure
567   */
568  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
569    byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
570    ListMultimap<String, UserPermission> allPerms =
571      getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
572    List<UserPermission> perms = new ArrayList<>();
573    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
574      perms.add(entry.getValue());
575    }
576    return perms;
577  }
578
579  /**
580   * Parse and filter permission based on the specified column family, column qualifier and user
581   * name.
582   */
583  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
584    Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
585    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
586    if (result != null && result.size() > 0) {
587      for (Cell kv : result.rawCells()) {
588        Pair<String, Permission> permissionsOfUserOnTable =
589          parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
590
591        if (permissionsOfUserOnTable != null) {
592          String username = permissionsOfUserOnTable.getFirst();
593          Permission permission = permissionsOfUserOnTable.getSecond();
594          perms.put(username, new UserPermission(username, permission));
595        }
596      }
597    }
598    return perms;
599  }
600
601  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
602    byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
603    // return X given a set of permissions encoded in the permissionRecord kv.
604    byte[] family = CellUtil.cloneFamily(kv);
605    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
606      return null;
607    }
608
609    byte[] key = CellUtil.cloneQualifier(kv);
610    byte[] value = CellUtil.cloneValue(kv);
611    if (LOG.isDebugEnabled()) {
612      LOG.debug("Read acl: entry[" + Bytes.toStringBinary(entryName) + "], kv ["
613        + Bytes.toStringBinary(key) + ": " + Bytes.toStringBinary(value) + "]");
614    }
615
616    // check for a column family appended to the key
617    // TODO: avoid the string conversion to make this more efficient
618    String username = Bytes.toString(key);
619
620    // Retrieve group list for the filterUser if cell key is a group.
621    // Group list is not required when filterUser itself a group
622    List<String> filterUserGroups = null;
623    if (filterPerms) {
624      if (
625        username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser) && filterUser.charAt(0) != '@'
626      ) {
627        filterUserGroups = AccessChecker.getUserGroups(filterUser);
628      }
629    }
630
631    // Handle namespace entry
632    if (isNamespaceEntry(entryName)) {
633      // Filter the permissions cell record if client query
634      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
635        return null;
636      }
637
638      return new Pair<>(username, Permission
639        .newBuilder(Bytes.toString(fromNamespaceEntry(entryName))).withActionCodes(value).build());
640    }
641
642    // Handle global entry
643    if (isGlobalEntry(entryName)) {
644      // Filter the permissions cell record if client query
645      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
646        return null;
647      }
648
649      return new Pair<>(username, Permission.newBuilder().withActionCodes(value).build());
650    }
651
652    // Handle table entry
653    int idx = username.indexOf(ACL_KEY_DELIMITER);
654    byte[] permFamily = null;
655    byte[] permQualifier = null;
656    if (idx > 0 && idx < username.length() - 1) {
657      String remainder = username.substring(idx + 1);
658      username = username.substring(0, idx);
659      idx = remainder.indexOf(ACL_KEY_DELIMITER);
660      if (idx > 0 && idx < remainder.length() - 1) {
661        permFamily = Bytes.toBytes(remainder.substring(0, idx));
662        permQualifier = Bytes.toBytes(remainder.substring(idx + 1));
663      } else {
664        permFamily = Bytes.toBytes(remainder);
665      }
666    }
667
668    // Filter the permissions cell record if client query
669    if (filterPerms) {
670      // ACL table contain 3 types of cell key entries; hbase:Acl, namespace and table. So to filter
671      // the permission cell records additional validations are required at CF, CQ and username.
672      // Here we can proceed based on client input whether it contain filterUser.
673      // Validate the filterUser when specified
674      if (filterUser != null && !validateFilterUser(username, filterUser, filterUserGroups)) {
675        return null;
676      }
677      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
678        return null;
679      }
680    }
681
682    return new Pair<>(username, Permission.newBuilder(TableName.valueOf(entryName))
683      .withFamily(permFamily).withQualifier(permQualifier).withActionCodes(value).build());
684  }
685
686  /*
687   * Validate the cell key with the client filterUser if specified in the query input. 1. If cell
688   * key (username) is not a group then check whether client filterUser is equal to username 2. If
689   * cell key (username) is a group then check whether client filterUser belongs to the cell key
690   * group (username) 3. In case when both filterUser and username are group names then cell will be
691   * filtered if not equal.
692   */
693  private static boolean validateFilterUser(String username, String filterUser,
694    List<String> filterUserGroups) {
695    if (filterUserGroups == null) {
696      // Validate user name or group names whether equal
697      if (filterUser.equals(username)) {
698        return true;
699      }
700    } else {
701      // Check whether filter user belongs to the cell key group.
702      return filterUserGroups.contains(username.substring(1));
703    }
704    return false;
705  }
706
707  /*
708   * Validate the cell with client CF and CQ if specified in the query input. 1. If CF is NULL, then
709   * no need of further validation, result should include all CF and CQ. 2. IF CF specified and
710   * equal then validation required at CQ level if CF specified in client input, otherwise return
711   * all CQ records.
712   */
713  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] permQualifier,
714    byte[] cq) {
715    boolean include = true;
716    if (cf != null) {
717      if (Bytes.equals(cf, permFamily)) {
718        if (cq != null && !Bytes.equals(cq, permQualifier)) {
719          // if CQ specified and didn't match then ignore this cell
720          include = false;
721        }
722      } else {
723        // if CF specified and didn't match then ignore this cell
724        include = false;
725      }
726    }
727    return include;
728  }
729
730  /**
731   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
732   * resulting byte array. Writes a set of permission [user: table permission]
733   */
734  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
735    Configuration conf) {
736    return ProtobufUtil
737      .prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
738  }
739
740  // This is part of the old HbaseObjectWritableFor96Migration.
741  private static final int LIST_CODE = 61;
742
743  private static final int WRITABLE_CODE = 14;
744
745  private static final int WRITABLE_NOT_ENCODED = 0;
746
747  private static List<Permission> readWritableUserPermission(DataInput in, Configuration conf)
748    throws IOException, ClassNotFoundException {
749    assert WritableUtils.readVInt(in) == LIST_CODE;
750    int length = in.readInt();
751    List<Permission> list = new ArrayList<>(length);
752    for (int i = 0; i < length; i++) {
753      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
754      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
755      String className = Text.readString(in);
756      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
757      Writable instance = WritableFactories.newInstance(clazz, conf);
758      instance.readFields(in);
759      list.add((Permission) instance);
760    }
761    return list;
762  }
763
764  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
765    Configuration conf) throws DeserializationException {
766    if (ProtobufUtil.isPBMagicPrefix(data)) {
767      int pblen = ProtobufUtil.lengthOfPBMagic();
768      try {
769        AccessControlProtos.UsersAndPermissions.Builder builder =
770          AccessControlProtos.UsersAndPermissions.newBuilder();
771        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
772        return AccessControlUtil.toUserPermission(builder.build());
773      } catch (IOException e) {
774        throw new DeserializationException(e);
775      }
776    } else {
777      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
778      // forever (here and a couple of other places).
779      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
780      try {
781        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
782        int length = in.readInt();
783        for (int i = 0; i < length; i++) {
784          String user = Text.readString(in);
785          List<Permission> perms = readWritableUserPermission(in, conf);
786          for (Permission p : perms) {
787            userPermission.put(user, new UserPermission(user, p));
788          }
789        }
790      } catch (IOException | ClassNotFoundException e) {
791        throw new DeserializationException(e);
792      }
793      return userPermission;
794    }
795  }
796
797  public static ListMultimap<String, Permission> readPermissions(byte[] data, Configuration conf)
798    throws DeserializationException {
799    if (ProtobufUtil.isPBMagicPrefix(data)) {
800      int pblen = ProtobufUtil.lengthOfPBMagic();
801      try {
802        AccessControlProtos.UsersAndPermissions.Builder builder =
803          AccessControlProtos.UsersAndPermissions.newBuilder();
804        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
805        return AccessControlUtil.toPermission(builder.build());
806      } catch (IOException e) {
807        throw new DeserializationException(e);
808      }
809    } else {
810      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
811      // forever (here and a couple of other places).
812      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
813      try {
814        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
815        int length = in.readInt();
816        for (int i = 0; i < length; i++) {
817          String user = Text.readString(in);
818          perms.putAll(user, readWritableUserPermission(in, conf));
819        }
820      } catch (IOException | ClassNotFoundException e) {
821        throw new DeserializationException(e);
822      }
823      return perms;
824    }
825  }
826
827  public static boolean isGlobalEntry(byte[] entryName) {
828    return Bytes.equals(entryName, ACL_GLOBAL_NAME);
829  }
830
831  public static boolean isNamespaceEntry(String entryName) {
832    return isNamespaceEntry(Bytes.toBytes(entryName));
833  }
834
835  public static boolean isNamespaceEntry(byte[] entryName) {
836    return entryName != null && entryName.length != 0 && entryName[0] == NAMESPACE_PREFIX;
837  }
838
839  public static boolean isTableEntry(byte[] entryName) {
840    return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
841  }
842
843  public static String toNamespaceEntry(String namespace) {
844    return NAMESPACE_PREFIX + namespace;
845  }
846
847  public static String fromNamespaceEntry(String namespace) {
848    if (namespace.charAt(0) != NAMESPACE_PREFIX) {
849      throw new IllegalArgumentException("Argument is not a valid namespace entry");
850    }
851    return namespace.substring(1);
852  }
853
854  public static byte[] toNamespaceEntry(byte[] namespace) {
855    byte[] ret = new byte[namespace.length + 1];
856    ret[0] = NAMESPACE_PREFIX;
857    System.arraycopy(namespace, 0, ret, 1, namespace.length);
858    return ret;
859  }
860
861  public static byte[] fromNamespaceEntry(byte[] namespace) {
862    if (namespace[0] != NAMESPACE_PREFIX) {
863      throw new IllegalArgumentException(
864        "Argument is not a valid namespace entry: " + Bytes.toString(namespace));
865    }
866    return Arrays.copyOfRange(namespace, 1, namespace.length);
867  }
868
869  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
870    throws IOException {
871    // Save an object allocation where we can
872    if (cell.getTagsLength() == 0) {
873      return null;
874    }
875    List<Permission> results = Lists.newArrayList();
876    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
877    while (tagsIterator.hasNext()) {
878      Tag tag = tagsIterator.next();
879      if (tag.getType() == ACL_TAG_TYPE) {
880        // Deserialize the table permissions from the KV
881        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
882        // use the builder
883        AccessControlProtos.UsersAndPermissions.Builder builder =
884          AccessControlProtos.UsersAndPermissions.newBuilder();
885        if (tag.hasArray()) {
886          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(),
887            tag.getValueLength());
888        } else {
889          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
890        }
891        ListMultimap<String, Permission> kvPerms =
892          AccessControlUtil.toUsersAndPermissions(builder.build());
893        // Are there permissions for this user?
894        List<Permission> userPerms = kvPerms.get(user.getShortName());
895        if (userPerms != null) {
896          results.addAll(userPerms);
897        }
898        // Are there permissions for any of the groups this user belongs to?
899        String[] groupNames = user.getGroupNames();
900        if (groupNames != null) {
901          for (String group : groupNames) {
902            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
903            if (results != null) {
904              results.addAll(groupPerms);
905            }
906          }
907        }
908      }
909    }
910    return results;
911  }
912}