001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import java.io.ByteArrayInputStream;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeMap;
032import java.util.TreeSet;
033
034import org.apache.commons.lang3.StringUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.AuthUtil;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.Cell.Type;
039import org.apache.hadoop.hbase.CellBuilderFactory;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.CompareOperator;
043import org.apache.hadoop.hbase.NamespaceDescriptor;
044import org.apache.hadoop.hbase.PrivateCellUtil;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Tag;
047import org.apache.hadoop.hbase.TagType;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.filter.QualifierFilter;
060import org.apache.hadoop.hbase.filter.RegexStringComparator;
061import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
063import org.apache.hadoop.hbase.regionserver.InternalScanner;
064import org.apache.hadoop.hbase.regionserver.Region;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
067import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
068import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
069import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.Pair;
072import org.apache.hadoop.io.Text;
073import org.apache.hadoop.io.Writable;
074import org.apache.hadoop.io.WritableFactories;
075import org.apache.hadoop.io.WritableUtils;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * Maintains lists of permission grants to users and groups to allow for
082 * authorization checks by {@link AccessController}.
083 *
084 * <p>
085 * Access control lists are stored in an "internal" metadata table named
086 * {@code _acl_}. Each table's permission grants are stored as a separate row,
087 * keyed by the table name. KeyValues for permissions assignments are stored
088 * in one of the formats:
089 * <pre>
090 * Key                      Desc
091 * --------                 --------
092 * user                     table level permissions for a user [R=read, W=write]
093 * group                    table level permissions for a group
094 * user,family              column family level permissions for a user
095 * group,family             column family level permissions for a group
096 * user,family,qualifier    column qualifier level permissions for a user
097 * group,family,qualifier   column qualifier level permissions for a group
098 * </pre>
099 * <p>
100 * All values are encoded as byte arrays containing the codes from the
101 * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
102 * </p>
103 */
104@InterfaceAudience.Private
105public class AccessControlLists {
106  /** Internal storage table for access control lists */
107  public static final TableName ACL_TABLE_NAME =
108      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
109  public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
110  /** Column family used to store ACL grants */
111  public static final String ACL_LIST_FAMILY_STR = "l";
112  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
113  /** KV tag to store per cell access control lists */
114  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
115
116  public static final char NAMESPACE_PREFIX = '@';
117
118  /**
119   * Delimiter to separate user, column family, and qualifier in
120   * _acl_ table info: column keys */
121  public static final char ACL_KEY_DELIMITER = ',';
122
123  private static final Logger LOG = LoggerFactory.getLogger(AccessControlLists.class);
124
125  /**
126   * Stores a new user permission grant in the access control lists table.
127   * @param conf the configuration
128   * @param userPerm the details of the permission to be granted
129   * @param t acl table instance. It is closed upon method return.
130   * @throws IOException in the case of an error accessing the metadata table
131   */
132  public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
133      boolean mergeExistingPermissions) throws IOException {
134    Permission permission = userPerm.getPermission();
135    Permission.Action[] actions = permission.getActions();
136    byte[] rowKey = userPermissionRowKey(permission);
137    Put p = new Put(rowKey);
138    byte[] key = userPermissionKey(userPerm);
139
140    if ((actions == null) || (actions.length == 0)) {
141      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
142      LOG.warn(msg);
143      throw new IOException(msg);
144    }
145
146    Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
147    if(mergeExistingPermissions){
148      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
149      UserPermission currentPerm = null;
150      for (UserPermission perm : perms) {
151        if (userPerm.equalsExceptActions(perm)) {
152          currentPerm = perm;
153          break;
154        }
155      }
156
157      if (currentPerm != null && currentPerm.getPermission().getActions() != null){
158        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
159      }
160    }
161
162    // merge current action with new action.
163    actionSet.addAll(Arrays.asList(actions));
164
165    // serialize to byte array.
166    byte[] value = new byte[actionSet.size()];
167    int index = 0;
168    for (Permission.Action action : actionSet) {
169      value[index++] = action.code();
170    }
171    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
172        .setRow(p.getRow())
173        .setFamily(ACL_LIST_FAMILY)
174        .setQualifier(key)
175        .setTimestamp(p.getTimestamp())
176        .setType(Type.Put)
177        .setValue(value)
178        .build());
179    if (LOG.isDebugEnabled()) {
180      LOG.debug("Writing permission with rowKey "+
181          Bytes.toString(rowKey)+" "+
182          Bytes.toString(key)+": "+Bytes.toStringBinary(value)
183          );
184    }
185    try {
186      t.put(p);
187    } finally {
188      t.close();
189    }
190  }
191
192  static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
193          throws IOException{
194    addUserPermission(conf, userPerm, t, false);
195  }
196
197  /**
198   * Removes a previously granted permission from the stored access control
199   * lists.  The {@link TablePermission} being removed must exactly match what
200   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
201   * been granted "READ" access to the "data" table, but only to column family
202   * plus qualifier "info:colA", then trying to call this method with only
203   * user "bob" and the table name "data" (but without specifying the
204   * column qualifier "info:colA") will have no effect.
205   *
206   * @param conf the configuration
207   * @param userPerm the details of the permission to be revoked
208   * @param t acl table
209   * @throws IOException if there is an error accessing the metadata table
210   */
211  public static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
212      throws IOException {
213    if (null == userPerm.getPermission().getActions() ||
214        userPerm.getPermission().getActions().length == 0) {
215      removePermissionRecord(conf, userPerm, t);
216    } else {
217      // Get all the global user permissions from the acl table
218      List<UserPermission> permsList =
219        getUserPermissions(conf, userPermissionRowKey(userPerm.getPermission()),
220          null, null, null, false);
221      List<Permission.Action> remainingActions = new ArrayList<>();
222      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
223      for (UserPermission perm : permsList) {
224        // Find the user and remove only the requested permissions
225        if (perm.getUser().equals(userPerm.getUser())) {
226          for (Permission.Action oldAction : perm.getPermission().getActions()) {
227            if (!dropActions.contains(oldAction)) {
228              remainingActions.add(oldAction);
229            }
230          }
231          if (!remainingActions.isEmpty()) {
232            perm.getPermission().setActions(
233              remainingActions.toArray(new Permission.Action[remainingActions.size()]));
234            addUserPermission(conf, perm, t);
235          } else {
236            removePermissionRecord(conf, userPerm, t);
237          }
238          break;
239        }
240      }
241    }
242    if (LOG.isDebugEnabled()) {
243      LOG.debug("Removed permission "+ userPerm.toString());
244    }
245  }
246
247  private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
248      throws IOException {
249    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
250    d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
251    try {
252      t.delete(d);
253    } finally {
254      t.close();
255    }
256  }
257
258  /**
259   * Remove specified table from the _acl_ table.
260   */
261  static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
262      throws IOException{
263    Delete d = new Delete(tableName.getName());
264
265    if (LOG.isDebugEnabled()) {
266      LOG.debug("Removing permissions of removed table "+ tableName);
267    }
268    try {
269      t.delete(d);
270    } finally {
271      t.close();
272    }
273  }
274
275  /**
276   * Remove specified namespace from the acl table.
277   */
278  static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
279      throws IOException{
280    Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
281
282    if (LOG.isDebugEnabled()) {
283      LOG.debug("Removing permissions of removed namespace "+ namespace);
284    }
285
286    try {
287      t.delete(d);
288    } finally {
289      t.close();
290    }
291  }
292
293  static private void removeTablePermissions(TableName tableName, byte[] column, Table table,
294      boolean closeTable) throws IOException {
295    Scan scan = new Scan();
296    scan.addFamily(ACL_LIST_FAMILY);
297
298    String columnName = Bytes.toString(column);
299    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
300        String.format("(%s%s%s)|(%s%s)$",
301            ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
302            ACL_KEY_DELIMITER, columnName))));
303
304    Set<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
305    ResultScanner scanner = null;
306    try {
307      scanner = table.getScanner(scan);
308      for (Result res : scanner) {
309        for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
310          qualifierSet.add(q);
311        }
312      }
313
314      if (qualifierSet.size() > 0) {
315        Delete d = new Delete(tableName.getName());
316        for (byte[] qualifier : qualifierSet) {
317          d.addColumns(ACL_LIST_FAMILY, qualifier);
318        }
319        table.delete(d);
320      }
321    } finally {
322      if (scanner != null) scanner.close();
323      if (closeTable) table.close();
324    }
325  }
326
327  /**
328   * Remove specified table column from the acl table.
329   */
330  static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
331      Table t) throws IOException {
332    if (LOG.isDebugEnabled()) {
333      LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
334          " from table "+ tableName);
335    }
336    removeTablePermissions(tableName, column, t, true);
337  }
338
339  static byte[] userPermissionRowKey(Permission permission) {
340    byte[] row;
341    if (permission instanceof TablePermission) {
342      TablePermission tablePerm = (TablePermission) permission;
343      row = tablePerm.getTableName().getName();
344    } else if (permission instanceof NamespacePermission) {
345      NamespacePermission nsPerm = (NamespacePermission) permission;
346      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
347    } else {
348      // permission instanceof TablePermission
349      row = ACL_GLOBAL_NAME;
350    }
351    return row;
352  }
353
354  /**
355   * Build qualifier key from user permission:
356   *  username
357   *  username,family
358   *  username,family,qualifier
359   */
360  static byte[] userPermissionKey(UserPermission permission) {
361    byte[] key = Bytes.toBytes(permission.getUser());
362    byte[] qualifier = null;
363    byte[] family = null;
364    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
365      TablePermission tablePermission = (TablePermission) permission.getPermission();
366      family = tablePermission.getFamily();
367      qualifier = tablePermission.getQualifier();
368    }
369
370    if (family != null && family.length > 0) {
371      key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
372      if (qualifier != null && qualifier.length > 0) {
373        key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
374      }
375    }
376
377    return key;
378  }
379
380  /**
381   * Returns {@code true} if the given region is part of the {@code _acl_}
382   * metadata table.
383   */
384  static boolean isAclRegion(Region region) {
385    return ACL_TABLE_NAME.equals(region.getTableDescriptor().getTableName());
386  }
387
388  /**
389   * Returns {@code true} if the given table is {@code _acl_} metadata table.
390   */
391  static boolean isAclTable(TableDescriptor desc) {
392    return ACL_TABLE_NAME.equals(desc.getTableName());
393  }
394
395  /**
396   * Loads all of the permission grants stored in a region of the {@code _acl_}
397   * table.
398   *
399   * @param aclRegion
400   * @return a map of the permissions for this table.
401   * @throws IOException
402   */
403  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
404      throws IOException {
405
406    if (!isAclRegion(aclRegion)) {
407      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
408    }
409
410    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
411      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
412
413    // do a full scan of _acl_ table
414
415    Scan scan = new Scan();
416    scan.addFamily(ACL_LIST_FAMILY);
417
418    InternalScanner iScanner = null;
419    try {
420      iScanner = aclRegion.getScanner(scan);
421
422      while (true) {
423        List<Cell> row = new ArrayList<>();
424
425        boolean hasNext = iScanner.next(row);
426        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
427        byte[] entry = null;
428        for (Cell kv : row) {
429          if (entry == null) {
430            entry = CellUtil.cloneRow(kv);
431          }
432          Pair<String, Permission> permissionsOfUserOnTable =
433              parsePermissionRecord(entry, kv, null, null, false, null);
434          if (permissionsOfUserOnTable != null) {
435            String username = permissionsOfUserOnTable.getFirst();
436            Permission permission = permissionsOfUserOnTable.getSecond();
437            perms.put(username, new UserPermission(username, permission));
438          }
439        }
440        if (entry != null) {
441          allPerms.put(entry, perms);
442        }
443        if (!hasNext) {
444          break;
445        }
446      }
447    } finally {
448      if (iScanner != null) {
449        iScanner.close();
450      }
451    }
452
453    return allPerms;
454  }
455
456  /**
457   * Load all permissions from the region server holding {@code _acl_},
458   * primarily intended for testing purposes.
459   */
460  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(
461      Configuration conf) throws IOException {
462    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
463      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
464
465    // do a full scan of _acl_, filtering on only first table region rows
466
467    Scan scan = new Scan();
468    scan.addFamily(ACL_LIST_FAMILY);
469
470    ResultScanner scanner = null;
471    // TODO: Pass in a Connection rather than create one each time.
472    try (Connection connection = ConnectionFactory.createConnection(conf)) {
473      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
474        scanner = table.getScanner(scan);
475        try {
476          for (Result row : scanner) {
477            ListMultimap<String, UserPermission> resultPerms =
478                parsePermissions(row.getRow(), row, null, null, null, false);
479            allPerms.put(row.getRow(), resultPerms);
480          }
481        } finally {
482          if (scanner != null) scanner.close();
483        }
484      }
485    }
486
487    return allPerms;
488  }
489
490  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
491      TableName tableName) throws IOException {
492    return getPermissions(conf, tableName != null ? tableName.getName() : null, null, null, null,
493      null, false);
494  }
495
496  @VisibleForTesting
497  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
498      String namespace) throws IOException {
499    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
500      false);
501  }
502
503  /**
504   * Reads user permission assignments stored in the <code>l:</code> column family of the first
505   * table row in <code>_acl_</code>.
506   * <p>
507   * See {@link AccessControlLists class documentation} for the key structure used for storage.
508   * </p>
509   */
510  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
511      Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
512    if (entryName == null) entryName = ACL_GLOBAL_NAME;
513    // for normal user tables, we just read the table row from _acl_
514    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
515    Get get = new Get(entryName);
516    get.addFamily(ACL_LIST_FAMILY);
517    Result row = null;
518    if (t == null) {
519      try (Connection connection = ConnectionFactory.createConnection(conf)) {
520        try (Table table = connection.getTable(ACL_TABLE_NAME)) {
521          row = table.get(get);
522        }
523      }
524    } else {
525      row = t.get(get);
526    }
527    if (!row.isEmpty()) {
528      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
529    } else {
530      LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
531          + Bytes.toString(entryName));
532    }
533
534    return perms;
535  }
536
537  /**
538   * Returns the currently granted permissions for a given table as the specified user plus
539   * associated permissions.
540   */
541  public static List<UserPermission> getUserTablePermissions(Configuration conf,
542      TableName tableName, byte[] cf, byte[] cq, String userName, boolean hasFilterUser)
543      throws IOException {
544    return getUserPermissions(conf, tableName == null ? null : tableName.getName(), cf, cq,
545      userName, hasFilterUser);
546  }
547
548  /**
549   * Returns the currently granted permissions for a given namespace as the specified user plus
550   * associated permissions.
551   */
552  public static List<UserPermission> getUserNamespacePermissions(Configuration conf,
553      String namespace, String user, boolean hasFilterUser) throws IOException {
554    return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
555      hasFilterUser);
556  }
557
558  /**
559   * Returns the currently granted permissions for a given table/namespace with associated
560   * permissions based on the specified column family, column qualifier and user name.
561   * @param conf the configuration
562   * @param entryName Table name or the namespace
563   * @param cf Column family
564   * @param cq Column qualifier
565   * @param user User name to be filtered from permission as requested
566   * @param hasFilterUser true if filter user is provided, otherwise false.
567   * @return List of UserPermissions
568   * @throws IOException on failure
569   */
570  public static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName,
571      byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
572    ListMultimap<String, UserPermission> allPerms =
573        getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
574    List<UserPermission> perms = new ArrayList<>();
575    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
576      perms.add(entry.getValue());
577    }
578    return perms;
579  }
580
581  /**
582   * Parse and filter permission based on the specified column family, column qualifier and user
583   * name.
584   */
585  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
586      Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
587    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
588    if (result != null && result.size() > 0) {
589      for (Cell kv : result.rawCells()) {
590        Pair<String, Permission> permissionsOfUserOnTable =
591            parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
592
593        if (permissionsOfUserOnTable != null) {
594          String username = permissionsOfUserOnTable.getFirst();
595          Permission permission = permissionsOfUserOnTable.getSecond();
596          perms.put(username, new UserPermission(username, permission));
597        }
598      }
599    }
600    return perms;
601  }
602
603  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
604      byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
605    // return X given a set of permissions encoded in the permissionRecord kv.
606    byte[] family = CellUtil.cloneFamily(kv);
607    if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
608      return null;
609    }
610
611    byte[] key = CellUtil.cloneQualifier(kv);
612    byte[] value = CellUtil.cloneValue(kv);
613    if (LOG.isDebugEnabled()) {
614      LOG.debug("Read acl: entry[" +
615        Bytes.toStringBinary(entryName) + "], kv [" +
616        Bytes.toStringBinary(key) + ": " +
617        Bytes.toStringBinary(value)+"]");
618    }
619
620    // check for a column family appended to the key
621    // TODO: avoid the string conversion to make this more efficient
622    String username = Bytes.toString(key);
623
624    // Retrieve group list for the filterUser if cell key is a group.
625    // Group list is not required when filterUser itself a group
626    List<String> filterUserGroups = null;
627    if (filterPerms) {
628      if (username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser)
629          && filterUser.charAt(0) != '@') {
630        filterUserGroups = AccessChecker.getUserGroups(filterUser);
631      }
632    }
633
634    // Handle namespace entry
635    if (isNamespaceEntry(entryName)) {
636      // Filter the permissions cell record if client query
637      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
638        return null;
639      }
640
641      return new Pair<>(username,
642          Permission.newBuilder(Bytes.toString(fromNamespaceEntry(entryName)))
643              .withActionCodes(value).build());
644    }
645
646    // Handle global entry
647    if (isGlobalEntry(entryName)) {
648      // Filter the permissions cell record if client query
649      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
650        return null;
651      }
652
653      return new Pair<>(username, Permission.newBuilder().withActionCodes(value).build());
654    }
655
656    // Handle table entry
657    int idx = username.indexOf(ACL_KEY_DELIMITER);
658    byte[] permFamily = null;
659    byte[] permQualifier = null;
660    if (idx > 0 && idx < username.length()-1) {
661      String remainder = username.substring(idx+1);
662      username = username.substring(0, idx);
663      idx = remainder.indexOf(ACL_KEY_DELIMITER);
664      if (idx > 0 && idx < remainder.length()-1) {
665        permFamily = Bytes.toBytes(remainder.substring(0, idx));
666        permQualifier = Bytes.toBytes(remainder.substring(idx+1));
667      } else {
668        permFamily = Bytes.toBytes(remainder);
669      }
670    }
671
672    // Filter the permissions cell record if client query
673    if (filterPerms) {
674      // ACL table contain 3 types of cell key entries; hbase:Acl, namespace and table. So to filter
675      // the permission cell records additional validations are required at CF, CQ and username.
676      // Here we can proceed based on client input whether it contain filterUser.
677      // Validate the filterUser when specified
678      if (filterUser != null && !validateFilterUser(username, filterUser, filterUserGroups)) {
679        return null;
680      }
681      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
682        return null;
683      }
684    }
685
686    return new Pair<>(username, Permission.newBuilder(TableName.valueOf(entryName))
687        .withFamily(permFamily).withQualifier(permQualifier).withActionCodes(value).build());
688  }
689
690  /*
691   * Validate the cell key with the client filterUser if specified in the query input. 1. If cell
692   * key (username) is not a group then check whether client filterUser is equal to username 2. If
693   * cell key (username) is a group then check whether client filterUser belongs to the cell key
694   * group (username) 3. In case when both filterUser and username are group names then cell will be
695   * filtered if not equal.
696   */
697  private static boolean validateFilterUser(String username, String filterUser,
698      List<String> filterUserGroups) {
699    if (filterUserGroups == null) {
700      // Validate user name or group names whether equal
701      if (filterUser.equals(username)) {
702        return true;
703      }
704    } else {
705      // Check whether filter user belongs to the cell key group.
706      return filterUserGroups.contains(username.substring(1));
707    }
708    return false;
709  }
710
711  /*
712   * Validate the cell with client CF and CQ if specified in the query input. 1. If CF is NULL, then
713   * no need of further validation, result should include all CF and CQ. 2. IF CF specified and
714   * equal then validation required at CQ level if CF specified in client input, otherwise return
715   * all CQ records.
716   */
717  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] permQualifier,
718      byte[] cq) {
719    boolean include = true;
720    if (cf != null) {
721      if (Bytes.equals(cf, permFamily)) {
722        if (cq != null && !Bytes.equals(cq, permQualifier)) {
723          // if CQ specified and didn't match then ignore this cell
724          include = false;
725        }
726      } else {
727        // if CF specified and didn't match then ignore this cell
728        include = false;
729      }
730    }
731    return include;
732  }
733
734  /**
735   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
736   * resulting byte array. Writes a set of permission [user: table permission]
737   */
738  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
739      Configuration conf) {
740    return ProtobufUtil.prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
741  }
742
743  // This is part of the old HbaseObjectWritableFor96Migration.
744  private static final int LIST_CODE = 61;
745
746  private static final int WRITABLE_CODE = 14;
747
748  private static final int WRITABLE_NOT_ENCODED = 0;
749
750  private static List<Permission> readWritableUserPermission(DataInput in,
751      Configuration conf) throws IOException, ClassNotFoundException {
752    assert WritableUtils.readVInt(in) == LIST_CODE;
753    int length = in.readInt();
754    List<Permission> list = new ArrayList<>(length);
755    for (int i = 0; i < length; i++) {
756      assert WritableUtils.readVInt(in) == WRITABLE_CODE;
757      assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
758      String className = Text.readString(in);
759      Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
760      Writable instance = WritableFactories.newInstance(clazz, conf);
761      instance.readFields(in);
762      list.add((Permission) instance);
763    }
764    return list;
765  }
766
767  @VisibleForTesting
768  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
769      Configuration conf) throws DeserializationException {
770    if (ProtobufUtil.isPBMagicPrefix(data)) {
771      int pblen = ProtobufUtil.lengthOfPBMagic();
772      try {
773        AccessControlProtos.UsersAndPermissions.Builder builder =
774          AccessControlProtos.UsersAndPermissions.newBuilder();
775        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
776        return AccessControlUtil.toUserPermission(builder.build());
777      } catch (IOException e) {
778        throw new DeserializationException(e);
779      }
780    } else {
781      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
782      // forever (here and a couple of other places).
783      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
784      try {
785        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
786        int length = in.readInt();
787        for (int i = 0; i < length; i++) {
788          String user = Text.readString(in);
789          List<Permission> perms = readWritableUserPermission(in, conf);
790          for (Permission p : perms) {
791            userPermission.put(user, new UserPermission(user, p));
792          }
793        }
794      } catch (IOException | ClassNotFoundException e) {
795        throw new DeserializationException(e);
796      }
797      return userPermission;
798    }
799  }
800
801  public static ListMultimap<String, Permission> readPermissions(byte[] data,
802      Configuration conf) throws DeserializationException {
803    if (ProtobufUtil.isPBMagicPrefix(data)) {
804      int pblen = ProtobufUtil.lengthOfPBMagic();
805      try {
806        AccessControlProtos.UsersAndPermissions.Builder builder =
807          AccessControlProtos.UsersAndPermissions.newBuilder();
808        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
809        return AccessControlUtil.toPermission(builder.build());
810      } catch (IOException e) {
811        throw new DeserializationException(e);
812      }
813    } else {
814      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
815      // forever (here and a couple of other places).
816      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
817      try {
818        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
819        int length = in.readInt();
820        for (int i = 0; i < length; i++) {
821          String user = Text.readString(in);
822          perms.putAll(user, readWritableUserPermission(in, conf));
823        }
824      } catch (IOException | ClassNotFoundException e) {
825        throw new DeserializationException(e);
826      }
827      return perms;
828    }
829  }
830
831  public static boolean isGlobalEntry(byte[] entryName) {
832    return entryName != null && TableName.valueOf(entryName).equals(ACL_TABLE_NAME);
833  }
834
835  public static boolean isNamespaceEntry(String entryName) {
836    return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
837  }
838
839  public static boolean isNamespaceEntry(byte[] entryName) {
840    return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
841  }
842
843  public static String toNamespaceEntry(String namespace) {
844    return NAMESPACE_PREFIX + namespace;
845  }
846
847  public static String fromNamespaceEntry(String namespace) {
848    if(namespace.charAt(0) != NAMESPACE_PREFIX)
849      throw new IllegalArgumentException("Argument is not a valid namespace entry");
850    return namespace.substring(1);
851  }
852
853  public static byte[] toNamespaceEntry(byte[] namespace) {
854    byte[] ret = new byte[namespace.length+1];
855    ret[0] = NAMESPACE_PREFIX;
856    System.arraycopy(namespace, 0, ret, 1, namespace.length);
857    return ret;
858  }
859
860  public static byte[] fromNamespaceEntry(byte[] namespace) {
861    if(namespace[0] != NAMESPACE_PREFIX) {
862      throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
863          Bytes.toString(namespace));
864    }
865    return Arrays.copyOfRange(namespace, 1, namespace.length);
866  }
867
868  public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
869      throws IOException {
870    // Save an object allocation where we can
871    if (cell.getTagsLength() == 0) {
872      return null;
873    }
874    List<Permission> results = Lists.newArrayList();
875    Iterator<Tag> tagsIterator = PrivateCellUtil.tagsIterator(cell);
876    while (tagsIterator.hasNext()) {
877      Tag tag = tagsIterator.next();
878      if (tag.getType() == ACL_TAG_TYPE) {
879        // Deserialize the table permissions from the KV
880        // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
881        // use the builder
882        AccessControlProtos.UsersAndPermissions.Builder builder =
883            AccessControlProtos.UsersAndPermissions.newBuilder();
884        if (tag.hasArray()) {
885          ProtobufUtil.mergeFrom(builder, tag.getValueArray(), tag.getValueOffset(), tag.getValueLength());
886        } else {
887          ProtobufUtil.mergeFrom(builder, Tag.cloneValue(tag));
888        }
889        ListMultimap<String,Permission> kvPerms =
890            AccessControlUtil.toUsersAndPermissions(builder.build());
891        // Are there permissions for this user?
892        List<Permission> userPerms = kvPerms.get(user.getShortName());
893        if (userPerms != null) {
894          results.addAll(userPerms);
895        }
896        // Are there permissions for any of the groups this user belongs to?
897        String groupNames[] = user.getGroupNames();
898        if (groupNames != null) {
899          for (String group : groupNames) {
900            List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
901            if (results != null) {
902              results.addAll(groupPerms);
903            }
904          }
905        }
906      }
907    }
908    return results;
909  }
910}