001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.ByteArrayInputStream;
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Set;
031import java.util.regex.Pattern;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellScanner;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.NamespaceDescriptor;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.QualifierFilter;
050import org.apache.hadoop.hbase.filter.RegexStringComparator;
051import org.apache.hadoop.hbase.filter.RowFilter;
052import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.apache.yetus.audience.InterfaceStability;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
060import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
061import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
062import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
063import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
064
065import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
069
070/**
071 * Helper class to interact with the quota table.
072 * <table>
073 * <tr>
074 * <th>ROW-KEY</th>
075 * <th>FAM/QUAL</th>
076 * <th>DATA</th>
077 * <th>DESC</th>
078 * </tr>
079 * <tr>
080 * <td>n.&lt;namespace&gt;</td>
081 * <td>q:s</td>
082 * <td>&lt;global-quotas&gt;</td>
083 * </tr>
084 * <tr>
085 * <td>n.&lt;namespace&gt;</td>
086 * <td>u:p</td>
087 * <td>&lt;namespace-quota policy&gt;</td>
088 * </tr>
089 * <tr>
090 * <td>n.&lt;namespace&gt;</td>
091 * <td>u:s</td>
092 * <td>&lt;SpaceQuotaSnapshot&gt;</td>
093 * <td>The size of all snapshots against tables in the namespace</td>
094 * </tr>
095 * <tr>
096 * <td>t.&lt;table&gt;</td>
097 * <td>q:s</td>
098 * <td>&lt;global-quotas&gt;</td>
099 * </tr>
100 * <tr>
101 * <td>t.&lt;table&gt;</td>
102 * <td>u:p</td>
103 * <td>&lt;table-quota policy&gt;</td>
104 * </tr>
105 * <tr>
106 * <td>t.&lt;table&gt;</td>
107 * <td>u:ss.&lt;snapshot name&gt;</td>
108 * <td>&lt;SpaceQuotaSnapshot&gt;</td>
109 * <td>The size of a snapshot against a table</td>
110 * </tr>
111 * <tr>
112 * <td>u.&lt;user&gt;</td>
113 * <td>q:s</td>
114 * <td>&lt;global-quotas&gt;</td>
115 * </tr>
116 * <tr>
117 * <td>u.&lt;user&gt;</td>
118 * <td>q:s.&lt;table&gt;</td>
119 * <td>&lt;table-quotas&gt;</td>
120 * </tr>
121 * <tr>
122 * <td>u.&lt;user&gt;</td>
123 * <td>q:s.&lt;ns&gt;</td>
124 * <td>&lt;namespace-quotas&gt;</td>
125 * </tr>
126 * </table>
127 */
128@InterfaceAudience.Private
129@InterfaceStability.Evolving
130public class QuotaTableUtil {
131  private static final Logger LOG = LoggerFactory.getLogger(QuotaTableUtil.class);
132
133  /** System table for quotas */
134  public static final TableName QUOTA_TABLE_NAME =
135    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "quota");
136
137  protected static final byte[] QUOTA_FAMILY_INFO = Bytes.toBytes("q");
138  protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
139  protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
140  protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
141  protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
142  protected static final byte[] QUOTA_SNAPSHOT_SIZE_QUALIFIER = Bytes.toBytes("ss");
143  protected static final String QUOTA_POLICY_COLUMN =
144    Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
145  protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
146  protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
147  protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
148  protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r.");
149  private static final byte[] QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY =
150    Bytes.toBytes("exceedThrottleQuota");
151
152  /*
153   * TODO: Setting specified region server quota isn't supported currently and the row key "r.all"
154   * represents the throttle quota of all region servers
155   */
156  public static final String QUOTA_REGION_SERVER_ROW_KEY = "all";
157
158  /*
159   * ========================================================================= Quota "settings"
160   * helpers
161   */
162  public static Quotas getTableQuota(final Connection connection, final TableName table)
163    throws IOException {
164    return getQuotas(connection, getTableRowKey(table));
165  }
166
167  public static Quotas getNamespaceQuota(final Connection connection, final String namespace)
168    throws IOException {
169    return getQuotas(connection, getNamespaceRowKey(namespace));
170  }
171
172  public static Quotas getUserQuota(final Connection connection, final String user)
173    throws IOException {
174    return getQuotas(connection, getUserRowKey(user));
175  }
176
177  public static Quotas getUserQuota(final Connection connection, final String user,
178    final TableName table) throws IOException {
179    return getQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
180  }
181
182  public static Quotas getUserQuota(final Connection connection, final String user,
183    final String namespace) throws IOException {
184    return getQuotas(connection, getUserRowKey(user),
185      getSettingsQualifierForUserNamespace(namespace));
186  }
187
188  private static Quotas getQuotas(final Connection connection, final byte[] rowKey)
189    throws IOException {
190    return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS);
191  }
192
193  public static Quotas getRegionServerQuota(final Connection connection, final String regionServer)
194    throws IOException {
195    return getQuotas(connection, getRegionServerRowKey(regionServer));
196  }
197
198  private static Quotas getQuotas(final Connection connection, final byte[] rowKey,
199    final byte[] qualifier) throws IOException {
200    Get get = new Get(rowKey);
201    get.addColumn(QUOTA_FAMILY_INFO, qualifier);
202    Result result = doGet(connection, get);
203    if (result.isEmpty()) {
204      return null;
205    }
206    return quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier));
207  }
208
209  public static Get makeGetForTableQuotas(final TableName table) {
210    Get get = new Get(getTableRowKey(table));
211    get.addFamily(QUOTA_FAMILY_INFO);
212    return get;
213  }
214
215  public static Get makeGetForNamespaceQuotas(final String namespace) {
216    Get get = new Get(getNamespaceRowKey(namespace));
217    get.addFamily(QUOTA_FAMILY_INFO);
218    return get;
219  }
220
221  public static Get makeGetForRegionServerQuotas(final String regionServer) {
222    Get get = new Get(getRegionServerRowKey(regionServer));
223    get.addFamily(QUOTA_FAMILY_INFO);
224    return get;
225  }
226
227  public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables,
228    final Iterable<String> namespaces) {
229    Get get = new Get(getUserRowKey(user));
230    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
231    for (final TableName table : tables) {
232      get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserTable(table));
233    }
234    for (final String ns : namespaces) {
235      get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserNamespace(ns));
236    }
237    return get;
238  }
239
240  public static Scan makeScan(final QuotaFilter filter) {
241    Scan scan = new Scan();
242    scan.addFamily(QUOTA_FAMILY_INFO);
243    if (filter != null && !filter.isNull()) {
244      scan.setFilter(makeFilter(filter));
245    }
246    return scan;
247  }
248
249  /**
250   * converts quotafilter to serializeable filterlists.
251   */
252  public static Filter makeFilter(final QuotaFilter filter) {
253    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
254    if (StringUtils.isNotEmpty(filter.getUserFilter())) {
255      FilterList userFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
256      boolean hasFilter = false;
257
258      if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
259        FilterList nsFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
260        nsFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
261          new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
262        nsFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
263          getSettingsQualifierRegexForUserNamespace(filter.getNamespaceFilter()), 0)));
264        userFilters.addFilter(nsFilters);
265        hasFilter = true;
266      }
267      if (StringUtils.isNotEmpty(filter.getTableFilter())) {
268        FilterList tableFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
269        tableFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
270          new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
271        tableFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
272          getSettingsQualifierRegexForUserTable(filter.getTableFilter()), 0)));
273        userFilters.addFilter(tableFilters);
274        hasFilter = true;
275      }
276      if (!hasFilter) {
277        userFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
278          new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
279      }
280
281      filterList.addFilter(userFilters);
282    } else if (StringUtils.isNotEmpty(filter.getTableFilter())) {
283      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
284        new RegexStringComparator(getTableRowKeyRegex(filter.getTableFilter()), 0)));
285    } else if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
286      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
287        new RegexStringComparator(getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0)));
288    } else if (StringUtils.isNotEmpty(filter.getRegionServerFilter())) {
289      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
290        new RegexStringComparator(getRegionServerRowKeyRegex(filter.getRegionServerFilter()), 0)));
291    }
292    return filterList;
293  }
294
295  /**
296   * Creates a {@link Scan} which returns only quota snapshots from the quota table.
297   */
298  public static Scan makeQuotaSnapshotScan() {
299    return makeQuotaSnapshotScanForTable(null);
300  }
301
302  /**
303   * Fetches all {@link SpaceQuotaSnapshot} objects from the {@code hbase:quota} table.
304   * @param conn The HBase connection
305   * @return A map of table names and their computed snapshot.
306   */
307  public static Map<TableName, SpaceQuotaSnapshot> getSnapshots(Connection conn)
308    throws IOException {
309    Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>();
310    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
311      ResultScanner rs = quotaTable.getScanner(makeQuotaSnapshotScan())) {
312      for (Result r : rs) {
313        extractQuotaSnapshot(r, snapshots);
314      }
315    }
316    return snapshots;
317  }
318
319  /**
320   * Creates a {@link Scan} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
321   * specific table.
322   * @param tn Optionally, a table name to limit the scan's rowkey space. Can be null.
323   */
324  public static Scan makeQuotaSnapshotScanForTable(TableName tn) {
325    Scan s = new Scan();
326    // Limit to "u:v" column
327    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
328    if (null == tn) {
329      s.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
330    } else {
331      byte[] row = getTableRowKey(tn);
332      // Limit rowspace to the "t:" prefix
333      s.withStartRow(row, true).withStopRow(row, true);
334    }
335    return s;
336  }
337
338  /**
339   * Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
340   * specific table.
341   * @param tn table name to get from. Can't be null.
342   */
343  public static Get makeQuotaSnapshotGetForTable(TableName tn) {
344    Get g = new Get(getTableRowKey(tn));
345    // Limit to "u:v" column
346    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
347    return g;
348  }
349
350  /**
351   * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
352   * {@link Result} and adds them to the given {@link Map}. If the result does not contain the
353   * expected information or the serialized policy in the value is invalid, this method will throw
354   * an {@link IllegalArgumentException}.
355   * @param result    A row from the quota table.
356   * @param snapshots A map of snapshots to add the result of this method into.
357   */
358  public static void extractQuotaSnapshot(Result result,
359    Map<TableName, SpaceQuotaSnapshot> snapshots) {
360    byte[] row = Objects.requireNonNull(result).getRow();
361    if (row == null || row.length == 0) {
362      throw new IllegalArgumentException("Provided result had a null row");
363    }
364    final TableName targetTableName = getTableFromRowKey(row);
365    Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
366    if (c == null) {
367      throw new IllegalArgumentException("Result did not contain the expected column "
368        + QUOTA_POLICY_COLUMN + ", " + result.toString());
369    }
370    ByteString buffer =
371      UnsafeByteOperations.unsafeWrap(c.getValueArray(), c.getValueOffset(), c.getValueLength());
372    try {
373      QuotaProtos.SpaceQuotaSnapshot snapshot = QuotaProtos.SpaceQuotaSnapshot.parseFrom(buffer);
374      snapshots.put(targetTableName, SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot));
375    } catch (InvalidProtocolBufferException e) {
376      throw new IllegalArgumentException(
377        "Result did not contain a valid SpaceQuota protocol buffer message", e);
378    }
379  }
380
381  public static interface UserQuotasVisitor {
382    void visitUserQuotas(final String userName, final Quotas quotas) throws IOException;
383
384    void visitUserQuotas(final String userName, final TableName table, final Quotas quotas)
385      throws IOException;
386
387    void visitUserQuotas(final String userName, final String namespace, final Quotas quotas)
388      throws IOException;
389  }
390
391  public static interface TableQuotasVisitor {
392    void visitTableQuotas(final TableName tableName, final Quotas quotas) throws IOException;
393  }
394
395  public static interface NamespaceQuotasVisitor {
396    void visitNamespaceQuotas(final String namespace, final Quotas quotas) throws IOException;
397  }
398
399  private static interface RegionServerQuotasVisitor {
400    void visitRegionServerQuotas(final String regionServer, final Quotas quotas) throws IOException;
401  }
402
403  public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor,
404    NamespaceQuotasVisitor, RegionServerQuotasVisitor {
405  }
406
407  public static void parseResult(final Result result, final QuotasVisitor visitor)
408    throws IOException {
409    byte[] row = result.getRow();
410    if (isNamespaceRowKey(row)) {
411      parseNamespaceResult(result, visitor);
412    } else if (isTableRowKey(row)) {
413      parseTableResult(result, visitor);
414    } else if (isUserRowKey(row)) {
415      parseUserResult(result, visitor);
416    } else if (isRegionServerRowKey(row)) {
417      parseRegionServerResult(result, visitor);
418    } else if (isExceedThrottleQuotaRowKey(row)) {
419      // skip exceed throttle quota row key
420      if (LOG.isDebugEnabled()) {
421        LOG.debug("Skip exceedThrottleQuota row-key when parse quota result");
422      }
423    } else {
424      LOG.warn("unexpected row-key: " + Bytes.toString(row));
425    }
426  }
427
428  public static void parseResultToCollection(final Result result,
429    Collection<QuotaSettings> quotaSettings) throws IOException {
430
431    QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() {
432      @Override
433      public void visitUserQuotas(String userName, Quotas quotas) {
434        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas));
435      }
436
437      @Override
438      public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
439        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas));
440      }
441
442      @Override
443      public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
444        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas));
445      }
446
447      @Override
448      public void visitTableQuotas(TableName tableName, Quotas quotas) {
449        quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas));
450      }
451
452      @Override
453      public void visitNamespaceQuotas(String namespace, Quotas quotas) {
454        quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
455      }
456
457      @Override
458      public void visitRegionServerQuotas(String regionServer, Quotas quotas) {
459        quotaSettings.addAll(QuotaSettingsFactory.fromRegionServerQuotas(regionServer, quotas));
460      }
461    });
462  }
463
464  public static void parseNamespaceResult(final Result result, final NamespaceQuotasVisitor visitor)
465    throws IOException {
466    String namespace = getNamespaceFromRowKey(result.getRow());
467    parseNamespaceResult(namespace, result, visitor);
468  }
469
470  protected static void parseNamespaceResult(final String namespace, final Result result,
471    final NamespaceQuotasVisitor visitor) throws IOException {
472    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
473    if (data != null) {
474      Quotas quotas = quotasFromData(data);
475      visitor.visitNamespaceQuotas(namespace, quotas);
476    }
477  }
478
479  private static void parseRegionServerResult(final Result result,
480    final RegionServerQuotasVisitor visitor) throws IOException {
481    String rs = getRegionServerFromRowKey(result.getRow());
482    parseRegionServerResult(rs, result, visitor);
483  }
484
485  private static void parseRegionServerResult(final String regionServer, final Result result,
486    final RegionServerQuotasVisitor visitor) throws IOException {
487    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
488    if (data != null) {
489      Quotas quotas = quotasFromData(data);
490      visitor.visitRegionServerQuotas(regionServer, quotas);
491    }
492  }
493
494  public static void parseTableResult(final Result result, final TableQuotasVisitor visitor)
495    throws IOException {
496    TableName table = getTableFromRowKey(result.getRow());
497    parseTableResult(table, result, visitor);
498  }
499
500  protected static void parseTableResult(final TableName table, final Result result,
501    final TableQuotasVisitor visitor) throws IOException {
502    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
503    if (data != null) {
504      Quotas quotas = quotasFromData(data);
505      visitor.visitTableQuotas(table, quotas);
506    }
507  }
508
509  public static void parseUserResult(final Result result, final UserQuotasVisitor visitor)
510    throws IOException {
511    String userName = getUserFromRowKey(result.getRow());
512    parseUserResult(userName, result, visitor);
513  }
514
515  protected static void parseUserResult(final String userName, final Result result,
516    final UserQuotasVisitor visitor) throws IOException {
517    Map<byte[], byte[]> familyMap = result.getFamilyMap(QUOTA_FAMILY_INFO);
518    if (familyMap == null || familyMap.isEmpty()) return;
519
520    for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
521      Quotas quotas = quotasFromData(entry.getValue());
522      if (Bytes.startsWith(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX)) {
523        String name = Bytes.toString(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX.length);
524        if (name.charAt(name.length() - 1) == TableName.NAMESPACE_DELIM) {
525          String namespace = name.substring(0, name.length() - 1);
526          visitor.visitUserQuotas(userName, namespace, quotas);
527        } else {
528          TableName table = TableName.valueOf(name);
529          visitor.visitUserQuotas(userName, table, quotas);
530        }
531      } else if (Bytes.equals(entry.getKey(), QUOTA_QUALIFIER_SETTINGS)) {
532        visitor.visitUserQuotas(userName, quotas);
533      }
534    }
535  }
536
537  /**
538   * Creates a {@link Put} to store the given {@code snapshot} for the given {@code tableName} in
539   * the quota table.
540   */
541  static Put createPutForSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
542    Put p = new Put(getTableRowKey(tableName));
543    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY,
544      SpaceQuotaSnapshot.toProtoSnapshot(snapshot).toByteArray());
545    return p;
546  }
547
548  /**
549   * Creates a {@link Get} for the HBase snapshot's size against the given table.
550   */
551  static Get makeGetForSnapshotSize(TableName tn, String snapshot) {
552    Get g = new Get(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(tn.toString())));
553    g.addColumn(QUOTA_FAMILY_USAGE,
554      Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
555    return g;
556  }
557
558  /**
559   * Creates a {@link Put} to persist the current size of the {@code snapshot} with respect to the
560   * given {@code table}.
561   */
562  static Put createPutForSnapshotSize(TableName tableName, String snapshot, long size) {
563    // We just need a pb message with some `long usage`, so we can just reuse the
564    // SpaceQuotaSnapshot message instead of creating a new one.
565    Put p = new Put(getTableRowKey(tableName));
566    p.addColumn(QUOTA_FAMILY_USAGE, getSnapshotSizeQualifier(snapshot),
567      org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot.newBuilder()
568        .setQuotaUsage(size).build().toByteArray());
569    return p;
570  }
571
572  /**
573   * Creates a {@code Put} for the namespace's total snapshot size.
574   */
575  static Put createPutForNamespaceSnapshotSize(String namespace, long size) {
576    Put p = new Put(getNamespaceRowKey(namespace));
577    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER,
578      org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot.newBuilder()
579        .setQuotaUsage(size).build().toByteArray());
580    return p;
581  }
582
583  /**
584   * Returns a list of {@code Delete} to remove given table snapshot entries to remove from quota
585   * table
586   * @param snapshotEntriesToRemove the entries to remove
587   */
588  static List<Delete> createDeletesForExistingTableSnapshotSizes(
589    Multimap<TableName, String> snapshotEntriesToRemove) {
590    List<Delete> deletes = new ArrayList<>();
591    for (Map.Entry<TableName, Collection<String>> entry : snapshotEntriesToRemove.asMap()
592      .entrySet()) {
593      for (String snapshot : entry.getValue()) {
594        Delete d = new Delete(getTableRowKey(entry.getKey()));
595        d.addColumns(QUOTA_FAMILY_USAGE,
596          Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
597        deletes.add(d);
598      }
599    }
600    return deletes;
601  }
602
603  /**
604   * Returns a list of {@code Delete} to remove all table snapshot entries from quota table.
605   * @param connection connection to re-use
606   */
607  static List<Delete> createDeletesForExistingTableSnapshotSizes(Connection connection)
608    throws IOException {
609    return createDeletesForExistingSnapshotsFromScan(connection, createScanForSpaceSnapshotSizes());
610  }
611
612  /**
613   * Returns a list of {@code Delete} to remove given namespace snapshot entries to removefrom quota
614   * table
615   * @param snapshotEntriesToRemove the entries to remove
616   */
617  static List<Delete>
618    createDeletesForExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove) {
619    List<Delete> deletes = new ArrayList<>();
620    for (String snapshot : snapshotEntriesToRemove) {
621      Delete d = new Delete(getNamespaceRowKey(snapshot));
622      d.addColumns(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
623      deletes.add(d);
624    }
625    return deletes;
626  }
627
628  /**
629   * Returns a list of {@code Delete} to remove all namespace snapshot entries from quota table.
630   * @param connection connection to re-use
631   */
632  static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(Connection connection)
633    throws IOException {
634    return createDeletesForExistingSnapshotsFromScan(connection,
635      createScanForNamespaceSnapshotSizes());
636  }
637
638  /**
639   * Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
640   * @param connection connection to re-use
641   * @param scan       the scanner to use to generate the list of deletes
642   */
643  static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
644    throws IOException {
645    List<Delete> deletes = new ArrayList<>();
646    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
647      ResultScanner rs = quotaTable.getScanner(scan)) {
648      for (Result r : rs) {
649        CellScanner cs = r.cellScanner();
650        while (cs.advance()) {
651          Cell c = cs.current();
652          byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
653          byte[] qual =
654            Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
655          Delete d = new Delete(r.getRow());
656          d.addColumns(family, qual);
657          deletes.add(d);
658        }
659      }
660      return deletes;
661    }
662  }
663
664  /**
665   * Remove table usage snapshots (u:p columns) for the namespace passed
666   * @param connection connection to re-use
667   * @param namespace  the namespace to fetch the list of table usage snapshots
668   */
669  static void deleteTableUsageSnapshotsForNamespace(Connection connection, String namespace)
670    throws IOException {
671    Scan s = new Scan();
672    // Get rows for all tables in namespace
673    s.setStartStopRowForPrefixScan(
674      Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM)));
675    // Scan for table usage column (u:p) in quota table
676    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
677    // Scan for table quota column (q:s) if table has a space quota defined
678    s.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
679    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
680      ResultScanner rs = quotaTable.getScanner(s)) {
681      for (Result r : rs) {
682        byte[] data = r.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
683        // if table does not have a table space quota defined, delete table usage column (u:p)
684        if (data == null) {
685          Delete delete = new Delete(r.getRow());
686          delete.addColumns(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
687          quotaTable.delete(delete);
688        }
689      }
690    }
691  }
692
693  /**
694   * Fetches the computed size of all snapshots against tables in a namespace for space quotas.
695   */
696  static long getNamespaceSnapshotSize(Connection conn, String namespace) throws IOException {
697    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
698      Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
699      if (r.isEmpty()) {
700        return 0L;
701      }
702      r.advance();
703      return parseSnapshotSize(r.current());
704    } catch (InvalidProtocolBufferException e) {
705      throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
706    }
707  }
708
709  /**
710   * Creates a {@code Get} to fetch the namespace's total snapshot size.
711   */
712  static Get createGetNamespaceSnapshotSize(String namespace) {
713    Get g = new Get(getNamespaceRowKey(namespace));
714    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
715    return g;
716  }
717
718  /**
719   * Parses the snapshot size from the given Cell's value.
720   */
721  static long parseSnapshotSize(Cell c) throws InvalidProtocolBufferException {
722    ByteString bs =
723      UnsafeByteOperations.unsafeWrap(c.getValueArray(), c.getValueOffset(), c.getValueLength());
724    return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
725  }
726
727  /**
728   * Returns a scanner for all existing namespace snapshot entries.
729   */
730  static Scan createScanForNamespaceSnapshotSizes() {
731    return createScanForNamespaceSnapshotSizes(null);
732  }
733
734  /**
735   * Returns a scanner for all namespace snapshot entries of the given namespace
736   * @param namespace name of the namespace whose snapshot entries are to be scanned
737   */
738  static Scan createScanForNamespaceSnapshotSizes(String namespace) {
739    Scan s = new Scan();
740    if (namespace == null || namespace.isEmpty()) {
741      // Read all namespaces, just look at the row prefix
742      s.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
743    } else {
744      // Fetch the exact row for the table
745      byte[] rowkey = getNamespaceRowKey(namespace);
746      // Fetch just this one row
747      s.withStartRow(rowkey).withStopRow(rowkey, true);
748    }
749
750    // Just the usage family and only the snapshot size qualifiers
751    return s.addFamily(QUOTA_FAMILY_USAGE)
752      .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
753  }
754
755  static Scan createScanForSpaceSnapshotSizes() {
756    return createScanForSpaceSnapshotSizes(null);
757  }
758
759  static Scan createScanForSpaceSnapshotSizes(TableName table) {
760    Scan s = new Scan();
761    if (null == table) {
762      // Read all tables, just look at the row prefix
763      s.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
764    } else {
765      // Fetch the exact row for the table
766      byte[] rowkey = getTableRowKey(table);
767      // Fetch just this one row
768      s.withStartRow(rowkey).withStopRow(rowkey, true);
769    }
770
771    // Just the usage family and only the snapshot size qualifiers
772    return s.addFamily(QUOTA_FAMILY_USAGE)
773      .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
774  }
775
776  /**
777   * Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are
778   * computed relative to the table which the snapshot was created from. A snapshot's size will not
779   * include the size of files which the table still refers. These sizes, in bytes, are what is used
780   * internally to compute quota violation for tables and namespaces.
781   * @return A map of snapshot name to size in bytes per space quota computations
782   */
783  public static Map<String, Long> getObservedSnapshotSizes(Connection conn) throws IOException {
784    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
785      ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
786      final Map<String, Long> snapshotSizes = new HashMap<>();
787      for (Result r : rs) {
788        CellScanner cs = r.cellScanner();
789        while (cs.advance()) {
790          Cell c = cs.current();
791          final String snapshot = extractSnapshotNameFromSizeCell(c);
792          final long size = parseSnapshotSize(c);
793          snapshotSizes.put(snapshot, size);
794        }
795      }
796      return snapshotSizes;
797    }
798  }
799
800  /**
801   * Returns a multimap for all existing table snapshot entries.
802   * @param conn connection to re-use
803   */
804  public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException {
805    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
806      ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
807      Multimap<TableName, String> snapshots = HashMultimap.create();
808      for (Result r : rs) {
809        CellScanner cs = r.cellScanner();
810        while (cs.advance()) {
811          Cell c = cs.current();
812
813          final String snapshot = extractSnapshotNameFromSizeCell(c);
814          snapshots.put(getTableFromRowKey(r.getRow()), snapshot);
815        }
816      }
817      return snapshots;
818    }
819  }
820
821  /**
822   * Returns a set of the names of all namespaces containing snapshot entries.
823   * @param conn connection to re-use
824   */
825  public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
826    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
827      ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
828      Set<String> snapshots = new HashSet<>();
829      for (Result r : rs) {
830        CellScanner cs = r.cellScanner();
831        while (cs.advance()) {
832          cs.current();
833          snapshots.add(getNamespaceFromRowKey(r.getRow()));
834        }
835      }
836      return snapshots;
837    }
838  }
839
840  /**
841   * Returns the current space quota snapshot of the given {@code tableName} from
842   * {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for
843   * that tableName.
844   * @param conn      connection to re-use
845   * @param tableName name of the table whose current snapshot is to be retreived
846   */
847  public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn,
848    TableName tableName) throws IOException {
849    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
850      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1);
851      Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName));
852      // if we don't have any row corresponding to this get, return null
853      if (result.isEmpty()) {
854        return null;
855      }
856      // otherwise, extract quota snapshot in snapshots object
857      extractQuotaSnapshot(result, snapshots);
858      return snapshots.get(tableName);
859    }
860  }
861
862  /*
863   * ========================================================================= Quotas protobuf
864   * helpers
865   */
866  protected static Quotas quotasFromData(final byte[] data) throws IOException {
867    return quotasFromData(data, 0, data.length);
868  }
869
870  protected static Quotas quotasFromData(final byte[] data, int offset, int length)
871    throws IOException {
872    int magicLen = ProtobufMagic.lengthOfPBMagic();
873    if (!ProtobufMagic.isPBMagicPrefix(data, offset, magicLen)) {
874      throw new IOException("Missing pb magic prefix");
875    }
876    return Quotas.parseFrom(new ByteArrayInputStream(data, offset + magicLen, length - magicLen));
877  }
878
879  protected static byte[] quotasToData(final Quotas data) throws IOException {
880    ByteArrayOutputStream stream = new ByteArrayOutputStream();
881    stream.write(ProtobufMagic.PB_MAGIC);
882    data.writeTo(stream);
883    return stream.toByteArray();
884  }
885
886  public static boolean isEmptyQuota(final Quotas quotas) {
887    boolean hasSettings = false;
888    hasSettings |= quotas.hasThrottle();
889    hasSettings |= quotas.hasBypassGlobals();
890    // Only when there is a space quota, make sure there's actually both fields provided
891    // Otherwise, it's a noop.
892    if (quotas.hasSpace()) {
893      hasSettings |= (quotas.getSpace().hasSoftLimit() && quotas.getSpace().hasViolationPolicy());
894    }
895    return !hasSettings;
896  }
897
898  /*
899   * ========================================================================= HTable helpers
900   */
901  protected static Result doGet(final Connection connection, final Get get) throws IOException {
902    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
903      return table.get(get);
904    }
905  }
906
907  protected static Result[] doGet(final Connection connection, final List<Get> gets)
908    throws IOException {
909    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
910      return table.get(gets);
911    }
912  }
913
914  /*
915   * ========================================================================= Quota table row key
916   * helpers
917   */
918  protected static byte[] getUserRowKey(final String user) {
919    return Bytes.add(QUOTA_USER_ROW_KEY_PREFIX, Bytes.toBytes(user));
920  }
921
922  protected static byte[] getTableRowKey(final TableName table) {
923    return Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, table.getName());
924  }
925
926  protected static byte[] getNamespaceRowKey(final String namespace) {
927    return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, Bytes.toBytes(namespace));
928  }
929
930  protected static byte[] getRegionServerRowKey(final String regionServer) {
931    return Bytes.add(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, Bytes.toBytes(regionServer));
932  }
933
934  protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) {
935    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName());
936  }
937
938  protected static byte[] getSettingsQualifierForUserNamespace(final String namespace) {
939    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX,
940      Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM));
941  }
942
943  protected static String getUserRowKeyRegex(final String user) {
944    return getRowKeyRegEx(QUOTA_USER_ROW_KEY_PREFIX, user);
945  }
946
947  protected static String getTableRowKeyRegex(final String table) {
948    return getRowKeyRegEx(QUOTA_TABLE_ROW_KEY_PREFIX, table);
949  }
950
951  protected static String getNamespaceRowKeyRegex(final String namespace) {
952    return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace);
953  }
954
955  private static String getRegionServerRowKeyRegex(final String regionServer) {
956    return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer);
957  }
958
959  protected static byte[] getExceedThrottleQuotaRowKey() {
960    return QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY;
961  }
962
963  private static String getRowKeyRegEx(final byte[] prefix, final String regex) {
964    return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$';
965  }
966
967  protected static String getSettingsQualifierRegexForUserTable(final String table) {
968    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + table + "(?<!"
969      + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + ")$";
970  }
971
972  protected static String getSettingsQualifierRegexForUserNamespace(final String namespace) {
973    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + namespace
974      + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + '$';
975  }
976
977  protected static boolean isNamespaceRowKey(final byte[] key) {
978    return Bytes.startsWith(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX);
979  }
980
981  protected static String getNamespaceFromRowKey(final byte[] key) {
982    return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length);
983  }
984
985  protected static boolean isRegionServerRowKey(final byte[] key) {
986    return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
987  }
988
989  private static boolean isExceedThrottleQuotaRowKey(final byte[] key) {
990    return Bytes.equals(key, QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY);
991  }
992
993  protected static String getRegionServerFromRowKey(final byte[] key) {
994    return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length);
995  }
996
997  protected static boolean isTableRowKey(final byte[] key) {
998    return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX);
999  }
1000
1001  protected static TableName getTableFromRowKey(final byte[] key) {
1002    return TableName.valueOf(Bytes.toString(key, QUOTA_TABLE_ROW_KEY_PREFIX.length));
1003  }
1004
1005  protected static boolean isUserRowKey(final byte[] key) {
1006    return Bytes.startsWith(key, QUOTA_USER_ROW_KEY_PREFIX);
1007  }
1008
1009  protected static String getUserFromRowKey(final byte[] key) {
1010    return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
1011  }
1012
1013  protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
1014    return SpaceQuota.newBuilder().setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
1015      .build();
1016  }
1017
1018  protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
1019    if (!proto.hasViolationPolicy()) {
1020      throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
1021    }
1022    return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
1023  }
1024
1025  protected static byte[] getSnapshotSizeQualifier(String snapshotName) {
1026    return Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshotName));
1027  }
1028
1029  protected static String extractSnapshotNameFromSizeCell(Cell c) {
1030    return Bytes.toString(c.getQualifierArray(),
1031      c.getQualifierOffset() + QUOTA_SNAPSHOT_SIZE_QUALIFIER.length,
1032      c.getQualifierLength() - QUOTA_SNAPSHOT_SIZE_QUALIFIER.length);
1033  }
1034
1035  protected static long extractSnapshotSize(byte[] data, int offset, int length)
1036    throws InvalidProtocolBufferException {
1037    ByteString byteStr = UnsafeByteOperations.unsafeWrap(data, offset, length);
1038    return org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
1039      .parseFrom(byteStr).getQuotaUsage();
1040  }
1041}