001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.HColumnDescriptor;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.HTableDescriptor;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.TableNotDisabledException;
033import org.apache.hadoop.hbase.TableNotEnabledException;
034import org.apache.hadoop.hbase.TableNotFoundException;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.Delete;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Mutation;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.regionserver.BloomType;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.apache.yetus.audience.InterfaceStability;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
056
057/**
058 * Helper class to interact with the quota table
059 */
060@InterfaceAudience.Private
061@InterfaceStability.Evolving
062public class QuotaUtil extends QuotaTableUtil {
063  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);
064
065  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
066  private static final boolean QUOTA_ENABLED_DEFAULT = false;
067
068  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
069  // the default one read capacity unit is 1024 bytes (1KB)
070  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
071  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
072  // the default one write capacity unit is 1024 bytes (1KB)
073  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
074
075  /** Table descriptor for Quota internal table */
076  public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
077  static {
078    QUOTA_TABLE_DESC.addFamily(
079      new HColumnDescriptor(QUOTA_FAMILY_INFO).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
080        .setBloomFilterType(BloomType.ROW).setMaxVersions(1));
081    QUOTA_TABLE_DESC.addFamily(
082      new HColumnDescriptor(QUOTA_FAMILY_USAGE).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
083        .setBloomFilterType(BloomType.ROW).setMaxVersions(1));
084  }
085
086  /** Returns true if the support for quota is enabled */
087  public static boolean isQuotaEnabled(final Configuration conf) {
088    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
089  }
090
091  /*
092   * ========================================================================= Quota "settings"
093   * helpers
094   */
095  public static void addTableQuota(final Connection connection, final TableName table,
096    final Quotas data) throws IOException {
097    addQuotas(connection, getTableRowKey(table), data);
098  }
099
100  public static void deleteTableQuota(final Connection connection, final TableName table)
101    throws IOException {
102    deleteQuotas(connection, getTableRowKey(table));
103  }
104
105  public static void addNamespaceQuota(final Connection connection, final String namespace,
106    final Quotas data) throws IOException {
107    addQuotas(connection, getNamespaceRowKey(namespace), data);
108  }
109
110  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
111    throws IOException {
112    deleteQuotas(connection, getNamespaceRowKey(namespace));
113  }
114
115  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
116    throws IOException {
117    addQuotas(connection, getUserRowKey(user), data);
118  }
119
120  public static void addUserQuota(final Connection connection, final String user,
121    final TableName table, final Quotas data) throws IOException {
122    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
123  }
124
125  public static void addUserQuota(final Connection connection, final String user,
126    final String namespace, final Quotas data) throws IOException {
127    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
128      data);
129  }
130
131  public static void deleteUserQuota(final Connection connection, final String user)
132    throws IOException {
133    deleteQuotas(connection, getUserRowKey(user));
134  }
135
136  public static void deleteUserQuota(final Connection connection, final String user,
137    final TableName table) throws IOException {
138    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
139  }
140
141  public static void deleteUserQuota(final Connection connection, final String user,
142    final String namespace) throws IOException {
143    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
144  }
145
146  public static void addRegionServerQuota(final Connection connection, final String regionServer,
147    final Quotas data) throws IOException {
148    addQuotas(connection, getRegionServerRowKey(regionServer), data);
149  }
150
151  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
152    throws IOException {
153    deleteQuotas(connection, getRegionServerRowKey(regionServer));
154  }
155
156  protected static void switchExceedThrottleQuota(final Connection connection,
157    boolean exceedThrottleQuotaEnabled) throws IOException {
158    if (exceedThrottleQuotaEnabled) {
159      checkRSQuotaToEnableExceedThrottle(
160        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
161    }
162
163    Put put = new Put(getExceedThrottleQuotaRowKey());
164    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
165      Bytes.toBytes(exceedThrottleQuotaEnabled));
166    doPut(connection, put);
167  }
168
169  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
170    if (quotas != null && quotas.hasThrottle()) {
171      Throttle throttle = quotas.getThrottle();
172      // If enable exceed throttle quota, make sure that there are at least one read(req/read +
173      // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
174      boolean hasReadQuota = false;
175      boolean hasWriteQuota = false;
176      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
177        hasReadQuota = true;
178        hasWriteQuota = true;
179      }
180      if (
181        !hasReadQuota
182          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
183      ) {
184        hasReadQuota = true;
185      }
186      if (!hasReadQuota) {
187        throw new DoNotRetryIOException(
188          "Please set at least one read region server quota before enable exceed throttle quota");
189      }
190      if (
191        !hasWriteQuota
192          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
193      ) {
194        hasWriteQuota = true;
195      }
196      if (!hasWriteQuota) {
197        throw new DoNotRetryIOException("Please set at least one write region server quota "
198          + "before enable exceed throttle quota");
199      }
200      // If enable exceed throttle quota, make sure that region server throttle quotas are in
201      // seconds time unit. Because once previous requests exceed their quota and consume region
202      // server quota, quota in other time units may be refilled in a long time, this may affect
203      // later requests.
204      List<Pair<Boolean, TimedQuota>> list =
205        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
206          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
207          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
208          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
209          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
210          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
211          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
212          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
213          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
214      for (Pair<Boolean, TimedQuota> pair : list) {
215        if (pair.getFirst()) {
216          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
217            throw new DoNotRetryIOException("All region server quota must be "
218              + "in seconds time unit if enable exceed throttle quota");
219          }
220        }
221      }
222    } else {
223      // If enable exceed throttle quota, make sure that region server quota is already set
224      throw new DoNotRetryIOException(
225        "Please set region server quota before enable exceed throttle quota");
226    }
227  }
228
229  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
230    throws IOException {
231    Get get = new Get(getExceedThrottleQuotaRowKey());
232    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
233    Result result = doGet(connection, get);
234    if (result.isEmpty()) {
235      return false;
236    }
237    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
238  }
239
240  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
241    throws IOException {
242    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
243  }
244
245  private static void addQuotas(final Connection connection, final byte[] rowKey,
246    final byte[] qualifier, final Quotas data) throws IOException {
247    Put put = new Put(rowKey);
248    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
249    doPut(connection, put);
250  }
251
252  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
253    throws IOException {
254    deleteQuotas(connection, rowKey, null);
255  }
256
257  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
258    final byte[] qualifier) throws IOException {
259    Delete delete = new Delete(rowKey);
260    if (qualifier != null) {
261      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
262    }
263    if (isNamespaceRowKey(rowKey)) {
264      String ns = getNamespaceFromRowKey(rowKey);
265      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
266      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
267        // When deleting namespace space quota, also delete table usage(u:p) snapshots
268        deleteTableUsageSnapshotsForNamespace(connection, ns);
269      }
270    }
271    doDelete(connection, delete);
272  }
273
274  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
275    final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
276    throws IOException {
277    long nowTs = EnvironmentEdgeManager.currentTime();
278    Result[] results = doGet(connection, gets);
279
280    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
281    for (int i = 0; i < results.length; ++i) {
282      byte[] key = gets.get(i).getRow();
283      assert isUserRowKey(key);
284      String user = getUserFromRowKey(key);
285
286      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
287      userQuotas.put(user, quotaInfo);
288
289      if (results[i].isEmpty()) continue;
290      assert Bytes.equals(key, results[i].getRow());
291
292      try {
293        parseUserResult(user, results[i], new UserQuotasVisitor() {
294          @Override
295          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
296            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
297            quotaInfo.setQuotas(namespace, quotas);
298          }
299
300          @Override
301          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
302            quotas = updateClusterQuotaToMachineQuota(quotas,
303              tableMachineQuotaFactors.containsKey(table)
304                ? tableMachineQuotaFactors.get(table)
305                : 1);
306            quotaInfo.setQuotas(table, quotas);
307          }
308
309          @Override
310          public void visitUserQuotas(String userName, Quotas quotas) {
311            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
312            quotaInfo.setQuotas(quotas);
313          }
314        });
315      } catch (IOException e) {
316        LOG.error("Unable to parse user '" + user + "' quotas", e);
317        userQuotas.remove(user);
318      }
319    }
320    return userQuotas;
321  }
322
323  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
324    final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
325    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
326      @Override
327      public TableName getKeyFromRow(final byte[] row) {
328        assert isTableRowKey(row);
329        return getTableFromRowKey(row);
330      }
331
332      @Override
333      public double getFactor(TableName tableName) {
334        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
335      }
336    });
337  }
338
339  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
340    final List<Get> gets, double factor) throws IOException {
341    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
342      @Override
343      public String getKeyFromRow(final byte[] row) {
344        assert isNamespaceRowKey(row);
345        return getNamespaceFromRowKey(row);
346      }
347
348      @Override
349      public double getFactor(String s) {
350        return factor;
351      }
352    });
353  }
354
355  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
356    final List<Get> gets) throws IOException {
357    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
358      @Override
359      public String getKeyFromRow(final byte[] row) {
360        assert isRegionServerRowKey(row);
361        return getRegionServerFromRowKey(row);
362      }
363
364      @Override
365      public double getFactor(String s) {
366        return 1;
367      }
368    });
369  }
370
371  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
372    final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
373    long nowTs = EnvironmentEdgeManager.currentTime();
374    Result[] results = doGet(connection, gets);
375
376    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
377    for (int i = 0; i < results.length; ++i) {
378      byte[] row = gets.get(i).getRow();
379      K key = kfr.getKeyFromRow(row);
380
381      QuotaState quotaInfo = new QuotaState(nowTs);
382      globalQuotas.put(key, quotaInfo);
383
384      if (results[i].isEmpty()) continue;
385      assert Bytes.equals(row, results[i].getRow());
386
387      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
388      if (data == null) continue;
389
390      try {
391        Quotas quotas = quotasFromData(data);
392        quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
393        quotaInfo.setQuotas(quotas);
394      } catch (IOException e) {
395        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
396        globalQuotas.remove(key);
397      }
398    }
399    return globalQuotas;
400  }
401
402  /**
403   * Convert cluster scope quota to machine scope quota
404   * @param quotas the original quota
405   * @param factor factor used to divide cluster limiter to machine limiter
406   * @return the converted quota whose quota limiters all in machine scope
407   */
408  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
409    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
410    if (newQuotas.hasThrottle()) {
411      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
412      if (throttle.hasReqNum()) {
413        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
414      }
415      if (throttle.hasReqSize()) {
416        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
417      }
418      if (throttle.hasReadNum()) {
419        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
420      }
421      if (throttle.hasReadSize()) {
422        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
423      }
424      if (throttle.hasWriteNum()) {
425        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
426      }
427      if (throttle.hasWriteSize()) {
428        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
429      }
430      if (throttle.hasReqCapacityUnit()) {
431        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
432      }
433      if (throttle.hasReadCapacityUnit()) {
434        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
435      }
436      if (throttle.hasWriteCapacityUnit()) {
437        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
438      }
439      newQuotas.setThrottle(throttle.build());
440    }
441    return newQuotas.build();
442  }
443
444  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
445    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
446      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
447      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
448        .setScope(QuotaScope.MACHINE);
449      return newTimedQuota.build();
450    } else {
451      return timedQuota;
452    }
453  }
454
455  private static interface KeyFromRow<T> {
456    T getKeyFromRow(final byte[] row);
457
458    double getFactor(T t);
459  }
460
461  /*
462   * ========================================================================= HTable helpers
463   */
464  private static void doPut(final Connection connection, final Put put) throws IOException {
465    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
466      table.put(put);
467    }
468  }
469
470  private static void doDelete(final Connection connection, final Delete delete)
471    throws IOException {
472    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
473      table.delete(delete);
474    }
475  }
476
477  /*
478   * ========================================================================= Data Size Helpers
479   */
480  public static long calculateMutationSize(final Mutation mutation) {
481    long size = 0;
482    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
483      for (Cell cell : entry.getValue()) {
484        size += cell.getSerializedSize();
485      }
486    }
487    return size;
488  }
489
490  public static long calculateResultSize(final Result result) {
491    long size = 0;
492    for (Cell cell : result.rawCells()) {
493      size += cell.getSerializedSize();
494    }
495    return size;
496  }
497
498  public static long calculateResultSize(final List<Result> results) {
499    long size = 0;
500    for (Result result : results) {
501      for (Cell cell : result.rawCells()) {
502        size += cell.getSerializedSize();
503      }
504    }
505    return size;
506  }
507
508  /**
509   * Method to enable a table, if not already enabled. This method suppresses
510   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
511   * the table.
512   * @param conn      connection to re-use
513   * @param tableName name of the table to be enabled
514   */
515  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
516    throws IOException {
517    try {
518      conn.getAdmin().enableTable(tableName);
519    } catch (TableNotDisabledException | TableNotFoundException e) {
520      // ignore
521    }
522  }
523
524  /**
525   * Method to disable a table, if not already disabled. This method suppresses
526   * {@link TableNotEnabledException}, if thrown while disabling the table.
527   * @param conn      connection to re-use
528   * @param tableName table name which has moved into space quota violation
529   */
530  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
531    throws IOException {
532    try {
533      conn.getAdmin().disableTable(tableName);
534    } catch (TableNotEnabledException | TableNotFoundException e) {
535      // ignore
536    }
537  }
538}