001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.DoNotRetryIOException;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.TableNotDisabledException;
032import org.apache.hadoop.hbase.TableNotEnabledException;
033import org.apache.hadoop.hbase.TableNotFoundException;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.Delete;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Mutation;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.ResultScanner;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
046import org.apache.hadoop.hbase.regionserver.BloomType;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.apache.yetus.audience.InterfaceStability;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
062
063/**
064 * Helper class to interact with the quota table
065 */
066@InterfaceAudience.Private
067@InterfaceStability.Evolving
068public class QuotaUtil extends QuotaTableUtil {
069  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);
070
071  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
072  private static final boolean QUOTA_ENABLED_DEFAULT = false;
073
074  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
075  // the default one read capacity unit is 1024 bytes (1KB)
076  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
077  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
078  // the default one write capacity unit is 1024 bytes (1KB)
079  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
080
081  /*
082   * The below defaults, if configured, will be applied to otherwise unthrottled users. For example,
083   * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
084   * that any given user may not query more than 1mb per second from any given machine, unless
085   * explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and
086   * QuotaScope.MACHINE.
087   */
088  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
089    "hbase.quota.default.user.machine.read.num";
090  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
091    "hbase.quota.default.user.machine.read.size";
092  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
093    "hbase.quota.default.user.machine.request.num";
094  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
095    "hbase.quota.default.user.machine.request.size";
096  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
097    "hbase.quota.default.user.machine.write.num";
098  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
099    "hbase.quota.default.user.machine.write.size";
100  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE =
101    "hbase.quota.default.user.machine.atomic.read.size";
102  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM =
103    "hbase.quota.default.user.machine.atomic.request.num";
104  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE =
105    "hbase.quota.default.user.machine.atomic.write.size";
106  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS =
107    "hbase.quota.default.user.machine.request.handler.usage.ms";
108
109  /** Table descriptor for Quota internal table */
110  public static final TableDescriptor QUOTA_TABLE_DESC =
111    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
112      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
113        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
114        .setMaxVersions(1).build())
115      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
116        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
117        .setMaxVersions(1).build())
118      .build();
119
120  /** Returns true if the support for quota is enabled */
121  public static boolean isQuotaEnabled(final Configuration conf) {
122    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
123  }
124
125  /*
126   * ========================================================================= Quota "settings"
127   * helpers
128   */
129  public static void addTableQuota(final Connection connection, final TableName table,
130    final Quotas data) throws IOException {
131    addQuotas(connection, getTableRowKey(table), data);
132  }
133
134  public static void deleteTableQuota(final Connection connection, final TableName table)
135    throws IOException {
136    deleteQuotas(connection, getTableRowKey(table));
137  }
138
139  public static void addNamespaceQuota(final Connection connection, final String namespace,
140    final Quotas data) throws IOException {
141    addQuotas(connection, getNamespaceRowKey(namespace), data);
142  }
143
144  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
145    throws IOException {
146    deleteQuotas(connection, getNamespaceRowKey(namespace));
147  }
148
149  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
150    throws IOException {
151    addQuotas(connection, getUserRowKey(user), data);
152  }
153
154  public static void addUserQuota(final Connection connection, final String user,
155    final TableName table, final Quotas data) throws IOException {
156    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
157  }
158
159  public static void addUserQuota(final Connection connection, final String user,
160    final String namespace, final Quotas data) throws IOException {
161    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
162      data);
163  }
164
165  public static void deleteUserQuota(final Connection connection, final String user)
166    throws IOException {
167    deleteQuotas(connection, getUserRowKey(user));
168  }
169
170  public static void deleteUserQuota(final Connection connection, final String user,
171    final TableName table) throws IOException {
172    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
173  }
174
175  public static void deleteUserQuota(final Connection connection, final String user,
176    final String namespace) throws IOException {
177    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
178  }
179
180  public static void addRegionServerQuota(final Connection connection, final String regionServer,
181    final Quotas data) throws IOException {
182    addQuotas(connection, getRegionServerRowKey(regionServer), data);
183  }
184
185  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
186    throws IOException {
187    deleteQuotas(connection, getRegionServerRowKey(regionServer));
188  }
189
190  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
191    boolean hasCondition) {
192    if (action.hasMutation()) {
193      return getQuotaOperationType(action.getMutation(), hasCondition);
194    }
195    return OperationQuota.OperationType.GET;
196  }
197
198  public static OperationQuota.OperationType
199    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
200    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
201  }
202
203  private static OperationQuota.OperationType
204    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
205    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
206    if (
207      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
208        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
209    ) {
210      return OperationQuota.OperationType.CHECK_AND_MUTATE;
211    }
212    return OperationQuota.OperationType.MUTATE;
213  }
214
215  protected static void switchExceedThrottleQuota(final Connection connection,
216    boolean exceedThrottleQuotaEnabled) throws IOException {
217    if (exceedThrottleQuotaEnabled) {
218      checkRSQuotaToEnableExceedThrottle(
219        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
220    }
221
222    Put put = new Put(getExceedThrottleQuotaRowKey());
223    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
224      Bytes.toBytes(exceedThrottleQuotaEnabled));
225    doPut(connection, put);
226  }
227
228  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
229    if (quotas != null && quotas.hasThrottle()) {
230      Throttle throttle = quotas.getThrottle();
231      // If enable exceed throttle quota, make sure that there are at least one read(req/read +
232      // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
233      boolean hasReadQuota = false;
234      boolean hasWriteQuota = false;
235      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
236        hasReadQuota = true;
237        hasWriteQuota = true;
238      }
239      if (
240        !hasReadQuota
241          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
242      ) {
243        hasReadQuota = true;
244      }
245      if (!hasReadQuota) {
246        throw new DoNotRetryIOException(
247          "Please set at least one read region server quota before enable exceed throttle quota");
248      }
249      if (
250        !hasWriteQuota
251          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
252      ) {
253        hasWriteQuota = true;
254      }
255      if (!hasWriteQuota) {
256        throw new DoNotRetryIOException("Please set at least one write region server quota "
257          + "before enable exceed throttle quota");
258      }
259      // If enable exceed throttle quota, make sure that region server throttle quotas are in
260      // seconds time unit. Because once previous requests exceed their quota and consume region
261      // server quota, quota in other time units may be refilled in a long time, this may affect
262      // later requests.
263      List<Pair<Boolean, TimedQuota>> list =
264        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
265          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
266          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
267          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
268          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
269          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
270          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
271          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
272          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
273      for (Pair<Boolean, TimedQuota> pair : list) {
274        if (pair.getFirst()) {
275          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
276            throw new DoNotRetryIOException("All region server quota must be "
277              + "in seconds time unit if enable exceed throttle quota");
278          }
279        }
280      }
281    } else {
282      // If enable exceed throttle quota, make sure that region server quota is already set
283      throw new DoNotRetryIOException(
284        "Please set region server quota before enable exceed throttle quota");
285    }
286  }
287
288  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
289    throws IOException {
290    Get get = new Get(getExceedThrottleQuotaRowKey());
291    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
292    Result result = doGet(connection, get);
293    if (result.isEmpty()) {
294      return false;
295    }
296    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
297  }
298
299  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
300    throws IOException {
301    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
302  }
303
304  private static void addQuotas(final Connection connection, final byte[] rowKey,
305    final byte[] qualifier, final Quotas data) throws IOException {
306    Put put = new Put(rowKey);
307    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
308    doPut(connection, put);
309  }
310
311  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
312    throws IOException {
313    deleteQuotas(connection, rowKey, null);
314  }
315
316  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
317    final byte[] qualifier) throws IOException {
318    Delete delete = new Delete(rowKey);
319    if (qualifier != null) {
320      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
321    }
322    if (isNamespaceRowKey(rowKey)) {
323      String ns = getNamespaceFromRowKey(rowKey);
324      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
325      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
326        // When deleting namespace space quota, also delete table usage(u:p) snapshots
327        deleteTableUsageSnapshotsForNamespace(connection, ns);
328      }
329    }
330    doDelete(connection, delete);
331  }
332
333  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
334    Map<TableName, Double> tableMachineQuotaFactors, double factor) throws IOException {
335    Map<String, UserQuotaState> userQuotas = new HashMap<>();
336    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
337      Scan scan = new Scan();
338      scan.addFamily(QUOTA_FAMILY_INFO);
339      scan.setStartStopRowForPrefixScan(QUOTA_USER_ROW_KEY_PREFIX);
340      try (ResultScanner resultScanner = table.getScanner(scan)) {
341        for (Result result : resultScanner) {
342          byte[] key = result.getRow();
343          assert isUserRowKey(key);
344          String user = getUserFromRowKey(key);
345
346          final UserQuotaState quotaInfo = new UserQuotaState();
347          userQuotas.put(user, quotaInfo);
348
349          try {
350            parseUserResult(user, result, new UserQuotasVisitor() {
351              @Override
352              public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
353                quotas = updateClusterQuotaToMachineQuota(quotas, factor);
354                quotaInfo.setQuotas(namespace, quotas);
355              }
356
357              @Override
358              public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
359                quotas = updateClusterQuotaToMachineQuota(quotas,
360                  tableMachineQuotaFactors.containsKey(table)
361                    ? tableMachineQuotaFactors.get(table)
362                    : 1);
363                quotaInfo.setQuotas(table, quotas);
364              }
365
366              @Override
367              public void visitUserQuotas(String userName, Quotas quotas) {
368                quotas = updateClusterQuotaToMachineQuota(quotas, factor);
369                quotaInfo.setQuotas(quotas);
370              }
371            });
372          } catch (IOException e) {
373            LOG.error("Unable to parse user '" + user + "' quotas", e);
374            userQuotas.remove(user);
375          }
376        }
377      }
378    }
379
380    return userQuotas;
381  }
382
383  protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
384    QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();
385
386    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
387      .ifPresent(throttleBuilder::setReadNum);
388    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
389      .ifPresent(throttleBuilder::setReadSize);
390    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
391      .ifPresent(throttleBuilder::setReqNum);
392    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
393      .ifPresent(throttleBuilder::setReqSize);
394    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
395      .ifPresent(throttleBuilder::setWriteNum);
396    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
397      .ifPresent(throttleBuilder::setWriteSize);
398    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE)
399      .ifPresent(throttleBuilder::setAtomicReadSize);
400    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM)
401      .ifPresent(throttleBuilder::setAtomicReqNum);
402    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE)
403      .ifPresent(throttleBuilder::setAtomicWriteSize);
404    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS)
405      .ifPresent(throttleBuilder::setReqHandlerUsageMs);
406
407    UserQuotaState state = new UserQuotaState();
408    QuotaProtos.Quotas defaultQuotas =
409      QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
410    state.setQuotas(defaultQuotas);
411    return state;
412  }
413
414  private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
415    int defaultSoftLimit = conf.getInt(key, -1);
416    if (defaultSoftLimit == -1) {
417      return Optional.empty();
418    }
419    return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
420      java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
421  }
422
423  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
424    Map<TableName, Double> tableMachineFactors) throws IOException {
425    Scan scan = new Scan();
426    scan.addFamily(QUOTA_FAMILY_INFO);
427    scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
428    return fetchGlobalQuotas("table", scan, connection, new KeyFromRow<TableName>() {
429      @Override
430      public TableName getKeyFromRow(final byte[] row) {
431        assert isTableRowKey(row);
432        return getTableFromRowKey(row);
433      }
434
435      @Override
436      public double getFactor(TableName tableName) {
437        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
438      }
439    });
440  }
441
442  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
443    double factor) throws IOException {
444    Scan scan = new Scan();
445    scan.addFamily(QUOTA_FAMILY_INFO);
446    scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
447    return fetchGlobalQuotas("namespace", scan, connection, new KeyFromRow<String>() {
448      @Override
449      public String getKeyFromRow(final byte[] row) {
450        assert isNamespaceRowKey(row);
451        return getNamespaceFromRowKey(row);
452      }
453
454      @Override
455      public double getFactor(String s) {
456        return factor;
457      }
458    });
459  }
460
461  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection)
462    throws IOException {
463    Scan scan = new Scan();
464    scan.addFamily(QUOTA_FAMILY_INFO);
465    scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
466    return fetchGlobalQuotas("regionServer", scan, connection, new KeyFromRow<String>() {
467      @Override
468      public String getKeyFromRow(final byte[] row) {
469        assert isRegionServerRowKey(row);
470        return getRegionServerFromRowKey(row);
471      }
472
473      @Override
474      public double getFactor(String s) {
475        return 1;
476      }
477    });
478  }
479
480  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final Scan scan,
481    final Connection connection, final KeyFromRow<K> kfr) throws IOException {
482
483    Map<K, QuotaState> globalQuotas = new HashMap<>();
484    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
485      try (ResultScanner resultScanner = table.getScanner(scan)) {
486        for (Result result : resultScanner) {
487
488          byte[] row = result.getRow();
489          K key = kfr.getKeyFromRow(row);
490
491          QuotaState quotaInfo = new QuotaState();
492          globalQuotas.put(key, quotaInfo);
493
494          byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
495          if (data == null) {
496            continue;
497          }
498
499          try {
500            Quotas quotas = quotasFromData(data);
501            quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
502            quotaInfo.setQuotas(quotas);
503          } catch (IOException e) {
504            LOG.error("Unable to parse {} '{}' quotas", type, key, e);
505            globalQuotas.remove(key);
506          }
507        }
508      }
509    }
510    return globalQuotas;
511  }
512
513  /**
514   * Convert cluster scope quota to machine scope quota
515   * @param quotas the original quota
516   * @param factor factor used to divide cluster limiter to machine limiter
517   * @return the converted quota whose quota limiters all in machine scope
518   */
519  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
520    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
521    if (newQuotas.hasThrottle()) {
522      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
523      if (throttle.hasReqNum()) {
524        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
525      }
526      if (throttle.hasReqSize()) {
527        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
528      }
529      if (throttle.hasReadNum()) {
530        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
531      }
532      if (throttle.hasReadSize()) {
533        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
534      }
535      if (throttle.hasWriteNum()) {
536        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
537      }
538      if (throttle.hasWriteSize()) {
539        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
540      }
541      if (throttle.hasReqCapacityUnit()) {
542        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
543      }
544      if (throttle.hasReadCapacityUnit()) {
545        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
546      }
547      if (throttle.hasWriteCapacityUnit()) {
548        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
549      }
550      newQuotas.setThrottle(throttle.build());
551    }
552    return newQuotas.build();
553  }
554
555  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
556    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
557      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
558      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
559        .setScope(QuotaScope.MACHINE);
560      return newTimedQuota.build();
561    } else {
562      return timedQuota;
563    }
564  }
565
566  private static interface KeyFromRow<T> {
567    T getKeyFromRow(final byte[] row);
568
569    double getFactor(T t);
570  }
571
572  /*
573   * ========================================================================= HTable helpers
574   */
575  private static void doPut(final Connection connection, final Put put) throws IOException {
576    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
577      table.put(put);
578    }
579  }
580
581  private static void doDelete(final Connection connection, final Delete delete)
582    throws IOException {
583    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
584      table.delete(delete);
585    }
586  }
587
588  /*
589   * ========================================================================= Data Size Helpers
590   */
591  public static long calculateMutationSize(final Mutation mutation) {
592    long size = 0;
593    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
594      for (Cell cell : entry.getValue()) {
595        size += cell.getSerializedSize();
596      }
597    }
598    return size;
599  }
600
601  public static long calculateResultSize(final Result result) {
602    long size = 0;
603    for (Cell cell : result.rawCells()) {
604      size += cell.getSerializedSize();
605    }
606    return size;
607  }
608
609  public static long calculateResultSize(final List<Result> results) {
610    long size = 0;
611    for (Result result : results) {
612      for (Cell cell : result.rawCells()) {
613        size += cell.getSerializedSize();
614      }
615    }
616    return size;
617  }
618
619  public static long calculateCellsSize(final List<Cell> cells) {
620    long size = 0;
621    for (Cell cell : cells) {
622      size += cell.getSerializedSize();
623    }
624    return size;
625  }
626
627  /**
628   * Method to enable a table, if not already enabled. This method suppresses
629   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
630   * the table.
631   * @param conn      connection to re-use
632   * @param tableName name of the table to be enabled
633   */
634  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
635    throws IOException {
636    try {
637      conn.getAdmin().enableTable(tableName);
638    } catch (TableNotDisabledException | TableNotFoundException e) {
639      // ignore
640    }
641  }
642
643  /**
644   * Method to disable a table, if not already disabled. This method suppresses
645   * {@link TableNotEnabledException}, if thrown while disabling the table.
646   * @param conn      connection to re-use
647   * @param tableName table name which has moved into space quota violation
648   */
649  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
650    throws IOException {
651    try {
652      conn.getAdmin().disableTable(tableName);
653    } catch (TableNotEnabledException | TableNotFoundException e) {
654      // ignore
655    }
656  }
657}