1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.TreeMap;
30  import java.util.TreeSet;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.exceptions.DeserializationException;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.catalog.MetaReader;
41  import org.apache.hadoop.hbase.client.Delete;
42  import org.apache.hadoop.hbase.client.Get;
43  import org.apache.hadoop.hbase.client.HTable;
44  import org.apache.hadoop.hbase.client.Put;
45  import org.apache.hadoop.hbase.client.Result;
46  import org.apache.hadoop.hbase.client.ResultScanner;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
49  import org.apache.hadoop.hbase.filter.QualifierFilter;
50  import org.apache.hadoop.hbase.filter.RegexStringComparator;
51  import org.apache.hadoop.hbase.io.compress.Compression;
52  import org.apache.hadoop.hbase.master.MasterServices;
53  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
54  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
55  import org.apache.hadoop.hbase.regionserver.BloomType;
56  import org.apache.hadoop.hbase.regionserver.HRegion;
57  import org.apache.hadoop.hbase.regionserver.InternalScanner;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.Pair;
60  import org.apache.hadoop.io.Text;
61  
62  import com.google.common.collect.ArrayListMultimap;
63  import com.google.common.collect.ListMultimap;
64  import com.google.protobuf.InvalidProtocolBufferException;
65  
66  /**
67   * Maintains lists of permission grants to users and groups to allow for
68   * authorization checks by {@link AccessController}.
69   *
70   * <p>
71   * Access control lists are stored in an "internal" metadata table named
72   * {@code _acl_}. Each table's permission grants are stored as a separate row,
73   * keyed by the table name. KeyValues for permissions assignments are stored
74   * in one of the formats:
75   * <pre>
76   * Key                      Desc
77   * --------                 --------
78   * user                     table level permissions for a user [R=read, W=write]
79   * group                    table level permissions for a group
80   * user,family              column family level permissions for a user
81   * group,family             column family level permissions for a group
82   * user,family,qualifier    column qualifier level permissions for a user
83   * group,family,qualifier   column qualifier level permissions for a group
84   * </pre>
 * Each value is a byte array holding one code per granted action, taken from
 * the org.apache.hadoop.hbase.security.access.Permission.Action enum.
87   * </p>
88   */
89  public class AccessControlLists {
90    /** Internal storage table for access control lists */
91    public static final String ACL_TABLE_NAME_STR = "_acl_";
92    public static final byte[] ACL_TABLE_NAME = Bytes.toBytes(ACL_TABLE_NAME_STR);
93    public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME;
94    /** Column family used to store ACL grants */
95    public static final String ACL_LIST_FAMILY_STR = "l";
96    public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
97  
98    /** Table descriptor for ACL internal table */
99    public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(
100       ACL_TABLE_NAME);
101   static {
102     ACL_TABLEDESC.addFamily(
103         new HColumnDescriptor(ACL_LIST_FAMILY,
104             10, // Ten is arbitrary number.  Keep versions to help debugging.
105             Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
106             HConstants.FOREVER, BloomType.NONE.toString(),
107             HConstants.REPLICATION_SCOPE_LOCAL));
108   }
109 
110   /**
111    * Delimiter to separate user, column family, and qualifier in
112    * _acl_ table info: column keys */
113   public static final char ACL_KEY_DELIMITER = ',';
114   /** Prefix character to denote group names */
115   public static final String GROUP_PREFIX = "@";
116   /** Configuration key for superusers */
117   public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
118 
119   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
120 
121   /**
122    * Check for existence of {@code _acl_} table and create it if it does not exist
123    * @param master reference to HMaster
124    */
125   static void init(MasterServices master) throws IOException {
126     if (!MetaReader.tableExists(master.getCatalogTracker(), ACL_TABLE_NAME_STR)) {
127       master.createTable(ACL_TABLEDESC, null);
128     }
129   }
130 
131   /**
132    * Stores a new user permission grant in the access control lists table.
133    * @param conf the configuration
134    * @param userPerm the details of the permission to be granted
135    * @throws IOException in the case of an error accessing the metadata table
136    */
137   static void addUserPermission(Configuration conf, UserPermission userPerm)
138       throws IOException {
139     Permission.Action[] actions = userPerm.getActions();
140 
141     Put p = new Put(userPerm.isGlobal() ? ACL_GLOBAL_NAME : userPerm.getTable());
142     byte[] key = userPermissionKey(userPerm);
143 
144     if ((actions == null) || (actions.length == 0)) {
145       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
146       LOG.warn(msg);
147       throw new IOException(msg);
148     }
149 
150     byte[] value = new byte[actions.length];
151     for (int i = 0; i < actions.length; i++) {
152       value[i] = actions[i].code();
153     }
154     p.add(ACL_LIST_FAMILY, key, value);
155     if (LOG.isDebugEnabled()) {
156       LOG.debug("Writing permission for table "+
157           Bytes.toString(userPerm.getTable())+" "+
158           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
159       );
160     }
161     HTable acls = null;
162     try {
163       acls = new HTable(conf, ACL_TABLE_NAME);
164       acls.put(p);
165     } finally {
166       if (acls != null) acls.close();
167     }
168   }
169 
170   /**
171    * Removes a previously granted permission from the stored access control
172    * lists.  The {@link TablePermission} being removed must exactly match what
173    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
174    * been granted "READ" access to the "data" table, but only to column family
175    * plus qualifier "info:colA", then trying to call this method with only
176    * user "bob" and the table name "data" (but without specifying the
177    * column qualifier "info:colA") will have no effect.
178    *
179    * @param conf the configuration
180    * @param userPerm the details of the permission to be revoked
181    * @throws IOException if there is an error accessing the metadata table
182    */
183   static void removeUserPermission(Configuration conf, UserPermission userPerm)
184       throws IOException {
185 
186     Delete d = new Delete(userPerm.isGlobal() ? ACL_GLOBAL_NAME : userPerm.getTable());
187     byte[] key = userPermissionKey(userPerm);
188 
189     if (LOG.isDebugEnabled()) {
190       LOG.debug("Removing permission "+ userPerm.toString());
191     }
192     d.deleteColumns(ACL_LIST_FAMILY, key);
193     HTable acls = null;
194     try {
195       acls = new HTable(conf, ACL_TABLE_NAME);
196       acls.delete(d);
197     } finally {
198       if (acls != null) acls.close();
199     }
200   }
201 
202   /**
203    * Remove specified table from the _acl_ table.
204    */
205   static void removeTablePermissions(Configuration conf, byte[] tableName)
206       throws IOException{
207     Delete d = new Delete(tableName);
208 
209     if (LOG.isDebugEnabled()) {
210       LOG.debug("Removing permissions of removed table "+ Bytes.toString(tableName));
211     }
212 
213     HTable acls = null;
214     try {
215       acls = new HTable(conf, ACL_TABLE_NAME);
216       acls.delete(d);
217     } finally {
218       if (acls != null) acls.close();
219     }
220   }
221 
222   /**
223    * Remove specified table column from the _acl_ table.
224    */
225   static void removeTablePermissions(Configuration conf, byte[] tableName, byte[] column)
226       throws IOException{
227 
228     if (LOG.isDebugEnabled()) {
229       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
230                 " from table "+ Bytes.toString(tableName));
231     }
232 
233     HTable acls = null;
234     try {
235       acls = new HTable(conf, ACL_TABLE_NAME);
236 
237       Scan scan = new Scan();
238       scan.addFamily(ACL_LIST_FAMILY);
239 
240       String columnName = Bytes.toString(column);
241       scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
242                      String.format("(%s%s%s)|(%s%s)$",
243                      ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
244                      ACL_KEY_DELIMITER, columnName))));
245 
246       Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
247       ResultScanner scanner = acls.getScanner(scan);
248       try {
249         for (Result res : scanner) {
250           for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
251             qualifierSet.add(q);
252           }
253         }
254       } finally {
255         scanner.close();
256       }
257 
258       if (qualifierSet.size() > 0) {
259         Delete d = new Delete(tableName);
260         for (byte[] qualifier : qualifierSet) {
261           d.deleteColumns(ACL_LIST_FAMILY, qualifier);
262         }
263         acls.delete(d);
264       }
265     } finally {
266       if (acls != null) acls.close();
267     }
268   }
269 
270   /**
271    * Build qualifier key from user permission:
272    *  username
273    *  username,family
274    *  username,family,qualifier
275    */
276   static byte[] userPermissionKey(UserPermission userPerm) {
277     byte[] qualifier = userPerm.getQualifier();
278     byte[] family = userPerm.getFamily();
279     byte[] key = userPerm.getUser();
280 
281     if (family != null && family.length > 0) {
282       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
283       if (qualifier != null && qualifier.length > 0) {
284         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
285       }
286     }
287 
288     return key;
289   }
290 
291   /**
292    * Returns {@code true} if the given region is part of the {@code _acl_}
293    * metadata table.
294    */
295   static boolean isAclRegion(HRegion region) {
296     return Bytes.equals(ACL_TABLE_NAME, region.getTableDesc().getName());
297   }
298 
299   /**
300    * Returns {@code true} if the given table is {@code _acl_} metadata table.
301    */
302   static boolean isAclTable(HTableDescriptor desc) {
303     return Bytes.equals(ACL_TABLE_NAME, desc.getName());
304   }
305 
306   /**
307    * Loads all of the permission grants stored in a region of the {@code _acl_}
308    * table.
309    *
310    * @param aclRegion
311    * @return a map of the permissions for this table.
312    * @throws IOException
313    */
314   static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
315       HRegion aclRegion)
316     throws IOException {
317 
318     if (!isAclRegion(aclRegion)) {
319       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME_STR);
320     }
321 
322     Map<byte[],ListMultimap<String,TablePermission>> allPerms =
323         new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
324 
325     // do a full scan of _acl_ table
326 
327     Scan scan = new Scan();
328     scan.addFamily(ACL_LIST_FAMILY);
329 
330     InternalScanner iScanner = null;
331     try {
332       iScanner = aclRegion.getScanner(scan);
333 
334       while (true) {
335         List<KeyValue> row = new ArrayList<KeyValue>();
336 
337         boolean hasNext = iScanner.next(row);
338         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
339         byte[] table = null;
340         for (KeyValue kv : row) {
341           if (table == null) {
342             table = kv.getRow();
343           }
344           Pair<String,TablePermission> permissionsOfUserOnTable =
345               parseTablePermissionRecord(table, kv);
346           if (permissionsOfUserOnTable != null) {
347             String username = permissionsOfUserOnTable.getFirst();
348             TablePermission permissions = permissionsOfUserOnTable.getSecond();
349             perms.put(username, permissions);
350           }
351         }
352         if (table != null) {
353           allPerms.put(table, perms);
354         }
355         if (!hasNext) {
356           break;
357         }
358       }
359     } finally {
360       if (iScanner != null) {
361         iScanner.close();
362       }
363     }
364 
365     return allPerms;
366   }
367 
368   /**
369    * Load all permissions from the region server holding {@code _acl_},
370    * primarily intended for testing purposes.
371    */
372   static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
373       Configuration conf) throws IOException {
374     Map<byte[],ListMultimap<String,TablePermission>> allPerms =
375         new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
376 
377     // do a full scan of _acl_, filtering on only first table region rows
378 
379     Scan scan = new Scan();
380     scan.addFamily(ACL_LIST_FAMILY);
381 
382     HTable acls = null;
383     ResultScanner scanner = null;
384     try {
385       acls = new HTable(conf, ACL_TABLE_NAME);
386       scanner = acls.getScanner(scan);
387       for (Result row : scanner) {
388         ListMultimap<String,TablePermission> resultPerms =
389             parseTablePermissions(row.getRow(), row);
390         allPerms.put(row.getRow(), resultPerms);
391       }
392     } finally {
393       if (scanner != null) scanner.close();
394       if (acls != null) acls.close();
395     }
396 
397     return allPerms;
398   }
399 
400   /**
401    * Reads user permission assignments stored in the <code>l:</code> column
402    * family of the first table row in <code>_acl_</code>.
403    *
404    * <p>
405    * See {@link AccessControlLists class documentation} for the key structure
406    * used for storage.
407    * </p>
408    */
409   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
410       byte[] tableName) throws IOException {
411     if (tableName == null) tableName = ACL_TABLE_NAME;
412 
413     // for normal user tables, we just read the table row from _acl_
414     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
415     HTable acls = null;
416     try {
417       acls = new HTable(conf, ACL_TABLE_NAME);
418       Get get = new Get(tableName);
419       get.addFamily(ACL_LIST_FAMILY);
420       Result row = acls.get(get);
421       if (!row.isEmpty()) {
422         perms = parseTablePermissions(tableName, row);
423       } else {
424         LOG.info("No permissions found in " + ACL_TABLE_NAME_STR + " for table "
425             + Bytes.toString(tableName));
426       }
427     } finally {
428       if (acls != null) acls.close();
429     }
430 
431     return perms;
432   }
433 
434   /**
435    * Returns the currently granted permissions for a given table as a list of
436    * user plus associated permissions.
437    */
438   static List<UserPermission> getUserPermissions(
439       Configuration conf, byte[] tableName)
440   throws IOException {
441     ListMultimap<String,TablePermission> allPerms = getTablePermissions(
442       conf, tableName);
443 
444     List<UserPermission> perms = new ArrayList<UserPermission>();
445 
446     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
447       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
448           entry.getValue().getTable(), entry.getValue().getFamily(),
449           entry.getValue().getQualifier(), entry.getValue().getActions());
450       perms.add(up);
451     }
452     return perms;
453   }
454 
455   private static ListMultimap<String,TablePermission> parseTablePermissions(
456       byte[] table, Result result) {
457     ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
458     if (result != null && result.size() > 0) {
459       for (KeyValue kv : result.raw()) {
460 
461         Pair<String,TablePermission> permissionsOfUserOnTable =
462             parseTablePermissionRecord(table, kv);
463 
464         if (permissionsOfUserOnTable != null) {
465           String username = permissionsOfUserOnTable.getFirst();
466           TablePermission permissions = permissionsOfUserOnTable.getSecond();
467           perms.put(username, permissions);
468         }
469       }
470     }
471     return perms;
472   }
473 
474   private static Pair<String,TablePermission> parseTablePermissionRecord(
475       byte[] table, KeyValue kv) {
476     // return X given a set of permissions encoded in the permissionRecord kv.
477     byte[] family = kv.getFamily();
478 
479     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
480       return null;
481     }
482 
483     byte[] key = kv.getQualifier();
484     byte[] value = kv.getValue();
485     if (LOG.isDebugEnabled()) {
486       LOG.debug("Read acl: kv ["+
487                 Bytes.toStringBinary(key)+": "+
488                 Bytes.toStringBinary(value)+"]");
489     }
490 
491     // check for a column family appended to the key
492     // TODO: avoid the string conversion to make this more efficient
493     String username = Bytes.toString(key);
494     int idx = username.indexOf(ACL_KEY_DELIMITER);
495     byte[] permFamily = null;
496     byte[] permQualifier = null;
497     if (idx > 0 && idx < username.length()-1) {
498       String remainder = username.substring(idx+1);
499       username = username.substring(0, idx);
500       idx = remainder.indexOf(ACL_KEY_DELIMITER);
501       if (idx > 0 && idx < remainder.length()-1) {
502         permFamily = Bytes.toBytes(remainder.substring(0, idx));
503         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
504       } else {
505         permFamily = Bytes.toBytes(remainder);
506       }
507     }
508 
509     return new Pair<String,TablePermission>(
510         username, new TablePermission(table, permFamily, permQualifier, value));
511   }
512 
513   /**
514    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
515    * and returns the resulting byte array.
516    *
517    * Writes a set of permission [user: table permission]
518    */
519   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
520       Configuration conf) {
521     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
522   }
523 
524   /**
525    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
526    * from the input stream.
527    */
528   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
529       Configuration conf)
530   throws DeserializationException {
531     if (ProtobufUtil.isPBMagicPrefix(data)) {
532       int pblen = ProtobufUtil.lengthOfPBMagic();
533       try {
534         AccessControlProtos.UserTablePermissions perms =
535           AccessControlProtos.UserTablePermissions.newBuilder().mergeFrom(
536             data, pblen, data.length - pblen).build();
537         return ProtobufUtil.toUserTablePermissions(perms);
538       } catch (InvalidProtocolBufferException e) {
539         throw new DeserializationException(e);
540       }
541     } else {
542       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
543       try {
544         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
545         int length = in.readInt();
546         for (int i=0; i<length; i++) {
547           String user = Text.readString(in);
548           List<TablePermission> userPerms =
549             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
550           perms.putAll(user, userPerms);
551         }
552       } catch (IOException e) {
553         throw new DeserializationException(e);
554       }
555       return perms;
556     }
557   }
558 
559   /**
560    * Returns whether or not the given name should be interpreted as a group
561    * principal.  Currently this simply checks if the name starts with the
562    * special group prefix character ("@").
563    */
564   public static boolean isGroupPrincipal(String name) {
565     return name != null && name.startsWith(GROUP_PREFIX);
566   }
567 
568   /**
569    * Returns the actual name for a group principal (stripped of the
570    * group prefix).
571    */
572   public static String getGroupName(String aclKey) {
573     if (!isGroupPrincipal(aclKey)) {
574       return aclKey;
575     }
576 
577     return aclKey.substring(GROUP_PREFIX.length());
578   }
579 }