/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;

/**
 * Helper class to interact with the quota table
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // by default, one read capacity unit is 1024 bytes (1 KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // by default, one write capacity unit is 1024 bytes (1 KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;

  /** Table descriptor for the quota internal table */
  public static final HTableDescriptor QUOTA_TABLE_DESC =
    new HTableDescriptor(QUOTA_TABLE_NAME);
  static {
    QUOTA_TABLE_DESC.addFamily(
      new HColumnDescriptor(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
        .setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1)
    );
    QUOTA_TABLE_DESC.addFamily(
      new HColumnDescriptor(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
        .setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1)
    );
  }

  /** Returns true if quota support is enabled */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }
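
  // A minimal configuration sketch (hypothetical caller code, not part of this class) showing how
  // the keys declared above are meant to be used; the Configuration instance and the chosen
  // capacity-unit sizes are assumptions for illustration only.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);            // turn the quota subsystem on
  //   conf.setLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, 2048);  // 1 read CU = 2 KB
  //   conf.setLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, 2048); // 1 write CU = 2 KB
  //   assert QuotaUtil.isQuotaEnabled(conf);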

  /* =========================================================================
   *  Quota "settings" helpers
   */
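
  // Illustrative sketch of how the settings helpers below might be used: build a Quotas protobuf
  // carrying a throttle and persist it under a user/table row. The Connection "conn" and the
  // limits shown are hypothetical; in practice these helpers are driven by the master's quota
  // management when an admin issues a set-quota request.
  //
  //   TimedQuota perSecond = TimedQuota.newBuilder()
  //       .setSoftLimit(1000)
  //       .setTimeUnit(TimeUnit.SECONDS)
  //       .setScope(QuotaScope.MACHINE)
  //       .build();
  //   Quotas quotas = Quotas.newBuilder()
  //       .setThrottle(Throttle.newBuilder().setReqNum(perSecond).build())
  //       .build();
  //   addUserQuota(conn, "alice", TableName.valueOf("t1"), quotas);   // persist the limit
  //   deleteUserQuota(conn, "alice", TableName.valueOf("t1"));        // later, remove it
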
  public static void addTableQuota(final Connection connection, final TableName table,
      final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  public static void deleteTableQuota(final Connection connection, final TableName table)
      throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  public static void addNamespaceQuota(final Connection connection, final String namespace,
      final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
      throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  public static void addUserQuota(final Connection connection, final String user,
      final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
      final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
      final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user),
        getSettingsQualifierForUserNamespace(namespace), data);
  }

  public static void deleteUserQuota(final Connection connection, final String user)
      throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
      final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user),
        getSettingsQualifierForUserTable(table));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
      final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user),
        getSettingsQualifierForUserNamespace(namespace));
  }

  public static void addRegionServerQuota(final Connection connection, final String regionServer,
      final Quotas data) throws IOException {
    addQuotas(connection, getRegionServerRowKey(regionServer), data);
  }

  public static void deleteRegionServerQuota(final Connection connection,
      final String regionServer) throws IOException {
    deleteQuotas(connection, getRegionServerRowKey(regionServer));
  }

  protected static void switchExceedThrottleQuota(final Connection connection,
      boolean exceedThrottleQuotaEnabled) throws IOException {
    if (exceedThrottleQuotaEnabled) {
      checkRSQuotaToEnableExceedThrottle(
        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
    }

    Put put = new Put(getExceedThrottleQuotaRowKey());
    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
      Bytes.toBytes(exceedThrottleQuotaEnabled));
    doPut(connection, put);
  }

  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
    if (quotas != null && quotas.hasThrottle()) {
      Throttle throttle = quotas.getThrottle();
      // To enable exceed throttle quota, the region server must carry at least one read quota
      // (req/read, expressed as num, size, or capacity unit) and at least one write quota
      // (req/write, expressed as num, size, or capacity unit). A "req" quota counts for both.
      boolean hasReadQuota = false;
      boolean hasWriteQuota = false;
      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
        hasReadQuota = true;
        hasWriteQuota = true;
      }
      if (!hasReadQuota
          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())) {
        hasReadQuota = true;
      }
      if (!hasReadQuota) {
        throw new DoNotRetryIOException("Please set at least one read region server quota "
            + "before enabling exceed throttle quota");
      }
      if (!hasWriteQuota && (throttle.hasWriteNum() || throttle.hasWriteSize()
          || throttle.hasWriteCapacityUnit())) {
        hasWriteQuota = true;
      }
      if (!hasWriteQuota) {
        throw new DoNotRetryIOException("Please set at least one write region server quota "
            + "before enabling exceed throttle quota");
      }
      // To enable exceed throttle quota, all region server throttle quotas must use the SECONDS
      // time unit. Once earlier requests exceed their own quota and start consuming the region
      // server quota, a quota with a longer time unit could take a long time to refill, which
      // would throttle later requests.
      List<Pair<Boolean, TimedQuota>> list =
          Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
            Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
            Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
            Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
            Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
            Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
            Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
            Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
            Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
      for (Pair<Boolean, TimedQuota> pair : list) {
        if (pair.getFirst()) {
          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
            throw new DoNotRetryIOException("All region server quotas must use the SECONDS "
                + "time unit when enabling exceed throttle quota");
          }
        }
      }
    } else {
      // Exceed throttle quota requires that a region server quota is already set
      throw new DoNotRetryIOException(
          "Please set a region server quota before enabling exceed throttle quota");
    }
  }
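
  // A sketch (hypothetical values) of a region server throttle that passes the check above: it
  // carries at least one read-side and one write-side quota, and every TimedQuota uses the
  // SECONDS time unit. A single "req" quota would satisfy both the read and the write side.
  //
  //   TimedQuota readPerSec = TimedQuota.newBuilder()
  //       .setSoftLimit(2000).setTimeUnit(TimeUnit.SECONDS).setScope(QuotaScope.MACHINE).build();
  //   TimedQuota writePerSec = TimedQuota.newBuilder()
  //       .setSoftLimit(1000).setTimeUnit(TimeUnit.SECONDS).setScope(QuotaScope.MACHINE).build();
  //   Quotas rsQuota = Quotas.newBuilder()
  //       .setThrottle(Throttle.newBuilder()
  //           .setReadNum(readPerSec)
  //           .setWriteNum(writePerSec)
  //           .build())
  //       .build();
  //   // With such a quota stored under QUOTA_REGION_SERVER_ROW_KEY,
  //   // switchExceedThrottleQuota(conn, true) does not throw.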

  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
      throws IOException {
    Get get = new Get(getExceedThrottleQuotaRowKey());
    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
    Result result = doGet(connection, get);
    if (result.isEmpty()) {
      return false;
    }
    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey,
      final Quotas data) throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey,
      final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
      throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
      final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    if (isNamespaceRowKey(rowKey)) {
      String ns = getNamespaceFromRowKey(rowKey);
      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
        // When deleting a namespace space quota, also delete the table usage (u:p) snapshots
        deleteTableUsageSnapshotsForNamespace(connection, ns);
      }
    }
    doDelete(connection, delete);
  }

  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
      final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
      throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] key = gets.get(i).getRow();
      assert isUserRowKey(key);
      String user = getUserFromRowKey(key);

      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
      userQuotas.put(user, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(key, results[i].getRow());

      try {
        parseUserResult(user, results[i], new UserQuotasVisitor() {
          @Override
          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(namespace, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas,
              tableMachineQuotaFactors.getOrDefault(table, 1.0));
            quotaInfo.setQuotas(table, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(quotas);
          }
        });
      } catch (IOException e) {
        LOG.error("Unable to parse user '" + user + "' quotas", e);
        userQuotas.remove(user);
      }
    }
    return userQuotas;
  }
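
  // A sketch of how a periodic cache refresher might call this method. The user name, factor
  // values, and variable names are hypothetical; the factor expresses how a CLUSTER-scope limit
  // is split across region servers (for example 1.0 / numberOfRegionServers).
  //
  //   List<Get> gets = new ArrayList<>();
  //   gets.add(new Get(getUserRowKey("alice")));
  //   Map<TableName, Double> tableFactors = new HashMap<>();  // per-table machine factors
  //   Map<String, UserQuotaState> states =
  //       fetchUserQuotas(conn, gets, tableFactors, 1.0 / 5);  // e.g. a 5-node cluster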

  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
      final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }

      @Override
      public double getFactor(TableName tableName) {
        return tableMachineFactors.getOrDefault(tableName, 1.0);
      }
    });
  }

  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
      final List<Get> gets, double factor) throws IOException {
    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return factor;
      }
    });
  }

  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
      final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isRegionServerRowKey(row);
        return getRegionServerFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return 1;
      }
    });
  }

  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
      final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr)
      throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] row = gets.get(i).getRow();
      K key = kfr.getKeyFromRow(row);

      QuotaState quotaInfo = new QuotaState(nowTs);
      globalQuotas.put(key, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(row, results[i].getRow());

      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
      if (data == null) continue;

      try {
        Quotas quotas = quotasFromData(data);
        quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
        quotaInfo.setQuotas(quotas);
      } catch (IOException e) {
        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
        globalQuotas.remove(key);
      }
    }
    return globalQuotas;
  }

  /**
   * Convert a cluster-scope quota to a machine-scope quota.
   * @param quotas the original quota
   * @param factor the factor used to divide the cluster limit into a per-machine limit
   * @return the converted quota, whose quota limiters are all in machine scope
   */
  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
    if (newQuotas.hasThrottle()) {
      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
      if (throttle.hasReqNum()) {
        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
      }
      if (throttle.hasReqSize()) {
        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
      }
      if (throttle.hasReadNum()) {
        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
      }
      if (throttle.hasReadSize()) {
        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
      }
      if (throttle.hasWriteNum()) {
        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
      }
      if (throttle.hasWriteSize()) {
        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
      }
      if (throttle.hasReqCapacityUnit()) {
        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
      }
      if (throttle.hasReadCapacityUnit()) {
        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
      }
      if (throttle.hasWriteCapacityUnit()) {
        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
      }
      newQuotas.setThrottle(throttle.build());
    }
    return newQuotas.build();
  }

  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
          .setScope(QuotaScope.MACHINE);
      return newTimedQuota.build();
    } else {
      return timedQuota;
    }
  }
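
  // Worked example for the CLUSTER -> MACHINE conversion above, with hypothetical numbers: a
  // cluster-scope soft limit of 1000 req/sec shared by 5 region servers uses factor = 1.0 / 5,
  // so each machine-scope limiter becomes Math.max(1, (long) (1000 * 0.2)) = 200 req/sec.
  // MACHINE-scope quotas are returned unchanged, and the converted limit never rounds below 1.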

  private interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);
    double getFactor(T t);
  }

  /* =========================================================================
   *  HTable helpers
   */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  private static void doDelete(final Connection connection, final Delete delete)
      throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /* =========================================================================
   *  Data Size Helpers
   */
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      for (Cell cell : result.rawCells()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }
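
  // A sketch of the intended use of the size helpers above (hypothetical caller code): the
  // serialized cell size of a mutation or result is what gets charged against size-based
  // throttle quotas.
  //
  //   Put put = new Put(Bytes.toBytes("row1"));
  //   put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
  //   long writeSize = calculateMutationSize(put);  // bytes charged for this write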

  /**
   * Method to enable a table, if not already enabled. This method suppresses
   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
   * the table.
   * @param conn connection to re-use
   * @param tableName name of the table to be enabled
   */
  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
      throws IOException {
    try (Admin admin = conn.getAdmin()) {
      admin.enableTable(tableName);
    } catch (TableNotDisabledException | TableNotFoundException e) {
      // ignore
    }
  }

  /**
   * Method to disable a table, if not already disabled. This method suppresses
   * {@link TableNotEnabledException} and {@link TableNotFoundException}, if thrown while disabling
   * the table.
   * @param conn connection to re-use
   * @param tableName name of the table which has moved into space quota violation
   */
  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
      throws IOException {
    try (Admin admin = conn.getAdmin()) {
      admin.disableTable(tableName);
    } catch (TableNotEnabledException | TableNotFoundException e) {
      // ignore
    }
  }
}