/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;

/**
 * Helper class to interact with the quota table.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // by default, one read capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // by default, one write capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
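
  /*
   * For illustration, a minimal sketch of how these settings might be read by a client or server
   * component. It assumes a Configuration obtained from HBaseConfiguration.create() and uses only
   * the constants and the isQuotaEnabled() helper defined in this class:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   if (QuotaUtil.isQuotaEnabled(conf)) {
   *     long readUnitBytes = conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY,
   *       QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
   *     long writeUnitBytes = conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY,
   *       QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
   *   }
   */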

  /** Table descriptor for Quota internal table */
  public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
  static {
    QUOTA_TABLE_DESC.addFamily(
      new HColumnDescriptor(QUOTA_FAMILY_INFO).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
        .setBloomFilterType(BloomType.ROW).setMaxVersions(1));
    QUOTA_TABLE_DESC.addFamily(
      new HColumnDescriptor(QUOTA_FAMILY_USAGE).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
        .setBloomFilterType(BloomType.ROW).setMaxVersions(1));
  }

  /** Returns true if the support for quota is enabled */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }

  /*
   * =========================================================================
   * Quota "settings" helpers
   */
  public static void addTableQuota(final Connection connection, final TableName table,
    final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  public static void deleteTableQuota(final Connection connection, final TableName table)
    throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  public static void addNamespaceQuota(final Connection connection, final String namespace,
    final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
    throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
    throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
    final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
    final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
      data);
  }

  public static void deleteUserQuota(final Connection connection, final String user)
    throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
    final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
    final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
  }

  public static void addRegionServerQuota(final Connection connection, final String regionServer,
    final Quotas data) throws IOException {
    addQuotas(connection, getRegionServerRowKey(regionServer), data);
  }

  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
    throws IOException {
    deleteQuotas(connection, getRegionServerRowKey(regionServer));
  }
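
  /*
   * For illustration, a minimal sketch of persisting a throttle for a user through the helpers
   * above. It assumes an open Connection named connection, and that the generated TimedQuota
   * builder exposes setTimeUnit(), which is not verified in this class; the other builder calls
   * (setSoftLimit, setScope, setReqNum, setThrottle) are the ones used further below:
   *
   *   TimedQuota limit = TimedQuota.newBuilder().setSoftLimit(1000)
   *     .setTimeUnit(TimeUnit.SECONDS).setScope(QuotaScope.MACHINE).build();
   *   Quotas quotas = Quotas.newBuilder()
   *     .setThrottle(Throttle.newBuilder().setReqNum(limit).build()).build();
   *   QuotaUtil.addUserQuota(connection, "bob", quotas);
   */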

  protected static void switchExceedThrottleQuota(final Connection connection,
    boolean exceedThrottleQuotaEnabled) throws IOException {
    if (exceedThrottleQuotaEnabled) {
      checkRSQuotaToEnableExceedThrottle(
        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
    }

    Put put = new Put(getExceedThrottleQuotaRowKey());
    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
      Bytes.toBytes(exceedThrottleQuotaEnabled));
    doPut(connection, put);
  }

  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
    if (quotas != null && quotas.hasThrottle()) {
      Throttle throttle = quotas.getThrottle();
      // To enable exceed throttle quota, make sure that there is at least one read
      // (req/read + num/size/cu) and one write (req/write + num/size/cu) region server
      // throttle quota.
      boolean hasReadQuota = false;
      boolean hasWriteQuota = false;
      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
        hasReadQuota = true;
        hasWriteQuota = true;
      }
      if (
        !hasReadQuota
          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
      ) {
        hasReadQuota = true;
      }
      if (!hasReadQuota) {
        throw new DoNotRetryIOException(
          "Please set at least one read region server quota before enabling exceed throttle quota");
      }
      if (
        !hasWriteQuota
          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
      ) {
        hasWriteQuota = true;
      }
      if (!hasWriteQuota) {
        throw new DoNotRetryIOException("Please set at least one write region server quota "
          + "before enabling exceed throttle quota");
      }
      // To enable exceed throttle quota, also make sure that the region server throttle quotas
      // use the SECONDS time unit. Once previous requests have exceeded their own quota and
      // consumed the region server quota, a quota in another time unit could take a long time to
      // refill, which may affect later requests.
      List<Pair<Boolean, TimedQuota>> list =
        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
      for (Pair<Boolean, TimedQuota> pair : list) {
        if (pair.getFirst()) {
          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
            throw new DoNotRetryIOException("All region server quotas must use the SECONDS "
              + "time unit when enabling exceed throttle quota");
          }
        }
      }
    } else {
      // To enable exceed throttle quota, a region server quota must already be set
      throw new DoNotRetryIOException(
        "Please set a region server quota before enabling exceed throttle quota");
    }
  }
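
  /*
   * A sketch, under the same builder assumptions as the sketch above, of a region server throttle
   * that would satisfy checkRSQuotaToEnableExceedThrottle: at least one read-side and one
   * write-side limit, both using the SECONDS time unit. The regionServerSpec argument is a
   * placeholder for whatever region server key the caller uses:
   *
   *   TimedQuota perSecond = TimedQuota.newBuilder().setSoftLimit(2048)
   *     .setTimeUnit(TimeUnit.SECONDS).setScope(QuotaScope.MACHINE).build();
   *   Throttle throttle = Throttle.newBuilder()
   *     .setReadCapacityUnit(perSecond).setWriteCapacityUnit(perSecond).build();
   *   QuotaUtil.addRegionServerQuota(connection, regionServerSpec,
   *     Quotas.newBuilder().setThrottle(throttle).build());
   */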

  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
    throws IOException {
    Get get = new Get(getExceedThrottleQuotaRowKey());
    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
    Result result = doGet(connection, get);
    if (result.isEmpty()) {
      return false;
    }
    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
    throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
    throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    if (isNamespaceRowKey(rowKey)) {
      String ns = getNamespaceFromRowKey(rowKey);
      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
        // When deleting namespace space quota, also delete table usage(u:p) snapshots
        deleteTableUsageSnapshotsForNamespace(connection, ns);
      }
    }
    doDelete(connection, delete);
  }
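
  /*
   * As the helpers above show, every quota setting is stored as a serialized Quotas protobuf in
   * the QUOTA_FAMILY_INFO family of the quota table (QUOTA_TABLE_NAME), either under
   * QUOTA_QUALIFIER_SETTINGS or, for per-table/per-namespace user quotas, under the qualifier
   * returned by getSettingsQualifierForUserTable/getSettingsQualifierForUserNamespace. A sketch
   * of reading a setting back through the inherited QuotaTableUtil helpers already used in this
   * class:
   *
   *   Quotas nsQuota = getNamespaceQuota(connection, "default");
   *   if (nsQuota != null && nsQuota.hasThrottle()) {
   *     Throttle throttle = nsQuota.getThrottle();
   *     // inspect throttle.getReqNum(), throttle.getReadSize(), ...
   *   }
   */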

  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
    throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] key = gets.get(i).getRow();
      assert isUserRowKey(key);
      String user = getUserFromRowKey(key);

      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
      userQuotas.put(user, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(key, results[i].getRow());

      try {
        parseUserResult(user, results[i], new UserQuotasVisitor() {
          @Override
          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(namespace, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas,
              tableMachineQuotaFactors.containsKey(table)
                ? tableMachineQuotaFactors.get(table)
                : 1);
            quotaInfo.setQuotas(table, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(quotas);
          }
        });
      } catch (IOException e) {
        LOG.error("Unable to parse user '" + user + "' quotas", e);
        userQuotas.remove(user);
      }
    }
    return userQuotas;
  }

  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }

      @Override
      public double getFactor(TableName tableName) {
        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
      }
    });
  }

  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
    final List<Get> gets, double factor) throws IOException {
    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return factor;
      }
    });
  }

  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
    final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isRegionServerRowKey(row);
        return getRegionServerFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return 1;
      }
    });
  }

  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
    final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] row = gets.get(i).getRow();
      K key = kfr.getKeyFromRow(row);

      QuotaState quotaInfo = new QuotaState(nowTs);
      globalQuotas.put(key, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(row, results[i].getRow());

      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
      if (data == null) continue;

      try {
        Quotas quotas = quotasFromData(data);
        quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
        quotaInfo.setQuotas(quotas);
      } catch (IOException e) {
        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
        globalQuotas.remove(key);
      }
    }
    return globalQuotas;
  }

  /**
   * Convert a cluster-scope quota to a machine-scope quota.
   * @param quotas the original quota
   * @param factor the factor used to divide the cluster limiter into a machine limiter
   * @return the converted quota whose quota limiters are all in machine scope
   */
  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
    if (newQuotas.hasThrottle()) {
      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
      if (throttle.hasReqNum()) {
        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
      }
      if (throttle.hasReqSize()) {
        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
      }
      if (throttle.hasReadNum()) {
        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
      }
      if (throttle.hasReadSize()) {
        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
      }
      if (throttle.hasWriteNum()) {
        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
      }
      if (throttle.hasWriteSize()) {
        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
      }
      if (throttle.hasReqCapacityUnit()) {
        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
      }
      if (throttle.hasReadCapacityUnit()) {
        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
      }
      if (throttle.hasWriteCapacityUnit()) {
        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
      }
      newQuotas.setThrottle(throttle.build());
    }
    return newQuotas.build();
  }

  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
        .setScope(QuotaScope.MACHINE);
      return newTimedQuota.build();
    } else {
      return timedQuota;
    }
  }
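
  /*
   * Worked example of the conversion above: a cluster-scope TimedQuota with a soft limit of 1000
   * and a factor of 0.25 (the cluster limit spread over four region servers) becomes a
   * machine-scope TimedQuota with soft limit Math.max(1, (long) (1000 * 0.25)) = 250, and its
   * scope is rewritten to QuotaScope.MACHINE. Quotas that are already machine-scoped are returned
   * unchanged.
   */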

  private static interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);

    double getFactor(T t);
  }

  /*
   * =========================================================================
   * HTable helpers
   */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  private static void doDelete(final Connection connection, final Delete delete)
    throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /*
   * =========================================================================
   * Data Size Helpers
   */
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      for (Cell cell : result.rawCells()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  /**
   * Method to enable a table, if not already enabled. This method suppresses
   * {@link TableNotDisabledException} and {@link TableNotFoundException} if thrown while enabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName name of the table to be enabled
   */
  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
    throws IOException {
    try {
      conn.getAdmin().enableTable(tableName);
    } catch (TableNotDisabledException | TableNotFoundException e) {
      // ignore
    }
  }

  /**
   * Method to disable a table, if not already disabled. This method suppresses
   * {@link TableNotEnabledException} and {@link TableNotFoundException} if thrown while disabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName name of the table which has moved into space quota violation
   */
  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
    throws IOException {
    try {
      conn.getAdmin().disableTable(tableName);
    } catch (TableNotEnabledException | TableNotFoundException e) {
      // ignore
    }
  }
}