/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;

/**
 * Helper class to interact with the quota table
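 * <p>
 * A minimal usage sketch (the {@code conn} variable and the table name are assumptions for
 * illustration, not part of this class):
 *
 * <pre>
 * // Throttle the example table to 1000 requests per second.
 * TimedQuota req =
 *   TimedQuota.newBuilder().setSoftLimit(1000).setTimeUnit(TimeUnit.SECONDS).build();
 * Quotas quotas =
 *   Quotas.newBuilder().setThrottle(Throttle.newBuilder().setReqNum(req).build()).build();
 * QuotaUtil.addTableQuota(conn, TableName.valueOf("example_table"), quotas);
 * // ... and later remove the quota again.
 * QuotaUtil.deleteTableQuota(conn, TableName.valueOf("example_table"));
 * </pre>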
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // by default, one read capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // by default, one write capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
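  // For example, with the defaults above a request that reads 2048 bytes of data spans two read
  // capacity units, while a 100-byte write fits within a single write capacity unit.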

  /** Table descriptor for Quota internal table */
  public static final TableDescriptor QUOTA_TABLE_DESC =
    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .build();

  /** Returns true if quota support is enabled. */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }

  /* =========================================================================
   * Quota "settings" helpers
   */
  public static void addTableQuota(final Connection connection, final TableName table,
    final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  public static void deleteTableQuota(final Connection connection, final TableName table)
    throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  public static void addNamespaceQuota(final Connection connection, final String namespace,
    final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
    throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
    throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
    final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
    final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
      data);
  }

  public static void deleteUserQuota(final Connection connection, final String user)
    throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
    final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
    final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
  }

  public static void addRegionServerQuota(final Connection connection, final String regionServer,
    final Quotas data) throws IOException {
    addQuotas(connection, getRegionServerRowKey(regionServer), data);
  }

  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
    throws IOException {
    deleteQuotas(connection, getRegionServerRowKey(regionServer));
  }

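  /**
   * Enables or disables the exceed throttle quota. Before enabling, this verifies that a region
   * server quota with at least one read and one write throttle component, all using the SECONDS
   * time unit, has already been set.
   */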
  protected static void switchExceedThrottleQuota(final Connection connection,
    boolean exceedThrottleQuotaEnabled) throws IOException {
    if (exceedThrottleQuotaEnabled) {
      checkRSQuotaToEnableExceedThrottle(
        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
    }

    Put put = new Put(getExceedThrottleQuotaRowKey());
    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
      Bytes.toBytes(exceedThrottleQuotaEnabled));
    doPut(connection, put);
  }

  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
    if (quotas != null && quotas.hasThrottle()) {
      Throttle throttle = quotas.getThrottle();
      // When enabling the exceed throttle quota, make sure that there is at least one read
      // (req/read + num/size/cu) and one write (req/write + num/size/cu) region server throttle
      // quota.
      boolean hasReadQuota = false;
      boolean hasWriteQuota = false;
      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
        hasReadQuota = true;
        hasWriteQuota = true;
      }
      if (
        !hasReadQuota
          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
      ) {
        hasReadQuota = true;
      }
      if (!hasReadQuota) {
        throw new DoNotRetryIOException(
          "Please set at least one read region server quota before enabling exceed throttle quota");
      }
      if (
        !hasWriteQuota
          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
      ) {
        hasWriteQuota = true;
      }
      if (!hasWriteQuota) {
        throw new DoNotRetryIOException("Please set at least one write region server quota "
          + "before enabling exceed throttle quota");
      }
      // When enabling the exceed throttle quota, make sure that the region server throttle quotas
      // use the seconds time unit. Once previous requests exceed their quota and consume the
      // region server quota, a quota in another time unit may take a long time to refill, which
      // may affect later requests.
      List<Pair<Boolean, TimedQuota>> list =
        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
      for (Pair<Boolean, TimedQuota> pair : list) {
        if (pair.getFirst()) {
          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
            throw new DoNotRetryIOException("All region server quotas must use "
              + "the seconds time unit when enabling exceed throttle quota");
          }
        }
      }
    } else {
      // When enabling exceed throttle quota, make sure that a region server quota is already set
      throw new DoNotRetryIOException(
        "Please set a region server quota before enabling exceed throttle quota");
    }
  }

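  /**
   * Returns whether the exceed throttle quota is enabled; defaults to false if the flag has never
   * been stored in the quota table.
   */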
  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
    throws IOException {
    Get get = new Get(getExceedThrottleQuotaRowKey());
    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
    Result result = doGet(connection, get);
    if (result.isEmpty()) {
      return false;
    }
    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
    throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
    throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    if (isNamespaceRowKey(rowKey)) {
      String ns = getNamespaceFromRowKey(rowKey);
      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
        // When deleting a namespace space quota, also delete the table usage (u:p) snapshots
        deleteTableUsageSnapshotsForNamespace(connection, ns);
      }
    }
    doDelete(connection, delete);
  }

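  /**
   * Fetches the user quotas for the given {@code gets}, converting any cluster-scope limits to
   * machine scope: user-table quotas use the per-table factors, while user and user-namespace
   * quotas use the given {@code factor}. Rows whose quota data cannot be parsed are logged and
   * left out of the returned map.
   */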
  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
    throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] key = gets.get(i).getRow();
      assert isUserRowKey(key);
      String user = getUserFromRowKey(key);

      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
      userQuotas.put(user, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(key, results[i].getRow());

      try {
        parseUserResult(user, results[i], new UserQuotasVisitor() {
          @Override
          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(namespace, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas,
              tableMachineQuotaFactors.containsKey(table)
                ? tableMachineQuotaFactors.get(table)
                : 1);
            quotaInfo.setQuotas(table, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(quotas);
          }
        });
      } catch (IOException e) {
        LOG.error("Unable to parse user '" + user + "' quotas", e);
        userQuotas.remove(user);
      }
    }
    return userQuotas;
  }

  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }

      @Override
      public double getFactor(TableName tableName) {
        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
      }
    });
  }

  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
    final List<Get> gets, double factor) throws IOException {
    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return factor;
      }
    });
  }

  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
    final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isRegionServerRowKey(row);
        return getRegionServerFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return 1;
      }
    });
  }

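  /**
   * Generic fetch used by the table, namespace and region server variants above: reads the
   * settings column for each row, converts cluster-scope limits to machine scope using the factor
   * supplied by {@code kfr}, and skips rows whose quota data cannot be parsed.
   */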
  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
    final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] row = gets.get(i).getRow();
      K key = kfr.getKeyFromRow(row);

      QuotaState quotaInfo = new QuotaState(nowTs);
      globalQuotas.put(key, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(row, results[i].getRow());

      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
      if (data == null) continue;

      try {
        Quotas quotas = quotasFromData(data);
        quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
        quotaInfo.setQuotas(quotas);
      } catch (IOException e) {
        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
        globalQuotas.remove(key);
      }
    }
    return globalQuotas;
  }

  /**
   * Convert a cluster-scope quota to a machine-scope quota.
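   * For example, with a factor of 0.5 (say, the table's regions are spread evenly across two
   * region servers) a cluster-scope soft limit of 1000 becomes a machine-scope limit of 500.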
   * @param quotas the original quota
   * @param factor the factor used to divide the cluster limiter into per-machine limiters
   * @return the converted quota, with all quota limiters in machine scope
   */
  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
    if (newQuotas.hasThrottle()) {
      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
      if (throttle.hasReqNum()) {
        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
      }
      if (throttle.hasReqSize()) {
        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
      }
      if (throttle.hasReadNum()) {
        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
      }
      if (throttle.hasReadSize()) {
        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
      }
      if (throttle.hasWriteNum()) {
        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
      }
      if (throttle.hasWriteSize()) {
        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
      }
      if (throttle.hasReqCapacityUnit()) {
        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
      }
      if (throttle.hasReadCapacityUnit()) {
        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
      }
      if (throttle.hasWriteCapacityUnit()) {
        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
      }
      newQuotas.setThrottle(throttle.build());
    }
    return newQuotas.build();
  }

  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
        .setScope(QuotaScope.MACHINE);
      return newTimedQuota.build();
    } else {
      return timedQuota;
    }
  }

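  /**
   * Maps a quota-table row key back to its logical key (table, namespace or region server) and
   * supplies the cluster-to-machine conversion factor to apply for that key.
   */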
  private static interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);

    double getFactor(T t);
  }

  /* =========================================================================
   * HTable helpers
   */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  private static void doDelete(final Connection connection, final Delete delete)
    throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /* =========================================================================
   * Data Size Helpers
   */
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      for (Cell cell : result.rawCells()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  /**
   * Method to enable a table, if not already enabled. This method suppresses
   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName name of the table to be enabled
   */
  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
    throws IOException {
    try {
      conn.getAdmin().enableTable(tableName);
    } catch (TableNotDisabledException | TableNotFoundException e) {
      // ignore
    }
  }

  /**
   * Method to disable a table, if not already disabled. This method suppresses
   * {@link TableNotEnabledException} and {@link TableNotFoundException}, if thrown while disabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName table name which has moved into space quota violation
   */
  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
    throws IOException {
    try {
      conn.getAdmin().disableTable(tableName);
    } catch (TableNotEnabledException | TableNotFoundException e) {
      // ignore
    }
  }
}