001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.DoNotRetryIOException; 030import org.apache.hadoop.hbase.HColumnDescriptor; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HTableDescriptor; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.TableNotDisabledException; 035import org.apache.hadoop.hbase.TableNotEnabledException; 036import org.apache.hadoop.hbase.TableNotFoundException; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Mutation; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.regionserver.BloomType; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.yetus.audience.InterfaceStability; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota; 058 059/** 060 * Helper class to interact with the quota table 061 */ 062@InterfaceAudience.Private 063@InterfaceStability.Evolving 064public class QuotaUtil extends QuotaTableUtil { 065 private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class); 066 067 public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; 068 private static final boolean QUOTA_ENABLED_DEFAULT = false; 069 070 public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit"; 071 // the default one read capacity unit is 1024 bytes (1KB) 072 public static final long DEFAULT_READ_CAPACITY_UNIT = 1024; 073 public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit"; 074 // the default one write capacity unit is 1024 bytes (1KB) 075 public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024; 076 077 /** Table descriptor for Quota internal table */ 078 public static final HTableDescriptor QUOTA_TABLE_DESC = 079 new HTableDescriptor(QUOTA_TABLE_NAME); 080 static { 081 QUOTA_TABLE_DESC.addFamily( 082 new HColumnDescriptor(QUOTA_FAMILY_INFO) 083 .setScope(HConstants.REPLICATION_SCOPE_LOCAL) 084 .setBloomFilterType(BloomType.ROW) 085 .setMaxVersions(1) 086 ); 087 QUOTA_TABLE_DESC.addFamily( 088 new HColumnDescriptor(QUOTA_FAMILY_USAGE) 089 .setScope(HConstants.REPLICATION_SCOPE_LOCAL) 090 .setBloomFilterType(BloomType.ROW) 091 .setMaxVersions(1) 092 ); 093 } 094 095 /** Returns true if the support for quota is enabled */ 096 public static boolean isQuotaEnabled(final Configuration conf) { 097 return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT); 098 } 099 100 /* ========================================================================= 101 * Quota "settings" helpers 102 */ 103 public static void addTableQuota(final Connection connection, final TableName table, 104 final Quotas data) throws IOException { 105 addQuotas(connection, getTableRowKey(table), data); 106 } 107 108 public static void deleteTableQuota(final Connection connection, final TableName table) 109 throws IOException { 110 deleteQuotas(connection, getTableRowKey(table)); 111 } 112 113 public static void addNamespaceQuota(final Connection connection, final String namespace, 114 final Quotas data) throws IOException { 115 addQuotas(connection, getNamespaceRowKey(namespace), data); 116 } 117 118 public static void deleteNamespaceQuota(final Connection connection, final String namespace) 119 throws IOException { 120 deleteQuotas(connection, getNamespaceRowKey(namespace)); 121 } 122 123 public static void addUserQuota(final Connection connection, final String user, 124 final Quotas data) throws IOException { 125 addQuotas(connection, getUserRowKey(user), data); 126 } 127 128 public static void addUserQuota(final Connection connection, final String user, 129 final TableName table, final Quotas data) throws IOException { 130 addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data); 131 } 132 133 public static void addUserQuota(final Connection connection, final String user, 134 final String namespace, final Quotas data) throws IOException { 135 addQuotas(connection, getUserRowKey(user), 136 getSettingsQualifierForUserNamespace(namespace), data); 137 } 138 139 public static void deleteUserQuota(final Connection connection, final String user) 140 throws IOException { 141 deleteQuotas(connection, getUserRowKey(user)); 142 } 143 144 public static void deleteUserQuota(final Connection connection, final String user, 145 final TableName table) throws IOException { 146 deleteQuotas(connection, getUserRowKey(user), 147 getSettingsQualifierForUserTable(table)); 148 } 149 150 public static void deleteUserQuota(final Connection connection, final String user, 151 final String namespace) throws IOException { 152 deleteQuotas(connection, getUserRowKey(user), 153 getSettingsQualifierForUserNamespace(namespace)); 154 } 155 156 public static void addRegionServerQuota(final Connection connection, final String regionServer, 157 final Quotas data) throws IOException { 158 addQuotas(connection, getRegionServerRowKey(regionServer), data); 159 } 160 161 public static void deleteRegionServerQuota(final Connection connection, final String regionServer) 162 throws IOException { 163 deleteQuotas(connection, getRegionServerRowKey(regionServer)); 164 } 165 166 protected static void switchExceedThrottleQuota(final Connection connection, 167 boolean exceedThrottleQuotaEnabled) throws IOException { 168 if (exceedThrottleQuotaEnabled) { 169 checkRSQuotaToEnableExceedThrottle( 170 getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY)); 171 } 172 173 Put put = new Put(getExceedThrottleQuotaRowKey()); 174 put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS, 175 Bytes.toBytes(exceedThrottleQuotaEnabled)); 176 doPut(connection, put); 177 } 178 179 private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException { 180 if (quotas != null && quotas.hasThrottle()) { 181 Throttle throttle = quotas.getThrottle(); 182 // If enable exceed throttle quota, make sure that there are at least one read(req/read + 183 // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas. 184 boolean hasReadQuota = false; 185 boolean hasWriteQuota = false; 186 if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) { 187 hasReadQuota = true; 188 hasWriteQuota = true; 189 } 190 if (!hasReadQuota 191 && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())) { 192 hasReadQuota = true; 193 } 194 if (!hasReadQuota) { 195 throw new DoNotRetryIOException( 196 "Please set at least one read region server quota before enable exceed throttle quota"); 197 } 198 if (!hasWriteQuota && (throttle.hasWriteNum() || throttle.hasWriteSize() 199 || throttle.hasWriteCapacityUnit())) { 200 hasWriteQuota = true; 201 } 202 if (!hasWriteQuota) { 203 throw new DoNotRetryIOException("Please set at least one write region server quota " 204 + "before enable exceed throttle quota"); 205 } 206 // If enable exceed throttle quota, make sure that region server throttle quotas are in 207 // seconds time unit. Because once previous requests exceed their quota and consume region 208 // server quota, quota in other time units may be refilled in a long time, this may affect 209 // later requests. 210 List<Pair<Boolean, TimedQuota>> list = 211 Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()), 212 Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()), 213 Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()), 214 Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()), 215 Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()), 216 Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()), 217 Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()), 218 Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()), 219 Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit())); 220 for (Pair<Boolean, TimedQuota> pair : list) { 221 if (pair.getFirst()) { 222 if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) { 223 throw new DoNotRetryIOException("All region server quota must be " 224 + "in seconds time unit if enable exceed throttle quota"); 225 } 226 } 227 } 228 } else { 229 // If enable exceed throttle quota, make sure that region server quota is already set 230 throw new DoNotRetryIOException( 231 "Please set region server quota before enable exceed throttle quota"); 232 } 233 } 234 235 protected static boolean isExceedThrottleQuotaEnabled(final Connection connection) 236 throws IOException { 237 Get get = new Get(getExceedThrottleQuotaRowKey()); 238 get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 239 Result result = doGet(connection, get); 240 if (result.isEmpty()) { 241 return false; 242 } 243 return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS)); 244 } 245 246 private static void addQuotas(final Connection connection, final byte[] rowKey, 247 final Quotas data) throws IOException { 248 addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data); 249 } 250 251 private static void addQuotas(final Connection connection, final byte[] rowKey, 252 final byte[] qualifier, final Quotas data) throws IOException { 253 Put put = new Put(rowKey); 254 put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data)); 255 doPut(connection, put); 256 } 257 258 private static void deleteQuotas(final Connection connection, final byte[] rowKey) 259 throws IOException { 260 deleteQuotas(connection, rowKey, null); 261 } 262 263 private static void deleteQuotas(final Connection connection, final byte[] rowKey, 264 final byte[] qualifier) throws IOException { 265 Delete delete = new Delete(rowKey); 266 if (qualifier != null) { 267 delete.addColumns(QUOTA_FAMILY_INFO, qualifier); 268 } 269 if (isNamespaceRowKey(rowKey)) { 270 String ns = getNamespaceFromRowKey(rowKey); 271 Quotas namespaceQuota = getNamespaceQuota(connection,ns); 272 if (namespaceQuota != null && namespaceQuota.hasSpace()) { 273 // When deleting namespace space quota, also delete table usage(u:p) snapshots 274 deleteTableUsageSnapshotsForNamespace(connection, ns); 275 } 276 } 277 doDelete(connection, delete); 278 } 279 280 public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection, 281 final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor) 282 throws IOException { 283 long nowTs = EnvironmentEdgeManager.currentTime(); 284 Result[] results = doGet(connection, gets); 285 286 Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length); 287 for (int i = 0; i < results.length; ++i) { 288 byte[] key = gets.get(i).getRow(); 289 assert isUserRowKey(key); 290 String user = getUserFromRowKey(key); 291 292 final UserQuotaState quotaInfo = new UserQuotaState(nowTs); 293 userQuotas.put(user, quotaInfo); 294 295 if (results[i].isEmpty()) continue; 296 assert Bytes.equals(key, results[i].getRow()); 297 298 try { 299 parseUserResult(user, results[i], new UserQuotasVisitor() { 300 @Override 301 public void visitUserQuotas(String userName, String namespace, Quotas quotas) { 302 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 303 quotaInfo.setQuotas(namespace, quotas); 304 } 305 306 @Override 307 public void visitUserQuotas(String userName, TableName table, Quotas quotas) { 308 quotas = updateClusterQuotaToMachineQuota(quotas, 309 tableMachineQuotaFactors.containsKey(table) ? tableMachineQuotaFactors.get(table) 310 : 1); 311 quotaInfo.setQuotas(table, quotas); 312 } 313 314 @Override 315 public void visitUserQuotas(String userName, Quotas quotas) { 316 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 317 quotaInfo.setQuotas(quotas); 318 } 319 }); 320 } catch (IOException e) { 321 LOG.error("Unable to parse user '" + user + "' quotas", e); 322 userQuotas.remove(user); 323 } 324 } 325 return userQuotas; 326 } 327 328 public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection, 329 final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException { 330 return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() { 331 @Override 332 public TableName getKeyFromRow(final byte[] row) { 333 assert isTableRowKey(row); 334 return getTableFromRowKey(row); 335 } 336 337 @Override 338 public double getFactor(TableName tableName) { 339 return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1; 340 } 341 }); 342 } 343 344 public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection, 345 final List<Get> gets, double factor) throws IOException { 346 return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() { 347 @Override 348 public String getKeyFromRow(final byte[] row) { 349 assert isNamespaceRowKey(row); 350 return getNamespaceFromRowKey(row); 351 } 352 353 @Override 354 public double getFactor(String s) { 355 return factor; 356 } 357 }); 358 } 359 360 public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection, 361 final List<Get> gets) throws IOException { 362 return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() { 363 @Override 364 public String getKeyFromRow(final byte[] row) { 365 assert isRegionServerRowKey(row); 366 return getRegionServerFromRowKey(row); 367 } 368 369 @Override 370 public double getFactor(String s) { 371 return 1; 372 } 373 }); 374 } 375 376 public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, 377 final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) 378 throws IOException { 379 long nowTs = EnvironmentEdgeManager.currentTime(); 380 Result[] results = doGet(connection, gets); 381 382 Map<K, QuotaState> globalQuotas = new HashMap<>(results.length); 383 for (int i = 0; i < results.length; ++i) { 384 byte[] row = gets.get(i).getRow(); 385 K key = kfr.getKeyFromRow(row); 386 387 QuotaState quotaInfo = new QuotaState(nowTs); 388 globalQuotas.put(key, quotaInfo); 389 390 if (results[i].isEmpty()) continue; 391 assert Bytes.equals(row, results[i].getRow()); 392 393 byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 394 if (data == null) continue; 395 396 try { 397 Quotas quotas = quotasFromData(data); 398 quotas = updateClusterQuotaToMachineQuota(quotas, 399 kfr.getFactor(key)); 400 quotaInfo.setQuotas(quotas); 401 } catch (IOException e) { 402 LOG.error("Unable to parse " + type + " '" + key + "' quotas", e); 403 globalQuotas.remove(key); 404 } 405 } 406 return globalQuotas; 407 } 408 409 /** 410 * Convert cluster scope quota to machine scope quota 411 * @param quotas the original quota 412 * @param factor factor used to divide cluster limiter to machine limiter 413 * @return the converted quota whose quota limiters all in machine scope 414 */ 415 private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) { 416 Quotas.Builder newQuotas = Quotas.newBuilder(quotas); 417 if (newQuotas.hasThrottle()) { 418 Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle()); 419 if (throttle.hasReqNum()) { 420 throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor)); 421 } 422 if (throttle.hasReqSize()) { 423 throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor)); 424 } 425 if (throttle.hasReadNum()) { 426 throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor)); 427 } 428 if (throttle.hasReadSize()) { 429 throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor)); 430 } 431 if (throttle.hasWriteNum()) { 432 throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor)); 433 } 434 if (throttle.hasWriteSize()) { 435 throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor)); 436 } 437 if (throttle.hasReqCapacityUnit()) { 438 throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor)); 439 } 440 if (throttle.hasReadCapacityUnit()) { 441 throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor)); 442 } 443 if (throttle.hasWriteCapacityUnit()) { 444 throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor)); 445 } 446 newQuotas.setThrottle(throttle.build()); 447 } 448 return newQuotas.build(); 449 } 450 451 private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) { 452 if (timedQuota.getScope() == QuotaScope.CLUSTER) { 453 TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota); 454 newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor))) 455 .setScope(QuotaScope.MACHINE); 456 return newTimedQuota.build(); 457 } else { 458 return timedQuota; 459 } 460 } 461 462 private static interface KeyFromRow<T> { 463 T getKeyFromRow(final byte[] row); 464 double getFactor(T t); 465 } 466 467 /* ========================================================================= 468 * HTable helpers 469 */ 470 private static void doPut(final Connection connection, final Put put) 471 throws IOException { 472 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 473 table.put(put); 474 } 475 } 476 477 private static void doDelete(final Connection connection, final Delete delete) 478 throws IOException { 479 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 480 table.delete(delete); 481 } 482 } 483 484 /* ========================================================================= 485 * Data Size Helpers 486 */ 487 public static long calculateMutationSize(final Mutation mutation) { 488 long size = 0; 489 for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) { 490 for (Cell cell : entry.getValue()) { 491 size += cell.getSerializedSize(); 492 } 493 } 494 return size; 495 } 496 497 public static long calculateResultSize(final Result result) { 498 long size = 0; 499 for (Cell cell : result.rawCells()) { 500 size += cell.getSerializedSize(); 501 } 502 return size; 503 } 504 505 public static long calculateResultSize(final List<Result> results) { 506 long size = 0; 507 for (Result result: results) { 508 for (Cell cell : result.rawCells()) { 509 size += cell.getSerializedSize(); 510 } 511 } 512 return size; 513 } 514 515 /** 516 * Method to enable a table, if not already enabled. This method suppresses 517 * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling 518 * the table. 519 * @param conn connection to re-use 520 * @param tableName name of the table to be enabled 521 */ 522 public static void enableTableIfNotEnabled(Connection conn, TableName tableName) 523 throws IOException { 524 try { 525 conn.getAdmin().enableTable(tableName); 526 } catch (TableNotDisabledException | TableNotFoundException e) { 527 // ignore 528 } 529 } 530 531 /** 532 * Method to disable a table, if not already disabled. This method suppresses 533 * {@link TableNotEnabledException}, if thrown while disabling the table. 534 * @param conn connection to re-use 535 * @param tableName table name which has moved into space quota violation 536 */ 537 public static void disableTableIfNotDisabled(Connection conn, TableName tableName) 538 throws IOException { 539 try { 540 conn.getAdmin().disableTable(tableName); 541 } catch (TableNotEnabledException | TableNotFoundException e) { 542 // ignore 543 } 544 } 545}