001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.ByteArrayInputStream;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeMap;
032import java.util.TreeSet;
033
034import org.apache.commons.lang3.StringUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.AuthUtil;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Cell.Type;
039import org.apache.hadoop.hbase.CellBuilderFactory;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.CompareOperator;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.PrivateCellUtil;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Tag;
047import org.apache.hadoop.hbase.TagType;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.filter.QualifierFilter;
060import org.apache.hadoop.hbase.filter.RegexStringComparator;
061import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
063import org.apache.hadoop.hbase.regionserver.InternalScanner;
064import org.apache.hadoop.hbase.regionserver.Region;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.Pair;
068import org.apache.hadoop.io.Text;
069import org.apache.hadoop.io.Writable;
070import org.apache.hadoop.io.WritableFactories;
071import org.apache.hadoop.io.WritableUtils;
072import org.apache.yetus.audience.InterfaceAudience;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
077import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
078import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
079import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
080
081/**
082 * Maintains lists of permission grants to users and groups to allow for
083 * authorization checks by {@link AccessController}.
084 *
085 * <p>
086 * Access control lists are stored in an "internal" metadata table named
087 * {@code _acl_}. Each table's permission grants are stored as a separate row,
088 * keyed by the table name. KeyValues for permissions assignments are stored
089 * in one of the formats:
090 * <pre>
091 * Key                      Desc
092 * --------                 --------
093 * user                     table level permissions for a user [R=read, W=write]
094 * group                    table level permissions for a group
095 * user,family              column family level permissions for a user
096 * group,family             column family level permissions for a group
097 * user,family,qualifier    column qualifier level permissions for a user
098 * group,family,qualifier   column qualifier level permissions for a group
099 * </pre>
100 * <p>
101 * All values are encoded as byte arrays containing the codes from the
102 * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
103 * </p>
104 */
105@InterfaceAudience.Private
106public final class PermissionStorage {
107  /** Internal storage table for access control lists */
108  public static final TableName ACL_TABLE_NAME =
109      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
110  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
111  /** Column family used to store ACL grants */
112  public static final String ACL_LIST_FAMILY_STR = "l";
113  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
114  /** KV tag to store per cell access control lists */
115  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
116
117  public static final char NAMESPACE_PREFIX = '@';
118
119  /**
120   * Delimiter to separate user, column family, and qualifier in
121   * _acl_ table info: column keys */
122  public static final char ACL_KEY_DELIMITER = ',';
123
124  private static final Logger LOG = LoggerFactory.getLogger(PermissionStorage.class);
125
126  private PermissionStorage() {
127  }
128
129  /**
130   * Stores a new user permission grant in the access control lists table.
131   * @param conf the configuration
132   * @param userPerm the details of the permission to be granted
133   * @param t acl table instance. It is closed upon method return.
134   * @throws IOException in the case of an error accessing the metadata table
135   */
136  public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
137      boolean mergeExistingPermissions) throws IOException {
138    Permission permission = userPerm.getPermission();
139    Permission.Action[] actions = permission.getActions();
140    byte[] rowKey = userPermissionRowKey(permission);
141    Put p = new Put(rowKey);
142    byte[] key = userPermissionKey(userPerm);
143
144    if ((actions == null) || (actions.length == 0)) {
145      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
146      LOG.warn(msg);
147      throw new IOException(msg);
148    }
149
150    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
151    if(mergeExistingPermissions){
152      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
153      UserPermission currentPerm = null;
154      for (UserPermission perm : perms) {
155        if (userPerm.equalsExceptActions(perm)) {
156          currentPerm = perm;
157          break;
158        }
159      }
160
161      if (currentPerm != null && currentPerm.getPermission().getActions() != null){
162        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
163      }
164    }
165
166    // merge current action with new action.
167    actionSet.addAll(Arrays.asList(actions));
168
169    // serialize to byte array.
170    byte[] value = new byte[actionSet.size()];
171    int index = 0;
172    for (Permission.Action action : actionSet) {
173      value[index++] = action.code();
174    }
175    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
176        .setRow(p.getRow())
177        .setFamily(ACL_LIST_FAMILY)
178        .setQualifier(key)
179        .setTimestamp(p.getTimestamp())
180        .setType(Type.Put)
181        .setValue(value)
182        .build());
183    if (LOG.isDebugEnabled()) {
184      LOG.debug("Writing permission with rowKey " + Bytes.toString(rowKey) + " "
185          + Bytes.toString(key) + ": " + Bytes.toStringBinary(value));
186    }
187    try {
188      t.put(p);
189    } finally {
190      t.close();
191    }
192  }
193
194  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
195          throws IOException{
196    addUserPermission(conf, userPerm, t, false);
197  }
198
199  /**
200   * Removes a previously granted permission from the stored access control
201   * lists.  The {@link TablePermission} being removed must exactly match what
202   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
203   * been granted "READ" access to the "data" table, but only to column family
204   * plus qualifier "info:colA", then trying to call this method with only
205   * user "bob" and the table name "data" (but without specifying the
206   * column qualifier "info:colA") will have no effect.
207   *
208   * @param conf the configuration
209   * @param userPerm the details of the permission to be revoked
210   * @param t acl table
211   * @throws IOException if there is an error accessing the metadata table
212   */
213  public static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
214      throws IOException {
215    if (null == userPerm.getPermission().getActions() ||
216        userPerm.getPermission().getActions().length == 0) {
217      removePermissionRecord(conf, userPerm, t);
218    } else {
219      // Get all the global user permissions from the acl table
220      List<UserPermission> permsList =
221        getUserPermissions(conf, userPermissionRowKey(userPerm.getPermission()),
222          null, null, null, false);
223      List<Permission.Action> remainingActions = new ArrayList<>();
224      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
225      for (UserPermission perm : permsList) {
226        // Find the user and remove only the requested permissions
227        if (perm.getUser().equals(userPerm.getUser())) {
228          for (Permission.Action oldAction : perm.getPermission().getActions()) {
229            if (!dropActions.contains(oldAction)) {
230              remainingActions.add(oldAction);
231            }
232          }
233          if (!remainingActions.isEmpty()) {
234            perm.getPermission().setActions(
235              remainingActions.toArray(new Permission.Action[remainingActions.size()]));
236            addUserPermission(conf, perm, t);
237          } else {
238            removePermissionRecord(conf, userPerm, t);
239          }
240          break;
241        }
242      }
243    }
244    if (LOG.isDebugEnabled()) {
245      LOG.debug("Removed permission "+ userPerm.toString());
246    }
247  }
248
249  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
250      throws IOException {
251    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
252    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
253    try {
254      t.delete(d);
255    } finally {
256      t.close();
257    }
258  }
259
260  /**
261   * Remove specified table from the _acl_ table.
262   */
263  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
264      throws IOException{
265    Delete d = new Delete(tableName.getName());
266    d.addFamily(ACL_LIST_FAMILY);
267
268    if (LOG.isDebugEnabled()) {
269      LOG.debug("Removing permissions of removed table "+ tableName);
270    }
271    try {
272      t.delete(d);
273    } finally {
274      t.close();
275    }
276  }
277
278  /**
279   * Remove specified namespace from the acl table.
280   */
281  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
282      throws IOException{
283    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
284    d.addFamily(ACL_LIST_FAMILY);
285    if (LOG.isDebugEnabled()) {
286      LOG.debug("Removing permissions of removed namespace "+ namespace);
287    }
288
289    try {
290      t.delete(d);
291    } finally {
292      t.close();
293    }
294  }
295
296  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
297      boolean closeTable) throws IOException {
298    Scan scan = new Scan();
299    scan.addFamily(ACL_LIST_FAMILY);
300
301    String columnName = Bytes.toString(column);
302    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
303        String.format("(%s%s%s)|(%s%s)$",
304            ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
305            ACL_KEY_DELIMITER, columnName))));
306
307    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
308    ResultScanner scanner = null;
309    try {
310      scanner = table.getScanner(scan);
311      for (Result res : scanner) {
312        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
313          qualifierSet.add(q);
314        }
315      }
316
317      if (qualifierSet.size() > 0) {
318        Delete d = new Delete(tableName.getName());
319        for (byte[] qualifier : qualifierSet) {
320          d.addColumns(ACL_LIST_FAMILY, qualifier);
321        }
322        table.delete(d);
323      }
324    } finally {
325      if (scanner != null) {
326        scanner.close();
327      }
328      if (closeTable) {
329        table.close();
330      }
331    }
332  }
333
334  /**
335   * Remove specified table column from the acl table.
336   */
337  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
338      Table t) throws IOException {
339    if (LOG.isDebugEnabled()) {
340      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
341          " from table "+ tableName);
342    }
343    removeTablePermissions(tableName, column, t, true);
344  }
345
346  static byte[] userPermissionRowKey(Permission permission) {
347    byte[] row;
348    if (permission instanceof TablePermission) {
349      TablePermission tablePerm = (TablePermission) permission;
350      row = tablePerm.getTableName().getName();
351    } else if (permission instanceof NamespacePermission) {
352      NamespacePermission nsPerm = (NamespacePermission) permission;
353      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
354    } else {
355      // permission instanceof TablePermission
356      row = ACL_GLOBAL_NAME;
357    }
358    return row;
359  }
360
361  /**
362   * Build qualifier key from user permission:
363   *  username
364   *  username,family
365   *  username,family,qualifier
366   */
367  static byte[] userPermissionKey(UserPermission permission) {
368    byte[] key = Bytes.toBytes(permission.getUser());
369    byte[] qualifier = null;
370    byte[] family = null;
371    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
372      TablePermission tablePermission = (TablePermission) permission.getPermission();
373      family = tablePermission.getFamily();
374      qualifier = tablePermission.getQualifier();
375    }
376
377    if (family != null && family.length > 0) {
378      key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
379      if (qualifier != null && qualifier.length > 0) {
380        key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
381      }
382    }
383
384    return key;
385  }
386
387  /**
388   * Returns {@code true} if the given region is part of the {@code _acl_}
389   * metadata table.
390   */
391  static boolean isAclRegion(Region region) {
392    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
393  }
394
395  /**
396   * Returns {@code true} if the given table is {@code _acl_} metadata table.
397   */
398  static boolean isAclTable(TableDescriptor desc) {
399    return ACL_TABLE_NAME.equals(desc.getTableName());
400  }
401
402  /**
403   * Loads all of the permission grants stored in a region of the {@code _acl_}
404   * table.
405   *
406   * @param aclRegion the acl region
407   * @return a map of the permissions for this table.
408   * @throws IOException if an error occurs
409   */
410  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
411      throws IOException {
412    if (!isAclRegion(aclRegion)) {
413      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
414    }
415
416    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
417      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
418
419    // do a full scan of _acl_ table
420
421    Scan scan = new Scan();
422    scan.addFamily(ACL_LIST_FAMILY);
423
424    InternalScanner iScanner = null;
425    try {
426      iScanner = aclRegion.getScanner(scan);
427
428      while (true) {
429        List<Cell> row = new ArrayList<>();
430
431        boolean hasNext = iScanner.next(row);
432        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
433        byte[] entry = null;
434        for (Cell kv : row) {
435          if (entry == null) {
436            entry = CellUtil.cloneRow(kv);
437          }
438          Pair<String, Permission> permissionsOfUserOnTable =
439              parsePermissionRecord(entry, kv, null, null, false, null);
440          if (permissionsOfUserOnTable != null) {
441            String username = permissionsOfUserOnTable.getFirst();
442            Permission permission = permissionsOfUserOnTable.getSecond();
443            perms.put(username, new UserPermission(username, permission));
444          }
445        }
446        if (entry != null) {
447          allPerms.put(entry, perms);
448        }
449        if (!hasNext) {
450          break;
451        }
452      }
453    } finally {
454      if (iScanner != null) {
455        iScanner.close();
456      }
457    }
458
459    return allPerms;
460  }
461
462  /**
463   * Load all permissions from the region server holding {@code _acl_},
464   * primarily intended for testing purposes.
465   */
466  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(
467      Configuration conf) throws IOException {
468    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
469      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
470
471    // do a full scan of _acl_, filtering on only first table region rows
472
473    Scan scan = new Scan();
474    scan.addFamily(ACL_LIST_FAMILY);
475
476    ResultScanner scanner = null;
477    // TODO: Pass in a Connection rather than create one each time.
478    try (Connection connection = ConnectionFactory.createConnection(conf)) {
479      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
480        scanner = table.getScanner(scan);
481        try {
482          for (Result row : scanner) {
483            ListMultimap<String, UserPermission> resultPerms =
484                parsePermissions(row.getRow(), row, null, null, null, false);
485            allPerms.put(row.getRow(), resultPerms);
486          }
487        } finally {
488          if (scanner != null) {
489            scanner.close();
490          }
491        }
492      }
493    }
494
495    return allPerms;
496  }
497
498  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
499      TableName tableName) throws IOException {
500    return getPermissions(conf, tableName != null ? tableName.getName() : null, null, null, null,
501      null, false);
502  }
503
504  @VisibleForTesting
505  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
506      String namespace) throws IOException {
507    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
508      false);
509  }
510
511  public static ListMultimap<String, UserPermission> getGlobalPermissions(Configuration conf)
512      throws IOException {
513    return getPermissions(conf, null, null, null, null, null, false);
514  }
515
516  /**
517   * Reads user permission assignments stored in the <code>l:</code> column family of the first
518   * table row in <code>_acl_</code>.
519   * <p>
520   * See {@link PermissionStorage class documentation} for the key structure used for storage.
521   * </p>
522   */
523  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
524      Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
525    if (entryName == null) {
526      entryName = ACL_GLOBAL_NAME;
527    }
528    // for normal user tables, we just read the table row from _acl_
529    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
530    Get get = new Get(entryName);
531    get.addFamily(ACL_LIST_FAMILY);
532    Result row = null;
533    if (t == null) {
534      try (Connection connection = ConnectionFactory.createConnection(conf)) {
535        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
536          row = table.get(get);
537        }
538      }
539    } else {
540      row = t.get(get);
541    }
542    if (!row.isEmpty()) {
543      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
544    } else {
545      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
546          + Bytes.toString(entryName));
547    }
548
549    return perms;
550  }
551
552  /**
553   * Returns the currently granted permissions for a given table as the specified user plus
554   * associated permissions.
555   */
556  public static List<UserPermission> getUserTablePermissions(Configuration conf,
557      TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
558      throws IOException {
559    return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
560      userName, hasFilterUser);
561  }
562
563  /**
564   * Returns the currently granted permissions for a given namespace as the specified user plus
565   * associated permissions.
566   */
567  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
568      String namespace, String user, boolean hasFilterUser) throws IOException {
569    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
570      hasFilterUser);
571  }
572
573  /**
574   * Returns the currently granted permissions for a given table/namespace with associated
575   * permissions based on the specified column family, column qualifier and user name.
576   * @param conf the configuration
577   * @param entryName Table name or the namespace
578   * @param cf Column family
579   * @param cq Column qualifier
580   * @param user User name to be filtered from permission as requested
581   * @param hasFilterUser true if filter user is provided, otherwise false.
582   * @return List of UserPermissions
583   * @throws IOException on failure
584   */
585  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
586      byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
587    ListMultimap<String, UserPermission> allPerms =
588        getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
589    List<UserPermission> perms = new ArrayList<>();
590    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
591      perms.add(entry.getValue());
592    }
593    return perms;
594  }
595
596  /**
597   * Parse and filter permission based on the specified column family, column qualifier and user
598   * name.
599   */
600  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
601      Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
602    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
603    if (result != null && result.size() > 0) {
604      for (Cell kv : result.rawCells()) {
605        Pair<String, Permission> permissionsOfUserOnTable =
606            parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
607
608        if (permissionsOfUserOnTable != null) {
609          String username = permissionsOfUserOnTable.getFirst();
610          Permission permission = permissionsOfUserOnTable.getSecond();
611          perms.put(username, new UserPermission(username, permission));
612        }
613      }
614    }
615    return perms;
616  }
617
618  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
619      byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
620    // return X given a set of permissions encoded in the permissionRecord kv.
621    byte[] family = CellUtil.cloneFamily(kv);
622    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
623      return null;
624    }
625
626    byte[] key = CellUtil.cloneQualifier(kv);
627    byte[] value = CellUtil.cloneValue(kv);
628    if (LOG.isDebugEnabled()) {
629      LOG.debug("Read acl: entry[" +
630        Bytes.toStringBinary(entryName) + "], kv [" +
631        Bytes.toStringBinary(key) + ": " +
632        Bytes.toStringBinary(value)+"]");
633    }
634
635    // check for a column family appended to the key
636    // TODO: avoid the string conversion to make this more efficient
637    String username = Bytes.toString(key);
638
639    // Retrieve group list for the filterUser if cell key is a group.
640    // Group list is not required when filterUser itself a group
641    List<String> filterUserGroups = null;
642    if (filterPerms) {
643      if (username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser)
644          && filterUser.charAt(0) != '@') {
645        filterUserGroups = AccessChecker.getUserGroups(filterUser);
646      }
647    }
648
649    // Handle namespace entry
650    if (isNamespaceEntry(entryName)) {
651      // Filter the permissions cell record if client query
652      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
653        return null;
654      }
655
656      return new Pair<>(username,
657          Permission.newBuilder(Bytes.toString(fromNamespaceEntry(entryName)))
658              .withActionCodes(value).build());
659    }
660
661    // Handle global entry
662    if (isGlobalEntry(entryName)) {
663      // Filter the permissions cell record if client query
664      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
665        return null;
666      }
667
668      return new Pair<>(username, Permission.newBuilder().withActionCodes(value).build());
669    }
670
671    // Handle table entry
672    int idx = username.indexOf(ACL_KEY_DELIMITER);
673    byte[] permFamily = null;
674    byte[] permQualifier = null;
675    if (idx > 0 && idx < username.length()-1) {
676      String remainder = username.substring(idx+1);
677      username = username.substring(0, idx);
678      idx = remainder.indexOf(ACL_KEY_DELIMITER);
679      if (idx > 0 && idx < remainder.length()-1) {
680        permFamily = Bytes.toBytes(remainder.substring(0, idx));
681        permQualifier = Bytes.toBytes(remainder.substring(idx+1));
682      } else {
683        permFamily = Bytes.toBytes(remainder);
684      }
685    }
686
687    // Filter the permissions cell record if client query
688    if (filterPerms) {
689      // ACL table contain 3 types of cell key entries; hbase:Acl, namespace and table. So to filter
690      // the permission cell records additional validations are required at CF, CQ and username.
691      // Here we can proceed based on client input whether it contain filterUser.
692      // Validate the filterUser when specified
693      if (filterUser != null && !validateFilterUser(username, filterUser, filterUserGroups)) {
694        return null;
695      }
696      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
697        return null;
698      }
699    }
700
701    return new Pair<>(username, Permission.newBuilder(TableName.valueOf(entryName))
702        .withFamily(permFamily).withQualifier(permQualifier).withActionCodes(value).build());
703  }
704
705  /*
706   * Validate the cell key with the client filterUser if specified in the query input. 1. If cell
707   * key (username) is not a group then check whether client filterUser is equal to username 2. If
708   * cell key (username) is a group then check whether client filterUser belongs to the cell key
709   * group (username) 3. In case when both filterUser and username are group names then cell will be
710   * filtered if not equal.
711   */
712  private static boolean validateFilterUser(String username, String filterUser,
713      List<String> filterUserGroups) {
714    if (filterUserGroups == null) {
715      // Validate user name or group names whether equal
716      if (filterUser.equals(username)) {
717        return true;
718      }
719    } else {
720      // Check whether filter user belongs to the cell key group.
721      return filterUserGroups.contains(username.substring(1));
722    }
723    return false;
724  }
725
726  /*
727   * Validate the cell with client CF and CQ if specified in the query input. 1. If CF is NULL, then
728   * no need of further validation, result should include all CF and CQ. 2. IF CF specified and
729   * equal then validation required at CQ level if CF specified in client input, otherwise return
730   * all CQ records.
731   */
732  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] permQualifier,
733      byte[] cq) {
734    boolean include = true;
735    if (cf != null) {
736      if (Bytes.equals(cf, permFamily)) {
737        if (cq != null && !Bytes.equals(cq, permQualifier)) {
738          // if CQ specified and didn't match then ignore this cell
739          include = false;
740        }
741      } else {
742        // if CF specified and didn't match then ignore this cell
743        include = false;
744      }
745    }
746    return include;
747  }
748
749  /**
750   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
751   * resulting byte array. Writes a set of permission [user: table permission]
752   */
753  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
754      Configuration conf) {
755    return ProtobufUtil
756        .prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
757  }
758
759  // This is part of the old HbaseObjectWritableFor96Migration.
760  private static final int LIST_CODE = 61;
761
762  private static final int WRITABLE_CODE = 14;
763
764  private static final int WRITABLE_NOT_ENCODED = 0;
765
766  private static List<Permission> readWritableUserPermission(DataInput in,
767      Configuration conf) throws IOException, ClassNotFoundException {
768    assert WritableUtils.readVInt(in) == LIST_CODE;
769    int length = in.readInt();
770    List<Permission> list = new ArrayList<>(length);
771    for (int i = 0; i < length; i++) {
772      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
773      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
774      String className = Text.readString(in);
775      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
776      Writable instance = WritableFactories.newInstance(clazz, conf);
777      instance.readFields(in);
778      list.add((Permission) instance);
779    }
780    return list;
781  }
782
783  @VisibleForTesting
784  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
785      Configuration conf) throws DeserializationException {
786    if (ProtobufUtil.isPBMagicPrefix(data)) {
787      int pblen = ProtobufUtil.lengthOfPBMagic();
788      try {
789        AccessControlProtos.UsersAndPermissions.Builder builder =
790          AccessControlProtos.UsersAndPermissions.newBuilder();
791        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
792        return AccessControlUtil.toUserPermission(builder.build());
793      } catch (IOException e) {
794        throw new DeserializationException(e);
795      }
796    } else {
797      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
798      // forever (here and a couple of other places).
799      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
800      try {
801        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
802        int length = in.readInt();
803        for (int i = 0; i < length; i++) {
804          String user = Text.readString(in);
805          List<Permission> perms = readWritableUserPermission(in, conf);
806          for (Permission p : perms) {
807            userPermission.put(user, new UserPermission(user, p));
808          }
809        }
810      } catch (IOException | ClassNotFoundException e) {
811        throw new DeserializationException(e);
812      }
813      return userPermission;
814    }
815  }
816
817  public static ListMultimap<String, Permission> readPermissions(byte[] data,
818      Configuration conf) throws DeserializationException {
819    if (ProtobufUtil.isPBMagicPrefix(data)) {
820      int pblen = ProtobufUtil.lengthOfPBMagic();
821      try {
822        AccessControlProtos.UsersAndPermissions.Builder builder =
823          AccessControlProtos.UsersAndPermissions.newBuilder();
824        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
825        return AccessControlUtil.toPermission(builder.build());
826      } catch (IOException e) {
827        throw new DeserializationException(e);
828      }
829    } else {
830      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
831      // forever (here and a couple of other places).
832      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
833      try {
834        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
835        int length = in.readInt();
836        for (int i = 0; i < length; i++) {
837          String user = Text.readString(in);
838          perms.putAll(user, readWritableUserPermission(in, conf));
839        }
840      } catch (IOException | ClassNotFoundException e) {
841        throw new DeserializationException(e);
842      }
843      return perms;
844    }
845  }
846
847  public static boolean isGlobalEntry(byte[] entryName) {
848    return Bytes.equals(entryName, ACL_GLOBAL_NAME);
849  }
850
851  public static boolean isNamespaceEntry(String entryName) {
852    return isNamespaceEntry(Bytes.toBytes(entryName));
853  }
854
855  public static boolean isNamespaceEntry(byte[] entryName) {
856    return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
857  }
858
859  public static boolean isTableEntry(byte[] entryName) {
860    return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
861  }
862
863  public static String toNamespaceEntry(String namespace) {
864    return NAMESPACE_PREFIX + namespace;
865  }
866
867  public static String fromNamespaceEntry(String namespace) {
868    if (namespace.charAt(0) != NAMESPACE_PREFIX) {
869      throw new IllegalArgumentException("Argument is not a valid namespace entry");
870    }
871    return namespace.substring(1);
872  }
873
874  public static byte[] toNamespaceEntry(byte[] namespace) {
875    byte[] ret = new byte[namespace.length+1];
876    ret[0] = NAMESPACE_PREFIX;
877    System.arraycopy(namespace, 0, ret, 1, namespace.length);
878    return ret;
879  }
880
881  public static byte[] fromNamespaceEntry(byte[] namespace) {
882    if(namespace[0] != NAMESPACE_PREFIX) {
883      throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
884          Bytes.toString(namespace));
885    }
886    return Arrays.copyOfRange(namespace, 1, namespace.length);
887  }
888
889  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
890      throws IOException {
891    // Save an object allocation where we can
892    if (cell.getTagsLength() == 0) {
893      return null;
894    }
895    List<Permission> results = Lists.newArrayList();
896    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
897    while (tagsIterator.hasNext()) {
898      Tag tag = tagsIterator.next();
899      if (tag.getType() == ACL_TAG_TYPE) {
900        // Deserialize the table permissions from the KV
901        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
902        // use the builder
903        AccessControlProtos.UsersAndPermissions.Builder builder =
904            AccessControlProtos.UsersAndPermissions.newBuilder();
905        if (tag.hasArray()) {
906          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(),
907            tag.getValueLength());
908        } else {
909          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
910        }
911        ListMultimap<String,Permission> kvPerms =
912            AccessControlUtil.toUsersAndPermissions(builder.build());
913        // Are there permissions for this user?
914        List<Permission> userPerms = kvPerms.get(user.getShortName());
915        if (userPerms != null) {
916          results.addAll(userPerms);
917        }
918        // Are there permissions for any of the groups this user belongs to?
919        String[] groupNames = user.getGroupNames();
920        if (groupNames != null) {
921          for (String group : groupNames) {
922            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
923            if (results != null) {
924              results.addAll(groupPerms);
925            }
926          }
927        }
928      }
929    }
930    return results;
931  }
932}