001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
062
063/**
064 * Helper class to interact with the quota table
065 */
066@InterfaceAudience.Private
067@InterfaceStability.Evolving
068public class QuotaUtil extends QuotaTableUtil {
  /** Logger used to report quota rows that fail to parse. */
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  /** Configuration key read by {@link #isQuotaEnabled(Configuration)}. */
  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  /** Value used by {@link #isQuotaEnabled(Configuration)} when the key is not set. */
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  /** Configuration key for the size of one read capacity unit. */
  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // the default one read capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  /** Configuration key for the size of one write capacity unit. */
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // the default one write capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
080
  /*
   * The defaults below, if configured, are applied to otherwise unthrottled users. For example,
   * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
   * that any given user may not read more than 1MB per second from any given machine, unless
   * explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and
   * QuotaScope.MACHINE. Each key is read by buildDefaultUserQuotaState; a key that is unset (or
   * set to -1) contributes no default limit.
   */
  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
    "hbase.quota.default.user.machine.read.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
    "hbase.quota.default.user.machine.read.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
    "hbase.quota.default.user.machine.request.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
    "hbase.quota.default.user.machine.request.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
    "hbase.quota.default.user.machine.write.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
    "hbase.quota.default.user.machine.write.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE =
    "hbase.quota.default.user.machine.atomic.read.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM =
    "hbase.quota.default.user.machine.atomic.request.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE =
    "hbase.quota.default.user.machine.atomic.write.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS =
    "hbase.quota.default.user.machine.request.handler.usage.ms";
108
  /**
   * Table descriptor for the internal quota table. It declares two column families,
   * {@code QUOTA_FAMILY_INFO} and {@code QUOTA_FAMILY_USAGE}; both use local replication scope, a
   * ROW bloom filter and keep a single version per cell.
   */
  public static final TableDescriptor QUOTA_TABLE_DESC =
    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .build();
119
120  /** Returns true if the support for quota is enabled */
121  public static boolean isQuotaEnabled(final Configuration conf) {
122    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
123  }
124
125  /*
126   * ========================================================================= Quota "settings"
127   * helpers
128   */
129  public static void addTableQuota(final Connection connection, final TableName table,
130    final Quotas data) throws IOException {
131    addQuotas(connection, getTableRowKey(table), data);
132  }
133
134  public static void deleteTableQuota(final Connection connection, final TableName table)
135    throws IOException {
136    deleteQuotas(connection, getTableRowKey(table));
137  }
138
139  public static void addNamespaceQuota(final Connection connection, final String namespace,
140    final Quotas data) throws IOException {
141    addQuotas(connection, getNamespaceRowKey(namespace), data);
142  }
143
144  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
145    throws IOException {
146    deleteQuotas(connection, getNamespaceRowKey(namespace));
147  }
148
149  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
150    throws IOException {
151    addQuotas(connection, getUserRowKey(user), data);
152  }
153
154  public static void addUserQuota(final Connection connection, final String user,
155    final TableName table, final Quotas data) throws IOException {
156    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
157  }
158
159  public static void addUserQuota(final Connection connection, final String user,
160    final String namespace, final Quotas data) throws IOException {
161    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
162      data);
163  }
164
165  public static void deleteUserQuota(final Connection connection, final String user)
166    throws IOException {
167    deleteQuotas(connection, getUserRowKey(user));
168  }
169
170  public static void deleteUserQuota(final Connection connection, final String user,
171    final TableName table) throws IOException {
172    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
173  }
174
175  public static void deleteUserQuota(final Connection connection, final String user,
176    final String namespace) throws IOException {
177    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
178  }
179
180  public static void addRegionServerQuota(final Connection connection, final String regionServer,
181    final Quotas data) throws IOException {
182    addQuotas(connection, getRegionServerRowKey(regionServer), data);
183  }
184
185  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
186    throws IOException {
187    deleteQuotas(connection, getRegionServerRowKey(regionServer));
188  }
189
190  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
191    boolean hasCondition) {
192    if (action.hasMutation()) {
193      return getQuotaOperationType(action.getMutation(), hasCondition);
194    }
195    return OperationQuota.OperationType.GET;
196  }
197
198  public static OperationQuota.OperationType
199    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
200    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
201  }
202
203  private static OperationQuota.OperationType
204    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
205    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
206    if (
207      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
208        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
209    ) {
210      return OperationQuota.OperationType.CHECK_AND_MUTATE;
211    }
212    return OperationQuota.OperationType.MUTATE;
213  }
214
215  protected static void switchExceedThrottleQuota(final Connection connection,
216    boolean exceedThrottleQuotaEnabled) throws IOException {
217    if (exceedThrottleQuotaEnabled) {
218      checkRSQuotaToEnableExceedThrottle(
219        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
220    }
221
222    Put put = new Put(getExceedThrottleQuotaRowKey());
223    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
224      Bytes.toBytes(exceedThrottleQuotaEnabled));
225    doPut(connection, put);
226  }
227
228  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
229    if (quotas != null && quotas.hasThrottle()) {
230      Throttle throttle = quotas.getThrottle();
231      // If enable exceed throttle quota, make sure that there are at least one read(req/read +
232      // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
233      boolean hasReadQuota = false;
234      boolean hasWriteQuota = false;
235      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
236        hasReadQuota = true;
237        hasWriteQuota = true;
238      }
239      if (
240        !hasReadQuota
241          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
242      ) {
243        hasReadQuota = true;
244      }
245      if (!hasReadQuota) {
246        throw new DoNotRetryIOException(
247          "Please set at least one read region server quota before enable exceed throttle quota");
248      }
249      if (
250        !hasWriteQuota
251          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
252      ) {
253        hasWriteQuota = true;
254      }
255      if (!hasWriteQuota) {
256        throw new DoNotRetryIOException("Please set at least one write region server quota "
257          + "before enable exceed throttle quota");
258      }
259      // If enable exceed throttle quota, make sure that region server throttle quotas are in
260      // seconds time unit. Because once previous requests exceed their quota and consume region
261      // server quota, quota in other time units may be refilled in a long time, this may affect
262      // later requests.
263      List<Pair<Boolean, TimedQuota>> list =
264        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
265          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
266          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
267          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
268          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
269          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
270          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
271          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
272          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
273      for (Pair<Boolean, TimedQuota> pair : list) {
274        if (pair.getFirst()) {
275          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
276            throw new DoNotRetryIOException("All region server quota must be "
277              + "in seconds time unit if enable exceed throttle quota");
278          }
279        }
280      }
281    } else {
282      // If enable exceed throttle quota, make sure that region server quota is already set
283      throw new DoNotRetryIOException(
284        "Please set region server quota before enable exceed throttle quota");
285    }
286  }
287
288  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
289    throws IOException {
290    Get get = new Get(getExceedThrottleQuotaRowKey());
291    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
292    Result result = doGet(connection, get);
293    if (result.isEmpty()) {
294      return false;
295    }
296    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
297  }
298
299  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
300    throws IOException {
301    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
302  }
303
304  private static void addQuotas(final Connection connection, final byte[] rowKey,
305    final byte[] qualifier, final Quotas data) throws IOException {
306    Put put = new Put(rowKey);
307    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
308    doPut(connection, put);
309  }
310
311  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
312    throws IOException {
313    deleteQuotas(connection, rowKey, null);
314  }
315
316  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
317    final byte[] qualifier) throws IOException {
318    Delete delete = new Delete(rowKey);
319    if (qualifier != null) {
320      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
321    }
322    if (isNamespaceRowKey(rowKey)) {
323      String ns = getNamespaceFromRowKey(rowKey);
324      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
325      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
326        // When deleting namespace space quota, also delete table usage(u:p) snapshots
327        deleteTableUsageSnapshotsForNamespace(connection, ns);
328      }
329    }
330    doDelete(connection, delete);
331  }
332
333  public static Map<String, UserQuotaState> fetchUserQuotas(final Configuration conf,
334    final Connection connection, Map<TableName, Double> tableMachineQuotaFactors, double factor)
335    throws IOException {
336    Map<String, UserQuotaState> userQuotas = new HashMap<>();
337    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
338      Scan scan = new Scan();
339      scan.addFamily(QUOTA_FAMILY_INFO);
340      scan.setStartStopRowForPrefixScan(QUOTA_USER_ROW_KEY_PREFIX);
341      try (ResultScanner resultScanner = table.getScanner(scan)) {
342        for (Result result : resultScanner) {
343          byte[] key = result.getRow();
344          assert isUserRowKey(key);
345          String user = getUserFromRowKey(key);
346
347          final UserQuotaState quotaInfo = new UserQuotaState();
348          userQuotas.put(user, quotaInfo);
349
350          try {
351            parseUserResult(user, result, new UserQuotasVisitor() {
352              @Override
353              public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
354                quotas = updateClusterQuotaToMachineQuota(quotas, factor);
355                quotaInfo.setQuotas(conf, namespace, quotas);
356              }
357
358              @Override
359              public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
360                quotas = updateClusterQuotaToMachineQuota(quotas,
361                  tableMachineQuotaFactors.containsKey(table)
362                    ? tableMachineQuotaFactors.get(table)
363                    : 1);
364                quotaInfo.setQuotas(conf, table, quotas);
365              }
366
367              @Override
368              public void visitUserQuotas(String userName, Quotas quotas) {
369                quotas = updateClusterQuotaToMachineQuota(quotas, factor);
370                quotaInfo.setQuotas(conf, quotas);
371              }
372            });
373          } catch (IOException e) {
374            LOG.error("Unable to parse user '" + user + "' quotas", e);
375            userQuotas.remove(user);
376          }
377        }
378      }
379    }
380
381    return userQuotas;
382  }
383
384  protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
385    QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();
386
387    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
388      .ifPresent(throttleBuilder::setReadNum);
389    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
390      .ifPresent(throttleBuilder::setReadSize);
391    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
392      .ifPresent(throttleBuilder::setReqNum);
393    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
394      .ifPresent(throttleBuilder::setReqSize);
395    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
396      .ifPresent(throttleBuilder::setWriteNum);
397    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
398      .ifPresent(throttleBuilder::setWriteSize);
399    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE)
400      .ifPresent(throttleBuilder::setAtomicReadSize);
401    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM)
402      .ifPresent(throttleBuilder::setAtomicReqNum);
403    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE)
404      .ifPresent(throttleBuilder::setAtomicWriteSize);
405    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS)
406      .ifPresent(throttleBuilder::setReqHandlerUsageMs);
407
408    UserQuotaState state = new UserQuotaState();
409    QuotaProtos.Quotas defaultQuotas =
410      QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
411    state.setQuotas(conf, defaultQuotas);
412    return state;
413  }
414
415  private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
416    int defaultSoftLimit = conf.getInt(key, -1);
417    if (defaultSoftLimit == -1) {
418      return Optional.empty();
419    }
420    return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
421      java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
422  }
423
424  public static Map<TableName, QuotaState> fetchTableQuotas(final Configuration conf,
425    final Connection connection, Map<TableName, Double> tableMachineFactors) throws IOException {
426    Scan scan = new Scan();
427    scan.addFamily(QUOTA_FAMILY_INFO);
428    scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
429    return fetchGlobalQuotas(conf, "table", scan, connection, new KeyFromRow<TableName>() {
430      @Override
431      public TableName getKeyFromRow(final byte[] row) {
432        assert isTableRowKey(row);
433        return getTableFromRowKey(row);
434      }
435
436      @Override
437      public double getFactor(TableName tableName) {
438        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
439      }
440    });
441  }
442
443  public static Map<String, QuotaState> fetchNamespaceQuotas(final Configuration conf,
444    final Connection connection, double factor) throws IOException {
445    Scan scan = new Scan();
446    scan.addFamily(QUOTA_FAMILY_INFO);
447    scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
448    return fetchGlobalQuotas(conf, "namespace", scan, connection, new KeyFromRow<String>() {
449      @Override
450      public String getKeyFromRow(final byte[] row) {
451        assert isNamespaceRowKey(row);
452        return getNamespaceFromRowKey(row);
453      }
454
455      @Override
456      public double getFactor(String s) {
457        return factor;
458      }
459    });
460  }
461
462  public static Map<String, QuotaState> fetchRegionServerQuotas(final Configuration conf,
463    final Connection connection) throws IOException {
464    Scan scan = new Scan();
465    scan.addFamily(QUOTA_FAMILY_INFO);
466    scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
467    return fetchGlobalQuotas(conf, "regionServer", scan, connection, new KeyFromRow<String>() {
468      @Override
469      public String getKeyFromRow(final byte[] row) {
470        assert isRegionServerRowKey(row);
471        return getRegionServerFromRowKey(row);
472      }
473
474      @Override
475      public double getFactor(String s) {
476        return 1;
477      }
478    });
479  }
480
481  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final Configuration conf,
482    final String type, final Scan scan, final Connection connection, final KeyFromRow<K> kfr)
483    throws IOException {
484
485    Map<K, QuotaState> globalQuotas = new HashMap<>();
486    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
487      try (ResultScanner resultScanner = table.getScanner(scan)) {
488        for (Result result : resultScanner) {
489
490          byte[] row = result.getRow();
491          K key = kfr.getKeyFromRow(row);
492
493          QuotaState quotaInfo = new QuotaState();
494          globalQuotas.put(key, quotaInfo);
495
496          byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
497          if (data == null) {
498            continue;
499          }
500
501          try {
502            Quotas quotas = quotasFromData(data);
503            quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
504            quotaInfo.setQuotas(conf, quotas);
505          } catch (IOException e) {
506            LOG.error("Unable to parse {} '{}' quotas", type, key, e);
507            globalQuotas.remove(key);
508          }
509        }
510      }
511    }
512    return globalQuotas;
513  }
514
515  /**
516   * Convert cluster scope quota to machine scope quota
517   * @param quotas the original quota
518   * @param factor factor used to divide cluster limiter to machine limiter
519   * @return the converted quota whose quota limiters all in machine scope
520   */
521  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
522    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
523    if (newQuotas.hasThrottle()) {
524      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
525      if (throttle.hasReqNum()) {
526        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
527      }
528      if (throttle.hasReqSize()) {
529        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
530      }
531      if (throttle.hasReadNum()) {
532        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
533      }
534      if (throttle.hasReadSize()) {
535        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
536      }
537      if (throttle.hasWriteNum()) {
538        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
539      }
540      if (throttle.hasWriteSize()) {
541        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
542      }
543      if (throttle.hasReqCapacityUnit()) {
544        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
545      }
546      if (throttle.hasReadCapacityUnit()) {
547        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
548      }
549      if (throttle.hasWriteCapacityUnit()) {
550        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
551      }
552      newQuotas.setThrottle(throttle.build());
553    }
554    return newQuotas.build();
555  }
556
557  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
558    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
559      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
560      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
561        .setScope(QuotaScope.MACHINE);
562      return newTimedQuota.build();
563    } else {
564      return timedQuota;
565    }
566  }
567
568  private static interface KeyFromRow<T> {
569    T getKeyFromRow(final byte[] row);
570
571    double getFactor(T t);
572  }
573
574  /*
575   * ========================================================================= HTable helpers
576   */
577  private static void doPut(final Connection connection, final Put put) throws IOException {
578    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
579      table.put(put);
580    }
581  }
582
583  private static void doDelete(final Connection connection, final Delete delete)
584    throws IOException {
585    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
586      table.delete(delete);
587    }
588  }
589
590  /*
591   * ========================================================================= Data Size Helpers
592   */
593  public static long calculateMutationSize(final Mutation mutation) {
594    long size = 0;
595    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
596      for (Cell cell : entry.getValue()) {
597        size += cell.getSerializedSize();
598      }
599    }
600    return size;
601  }
602
603  public static long calculateResultSize(final Result result) {
604    long size = 0;
605    for (Cell cell : result.rawCells()) {
606      size += cell.getSerializedSize();
607    }
608    return size;
609  }
610
611  public static long calculateResultSize(final List<Result> results) {
612    long size = 0;
613    for (Result result : results) {
614      for (Cell cell : result.rawCells()) {
615        size += cell.getSerializedSize();
616      }
617    }
618    return size;
619  }
620
621  public static long calculateCellsSize(final List<Cell> cells) {
622    long size = 0;
623    for (Cell cell : cells) {
624      size += cell.getSerializedSize();
625    }
626    return size;
627  }
628
629  /**
630   * Method to enable a table, if not already enabled. This method suppresses
631   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
632   * the table.
633   * @param conn      connection to re-use
634   * @param tableName name of the table to be enabled
635   */
636  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
637    throws IOException {
638    try {
639      conn.getAdmin().enableTable(tableName);
640    } catch (TableNotDisabledException | TableNotFoundException e) {
641      // ignore
642    }
643  }
644
645  /**
646   * Method to disable a table, if not already disabled. This method suppresses
647   * {@link TableNotEnabledException}, if thrown while disabling the table.
648   * @param conn      connection to re-use
649   * @param tableName table name which has moved into space quota violation
650   */
651  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
652    throws IOException {
653    try {
654      conn.getAdmin().disableTable(tableName);
655    } catch (TableNotEnabledException | TableNotFoundException e) {
656      // ignore
657    }
658  }
659}