/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;

/**
 * Helper class to interact with the quota table
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // the default one read capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // the default one write capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;

  /*
   * The below defaults, if configured, will be applied to otherwise unthrottled users. For example,
   * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
   * that any given user may not query more than 1mb per second from any given machine, unless
   * explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and
   * QuotaScope.MACHINE.
   */
  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
    "hbase.quota.default.user.machine.read.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
    "hbase.quota.default.user.machine.read.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
    "hbase.quota.default.user.machine.request.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
    "hbase.quota.default.user.machine.request.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
    "hbase.quota.default.user.machine.write.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
    "hbase.quota.default.user.machine.write.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE =
    "hbase.quota.default.user.machine.atomic.read.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM =
    "hbase.quota.default.user.machine.atomic.request.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE =
    "hbase.quota.default.user.machine.atomic.write.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS =
    "hbase.quota.default.user.machine.request.handler.usage.ms";

  /** Table descriptor for Quota internal table */
  public static final TableDescriptor QUOTA_TABLE_DESC =
    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .build();

  /** Returns true if the support for quota is enabled */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }

  /*
   * ========================================================================= Quota "settings"
   * helpers
   */

  /** Stores {@code data} as the quota settings of the given table. */
  public static void addTableQuota(final Connection connection, final TableName table,
    final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  /** Deletes the quota row of the given table. */
  public static void deleteTableQuota(final Connection connection, final TableName table)
    throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  /** Stores {@code data} as the quota settings of the given namespace. */
  public static void addNamespaceQuota(final Connection connection, final String namespace,
    final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  /** Deletes the quota row of the given namespace. */
  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
    throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  /** Stores {@code data} as the global quota settings of the given user. */
  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
    throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  /** Stores {@code data} as the quota settings of the given user on the given table. */
  public static void addUserQuota(final Connection connection, final String user,
    final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  /** Stores {@code data} as the quota settings of the given user on the given namespace. */
  public static void addUserQuota(final Connection connection, final String user,
    final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
      data);
  }

  /** Deletes the whole quota row of the given user. */
  public static void deleteUserQuota(final Connection connection, final String user)
    throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  /** Deletes the quota settings of the given user on the given table. */
  public static void deleteUserQuota(final Connection connection, final String user,
    final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
  }

  /** Deletes the quota settings of the given user on the given namespace. */
  public static void deleteUserQuota(final Connection connection, final String user,
    final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
  }

  /** Stores {@code data} as the quota settings of the given region server. */
  public static void addRegionServerQuota(final Connection connection, final String regionServer,
    final Quotas data) throws IOException {
    addQuotas(connection, getRegionServerRowKey(regionServer), data);
  }

  /** Deletes the quota row of the given region server. */
  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
    throws IOException {
    deleteQuotas(connection, getRegionServerRowKey(regionServer));
  }

  /**
   * Maps a multi action to its quota operation type.
   * @param action       the action to classify
   * @param hasCondition whether the enclosing request carries a condition
   * @return the type derived from the action's mutation, or {@code GET} when the action carries no
   *         mutation
   */
  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
    boolean hasCondition) {
    if (action.hasMutation()) {
      return getQuotaOperationType(action.getMutation(), hasCondition);
    }
    return OperationQuota.OperationType.GET;
  }

  /** Maps a mutate request to its quota operation type. */
  public static OperationQuota.OperationType
    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
  }

  /**
   * Conditional mutations, appends and increments are classified as {@code CHECK_AND_MUTATE};
   * every other mutation is a plain {@code MUTATE}.
   */
  private static OperationQuota.OperationType
    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
    if (
      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
    ) {
      return OperationQuota.OperationType.CHECK_AND_MUTATE;
    }
    return OperationQuota.OperationType.MUTATE;
  }

  /**
   * Persists the exceed-throttle-quota switch. When enabling, the region server quota is validated
   * first and the switch is not written if validation fails.
   * @param connection                 connection used to read and write the quota table
   * @param exceedThrottleQuotaEnabled the new value of the switch
   * @throws IOException if validation fails or the quota table cannot be accessed
   */
  protected static void switchExceedThrottleQuota(final Connection connection,
    boolean exceedThrottleQuotaEnabled) throws IOException {
    if (exceedThrottleQuotaEnabled) {
      checkRSQuotaToEnableExceedThrottle(
        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
    }

    Put put = new Put(getExceedThrottleQuotaRowKey());
    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
      Bytes.toBytes(exceedThrottleQuotaEnabled));
    doPut(connection, put);
  }

  /**
   * Verifies that the region server quota permits enabling exceed throttle quota: a throttle must
   * be set, it must contain at least one read and one write limiter (a request limiter counts as
   * both), and every checked limiter must use the seconds time unit.
   * @param quotas the region server quotas, possibly {@code null}
   * @throws DoNotRetryIOException if any of the above requirements is not met
   */
  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
    if (quotas == null || !quotas.hasThrottle()) {
      // If enable exceed throttle quota, make sure that region server quota is already set
      throw new DoNotRetryIOException(
        "Please set region server quota before enable exceed throttle quota");
    }

    Throttle throttle = quotas.getThrottle();
    // If enable exceed throttle quota, make sure that there are at least one read(req/read +
    // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
    boolean hasRequestQuota =
      throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit();
    boolean hasReadQuota = hasRequestQuota || throttle.hasReadNum() || throttle.hasReadSize()
      || throttle.hasReadCapacityUnit();
    if (!hasReadQuota) {
      throw new DoNotRetryIOException(
        "Please set at least one read region server quota before enable exceed throttle quota");
    }
    boolean hasWriteQuota = hasRequestQuota || throttle.hasWriteNum() || throttle.hasWriteSize()
      || throttle.hasWriteCapacityUnit();
    if (!hasWriteQuota) {
      throw new DoNotRetryIOException("Please set at least one write region server quota "
        + "before enable exceed throttle quota");
    }

    // If enable exceed throttle quota, make sure that region server throttle quotas are in
    // seconds time unit. Because once previous requests exceed their quota and consume region
    // server quota, quota in other time units may be refilled in a long time, this may affect
    // later requests.
    List<Pair<Boolean, TimedQuota>> list =
      Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
        Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
        Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
        Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
        Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
        Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
        Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
        Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
        Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
    for (Pair<Boolean, TimedQuota> pair : list) {
      if (pair.getFirst() && pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
        throw new DoNotRetryIOException("All region server quota must be "
          + "in seconds time unit if enable exceed throttle quota");
      }
    }
  }

  /**
   * Reads the persisted exceed-throttle-quota switch.
   * @return the stored value, or {@code false} when nothing has been stored
   */
  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
    throws IOException {
    Get get = new Get(getExceedThrottleQuotaRowKey());
    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
    Result result = doGet(connection, get);
    if (result.isEmpty()) {
      return false;
    }
    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
  }

  /** Writes {@code data} under the default settings qualifier of {@code rowKey}. */
  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
    throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  /** Writes {@code data} under {@code qualifier} of {@code rowKey} in the info family. */
  private static void addQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  /** Deletes the whole row identified by {@code rowKey}. */
  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
    throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  /**
   * Deletes quota settings from the quota table.
   * @param rowKey    the row to delete from
   * @param qualifier the info-family qualifier to delete, or {@code null} to delete the whole row
   */
  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    if (isNamespaceRowKey(rowKey)) {
      String ns = getNamespaceFromRowKey(rowKey);
      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
        // When deleting namespace space quota, also delete table usage(u:p) snapshots
        deleteTableUsageSnapshotsForNamespace(connection, ns);
      }
    }
    doDelete(connection, delete);
  }

  /**
   * Scans all user rows of the quota table and builds the per-user quota state, converting cluster
   * scope limiters to machine scope on the way.
   * @param connection               connection used to scan the quota table
   * @param tableMachineQuotaFactors per-table conversion factors; tables without an entry use 1
   * @param factor                   conversion factor for user and user-namespace quotas
   * @return the quota state keyed by user name; users whose quotas fail to parse are logged and
   *         left out
   */
  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
    Map<TableName, Double> tableMachineQuotaFactors, double factor) throws IOException {
    Map<String, UserQuotaState> userQuotas = new HashMap<>();
    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
      Scan scan = new Scan();
      scan.addFamily(QUOTA_FAMILY_INFO);
      scan.setStartStopRowForPrefixScan(QUOTA_USER_ROW_KEY_PREFIX);
      try (ResultScanner resultScanner = table.getScanner(scan)) {
        for (Result result : resultScanner) {
          byte[] key = result.getRow();
          assert isUserRowKey(key);
          String user = getUserFromRowKey(key);

          final UserQuotaState quotaInfo = new UserQuotaState();
          userQuotas.put(user, quotaInfo);

          try {
            parseUserResult(user, result, new UserQuotasVisitor() {
              @Override
              public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
                quotas = updateClusterQuotaToMachineQuota(quotas, factor);
                quotaInfo.setQuotas(namespace, quotas);
              }

              @Override
              public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
                quotas = updateClusterQuotaToMachineQuota(quotas,
                  tableMachineQuotaFactors.getOrDefault(table, 1.0));
                quotaInfo.setQuotas(table, quotas);
              }

              @Override
              public void visitUserQuotas(String userName, Quotas quotas) {
                quotas = updateClusterQuotaToMachineQuota(quotas, factor);
                quotaInfo.setQuotas(quotas);
              }
            });
          } catch (IOException e) {
            LOG.error("Unable to parse user '{}' quotas", user, e);
            userQuotas.remove(user);
          }
        }
      }
    }

    return userQuotas;
  }

  /**
   * Builds the quota state applied to users without a persisted quota, from the
   * {@code hbase.quota.default.user.machine.*} settings. Settings that are absent (or set to -1)
   * contribute no limiter.
   */
  protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
    QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();

    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
      .ifPresent(throttleBuilder::setReadNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
      .ifPresent(throttleBuilder::setReadSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
      .ifPresent(throttleBuilder::setReqNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
      .ifPresent(throttleBuilder::setReqSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
      .ifPresent(throttleBuilder::setWriteNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
      .ifPresent(throttleBuilder::setWriteSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE)
      .ifPresent(throttleBuilder::setAtomicReadSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM)
      .ifPresent(throttleBuilder::setAtomicReqNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE)
      .ifPresent(throttleBuilder::setAtomicWriteSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS)
      .ifPresent(throttleBuilder::setReqHandlerUsageMs);

    UserQuotaState state = new UserQuotaState();
    QuotaProtos.Quotas defaultQuotas =
      QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
    state.setQuotas(defaultQuotas);
    return state;
  }

  /**
   * Reads one default limit from the configuration.
   * @param conf configuration to read from
   * @param key  configuration key of the limit
   * @return a per-second, machine scope quota with the configured limit, or empty when the key is
   *         unset or set to -1
   */
  private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
    // Read as long: soft limits are longs and byte-size limits may exceed Integer.MAX_VALUE.
    long defaultSoftLimit = conf.getLong(key, -1L);
    if (defaultSoftLimit == -1L) {
      return Optional.empty();
    }
    return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
      java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
  }

  /**
   * Fetches the quota state of every table row in the quota table.
   * @param connection          connection used to scan the quota table
   * @param tableMachineFactors per-table conversion factors; tables without an entry use 1
   */
  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
    Map<TableName, Double> tableMachineFactors) throws IOException {
    Scan scan = new Scan();
    scan.addFamily(QUOTA_FAMILY_INFO);
    scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
    return fetchGlobalQuotas("table", scan, connection, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }

      @Override
      public double getFactor(TableName tableName) {
        return tableMachineFactors.getOrDefault(tableName, 1.0);
      }
    });
  }

  /**
   * Fetches the quota state of every namespace row in the quota table.
   * @param connection connection used to scan the quota table
   * @param factor     conversion factor applied to cluster scope limiters
   */
  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
    double factor) throws IOException {
    Scan scan = new Scan();
    scan.addFamily(QUOTA_FAMILY_INFO);
    scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
    return fetchGlobalQuotas("namespace", scan, connection, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return factor;
      }
    });
  }

  /** Fetches the quota state of every region server row in the quota table (factor 1). */
  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection)
    throws IOException {
    Scan scan = new Scan();
    scan.addFamily(QUOTA_FAMILY_INFO);
    scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
    return fetchGlobalQuotas("regionServer", scan, connection, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isRegionServerRowKey(row);
        return getRegionServerFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return 1;
      }
    });
  }

  /**
   * Runs {@code scan} against the quota table and builds one {@link QuotaState} per returned row.
   * Rows without settings data keep an empty state; rows whose settings fail to parse are logged
   * and left out of the result.
   * @param type       label of the quota kind, used only in the error log message
   * @param scan       the scan selecting the rows to load
   * @param connection connection used to scan the quota table
   * @param kfr        extracts the map key from a row key and supplies its conversion factor
   */
  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final Scan scan,
    final Connection connection, final KeyFromRow<K> kfr) throws IOException {

    Map<K, QuotaState> globalQuotas = new HashMap<>();
    try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
      try (ResultScanner resultScanner = table.getScanner(scan)) {
        for (Result result : resultScanner) {

          byte[] row = result.getRow();
          K key = kfr.getKeyFromRow(row);

          QuotaState quotaInfo = new QuotaState();
          globalQuotas.put(key, quotaInfo);

          byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
          if (data == null) {
            continue;
          }

          try {
            Quotas quotas = quotasFromData(data);
            quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
            quotaInfo.setQuotas(quotas);
          } catch (IOException e) {
            LOG.error("Unable to parse {} '{}' quotas", type, key, e);
            globalQuotas.remove(key);
          }
        }
      }
    }
    return globalQuotas;
  }

  /**
   * Convert cluster scope quota to machine scope quota
   * @param quotas the original quota
   * @param factor factor used to divide cluster limiter to machine limiter
   * @return the converted quota whose quota limiters all in machine scope
   */
  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
    if (newQuotas.hasThrottle()) {
      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
      if (throttle.hasReqNum()) {
        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
      }
      if (throttle.hasReqSize()) {
        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
      }
      if (throttle.hasReadNum()) {
        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
      }
      if (throttle.hasReadSize()) {
        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
      }
      if (throttle.hasWriteNum()) {
        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
      }
      if (throttle.hasWriteSize()) {
        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
      }
      if (throttle.hasReqCapacityUnit()) {
        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
      }
      if (throttle.hasReadCapacityUnit()) {
        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
      }
      if (throttle.hasWriteCapacityUnit()) {
        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
      }
      // The atomic and handler-usage limiters must be converted as well, otherwise a cluster
      // scope limiter of these types would be left unconverted.
      if (throttle.hasAtomicReadSize()) {
        throttle.setAtomicReadSize(updateTimedQuota(throttle.getAtomicReadSize(), factor));
      }
      if (throttle.hasAtomicReqNum()) {
        throttle.setAtomicReqNum(updateTimedQuota(throttle.getAtomicReqNum(), factor));
      }
      if (throttle.hasAtomicWriteSize()) {
        throttle.setAtomicWriteSize(updateTimedQuota(throttle.getAtomicWriteSize(), factor));
      }
      if (throttle.hasReqHandlerUsageMs()) {
        throttle.setReqHandlerUsageMs(updateTimedQuota(throttle.getReqHandlerUsageMs(), factor));
      }
      newQuotas.setThrottle(throttle.build());
    }
    return newQuotas.build();
  }

  /**
   * Converts a single limiter to machine scope. A cluster scope limiter gets its soft limit
   * multiplied by {@code factor} (truncated, but never below 1); any other limiter is returned
   * unchanged.
   */
  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
        .setScope(QuotaScope.MACHINE);
      return newTimedQuota.build();
    } else {
      return timedQuota;
    }
  }

  /** Derives the result-map key from a quota table row key and supplies its conversion factor. */
  private static interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);

    double getFactor(T t);
  }

  /*
   * ========================================================================= HTable helpers
   */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  private static void doDelete(final Connection connection, final Delete delete)
    throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /*
   * ========================================================================= Data Size Helpers
   */

  /** Returns the sum of the serialized sizes of all cells carried by the mutation. */
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  /** Returns the sum of the serialized sizes of all cells in the result. */
  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  /** Returns the sum of the serialized sizes of all cells in all the results. */
  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      size += calculateResultSize(result);
    }
    return size;
  }

  /** Returns the sum of the serialized sizes of the given cells. */
  public static long calculateCellsSize(final List<Cell> cells) {
    long size = 0;
    for (Cell cell : cells) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  /**
   * Method to enable a table, if not already enabled. This method suppresses
   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName name of the table to be enabled
   */
  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
    throws IOException {
    // The caller of Connection#getAdmin() owns the returned Admin and must close it.
    try (Admin admin = conn.getAdmin()) {
      admin.enableTable(tableName);
    } catch (TableNotDisabledException | TableNotFoundException e) {
      // ignore
    }
  }

  /**
   * Method to disable a table, if not already disabled. This method suppresses
   * {@link TableNotEnabledException} and {@link TableNotFoundException}, if thrown while disabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName table name which has moved into space quota violation
   */
  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
    throws IOException {
    // The caller of Connection#getAdmin() owns the returned Admin and must close it.
    try (Admin admin = conn.getAdmin()) {
      admin.disableTable(tableName);
    } catch (TableNotEnabledException | TableNotFoundException e) {
      // ignore
    }
  }
}