001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Objects;
031import java.util.Set;
032import java.util.regex.Pattern;
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellScanner;
036import org.apache.hadoop.hbase.CompareOperator;
037import org.apache.hadoop.hbase.NamespaceDescriptor;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.Delete;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
048import org.apache.hadoop.hbase.filter.Filter;
049import org.apache.hadoop.hbase.filter.FilterList;
050import org.apache.hadoop.hbase.filter.QualifierFilter;
051import org.apache.hadoop.hbase.filter.RegexStringComparator;
052import org.apache.hadoop.hbase.filter.RowFilter;
053import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.apache.yetus.audience.InterfaceStability;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
061import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
062import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
063import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
064import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
065
066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
070
071/**
072 * Helper class to interact with the quota table.
073 * <table>
074 *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th><th>DESC</th></tr>
075 *   <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
076 *   <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
077 *   <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td>
078 *      <td>The size of all snapshots against tables in the namespace</td></tr>
079 *   <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
080 *   <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
081 *   <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td>
082 *      <td>&lt;SpaceQuotaSnapshot&gt;</td><td>The size of a snapshot against a table</td></tr>
083 *   <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
084 *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
085 *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>
086 * </table>
087 */
088@InterfaceAudience.Private
089@InterfaceStability.Evolving
090public class QuotaTableUtil {
091  private static final Logger LOG = LoggerFactory.getLogger(QuotaTableUtil.class);
092
093  /** System table for quotas */
094  public static final TableName QUOTA_TABLE_NAME =
095      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "quota");
096
     // Column family "q": the family the quota-settings Gets and Scans in this class read from.
097  protected static final byte[] QUOTA_FAMILY_INFO = Bytes.toBytes("q");
     // Column family "u": the family used here for space-quota snapshots and snapshot sizes.
098  protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
     // Qualifier "s": the settings column of a row.
099  protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
     // Qualifier prefix "s.": user rows store per-table / per-namespace settings under it.
100  protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
     // Qualifier "p": column in which the serialized SpaceQuotaSnapshot of a table is stored.
101  protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
     // Qualifier (and qualifier prefix) "ss": columns holding snapshot sizes.
102  protected static final byte[] QUOTA_SNAPSHOT_SIZE_QUALIFIER = Bytes.toBytes("ss");
     // Printable "u:p" column name, used in error messages.
103  protected static final String QUOTA_POLICY_COLUMN =
104      Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
     // Row-key prefixes that distinguish user, table, namespace and region server rows.
105  protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
106  protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
107  protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
108  protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r.");
     // Row key of the "exceedThrottleQuota" row; parseResult recognizes this row and skips it.
