001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.ByteArrayInputStream;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeMap;
032import java.util.TreeSet;
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.AuthUtil;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.Cell.Type;
038import org.apache.hadoop.hbase.CellBuilderFactory;
039import org.apache.hadoop.hbase.CellBuilderType;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.CompareOperator;
042import org.apache.hadoop.hbase.NamespaceDescriptor;
043import org.apache.hadoop.hbase.PrivateCellUtil;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.Tag;
046import org.apache.hadoop.hbase.TagType;
047import org.apache.hadoop.hbase.client.Connection;
048import org.apache.hadoop.hbase.client.ConnectionFactory;
049import org.apache.hadoop.hbase.client.Delete;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.exceptions.DeserializationException;
058import org.apache.hadoop.hbase.filter.QualifierFilter;
059import org.apache.hadoop.hbase.filter.RegexStringComparator;
060import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
062import org.apache.hadoop.hbase.regionserver.InternalScanner;
063import org.apache.hadoop.hbase.regionserver.Region;
064import org.apache.hadoop.hbase.security.User;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.Pair;
067import org.apache.hadoop.io.Text;
068import org.apache.hadoop.io.Writable;
069import org.apache.hadoop.io.WritableFactories;
070import org.apache.hadoop.io.WritableUtils;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
076import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
077import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
078
079/**
080 * Maintains lists of permission grants to users and groups to allow for
081 * authorization checks by {@link AccessController}.
082 *
083 * <p>
084 * Access control lists are stored in an "internal" metadata table named
085 * {@code _acl_}. Each table's permission grants are stored as a separate row,
086 * keyed by the table name. KeyValues for permissions assignments are stored
087 * in one of the formats:
088 * <pre>
089 * Key                      Desc
090 * --------                 --------
091 * user                     table level permissions for a user [R=read, W=write]
092 * group                    table level permissions for a group
093 * user,family              column family level permissions for a user
094 * group,family             column family level permissions for a group
095 * user,family,qualifier    column qualifier level permissions for a user
096 * group,family,qualifier   column qualifier level permissions for a group
097 * </pre>
098 * <p>
099 * All values are encoded as byte arrays containing the codes from the
100 * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
101 * </p>
102 */
103@InterfaceAudience.Private
104public final class PermissionStorage {
105  /** Internal storage table for access control lists */
106  public static final TableName ACL_TABLE_NAME =
107      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
108  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
109  /** Column family used to store ACL grants */
110  public static final String ACL_LIST_FAMILY_STR = "l";
111  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
112  /** KV tag to store per cell access control lists */
113  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
114
115  public static final char NAMESPACE_PREFIX = '@';
116
117  /**
118   * Delimiter to separate user, column family, and qualifier in
119   * _acl_ table info: column keys */
120  public static final char ACL_KEY_DELIMITER = ',';
121
122  private static final Logger LOG = LoggerFactory.getLogger(PermissionStorage.class);
123
124  private PermissionStorage() {
125  }
126
127  /**
128   * Stores a new user permission grant in the access control lists table.
129   * @param conf the configuration
130   * @param userPerm the details of the permission to be granted
131   * @param t acl table instance. It is closed upon method return.
132   * @throws IOException in the case of an error accessing the metadata table
133   */
134  public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
135      boolean mergeExistingPermissions) throws IOException {
136    Permission permission = userPerm.getPermission();
137    Permission.Action[] actions = permission.getActions();
138    byte[] rowKey = userPermissionRowKey(permission);
139    Put p = new Put(rowKey);
140    byte[] key = userPermissionKey(userPerm);
141
142    if ((actions == null) || (actions.length == 0)) {
143      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
144      LOG.warn(msg);
145      throw new IOException(msg);
146    }
147
148    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
149    if(mergeExistingPermissions){
150      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
151      UserPermission currentPerm = null;
152      for (UserPermission perm : perms) {
153        if (userPerm.equalsExceptActions(perm)) {
154          currentPerm = perm;
155          break;
156        }
157      }
158
159      if (currentPerm != null && currentPerm.getPermission().getActions() != null){
160        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
161      }
162    }
163
164    // merge current action with new action.
165    actionSet.addAll(Arrays.asList(actions));
166
167    // serialize to byte array.
168    byte[] value = new byte[actionSet.size()];
169    int index = 0;
170    for (Permission.Action action : actionSet) {
171      value[index++] = action.code();
172    }
173    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
174        .setRow(p.getRow())
175        .setFamily(ACL_LIST_FAMILY)
176        .setQualifier(key)
177        .setTimestamp(p.getTimestamp())
178        .setType(Type.Put)
179        .setValue(value)
180        .build());
181    if (LOG.isDebugEnabled()) {
182      LOG.debug("Writing permission with rowKey " + Bytes.toString(rowKey) + " "
183          + Bytes.toString(key) + ": " + Bytes.toStringBinary(value));
184    }
185    try {
186      t.put(p);
187    } finally {
188      t.close();
189    }
190  }
191
192  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
193          throws IOException{
194    addUserPermission(conf, userPerm, t, false);
195  }
196
197  /**
198   * Removes a previously granted permission from the stored access control
199   * lists.  The {@link TablePermission} being removed must exactly match what
200   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
201   * been granted "READ" access to the "data" table, but only to column family
202   * plus qualifier "info:colA", then trying to call this method with only
203   * user "bob" and the table name "data" (but without specifying the
204   * column qualifier "info:colA") will have no effect.
205   *
206   * @param conf the configuration
207   * @param userPerm the details of the permission to be revoked
208   * @param t acl table
209   * @throws IOException if there is an error accessing the metadata table
210   */
211  public static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
212      throws IOException {
213    if (null == userPerm.getPermission().getActions() ||
214        userPerm.getPermission().getActions().length == 0) {
215      removePermissionRecord(conf, userPerm, t);
216    } else {
217      // Get all the global user permissions from the acl table
218      List<UserPermission> permsList =
219        getUserPermissions(conf, userPermissionRowKey(userPerm.getPermission()),
220          null, null, null, false);
221      List<Permission.Action> remainingActions = new ArrayList<>();
222      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
223      for (UserPermission perm : permsList) {
224        // Find the user and remove only the requested permissions
225        if (perm.getUser().equals(userPerm.getUser())) {
226          for (Permission.Action oldAction : perm.getPermission().getActions()) {
227            if (!dropActions.contains(oldAction)) {
228              remainingActions.add(oldAction);
229            }
230          }
231          if (!remainingActions.isEmpty()) {
232            perm.getPermission().setActions(
233              remainingActions.toArray(new Permission.Action[remainingActions.size()]));
234            addUserPermission(conf, perm, t);
235          } else {
236            removePermissionRecord(conf, userPerm, t);
237          }
238          break;
239        }
240      }
241    }
242    if (LOG.isDebugEnabled()) {
243      LOG.debug("Removed permission "+ userPerm.toString());
244    }
245  }
246
247  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
248      throws IOException {
249    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
250    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
251    try {
252      t.delete(d);
253    } finally {
254      t.close();
255    }
256  }
257
258  /**
259   * Remove specified table from the _acl_ table.
260   */
261  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
262      throws IOException{
263    Delete d = new Delete(tableName.getName());
264    d.addFamily(ACL_LIST_FAMILY);
265
266    if (LOG.isDebugEnabled()) {
267      LOG.debug("Removing permissions of removed table "+ tableName);
268    }
269    try {
270      t.delete(d);
271    } finally {
272      t.close();
273    }
274  }
275
276  /**
277   * Remove specified namespace from the acl table.
278   */
279  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
280      throws IOException{
281    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
282    d.addFamily(ACL_LIST_FAMILY);
283    if (LOG.isDebugEnabled()) {
284      LOG.debug("Removing permissions of removed namespace "+ namespace);
285    }
286
287    try {
288      t.delete(d);
289    } finally {
290      t.close();
291    }
292  }
293
294  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
295      boolean closeTable) throws IOException {
296    Scan scan = new Scan();
297    scan.addFamily(ACL_LIST_FAMILY);
298
299    String columnName = Bytes.toString(column);
300    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
301        String.format("(%s%s%s)|(%s%s)$",
302            ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
303            ACL_KEY_DELIMITER, columnName))));
304
305    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
306    ResultScanner scanner = null;
307    try {
308      scanner = table.getScanner(scan);
309      for (Result res : scanner) {
310        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
311          qualifierSet.add(q);
312        }
313      }
314
315      if (qualifierSet.size() > 0) {
316        Delete d = new Delete(tableName.getName());
317        for (byte[] qualifier : qualifierSet) {
318          d.addColumns(ACL_LIST_FAMILY, qualifier);
319        }
320        table.delete(d);
321      }
322    } finally {
323      if (scanner != null) {
324        scanner.close();
325      }
326      if (closeTable) {
327        table.close();
328      }
329    }
330  }
331
332  /**
333   * Remove specified table column from the acl table.
334   */
335  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
336      Table t) throws IOException {
337    if (LOG.isDebugEnabled()) {
338      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
339          " from table "+ tableName);
340    }
341    removeTablePermissions(tableName, column, t, true);
342  }
343
344  static byte[] userPermissionRowKey(Permission permission) {
345    byte[] row;
346    if (permission instanceof TablePermission) {
347      TablePermission tablePerm = (TablePermission) permission;
348      row = tablePerm.getTableName().getName();
349    } else if (permission instanceof NamespacePermission) {
350      NamespacePermission nsPerm = (NamespacePermission) permission;
351      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
352    } else {
353      // permission instanceof TablePermission
354      row = ACL_GLOBAL_NAME;
355    }
356    return row;
357  }
358
359  /**
360   * Build qualifier key from user permission:
361   *  username
362   *  username,family
363   *  username,family,qualifier
364   */
365  static byte[] userPermissionKey(UserPermission permission) {
366    byte[] key = Bytes.toBytes(permission.getUser());
367    byte[] qualifier = null;
368    byte[] family = null;
369    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
370      TablePermission tablePermission = (TablePermission) permission.getPermission();
371      family = tablePermission.getFamily();
372      qualifier = tablePermission.getQualifier();
373    }
374
375    if (family != null && family.length > 0) {
376      key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
377      if (qualifier != null && qualifier.length > 0) {
378        key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
379      }
380    }
381
382    return key;
383  }
384
385  /**
386   * Returns {@code true} if the given region is part of the {@code _acl_}
387   * metadata table.
388   */
389  static boolean isAclRegion(Region region) {
390    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
391  }
392
393  /**
394   * Returns {@code true} if the given table is {@code _acl_} metadata table.
395   */
396  static boolean isAclTable(TableDescriptor desc) {
397    return ACL_TABLE_NAME.equals(desc.getTableName());
398  }
399
400  /**
401   * Loads all of the permission grants stored in a region of the {@code _acl_}
402   * table.
403   *
404   * @param aclRegion the acl region
405   * @return a map of the permissions for this table.
406   * @throws IOException if an error occurs
407   */
408  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
409      throws IOException {
410    if (!isAclRegion(aclRegion)) {
411      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
412    }
413
414    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
415      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
416
417    // do a full scan of _acl_ table
418
419    Scan scan = new Scan();
420    scan.addFamily(ACL_LIST_FAMILY);
421
422    InternalScanner iScanner = null;
423    try {
424      iScanner = aclRegion.getScanner(scan);
425
426      while (true) {
427        List<Cell> row = new ArrayList<>();
428
429        boolean hasNext = iScanner.next(row);
430        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
431        byte[] entry = null;
432        for (Cell kv : row) {
433          if (entry == null) {
434            entry = CellUtil.cloneRow(kv);
435          }
436          Pair<String, Permission> permissionsOfUserOnTable =
437              parsePermissionRecord(entry, kv, null, null, false, null);
438          if (permissionsOfUserOnTable != null) {
439            String username = permissionsOfUserOnTable.getFirst();
440            Permission permission = permissionsOfUserOnTable.getSecond();
441            perms.put(username, new UserPermission(username, permission));
442          }
443        }
444        if (entry != null) {
445          allPerms.put(entry, perms);
446        }
447        if (!hasNext) {
448          break;
449        }
450      }
451    } finally {
452      if (iScanner != null) {
453        iScanner.close();
454      }
455    }
456
457    return allPerms;
458  }
459
460  /**
461   * Load all permissions from the region server holding {@code _acl_},
462   * primarily intended for testing purposes.
463   */
464  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(
465      Configuration conf) throws IOException {
466    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
467      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
468
469    // do a full scan of _acl_, filtering on only first table region rows
470
471    Scan scan = new Scan();
472    scan.addFamily(ACL_LIST_FAMILY);
473
474    ResultScanner scanner = null;
475    // TODO: Pass in a Connection rather than create one each time.
476    try (Connection connection = ConnectionFactory.createConnection(conf)) {
477      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
478        scanner = table.getScanner(scan);
479        try {
480          for (Result row : scanner) {
481            ListMultimap<String, UserPermission> resultPerms =
482                parsePermissions(row.getRow(), row, null, null, null, false);
483            allPerms.put(row.getRow(), resultPerms);
484          }
485        } finally {
486          if (scanner != null) {
487            scanner.close();
488          }
489        }
490      }
491    }
492
493    return allPerms;
494  }
495
496  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
497      TableName tableName) throws IOException {
498    return getPermissions(conf, tableName != null ? tableName.getName() : null, null, null, null,
499      null, false);
500  }
501
502  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
503      String namespace) throws IOException {
504    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
505      false);
506  }
507
508  public static ListMultimap<String, UserPermission> getGlobalPermissions(Configuration conf)
509      throws IOException {
510    return getPermissions(conf, null, null, null, null, null, false);
511  }
512
513  /**
514   * Reads user permission assignments stored in the <code>l:</code> column family of the first
515   * table row in <code>_acl_</code>.
516   * <p>
517   * See {@link PermissionStorage class documentation} for the key structure used for storage.
518   * </p>
519   */
520  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
521      Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
522    if (entryName == null) {
523      entryName = ACL_GLOBAL_NAME;
524    }
525    // for normal user tables, we just read the table row from _acl_
526    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
527    Get get = new Get(entryName);
528    get.addFamily(ACL_LIST_FAMILY);
529    Result row = null;
530    if (t == null) {
531      try (Connection connection = ConnectionFactory.createConnection(conf)) {
532        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
533          row = table.get(get);
534        }
535      }
536    } else {
537      row = t.get(get);
538    }
539    if (!row.isEmpty()) {
540      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
541    } else {
542      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
543          + Bytes.toString(entryName));
544    }
545
546    return perms;
547  }
548
549  /**
550   * Returns the currently granted permissions for a given table as the specified user plus
551   * associated permissions.
552   */
553  public static List<UserPermission> getUserTablePermissions(Configuration conf,
554      TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
555      throws IOException {
556    return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
557      userName, hasFilterUser);
558  }
559
560  /**
561   * Returns the currently granted permissions for a given namespace as the specified user plus
562   * associated permissions.
563   */
564  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
565      String namespace, String user, boolean hasFilterUser) throws IOException {
566    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
567      hasFilterUser);
568  }
569
570  /**
571   * Returns the currently granted permissions for a given table/namespace with associated
572   * permissions based on the specified column family, column qualifier and user name.
573   * @param conf the configuration
574   * @param entryName Table name or the namespace
575   * @param cf Column family
576   * @param cq Column qualifier
577   * @param user User name to be filtered from permission as requested
578   * @param hasFilterUser true if filter user is provided, otherwise false.
579   * @return List of UserPermissions
580   * @throws IOException on failure
581   */
582  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
583      byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
584    ListMultimap<String, UserPermission> allPerms =
585        getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
586    List<UserPermission> perms = new ArrayList<>();
587    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
588      perms.add(entry.getValue());
589    }
590    return perms;
591  }
592
593  /**
594   * Parse and filter permission based on the specified column family, column qualifier and user
595   * name.
596   */
597  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
598      Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
599    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
600    if (result != null && result.size() > 0) {
601      for (Cell kv : result.rawCells()) {
602        Pair<String, Permission> permissionsOfUserOnTable =
603            parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
604
605        if (permissionsOfUserOnTable != null) {
606          String username = permissionsOfUserOnTable.getFirst();
607          Permission permission = permissionsOfUserOnTable.getSecond();
608          perms.put(username, new UserPermission(username, permission));
609        }
610      }
611    }
612    return perms;
613  }
614
615  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
616      byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
617    // return X given a set of permissions encoded in the permissionRecord kv.
618    byte[] family = CellUtil.cloneFamily(kv);
619    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
620      return null;
621    }
622
623    byte[] key = CellUtil.cloneQualifier(kv);
624    byte[] value = CellUtil.cloneValue(kv);
625    if (LOG.isDebugEnabled()) {
626      LOG.debug("Read acl: entry[" +
627        Bytes.toStringBinary(entryName) + "], kv [" +
628        Bytes.toStringBinary(key) + ": " +
629        Bytes.toStringBinary(value)+"]");
630    }
631
632    // check for a column family appended to the key
633    // TODO: avoid the string conversion to make this more efficient
634    String username = Bytes.toString(key);
635
636    // Retrieve group list for the filterUser if cell key is a group.
637    // Group list is not required when filterUser itself a group
638    List<String> filterUserGroups = null;
639    if (filterPerms) {
640      if (username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser)
641          && filterUser.charAt(0) != '@') {
642        filterUserGroups = AccessChecker.getUserGroups(filterUser);
643      }
644    }
645
646    // Handle namespace entry
647    if (isNamespaceEntry(entryName)) {
648      // Filter the permissions cell record if client query
649      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
650        return null;
651      }
652
653      return new Pair<>(username,
654          Permission.newBuilder(Bytes.toString(fromNamespaceEntry(entryName)))
655              .withActionCodes(value).build());
656    }
657
658    // Handle global entry
659    if (isGlobalEntry(entryName)) {
660      // Filter the permissions cell record if client query
661      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
662        return null;
663      }
664
665      return new Pair<>(username, Permission.newBuilder().withActionCodes(value).build());
666    }
667
668    // Handle table entry
669    int idx = username.indexOf(ACL_KEY_DELIMITER);
670    byte[] permFamily = null;
671    byte[] permQualifier = null;
672    if (idx > 0 && idx < username.length()-1) {
673      String remainder = username.substring(idx+1);
674      username = username.substring(0, idx);
675      idx = remainder.indexOf(ACL_KEY_DELIMITER);
676      if (idx > 0 && idx < remainder.length()-1) {
677        permFamily = Bytes.toBytes(remainder.substring(0, idx));
678        permQualifier = Bytes.toBytes(remainder.substring(idx+1));
679      } else {
680        permFamily = Bytes.toBytes(remainder);
681      }
682    }
683
684    // Filter the permissions cell record if client query
685    if (filterPerms) {
686      // ACL table contain 3 types of cell key entries; hbase:Acl, namespace and table. So to filter
687      // the permission cell records additional validations are required at CF, CQ and username.
688      // Here we can proceed based on client input whether it contain filterUser.
689      // Validate the filterUser when specified
690      if (filterUser != null && !validateFilterUser(username, filterUser, filterUserGroups)) {
691        return null;
692      }
693      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
694        return null;
695      }
696    }
697
698    return new Pair<>(username, Permission.newBuilder(TableName.valueOf(entryName))
699        .withFamily(permFamily).withQualifier(permQualifier).withActionCodes(value).build());
700  }
701
702  /*
703   * Validate the cell key with the client filterUser if specified in the query input. 1. If cell
704   * key (username) is not a group then check whether client filterUser is equal to username 2. If
705   * cell key (username) is a group then check whether client filterUser belongs to the cell key
706   * group (username) 3. In case when both filterUser and username are group names then cell will be
707   * filtered if not equal.
708   */
709  private static boolean validateFilterUser(String username, String filterUser,
710      List<String> filterUserGroups) {
711    if (filterUserGroups == null) {
712      // Validate user name or group names whether equal
713      if (filterUser.equals(username)) {
714        return true;
715      }
716    } else {
717      // Check whether filter user belongs to the cell key group.
718      return filterUserGroups.contains(username.substring(1));
719    }
720    return false;
721  }
722
723  /*
724   * Validate the cell with client CF and CQ if specified in the query input. 1. If CF is NULL, then
725   * no need of further validation, result should include all CF and CQ. 2. IF CF specified and
726   * equal then validation required at CQ level if CF specified in client input, otherwise return
727   * all CQ records.
728   */
729  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] permQualifier,
730      byte[] cq) {
731    boolean include = true;
732    if (cf != null) {
733      if (Bytes.equals(cf, permFamily)) {
734        if (cq != null && !Bytes.equals(cq, permQualifier)) {
735          // if CQ specified and didn't match then ignore this cell
736          include = false;
737        }
738      } else {
739        // if CF specified and didn't match then ignore this cell
740        include = false;
741      }
742    }
743    return include;
744  }
745
746  /**
747   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
748   * resulting byte array. Writes a set of permission [user: table permission]
749   */
750  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
751      Configuration conf) {
752    return ProtobufUtil
753        .prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
754  }
755
756  // This is part of the old HbaseObjectWritableFor96Migration.
757  private static final int LIST_CODE = 61;
758
759  private static final int WRITABLE_CODE = 14;
760
761  private static final int WRITABLE_NOT_ENCODED = 0;
762
763  private static List<Permission> readWritableUserPermission(DataInput in,
764      Configuration conf) throws IOException, ClassNotFoundException {
765    assert WritableUtils.readVInt(in) == LIST_CODE;
766    int length = in.readInt();
767    List<Permission> list = new ArrayList<>(length);
768    for (int i = 0; i < length; i++) {
769      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
770      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
771      String className = Text.readString(in);
772      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
773      Writable instance = WritableFactories.newInstance(clazz, conf);
774      instance.readFields(in);
775      list.add((Permission) instance);
776    }
777    return list;
778  }
779
780  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
781      Configuration conf) throws DeserializationException {
782    if (ProtobufUtil.isPBMagicPrefix(data)) {
783      int pblen = ProtobufUtil.lengthOfPBMagic();
784      try {
785        AccessControlProtos.UsersAndPermissions.Builder builder =
786          AccessControlProtos.UsersAndPermissions.newBuilder();
787        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
788        return AccessControlUtil.toUserPermission(builder.build());
789      } catch (IOException e) {
790        throw new DeserializationException(e);
791      }
792    } else {
793      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
794      // forever (here and a couple of other places).
795      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
796      try {
797        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
798        int length = in.readInt();
799        for (int i = 0; i < length; i++) {
800          String user = Text.readString(in);
801          List<Permission> perms = readWritableUserPermission(in, conf);
802          for (Permission p : perms) {
803            userPermission.put(user, new UserPermission(user, p));
804          }
805        }
806      } catch (IOException | ClassNotFoundException e) {
807        throw new DeserializationException(e);
808      }
809      return userPermission;
810    }
811  }
812
813  public static ListMultimap<String, Permission> readPermissions(byte[] data,
814      Configuration conf) throws DeserializationException {
815    if (ProtobufUtil.isPBMagicPrefix(data)) {
816      int pblen = ProtobufUtil.lengthOfPBMagic();
817      try {
818        AccessControlProtos.UsersAndPermissions.Builder builder =
819          AccessControlProtos.UsersAndPermissions.newBuilder();
820        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
821        return AccessControlUtil.toPermission(builder.build());
822      } catch (IOException e) {
823        throw new DeserializationException(e);
824      }
825    } else {
826      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
827      // forever (here and a couple of other places).
828      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
829      try {
830        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
831        int length = in.readInt();
832        for (int i = 0; i < length; i++) {
833          String user = Text.readString(in);
834          perms.putAll(user, readWritableUserPermission(in, conf));
835        }
836      } catch (IOException | ClassNotFoundException e) {
837        throw new DeserializationException(e);
838      }
839      return perms;
840    }
841  }
842
843  public static boolean isGlobalEntry(byte[] entryName) {
844    return Bytes.equals(entryName, ACL_GLOBAL_NAME);
845  }
846
847  public static boolean isNamespaceEntry(String entryName) {
848    return isNamespaceEntry(Bytes.toBytes(entryName));
849  }
850
851  public static boolean isNamespaceEntry(byte[] entryName) {
852    return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
853  }
854
855  public static boolean isTableEntry(byte[] entryName) {
856    return !isNamespaceEntry(entryName) && !isGlobalEntry(entryName) && entryName != null;
857  }
858
859  public static String toNamespaceEntry(String namespace) {
860    return NAMESPACE_PREFIX + namespace;
861  }
862
863  public static String fromNamespaceEntry(String namespace) {
864    if (namespace.charAt(0) != NAMESPACE_PREFIX) {
865      throw new IllegalArgumentException("Argument is not a valid namespace entry");
866    }
867    return namespace.substring(1);
868  }
869
870  public static byte[] toNamespaceEntry(byte[] namespace) {
871    byte[] ret = new byte[namespace.length+1];
872    ret[0] = NAMESPACE_PREFIX;
873    System.arraycopy(namespace, 0, ret, 1, namespace.length);
874    return ret;
875  }
876
877  public static byte[] fromNamespaceEntry(byte[] namespace) {
878    if(namespace[0] != NAMESPACE_PREFIX) {
879      throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
880          Bytes.toString(namespace));
881    }
882    return Arrays.copyOfRange(namespace, 1, namespace.length);
883  }
884
885  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
886      throws IOException {
887    // Save an object allocation where we can
888    if (cell.getTagsLength() == 0) {
889      return null;
890    }
891    List<Permission> results = Lists.newArrayList();
892    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
893    while (tagsIterator.hasNext()) {
894      Tag tag = tagsIterator.next();
895      if (tag.getType() == ACL_TAG_TYPE) {
896        // Deserialize the table permissions from the KV
897        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
898        // use the builder
899        AccessControlProtos.UsersAndPermissions.Builder builder =
900            AccessControlProtos.UsersAndPermissions.newBuilder();
901        if (tag.hasArray()) {
902          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(),
903            tag.getValueLength());
904        } else {
905          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
906        }
907        ListMultimap<String,Permission> kvPerms =
908            AccessControlUtil.toUsersAndPermissions(builder.build());
909        // Are there permissions for this user?
910        List<Permission> userPerms = kvPerms.get(user.getShortName());
911        if (userPerms != null) {
912          results.addAll(userPerms);
913        }
914        // Are there permissions for any of the groups this user belongs to?
915        String[] groupNames = user.getGroupNames();
916        if (groupNames != null) {
917          for (String group : groupNames) {
918            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
919            if (results != null) {
920              results.addAll(groupPerms);
921            }
922          }
923        }
924      }
925    }
926    return results;
927  }
928}