001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.quotas; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Objects; 031import java.util.Set; 032import java.util.regex.Pattern; 033 034import org.apache.commons.lang3.StringUtils; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellScanner; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Get; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import 
org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 049import org.apache.hadoop.hbase.filter.Filter; 050import org.apache.hadoop.hbase.filter.FilterList; 051import org.apache.hadoop.hbase.filter.QualifierFilter; 052import org.apache.hadoop.hbase.filter.RegexStringComparator; 053import org.apache.hadoop.hbase.filter.RowFilter; 054import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.apache.yetus.audience.InterfaceStability; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 062import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 063import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 064import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 065import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 066 067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 071 072/** 073 * Helper class to interact with the quota table. 
 * <p>Rows stored in the quota table (placeholders are written as {@code &lt;name&gt;}):
 * <table>
 * <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th><th>DESC</th></tr>
 * <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
 * <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
 * <tr><td>n.&lt;namespace&gt;</td><td>u:ss</td><td>&lt;SpaceQuotaSnapshot&gt;</td>
 * <td>The size of all snapshots against tables in the namespace</td></tr>
 * <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
 * <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
 * <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td>
 * <td>&lt;SpaceQuotaSnapshot&gt;</td><td>The size of a snapshot against a table</td></tr>
 * <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
 * <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
 * <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>
 * <tr><td>r.all</td><td>q:s</td><td>&lt;global-quotas&gt;</td>
 * <td>The throttle quota of all region servers</td></tr>
 * </table>
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaTableUtil.class);

  /** System table for quotas */
  public static final TableName QUOTA_TABLE_NAME =
      TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "quota");

  /** Column family "q": holds the quota settings. */
  protected static final byte[] QUOTA_FAMILY_INFO = Bytes.toBytes("q");
  /** Column family "u": holds usage data (space quota snapshots and snapshot sizes). */
  protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
  /** Qualifier "s": the settings cell of a row. */
  protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
  /** Qualifier prefix "s.": per-table / per-namespace settings cells inside a user row. */
  protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
  /** Qualifier "p": the serialized space quota snapshot of a table row. */
  protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
  /** Qualifier (and qualifier prefix) "ss": snapshot size cells. */
  protected static final byte[] QUOTA_SNAPSHOT_SIZE_QUALIFIER = Bytes.toBytes("ss");
  /** Human readable "u:p" column name, used in error messages. */
  protected static final String QUOTA_POLICY_COLUMN =
      Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
  /** Row key prefix "u." of user rows. */
  protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
