001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.ByteArrayInputStream;
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Set;
031import java.util.regex.Pattern;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellScanner;
035import org.apache.hadoop.hbase.CompareOperator;
036import org.apache.hadoop.hbase.NamespaceDescriptor;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.QualifierFilter;
050import org.apache.hadoop.hbase.filter.RegexStringComparator;
051import org.apache.hadoop.hbase.filter.RowFilter;
052import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.apache.yetus.audience.InterfaceStability;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
060import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
061import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
062import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
063import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
064
065import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
069
070/**
071 * Helper class to interact with the quota table.
072 * <table>
073 * <tr>
074 * <th>ROW-KEY</th>
075 * <th>FAM/QUAL</th>
076 * <th>DATA</th>
077 * <th>DESC</th>
078 * </tr>
079 * <tr>
080 * <td>n.&lt;namespace&gt;</td>
081 * <td>q:s</td>
082 * <td>&lt;global-quotas&gt;</td>
083 * </tr>
084 * <tr>
085 * <td>n.&lt;namespace&gt;</td>
086 * <td>u:p</td>
087 * <td>&lt;namespace-quota policy&gt;</td>
088 * </tr>
089 * <tr>
090 * <td>n.&lt;namespace&gt;</td>
091 * <td>u:s</td>
092 * <td>&lt;SpaceQuotaSnapshot&gt;</td>
093 * <td>The size of all snapshots against tables in the namespace</td>
094 * </tr>
095 * <tr>
096 * <td>t.&lt;table&gt;</td>
097 * <td>q:s</td>
098 * <td>&lt;global-quotas&gt;</td>
099 * </tr>
100 * <tr>
101 * <td>t.&lt;table&gt;</td>
102 * <td>u:p</td>
103 * <td>&lt;table-quota policy&gt;</td>
104 * </tr>
105 * <tr>
106 * <td>t.&lt;table&gt;</td>
107 * <td>u:ss.&lt;snapshot name&gt;</td>
108 * <td>&lt;SpaceQuotaSnapshot&gt;</td>
109 * <td>The size of a snapshot against a table</td>
110 * </tr>
111 * <tr>
112 * <td>u.&lt;user&gt;</td>
113 * <td>q:s</td>
114 * <td>&lt;global-quotas&gt;</td>
115 * </tr>
116 * <tr>
117 * <td>u.&lt;user&gt;</td>
118 * <td>q:s.&lt;table&gt;</td>
119 * <td>&lt;table-quotas&gt;</td>
120 * </tr>
121 * <tr>
122 * <td>u.&lt;user&gt;</td>
123 * <td>q:s.&lt;ns&gt;</td>
124 * <td>&lt;namespace-quotas&gt;</td>
125 * </tr>
126 * </table>
127 */
128@InterfaceAudience.Private
129@InterfaceStability.Evolving
130public class QuotaTableUtil {
131  private static final Logger LOG = LoggerFactory.getLogger(QuotaTableUtil.class);
132
133  /** System table for quotas */
134  public static final TableName QUOTA_TABLE_NAME =
135    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "quota");
136
137  protected static final byte[] QUOTA_FAMILY_INFO = Bytes.toBytes("q");
138  protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
139  protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
140  protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
141  protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
142  protected static final byte[] QUOTA_SNAPSHOT_SIZE_QUALIFIER = Bytes.toBytes("ss");
143  protected static final String QUOTA_POLICY_COLUMN =
144    Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
145  protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
146  protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
147  protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
148  protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r.");
149  private static final byte[] QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY =
150    Bytes.toBytes("exceedThrottleQuota");
151
152  /*
153   * TODO: Setting specified region server quota isn't supported currently and the row key "r.all"
154   * represents the throttle quota of all region servers
155   */
156  public static final String QUOTA_REGION_SERVER_ROW_KEY = "all";
157
158  /*
159   * ========================================================================= Quota "settings"
160   * helpers
161   */
162  public static Quotas getTableQuota(final Connection connection, final TableName table)
163    throws IOException {
164    return getQuotas(connection, getTableRowKey(table));
165  }
166
167  public static Quotas getNamespaceQuota(final Connection connection, final String namespace)
168    throws IOException {
169    return getQuotas(connection, getNamespaceRowKey(namespace));
170  }
171
172  public static Quotas getUserQuota(final Connection connection, final String user)
173    throws IOException {
174    return getQuotas(connection, getUserRowKey(user));
175  }
176
177  public static Quotas getUserQuota(final Connection connection, final String user,
178    final TableName table) throws IOException {
179    return getQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
180  }
181
182  public static Quotas getUserQuota(final Connection connection, final String user,
183    final String namespace) throws IOException {
184    return getQuotas(connection, getUserRowKey(user),
185      getSettingsQualifierForUserNamespace(namespace));
186  }
187
188  private static Quotas getQuotas(final Connection connection, final byte[] rowKey)
189    throws IOException {
190    return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS);
191  }
192
193  public static Quotas getRegionServerQuota(final Connection connection, final String regionServer)
194    throws IOException {
195    return getQuotas(connection, getRegionServerRowKey(regionServer));
196  }
197
198  private static Quotas getQuotas(final Connection connection, final byte[] rowKey,
199    final byte[] qualifier) throws IOException {
200    Get get = new Get(rowKey);
201    get.addColumn(QUOTA_FAMILY_INFO, qualifier);
202    Result result = doGet(connection, get);
203    if (result.isEmpty()) {
204      return null;
205    }
206    return quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier));
207  }
208
209  public static Scan makeScan(final QuotaFilter filter) {
210    Scan scan = new Scan();
211    scan.addFamily(QUOTA_FAMILY_INFO);
212    if (filter != null && !filter.isNull()) {
213      scan.setFilter(makeFilter(filter));
214    }
215    return scan;
216  }
217
218  /**
219   * converts quotafilter to serializeable filterlists.
220   */
221  public static Filter makeFilter(final QuotaFilter filter) {
222    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
223    if (StringUtils.isNotEmpty(filter.getUserFilter())) {
224      FilterList userFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
225      boolean hasFilter = false;
226
227      if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
228        FilterList nsFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
229        nsFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
230          new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
231        nsFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
232          getSettingsQualifierRegexForUserNamespace(filter.getNamespaceFilter()), 0)));
233        userFilters.addFilter(nsFilters);
234        hasFilter = true;
235      }
236      if (StringUtils.isNotEmpty(filter.getTableFilter())) {
237        FilterList tableFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
238        tableFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
239          new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
240        tableFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator(
241          getSettingsQualifierRegexForUserTable(filter.getTableFilter()), 0)));
242        userFilters.addFilter(tableFilters);
243        hasFilter = true;
244      }
245      if (!hasFilter) {
246        userFilters.addFilter(new RowFilter(CompareOperator.EQUAL,
247          new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0)));
248      }
249
250      filterList.addFilter(userFilters);
251    } else if (StringUtils.isNotEmpty(filter.getTableFilter())) {
252      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
253        new RegexStringComparator(getTableRowKeyRegex(filter.getTableFilter()), 0)));
254    } else if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
255      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
256        new RegexStringComparator(getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0)));
257    } else if (StringUtils.isNotEmpty(filter.getRegionServerFilter())) {
258      filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
259        new RegexStringComparator(getRegionServerRowKeyRegex(filter.getRegionServerFilter()), 0)));
260    }
261    return filterList;
262  }
263
264  /**
265   * Creates a {@link Scan} which returns only quota snapshots from the quota table.
266   */
267  public static Scan makeQuotaSnapshotScan() {
268    return makeQuotaSnapshotScanForTable(null);
269  }
270
271  /**
272   * Fetches all {@link SpaceQuotaSnapshot} objects from the {@code hbase:quota} table.
273   * @param conn The HBase connection
274   * @return A map of table names and their computed snapshot.
275   */
276  public static Map<TableName, SpaceQuotaSnapshot> getSnapshots(Connection conn)
277    throws IOException {
278    Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>();
279    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
280      ResultScanner rs = quotaTable.getScanner(makeQuotaSnapshotScan())) {
281      for (Result r : rs) {
282        extractQuotaSnapshot(r, snapshots);
283      }
284    }
285    return snapshots;
286  }
287
288  /**
289   * Creates a {@link Scan} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
290   * specific table.
291   * @param tn Optionally, a table name to limit the scan's rowkey space. Can be null.
292   */
293  public static Scan makeQuotaSnapshotScanForTable(TableName tn) {
294    Scan s = new Scan();
295    // Limit to "u:v" column
296    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
297    if (null == tn) {
298      s.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
299    } else {
300      byte[] row = getTableRowKey(tn);
301      // Limit rowspace to the "t:" prefix
302      s.withStartRow(row, true).withStopRow(row, true);
303    }
304    return s;
305  }
306
307  /**
308   * Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a
309   * specific table.
310   * @param tn table name to get from. Can't be null.
311   */
312  public static Get makeQuotaSnapshotGetForTable(TableName tn) {
313    Get g = new Get(getTableRowKey(tn));
314    // Limit to "u:v" column
315    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
316    return g;
317  }
318
319  /**
320   * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
321   * {@link Result} and adds them to the given {@link Map}. If the result does not contain the
322   * expected information or the serialized policy in the value is invalid, this method will throw
323   * an {@link IllegalArgumentException}.
324   * @param result    A row from the quota table.
325   * @param snapshots A map of snapshots to add the result of this method into.
326   */
327  public static void extractQuotaSnapshot(Result result,
328    Map<TableName, SpaceQuotaSnapshot> snapshots) {
329    byte[] row = Objects.requireNonNull(result).getRow();
330    if (row == null || row.length == 0) {
331      throw new IllegalArgumentException("Provided result had a null row");
332    }
333    final TableName targetTableName = getTableFromRowKey(row);
334    Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
335    if (c == null) {
336      throw new IllegalArgumentException("Result did not contain the expected column "
337        + QUOTA_POLICY_COLUMN + ", " + result.toString());
338    }
339    ByteString buffer =
340      UnsafeByteOperations.unsafeWrap(c.getValueArray(), c.getValueOffset(), c.getValueLength());
341    try {
342      QuotaProtos.SpaceQuotaSnapshot snapshot = QuotaProtos.SpaceQuotaSnapshot.parseFrom(buffer);
343      snapshots.put(targetTableName, SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot));
344    } catch (InvalidProtocolBufferException e) {
345      throw new IllegalArgumentException(
346        "Result did not contain a valid SpaceQuota protocol buffer message", e);
347    }
348  }
349
350  public static interface UserQuotasVisitor {
351    void visitUserQuotas(final String userName, final Quotas quotas) throws IOException;
352
353    void visitUserQuotas(final String userName, final TableName table, final Quotas quotas)
354      throws IOException;
355
356    void visitUserQuotas(final String userName, final String namespace, final Quotas quotas)
357      throws IOException;
358  }
359
360  public static interface TableQuotasVisitor {
361    void visitTableQuotas(final TableName tableName, final Quotas quotas) throws IOException;
362  }
363
364  public static interface NamespaceQuotasVisitor {
365    void visitNamespaceQuotas(final String namespace, final Quotas quotas) throws IOException;
366  }
367
368  private static interface RegionServerQuotasVisitor {
369    void visitRegionServerQuotas(final String regionServer, final Quotas quotas) throws IOException;
370  }
371
372  public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor,
373    NamespaceQuotasVisitor, RegionServerQuotasVisitor {
374  }
375
376  public static void parseResult(final Result result, final QuotasVisitor visitor)
377    throws IOException {
378    byte[] row = result.getRow();
379    if (isNamespaceRowKey(row)) {
380      parseNamespaceResult(result, visitor);
381    } else if (isTableRowKey(row)) {
382      parseTableResult(result, visitor);
383    } else if (isUserRowKey(row)) {
384      parseUserResult(result, visitor);
385    } else if (isRegionServerRowKey(row)) {
386      parseRegionServerResult(result, visitor);
387    } else if (isExceedThrottleQuotaRowKey(row)) {
388      // skip exceed throttle quota row key
389      if (LOG.isDebugEnabled()) {
390        LOG.debug("Skip exceedThrottleQuota row-key when parse quota result");
391      }
392    } else {
393      LOG.warn("unexpected row-key: " + Bytes.toString(row));
394    }
395  }
396
397  public static void parseResultToCollection(final Result result,
398    Collection<QuotaSettings> quotaSettings) throws IOException {
399
400    QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() {
401      @Override
402      public void visitUserQuotas(String userName, Quotas quotas) {
403        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas));
404      }
405
406      @Override
407      public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
408        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas));
409      }
410
411      @Override
412      public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
413        quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas));
414      }
415
416      @Override
417      public void visitTableQuotas(TableName tableName, Quotas quotas) {
418        quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas));
419      }
420
421      @Override
422      public void visitNamespaceQuotas(String namespace, Quotas quotas) {
423        quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
424      }
425
426      @Override
427      public void visitRegionServerQuotas(String regionServer, Quotas quotas) {
428        quotaSettings.addAll(QuotaSettingsFactory.fromRegionServerQuotas(regionServer, quotas));
429      }
430    });
431  }
432
433  public static void parseNamespaceResult(final Result result, final NamespaceQuotasVisitor visitor)
434    throws IOException {
435    String namespace = getNamespaceFromRowKey(result.getRow());
436    parseNamespaceResult(namespace, result, visitor);
437  }
438
439  protected static void parseNamespaceResult(final String namespace, final Result result,
440    final NamespaceQuotasVisitor visitor) throws IOException {
441    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
442    if (data != null) {
443      Quotas quotas = quotasFromData(data);
444      visitor.visitNamespaceQuotas(namespace, quotas);
445    }
446  }
447
448  private static void parseRegionServerResult(final Result result,
449    final RegionServerQuotasVisitor visitor) throws IOException {
450    String rs = getRegionServerFromRowKey(result.getRow());
451    parseRegionServerResult(rs, result, visitor);
452  }
453
454  private static void parseRegionServerResult(final String regionServer, final Result result,
455    final RegionServerQuotasVisitor visitor) throws IOException {
456    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
457    if (data != null) {
458      Quotas quotas = quotasFromData(data);
459      visitor.visitRegionServerQuotas(regionServer, quotas);
460    }
461  }
462
463  public static void parseTableResult(final Result result, final TableQuotasVisitor visitor)
464    throws IOException {
465    TableName table = getTableFromRowKey(result.getRow());
466    parseTableResult(table, result, visitor);
467  }
468
469  protected static void parseTableResult(final TableName table, final Result result,
470    final TableQuotasVisitor visitor) throws IOException {
471    byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
472    if (data != null) {
473      Quotas quotas = quotasFromData(data);
474      visitor.visitTableQuotas(table, quotas);
475    }
476  }
477
478  public static void parseUserResult(final Result result, final UserQuotasVisitor visitor)
479    throws IOException {
480    String userName = getUserFromRowKey(result.getRow());
481    parseUserResult(userName, result, visitor);
482  }
483
484  protected static void parseUserResult(final String userName, final Result result,
485    final UserQuotasVisitor visitor) throws IOException {
486    Map<byte[], byte[]> familyMap = result.getFamilyMap(QUOTA_FAMILY_INFO);
487    if (familyMap == null || familyMap.isEmpty()) return;
488
489    for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
490      Quotas quotas = quotasFromData(entry.getValue());
491      if (Bytes.startsWith(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX)) {
492        String name = Bytes.toString(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX.length);
493        if (name.charAt(name.length() - 1) == TableName.NAMESPACE_DELIM) {
494          String namespace = name.substring(0, name.length() - 1);
495          visitor.visitUserQuotas(userName, namespace, quotas);
496        } else {
497          TableName table = TableName.valueOf(name);
498          visitor.visitUserQuotas(userName, table, quotas);
499        }
500      } else if (Bytes.equals(entry.getKey(), QUOTA_QUALIFIER_SETTINGS)) {
501        visitor.visitUserQuotas(userName, quotas);
502      }
503    }
504  }
505
506  /**
507   * Creates a {@link Put} to store the given {@code snapshot} for the given {@code tableName} in
508   * the quota table.
509   */
510  static Put createPutForSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
511    Put p = new Put(getTableRowKey(tableName));
512    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY,
513      SpaceQuotaSnapshot.toProtoSnapshot(snapshot).toByteArray());
514    return p;
515  }
516
517  /**
518   * Creates a {@link Get} for the HBase snapshot's size against the given table.
519   */
520  static Get makeGetForSnapshotSize(TableName tn, String snapshot) {
521    Get g = new Get(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(tn.toString())));
522    g.addColumn(QUOTA_FAMILY_USAGE,
523      Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
524    return g;
525  }
526
527  /**
528   * Creates a {@link Put} to persist the current size of the {@code snapshot} with respect to the
529   * given {@code table}.
530   */
531  static Put createPutForSnapshotSize(TableName tableName, String snapshot, long size) {
532    // We just need a pb message with some `long usage`, so we can just reuse the
533    // SpaceQuotaSnapshot message instead of creating a new one.
534    Put p = new Put(getTableRowKey(tableName));
535    p.addColumn(QUOTA_FAMILY_USAGE, getSnapshotSizeQualifier(snapshot),
536      org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot.newBuilder()
537        .setQuotaUsage(size).build().toByteArray());
538    return p;
539  }
540
541  /**
542   * Creates a {@code Put} for the namespace's total snapshot size.
543   */
544  static Put createPutForNamespaceSnapshotSize(String namespace, long size) {
545    Put p = new Put(getNamespaceRowKey(namespace));
546    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER,
547      org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot.newBuilder()
548        .setQuotaUsage(size).build().toByteArray());
549    return p;
550  }
551
552  /**
553   * Returns a list of {@code Delete} to remove given table snapshot entries to remove from quota
554   * table
555   * @param snapshotEntriesToRemove the entries to remove
556   */
557  static List<Delete> createDeletesForExistingTableSnapshotSizes(
558    Multimap<TableName, String> snapshotEntriesToRemove) {
559    List<Delete> deletes = new ArrayList<>();
560    for (Map.Entry<TableName, Collection<String>> entry : snapshotEntriesToRemove.asMap()
561      .entrySet()) {
562      for (String snapshot : entry.getValue()) {
563        Delete d = new Delete(getTableRowKey(entry.getKey()));
564        d.addColumns(QUOTA_FAMILY_USAGE,
565          Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot)));
566        deletes.add(d);
567      }
568    }
569    return deletes;
570  }
571
572  /**
573   * Returns a list of {@code Delete} to remove all table snapshot entries from quota table.
574   * @param connection connection to re-use
575   */
576  static List<Delete> createDeletesForExistingTableSnapshotSizes(Connection connection)
577    throws IOException {
578    return createDeletesForExistingSnapshotsFromScan(connection, createScanForSpaceSnapshotSizes());
579  }
580
581  /**
582   * Returns a list of {@code Delete} to remove given namespace snapshot entries to removefrom quota
583   * table
584   * @param snapshotEntriesToRemove the entries to remove
585   */
586  static List<Delete>
587    createDeletesForExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove) {
588    List<Delete> deletes = new ArrayList<>();
589    for (String snapshot : snapshotEntriesToRemove) {
590      Delete d = new Delete(getNamespaceRowKey(snapshot));
591      d.addColumns(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
592      deletes.add(d);
593    }
594    return deletes;
595  }
596
597  /**
598   * Returns a list of {@code Delete} to remove all namespace snapshot entries from quota table.
599   * @param connection connection to re-use
600   */
601  static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(Connection connection)
602    throws IOException {
603    return createDeletesForExistingSnapshotsFromScan(connection,
604      createScanForNamespaceSnapshotSizes());
605  }
606
607  /**
608   * Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
609   * @param connection connection to re-use
610   * @param scan       the scanner to use to generate the list of deletes
611   */
612  static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
613    throws IOException {
614    List<Delete> deletes = new ArrayList<>();
615    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
616      ResultScanner rs = quotaTable.getScanner(scan)) {
617      for (Result r : rs) {
618        CellScanner cs = r.cellScanner();
619        while (cs.advance()) {
620          Cell c = cs.current();
621          byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
622          byte[] qual =
623            Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
624          Delete d = new Delete(r.getRow());
625          d.addColumns(family, qual);
626          deletes.add(d);
627        }
628      }
629      return deletes;
630    }
631  }
632
633  /**
634   * Remove table usage snapshots (u:p columns) for the namespace passed
635   * @param connection connection to re-use
636   * @param namespace  the namespace to fetch the list of table usage snapshots
637   */
638  static void deleteTableUsageSnapshotsForNamespace(Connection connection, String namespace)
639    throws IOException {
640    Scan s = new Scan();
641    // Get rows for all tables in namespace
642    s.setStartStopRowForPrefixScan(
643      Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM)));
644    // Scan for table usage column (u:p) in quota table
645    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
646    // Scan for table quota column (q:s) if table has a space quota defined
647    s.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
648    try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
649      ResultScanner rs = quotaTable.getScanner(s)) {
650      for (Result r : rs) {
651        byte[] data = r.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
652        // if table does not have a table space quota defined, delete table usage column (u:p)
653        if (data == null) {
654          Delete delete = new Delete(r.getRow());
655          delete.addColumns(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
656          quotaTable.delete(delete);
657        }
658      }
659    }
660  }
661
662  /**
663   * Fetches the computed size of all snapshots against tables in a namespace for space quotas.
664   */
665  static long getNamespaceSnapshotSize(Connection conn, String namespace) throws IOException {
666    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
667      Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
668      if (r.isEmpty()) {
669        return 0L;
670      }
671      r.advance();
672      return parseSnapshotSize(r.current());
673    } catch (InvalidProtocolBufferException e) {
674      throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
675    }
676  }
677
678  /**
679   * Creates a {@code Get} to fetch the namespace's total snapshot size.
680   */
681  static Get createGetNamespaceSnapshotSize(String namespace) {
682    Get g = new Get(getNamespaceRowKey(namespace));
683    g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER);
684    return g;
685  }
686
687  /**
688   * Parses the snapshot size from the given Cell's value.
689   */
690  static long parseSnapshotSize(Cell c) throws InvalidProtocolBufferException {
691    ByteString bs =
692      UnsafeByteOperations.unsafeWrap(c.getValueArray(), c.getValueOffset(), c.getValueLength());
693    return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage();
694  }
695
696  /**
697   * Returns a scanner for all existing namespace snapshot entries.
698   */
699  static Scan createScanForNamespaceSnapshotSizes() {
700    return createScanForNamespaceSnapshotSizes(null);
701  }
702
703  /**
704   * Returns a scanner for all namespace snapshot entries of the given namespace
705   * @param namespace name of the namespace whose snapshot entries are to be scanned
706   */
707  static Scan createScanForNamespaceSnapshotSizes(String namespace) {
708    Scan s = new Scan();
709    if (namespace == null || namespace.isEmpty()) {
710      // Read all namespaces, just look at the row prefix
711      s.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
712    } else {
713      // Fetch the exact row for the table
714      byte[] rowkey = getNamespaceRowKey(namespace);
715      // Fetch just this one row
716      s.withStartRow(rowkey).withStopRow(rowkey, true);
717    }
718
719    // Just the usage family and only the snapshot size qualifiers
720    return s.addFamily(QUOTA_FAMILY_USAGE)
721      .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
722  }
723
724  static Scan createScanForSpaceSnapshotSizes() {
725    return createScanForSpaceSnapshotSizes(null);
726  }
727
728  static Scan createScanForSpaceSnapshotSizes(TableName table) {
729    Scan s = new Scan();
730    if (null == table) {
731      // Read all tables, just look at the row prefix
732      s.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
733    } else {
734      // Fetch the exact row for the table
735      byte[] rowkey = getTableRowKey(table);
736      // Fetch just this one row
737      s.withStartRow(rowkey).withStopRow(rowkey, true);
738    }
739
740    // Just the usage family and only the snapshot size qualifiers
741    return s.addFamily(QUOTA_FAMILY_USAGE)
742      .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER));
743  }
744
745  /**
746   * Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are
747   * computed relative to the table which the snapshot was created from. A snapshot's size will not
748   * include the size of files which the table still refers. These sizes, in bytes, are what is used
749   * internally to compute quota violation for tables and namespaces.
750   * @return A map of snapshot name to size in bytes per space quota computations
751   */
752  public static Map<String, Long> getObservedSnapshotSizes(Connection conn) throws IOException {
753    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
754      ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
755      final Map<String, Long> snapshotSizes = new HashMap<>();
756      for (Result r : rs) {
757        CellScanner cs = r.cellScanner();
758        while (cs.advance()) {
759          Cell c = cs.current();
760          final String snapshot = extractSnapshotNameFromSizeCell(c);
761          final long size = parseSnapshotSize(c);
762          snapshotSizes.put(snapshot, size);
763        }
764      }
765      return snapshotSizes;
766    }
767  }
768
769  /**
770   * Returns a multimap for all existing table snapshot entries.
771   * @param conn connection to re-use
772   */
773  public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException {
774    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
775      ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
776      Multimap<TableName, String> snapshots = HashMultimap.create();
777      for (Result r : rs) {
778        CellScanner cs = r.cellScanner();
779        while (cs.advance()) {
780          Cell c = cs.current();
781
782          final String snapshot = extractSnapshotNameFromSizeCell(c);
783          snapshots.put(getTableFromRowKey(r.getRow()), snapshot);
784        }
785      }
786      return snapshots;
787    }
788  }
789
790  /**
791   * Returns a set of the names of all namespaces containing snapshot entries.
792   * @param conn connection to re-use
793   */
794  public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
795    try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
796      ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
797      Set<String> snapshots = new HashSet<>();
798      for (Result r : rs) {
799        CellScanner cs = r.cellScanner();
800        while (cs.advance()) {
801          cs.current();
802          snapshots.add(getNamespaceFromRowKey(r.getRow()));
803        }
804      }
805      return snapshots;
806    }
807  }
808
809  /**
810   * Returns the current space quota snapshot of the given {@code tableName} from
811   * {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for
812   * that tableName.
813   * @param conn      connection to re-use
814   * @param tableName name of the table whose current snapshot is to be retreived
815   */
816  public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn,
817    TableName tableName) throws IOException {
818    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
819      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1);
820      Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName));
821      // if we don't have any row corresponding to this get, return null
822      if (result.isEmpty()) {
823        return null;
824      }
825      // otherwise, extract quota snapshot in snapshots object
826      extractQuotaSnapshot(result, snapshots);
827      return snapshots.get(tableName);
828    }
829  }
830
831  /*
832   * ========================================================================= Quotas protobuf
833   * helpers
834   */
835  protected static Quotas quotasFromData(final byte[] data) throws IOException {
836    return quotasFromData(data, 0, data.length);
837  }
838
839  protected static Quotas quotasFromData(final byte[] data, int offset, int length)
840    throws IOException {
841    int magicLen = ProtobufMagic.lengthOfPBMagic();
842    if (!ProtobufMagic.isPBMagicPrefix(data, offset, magicLen)) {
843      throw new IOException("Missing pb magic prefix");
844    }
845    return Quotas.parseFrom(new ByteArrayInputStream(data, offset + magicLen, length - magicLen));
846  }
847
848  protected static byte[] quotasToData(final Quotas data) throws IOException {
849    ByteArrayOutputStream stream = new ByteArrayOutputStream();
850    stream.write(ProtobufMagic.PB_MAGIC);
851    data.writeTo(stream);
852    return stream.toByteArray();
853  }
854
855  public static boolean isEmptyQuota(final Quotas quotas) {
856    boolean hasSettings = false;
857    hasSettings |= quotas.hasThrottle();
858    hasSettings |= quotas.hasBypassGlobals();
859    // Only when there is a space quota, make sure there's actually both fields provided
860    // Otherwise, it's a noop.
861    if (quotas.hasSpace()) {
862      hasSettings |= (quotas.getSpace().hasSoftLimit() && quotas.getSpace().hasViolationPolicy());
863    }
864    return !hasSettings;
865  }
866
867  /*
868   * ========================================================================= HTable helpers
869   */
870  protected static Result doGet(final Connection connection, final Get get) throws IOException {
871    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
872      return table.get(get);
873    }
874  }
875
876  protected static Result[] doGet(final Connection connection, final List<Get> gets)
877    throws IOException {
878    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
879      return table.get(gets);
880    }
881  }
882
883  /*
884   * ========================================================================= Quota table row key
885   * helpers
886   */
887  protected static byte[] getUserRowKey(final String user) {
888    return Bytes.add(QUOTA_USER_ROW_KEY_PREFIX, Bytes.toBytes(user));
889  }
890
891  protected static byte[] getTableRowKey(final TableName table) {
892    return Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, table.getName());
893  }
894
895  protected static byte[] getNamespaceRowKey(final String namespace) {
896    return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, Bytes.toBytes(namespace));
897  }
898
899  protected static byte[] getRegionServerRowKey(final String regionServer) {
900    return Bytes.add(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, Bytes.toBytes(regionServer));
901  }
902
903  protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) {
904    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName());
905  }
906
907  protected static byte[] getSettingsQualifierForUserNamespace(final String namespace) {
908    return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX,
909      Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM));
910  }
911
912  protected static String getUserRowKeyRegex(final String user) {
913    return getRowKeyRegEx(QUOTA_USER_ROW_KEY_PREFIX, user);
914  }
915
916  protected static String getTableRowKeyRegex(final String table) {
917    return getRowKeyRegEx(QUOTA_TABLE_ROW_KEY_PREFIX, table);
918  }
919
920  protected static String getNamespaceRowKeyRegex(final String namespace) {
921    return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace);
922  }
923
924  private static String getRegionServerRowKeyRegex(final String regionServer) {
925    return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer);
926  }
927
928  protected static byte[] getExceedThrottleQuotaRowKey() {
929    return QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY;
930  }
931
932  private static String getRowKeyRegEx(final byte[] prefix, final String regex) {
933    return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$';
934  }
935
936  protected static String getSettingsQualifierRegexForUserTable(final String table) {
937    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + table + "(?<!"
938      + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + ")$";
939  }
940
941  protected static String getSettingsQualifierRegexForUserNamespace(final String namespace) {
942    return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + namespace
943      + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + '$';
944  }
945
946  protected static boolean isNamespaceRowKey(final byte[] key) {
947    return Bytes.startsWith(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX);
948  }
949
950  protected static String getNamespaceFromRowKey(final byte[] key) {
951    return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length);
952  }
953
954  protected static boolean isRegionServerRowKey(final byte[] key) {
955    return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
956  }
957
958  private static boolean isExceedThrottleQuotaRowKey(final byte[] key) {
959    return Bytes.equals(key, QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY);
960  }
961
962  protected static String getRegionServerFromRowKey(final byte[] key) {
963    return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length);
964  }
965
966  protected static boolean isTableRowKey(final byte[] key) {
967    return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX);
968  }
969
970  protected static TableName getTableFromRowKey(final byte[] key) {
971    return TableName.valueOf(Bytes.toString(key, QUOTA_TABLE_ROW_KEY_PREFIX.length));
972  }
973
974  protected static boolean isUserRowKey(final byte[] key) {
975    return Bytes.startsWith(key, QUOTA_USER_ROW_KEY_PREFIX);
976  }
977
978  protected static String getUserFromRowKey(final byte[] key) {
979    return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
980  }
981
982  protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
983    return SpaceQuota.newBuilder().setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
984      .build();
985  }
986
987  protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
988    if (!proto.hasViolationPolicy()) {
989      throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
990    }
991    return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
992  }
993
994  protected static byte[] getSnapshotSizeQualifier(String snapshotName) {
995    return Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshotName));
996  }
997
998  protected static String extractSnapshotNameFromSizeCell(Cell c) {
999    return Bytes.toString(c.getQualifierArray(),
1000      c.getQualifierOffset() + QUOTA_SNAPSHOT_SIZE_QUALIFIER.length,
1001      c.getQualifierLength() - QUOTA_SNAPSHOT_SIZE_QUALIFIER.length);
1002  }
1003
1004  protected static long extractSnapshotSize(byte[] data, int offset, int length)
1005    throws InvalidProtocolBufferException {
1006    ByteString byteStr = UnsafeByteOperations.unsafeWrap(data, offset, length);
1007    return org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot
1008      .parseFrom(byteStr).getQuotaUsage();
1009  }
1010}