001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.DoNotRetryIOException; 030import org.apache.hadoop.hbase.HColumnDescriptor; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HTableDescriptor; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.TableNotDisabledException; 035import org.apache.hadoop.hbase.TableNotEnabledException; 036import org.apache.hadoop.hbase.TableNotFoundException; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Mutation; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.regionserver.BloomType; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.yetus.audience.InterfaceStability; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota; 058 059/** 060 * Helper class to interact with the quota table 061 */ 062@InterfaceAudience.Private 063@InterfaceStability.Evolving 064public class QuotaUtil extends QuotaTableUtil { 065 private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class); 066 067 public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; 068 private static final boolean QUOTA_ENABLED_DEFAULT = false; 069 070 public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit"; 071 // the default one read capacity unit is 1024 bytes (1KB) 072 public static final long DEFAULT_READ_CAPACITY_UNIT = 1024; 073 public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit"; 074 // the default one write capacity unit is 1024 bytes (1KB) 075 public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024; 076 077 /** Table descriptor for Quota internal table */ 078 public static final HTableDescriptor QUOTA_TABLE_DESC = 079 new HTableDescriptor(QUOTA_TABLE_NAME); 080 static { 081 QUOTA_TABLE_DESC.addFamily( 082 new HColumnDescriptor(QUOTA_FAMILY_INFO) 083 .setScope(HConstants.REPLICATION_SCOPE_LOCAL) 084 .setBloomFilterType(BloomType.ROW) 085 .setMaxVersions(1) 086 ); 087 QUOTA_TABLE_DESC.addFamily( 088 new HColumnDescriptor(QUOTA_FAMILY_USAGE) 089 .setScope(HConstants.REPLICATION_SCOPE_LOCAL) 090 .setBloomFilterType(BloomType.ROW) 091 .setMaxVersions(1) 092 ); 093 } 094 095 /** Returns true if the support for quota is enabled */ 096 public static boolean isQuotaEnabled(final Configuration conf) { 097 return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT); 098 } 099 100 /* ========================================================================= 101 * Quota "settings" helpers 102 */ 103 public static void addTableQuota(final Connection connection, final TableName table, 104 final Quotas data) throws IOException { 105 addQuotas(connection, getTableRowKey(table), data); 106 } 107 108 public static void deleteTableQuota(final Connection connection, final TableName table) 109 throws IOException { 110 deleteQuotas(connection, getTableRowKey(table)); 111 } 112 113 public static void addNamespaceQuota(final Connection connection, final String namespace, 114 final Quotas data) throws IOException { 115 addQuotas(connection, getNamespaceRowKey(namespace), data); 116 } 117 118 public static void deleteNamespaceQuota(final Connection connection, final String namespace) 119 throws IOException { 120 deleteQuotas(connection, getNamespaceRowKey(namespace)); 121 } 122 123 public static void addUserQuota(final Connection connection, final String user, 124 final Quotas data) throws IOException { 125 addQuotas(connection, getUserRowKey(user), data); 126 } 127 128 public static void addUserQuota(final Connection connection, final String user, 129 final TableName table, final Quotas data) throws IOException { 130 addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data); 131 } 132 133 public static void addUserQuota(final Connection connection, final String user, 134 final String namespace, final Quotas data) throws IOException { 135 addQuotas(connection, getUserRowKey(user), 136 getSettingsQualifierForUserNamespace(namespace), data); 137 } 138 139 public static void deleteUserQuota(final Connection connection, final String user) 140 throws IOException { 141 deleteQuotas(connection, getUserRowKey(user)); 142 } 143 144 public static void deleteUserQuota(final Connection connection, final String user, 145 final TableName table) throws IOException { 146 deleteQuotas(connection, getUserRowKey(user), 147 getSettingsQualifierForUserTable(table)); 148 } 149 150 public static void deleteUserQuota(final Connection connection, final String user, 151 final String namespace) throws IOException { 152 deleteQuotas(connection, getUserRowKey(user), 153 getSettingsQualifierForUserNamespace(namespace)); 154 } 155 156 public static void addRegionServerQuota(final Connection connection, final String regionServer, 157 final Quotas data) throws IOException { 158 addQuotas(connection, getRegionServerRowKey(regionServer), data); 159 } 160 161 public static void deleteRegionServerQuota(final Connection connection, final String regionServer) 162 throws IOException { 163 deleteQuotas(connection, getRegionServerRowKey(regionServer)); 164 } 165 166 protected static void switchExceedThrottleQuota(final Connection connection, 167 boolean exceedThrottleQuotaEnabled) throws IOException { 168 if (exceedThrottleQuotaEnabled) { 169 checkRSQuotaToEnableExceedThrottle( 170 getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY)); 171 } 172 173 Put put = new Put(getExceedThrottleQuotaRowKey()); 174 put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS, 175 Bytes.toBytes(exceedThrottleQuotaEnabled)); 176 doPut(connection, put); 177 } 178 179 private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException { 180 if (quotas != null && quotas.hasThrottle()) { 181 Throttle throttle = quotas.getThrottle(); 182 // If enable exceed throttle quota, make sure that there are at least one read(req/read + 183 // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas. 184 boolean hasReadQuota = false; 185 boolean hasWriteQuota = false; 186 if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) { 187 hasReadQuota = true; 188 hasWriteQuota = true; 189 } 190 if (!hasReadQuota 191 && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())) { 192 hasReadQuota = true; 193 } 194 if (!hasReadQuota) { 195 throw new DoNotRetryIOException( 196 "Please set at least one read region server quota before enable exceed throttle quota"); 197 } 198 if (!hasWriteQuota && (throttle.hasWriteNum() || throttle.hasWriteSize() 199 || throttle.hasWriteCapacityUnit())) { 200 hasWriteQuota = true; 201 } 202 if (!hasWriteQuota) { 203 throw new DoNotRetryIOException("Please set at least one write region server quota " 204 + "before enable exceed throttle quota"); 205 } 206 // If enable exceed throttle quota, make sure that region server throttle quotas are in 207 // seconds time unit. Because once previous requests exceed their quota and consume region 208 // server quota, quota in other time units may be refilled in a long time, this may affect 209 // later requests. 210 List<Pair<Boolean, TimedQuota>> list = 211 Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()), 212 Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()), 213 Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()), 214 Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()), 215 Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()), 216 Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()), 217 Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()), 218 Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()), 219 Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit())); 220 for (Pair<Boolean, TimedQuota> pair : list) { 221 if (pair.getFirst()) { 222 if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) { 223 throw new DoNotRetryIOException("All region server quota must be " 224 + "in seconds time unit if enable exceed throttle quota"); 225 } 226 } 227 } 228 } else { 229 // If enable exceed throttle quota, make sure that region server quota is already set 230 throw new DoNotRetryIOException( 231 "Please set region server quota before enable exceed throttle quota"); 232 } 233 } 234 235 protected static boolean isExceedThrottleQuotaEnabled(final Connection connection) 236 throws IOException { 237 Get get = new Get(getExceedThrottleQuotaRowKey()); 238 get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 239 Result result = doGet(connection, get); 240 if (result.isEmpty()) { 241 return false; 242 } 243 return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS)); 244 } 245 246 private static void addQuotas(final Connection connection, final byte[] rowKey, 247 final Quotas data) throws IOException { 248 addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data); 249 } 250 251 private static void addQuotas(final Connection connection, final byte[] rowKey, 252 final byte[] qualifier, final Quotas data) throws IOException { 253 Put put = new Put(rowKey); 254 put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data)); 255 doPut(connection, put); 256 } 257 258 private static void deleteQuotas(final Connection connection, final byte[] rowKey) 259 throws IOException { 260 deleteQuotas(connection, rowKey, null); 261 } 262 263 private static void deleteQuotas(final Connection connection, final byte[] rowKey, 264 final byte[] qualifier) throws IOException { 265 Delete delete = new Delete(rowKey); 266 if (qualifier != null) { 267 delete.addColumns(QUOTA_FAMILY_INFO, qualifier); 268 } 269 doDelete(connection, delete); 270 } 271 272 public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection, 273 final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor) 274 throws IOException { 275 long nowTs = EnvironmentEdgeManager.currentTime(); 276 Result[] results = doGet(connection, gets); 277 278 Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length); 279 for (int i = 0; i < results.length; ++i) { 280 byte[] key = gets.get(i).getRow(); 281 assert isUserRowKey(key); 282 String user = getUserFromRowKey(key); 283 284 final UserQuotaState quotaInfo = new UserQuotaState(nowTs); 285 userQuotas.put(user, quotaInfo); 286 287 if (results[i].isEmpty()) continue; 288 assert Bytes.equals(key, results[i].getRow()); 289 290 try { 291 parseUserResult(user, results[i], new UserQuotasVisitor() { 292 @Override 293 public void visitUserQuotas(String userName, String namespace, Quotas quotas) { 294 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 295 quotaInfo.setQuotas(namespace, quotas); 296 } 297 298 @Override 299 public void visitUserQuotas(String userName, TableName table, Quotas quotas) { 300 quotas = updateClusterQuotaToMachineQuota(quotas, 301 tableMachineQuotaFactors.containsKey(table) ? tableMachineQuotaFactors.get(table) 302 : 1); 303 quotaInfo.setQuotas(table, quotas); 304 } 305 306 @Override 307 public void visitUserQuotas(String userName, Quotas quotas) { 308 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 309 quotaInfo.setQuotas(quotas); 310 } 311 }); 312 } catch (IOException e) { 313 LOG.error("Unable to parse user '" + user + "' quotas", e); 314 userQuotas.remove(user); 315 } 316 } 317 return userQuotas; 318 } 319 320 public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection, 321 final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException { 322 return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() { 323 @Override 324 public TableName getKeyFromRow(final byte[] row) { 325 assert isTableRowKey(row); 326 return getTableFromRowKey(row); 327 } 328 329 @Override 330 public double getFactor(TableName tableName) { 331 return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1; 332 } 333 }); 334 } 335 336 public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection, 337 final List<Get> gets, double factor) throws IOException { 338 return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() { 339 @Override 340 public String getKeyFromRow(final byte[] row) { 341 assert isNamespaceRowKey(row); 342 return getNamespaceFromRowKey(row); 343 } 344 345 @Override 346 public double getFactor(String s) { 347 return factor; 348 } 349 }); 350 } 351 352 public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection, 353 final List<Get> gets) throws IOException { 354 return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() { 355 @Override 356 public String getKeyFromRow(final byte[] row) { 357 assert isRegionServerRowKey(row); 358 return getRegionServerFromRowKey(row); 359 } 360 361 @Override 362 public double getFactor(String s) { 363 return 1; 364 } 365 }); 366 } 367 368 public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, 369 final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) 370 throws IOException { 371 long nowTs = EnvironmentEdgeManager.currentTime(); 372 Result[] results = doGet(connection, gets); 373 374 Map<K, QuotaState> globalQuotas = new HashMap<>(results.length); 375 for (int i = 0; i < results.length; ++i) { 376 byte[] row = gets.get(i).getRow(); 377 K key = kfr.getKeyFromRow(row); 378 379 QuotaState quotaInfo = new QuotaState(nowTs); 380 globalQuotas.put(key, quotaInfo); 381 382 if (results[i].isEmpty()) continue; 383 assert Bytes.equals(row, results[i].getRow()); 384 385 byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 386 if (data == null) continue; 387 388 try { 389 Quotas quotas = quotasFromData(data); 390 quotas = updateClusterQuotaToMachineQuota(quotas, 391 kfr.getFactor(key)); 392 quotaInfo.setQuotas(quotas); 393 } catch (IOException e) { 394 LOG.error("Unable to parse " + type + " '" + key + "' quotas", e); 395 globalQuotas.remove(key); 396 } 397 } 398 return globalQuotas; 399 } 400 401 /** 402 * Convert cluster scope quota to machine scope quota 403 * @param quotas the original quota 404 * @param factor factor used to divide cluster limiter to machine limiter 405 * @return the converted quota whose quota limiters all in machine scope 406 */ 407 private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) { 408 Quotas.Builder newQuotas = Quotas.newBuilder(quotas); 409 if (newQuotas.hasThrottle()) { 410 Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle()); 411 if (throttle.hasReqNum()) { 412 throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor)); 413 } 414 if (throttle.hasReqSize()) { 415 throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor)); 416 } 417 if (throttle.hasReadNum()) { 418 throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor)); 419 } 420 if (throttle.hasReadSize()) { 421 throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor)); 422 } 423 if (throttle.hasWriteNum()) { 424 throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor)); 425 } 426 if (throttle.hasWriteSize()) { 427 throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor)); 428 } 429 if (throttle.hasReqCapacityUnit()) { 430 throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor)); 431 } 432 if (throttle.hasReadCapacityUnit()) { 433 throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor)); 434 } 435 if (throttle.hasWriteCapacityUnit()) { 436 throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor)); 437 } 438 newQuotas.setThrottle(throttle.build()); 439 } 440 return newQuotas.build(); 441 } 442 443 private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) { 444 if (timedQuota.getScope() == QuotaScope.CLUSTER) { 445 TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota); 446 newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor))) 447 .setScope(QuotaScope.MACHINE); 448 return newTimedQuota.build(); 449 } else { 450 return timedQuota; 451 } 452 } 453 454 private static interface KeyFromRow<T> { 455 T getKeyFromRow(final byte[] row); 456 double getFactor(T t); 457 } 458 459 /* ========================================================================= 460 * HTable helpers 461 */ 462 private static void doPut(final Connection connection, final Put put) 463 throws IOException { 464 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 465 table.put(put); 466 } 467 } 468 469 private static void doDelete(final Connection connection, final Delete delete) 470 throws IOException { 471 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 472 table.delete(delete); 473 } 474 } 475 476 /* ========================================================================= 477 * Data Size Helpers 478 */ 479 public static long calculateMutationSize(final Mutation mutation) { 480 long size = 0; 481 for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) { 482 for (Cell cell : entry.getValue()) { 483 size += cell.getSerializedSize(); 484 } 485 } 486 return size; 487 } 488 489 public static long calculateResultSize(final Result result) { 490 long size = 0; 491 for (Cell cell : result.rawCells()) { 492 size += cell.getSerializedSize(); 493 } 494 return size; 495 } 496 497 public static long calculateResultSize(final List<Result> results) { 498 long size = 0; 499 for (Result result: results) { 500 for (Cell cell : result.rawCells()) { 501 size += cell.getSerializedSize(); 502 } 503 } 504 return size; 505 } 506 507 /** 508 * Method to enable a table, if not already enabled. This method suppresses 509 * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling 510 * the table. 511 * @param conn connection to re-use 512 * @param tableName name of the table to be enabled 513 */ 514 public static void enableTableIfNotEnabled(Connection conn, TableName tableName) 515 throws IOException { 516 try { 517 conn.getAdmin().enableTable(tableName); 518 } catch (TableNotDisabledException | TableNotFoundException e) { 519 // ignore 520 } 521 } 522 523 /** 524 * Method to disable a table, if not already disabled. This method suppresses 525 * {@link TableNotEnabledException}, if thrown while disabling the table. 526 * @param conn connection to re-use 527 * @param tableName table name which has moved into space quota violation 528 */ 529 public static void disableTableIfNotDisabled(Connection conn, TableName tableName) 530 throws IOException { 531 try { 532 conn.getAdmin().disableTable(tableName); 533 } catch (TableNotEnabledException | TableNotFoundException e) { 534 // ignore 535 } 536 } 537}