109  private static final byte[] QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY =
110      Bytes.toBytes("exceedThrottleQuota");
111
112  /*
113   * TODO: Setting specified region server quota isn't supported currently and the row key "r.all"
114   * represents the throttle quota of all region servers
115   */
116  public static final String QUOTA_REGION_SERVER_ROW_KEY = "all";
117
118  /* =========================================================================
119   *  Quota "settings" helpers
120   */
121  public static Quotas getTableQuota(final Connection connection, final TableName table)
122      throws IOException {
123    return getQuotas(connection, getTableRowKey(table));
124  }
125
126  public static Quotas getNamespaceQuota(final Connection connection, final String namespace)
127      throws IOException {
128    return getQuotas(connection, getNamespaceRowKey(namespace));
129  }
130
131  public static Quotas getUserQuota(final Connection connection, final String user)
132      throws IOException {
133    return getQuotas(connection, getUserRowKey(user));
134  }
135
136  public static Quotas getUserQuota(final Connection connection, final String user,
137      final TableName table) throws IOException {
138    return getQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
139  }
140
141  public static Quotas getUserQuota(final Connection connection, final String user,
142      final String namespace) throws IOException {
143    return getQuotas(connection, getUserRowKey(user),
144      getSettingsQualifierForUserNamespace(namespace));
145  }
146
147  private static Quotas getQuotas(final Connection connection, final byte[] rowKey)
148      throws IOException {
149    return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS);
150  }
151
152  public static Quotas getRegionServerQuota(final Connection connection, final String regionServer)
153      throws IOException {
154    return getQuotas(connection, getRegionServerRowKey(regionServer));
155  }
156
157  private static Quotas getQuotas(final Connection connection, final byte[] rowKey,
158      final byte[] qualifier) throws IOException {
159    Get get = new Get(rowKey);
160    get.addColumn(QUOTA_FAMILY_INFO, qualifier);
161    Result result = doGet(connection, get);
162    if (result.isEmpty()) {
163      return null;
164    }
165    return quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier));
166  }
167
168  public static Get makeGetForTableQuotas(final TableName table) {
169    Get get = new Get(getTableRowKey(table));
170    get.addFamily(QUOTA_FAMILY_INFO);
171    return get;
172  }
173
174  public static Get makeGetForNamespaceQuotas(final String namespace) {
175    Get get = new Get(getNamespaceRowKey(namespace));
176    get.addFamily(QUOTA_FAMILY_INFO);
177    return get;
178  }
179
180  public static Get makeGetForRegionServerQuotas(final String regionServer) {
181    Get get = new Get(getRegionServerRowKey(regionServer));
182    get.addFamily(QUOTA_FAMILY_INFO);
183    return get;
184  }
185
186  public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables,
187      final Iterable<String> namespaces) {
188    Get get = new Get(getUserRowKey(user));
189    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
190    for (final TableName table: tables) {
191      get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserTable(table));
192    }
193    for (final String ns: namespaces) {
194      get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserNamespace(ns));
195    }
196    return get;
197  }
198
199  public static Scan makeScan(final QuotaFilter filter) {
200    Scan scan = new Scan();
201    scan.addFamily(QUOTA_FAMILY_INFO);
202    if (filter != null && !filter.isNull()) {
203      scan.setFilter(makeFilter(filter));
204    }
205    return scan;
206  }
207
208  /**
209   * converts quotafilter to serializeable filterlists.
210   */
211  public static Filter makeFilter(final QuotaFilter filter) {
212    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
213    if (StringUtils.isNotEmpty(filter.getUserFilter())) {
214      FilterList userFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
215      boolean hasFilter = false;
216
217      if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
218        FilterList nsFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
219        nsFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
220            new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
221        nsFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL,
222            new RegexStringComparator(
223              getSettingsQualifierRegexForUserNamespace(filter.getNamespaceFilter()), 0)));
224        userFilters.addFilter(nsFilters);
225        hasFilter = true;
226      }
227      if (StringUtils.isNotEmpty(filter.getTableFilter())) {
228        FilterList tableFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
229        tableFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
230            new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
231        tableFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL,
232            new RegexStringComparator(
233              getSettingsQualifierRegexForUserTable(filter.getTableFilter()), 0)));
234        userFilters.addFilter(tableFilters);
235        hasFilter = true;
236      }
237      if (!hasFilter) {
238        userFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
239            new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
240      }
241
242      filterList.addFilter(userFilters);
243    } else if (StringUtils.isNotEmpty(filter.getTableFilter())) {
244      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
245          new RegexStringComparator(getTableRowKeyRegex(filter.getTableFilter()), 0)));
246    } else if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
247      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
248          new RegexStringComparator(getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0)));
249    } else if (StringUtils.isNotEmpty(filter.getRegionServerFilter())) {
250      filterList.addFilter(new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(
251          getRegionServerRowKeyRegex(filter.getRegionServerFilter()), 0)));
252    }
253    return filterList;
254  }
255
256  /**
257   * Creates a {@link Scan} which returns only quota snapshots from the quota table.
258   */
259  public static Scan makeQuotaSnapshotScan() {
260    return makeQuotaSnapshotScanForTable(null);
261  }
262
263  /**
264   * Fetches all {@link SpaceQuotaSnapshot} objects from the {@code hbase:quota} table.
265   *
266   * @param conn The HBase connection
267   * @return A map of table names and their computed snapshot.
268   */
269  public static Map<TableName,SpaceQuotaSnapshot> getSnapshots(Connection conn) throws IOException {
270    Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
271    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
272        ResultScanner rs = quotaTable.getScanner(makeQuotaSnapshotScan())) {
273      for (Result r : rs) {
274        extractQuotaSnapshot(r, snapshots);
275      }
276    }
277    return snapshots;
278  }
279
280  /**
281   * Creates a {@link Scan} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
282   * specific table.
283   * @param tn Optionally, a table name to limit the scan's rowkey space. Can be null.
284   */
285  public static Scan makeQuotaSnapshotScanForTable(TableName tn) {
286    Scan s = new Scan();
287    // Limit to "u:v" column
288    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
289    if (null == tn) {
290      s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
291    } else {
292      byte[] row = getTableRowKey(tn);
293      // Limit rowspace to the "t:" prefix
294      s.withStartRow(row, true).withStopRow(row, true);
295    }
296    return s;
297  }
298
299  /**
300   * Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
301   * specific table.
302   * @param tn table name to get from. Can't be null.
303   */
304  public static Get makeQuotaSnapshotGetForTable(TableName tn) {
305    Get g = new Get(getTableRowKey(tn));
306    // Limit to "u:v" column
307    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
308    return g;
309  }
310
311  /**
312   * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
313   * {@link Result} and adds them to the given {@link Map}. If the result does not contain
314   * the expected information or the serialized policy in the value is invalid, this method
315   * will throw an {@link IllegalArgumentException}.
316   *
317   * @param result A row from the quota table.
318   * @param snapshots A map of snapshots to add the result of this method into.
319   */
320  public static void extractQuotaSnapshot(
321      Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
322    byte[] row = Objects.requireNonNull(result).getRow();
323    if (row == null || row.length == 0) {
324      throw new IllegalArgumentException("Provided result had a null row");
325    }
326    final TableName targetTableName = getTableFromRowKey(row);
327    Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
328    if (c == null) {
329      throw new IllegalArgumentException("Result did not contain the expected column "
330          + QUOTA_POLICY_COLUMN + ", " + result.toString());
331    }
332    ByteString buffer = UnsafeByteOperations.unsafeWrap(
333        c.getValueArray(), c.getValueOffset(), c.getValueLength());
334    try {
335      QuotaProtos.SpaceQuotaSnapshot snapshot = QuotaProtos.SpaceQuotaSnapshot.parseFrom(buffer);
336      snapshots.put(targetTableName, SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot));
337    } catch (InvalidProtocolBufferException e) {
338      throw new IllegalArgumentException(
339          "Result did not contain a valid SpaceQuota protocol buffer message", e);
340    }
341  }
342
343  public static interface UserQuotasVisitor {
344    void visitUserQuotas(final String userName, final Quotas quotas)
345      throws IOException;
346    void visitUserQuotas(final String userName, final TableName table, final Quotas quotas)
347      throws IOException;
348    void visitUserQuotas(final String userName, final String namespace, final Quotas quotas)
349      throws IOException;
350  }
351
352  public static interface TableQuotasVisitor {
353    void visitTableQuotas(final TableName tableName, final Quotas quotas)
354      throws IOException;
355  }
356
357  public static interface NamespaceQuotasVisitor {
358    void visitNamespaceQuotas(final String namespace, final Quotas quotas)
359      throws IOException;
360  }
361
362  private static interface RegionServerQuotasVisitor {
363    void visitRegionServerQuotas(final String regionServer, final Quotas quotas)
364      throws IOException;
365  }
366
367  public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor,
368      NamespaceQuotasVisitor, RegionServerQuotasVisitor {
369  }
370
371  public static void parseResult(final Result result, final QuotasVisitor visitor)
372      throws IOException {
373    byte[] row = result.getRow();
374    if (isNamespaceRowKey(row)) {
375      parseNamespaceResult(result, visitor);
376    } else if (isTableRowKey(row)) {
377      parseTableResult(result, visitor);
378    } else if (isUserRowKey(row)) {
379      parseUserResult(result, visitor);
380    } else if (isRegionServerRowKey(row)) {
381      parseRegionServerResult(result, visitor);
382    } else if (isExceedThrottleQuotaRowKey(row)) {
383      // skip exceed throttle quota row key
384      if (LOG.isDebugEnabled()) {
385        LOG.debug("Skip exceedThrottleQuota row-key when parse quota result");
386      }
387    } else {
388      LOG.warn("unexpected row-key: " + Bytes.toString(row));
389    }
390  }
391
392  public static void parseResultToCollection(final Result result,
393      Collection<QuotaSettings> quotaSettings) throws IOException {
394
395    QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() {
396      @Override
397      public void visitUserQuotas(String userName, Quotas quotas) {
398        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas));
399      }
400
401      @Override
402      public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
403        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas));
404      }
405
406      @Override
407      public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
408        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas));
409      }
410
411      @Override
412      public void visitTableQuotas(TableName tableName, Quotas quotas) {
413        quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas));
414      }
415
416      @Override
417      public void visitNamespaceQuotas(String namespace, Quotas quotas) {
418        quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
419      }
420
421      @Override
422      public void visitRegionServerQuotas(String regionServer, Quotas quotas) {
423        quotaSettings.addAll(QuotaSettingsFactory.fromRegionServerQuotas(regionServer, quotas));
424      }
425    });
426  }
427
428  public static void parseNamespaceResult(final Result result,
429      final NamespaceQuotasVisitor visitor) throws IOException {
430    String namespace = getNamespaceFromRowKey(result.getRow());
431    parseNamespaceResult(namespace, result, visitor);
432  }
433
434  protected static void parseNamespaceResult(final String namespace, final Result result,
435      final NamespaceQuotasVisitor visitor) throws IOException {
436    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
437    if (data != null) {
438      Quotas quotas = quotasFromData(data);
439      visitor.visitNamespaceQuotas(namespace, quotas);
440    }
441  }
442
443  private static void parseRegionServerResult(final Result result,
444      final RegionServerQuotasVisitor visitor) throws IOException {
445    String rs = getRegionServerFromRowKey(result.getRow());
446    parseRegionServerResult(rs, result, visitor);
447  }
448
449  private static void parseRegionServerResult(final String regionServer, final Result result,
450      final RegionServerQuotasVisitor visitor) throws IOException {
451    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
452    if (data != null) {
453      Quotas quotas = quotasFromData(data);
454      visitor.visitRegionServerQuotas(regionServer, quotas);
455    }
456  }
457
458  public static void parseTableResult(final Result result, final TableQuotasVisitor visitor)
459      throws IOException {
460    TableName table = getTableFromRowKey(result.getRow());
461    parseTableResult(table, result, visitor);
462  }
463
464  protected static void parseTableResult(final TableName table, final Result result,
465      final TableQuotasVisitor visitor) throws IOException {
466    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
467    if (data != null) {
468      Quotas quotas = quotasFromData(data);
469      visitor.visitTableQuotas(table, quotas);
470    }
471  }
472
473  public static void parseUserResult(final Result result, final UserQuotasVisitor visitor)
474      throws IOException {
475    String userName = getUserFromRowKey(result.getRow());
476    parseUserResult(userName, result, visitor);
477  }
478
479  protected static void parseUserResult(final String userName, final Result result,
480      final UserQuotasVisitor visitor) throws IOException {
481    Map<byte[], byte[]> familyMap = result.getFamilyMap(QUOTA_FAMILY_INFO);
482    if (familyMap == null || familyMap.isEmpty()) return;
483
484    for (Map.Entry<byte[], byte[]> entry: familyMap.entrySet()) {
485      Quotas quotas = quotasFromData(entry.getValue());
486      if (Bytes.startsWith(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX)) {
487        String name = Bytes.toString(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX.length);
488        if (name.charAt(name.length() - 1) == TableName.NAMESPACE_DELIM) {
489          String namespace = name.substring(0, name.length() - 1);
490          visitor.visitUserQuotas(userName, namespace, quotas);
491        } else {
492          TableName table = TableName.valueOf(name);
493          visitor.visitUserQuotas(userName, table, quotas);
494        }
495      } else if (Bytes.equals(entry.getKey(), QUOTA_QUALIFIER_SETTINGS)) {
496        visitor.visitUserQuotas(userName, quotas);
497      }
498    }
499  }
500
501  /**
502   * Creates a {@link Put} to store the given {@code snapshot} for the given {@code tableName} in
503   * the quota table.
504   */
505  static Put createPutForSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
506    Put p = new Put(getTableRowKey(tableName));
507    p.addColumn(
508        QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY,
509        SpaceQuotaSnapshot.toProtoSnapshot(snapshot).toByteArray());
510    return p;
511  }
512
513  /**
514   * Creates a {@link Get} for the HBase snapshot's size against the given table.
515   */
516  static Get makeGetForSnapshotSize(TableName tn, String snapshot) {
517    Get g = new Get(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(tn.toString())));
518    g.addColumn(
519        QUOTA_FAMILY_USAGE,
520        Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
521    return g;
522  }
523
524  /**
525   * Creates a {@link Put} to persist the current size of the {@code snapshot} with respect to
526   * the given {@code table}.
527   */
528  static Put createPutForSnapshotSize(TableName tableName, String snapshot, long size) {
529    // We just need a pb message with some `long usage`, so we can just reuse the
530    // SpaceQuotaSnapshot message instead of creating a new one.
531    Put p = new Put(getTableRowKey(tableName));
532    p.addColumn(QUOTA_FAMILY_USAGE, getSnapshotSizeQualifier(snapshot),
533        org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
534            .newBuilder().setQuotaUsage(size).build().toByteArray());
535    return p;
536  }
537
538  /**
539   * Creates a {@code Put} for the namespace's total snapshot size.
540   */
541  static Put createPutForNamespaceSnapshotSize(String namespace, long size) {
542    Put p = new Put(getNamespaceRowKey(namespace));
543    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER,
544        org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
545            .newBuilder().setQuotaUsage(size).build().toByteArray());
546    return p;
547  }
548
549  /**
550   * Returns a list of {@code Delete} to remove given table snapshot
551   * entries to remove from quota table
552   * @param snapshotEntriesToRemove the entries to remove
553   */
554  static List<Delete> createDeletesForExistingTableSnapshotSizes(
555      Multimap<TableName, String> snapshotEntriesToRemove) {
556    List<Delete> deletes = new ArrayList<>();
557    for (Map.Entry<TableName, Collection<String>> entry : snapshotEntriesToRemove.asMap()
558        .entrySet()) {
559      for (String snapshot : entry.getValue()) {
560        Delete d = new Delete(getTableRowKey(entry.getKey()));
561        d.addColumns(QUOTA_FAMILY_USAGE,
562            Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
563        deletes.add(d);
564      }
565    }
566    return deletes;
567  }
568
569  /**
570   * Returns a list of {@code Delete} to remove all table snapshot entries from quota table.
571   * @param connection connection to re-use
572   */
573  static List<Delete> createDeletesForExistingTableSnapshotSizes(Connection connection)
574      throws IOException {
575    return createDeletesForExistingSnapshotsFromScan(connection, createScanForSpaceSnapshotSizes());
576  }
577
578  /**
579   * Returns a list of {@code Delete} to remove given namespace snapshot
580   * entries to removefrom quota table
581   * @param snapshotEntriesToRemove the entries to remove
582   */
583  static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(
584      Set<String> snapshotEntriesToRemove) {
585    List<Delete> deletes = new ArrayList<>();
586    for (String snapshot : snapshotEntriesToRemove) {
587      Delete d = new Delete(getNamespaceRowKey(snapshot));
588      d.addColumns(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
589      deletes.add(d);
590    }
591    return deletes;
592  }
593
594  /**
595   * Returns a list of {@code Delete} to remove all namespace snapshot entries from quota table.
596   * @param connection connection to re-use
597   */
598  static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(Connection connection)
599      throws IOException {
600    return createDeletesForExistingSnapshotsFromScan(connection,
601        createScanForNamespaceSnapshotSizes());
602  }
603
604  /**
605   * Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
606   * @param connection connection to re-use
607   * @param scan the scanner to use to generate the list of deletes
608   */
609  static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
610      throws IOException {
611    List<Delete> deletes = new ArrayList<>();
612    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
613        ResultScanner rs = quotaTable.getScanner(scan)) {
614      for (Result r : rs) {
615        CellScanner cs = r.cellScanner();
616        while (cs.advance()) {
617          Cell c = cs.current();
618          byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
619          byte[] qual =
620              Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
621          Delete d = new Delete(r.getRow());
622          d.addColumns(family, qual);
623          deletes.add(d);
624        }
625      }
626      return deletes;
627    }
628  }
629
630  /**
631   * Fetches the computed size of all snapshots against tables in a namespace for space quotas.
632   */
633  static long getNamespaceSnapshotSize(
634      Connection conn, String namespace) throws IOException {
635    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
636      Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
637      if (r.isEmpty()) {
638        return 0L;
639      }
640      r.advance();
641      return parseSnapshotSize(r.current());
642    } catch (InvalidProtocolBufferException e) {
643      throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
644    }
645  }
646
647  /**
648   * Creates a {@code Get} to fetch the namespace's total snapshot size.
649   */
650  static Get createGetNamespaceSnapshotSize(String namespace) {
651    Get g = new Get(getNamespaceRowKey(namespace));
652    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
653    return g;
654  }
655
656  /**
657   * Parses the snapshot size from the given Cell's value.
658   */
659  static long parseSnapshotSize(Cell c) throws InvalidProtocolBufferException {
660    ByteString bs = UnsafeByteOperations.unsafeWrap(
661        c.getValueArray(), c.getValueOffset(), c.getValueLength());
662    return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
663  }
664
665  /**
666   * Returns a scanner for all existing namespace snapshot entries.
667   */
668  static Scan createScanForNamespaceSnapshotSizes() {
669    return createScanForNamespaceSnapshotSizes(null);
670  }
671
672  /**
673   * Returns a scanner for all namespace snapshot entries of the given namespace
674   * @param namespace name of the namespace whose snapshot entries are to be scanned
675   */
676  static Scan createScanForNamespaceSnapshotSizes(String namespace) {
677    Scan s = new Scan();
678    if (namespace == null || namespace.isEmpty()) {
679      // Read all namespaces, just look at the row prefix
680      s.setRowPrefixFilter(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
681    } else {
682      // Fetch the exact row for the table
683      byte[] rowkey = getNamespaceRowKey(namespace);
684      // Fetch just this one row
685      s.withStartRow(rowkey).withStopRow(rowkey, true);
686    }
687
688    // Just the usage family and only the snapshot size qualifiers
689    return s.addFamily(QUOTA_FAMILY_USAGE)
690        .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
691  }
692
693  static Scan createScanForSpaceSnapshotSizes() {
694    return createScanForSpaceSnapshotSizes(null);
695  }
696
697  static Scan createScanForSpaceSnapshotSizes(TableName table) {
698    Scan s = new Scan();
699    if (null == table) {
700      // Read all tables, just look at the row prefix
701      s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
702    } else {
703      // Fetch the exact row for the table
704      byte[] rowkey = getTableRowKey(table);
705      // Fetch just this one row
706      s.withStartRow(rowkey).withStopRow(rowkey, true);
707    }
708
709    // Just the usage family and only the snapshot size qualifiers
710    return s.addFamily(QUOTA_FAMILY_USAGE).setFilter(
711        new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
712  }
713
714  /**
715   * Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are
716   * computed relative to the table which the snapshot was created from. A snapshot's size will
717   * not include the size of files which the table still refers. These sizes, in bytes, are what
718   * is used internally to compute quota violation for tables and namespaces.
719   *
720   * @return A map of snapshot name to size in bytes per space quota computations
721   */
722  public static Map<String,Long> getObservedSnapshotSizes(Connection conn) throws IOException {
723    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
724        ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
725      final Map<String,Long> snapshotSizes = new HashMap<>();
726      for (Result r : rs) {
727        CellScanner cs = r.cellScanner();
728        while (cs.advance()) {
729          Cell c = cs.current();
730          final String snapshot = extractSnapshotNameFromSizeCell(c);
731          final long size = parseSnapshotSize(c);
732          snapshotSizes.put(snapshot, size);
733        }
734      }
735      return snapshotSizes;
736    }
737  }
738
739  /**
740   * Returns a multimap for all existing table snapshot entries.
741   * @param conn connection to re-use
742   */
743  public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException {
744    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
745        ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
746      Multimap<TableName, String> snapshots = HashMultimap.create();
747      for (Result r : rs) {
748        CellScanner cs = r.cellScanner();
749        while (cs.advance()) {
750          Cell c = cs.current();
751
752          final String snapshot = extractSnapshotNameFromSizeCell(c);
753          snapshots.put(getTableFromRowKey(r.getRow()), snapshot);
754        }
755      }
756      return snapshots;
757    }
758  }
759
760  /**
761   * Returns a set of the names of all namespaces containing snapshot entries.
762   * @param conn connection to re-use
763   */
764  public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
765    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
766        ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
767      Set<String> snapshots = new HashSet<>();
768      for (Result r : rs) {
769        CellScanner cs = r.cellScanner();
770        while (cs.advance()) {
771          cs.current();
772          snapshots.add(getNamespaceFromRowKey(r.getRow()));
773        }
774      }
775      return snapshots;
776    }
777  }
778
779  /*
780   * Returns the current space quota snapshot of the given {@code tableName} from
781   * {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for
782   * that tableName.
783   * @param conn connection to re-use
784   * @param tableName name of the table whose current snapshot is to be retreived
785   */
786  public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn,
787      TableName tableName) throws IOException {
788    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
789      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1);
790      Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName));
791      // if we don't have any row corresponding to this get, return null
792      if (result.isEmpty()) {
793        return null;
794      }
795      // otherwise, extract quota snapshot in snapshots object
796      extractQuotaSnapshot(result, snapshots);
797      return snapshots.get(tableName);
798    }
799  }
800
801  /* =========================================================================
802   *  Quotas protobuf helpers
803   */
804  protected static Quotas quotasFromData(final byte[] data) throws IOException {
805    return quotasFromData(data, 0, data.length);
806  }
807
808  protected static Quotas quotasFromData(
809      final byte[] data, int offset, int length) throws IOException {
810    int magicLen = ProtobufMagic.lengthOfPBMagic();
811    if (!ProtobufMagic.isPBMagicPrefix(data, offset, magicLen)) {
812      throw new IOException("Missing pb magic prefix");
813    }
814    return Quotas.parseFrom(new ByteArrayInputStream(data, offset + magicLen, length - magicLen));
815  }
816
817  protected static byte[] quotasToData(final Quotas data) throws IOException {
818    ByteArrayOutputStream stream = new ByteArrayOutputStream();
819    stream.write(ProtobufMagic.PB_MAGIC);
820    data.writeTo(stream);
821    return stream.toByteArray();
822  }
823
824  public static boolean isEmptyQuota(final Quotas quotas) {
825    boolean hasSettings = false;
826    hasSettings |= quotas.hasThrottle();
827    hasSettings |= quotas.hasBypassGlobals();
828    // Only when there is a space quota, make sure there's actually both fields provided
829    // Otherwise, it's a noop.
830    if (quotas.hasSpace()) {
831      hasSettings |= (quotas.getSpace().hasSoftLimit() && quotas.getSpace().hasViolationPolicy());
832    }
833    return !hasSettings;
834  }
835
836  /* =========================================================================
837   *  HTable helpers
838   */
839  protected static Result doGet(final Connection connection, final Get get)
840      throws IOException {
841    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
842      return table.get(get);
843    }
844  }
845
846  protected static Result[] doGet(final Connection connection, final List<Get> gets)
847      throws IOException {
848    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
849      return table.get(gets);
850    }
851  }
852
853  /* =========================================================================
854   *  Quota table row key helpers
855   */
856  protected static byte[] getUserRowKey(final String user) {
857    return Bytes.add(QUOTA_USER_ROW_KEY_PREFIX, Bytes.toBytes(user));
858  }
859
860  protected static byte[] getTableRowKey(final TableName table) {
861    return Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, table.getName());
862  }
863
864  protected static byte[] getNamespaceRowKey(final String namespace) {
865    return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, Bytes.toBytes(namespace));
866  }
867
868  protected static byte[] getRegionServerRowKey(final String regionServer) {
869    return Bytes.add(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, Bytes.toBytes(regionServer));
870  }
871
872  protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) {
873    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName());
874  }
875
876  protected static byte[] getSettingsQualifierForUserNamespace(final String namespace) {
877    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX,
878        Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM));
879  }
880
881  protected static String getUserRowKeyRegex(final String user) {
882    return getRowKeyRegEx(QUOTA_USER_ROW_KEY_PREFIX, user);
883  }
884
885  protected static String getTableRowKeyRegex(final String table) {
886    return getRowKeyRegEx(QUOTA_TABLE_ROW_KEY_PREFIX, table);
887  }
888
889  protected static String getNamespaceRowKeyRegex(final String namespace) {
890    return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace);
891  }
892
893  private static String getRegionServerRowKeyRegex(final String regionServer) {
894    return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer);
895  }
896
  /**
   * Returns the row key of the exceed-throttle-quota row.
   * @return the {@code QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY} array itself; no copy is made
   */
  protected static byte[] getExceedThrottleQuotaRowKey() {
    return QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY;
  }
