/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;

/**
 * Helper class to interact with the quota table.
 * <p>
 * Row-key/qualifier helpers, (de)serialization helpers and the read-side {@code doGet} helpers used
 * below are not defined in this class; they are expected to come from {@link QuotaTableUtil}.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  /** Configuration key read by {@link #isQuotaEnabled(Configuration)}. */
  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // the default one read capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // the default one write capacity unit is 1024 bytes (1KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;

  /** Table descriptor for Quota internal table */
  public static final TableDescriptor QUOTA_TABLE_DESC =
    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .build();

  /** Returns true if the support for quota is enabled */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }

  /*
   * ========================================================================= Quota "settings"
   * helpers
   */

  /**
   * Writes the quota settings for a table into the quota table.
   * @param connection connection used to reach the quota table
   * @param table      table the settings apply to
   * @param data       quota settings to store
   * @throws IOException if the write fails
   */
  public static void addTableQuota(final Connection connection, final TableName table,
    final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  /**
   * Deletes the quota row of a table.
   * @param connection connection used to reach the quota table
   * @param table      table whose quota row is deleted
   * @throws IOException if the delete fails
   */
  public static void deleteTableQuota(final Connection connection, final TableName table)
    throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  /**
   * Writes the quota settings for a namespace into the quota table.
   * @param connection connection used to reach the quota table
   * @param namespace  namespace the settings apply to
   * @param data       quota settings to store
   * @throws IOException if the write fails
   */
  public static void addNamespaceQuota(final Connection connection, final String namespace,
    final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  /**
   * Deletes the quota row of a namespace. If the namespace currently has a space quota, the table
   * usage snapshots of that namespace are deleted as well.
   * @param connection connection used to reach the quota table
   * @param namespace  namespace whose quota row is deleted
   * @throws IOException if reading the current quota or the delete fails
   */
  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
    throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  /**
   * Writes the global quota settings of a user.
   * @param connection connection used to reach the quota table
   * @param user       user the settings apply to
   * @param data       quota settings to store
   * @throws IOException if the write fails
   */
  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
    throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  /**
   * Writes the quota settings of a user for one specific table.
   * @param connection connection used to reach the quota table
   * @param user       user the settings apply to
   * @param table      table the settings are restricted to
   * @param data       quota settings to store
   * @throws IOException if the write fails
   */
  public static void addUserQuota(final Connection connection, final String user,
    final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  /**
   * Writes the quota settings of a user for one specific namespace.
   * @param connection connection used to reach the quota table
   * @param user       user the settings apply to
   * @param namespace  namespace the settings are restricted to
   * @param data       quota settings to store
   * @throws IOException if the write fails
   */
  public static void addUserQuota(final Connection connection, final String user,
    final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
      data);
  }

  /**
   * Deletes the quota row of a user.
   * @param connection connection used to reach the quota table
   * @param user       user whose quota row is deleted
   * @throws IOException if the delete fails
   */
  public static void deleteUserQuota(final Connection connection, final String user)
    throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  /**
   * Deletes the quota settings of a user for one specific table.
   * @param connection connection used to reach the quota table
   * @param user       user whose settings are deleted
   * @param table      table the deleted settings are restricted to
   * @throws IOException if the delete fails
   */
  public static void deleteUserQuota(final Connection connection, final String user,
    final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
  }

  /**
   * Deletes the quota settings of a user for one specific namespace.
   * @param connection connection used to reach the quota table
   * @param user       user whose settings are deleted
   * @param namespace  namespace the deleted settings are restricted to
   * @throws IOException if the delete fails
   */
  public static void deleteUserQuota(final Connection connection, final String user,
    final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
  }

  /**
   * Writes the quota settings of a region server into the quota table.
   * @param connection   connection used to reach the quota table
   * @param regionServer region server key the settings apply to
   * @param data         quota settings to store
   * @throws IOException if the write fails
   */
  public static void addRegionServerQuota(final Connection connection, final String regionServer,
    final Quotas data) throws IOException {
    addQuotas(connection, getRegionServerRowKey(regionServer), data);
  }

  /**
   * Deletes the quota row of a region server.
   * @param connection   connection used to reach the quota table
   * @param regionServer region server key whose quota row is deleted
   * @throws IOException if the delete fails
   */
  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
    throws IOException {
    deleteQuotas(connection, getRegionServerRowKey(regionServer));
  }

  /**
   * Persists the "exceed throttle quota" switch. When the switch is being turned on, the region
   * server quota stored under {@code QUOTA_REGION_SERVER_ROW_KEY} is validated first and the switch
   * is not written if validation fails.
   * @param connection                 connection used to reach the quota table
   * @param exceedThrottleQuotaEnabled new value of the switch
   * @throws DoNotRetryIOException if enabling is requested and the region server quota does not
   *                               satisfy the preconditions
   * @throws IOException           if reading the region server quota or writing the switch fails
   */
  protected static void switchExceedThrottleQuota(final Connection connection,
    boolean exceedThrottleQuotaEnabled) throws IOException {
    if (exceedThrottleQuotaEnabled) {
      checkRSQuotaToEnableExceedThrottle(
        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
    }

    Put put = new Put(getExceedThrottleQuotaRowKey());
    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
      Bytes.toBytes(exceedThrottleQuotaEnabled));
    doPut(connection, put);
  }

  /**
   * Validates that the region server quota allows the exceed throttle quota feature to be enabled:
   * a throttle must be set, it must cover both reads and writes, and every limiter that is set
   * must use the {@code SECONDS} time unit.
   * @param quotas region server quota to validate, may be {@code null}
   * @throws DoNotRetryIOException if any precondition is not met
   */
  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
    // If enable exceed throttle quota, make sure that region server quota is already set
    if (quotas == null || !quotas.hasThrottle()) {
      throw new DoNotRetryIOException(
        "Please set region server quota before enable exceed throttle quota");
    }
    Throttle throttle = quotas.getThrottle();

    // If enable exceed throttle quota, make sure that there are at least one read(req/read +
    // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
    // A "req" limiter counts for both the read and the write side.
    boolean hasReqQuota =
      throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit();
    boolean hasReadQuota = hasReqQuota || throttle.hasReadNum() || throttle.hasReadSize()
      || throttle.hasReadCapacityUnit();
    if (!hasReadQuota) {
      throw new DoNotRetryIOException(
        "Please set at least one read region server quota before enable exceed throttle quota");
    }
    boolean hasWriteQuota = hasReqQuota || throttle.hasWriteNum() || throttle.hasWriteSize()
      || throttle.hasWriteCapacityUnit();
    if (!hasWriteQuota) {
      throw new DoNotRetryIOException("Please set at least one write region server quota "
        + "before enable exceed throttle quota");
    }

    // If enable exceed throttle quota, make sure that region server throttle quotas are in
    // seconds time unit. Because once previous requests exceed their quota and consume region
    // server quota, quota in other time units may be refilled in a long time, this may affect
    // later requests.
    List<Pair<Boolean, TimedQuota>> limiters =
      Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
        Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
        Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
        Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
        Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
        Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
        Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
        Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
        Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
    for (Pair<Boolean, TimedQuota> limiter : limiters) {
      // Only limiters that are actually set are checked.
      if (limiter.getFirst() && limiter.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
        throw new DoNotRetryIOException("All region server quota must be "
          + "in seconds time unit if enable exceed throttle quota");
      }
    }
  }

  /**
   * Reads the "exceed throttle quota" switch from the quota table.
   * @param connection connection used to reach the quota table
   * @return the stored switch value, or {@code false} when nothing is stored
   * @throws IOException if the read fails
   */
  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
    throws IOException {
    Get get = new Get(getExceedThrottleQuotaRowKey());
    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
    Result result = doGet(connection, get);
    if (result.isEmpty()) {
      return false;
    }
    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
  }

  /** Stores {@code data} in the given row under the default settings qualifier. */
  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
    throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  /** Stores {@code data} in the given row and qualifier of the info column family. */
  private static void addQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  /** Deletes the given row without restricting the delete to a qualifier. */
  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
    throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  /**
   * Deletes quota settings from the quota table.
   * @param connection connection used to reach the quota table
   * @param rowKey     row to delete from
   * @param qualifier  qualifier in the info family to delete; when {@code null} no column is added
   *                   to the {@link Delete}, so it is not restricted to a single column
   * @throws IOException if reading the namespace quota or the delete fails
   */
  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    if (isNamespaceRowKey(rowKey)) {
      String ns = getNamespaceFromRowKey(rowKey);
      // The quota must be looked up before the row is deleted below.
      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
        // When deleting namespace space quota, also delete table usage(u:p) snapshots
        deleteTableUsageSnapshotsForNamespace(connection, ns);
      }
    }
    doDelete(connection, delete);
  }

  /**
   * Fetches user quotas and converts cluster scope limiters to machine scope.
   * <p>
   * Every requested user gets an entry in the returned map, even when no quota is stored for it,
   * unless its stored quotas could not be parsed; in that case the error is logged and the user is
   * left out of the result.
   * @param connection               connection used to reach the quota table
   * @param gets                     one {@link Get} per user row
   * @param tableMachineQuotaFactors per-table factors applied to user-table quotas; tables without
   *                                 an entry use a factor of 1
   * @param factor                   factor applied to user-global and user-namespace quotas
   * @return quota state keyed by user name
   * @throws IOException if the read fails
   */
  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
    throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] key = gets.get(i).getRow();
      assert isUserRowKey(key);
      String user = getUserFromRowKey(key);

      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
      userQuotas.put(user, quotaInfo);

      if (results[i].isEmpty()) {
        continue;
      }
      assert Bytes.equals(key, results[i].getRow());

      try {
        parseUserResult(user, results[i], new UserQuotasVisitor() {
          @Override
          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
            quotaInfo.setQuotas(namespace, updateClusterQuotaToMachineQuota(quotas, factor));
          }

          @Override
          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
            // Single lookup: avoids a containsKey/get pair observing different map states.
            quotaInfo.setQuotas(table, updateClusterQuotaToMachineQuota(quotas,
              tableMachineQuotaFactors.getOrDefault(table, 1.0)));
          }

          @Override
          public void visitUserQuotas(String userName, Quotas quotas) {
            quotaInfo.setQuotas(updateClusterQuotaToMachineQuota(quotas, factor));
          }
        });
      } catch (IOException e) {
        LOG.error("Unable to parse user '{}' quotas", user, e);
        userQuotas.remove(user);
      }
    }
    return userQuotas;
  }

  /**
   * Fetches table quotas and converts cluster scope limiters to machine scope.
   * @param connection          connection used to reach the quota table
   * @param gets                one {@link Get} per table row
   * @param tableMachineFactors per-table conversion factors; tables without an entry use a factor
   *                            of 1
   * @return quota state keyed by table name
   * @throws IOException if the read fails
   * @see #fetchGlobalQuotas(String, Connection, List, KeyFromRow)
   */
  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }

      @Override
      public double getFactor(TableName tableName) {
        // Single lookup: avoids a containsKey/get pair observing different map states.
        return tableMachineFactors.getOrDefault(tableName, 1.0);
      }
    });
  }

  /**
   * Fetches namespace quotas and converts cluster scope limiters to machine scope.
   * @param connection connection used to reach the quota table
   * @param gets       one {@link Get} per namespace row
   * @param factor     conversion factor applied to every namespace
   * @return quota state keyed by namespace
   * @throws IOException if the read fails
   * @see #fetchGlobalQuotas(String, Connection, List, KeyFromRow)
   */
  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
    final List<Get> gets, double factor) throws IOException {
    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return factor;
      }
    });
  }

  /**
   * Fetches region server quotas. A factor of 1 is used for the cluster-to-machine conversion.
   * @param connection connection used to reach the quota table
   * @param gets       one {@link Get} per region server row
   * @return quota state keyed by region server key
   * @throws IOException if the read fails
   * @see #fetchGlobalQuotas(String, Connection, List, KeyFromRow)
   */
  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
    final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isRegionServerRowKey(row);
        return getRegionServerFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return 1;
      }
    });
  }

  /**
   * Fetches the quota stored under the default settings qualifier for each requested row.
   * <p>
   * Every requested key gets an entry in the returned map, even when no quota is stored for it,
   * unless its stored quota could not be parsed; in that case the error is logged and the key is
   * left out of the result.
   * @param type       label of the quota kind, used only in the error log message
   * @param connection connection used to reach the quota table
   * @param gets       one {@link Get} per row
   * @param kfr        maps a row key to the result key and supplies the conversion factor for it
   * @param <K>        type of the keys of the returned map
   * @return quota state keyed by the value derived from each row key
   * @throws IOException if the read fails
   */
  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
    final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] row = gets.get(i).getRow();
      K key = kfr.getKeyFromRow(row);

      QuotaState quotaInfo = new QuotaState(nowTs);
      globalQuotas.put(key, quotaInfo);

      if (results[i].isEmpty()) {
        continue;
      }
      assert Bytes.equals(row, results[i].getRow());

      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
      if (data == null) {
        continue;
      }

      try {
        Quotas quotas = quotasFromData(data);
        quotaInfo.setQuotas(updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key)));
      } catch (IOException e) {
        LOG.error("Unable to parse {} '{}' quotas", type, key, e);
        globalQuotas.remove(key);
      }
    }
    return globalQuotas;
  }

  /**
   * Convert cluster scope quota to machine scope quota
   * @param quotas the original quota
   * @param factor factor used to divide cluster limiter to machine limiter
   * @return the converted quota whose quota limiters all in machine scope
   */
  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
    if (newQuotas.hasThrottle()) {
      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
      if (throttle.hasReqNum()) {
        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
      }
      if (throttle.hasReqSize()) {
        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
      }
      if (throttle.hasReadNum()) {
        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
      }
      if (throttle.hasReadSize()) {
        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
      }
      if (throttle.hasWriteNum()) {
        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
      }
      if (throttle.hasWriteSize()) {
        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
      }
      if (throttle.hasReqCapacityUnit()) {
        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
      }
      if (throttle.hasReadCapacityUnit()) {
        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
      }
      if (throttle.hasWriteCapacityUnit()) {
        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
      }
      newQuotas.setThrottle(throttle.build());
    }
    return newQuotas.build();
  }

  /**
   * Converts a single limiter to machine scope.
   * @param timedQuota limiter to convert
   * @param factor     multiplier applied to the soft limit of a cluster scope limiter
   * @return {@code timedQuota} itself when it is not in cluster scope; otherwise a machine scope
   *         copy whose soft limit is the scaled value truncated to a long, but never below 1
   */
  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
    if (timedQuota.getScope() != QuotaScope.CLUSTER) {
      return timedQuota;
    }
    long machineSoftLimit = Math.max(1, (long) (timedQuota.getSoftLimit() * factor));
    return TimedQuota.newBuilder(timedQuota).setSoftLimit(machineSoftLimit)
      .setScope(QuotaScope.MACHINE).build();
  }

  /**
   * Strategy used by {@code fetchGlobalQuotas} to derive the result key from a row key and to
   * look up the cluster-to-machine conversion factor for that key.
   */
  private interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);

    double getFactor(T t);
  }

  /*
   * ========================================================================= HTable helpers
   */

  /** Applies {@code put} to the quota table, closing the table handle afterwards. */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  /** Applies {@code delete} to the quota table, closing the table handle afterwards. */
  private static void doDelete(final Connection connection, final Delete delete)
    throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /*
   * ========================================================================= Data Size Helpers
   */

  /**
   * Sums the serialized size of every cell of a mutation.
   * @param mutation mutation to measure
   * @return total of {@link Cell#getSerializedSize()} over all cells of all families
   */
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
      for (Cell cell : cells) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  /**
   * Sums the serialized size of every cell of a result.
   * @param result result to measure
   * @return total of {@link Cell#getSerializedSize()} over {@link Result#rawCells()}
   */
  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  /**
   * Sums the serialized size of every cell of every result in the list.
   * @param results results to measure
   * @return sum of {@link #calculateResultSize(Result)} over all elements
   */
  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      size += calculateResultSize(result);
    }
    return size;
  }

  /**
   * Method to enable a table, if not already enabled. This method suppresses
   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName name of the table to be enabled
   * @throws IOException if obtaining the admin or enabling the table fails for any other reason
   */
  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
    throws IOException {
    // The caller of getAdmin() owns the returned Admin and must close it.
    try (Admin admin = conn.getAdmin()) {
      admin.enableTable(tableName);
    } catch (TableNotDisabledException | TableNotFoundException e) {
      // ignore: the table is already enabled or does not exist, nothing to do
    }
  }

  /**
   * Method to disable a table, if not already disabled. This method suppresses
   * {@link TableNotEnabledException} and {@link TableNotFoundException}, if thrown while disabling
   * the table.
   * @param conn      connection to re-use
   * @param tableName table name which has moved into space quota violation
   * @throws IOException if obtaining the admin or disabling the table fails for any other reason
   */
  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
    throws IOException {
    // The caller of getAdmin() owns the returned Admin and must close it.
    try (Admin admin = conn.getAdmin()) {
      admin.disableTable(tableName);
    } catch (TableNotEnabledException | TableNotFoundException e) {
      // ignore: the table is already disabled or does not exist, nothing to do
    }
  }
}