107 protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t."); 108 protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n."); 109 protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r."); 110 private static final byte[] QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY = 111 Bytes.toBytes("exceedThrottleQuota"); 112 113 /* 114 * TODO: Setting specified region server quota isn't supported currently and the row key "r.all" 115 * represents the throttle quota of all region servers 116 */ 117 public static final String QUOTA_REGION_SERVER_ROW_KEY = "all"; 118 119 /* ========================================================================= 120 * Quota "settings" helpers 121 */ 122 public static Quotas getTableQuota(final Connection connection, final TableName table) 123 throws IOException { 124 return getQuotas(connection, getTableRowKey(table)); 125 } 126 127 public static Quotas getNamespaceQuota(final Connection connection, final String namespace) 128 throws IOException { 129 return getQuotas(connection, getNamespaceRowKey(namespace)); 130 } 131 132 public static Quotas getUserQuota(final Connection connection, final String user) 133 throws IOException { 134 return getQuotas(connection, getUserRowKey(user)); 135 } 136 137 public static Quotas getUserQuota(final Connection connection, final String user, 138 final TableName table) throws IOException { 139 return getQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table)); 140 } 141 142 public static Quotas getUserQuota(final Connection connection, final String user, 143 final String namespace) throws IOException { 144 return getQuotas(connection, getUserRowKey(user), 145 getSettingsQualifierForUserNamespace(namespace)); 146 } 147 148 private static Quotas getQuotas(final Connection connection, final byte[] rowKey) 149 throws IOException { 150 return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS); 151 } 152 153 public static 
Quotas getRegionServerQuota(final Connection connection, final String regionServer) 154 throws IOException { 155 return getQuotas(connection, getRegionServerRowKey(regionServer)); 156 } 157 158 private static Quotas getQuotas(final Connection connection, final byte[] rowKey, 159 final byte[] qualifier) throws IOException { 160 Get get = new Get(rowKey); 161 get.addColumn(QUOTA_FAMILY_INFO, qualifier); 162 Result result = doGet(connection, get); 163 if (result.isEmpty()) { 164 return null; 165 } 166 return quotasFromData(result.getValue(QUOTA_FAMILY_INFO, qualifier)); 167 } 168 169 public static Get makeGetForTableQuotas(final TableName table) { 170 Get get = new Get(getTableRowKey(table)); 171 get.addFamily(QUOTA_FAMILY_INFO); 172 return get; 173 } 174 175 public static Get makeGetForNamespaceQuotas(final String namespace) { 176 Get get = new Get(getNamespaceRowKey(namespace)); 177 get.addFamily(QUOTA_FAMILY_INFO); 178 return get; 179 } 180 181 public static Get makeGetForRegionServerQuotas(final String regionServer) { 182 Get get = new Get(getRegionServerRowKey(regionServer)); 183 get.addFamily(QUOTA_FAMILY_INFO); 184 return get; 185 } 186 187 public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables, 188 final Iterable<String> namespaces) { 189 Get get = new Get(getUserRowKey(user)); 190 get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 191 for (final TableName table: tables) { 192 get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserTable(table)); 193 } 194 for (final String ns: namespaces) { 195 get.addColumn(QUOTA_FAMILY_INFO, getSettingsQualifierForUserNamespace(ns)); 196 } 197 return get; 198 } 199 200 public static Scan makeScan(final QuotaFilter filter) { 201 Scan scan = new Scan(); 202 scan.addFamily(QUOTA_FAMILY_INFO); 203 if (filter != null && !filter.isNull()) { 204 scan.setFilter(makeFilter(filter)); 205 } 206 return scan; 207 } 208 209 /** 210 * converts quotafilter to serializeable filterlists. 
211 */ 212 public static Filter makeFilter(final QuotaFilter filter) { 213 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); 214 if (StringUtils.isNotEmpty(filter.getUserFilter())) { 215 FilterList userFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE); 216 boolean hasFilter = false; 217 218 if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) { 219 FilterList nsFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL); 220 nsFilters.addFilter(new RowFilter(CompareOperator.EQUAL, 221 new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0))); 222 nsFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL, 223 new RegexStringComparator( 224 getSettingsQualifierRegexForUserNamespace(filter.getNamespaceFilter()), 0))); 225 userFilters.addFilter(nsFilters); 226 hasFilter = true; 227 } 228 if (StringUtils.isNotEmpty(filter.getTableFilter())) { 229 FilterList tableFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL); 230 tableFilters.addFilter(new RowFilter(CompareOperator.EQUAL, 231 new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0))); 232 tableFilters.addFilter(new QualifierFilter(CompareOperator.EQUAL, 233 new RegexStringComparator( 234 getSettingsQualifierRegexForUserTable(filter.getTableFilter()), 0))); 235 userFilters.addFilter(tableFilters); 236 hasFilter = true; 237 } 238 if (!hasFilter) { 239 userFilters.addFilter(new RowFilter(CompareOperator.EQUAL, 240 new RegexStringComparator(getUserRowKeyRegex(filter.getUserFilter()), 0))); 241 } 242 243 filterList.addFilter(userFilters); 244 } else if (StringUtils.isNotEmpty(filter.getTableFilter())) { 245 filterList.addFilter(new RowFilter(CompareOperator.EQUAL, 246 new RegexStringComparator(getTableRowKeyRegex(filter.getTableFilter()), 0))); 247 } else if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) { 248 filterList.addFilter(new RowFilter(CompareOperator.EQUAL, 249 new 
RegexStringComparator(getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0))); 250 } else if (StringUtils.isNotEmpty(filter.getRegionServerFilter())) { 251 filterList.addFilter(new RowFilter(CompareOperator.EQUAL, new RegexStringComparator( 252 getRegionServerRowKeyRegex(filter.getRegionServerFilter()), 0))); 253 } 254 return filterList; 255 } 256 257 /** 258 * Creates a {@link Scan} which returns only quota snapshots from the quota table. 259 */ 260 public static Scan makeQuotaSnapshotScan() { 261 return makeQuotaSnapshotScanForTable(null); 262 } 263 264 /** 265 * Fetches all {@link SpaceQuotaSnapshot} objects from the {@code hbase:quota} table. 266 * 267 * @param conn The HBase connection 268 * @return A map of table names and their computed snapshot. 269 */ 270 public static Map<TableName,SpaceQuotaSnapshot> getSnapshots(Connection conn) throws IOException { 271 Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>(); 272 try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME); 273 ResultScanner rs = quotaTable.getScanner(makeQuotaSnapshotScan())) { 274 for (Result r : rs) { 275 extractQuotaSnapshot(r, snapshots); 276 } 277 } 278 return snapshots; 279 } 280 281 /** 282 * Creates a {@link Scan} which returns only {@link SpaceQuotaSnapshot} from the quota table for a 283 * specific table. 284 * @param tn Optionally, a table name to limit the scan's rowkey space. Can be null. 285 */ 286 public static Scan makeQuotaSnapshotScanForTable(TableName tn) { 287 Scan s = new Scan(); 288 // Limit to "u:v" column 289 s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY); 290 if (null == tn) { 291 s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX); 292 } else { 293 byte[] row = getTableRowKey(tn); 294 // Limit rowspace to the "t:" prefix 295 s.withStartRow(row, true).withStopRow(row, true); 296 } 297 return s; 298 } 299 300 /** 301 * Creates a {@link Get} which returns only {@link SpaceQuotaSnapshot} from the quota table for a 302 * specific table. 
303 * @param tn table name to get from. Can't be null. 304 */ 305 public static Get makeQuotaSnapshotGetForTable(TableName tn) { 306 Get g = new Get(getTableRowKey(tn)); 307 // Limit to "u:v" column 308 g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY); 309 return g; 310 } 311 312 /** 313 * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided 314 * {@link Result} and adds them to the given {@link Map}. If the result does not contain 315 * the expected information or the serialized policy in the value is invalid, this method 316 * will throw an {@link IllegalArgumentException}. 317 * 318 * @param result A row from the quota table. 319 * @param snapshots A map of snapshots to add the result of this method into. 320 */ 321 public static void extractQuotaSnapshot( 322 Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) { 323 byte[] row = Objects.requireNonNull(result).getRow(); 324 if (row == null || row.length == 0) { 325 throw new IllegalArgumentException("Provided result had a null row"); 326 } 327 final TableName targetTableName = getTableFromRowKey(row); 328 Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY); 329 if (c == null) { 330 throw new IllegalArgumentException("Result did not contain the expected column " 331 + QUOTA_POLICY_COLUMN + ", " + result.toString()); 332 } 333 ByteString buffer = UnsafeByteOperations.unsafeWrap( 334 c.getValueArray(), c.getValueOffset(), c.getValueLength()); 335 try { 336 QuotaProtos.SpaceQuotaSnapshot snapshot = QuotaProtos.SpaceQuotaSnapshot.parseFrom(buffer); 337 snapshots.put(targetTableName, SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot)); 338 } catch (InvalidProtocolBufferException e) { 339 throw new IllegalArgumentException( 340 "Result did not contain a valid SpaceQuota protocol buffer message", e); 341 } 342 } 343 344 public static interface UserQuotasVisitor { 345 void visitUserQuotas(final String userName, final Quotas quotas) 346 throws 
IOException; 347 void visitUserQuotas(final String userName, final TableName table, final Quotas quotas) 348 throws IOException; 349 void visitUserQuotas(final String userName, final String namespace, final Quotas quotas) 350 throws IOException; 351 } 352 353 public static interface TableQuotasVisitor { 354 void visitTableQuotas(final TableName tableName, final Quotas quotas) 355 throws IOException; 356 } 357 358 public static interface NamespaceQuotasVisitor { 359 void visitNamespaceQuotas(final String namespace, final Quotas quotas) 360 throws IOException; 361 } 362 363 private static interface RegionServerQuotasVisitor { 364 void visitRegionServerQuotas(final String regionServer, final Quotas quotas) 365 throws IOException; 366 } 367 368 public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor, 369 NamespaceQuotasVisitor, RegionServerQuotasVisitor { 370 } 371 372 public static void parseResult(final Result result, final QuotasVisitor visitor) 373 throws IOException { 374 byte[] row = result.getRow(); 375 if (isNamespaceRowKey(row)) { 376 parseNamespaceResult(result, visitor); 377 } else if (isTableRowKey(row)) { 378 parseTableResult(result, visitor); 379 } else if (isUserRowKey(row)) { 380 parseUserResult(result, visitor); 381 } else if (isRegionServerRowKey(row)) { 382 parseRegionServerResult(result, visitor); 383 } else if (isExceedThrottleQuotaRowKey(row)) { 384 // skip exceed throttle quota row key 385 if (LOG.isDebugEnabled()) { 386 LOG.debug("Skip exceedThrottleQuota row-key when parse quota result"); 387 } 388 } else { 389 LOG.warn("unexpected row-key: " + Bytes.toString(row)); 390 } 391 } 392 393 public static void parseResultToCollection(final Result result, 394 Collection<QuotaSettings> quotaSettings) throws IOException { 395 396 QuotaTableUtil.parseResult(result, new QuotaTableUtil.QuotasVisitor() { 397 @Override 398 public void visitUserQuotas(String userName, Quotas quotas) { 399 
quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, quotas)); 400 } 401 402 @Override 403 public void visitUserQuotas(String userName, TableName table, Quotas quotas) { 404 quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, table, quotas)); 405 } 406 407 @Override 408 public void visitUserQuotas(String userName, String namespace, Quotas quotas) { 409 quotaSettings.addAll(QuotaSettingsFactory.fromUserQuotas(userName, namespace, quotas)); 410 } 411 412 @Override 413 public void visitTableQuotas(TableName tableName, Quotas quotas) { 414 quotaSettings.addAll(QuotaSettingsFactory.fromTableQuotas(tableName, quotas)); 415 } 416 417 @Override 418 public void visitNamespaceQuotas(String namespace, Quotas quotas) { 419 quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas)); 420 } 421 422 @Override 423 public void visitRegionServerQuotas(String regionServer, Quotas quotas) { 424 quotaSettings.addAll(QuotaSettingsFactory.fromRegionServerQuotas(regionServer, quotas)); 425 } 426 }); 427 } 428 429 public static void parseNamespaceResult(final Result result, 430 final NamespaceQuotasVisitor visitor) throws IOException { 431 String namespace = getNamespaceFromRowKey(result.getRow()); 432 parseNamespaceResult(namespace, result, visitor); 433 } 434 435 protected static void parseNamespaceResult(final String namespace, final Result result, 436 final NamespaceQuotasVisitor visitor) throws IOException { 437 byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 438 if (data != null) { 439 Quotas quotas = quotasFromData(data); 440 visitor.visitNamespaceQuotas(namespace, quotas); 441 } 442 } 443 444 private static void parseRegionServerResult(final Result result, 445 final RegionServerQuotasVisitor visitor) throws IOException { 446 String rs = getRegionServerFromRowKey(result.getRow()); 447 parseRegionServerResult(rs, result, visitor); 448 } 449 450 private static void parseRegionServerResult(final String 
regionServer, final Result result, 451 final RegionServerQuotasVisitor visitor) throws IOException { 452 byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 453 if (data != null) { 454 Quotas quotas = quotasFromData(data); 455 visitor.visitRegionServerQuotas(regionServer, quotas); 456 } 457 } 458 459 public static void parseTableResult(final Result result, final TableQuotasVisitor visitor) 460 throws IOException { 461 TableName table = getTableFromRowKey(result.getRow()); 462 parseTableResult(table, result, visitor); 463 } 464 465 protected static void parseTableResult(final TableName table, final Result result, 466 final TableQuotasVisitor visitor) throws IOException { 467 byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 468 if (data != null) { 469 Quotas quotas = quotasFromData(data); 470 visitor.visitTableQuotas(table, quotas); 471 } 472 } 473 474 public static void parseUserResult(final Result result, final UserQuotasVisitor visitor) 475 throws IOException { 476 String userName = getUserFromRowKey(result.getRow()); 477 parseUserResult(userName, result, visitor); 478 } 479 480 protected static void parseUserResult(final String userName, final Result result, 481 final UserQuotasVisitor visitor) throws IOException { 482 Map<byte[], byte[]> familyMap = result.getFamilyMap(QUOTA_FAMILY_INFO); 483 if (familyMap == null || familyMap.isEmpty()) return; 484 485 for (Map.Entry<byte[], byte[]> entry: familyMap.entrySet()) { 486 Quotas quotas = quotasFromData(entry.getValue()); 487 if (Bytes.startsWith(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX)) { 488 String name = Bytes.toString(entry.getKey(), QUOTA_QUALIFIER_SETTINGS_PREFIX.length); 489 if (name.charAt(name.length() - 1) == TableName.NAMESPACE_DELIM) { 490 String namespace = name.substring(0, name.length() - 1); 491 visitor.visitUserQuotas(userName, namespace, quotas); 492 } else { 493 TableName table = TableName.valueOf(name); 494 
visitor.visitUserQuotas(userName, table, quotas); 495 } 496 } else if (Bytes.equals(entry.getKey(), QUOTA_QUALIFIER_SETTINGS)) { 497 visitor.visitUserQuotas(userName, quotas); 498 } 499 } 500 } 501 502 /** 503 * Creates a {@link Put} to store the given {@code snapshot} for the given {@code tableName} in 504 * the quota table. 505 */ 506 static Put createPutForSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) { 507 Put p = new Put(getTableRowKey(tableName)); 508 p.addColumn( 509 QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY, 510 SpaceQuotaSnapshot.toProtoSnapshot(snapshot).toByteArray()); 511 return p; 512 } 513 514 /** 515 * Creates a {@link Get} for the HBase snapshot's size against the given table. 516 */ 517 static Get makeGetForSnapshotSize(TableName tn, String snapshot) { 518 Get g = new Get(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(tn.toString()))); 519 g.addColumn( 520 QUOTA_FAMILY_USAGE, 521 Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot))); 522 return g; 523 } 524 525 /** 526 * Creates a {@link Put} to persist the current size of the {@code snapshot} with respect to 527 * the given {@code table}. 528 */ 529 static Put createPutForSnapshotSize(TableName tableName, String snapshot, long size) { 530 // We just need a pb message with some `long usage`, so we can just reuse the 531 // SpaceQuotaSnapshot message instead of creating a new one. 532 Put p = new Put(getTableRowKey(tableName)); 533 p.addColumn(QUOTA_FAMILY_USAGE, getSnapshotSizeQualifier(snapshot), 534 org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot 535 .newBuilder().setQuotaUsage(size).build().toByteArray()); 536 return p; 537 } 538 539 /** 540 * Creates a {@code Put} for the namespace's total snapshot size. 
541 */ 542 static Put createPutForNamespaceSnapshotSize(String namespace, long size) { 543 Put p = new Put(getNamespaceRowKey(namespace)); 544 p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER, 545 org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot 546 .newBuilder().setQuotaUsage(size).build().toByteArray()); 547 return p; 548 } 549 550 /** 551 * Returns a list of {@code Delete} to remove given table snapshot 552 * entries to remove from quota table 553 * @param snapshotEntriesToRemove the entries to remove 554 */ 555 static List<Delete> createDeletesForExistingTableSnapshotSizes( 556 Multimap<TableName, String> snapshotEntriesToRemove) { 557 List<Delete> deletes = new ArrayList<>(); 558 for (Map.Entry<TableName, Collection<String>> entry : snapshotEntriesToRemove.asMap() 559 .entrySet()) { 560 for (String snapshot : entry.getValue()) { 561 Delete d = new Delete(getTableRowKey(entry.getKey())); 562 d.addColumns(QUOTA_FAMILY_USAGE, 563 Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshot))); 564 deletes.add(d); 565 } 566 } 567 return deletes; 568 } 569 570 /** 571 * Returns a list of {@code Delete} to remove all table snapshot entries from quota table. 
572 * @param connection connection to re-use 573 */ 574 static List<Delete> createDeletesForExistingTableSnapshotSizes(Connection connection) 575 throws IOException { 576 return createDeletesForExistingSnapshotsFromScan(connection, createScanForSpaceSnapshotSizes()); 577 } 578 579 /** 580 * Returns a list of {@code Delete} to remove given namespace snapshot 581 * entries to removefrom quota table 582 * @param snapshotEntriesToRemove the entries to remove 583 */ 584 static List<Delete> createDeletesForExistingNamespaceSnapshotSizes( 585 Set<String> snapshotEntriesToRemove) { 586 List<Delete> deletes = new ArrayList<>(); 587 for (String snapshot : snapshotEntriesToRemove) { 588 Delete d = new Delete(getNamespaceRowKey(snapshot)); 589 d.addColumns(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER); 590 deletes.add(d); 591 } 592 return deletes; 593 } 594 595 /** 596 * Returns a list of {@code Delete} to remove all namespace snapshot entries from quota table. 597 * @param connection connection to re-use 598 */ 599 static List<Delete> createDeletesForExistingNamespaceSnapshotSizes(Connection connection) 600 throws IOException { 601 return createDeletesForExistingSnapshotsFromScan(connection, 602 createScanForNamespaceSnapshotSizes()); 603 } 604 605 /** 606 * Returns a list of {@code Delete} to remove all entries returned by the passed scanner. 
607 * @param connection connection to re-use 608 * @param scan the scanner to use to generate the list of deletes 609 */ 610 static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan) 611 throws IOException { 612 List<Delete> deletes = new ArrayList<>(); 613 try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME); 614 ResultScanner rs = quotaTable.getScanner(scan)) { 615 for (Result r : rs) { 616 CellScanner cs = r.cellScanner(); 617 while (cs.advance()) { 618 Cell c = cs.current(); 619 byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength()); 620 byte[] qual = 621 Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()); 622 Delete d = new Delete(r.getRow()); 623 d.addColumns(family, qual); 624 deletes.add(d); 625 } 626 } 627 return deletes; 628 } 629 } 630 631 /** 632 * Remove table usage snapshots (u:p columns) for the namespace passed 633 * @param connection connection to re-use 634 * @param namespace the namespace to fetch the list of table usage snapshots 635 */ 636 static void deleteTableUsageSnapshotsForNamespace(Connection connection, String namespace) 637 throws IOException { 638 Scan s = new Scan(); 639 //Get rows for all tables in namespace 640 s.setRowPrefixFilter(Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM))); 641 //Scan for table usage column (u:p) in quota table 642 s.addColumn(QUOTA_FAMILY_USAGE,QUOTA_QUALIFIER_POLICY); 643 //Scan for table quota column (q:s) if table has a space quota defined 644 s.addColumn(QUOTA_FAMILY_INFO,QUOTA_QUALIFIER_SETTINGS); 645 try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME); 646 ResultScanner rs = quotaTable.getScanner(s)) { 647 for (Result r : rs) { 648 byte[] data = r.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 649 //if table does not have a table space quota defined, delete table usage column (u:p) 650 if (data == null) { 651 Delete delete = new 
Delete(r.getRow()); 652 delete.addColumns(QUOTA_FAMILY_USAGE,QUOTA_QUALIFIER_POLICY); 653 quotaTable.delete(delete); 654 } 655 } 656 } 657 } 658 659 /** 660 * Fetches the computed size of all snapshots against tables in a namespace for space quotas. 661 */ 662 static long getNamespaceSnapshotSize( 663 Connection conn, String namespace) throws IOException { 664 try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { 665 Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace)); 666 if (r.isEmpty()) { 667 return 0L; 668 } 669 r.advance(); 670 return parseSnapshotSize(r.current()); 671 } catch (InvalidProtocolBufferException e) { 672 throw new IOException("Could not parse snapshot size value for namespace " + namespace, e); 673 } 674 } 675 676 /** 677 * Creates a {@code Get} to fetch the namespace's total snapshot size. 678 */ 679 static Get createGetNamespaceSnapshotSize(String namespace) { 680 Get g = new Get(getNamespaceRowKey(namespace)); 681 g.addColumn(QUOTA_FAMILY_USAGE, QUOTA_SNAPSHOT_SIZE_QUALIFIER); 682 return g; 683 } 684 685 /** 686 * Parses the snapshot size from the given Cell's value. 687 */ 688 static long parseSnapshotSize(Cell c) throws InvalidProtocolBufferException { 689 ByteString bs = UnsafeByteOperations.unsafeWrap( 690 c.getValueArray(), c.getValueOffset(), c.getValueLength()); 691 return QuotaProtos.SpaceQuotaSnapshot.parseFrom(bs).getQuotaUsage(); 692 } 693 694 /** 695 * Returns a scanner for all existing namespace snapshot entries. 
696 */ 697 static Scan createScanForNamespaceSnapshotSizes() { 698 return createScanForNamespaceSnapshotSizes(null); 699 } 700 701 /** 702 * Returns a scanner for all namespace snapshot entries of the given namespace 703 * @param namespace name of the namespace whose snapshot entries are to be scanned 704 */ 705 static Scan createScanForNamespaceSnapshotSizes(String namespace) { 706 Scan s = new Scan(); 707 if (namespace == null || namespace.isEmpty()) { 708 // Read all namespaces, just look at the row prefix 709 s.setRowPrefixFilter(QUOTA_NAMESPACE_ROW_KEY_PREFIX); 710 } else { 711 // Fetch the exact row for the table 712 byte[] rowkey = getNamespaceRowKey(namespace); 713 // Fetch just this one row 714 s.withStartRow(rowkey).withStopRow(rowkey, true); 715 } 716 717 // Just the usage family and only the snapshot size qualifiers 718 return s.addFamily(QUOTA_FAMILY_USAGE) 719 .setFilter(new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER)); 720 } 721 722 static Scan createScanForSpaceSnapshotSizes() { 723 return createScanForSpaceSnapshotSizes(null); 724 } 725 726 static Scan createScanForSpaceSnapshotSizes(TableName table) { 727 Scan s = new Scan(); 728 if (null == table) { 729 // Read all tables, just look at the row prefix 730 s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX); 731 } else { 732 // Fetch the exact row for the table 733 byte[] rowkey = getTableRowKey(table); 734 // Fetch just this one row 735 s.withStartRow(rowkey).withStopRow(rowkey, true); 736 } 737 738 // Just the usage family and only the snapshot size qualifiers 739 return s.addFamily(QUOTA_FAMILY_USAGE).setFilter( 740 new ColumnPrefixFilter(QUOTA_SNAPSHOT_SIZE_QUALIFIER)); 741 } 742 743 /** 744 * Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are 745 * computed relative to the table which the snapshot was created from. A snapshot's size will 746 * not include the size of files which the table still refers. 
These sizes, in bytes, are what 747 * is used internally to compute quota violation for tables and namespaces. 748 * 749 * @return A map of snapshot name to size in bytes per space quota computations 750 */ 751 public static Map<String,Long> getObservedSnapshotSizes(Connection conn) throws IOException { 752 try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME); 753 ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) { 754 final Map<String,Long> snapshotSizes = new HashMap<>(); 755 for (Result r : rs) { 756 CellScanner cs = r.cellScanner(); 757 while (cs.advance()) { 758 Cell c = cs.current(); 759 final String snapshot = extractSnapshotNameFromSizeCell(c); 760 final long size = parseSnapshotSize(c); 761 snapshotSizes.put(snapshot, size); 762 } 763 } 764 return snapshotSizes; 765 } 766 } 767 768 /** 769 * Returns a multimap for all existing table snapshot entries. 770 * @param conn connection to re-use 771 */ 772 public static Multimap<TableName, String> getTableSnapshots(Connection conn) throws IOException { 773 try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME); 774 ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) { 775 Multimap<TableName, String> snapshots = HashMultimap.create(); 776 for (Result r : rs) { 777 CellScanner cs = r.cellScanner(); 778 while (cs.advance()) { 779 Cell c = cs.current(); 780 781 final String snapshot = extractSnapshotNameFromSizeCell(c); 782 snapshots.put(getTableFromRowKey(r.getRow()), snapshot); 783 } 784 } 785 return snapshots; 786 } 787 } 788 789 /** 790 * Returns a set of the names of all namespaces containing snapshot entries. 
791 * @param conn connection to re-use 792 */ 793 public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException { 794 try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME); 795 ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) { 796 Set<String> snapshots = new HashSet<>(); 797 for (Result r : rs) { 798 CellScanner cs = r.cellScanner(); 799 while (cs.advance()) { 800 cs.current(); 801 snapshots.add(getNamespaceFromRowKey(r.getRow())); 802 } 803 } 804 return snapshots; 805 } 806 } 807 808 /** 809 * Returns the current space quota snapshot of the given {@code tableName} from 810 * {@code QuotaTableUtil.QUOTA_TABLE_NAME} or null if the no quota information is available for 811 * that tableName. 812 * @param conn connection to re-use 813 * @param tableName name of the table whose current snapshot is to be retreived 814 */ 815 public static SpaceQuotaSnapshot getCurrentSnapshotFromQuotaTable(Connection conn, 816 TableName tableName) throws IOException { 817 try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { 818 Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>(1); 819 Result result = quotaTable.get(makeQuotaSnapshotGetForTable(tableName)); 820 // if we don't have any row corresponding to this get, return null 821 if (result.isEmpty()) { 822 return null; 823 } 824 // otherwise, extract quota snapshot in snapshots object 825 extractQuotaSnapshot(result, snapshots); 826 return snapshots.get(tableName); 827 } 828 } 829 830 /* ========================================================================= 831 * Quotas protobuf helpers 832 */ 833 protected static Quotas quotasFromData(final byte[] data) throws IOException { 834 return quotasFromData(data, 0, data.length); 835 } 836 837 protected static Quotas quotasFromData( 838 final byte[] data, int offset, int length) throws IOException { 839 int magicLen = ProtobufMagic.lengthOfPBMagic(); 840 if (!ProtobufMagic.isPBMagicPrefix(data, offset, 
magicLen)) { 841 throw new IOException("Missing pb magic prefix"); 842 } 843 return Quotas.parseFrom(new ByteArrayInputStream(data, offset + magicLen, length - magicLen)); 844 } 845 846 protected static byte[] quotasToData(final Quotas data) throws IOException { 847 ByteArrayOutputStream stream = new ByteArrayOutputStream(); 848 stream.write(ProtobufMagic.PB_MAGIC); 849 data.writeTo(stream); 850 return stream.toByteArray(); 851 } 852 853 public static boolean isEmptyQuota(final Quotas quotas) { 854 boolean hasSettings = false; 855 hasSettings |= quotas.hasThrottle(); 856 hasSettings |= quotas.hasBypassGlobals(); 857 // Only when there is a space quota, make sure there's actually both fields provided 858 // Otherwise, it's a noop. 859 if (quotas.hasSpace()) { 860 hasSettings |= (quotas.getSpace().hasSoftLimit() && quotas.getSpace().hasViolationPolicy()); 861 } 862 return !hasSettings; 863 } 864 865 /* ========================================================================= 866 * HTable helpers 867 */ 868 protected static Result doGet(final Connection connection, final Get get) 869 throws IOException { 870 try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { 871 return table.get(get); 872 } 873 } 874 875 protected static Result[] doGet(final Connection connection, final List<Get> gets) 876 throws IOException { 877 try (Table table = connection.getTable(QUOTA_TABLE_NAME)) { 878 return table.get(gets); 879 } 880 } 881 882 /* ========================================================================= 883 * Quota table row key helpers 884 */ 885 protected static byte[] getUserRowKey(final String user) { 886 return Bytes.add(QUOTA_USER_ROW_KEY_PREFIX, Bytes.toBytes(user)); 887 } 888 889 protected static byte[] getTableRowKey(final TableName table) { 890 return Bytes.add(QUOTA_TABLE_ROW_KEY_PREFIX, table.getName()); 891 } 892 893 protected static byte[] getNamespaceRowKey(final String namespace) { 894 return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, 
Bytes.toBytes(namespace)); 895 } 896 897 protected static byte[] getRegionServerRowKey(final String regionServer) { 898 return Bytes.add(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, Bytes.toBytes(regionServer)); 899 } 900 901 protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) { 902 return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName()); 903 } 904 905 protected static byte[] getSettingsQualifierForUserNamespace(final String namespace) { 906 return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, 907 Bytes.toBytes(namespace + TableName.NAMESPACE_DELIM)); 908 } 909 910 protected static String getUserRowKeyRegex(final String user) { 911 return getRowKeyRegEx(QUOTA_USER_ROW_KEY_PREFIX, user); 912 } 913 914 protected static String getTableRowKeyRegex(final String table) { 915 return getRowKeyRegEx(QUOTA_TABLE_ROW_KEY_PREFIX, table); 916 } 917 918 protected static String getNamespaceRowKeyRegex(final String namespace) { 919 return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace); 920 } 921 922 private static String getRegionServerRowKeyRegex(final String regionServer) { 923 return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer); 924 } 925 926 protected static byte[] getExceedThrottleQuotaRowKey() { 927 return QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY; 928 } 929 930 private static String getRowKeyRegEx(final byte[] prefix, final String regex) { 931 return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$'; 932 } 933 934 protected static String getSettingsQualifierRegexForUserTable(final String table) { 935 return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + 936 table + "(?<!" 
+ Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + ")$"; 937 } 938 939 protected static String getSettingsQualifierRegexForUserNamespace(final String namespace) { 940 return '^' + Pattern.quote(Bytes.toString(QUOTA_QUALIFIER_SETTINGS_PREFIX)) + 941 namespace + Pattern.quote(Character.toString(TableName.NAMESPACE_DELIM)) + '$'; 942 } 943 944 protected static boolean isNamespaceRowKey(final byte[] key) { 945 return Bytes.startsWith(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX); 946 } 947 948 protected static String getNamespaceFromRowKey(final byte[] key) { 949 return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length); 950 } 951 952 protected static boolean isRegionServerRowKey(final byte[] key) { 953 return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX); 954 } 955 956 private static boolean isExceedThrottleQuotaRowKey(final byte[] key) { 957 return Bytes.equals(key, QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY); 958 } 959 960 protected static String getRegionServerFromRowKey(final byte[] key) { 961 return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length); 962 } 963 964 protected static boolean isTableRowKey(final byte[] key) { 965 return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX); 966 } 967 968 protected static TableName getTableFromRowKey(final byte[] key) { 969 return TableName.valueOf(Bytes.toString(key, QUOTA_TABLE_ROW_KEY_PREFIX.length)); 970 } 971 972 protected static boolean isUserRowKey(final byte[] key) { 973 return Bytes.startsWith(key, QUOTA_USER_ROW_KEY_PREFIX); 974 } 975 976 protected static String getUserFromRowKey(final byte[] key) { 977 return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length); 978 } 979 980 protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) { 981 return SpaceQuota.newBuilder() 982 .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy)) 983 .build(); 984 } 985 986 protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) { 987 if 
(!proto.hasViolationPolicy()) { 988 throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy."); 989 } 990 return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy()); 991 } 992 993 protected static byte[] getSnapshotSizeQualifier(String snapshotName) { 994 return Bytes.add(QUOTA_SNAPSHOT_SIZE_QUALIFIER, Bytes.toBytes(snapshotName)); 995 } 996 997 protected static String extractSnapshotNameFromSizeCell(Cell c) { 998 return Bytes.toString( 999 c.getQualifierArray(), c.getQualifierOffset() + QUOTA_SNAPSHOT_SIZE_QUALIFIER.length, 1000 c.getQualifierLength() - QUOTA_SNAPSHOT_SIZE_QUALIFIER.length); 1001 } 1002 1003 protected static long extractSnapshotSize( 1004 byte[] data, int offset, int length) throws InvalidProtocolBufferException { 1005 ByteString byteStr = UnsafeByteOperations.unsafeWrap(data, offset, length); 1006 return org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuotaSnapshot 1007 .parseFrom(byteStr).getQuotaUsage(); 1008 } 1009}