900
901  private static String getRowKeyRegEx(final byte[] prefix, final String regex) {
902    return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$';
903  }
904
905  protected static String getSettingsQualifierRegexForUserTable(final String table) {
906    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) +
907          table + "(?<!" + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + ")$";
908  }
909
910  protected static String getSettingsQualifierRegexForUserNamespace(final String namespace) {
911    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) +
912                  namespace + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + '$';
913  }
914
915  protected static boolean isNamespaceRowKey(final byte[] key) {
916    return Bytes.startsWith(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX);
917  }
918
919  protected static String getNamespaceFromRowKey(final byte[] key) {
920    return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length);
921  }
922
923  protected static boolean isRegionServerRowKey(final byte[] key) {
924    return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
925  }
926
927  private static boolean isExceedThrottleQuotaRowKey(final byte[] key) {
928    return Bytes.equals(key, QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY);
929  }
930
931  protected static String getRegionServerFromRowKey(final byte[] key) {
932    return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length);
933  }
934
935  protected static boolean isTableRowKey(final byte[] key) {
936    return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX);
937  }
938
939  protected static TableName getTableFromRowKey(final byte[] key) {
940    return TableName.valueOf(Bytes.toString(key, QUOTA_TABLE_ROW_KEY_PREFIX.length));
941  }
942
943  protected static boolean isUserRowKey(final byte[] key) {
944    return Bytes.startsWith(key, QUOTA_USER_ROW_KEY_PREFIX);
945  }
946
947  protected static String getUserFromRowKey(final byte[] key) {
948    return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
949  }
950
951  protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
952    return SpaceQuota.newBuilder()
953          .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
954          .build();
955  }
956
957  protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
958    if (!proto.hasViolationPolicy()) {
959      throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
960    }
961    return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
962  }
963
964  protected static byte[] getSnapshotSizeQualifier(String snapshotName) {
965    return Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshotName));
966  }
967
968  protected static String extractSnapshotNameFromSizeCell(Cell c) {
969    return Bytes.toString(
970        c.getQualifierArray(), c.getQualifierOffset() + QUOTA_SNAPSHOT_SIZE_QUALIFIER.length,
971        c.getQualifierLength() - QUOTA_SNAPSHOT_SIZE_QUALIFIER.length);
972  }
973
974  protected static long extractSnapshotSize(
975      byte[] data, int offset, int length) throws InvalidProtocolBufferException {
976    ByteString byteStr = UnsafeByteOperations.unsafeWrap(data, offset, length);
977    return org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
978        .parseFrom(byteStr).getQuotaUsage();
979  }
980}