001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.DoNotRetryIOException; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.TableNotDisabledException; 032import org.apache.hadoop.hbase.TableNotEnabledException; 033import org.apache.hadoop.hbase.TableNotFoundException; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.Delete; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Mutation; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.ResultScanner; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.regionserver.BloomType; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.Pair; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.apache.yetus.audience.InterfaceStability; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota; 062 063/** 064 * Helper class to interact with the quota table 065 */ 066@InterfaceAudience.Private 067@InterfaceStability.Evolving 068public class QuotaUtil extends QuotaTableUtil { 069 private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class); 070 071 public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; 072 private static final boolean QUOTA_ENABLED_DEFAULT = false; 073 074 public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit"; 075 // the default one read capacity unit is 1024 bytes (1KB) 076 public static final long DEFAULT_READ_CAPACITY_UNIT = 1024; 077 public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit"; 078 // the default one write capacity unit is 1024 bytes (1KB) 079 public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024; 080 081 /* 082 * The below defaults, if configured, will be applied to otherwise unthrottled users. For example, 083 * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure 084 * that any given user may not query more than 1mb per second from any given machine, unless 085 * explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and 086 * QuotaScope.MACHINE. 087 */ 088 public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM = 089 "hbase.quota.default.user.machine.read.num"; 090 public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE = 091 "hbase.quota.default.user.machine.read.size"; 092 public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM = 093 "hbase.quota.default.user.machine.request.num"; 094 public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE = 095 "hbase.quota.default.user.machine.request.size"; 096 public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM = 097 "hbase.quota.default.user.machine.write.num"; 098 public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE = 099 "hbase.quota.default.user.machine.write.size"; 100 public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE = 101 "hbase.quota.default.user.machine.atomic.read.size"; 102 public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM = 103 "hbase.quota.default.user.machine.atomic.request.num"; 104 public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE = 105 "hbase.quota.default.user.machine.atomic.write.size"; 106 public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS = 107 "hbase.quota.default.user.machine.request.handler.usage.ms"; 108 109 /** Table descriptor for Quota internal table */ 110 public static final TableDescriptor QUOTA_TABLE_DESC = 111 TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME) 112 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO) 113 .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW) 114 .setMaxVersions(1).build()) 115 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE) 116 .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW) 117 .setMaxVersions(1).build()) 118 .build(); 119 120 /** Returns true if the support for quota is enabled */ 121 public static boolean isQuotaEnabled(final Configuration conf) { 122 return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT); 123 } 124 125 /* 126 * ========================================================================= Quota "settings" 127 * helpers 128 */ 129 public static void addTableQuota(final Connection connection, final TableName table, 130 final Quotas data) throws IOException { 131 addQuotas(connection, getTableRowKey(table), data); 132 } 133 134 public static void deleteTableQuota(final Connection connection, final TableName table) 135 throws IOException { 136 deleteQuotas(connection, getTableRowKey(table)); 137 } 138 139 public static void addNamespaceQuota(final Connection connection, final String namespace, 140 final Quotas data) throws IOException { 141 addQuotas(connection, getNamespaceRowKey(namespace), data); 142 } 143 144 public static void deleteNamespaceQuota(final Connection connection, final String namespace) 145 throws IOException { 146 deleteQuotas(connection, getNamespaceRowKey(namespace)); 147 } 148 149 public static void addUserQuota(final Connection connection, final String user, final Quotas data) 150 throws IOException { 151 addQuotas(connection, getUserRowKey(user), data); 152 } 153 154 public static void addUserQuota(final Connection connection, final String user, 155 final TableName table, final Quotas data) throws IOException { 156 addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data); 157 } 158 159 public static void addUserQuota(final Connection connection, final String user, 160 final String namespace, final Quotas data) throws IOException { 161 addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace), 162 data); 163 } 164 165 public static void deleteUserQuota(final Connection connection, final String user) 166 throws IOException { 167 deleteQuotas(connection, getUserRowKey(user)); 168 } 169 170 public static void deleteUserQuota(final Connection connection, final String user, 171 final TableName table) throws IOException { 172 deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table)); 173 } 174 175 public static void deleteUserQuota(final Connection connection, final String user, 176 final String namespace) throws IOException { 177 deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace)); 178 } 179 180 public static void addRegionServerQuota(final Connection connection, final String regionServer, 181 final Quotas data) throws IOException { 182 addQuotas(connection, getRegionServerRowKey(regionServer), data); 183 } 184 185 public static void deleteRegionServerQuota(final Connection connection, final String regionServer) 186 throws IOException { 187 deleteQuotas(connection, getRegionServerRowKey(regionServer)); 188 } 189 190 public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action, 191 boolean hasCondition) { 192 if (action.hasMutation()) { 193 return getQuotaOperationType(action.getMutation(), hasCondition); 194 } 195 return OperationQuota.OperationType.GET; 196 } 197 198 public static OperationQuota.OperationType 199 getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) { 200 return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition()); 201 } 202 203 private static OperationQuota.OperationType 204 getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) { 205 ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType(); 206 if ( 207 hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND 208 || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT 209 ) { 210 return OperationQuota.OperationType.CHECK_AND_MUTATE; 211 } 212 return OperationQuota.OperationType.MUTATE; 213 } 214 215 protected static void switchExceedThrottleQuota(final Connection connection, 216 boolean exceedThrottleQuotaEnabled) throws IOException { 217 if (exceedThrottleQuotaEnabled) { 218 checkRSQuotaToEnableExceedThrottle( 219 getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY)); 220 } 221 222 Put put = new Put(getExceedThrottleQuotaRowKey()); 223 put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS, 224 Bytes.toBytes(exceedThrottleQuotaEnabled)); 225 doPut(connection, put); 226 } 227 228 private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException { 229 if (quotas != null && quotas.hasThrottle()) { 230 Throttle throttle = quotas.getThrottle(); 231 // If enable exceed throttle quota, make sure that there are at least one read(req/read + 232 // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas. 233 boolean hasReadQuota = false; 234 boolean hasWriteQuota = false; 235 if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) { 236 hasReadQuota = true; 237 hasWriteQuota = true; 238 } 239 if ( 240 !hasReadQuota 241 && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit()) 242 ) { 243 hasReadQuota = true; 244 } 245 if (!hasReadQuota) { 246 throw new DoNotRetryIOException( 247 "Please set at least one read region server quota before enable exceed throttle quota"); 248 } 249 if ( 250 !hasWriteQuota 251 && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit()) 252 ) { 253 hasWriteQuota = true; 254 } 255 if (!hasWriteQuota) { 256 throw new DoNotRetryIOException("Please set at least one write region server quota " 257 + "before enable exceed throttle quota"); 258 } 259 // If enable exceed throttle quota, make sure that region server throttle quotas are in 260 // seconds time unit. Because once previous requests exceed their quota and consume region 261 // server quota, quota in other time units may be refilled in a long time, this may affect 262 // later requests. 263 List<Pair<Boolean, TimedQuota>> list = 264 Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()), 265 Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()), 266 Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()), 267 Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()), 268 Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()), 269 Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()), 270 Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()), 271 Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()), 272 Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit())); 273 for (Pair<Boolean, TimedQuota> pair : list) { 274 if (pair.getFirst()) { 275 if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) { 276 throw new DoNotRetryIOException("All region server quota must be " 277 + "in seconds time unit if enable exceed throttle quota"); 278 } 279 } 280 } 281 } else { 282 // If enable exceed throttle quota, make sure that region server quota is already set 283 throw new DoNotRetryIOException( 284 "Please set region server quota before enable exceed throttle quota"); 285 } 286 } 287 288 protected static boolean isExceedThrottleQuotaEnabled(final Connection connection) 289 throws IOException { 290 Get get = new Get(getExceedThrottleQuotaRowKey()); 291 get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 292 Result result = doGet(connection, get); 293 if (result.isEmpty()) { 294 return false; 295 } 296 return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS)); 297 } 298 299 private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data) 300 throws IOException { 301 addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data); 302 } 303 304 private static void addQuotas(final Connection connection, final byte[] rowKey, 305 final byte[] qualifier, final Quotas data) throws IOException { 306 Put put = new Put(rowKey); 307 put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data)); 308 doPut(connection, put); 309 } 310 311 private static void deleteQuotas(final Connection connection, final byte[] rowKey) 312 throws IOException { 313 deleteQuotas(connection, rowKey, null); 314 } 315 316 private static void deleteQuotas(final Connection connection, final byte[] rowKey, 317 final byte[] qualifier) throws IOException { 318 Delete delete = new Delete(rowKey); 319 if (qualifier != null) { 320 delete.addColumns(QUOTA_FAMILY_INFO, qualifier); 321 } 322 if (isNamespaceRowKey(rowKey)) { 323 String ns = getNamespaceFromRowKey(rowKey); 324 Quotas namespaceQuota = getNamespaceQuota(connection, ns); 325 if (namespaceQuota != null && namespaceQuota.hasSpace()) { 326 // When deleting namespace space quota, also delete table usage(u:p) snapshots 327 deleteTableUsageSnapshotsForNamespace(connection, ns); 328 } 329 } 330 doDelete(connection, delete); 331 } 332 333 public static Map<String, UserQuotaState> fetchUserQuotas(final Configuration conf, 334 final Connection connection, Map<TableName, Double> tableMachineQuotaFactors, double factor) 335 throws IOException { 336 Map<String, UserQuotaState> userQuotas = new HashMap<>(); 337 try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { 338 Scan scan = new Scan(); 339 scan.addFamily(QUOTA_FAMILY_INFO); 340 scan.setStartStopRowForPrefixScan(QUOTA_USER_ROW_KEY_PREFIX); 341 try (ResultScanner resultScanner = table.getScanner(scan)) { 342 for (Result result : resultScanner) { 343 byte[] key = result.getRow(); 344 assert isUserRowKey(key); 345 String user = getUserFromRowKey(key); 346 347 final UserQuotaState quotaInfo = new UserQuotaState(); 348 userQuotas.put(user, quotaInfo); 349 350 try { 351 parseUserResult(user, result, new UserQuotasVisitor() { 352 @Override 353 public void visitUserQuotas(String userName, String namespace, Quotas quotas) { 354 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 355 quotaInfo.setQuotas(conf, namespace, quotas); 356 } 357 358 @Override 359 public void visitUserQuotas(String userName, TableName table, Quotas quotas) { 360 quotas = updateClusterQuotaToMachineQuota(quotas, 361 tableMachineQuotaFactors.containsKey(table) 362 ? tableMachineQuotaFactors.get(table) 363 : 1); 364 quotaInfo.setQuotas(conf, table, quotas); 365 } 366 367 @Override 368 public void visitUserQuotas(String userName, Quotas quotas) { 369 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 370 quotaInfo.setQuotas(conf, quotas); 371 } 372 }); 373 } catch (IOException e) { 374 LOG.error("Unable to parse user '" + user + "' quotas", e); 375 userQuotas.remove(user); 376 } 377 } 378 } 379 } 380 381 return userQuotas; 382 } 383 384 protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) { 385 QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder(); 386 387 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM) 388 .ifPresent(throttleBuilder::setReadNum); 389 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE) 390 .ifPresent(throttleBuilder::setReadSize); 391 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM) 392 .ifPresent(throttleBuilder::setReqNum); 393 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE) 394 .ifPresent(throttleBuilder::setReqSize); 395 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM) 396 .ifPresent(throttleBuilder::setWriteNum); 397 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE) 398 .ifPresent(throttleBuilder::setWriteSize); 399 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE) 400 .ifPresent(throttleBuilder::setAtomicReadSize); 401 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM) 402 .ifPresent(throttleBuilder::setAtomicReqNum); 403 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE) 404 .ifPresent(throttleBuilder::setAtomicWriteSize); 405 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS) 406 .ifPresent(throttleBuilder::setReqHandlerUsageMs); 407 408 UserQuotaState state = new UserQuotaState(); 409 QuotaProtos.Quotas defaultQuotas = 410 QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build(); 411 state.setQuotas(conf, defaultQuotas); 412 return state; 413 } 414 415 private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) { 416 int defaultSoftLimit = conf.getInt(key, -1); 417 if (defaultSoftLimit == -1) { 418 return Optional.empty(); 419 } 420 return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit, 421 java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE)); 422 } 423 424 public static Map<TableName, QuotaState> fetchTableQuotas(final Configuration conf, 425 final Connection connection, Map<TableName, Double> tableMachineFactors) throws IOException { 426 Scan scan = new Scan(); 427 scan.addFamily(QUOTA_FAMILY_INFO); 428 scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX); 429 return fetchGlobalQuotas(conf, "table", scan, connection, new KeyFromRow<TableName>() { 430 @Override 431 public TableName getKeyFromRow(final byte[] row) { 432 assert isTableRowKey(row); 433 return getTableFromRowKey(row); 434 } 435 436 @Override 437 public double getFactor(TableName tableName) { 438 return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1; 439 } 440 }); 441 } 442 443 public static Map<String, QuotaState> fetchNamespaceQuotas(final Configuration conf, 444 final Connection connection, double factor) throws IOException { 445 Scan scan = new Scan(); 446 scan.addFamily(QUOTA_FAMILY_INFO); 447 scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX); 448 return fetchGlobalQuotas(conf, "namespace", scan, connection, new KeyFromRow<String>() { 449 @Override 450 public String getKeyFromRow(final byte[] row) { 451 assert isNamespaceRowKey(row); 452 return getNamespaceFromRowKey(row); 453 } 454 455 @Override 456 public double getFactor(String s) { 457 return factor; 458 } 459 }); 460 } 461 462 public static Map<String, QuotaState> fetchRegionServerQuotas(final Configuration conf, 463 final Connection connection) throws IOException { 464 Scan scan = new Scan(); 465 scan.addFamily(QUOTA_FAMILY_INFO); 466 scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX); 467 return fetchGlobalQuotas(conf, "regionServer", scan, connection, new KeyFromRow<String>() { 468 @Override 469 public String getKeyFromRow(final byte[] row) { 470 assert isRegionServerRowKey(row); 471 return getRegionServerFromRowKey(row); 472 } 473 474 @Override 475 public double getFactor(String s) { 476 return 1; 477 } 478 }); 479 } 480 481 public static <K> Map<K, QuotaState> fetchGlobalQuotas(final Configuration conf, 482 final String type, final Scan scan, final Connection connection, final KeyFromRow<K> kfr) 483 throws IOException { 484 485 Map<K, QuotaState> globalQuotas = new HashMap<>(); 486 try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { 487 try (ResultScanner resultScanner = table.getScanner(scan)) { 488 for (Result result : resultScanner) { 489 490 byte[] row = result.getRow(); 491 K key = kfr.getKeyFromRow(row); 492 493 QuotaState quotaInfo = new QuotaState(); 494 globalQuotas.put(key, quotaInfo); 495 496 byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 497 if (data == null) { 498 continue; 499 } 500 501 try { 502 Quotas quotas = quotasFromData(data); 503 quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key)); 504 quotaInfo.setQuotas(conf, quotas); 505 } catch (IOException e) { 506 LOG.error("Unable to parse {} '{}' quotas", type, key, e); 507 globalQuotas.remove(key); 508 } 509 } 510 } 511 } 512 return globalQuotas; 513 } 514 515 /** 516 * Convert cluster scope quota to machine scope quota 517 * @param quotas the original quota 518 * @param factor factor used to divide cluster limiter to machine limiter 519 * @return the converted quota whose quota limiters all in machine scope 520 */ 521 private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) { 522 Quotas.Builder newQuotas = Quotas.newBuilder(quotas); 523 if (newQuotas.hasThrottle()) { 524 Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle()); 525 if (throttle.hasReqNum()) { 526 throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor)); 527 } 528 if (throttle.hasReqSize()) { 529 throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor)); 530 } 531 if (throttle.hasReadNum()) { 532 throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor)); 533 } 534 if (throttle.hasReadSize()) { 535 throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor)); 536 } 537 if (throttle.hasWriteNum()) { 538 throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor)); 539 } 540 if (throttle.hasWriteSize()) { 541 throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor)); 542 } 543 if (throttle.hasReqCapacityUnit()) { 544 throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor)); 545 } 546 if (throttle.hasReadCapacityUnit()) { 547 throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor)); 548 } 549 if (throttle.hasWriteCapacityUnit()) { 550 throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor)); 551 } 552 newQuotas.setThrottle(throttle.build()); 553 } 554 return newQuotas.build(); 555 } 556 557 private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) { 558 if (timedQuota.getScope() == QuotaScope.CLUSTER) { 559 TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota); 560 newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor))) 561 .setScope(QuotaScope.MACHINE); 562 return newTimedQuota.build(); 563 } else { 564 return timedQuota; 565 } 566 } 567 568 private static interface KeyFromRow<T> { 569 T getKeyFromRow(final byte[] row); 570 571 double getFactor(T t); 572 } 573 574 /* 575 * ========================================================================= HTable helpers 576 */ 577 private static void doPut(final Connection connection, final Put put) throws IOException { 578 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 579 table.put(put); 580 } 581 } 582 583 private static void doDelete(final Connection connection, final Delete delete) 584 throws IOException { 585 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 586 table.delete(delete); 587 } 588 } 589 590 /* 591 * ========================================================================= Data Size Helpers 592 */ 593 public static long calculateMutationSize(final Mutation mutation) { 594 long size = 0; 595 for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) { 596 for (Cell cell : entry.getValue()) { 597 size += cell.getSerializedSize(); 598 } 599 } 600 return size; 601 } 602 603 public static long calculateResultSize(final Result result) { 604 long size = 0; 605 for (Cell cell : result.rawCells()) { 606 size += cell.getSerializedSize(); 607 } 608 return size; 609 } 610 611 public static long calculateResultSize(final List<Result> results) { 612 long size = 0; 613 for (Result result : results) { 614 for (Cell cell : result.rawCells()) { 615 size += cell.getSerializedSize(); 616 } 617 } 618 return size; 619 } 620 621 public static long calculateCellsSize(final List<Cell> cells) { 622 long size = 0; 623 for (Cell cell : cells) { 624 size += cell.getSerializedSize(); 625 } 626 return size; 627 } 628 629 /** 630 * Method to enable a table, if not already enabled. This method suppresses 631 * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling 632 * the table. 633 * @param conn connection to re-use 634 * @param tableName name of the table to be enabled 635 */ 636 public static void enableTableIfNotEnabled(Connection conn, TableName tableName) 637 throws IOException { 638 try { 639 conn.getAdmin().enableTable(tableName); 640 } catch (TableNotDisabledException | TableNotFoundException e) { 641 // ignore 642 } 643 } 644 645 /** 646 * Method to disable a table, if not already disabled. This method suppresses 647 * {@link TableNotEnabledException}, if thrown while disabling the table. 648 * @param conn connection to re-use 649 * @param tableName table name which has moved into space quota violation 650 */ 651 public static void disableTableIfNotDisabled(Connection conn, TableName tableName) 652 throws IOException { 653 try { 654 conn.getAdmin().disableTable(tableName); 655 } catch (TableNotEnabledException | TableNotFoundException e) { 656 // ignore 657 } 658 } 659}