001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.ByteArrayInputStream;
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031import java.util.TreeSet;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.AuthUtil;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.Cell.Type;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.CompareOperator;
041import org.apache.hadoop.hbase.ExtendedCell;
042import org.apache.hadoop.hbase.NamespaceDescriptor;
043import org.apache.hadoop.hbase.PrivateCellUtil;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.Tag;
046import org.apache.hadoop.hbase.TagType;
047import org.apache.hadoop.hbase.client.Connection;
048import org.apache.hadoop.hbase.client.ConnectionFactory;
049import org.apache.hadoop.hbase.client.Delete;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.exceptions.DeserializationException;
058import org.apache.hadoop.hbase.filter.QualifierFilter;
059import org.apache.hadoop.hbase.filter.RegexStringComparator;
060import org.apache.hadoop.hbase.regionserver.InternalScanner;
061import org.apache.hadoop.hbase.regionserver.Region;
062import org.apache.hadoop.hbase.security.User;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.Pair;
065import org.apache.hadoop.io.Text;
066import org.apache.hadoop.io.Writable;
067import org.apache.hadoop.io.WritableFactories;
068import org.apache.hadoop.io.WritableUtils;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
074import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
075import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
079
080/**
081 * Maintains lists of permission grants to users and groups to allow for authorization checks by
082 * {@link AccessController}.
083 * <p>
084 * Access control lists are stored in an "internal" metadata table named {@code _acl_}. Each table's
085 * permission grants are stored as a separate row, keyed by the table name. KeyValues for
086 * permissions assignments are stored in one of the formats:
087 *
088 * <pre>
089 * Key                      Desc
090 * --------                 --------
091 * user                     table level permissions for a user [R=read, W=write]
092 * group                    table level permissions for a group
093 * user,family              column family level permissions for a user
094 * group,family             column family level permissions for a group
095 * user,family,qualifier    column qualifier level permissions for a user
096 * group,family,qualifier   column qualifier level permissions for a group
097 * </pre>
098 * <p>
 * All values are encoded as byte arrays containing the codes from the
 * {@link Permission.Action} enum.
101 * </p>
102 */
103@InterfaceAudience.Private
104public final class PermissionStorage {
  /** Internal storage table for access control lists */
  public static final TableName ACL_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
  /**
   * Row key used for grants that are neither table nor namespace scoped; it is the name of the ACL
   * table itself.
   */
  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
  /** Column family used to store ACL grants */
  public static final String ACL_LIST_FAMILY_STR = "l";
  /** {@link #ACL_LIST_FAMILY_STR} converted to bytes. */
  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
  /** KV tag to store per cell access control lists */
  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;

  // NOTE(review): presumably the character prepended to a namespace name to form its row key in
  // the ACL table -- the helpers that use it are not visible in this part of the file; confirm.
  public static final char NAMESPACE_PREFIX = '@';

  /**
   * Delimiter to separate user, column family, and qualifier in _acl_ table info: column keys
   */
  public static final char ACL_KEY_DELIMITER = ',';

  private static final Logger LOG = LoggerFactory.getLogger(PermissionStorage.class);

  /** Not meant to be instantiated; the members visible here are all static. */
  private PermissionStorage() {
  }
126
127  /**
128   * Stores a new user permission grant in the access control lists table.
129   * @param conf     the configuration
130   * @param userPerm the details of the permission to be granted
131   * @param t        acl table instance. It is closed upon method return.
132   * @throws IOException in the case of an error accessing the metadata table
133   */
134  public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
135    boolean mergeExistingPermissions) throws IOException {
136    Permission permission = userPerm.getPermission();
137    Permission.Action[] actions = permission.getActions();
138    byte[] rowKey = userPermissionRowKey(permission);
139    Put p = new Put(rowKey);
140    byte[] key = userPermissionKey(userPerm);
141
142    if ((actions == null) || (actions.length == 0)) {
143      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
144      LOG.warn(msg);
145      throw new IOException(msg);
146    }
147
148    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
149    if (mergeExistingPermissions) {
150      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
151      UserPermission currentPerm = null;
152      for (UserPermission perm : perms) {
153        if (userPerm.equalsExceptActions(perm)) {
154          currentPerm = perm;
155          break;
156        }
157      }
158
159      if (currentPerm != null && currentPerm.getPermission().getActions() != null) {
160        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
161      }
162    }
163
164    // merge current action with new action.
165    actionSet.addAll(Arrays.asList(actions));
166
167    // serialize to byte array.
168    byte[] value = new byte[actionSet.size()];
169    int index = 0;
170    for (Permission.Action action : actionSet) {
171      value[index++] = action.code();
172    }
173    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(p.getRow())
174      .setFamily(ACL_LIST_FAMILY).setQualifier(key).setTimestamp(p.getTimestamp()).setType(Type.Put)
175      .setValue(value).build());
176    if (LOG.isDebugEnabled()) {
177      LOG.debug("Writing permission with rowKey " + Bytes.toString(rowKey) + " "
178        + Bytes.toString(key) + ": " + Bytes.toStringBinary(value));
179    }
180    try {
181      t.put(p);
182    } finally {
183      t.close();
184    }
185  }
186
187  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
188    throws IOException {
189    addUserPermission(conf, userPerm, t, false);
190  }
191
192  /**
193   * Removes a previously granted permission from the stored access control lists. The
194   * {@link TablePermission} being removed must exactly match what is stored -- no wildcard matching
195   * is attempted. Ie, if user "bob" has been granted "READ" access to the "data" table, but only to
196   * column family plus qualifier "info:colA", then trying to call this method with only user "bob"
197   * and the table name "data" (but without specifying the column qualifier "info:colA") will have
198   * no effect.
199   * @param conf     the configuration
200   * @param userPerm the details of the permission to be revoked
201   * @param t        acl table
202   * @throws IOException if there is an error accessing the metadata table
203   */
204  public static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
205    throws IOException {
206    if (
207      null == userPerm.getPermission().getActions()
208        || userPerm.getPermission().getActions().length == 0
209    ) {
210      removePermissionRecord(conf, userPerm, t);
211    } else {
212      // Get all the global user permissions from the acl table
213      List<UserPermission> permsList = getUserPermissions(conf,
214        userPermissionRowKey(userPerm.getPermission()), null, null, null, false);
215      List<Permission.Action> remainingActions = new ArrayList<>();
216      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
217      for (UserPermission perm : permsList) {
218        // Find the user and remove only the requested permissions
219        if (perm.getUser().equals(userPerm.getUser())) {
220          for (Permission.Action oldAction : perm.getPermission().getActions()) {
221            if (!dropActions.contains(oldAction)) {
222              remainingActions.add(oldAction);
223            }
224          }
225          if (!remainingActions.isEmpty()) {
226            perm.getPermission()
227              .setActions(remainingActions.toArray(new Permission.Action[remainingActions.size()]));
228            addUserPermission(conf, perm, t);
229          } else {
230            removePermissionRecord(conf, userPerm, t);
231          }
232          break;
233        }
234      }
235    }
236    if (LOG.isDebugEnabled()) {
237      LOG.debug("Removed permission " + userPerm.toString());
238    }
239  }
240
241  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
242    throws IOException {
243    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
244    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
245    try {
246      t.delete(d);
247    } finally {
248      t.close();
249    }
250  }
251
252  /**
253   * Remove specified table from the _acl_ table.
254   */
255  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
256    throws IOException {
257    Delete d = new Delete(tableName.getName());
258    d.addFamily(ACL_LIST_FAMILY);
259
260    if (LOG.isDebugEnabled()) {
261      LOG.debug("Removing permissions of removed table " + tableName);
262    }
263    try {
264      t.delete(d);
265    } finally {
266      t.close();
267    }
268  }
269
270  /**
271   * Remove specified namespace from the acl table.
272   */
273  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
274    throws IOException {
275    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
276    d.addFamily(ACL_LIST_FAMILY);
277    if (LOG.isDebugEnabled()) {
278      LOG.debug("Removing permissions of removed namespace " + namespace);
279    }
280
281    try {
282      t.delete(d);
283    } finally {
284      t.close();
285    }
286  }
287
288  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
289    boolean closeTable) throws IOException {
290    Scan scan = new Scan();
291    scan.addFamily(ACL_LIST_FAMILY);
292
293    String columnName = Bytes.toString(column);
294    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL,
295      new RegexStringComparator(String.format("(%s%s%s)|(%s%s)$", ACL_KEY_DELIMITER, columnName,
296        ACL_KEY_DELIMITER, ACL_KEY_DELIMITER, columnName))));
297
298    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
299    ResultScanner scanner = null;
300    try {
301      scanner = table.getScanner(scan);
302      for (Result res : scanner) {
303        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
304          qualifierSet.add(q);
305        }
306      }
307
308      if (qualifierSet.size() > 0) {
309        Delete d = new Delete(tableName.getName());
310        for (byte[] qualifier : qualifierSet) {
311          d.addColumns(ACL_LIST_FAMILY, qualifier);
312        }
313        table.delete(d);
314      }
315    } finally {
316      if (scanner != null) {
317        scanner.close();
318      }
319      if (closeTable) {
320        table.close();
321      }
322    }
323  }
324
325  /**
326   * Remove specified table column from the acl table.
327   */
328  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
329    Table t) throws IOException {
330    if (LOG.isDebugEnabled()) {
331      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) + " from table "
332        + tableName);
333    }
334    removeTablePermissions(tableName, column, t, true);
335  }
336
337  static byte[] userPermissionRowKey(Permission permission) {
338    byte[] row;
339    if (permission instanceof TablePermission) {
340      TablePermission tablePerm = (TablePermission) permission;
341      row = tablePerm.getTableName().getName();
342    } else if (permission instanceof NamespacePermission) {
343      NamespacePermission nsPerm = (NamespacePermission) permission;
344      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
345    } else {
346      // permission instanceof TablePermission
347      row = ACL_GLOBAL_NAME;
348    }
349    return row;
350  }
351
352  /**
353   * Build qualifier key from user permission: username username,family username,family,qualifier
354   */
355  static byte[] userPermissionKey(UserPermission permission) {
356    byte[] key = Bytes.toBytes(permission.getUser());
357    byte[] qualifier = null;
358    byte[] family = null;
359    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
360      TablePermission tablePermission = (TablePermission) permission.getPermission();
361      family = tablePermission.getFamily();
362      qualifier = tablePermission.getQualifier();
363    }
364
365    if (family != null && family.length > 0) {
366      key = Bytes.add(key, Bytes.add(new byte[] { ACL_KEY_DELIMITER }, family));
367      if (qualifier != null && qualifier.length > 0) {
368        key = Bytes.add(key, Bytes.add(new byte[] { ACL_KEY_DELIMITER }, qualifier));
369      }
370    }
371
372    return key;
373  }
374
375  /**
376   * Returns {@code true} if the given region is part of the {@code _acl_} metadata table.
377   */
378  static boolean isAclRegion(Region region) {
379    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
380  }
381
382  /**
383   * Returns {@code true} if the given table is {@code _acl_} metadata table.
384   */
385  static boolean isAclTable(TableDescriptor desc) {
386    return ACL_TABLE_NAME.equals(desc.getTableName());
387  }
388
389  /**
390   * Loads all of the permission grants stored in a region of the {@code _acl_} table.
391   * @param aclRegion the acl region
392   * @return a map of the permissions for this table.
393   * @throws IOException if an error occurs
394   */
395  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
396    throws IOException {
397    if (!isAclRegion(aclRegion)) {
398      throw new IOException("Can only load permissions from " + ACL_TABLE_NAME);
399    }
400
401    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
402      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
403
404    // do a full scan of _acl_ table
405
406    Scan scan = new Scan();
407    scan.addFamily(ACL_LIST_FAMILY);
408
409    InternalScanner iScanner = null;
410    try {
411      iScanner = aclRegion.getScanner(scan);
412
413      while (true) {
414        List<Cell> row = new ArrayList<>();
415
416        boolean hasNext = iScanner.next(row);
417        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
418        byte[] entry = null;
419        for (Cell kv : row) {
420          if (entry == null) {
421            entry = CellUtil.cloneRow(kv);
422          }
423          Pair<String, Permission> permissionsOfUserOnTable =
424            parsePermissionRecord(entry, kv, null, null, false, null);
425          if (permissionsOfUserOnTable != null) {
426            String username = permissionsOfUserOnTable.getFirst();
427            Permission permission = permissionsOfUserOnTable.getSecond();
428            perms.put(username, new UserPermission(username, permission));
429          }
430        }
431        if (entry != null) {
432          allPerms.put(entry, perms);
433        }
434        if (!hasNext) {
435          break;
436        }
437      }
438    } finally {
439      if (iScanner != null) {
440        iScanner.close();
441      }
442    }
443
444    return allPerms;
445  }
446
447  /**
448   * Load all permissions from the region server holding {@code _acl_}, primarily intended for
449   * testing purposes.
450   */
451  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Configuration conf)
452    throws IOException {
453    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
454      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
455
456    // do a full scan of _acl_, filtering on only first table region rows
457
458    Scan scan = new Scan();
459    scan.addFamily(ACL_LIST_FAMILY);
460
461    ResultScanner scanner = null;
462    // TODO: Pass in a Connection rather than create one each time.
463    try (Connection connection = ConnectionFactory.createConnection(conf)) {
464      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
465        scanner = table.getScanner(scan);
466        try {
467          for (Result row : scanner) {
468            ListMultimap<String, UserPermission> resultPerms =
469              parsePermissions(row.getRow(), row, null, null, null, false);
470            allPerms.put(row.getRow(), resultPerms);
471          }
472        } finally {
473          if (scanner != null) {
474            scanner.close();
475          }
476        }
477      }
478    }
479
480    return allPerms;
481  }
482
483  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
484    TableName tableName) throws IOException {
485    return getTablePermissions(conf, tableName, null);
486  }
487
488  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
489    TableName tableName, Table t) throws IOException {
490    return getPermissions(conf, tableName != null ? tableName.getName() : null, t, null, null, null,
491      false);
492  }
493
494  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
495    String namespace) throws IOException {
496    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
497      false);
498  }
499
500  public static ListMultimap<String, UserPermission> getGlobalPermissions(Configuration conf)
501    throws IOException {
502    return getPermissions(conf, null, null, null, null, null, false);
503  }
504
505  /**
506   * Reads user permission assignments stored in the <code>l:</code> column family of the first
507   * table row in <code>_acl_</code>.
508   * <p>
509   * See {@link PermissionStorage class documentation} for the key structure used for storage.
510   * </p>
511   */
512  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
513    Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
514    if (entryName == null) {
515      entryName = ACL_GLOBAL_NAME;
516    }
517    // for normal user tables, we just read the table row from _acl_
518    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
519    Get get = new Get(entryName);
520    get.addFamily(ACL_LIST_FAMILY);
521    Result row = null;
522    if (t == null) {
523      try (Connection connection = ConnectionFactory.createConnection(conf)) {
524        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
525          row = table.get(get);
526        }
527      }
528    } else {
529      row = t.get(get);
530    }
531    if (!row.isEmpty()) {
532      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
533    } else {
534      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
535        + Bytes.toString(entryName));
536    }
537
538    return perms;
539  }
540
541  /**
542   * Returns the currently granted permissions for a given table as the specified user plus
543   * associated permissions.
544   */
545  public static List<UserPermission> getUserTablePermissions(Configuration conf,
546    TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
547    throws IOException {
548    return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
549      userName, hasFilterUser);
550  }
551
552  /**
553   * Returns the currently granted permissions for a given namespace as the specified user plus
554   * associated permissions.
555   */
556  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
557    String namespace, String user, boolean hasFilterUser) throws IOException {
558    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
559      hasFilterUser);
560  }
561
562  /**
563   * Returns the currently granted permissions for a given table/namespace with associated
564   * permissions based on the specified column family, column qualifier and user name.
565   * @param conf          the configuration
566   * @param entryName     Table name or the namespace
567   * @param cf            Column family
568   * @param cq            Column qualifier
569   * @param user          User name to be filtered from permission as requested
570   * @param hasFilterUser true if filter user is provided, otherwise false.
571   * @return List of UserPermissions
572   * @throws IOException on failure
573   */
574  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
575    byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
576    ListMultimap<String, UserPermission> allPerms =
577      getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
578    List<UserPermission> perms = new ArrayList<>();
579    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
580      perms.add(entry.getValue());
581    }
582    return perms;
583  }
584
585  /**
586   * Parse and filter permission based on the specified column family, column qualifier and user
587   * name.
588   */
589  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
590    Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
591    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
592    if (result != null && result.size() > 0) {
593      for (Cell kv : result.rawCells()) {
594        Pair<String, Permission> permissionsOfUserOnTable =
595          parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
596
597        if (permissionsOfUserOnTable != null) {
598          String username = permissionsOfUserOnTable.getFirst();
599          Permission permission = permissionsOfUserOnTable.getSecond();
600          perms.put(username, new UserPermission(username, permission));
601        }
602      }
603    }
604    return perms;
605  }
606
607  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
608    byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
609    // return X given a set of permissions encoded in the permissionRecord kv.
610    byte[] family = CellUtil.cloneFamily(kv);
611    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
612      return null;
613    }
614
615    byte[] key = CellUtil.cloneQualifier(kv);
616    byte[] value = CellUtil.cloneValue(kv);
617    if (LOG.isDebugEnabled()) {
618      LOG.debug("Read acl: entry[" + Bytes.toStringBinary(entryName) + "], kv ["
619        + Bytes.toStringBinary(key) + ": " + Bytes.toStringBinary(value) + "]");
620    }
621
622    // check for a column family appended to the key
623    // TODO: avoid the string conversion to make this more efficient
624    String username = Bytes.toString(key);
625
626    // Retrieve group list for the filterUser if cell key is a group.
627    // Group list is not required when filterUser itself a group
628    List<String> filterUserGroups = null;
629    if (filterPerms) {
630      if (
631        username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser) && filterUser.charAt(0) != '@'
632      ) {
633        filterUserGroups = AccessChecker.getUserGroups(filterUser);
634      }
635    }
636
637    // Handle namespace entry
638    if (isNamespaceEntry(entryName)) {
639      // Filter the permissions cell record if client query
640      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
641        return null;
642      }
643
644      return new Pair<>(username, Permission
645        .newBuilder(Bytes.toString(fromNamespaceEntry(entryName))).withActionCodes(value).build());
646    }
647
648    // Handle global entry
649    if (isGlobalEntry(entryName)) {
650      // Filter the permissions cell record if client query
651      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
652        return null;
653      }
654
655      return new Pair<>(username, Permission.newBuilder().withActionCodes(value).build());
656    }
657
658    // Handle table entry
659    int idx = username.indexOf(ACL_KEY_DELIMITER);
660    byte[] permFamily = null;
661    byte[] permQualifier = null;
662    if (idx > 0 && idx < username.length() - 1) {
663      String remainder = username.substring(idx + 1);
664      username = username.substring(0, idx);
665      idx = remainder.indexOf(ACL_KEY_DELIMITER);
666      if (idx > 0 && idx < remainder.length() - 1) {
667        permFamily = Bytes.toBytes(remainder.substring(0, idx));
668        permQualifier = Bytes.toBytes(remainder.substring(idx + 1));
669      } else {
670        permFamily = Bytes.toBytes(remainder);
671      }
672    }
673
674    // Filter the permissions cell record if client query
675    if (filterPerms) {
676      // ACL table contain 3 types of cell key entries; hbase:Acl, namespace and table. So to filter
677      // the permission cell records additional validations are required at CF, CQ and username.
678      // Here we can proceed based on client input whether it contain filterUser.
679      // Validate the filterUser when specified
680      if (filterUser != null && !validateFilterUser(username, filterUser, filterUserGroups)) {
681        return null;
682      }
683      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
684        return null;
685      }
686    }
687
688    return new Pair<>(username, Permission.newBuilder(TableName.valueOf(entryName))
689      .withFamily(permFamily).withQualifier(permQualifier).withActionCodes(value).build());
690  }
691
692  /*
693   * Validate the cell key with the client filterUser if specified in the query input. 1. If cell
694   * key (username) is not a group then check whether client filterUser is equal to username 2. If
695   * cell key (username) is a group then check whether client filterUser belongs to the cell key
696   * group (username) 3. In case when both filterUser and username are group names then cell will be
697   * filtered if not equal.
698   */
699  private static boolean validateFilterUser(String username, String filterUser,
700    List<String> filterUserGroups) {
701    if (filterUserGroups == null) {
702      // Validate user name or group names whether equal
703      if (filterUser.equals(username)) {
704        return true;
705      }
706    } else {
707      // Check whether filter user belongs to the cell key group.
708      return filterUserGroups.contains(username.substring(1));
709    }
710    return false;
711  }
712
713  /*
714   * Validate the cell with client CF and CQ if specified in the query input. 1. If CF is NULL, then
715   * no need of further validation, result should include all CF and CQ. 2. IF CF specified and
716   * equal then validation required at CQ level if CF specified in client input, otherwise return
717   * all CQ records.
718   */
719  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] permQualifier,
720    byte[] cq) {
721    boolean include = true;
722    if (cf != null) {
723      if (Bytes.equals(cf, permFamily)) {
724        if (cq != null && !Bytes.equals(cq, permQualifier)) {
725          // if CQ specified and didn't match then ignore this cell
726          include = false;
727        }
728      } else {
729        // if CF specified and didn't match then ignore this cell
730        include = false;
731      }
732    }
733    return include;
734  }
735
736  /**
737   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
738   * resulting byte array. Writes a set of permission [user: table permission]
739   */
740  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
741    Configuration conf) {
742    return ProtobufUtil
743      .prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
744  }
745
746  // This is part of the old HbaseObjectWritableFor96Migration.
747  private static final int LIST_CODE = 61;
748
749  private static final int WRITABLE_CODE = 14;
750
751  private static final int WRITABLE_NOT_ENCODED = 0;
752
753  private static List<Permission> readWritableUserPermission(DataInput in, Configuration conf)
754    throws IOException, ClassNotFoundException {
755    assert WritableUtils.readVInt(in) == LIST_CODE;
756    int length = in.readInt();
757    List<Permission> list = new ArrayList<>(length);
758    for (int i = 0; i < length; i++) {
759      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
760      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
761      String className = Text.readString(in);
762      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
763      Writable instance = WritableFactories.newInstance(clazz, conf);
764      instance.readFields(in);
765      list.add((Permission) instance);
766    }
767    return list;
768  }
769
770  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
771    Configuration conf) throws DeserializationException {
772    if (ProtobufUtil.isPBMagicPrefix(data)) {
773      int pblen = ProtobufUtil.lengthOfPBMagic();
774      try {
775        AccessControlProtos.UsersAndPermissions.Builder builder =
776          AccessControlProtos.UsersAndPermissions.newBuilder();
777        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
778        return AccessControlUtil.toUserPermission(builder.build());
779      } catch (IOException e) {
780        throw new DeserializationException(e);
781      }
782    } else {
783      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
784      // forever (here and a couple of other places).
785      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
786      try {
787        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
788        int length = in.readInt();
789        for (int i = 0; i < length; i++) {
790          String user = Text.readString(in);
791          List<Permission> perms = readWritableUserPermission(in, conf);
792          for (Permission p : perms) {
793            userPermission.put(user, new UserPermission(user, p));
794          }
795        }
796      } catch (IOException | ClassNotFoundException e) {
797        throw new DeserializationException(e);
798      }
799      return userPermission;
800    }
801  }
802
803  public static ListMultimap<String, Permission> readPermissions(byte[] data, Configuration conf)
804    throws DeserializationException {
805    if (ProtobufUtil.isPBMagicPrefix(data)) {
806      int pblen = ProtobufUtil.lengthOfPBMagic();
807      try {
808        AccessControlProtos.UsersAndPermissions.Builder builder =
809          AccessControlProtos.UsersAndPermissions.newBuilder();
810        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
811        return AccessControlUtil.toPermission(builder.build());
812      } catch (IOException e) {
813        throw new DeserializationException(e);
814      }
815    } else {
816      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
817      // forever (here and a couple of other places).
818      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
819      try {
820        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
821        int length = in.readInt();
822        for (int i = 0; i < length; i++) {
823          String user = Text.readString(in);
824          perms.putAll(user, readWritableUserPermission(in, conf));
825        }
826      } catch (IOException | ClassNotFoundException e) {
827        throw new DeserializationException(e);
828      }
829      return perms;
830    }
831  }
832
833  public static boolean isGlobalEntry(byte[] entryName) {
834    return Bytes.equals(entryName, ACL_GLOBAL_NAME);
835  }
836
837  public static boolean isNamespaceEntry(String entryName) {
838    return isNamespaceEntry(Bytes.toBytes(entryName));
839  }
840
841  public static boolean isNamespaceEntry(byte[] entryName) {
842    return entryName != null && entryName.length != 0 && entryName[0] == NAMESPACE_PREFIX;
843  }
844
845  public static boolean isTableEntry(byte[] entryName) {
846    return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
847  }
848
849  public static String toNamespaceEntry(String namespace) {
850    return NAMESPACE_PREFIX + namespace;
851  }
852
853  public static String fromNamespaceEntry(String namespace) {
854    if (namespace.charAt(0) != NAMESPACE_PREFIX) {
855      throw new IllegalArgumentException("Argument is not a valid namespace entry");
856    }
857    return namespace.substring(1);
858  }
859
860  public static byte[] toNamespaceEntry(byte[] namespace) {
861    byte[] ret = new byte[namespace.length + 1];
862    ret[0] = NAMESPACE_PREFIX;
863    System.arraycopy(namespace, 0, ret, 1, namespace.length);
864    return ret;
865  }
866
867  public static byte[] fromNamespaceEntry(byte[] namespace) {
868    if (namespace[0] != NAMESPACE_PREFIX) {
869      throw new IllegalArgumentException(
870        "Argument is not a valid namespace entry: " + Bytes.toString(namespace));
871    }
872    return Arrays.copyOfRange(namespace, 1, namespace.length);
873  }
874
875  public static List<Permission> getCellPermissionsForUser(User user, ExtendedCell cell)
876    throws IOException {
877    // Save an object allocation where we can
878    if (cell.getTagsLength() == 0) {
879      return null;
880    }
881    List<Permission> results = Lists.newArrayList();
882    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
883    while (tagsIterator.hasNext()) {
884      Tag tag = tagsIterator.next();
885      if (tag.getType() == ACL_TAG_TYPE) {
886        // Deserialize the table permissions from the KV
887        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
888        // use the builder
889        AccessControlProtos.UsersAndPermissions.Builder builder =
890          AccessControlProtos.UsersAndPermissions.newBuilder();
891        if (tag.hasArray()) {
892          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(),
893            tag.getValueLength());
894        } else {
895          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
896        }
897        ListMultimap<String, Permission> kvPerms =
898          AccessControlUtil.toUsersAndPermissions(builder.build());
899        // Are there permissions for this user?
900        List<Permission> userPerms = kvPerms.get(user.getShortName());
901        if (userPerms != null) {
902          results.addAll(userPerms);
903        }
904        // Are there permissions for any of the groups this user belongs to?
905        String[] groupNames = user.getGroupNames();
906        if (groupNames != null) {
907          for (String group : groupNames) {
908            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
909            if (results != null) {
910              results.addAll(groupPerms);
911            }
912          }
913        }
914      }
915    }
916    return results;
917  }
918}