001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Objects;
031import java.util.Set;
032import java.util.regex.Pattern;
033
034import org.apache.commons.lang3.StringUtils;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellScanner;
037import org.apache.hadoop.hbase.CompareOperator;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
049import org.apache.hadoop.hbase.filter.Filter;
050import org.apache.hadoop.hbase.filter.FilterList;
051import org.apache.hadoop.hbase.filter.QualifierFilter;
052import org.apache.hadoop.hbase.filter.RegexStringComparator;
053import org.apache.hadoop.hbase.filter.RowFilter;
054import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.apache.yetus.audience.InterfaceStability;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
062import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
063import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
064import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
065import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
066
067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
071
072/**
073 * Helper class to interact with the quota table.
074 * <table>
075 *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th><th>DESC</th></tr>
076 *   <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
077 *   <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
078 *   <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td>
079 *      <td>The size of all snapshots against tables in the namespace</td></tr>
080 *   <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
081 *   <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
082 *   <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td>
083 *      <td>&lt;SpaceQuotaSnapshot&gt;</td><td>The size of a snapshot against a table</td></tr>
084 *   <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
085 *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
086 *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>
087 * </table>
088 */
089@InterfaceAudience.Private
090@InterfaceStability.Evolving
091public class QuotaTableUtil {
092  private static final Logger LOG = LoggerFactory.getLogger(QuotaTableUtil.class);
093
094  /** System table for quotas */
095  public static final TableName QUOTA_TABLE_NAME =
096      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "quota");
097
098  protected static final byte[] QUOTA_FAMILY_INFO = Bytes.toBytes("q");
099  protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
100  protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
101  protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
102  protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
103  protected static final byte[] QUOTA_SNAPSHOT_SIZE_QUALIFIER = Bytes.toBytes("ss");
104  protected static final String QUOTA_POLICY_COLUMN =
105      Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
106  protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
107  protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
108  protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
109  protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r.");
110  private static final byte[] QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY =
111      Bytes.toBytes("exceedThrottleQuota");
112
113  /*
114   * TODO: Setting specified region server quota isn't supported currently and the row key "r.all"
115   * represents the throttle quota of all region servers
116   */
117  public static final String QUOTA_REGION_SERVER_ROW_KEY = "all";
118
119  /* =========================================================================
120   *  Quota "settings" helpers
121   */
122  public static Quotas getTableQuota(final Connection connection, final TableName table)
123      throws IOException {
124    return getQuotas(connection, getTableRowKey(table));
125  }
126
127  public static Quotas getNamespaceQuota(final Connection connection, final String namespace)
128      throws IOException {
129    return getQuotas(connection, getNamespaceRowKey(namespace));
130  }
131
132  public static Quotas getUserQuota(final Connection connection, final String user)
133      throws IOException {
134    return getQuotas(connection, getUserRowKey(user));
135  }
136
137  public static Quotas getUserQuota(final Connection connection, final String user,
138      final TableName table) throws IOException {
139    return getQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
140  }
141
142  public static Quotas getUserQuota(final Connection connection, final String user,
143      final String namespace) throws IOException {
144    return getQuotas(connection, getUserRowKey(user),
145      getSettingsQualifierForUserNamespace(namespace));
146  }
147
148  private static Quotas getQuotas(final Connection connection, final byte[] rowKey)
149      throws IOException {
150    return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS);
151  }
152
153  public static Quotas getRegionServerQuota(final Connection connection, final String regionServer)
154      throws IOException {
155    return getQuotas(connection, getRegionServerRowKey(regionServer));
156  }
157
158  private static Quotas getQuotas(final Connection connection, final byte[] rowKey,
159      final byte[] qualifier) throws IOException {
160    Get get = new Get(rowKey);
161    get.addColumn(QUOTA_FAMILY_INFO, qualifier);
162    Result result = doGet(connection, get);
163    if (result.isEmpty()) {
164      return null;
165    }
166    return quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier));
167  }
168
169  public static Get makeGetForTableQuotas(final TableName table) {
170    Get get = new Get(getTableRowKey(table));
171    get.addFamily(QUOTA_FAMILY_INFO);
172    return get;
173  }
174
175  public static Get makeGetForNamespaceQuotas(final String namespace) {
176    Get get = new Get(getNamespaceRowKey(namespace));
177    get.addFamily(QUOTA_FAMILY_INFO);
178    return get;
179  }
180
181  public static Get makeGetForRegionServerQuotas(final String regionServer) {
182    Get get = new Get(getRegionServerRowKey(regionServer));
183    get.addFamily(QUOTA_FAMILY_INFO);
184    return get;
185  }
186
187  public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables,
188      final Iterable<String> namespaces) {
189    Get get = new Get(getUserRowKey(user));
190    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
191    for (final TableName table: tables) {
192      get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserTable(table));
193    }
194    for (final String ns: namespaces) {
195      get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserNamespace(ns));
196    }
197    return get;
198  }
199
200  public static Scan makeScan(final QuotaFilter filter) {
201    Scan scan = new Scan();
202    scan.addFamily(QUOTA_FAMILY_INFO);
203    if (filter != null && !filter.isNull()) {
204      scan.setFilter(makeFilter(filter));
205    }
206    return scan;
207  }
208
209  /**
210   * converts quotafilter to serializeable filterlists.
211   */
212  public static Filter makeFilter(final QuotaFilter filter) {
213    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
214    if (StringUtils.isNotEmpty(filter.getUserFilter())) {
215      FilterList userFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
216      boolean hasFilter = false;
217
218      if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
219        FilterList nsFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
220        nsFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
221            new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
222        nsFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL,
223            new RegexStringComparator(
224              getSettingsQualifierRegexForUserNamespace(filter.getNamespaceFilter()), 0)));
225        userFilters.addFilter(nsFilters);
226        hasFilter = true;
227      }
228      if (StringUtils.isNotEmpty(filter.getTableFilter())) {
229        FilterList tableFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
230        tableFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
231            new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
232        tableFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL,
233            new RegexStringComparator(
234              getSettingsQualifierRegexForUserTable(filter.getTableFilter()), 0)));
235        userFilters.addFilter(tableFilters);
236        hasFilter = true;
237      }
238      if (!hasFilter) {
239        userFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
240            new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
241      }
242
243      filterList.addFilter(userFilters);
244    } else if (StringUtils.isNotEmpty(filter.getTableFilter())) {
245      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
246          new RegexStringComparator(getTableRowKeyRegex(filter.getTableFilter()), 0)));
247    } else if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
248      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
249          new RegexStringComparator(getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0)));
250    } else if (StringUtils.isNotEmpty(filter.getRegionServerFilter())) {
251      filterList.addFilter(new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(
252          getRegionServerRowKeyRegex(filter.getRegionServerFilter()), 0)));
253    }
254    return filterList;
255  }
256
257  /**
258   * Creates a {@link Scan} which returns only quota snapshots from the quota table.
259   */
260  public static Scan makeQuotaSnapshotScan() {
261    return makeQuotaSnapshotScanForTable(null);
262  }
263
264  /**
265   * Fetches all {@link SpaceQuotaSnapshot} objects from the {@code hbase:quota} table.
266   *
267   * @param conn The HBase connection
268   * @return A map of table names and their computed snapshot.
269   */
270  public static Map<TableName,SpaceQuotaSnapshot> getSnapshots(Connection conn) throws IOException {
271    Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
272    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
273        ResultScanner rs = quotaTable.getScanner(makeQuotaSnapshotScan())) {
274      for (Result r : rs) {
275        extractQuotaSnapshot(r, snapshots);
276      }
277    }
278    return snapshots;
279  }
280
281  /**
282   * Creates a {@link Scan} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
283   * specific table.
284   * @param tn Optionally, a table name to limit the scan's rowkey space. Can be null.
285   */
286  public static Scan makeQuotaSnapshotScanForTable(TableName tn) {
287    Scan s = new Scan();
288    // Limit to "u:v" column
289    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
290    if (null == tn) {
291      s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
292    } else {
293      byte[] row = getTableRowKey(tn);
294      // Limit rowspace to the "t:" prefix
295      s.withStartRow(row, true).withStopRow(row, true);
296    }
297    return s;
298  }
299
300  /**
301   * Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
302   * specific table.
303   * @param tn table name to get from. Can't be null.
304   */
305  public static Get makeQuotaSnapshotGetForTable(TableName tn) {
306    Get g = new Get(getTableRowKey(tn));
307    // Limit to "u:v" column
308    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
309    return g;
310  }
311
312  /**
313   * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
314   * {@link Result} and adds them to the given {@link Map}. If the result does not contain
315   * the expected information or the serialized policy in the value is invalid, this method
316   * will throw an {@link IllegalArgumentException}.
317   *
318   * @param result A row from the quota table.
319   * @param snapshots A map of snapshots to add the result of this method into.
320   */
321  public static void extractQuotaSnapshot(
322      Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
323    byte[] row = Objects.requireNonNull(result).getRow();
324    if (row == null || row.length == 0) {
325      throw new IllegalArgumentException("Provided result had a null row");
326    }
327    final TableName targetTableName = getTableFromRowKey(row);
328    Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
329    if (c == null) {
330      throw new IllegalArgumentException("Result did not contain the expected column "
331          + QUOTA_POLICY_COLUMN + ", " + result.toString());
332    }
333    ByteString buffer = UnsafeByteOperations.unsafeWrap(
334        c.getValueArray(), c.getValueOffset(), c.getValueLength());
335    try {
336      QuotaProtos.SpaceQuotaSnapshot snapshot = QuotaProtos.SpaceQuotaSnapshot.parseFrom(buffer);
337      snapshots.put(targetTableName, SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot));
338    } catch (InvalidProtocolBufferException e) {
339      throw new IllegalArgumentException(
340          "Result did not contain a valid SpaceQuota protocol buffer message", e);
341    }
342  }
343
344  public static interface UserQuotasVisitor {
345    void visitUserQuotas(final String userName, final Quotas quotas)
346      throws IOException;
347    void visitUserQuotas(final String userName, final TableName table, final Quotas quotas)
348      throws IOException;
349    void visitUserQuotas(final String userName, final String namespace, final Quotas quotas)
350      throws IOException;
351  }
352
353  public static interface TableQuotasVisitor {
354    void visitTableQuotas(final TableName tableName, final Quotas quotas)
355      throws IOException;
356  }
357
358  public static interface NamespaceQuotasVisitor {
359    void visitNamespaceQuotas(final String namespace, final Quotas quotas)
360      throws IOException;
361  }
362
363  private static interface RegionServerQuotasVisitor {
364    void visitRegionServerQuotas(final String regionServer, final Quotas quotas)
365      throws IOException;
366  }
367
368  public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor,
369      NamespaceQuotasVisitor, RegionServerQuotasVisitor {
370  }
371
372  public static void parseResult(final Result result, final QuotasVisitor visitor)
373      throws IOException {
374    byte[] row = result.getRow();
375    if (isNamespaceRowKey(row)) {
376      parseNamespaceResult(result, visitor);
377    } else if (isTableRowKey(row)) {
378      parseTableResult(result, visitor);
379    } else if (isUserRowKey(row)) {
380      parseUserResult(result, visitor);
381    } else if (isRegionServerRowKey(row)) {
382      parseRegionServerResult(result, visitor);
383    } else if (isExceedThrottleQuotaRowKey(row)) {
384      // skip exceed throttle quota row key
385      if (LOG.isDebugEnabled()) {
386        LOG.debug("Skip exceedThrottleQuota row-key when parse quota result");
387      }
388    } else {
389      LOG.warn("unexpected row-key: " + Bytes.toString(row));
390    }
391  }
392
393  public static void parseResultToCollection(final Result result,
394      Collection<QuotaSettings> quotaSettings) throws IOException {
395
396    QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() {
397      @Override
398      public void visitUserQuotas(String userName, Quotas quotas) {
399        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas));
400      }
401
402      @Override
403      public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
404        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas));
405      }
406
407      @Override
408      public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
409        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas));
410      }
411
412      @Override
413      public void visitTableQuotas(TableName tableName, Quotas quotas) {
414        quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas));
415      }
416
417      @Override
418      public void visitNamespaceQuotas(String namespace, Quotas quotas) {
419        quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
420      }
421
422      @Override
423      public void visitRegionServerQuotas(String regionServer, Quotas quotas) {
424        quotaSettings.addAll(QuotaSettingsFactory.fromRegionServerQuotas(regionServer, quotas));
425      }
426    });
427  }
428
429  public static void parseNamespaceResult(final Result result,
430      final NamespaceQuotasVisitor visitor) throws IOException {
431    String namespace = getNamespaceFromRowKey(result.getRow());
432    parseNamespaceResult(namespace, result, visitor);
433  }
434
435  protected static void parseNamespaceResult(final String namespace, final Result result,
436      final NamespaceQuotasVisitor visitor) throws IOException {
437    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
438    if (data != null) {
439      Quotas quotas = quotasFromData(data);
440      visitor.visitNamespaceQuotas(namespace, quotas);
441    }
442  }
443
444  private static void parseRegionServerResult(final Result result,
445      final RegionServerQuotasVisitor visitor) throws IOException {
446    String rs = getRegionServerFromRowKey(result.getRow());
447    parseRegionServerResult(rs, result, visitor);
448  }
449
450  private static void parseRegionServerResult(final String regionServer, final Result result,
451      final RegionServerQuotasVisitor visitor) throws IOException {
452    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
453    if (data != null) {
454      Quotas quotas = quotasFromData(data);
455      visitor.visitRegionServerQuotas(regionServer, quotas);
456    }
457  }
458
459  public static void parseTableResult(final Result result, final TableQuotasVisitor visitor)
460      throws IOException {
461    TableName table = getTableFromRowKey(result.getRow());
462    parseTableResult(table, result, visitor);
463  }
464
465  protected static void parseTableResult(final TableName table, final Result result,
466      final TableQuotasVisitor visitor) throws IOException {
467    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
468    if (data != null) {
469      Quotas quotas = quotasFromData(data);
470      visitor.visitTableQuotas(table, quotas);
471    }
472  }
473
474  public static void parseUserResult(final Result result, final UserQuotasVisitor visitor)
475      throws IOException {
476    String userName = getUserFromRowKey(result.getRow());
477    parseUserResult(userName, result, visitor);
478  }
479
480  protected static void parseUserResult(final String userName, final Result result,
481      final UserQuotasVisitor visitor) throws IOException {
482    Map<byte[], byte[]> familyMap = result.getFamilyMap(QUOTA_FAMILY_INFO);
483    if (familyMap == null || familyMap.isEmpty()) return;
484
485    for (Map.Entry<byte[], byte[]> entry: familyMap.entrySet()) {
486      Quotas quotas = quotasFromData(entry.getValue());
487      if (Bytes.startsWith(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX)) {
488        String name = Bytes.toString(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX.length);
489        if (name.charAt(name.length() - 1) == TableName.NAMESPACE_DELIM) {
490          String namespace = name.substring(0, name.length() - 1);
491          visitor.visitUserQuotas(userName, namespace, quotas);
492        } else {
493          TableName table = TableName.valueOf(name);
494          visitor.visitUserQuotas(userName, table, quotas);
495        }
496      } else if (Bytes.equals(entry.getKey(), QUOTA_QUALIFIER_SETTINGS)) {
497        visitor.visitUserQuotas(userName, quotas);
498      }
499    }
500  }
501
502  /**
503   * Creates a {@link Put} to store the given {@code snapshot} for the given {@code tableName} in
504   * the quota table.
505   */
506  static Put createPutForSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
507    Put p = new Put(getTableRowKey(tableName));
508    p.addColumn(
509        QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY,
510        SpaceQuotaSnapshot.toProtoSnapshot(snapshot).toByteArray());
511    return p;
512  }
513
514  /**
515   * Creates a {@link Get} for the HBase snapshot's size against the given table.
516   */
517  static Get makeGetForSnapshotSize(TableName tn, String snapshot) {
518    Get g = new Get(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(tn.toString())));
519    g.addColumn(
520        QUOTA_FAMILY_USAGE,
521        Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
522    return g;
523  }
524
525  /**
526   * Creates a {@link Put} to persist the current size of the {@code snapshot} with respect to
527   * the given {@code table}.
528   */
529  static Put createPutForSnapshotSize(TableName tableName, String snapshot, long size) {
530    // We just need a pb message with some `long usage`, so we can just reuse the
531    // SpaceQuotaSnapshot message instead of creating a new one.
532    Put p = new Put(getTableRowKey(tableName));
533    p.addColumn(QUOTA_FAMILY_USAGE, getSnapshotSizeQualifier(snapshot),
534        org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
535            .newBuilder().setQuotaUsage(size).build().toByteArray());
536    return p;
537  }
538
539  /**
540   * Creates a {@code Put} for the namespace's total snapshot size.
541   */
542  static Put createPutForNamespaceSnapshotSize(String namespace, long size) {
543    Put p = new Put(getNamespaceRowKey(namespace));
544    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER,
545        org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
546            .newBuilder().setQuotaUsage(size).build().toByteArray());
547    return p;
548  }
549
550  /**
551   * Returns a list of {@code Delete} to remove given table snapshot
552   * entries to remove from quota table
553   * @param snapshotEntriesToRemove the entries to remove
554   */
555  static List<Delete> createDeletesForExistingTableSnapshotSizes(
556      Multimap<TableName, String> snapshotEntriesToRemove) {
557    List<Delete> deletes = new ArrayList<>();
558    for (Map.Entry<TableName, Collection<String>> entry : snapshotEntriesToRemove.asMap()
559        .entrySet()) {
560      for (String snapshot : entry.getValue()) {
561        Delete d = new Delete(getTableRowKey(entry.getKey()));
562        d.addColumns(QUOTA_FAMILY_USAGE,
563            Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
564        deletes.add(d);
565      }
566    }
567    return deletes;
568  }
569
570  /**
571   * Returns a list of {@code Delete} to remove all table snapshot entries from quota table.
572   * @param connection connection to re-use
573   */
574  static List<Delete> createDeletesForExistingTableSnapshotSizes(Connection connection)
575      throws IOException {
576    return createDeletesForExistingSnapshotsFromScan(connection, createScanForSpaceSnapshotSizes());
577  }
578
579  /**
580   * Returns a list of {@code Delete} to remove given namespace snapshot
581   * entries to removefrom quota table
582   * @param snapshotEntriesToRemove the entries to remove
583   */
584  static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(
585      Set<String> snapshotEntriesToRemove) {
586    List<Delete> deletes = new ArrayList<>();
587    for (String snapshot : snapshotEntriesToRemove) {
588      Delete d = new Delete(getNamespaceRowKey(snapshot));
589      d.addColumns(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
590      deletes.add(d);
591    }
592    return deletes;
593  }
594
595  /**
596   * Returns a list of {@code Delete} to remove all namespace snapshot entries from quota table.
597   * @param connection connection to re-use
598   */
599  static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(Connection connection)
600      throws IOException {
601    return createDeletesForExistingSnapshotsFromScan(connection,
602        createScanForNamespaceSnapshotSizes());
603  }
604
605  /**
606   * Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
607   * @param connection connection to re-use
608   * @param scan the scanner to use to generate the list of deletes
609   */
610  static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
611      throws IOException {
612    List<Delete> deletes = new ArrayList<>();
613    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
614        ResultScanner rs = quotaTable.getScanner(scan)) {
615      for (Result r : rs) {
616        CellScanner cs = r.cellScanner();
617        while (cs.advance()) {
618          Cell c = cs.current();
619          byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
620          byte[] qual =
621              Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
622          Delete d = new Delete(r.getRow());
623          d.addColumns(family, qual);
624          deletes.add(d);
625        }
626      }
627      return deletes;
628    }
629  }
630
631  /**
632   * Remove table usage snapshots (u:p columns) for the namespace passed
633   * @param connection connection to re-use
634   * @param namespace the namespace to fetch the list of table usage snapshots
635   */
636  static void deleteTableUsageSnapshotsForNamespace(Connection connection, String namespace)
637    throws IOException {
638    Scan s = new Scan();
639    //Get rows for all tables in namespace
640    s.setRowPrefixFilter(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM)));
641    //Scan for table usage column (u:p) in quota table
642    s.addColumn(QUOTA_FAMILY_USAGE,QUOTA_QUALIFIER_POLICY);
643    //Scan for table quota column (q:s) if table has a space quota defined
644    s.addColumn(QUOTA_FAMILY_INFO,QUOTA_QUALIFIER_SETTINGS);
645    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
646         ResultScanner rs = quotaTable.getScanner(s)) {
647      for (Result r : rs) {
648        byte[] data = r.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
649        //if table does not have a table space quota defined, delete table usage column (u:p)
650        if (data == null) {
651          Delete delete = new Delete(r.getRow());
652          delete.addColumns(QUOTA_FAMILY_USAGE,QUOTA_QUALIFIER_POLICY);
653          quotaTable.delete(delete);
654        }
655      }
656    }
657  }
658
659  /**
660   * Fetches the computed size of all snapshots against tables in a namespace for space quotas.
661   */
662  static long getNamespaceSnapshotSize(
663      Connection conn, String namespace) throws IOException {
664    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
665      Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
666      if (r.isEmpty()) {
667        return 0L;
668      }
669      r.advance();
670      return parseSnapshotSize(r.current());
671    } catch (InvalidProtocolBufferException e) {
672      throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
673    }
674  }
675
676  /**
677   * Creates a {@code Get} to fetch the namespace's total snapshot size.
678   */
679  static Get createGetNamespaceSnapshotSize(String namespace) {
680    Get g = new Get(getNamespaceRowKey(namespace));
681    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
682    return g;
683  }
684
685  /**
686   * Parses the snapshot size from the given Cell's value.
687   */
688  static long parseSnapshotSize(Cell c) throws InvalidProtocolBufferException {
689    ByteString bs = UnsafeByteOperations.unsafeWrap(
690        c.getValueArray(), c.getValueOffset(), c.getValueLength());
691    return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
692  }
693
694  /**
695   * Returns a scanner for all existing namespace snapshot entries.
696   */
697  static Scan createScanForNamespaceSnapshotSizes() {
698    return createScanForNamespaceSnapshotSizes(null);
699  }
700
701  /**
702   * Returns a scanner for all namespace snapshot entries of the given namespace
703   * @param namespace name of the namespace whose snapshot entries are to be scanned
704   */
705  static Scan createScanForNamespaceSnapshotSizes(String namespace) {
706    Scan s = new Scan();
707    if (namespace == null || namespace.isEmpty()) {
708      // Read all namespaces, just look at the row prefix
709      s.setRowPrefixFilter(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
710    } else {
711      // Fetch the exact row for the table
712      byte[] rowkey = getNamespaceRowKey(namespace);
713      // Fetch just this one row
714      s.withStartRow(rowkey).withStopRow(rowkey, true);
715    }
716
717    // Just the usage family and only the snapshot size qualifiers
718    return s.addFamily(QUOTA_FAMILY_USAGE)
719        .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
720  }
721
722  static Scan createScanForSpaceSnapshotSizes() {
723    return createScanForSpaceSnapshotSizes(null);
724  }
725
726  static Scan createScanForSpaceSnapshotSizes(TableName table) {
727    Scan s = new Scan();
728    if (null == table) {
729      // Read all tables, just look at the row prefix
730      s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
731    } else {
732      // Fetch the exact row for the table
733      byte[] rowkey = getTableRowKey(table);
734      // Fetch just this one row
735      s.withStartRow(rowkey).withStopRow(rowkey, true);
736    }
737
738    // Just the usage family and only the snapshot size qualifiers
739    return s.addFamily(QUOTA_FAMILY_USAGE).setFilter(
740        new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
741  }
742
743  /**
744   * Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are
745   * computed relative to the table which the snapshot was created from. A snapshot's size will
746   * not include the size of files which the table still refers. These sizes, in bytes, are what
747   * is used internally to compute quota violation for tables and namespaces.
748   *
749   * @return A map of snapshot name to size in bytes per space quota computations
750   */
751  public static Map<String,Long> getObservedSnapshotSizes(Connection conn) throws IOException {
752    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
753        ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
754      final Map<String,Long> snapshotSizes = new HashMap<>();
755      for (Result r : rs) {
756        CellScanner cs = r.cellScanner();
757        while (cs.advance()) {
758          Cell c = cs.current();
759          final String snapshot = extractSnapshotNameFromSizeCell(c);
760          final long size = parseSnapshotSize(c);
761          snapshotSizes.put(snapshot, size);
762        }
763      }
764      return snapshotSizes;
765    }
766  }
767
768  /**
769   * Returns a multimap for all existing table snapshot entries.
770   * @param conn connection to re-use
771   */
772  public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException {
773    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
774        ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
775      Multimap<TableName, String> snapshots = HashMultimap.create();
776      for (Result r : rs) {
777        CellScanner cs = r.cellScanner();
778        while (cs.advance()) {
779          Cell c = cs.current();
780
781          final String snapshot = extractSnapshotNameFromSizeCell(c);
782          snapshots.put(getTableFromRowKey(r.getRow()), snapshot);
783        }
784      }
785      return snapshots;
786    }
787  }
788
789  /**
790   * Returns a set of the names of all namespaces containing snapshot entries.
791   * @param conn connection to re-use
792   */
793  public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
794    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
795        ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
796      Set<String> snapshots = new HashSet<>();
797      for (Result r : rs) {
798        CellScanner cs = r.cellScanner();
799        while (cs.advance()) {
800          cs.current();
801          snapshots.add(getNamespaceFromRowKey(r.getRow()));
802        }
803      }
804      return snapshots;
805    }
806  }
807
808  /**
809   * Returns the current space quota snapshot of the given {@code tableName} from
810   * {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for
811   * that tableName.
812   * @param conn connection to re-use
813   * @param tableName name of the table whose current snapshot is to be retreived
814   */
815  public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn,
816      TableName tableName) throws IOException {
817    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
818      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1);
819      Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName));
820      // if we don't have any row corresponding to this get, return null
821      if (result.isEmpty()) {
822        return null;
823      }
824      // otherwise, extract quota snapshot in snapshots object
825      extractQuotaSnapshot(result, snapshots);
826      return snapshots.get(tableName);
827    }
828  }
829
830  /* =========================================================================
831   *  Quotas protobuf helpers
832   */
833  protected static Quotas quotasFromData(final byte[] data) throws IOException {
834    return quotasFromData(data, 0, data.length);
835  }
836
837  protected static Quotas quotasFromData(
838      final byte[] data, int offset, int length) throws IOException {
839    int magicLen = ProtobufMagic.lengthOfPBMagic();
840    if (!ProtobufMagic.isPBMagicPrefix(data, offset, magicLen)) {
841      throw new IOException("Missing pb magic prefix");
842    }
843    return Quotas.parseFrom(new ByteArrayInputStream(data, offset + magicLen, length - magicLen));
844  }
845
846  protected static byte[] quotasToData(final Quotas data) throws IOException {
847    ByteArrayOutputStream stream = new ByteArrayOutputStream();
848    stream.write(ProtobufMagic.PB_MAGIC);
849    data.writeTo(stream);
850    return stream.toByteArray();
851  }
852
853  public static boolean isEmptyQuota(final Quotas quotas) {
854    boolean hasSettings = false;
855    hasSettings |= quotas.hasThrottle();
856    hasSettings |= quotas.hasBypassGlobals();
857    // Only when there is a space quota, make sure there's actually both fields provided
858    // Otherwise, it's a noop.
859    if (quotas.hasSpace()) {
860      hasSettings |= (quotas.getSpace().hasSoftLimit() && quotas.getSpace().hasViolationPolicy());
861    }
862    return !hasSettings;
863  }
864
865  /* =========================================================================
866   *  HTable helpers
867   */
868  protected static Result doGet(final Connection connection, final Get get)
869      throws IOException {
870    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
871      return table.get(get);
872    }
873  }
874
875  protected static Result[] doGet(final Connection connection, final List<Get> gets)
876      throws IOException {
877    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
878      return table.get(gets);
879    }
880  }
881
882  /* =========================================================================
883   *  Quota table row key helpers
884   */
885  protected static byte[] getUserRowKey(final String user) {
886    return Bytes.add(QUOTA_USER_ROW_KEY_PREFIX, Bytes.toBytes(user));
887  }
888
889  protected static byte[] getTableRowKey(final TableName table) {
890    return Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, table.getName());
891  }
892
893  protected static byte[] getNamespaceRowKey(final String namespace) {
894    return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, Bytes.toBytes(namespace));
895  }
896
897  protected static byte[] getRegionServerRowKey(final String regionServer) {
898    return Bytes.add(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, Bytes.toBytes(regionServer));
899  }
900
901  protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) {
902    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName());
903  }
904
905  protected static byte[] getSettingsQualifierForUserNamespace(final String namespace) {
906    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX,
907        Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM));
908  }
909
910  protected static String getUserRowKeyRegex(final String user) {
911    return getRowKeyRegEx(QUOTA_USER_ROW_KEY_PREFIX, user);
912  }
913
914  protected static String getTableRowKeyRegex(final String table) {
915    return getRowKeyRegEx(QUOTA_TABLE_ROW_KEY_PREFIX, table);
916  }
917
918  protected static String getNamespaceRowKeyRegex(final String namespace) {
919    return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace);
920  }
921
922  private static String getRegionServerRowKeyRegex(final String regionServer) {
923    return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer);
924  }
925
926  protected static byte[] getExceedThrottleQuotaRowKey() {
927    return QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY;
928  }
929
930  private static String getRowKeyRegEx(final byte[] prefix, final String regex) {
931    return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$';
932  }
933
934  protected static String getSettingsQualifierRegexForUserTable(final String table) {
935    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) +
936          table + "(?<!" + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + ")$";
937  }
938
939  protected static String getSettingsQualifierRegexForUserNamespace(final String namespace) {
940    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) +
941                  namespace + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + '$';
942  }
943
944  protected static boolean isNamespaceRowKey(final byte[] key) {
945    return Bytes.startsWith(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX);
946  }
947
948  protected static String getNamespaceFromRowKey(final byte[] key) {
949    return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length);
950  }
951
952  protected static boolean isRegionServerRowKey(final byte[] key) {
953    return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
954  }
955
956  private static boolean isExceedThrottleQuotaRowKey(final byte[] key) {
957    return Bytes.equals(key, QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY);
958  }
959
960  protected static String getRegionServerFromRowKey(final byte[] key) {
961    return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length);
962  }
963
964  protected static boolean isTableRowKey(final byte[] key) {
965    return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX);
966  }
967
968  protected static TableName getTableFromRowKey(final byte[] key) {
969    return TableName.valueOf(Bytes.toString(key, QUOTA_TABLE_ROW_KEY_PREFIX.length));
970  }
971
972  protected static boolean isUserRowKey(final byte[] key) {
973    return Bytes.startsWith(key, QUOTA_USER_ROW_KEY_PREFIX);
974  }
975
976  protected static String getUserFromRowKey(final byte[] key) {
977    return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
978  }
979
980  protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
981    return SpaceQuota.newBuilder()
982          .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
983          .build();
984  }
985
986  protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
987    if (!proto.hasViolationPolicy()) {
988      throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
989    }
990    return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
991  }
992
993  protected static byte[] getSnapshotSizeQualifier(String snapshotName) {
994    return Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshotName));
995  }
996
997  protected static String extractSnapshotNameFromSizeCell(Cell c) {
998    return Bytes.toString(
999        c.getQualifierArray(), c.getQualifierOffset() + QUOTA_SNAPSHOT_SIZE_QUALIFIER.length,
1000        c.getQualifierLength() - QUOTA_SNAPSHOT_SIZE_QUALIFIER.length);
1001  }
1002
1003  protected static long extractSnapshotSize(
1004      byte[] data, int offset, int length) throws InvalidProtocolBufferException {
1005    ByteString byteStr = UnsafeByteOperations.unsafeWrap(data, offset, length);
1006    return org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
1007        .parseFrom(byteStr).getQuotaUsage();
1008  }
1009}