/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.util.StringUtils;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

/**
 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
 * a table that has a space quota.
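 * <p>
 * The chore's schedule can be tuned via the configuration keys defined in this class. An
 * illustrative {@code hbase-site.xml} fragment (the values shown are the defaults):
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.master.quotas.snapshot.chore.period&lt;/name&gt;
 *   &lt;value&gt;300000&lt;/value&gt;
 * &lt;/property&gt;
 * &lt;property&gt;
 *   &lt;name&gt;hbase.master.quotas.snapshot.chore.delay&lt;/name&gt;
 *   &lt;value&gt;60000&lt;/value&gt;
 * &lt;/property&gt;
 * &lt;property&gt;
 *   &lt;name&gt;hbase.master.quotas.snapshot.chore.timeunit&lt;/name&gt;
 *   &lt;value&gt;MILLISECONDS&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>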
 */
@InterfaceAudience.Private
public class SnapshotQuotaObserverChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class);
  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
      "hbase.master.quotas.snapshot.chore.period";
  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis

  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
      "hbase.master.quotas.snapshot.chore.delay";
  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis

  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
      "hbase.master.quotas.snapshot.chore.timeunit";
  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  private final Connection conn;
  private final Configuration conf;
  private final MetricsMaster metrics;
  private final FileSystem fs;

  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
    this(
        master.getConnection(), master.getConfiguration(), master.getFileSystem(), master,
        metrics);
  }

  SnapshotQuotaObserverChore(
      Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
      MetricsMaster metrics) {
    super(
        SnapshotQuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
        getInitialDelay(conf), getTimeUnit(conf));
    this.conn = conn;
    this.conf = conf;
    this.metrics = metrics;
    this.fs = fs;
  }

  @Override
  protected void chore() {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Computing sizes of snapshots for quota management.");
      }
      long start = System.nanoTime();
      _chore();
      if (null != metrics) {
        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
      }
    } catch (IOException e) {
      LOG.warn("Failed to compute the size of snapshots, will retry", e);
    }
  }

  void _chore() throws IOException {
    // Gets all tables with quotas that also have snapshots.
    // These are all of the snapshots whose size we need to compute.
    long start = System.nanoTime();
    Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize();
    if (null != metrics) {
      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
    }

    // For each table, compute the size of each snapshot
    Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
        snapshotsToComputeSize);

    // Write the size data to the quota table.
    persistSnapshotSizes(snapshotsWithSize);
  }

  /**
   * Fetches each table with a quota (table or namespace quota), and then fetches the name of
   * each snapshot which was created from that table.
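   * <p>
   * A sketch of the expected result shape (table and snapshot names are illustrative only):
   * <pre>{@code
   * // {ns1:tableA -> [snapshotA1, snapshotA2], default:tableB -> [snapshotB1]}
   * Multimap<TableName,String> toCompute = chore.getSnapshotsToComputeSize();
   * }</pre>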
   *
   * @return A mapping of table to snapshots created from that table
   */
  Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException {
    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
    QuotaFilter filter = new QuotaFilter();
    filter.addTypeFilter(QuotaType.SPACE);
    try (Admin admin = conn.getAdmin()) {
      // Pull all of the tables that have quotas (direct, or from namespace)
      for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) {
        String ns = qs.getNamespace();
        TableName tn = qs.getTableName();
        if ((null == ns && null == tn) || (null != ns && null != tn)) {
          throw new IllegalStateException(
              "Expected exactly one of namespace and tablename to be non-null");
        }
        // Collect either the table name itself, or all of the tables in the namespace
        if (null != ns) {
          tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
        } else {
          tablesToFetchSnapshotsFrom.add(tn);
        }
      }
      // Fetch all snapshots that were created from these tables
      return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
    }
  }

  /**
   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
   * exists in the provided {@code Set}.
   */
  Multimap<TableName,String> getSnapshotsFromTables(
      Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
    Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
    for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
      TableName tn = sd.getTableName();
      if (tablesToFetchSnapshotsFrom.contains(tn)) {
        snapshotsToCompute.put(tn, sd.getName());
      }
    }
    return snapshotsToCompute;
  }

  /**
   * Computes the size of each provided snapshot, given the files currently referenced by the
   * originating table.
   *
   * @param snapshotsToComputeSize The snapshots to compute the size of
   * @return A mapping of table to snapshot created from that table and the snapshot's size.
   */
  Multimap<TableName,SnapshotWithSize> computeSnapshotSizes(
      Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
    Multimap<TableName,SnapshotWithSize> snapshotSizes = HashMultimap.create();
    for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
      final TableName tn = entry.getKey();
      final List<String> snapshotNames = new ArrayList<>(entry.getValue());
      // Sort the snapshots so we process them in lexicographic order. This ensures that multiple
      // invocations of this Chore do not move the size ownership of shared files between the
      // snapshots that reference them.
      Collections.sort(snapshotNames);
      final Path rootDir = FSUtils.getRootDir(conf);
      // Get the map of store file names to store file path for this table
      // TODO is the store-file name unique enough? Does this need to be region+family+storefile?
      final Set<String> tableReferencedStoreFiles;
      try {
        tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        // Surface the interruption as an IOException so that chore() logs it and retries,
        // rather than returning null and triggering an NPE downstream.
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }

      if (LOG.isTraceEnabled()) {
        LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
      }

      // For each snapshot on this table, get the files which the snapshot references which
      // the table does not.
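      // For example (illustrative file names): if snapshots s1 and s2 both reference an archived
      // file f1 that the table no longer references, processing in sorted order means s1 always
      // "owns" f1's size, while s2 sees f1 in snapshotReferencedFiles and skips it. Each file is
      // therefore counted exactly once per chore run, and ownership is stable across runs.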
      Set<String> snapshotReferencedFiles = new HashSet<>();
      for (String snapshotName : snapshotNames) {
        final long start = System.nanoTime();
        Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName,
            rootDir);
        SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
        SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);

        if (LOG.isTraceEnabled()) {
          LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
        }

        // Get the set of files from the manifest that this snapshot references which are not also
        // referenced by the originating table.
        Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
            manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
                && !snapshotReferencedFiles.contains(sfn));

        if (LOG.isTraceEnabled()) {
          LOG.trace("Snapshot " + snapshotName + " solely references the files: "
              + unreferencedStoreFileNames);
        }

        // Compute the size of the store files for this snapshot
        long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Computed size of " + snapshotName + " to be " + size);
        }

        // Persist this snapshot's size into the map
        snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));

        // Make sure that we don't double-count the same file
        for (StoreFileReference ref : unreferencedStoreFileNames) {
          for (String fileName : ref.getFamilyToFilesMapping().values()) {
            snapshotReferencedFiles.add(fileName);
          }
        }
        // Update the amount of time it took to compute the snapshot's size
        if (null != metrics) {
          metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
        }
      }
    }
    return snapshotSizes;
  }

  /**
   * Extracts the names of the store files referenced by this snapshot for which the given
   * predicate returns {@code true}.
   */
  Set<StoreFileReference> getStoreFilesFromSnapshot(
      SnapshotManifest manifest, Predicate<String> filter) {
    Set<StoreFileReference> references = new HashSet<>();
    // For each region referenced by the snapshot
    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
      StoreFileReference regionReference = new StoreFileReference(
          HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());

      // For each column family in this region
      for (FamilyFiles ff : rm.getFamilyFilesList()) {
        final String familyName = ff.getFamilyName().toStringUtf8();
        // And each store file in that family
        for (StoreFile sf : ff.getStoreFilesList()) {
          String storeFileName = sf.getName();
          // A snapshot only "inherits" a file's size if it uniquely refers to it (no table
          // and no other snapshot references it).
          if (filter.test(storeFileName)) {
            regionReference.addFamilyStoreFile(familyName, storeFileName);
          }
        }
      }
      // Only add this Region reference if we retained any files.
      if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
        references.add(regionReference);
      }
    }
    return references;
  }

  /**
   * Calculates the directory in HDFS for a table based on the configuration.
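   * <p>
   * For example, with the default layout, table {@code ns1:tbl1} would resolve to
   * {@code <hbase.rootdir>/data/ns1/tbl1} (the names here are illustrative only).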
   */
  Path getTableDir(TableName tn) throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    return FSUtils.getTableDir(rootDir, tn);
  }

  /**
   * Computes the total size of the store files in {@code storeFileNames}.
   */
  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
    return storeFileNames.stream()
        .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
  }

  /**
   * Computes the size of the store files for a single region.
   */
  long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
    String regionName = storeFileName.getRegionName();
    return storeFileName.getFamilyToFilesMapping()
        .entries().stream()
        .collect(Collectors.summingLong((e) ->
            getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
  }

  /**
   * Computes the size of the store file given its name, region and family name in
   * the archive directory.
   */
  long getSizeOfStoreFile(
      TableName tn, String regionName, String family, String storeFile) {
    Path familyArchivePath;
    try {
      familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
    } catch (IOException e) {
      LOG.warn("Could not compute path for the archive directory for the region", e);
      return 0L;
    }
    Path fileArchivePath = new Path(familyArchivePath, storeFile);
    try {
      if (fs.exists(fileArchivePath)) {
        FileStatus[] status = fs.listStatus(fileArchivePath);
        if (1 != status.length) {
          LOG.warn("Expected " + fileArchivePath +
              " to be a file but was a directory, ignoring reference");
          return 0L;
        }
        return status[0].getLen();
      }
    } catch (IOException e) {
      LOG.warn("Could not obtain the status of " + fileArchivePath, e);
      return 0L;
    }
    LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
    return 0L;
  }

  /**
   * Writes the snapshot sizes to the {@code hbase:quota} table.
   *
   * @param snapshotsWithSize The snapshot sizes to write.
   */
  void persistSnapshotSizes(
      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
      // Write each snapshot size for the table
      persistSnapshotSizes(quotaTable, snapshotsWithSize);
      // Write a size entry for all snapshots in a namespace
      persistSnapshotSizesByNS(quotaTable, snapshotsWithSize);
    }
  }

  /**
   * Writes the snapshot sizes to the provided {@code table}.
   */
  void persistSnapshotSizes(
      Table table, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
    // Convert each entry in the map to a Put and write them to the quota table
    table.put(snapshotsWithSize.entries()
        .stream()
        .map(e -> QuotaTableUtil.createPutForSnapshotSize(
            e.getKey(), e.getValue().getName(), e.getValue().getSize()))
        .collect(Collectors.toList()));
  }

  /**
   * Rolls up the snapshot sizes by namespace, writing a single record per namespace whose value
   * is the total size of all snapshots in that namespace.
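   * <p>
   * For example (illustrative names and sizes): snapshots of {@code ns1:t1} totaling 10 MB and
   * snapshots of {@code ns1:t2} totaling 20 MB roll up to a single {@code ns1 -> 30 MB} entry.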
   */
  void persistSnapshotSizesByNS(
      Table quotaTable, Multimap<TableName,SnapshotWithSize> snapshotsWithSize)
      throws IOException {
    Map<String,Long> namespaceSnapshotSizes = groupSnapshotSizesByNamespace(snapshotsWithSize);
    quotaTable.put(namespaceSnapshotSizes.entrySet().stream()
        .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(
            e.getKey(), e.getValue()))
        .collect(Collectors.toList()));
  }

  /**
   * Sums the snapshot sizes for each namespace.
   */
  Map<String,Long> groupSnapshotSizesByNamespace(
      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) {
    return snapshotsWithSize.entries().stream()
        .collect(Collectors.groupingBy(
            // Convert TableName into the namespace string
            (e) -> e.getKey().getNamespaceAsString(),
            // Sum the values for namespace
            Collectors.mapping(
                Map.Entry::getValue, Collectors.summingLong((sws) -> sws.getSize()))));
  }

  /**
   * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
   * defined as the amount of filesystem space taken by the files the snapshot refers to which
   * the originating table no longer refers to.
   */
  static class SnapshotWithSize {
    private final String name;
    private final long size;

    SnapshotWithSize(String name, long size) {
      this.name = Objects.requireNonNull(name);
      this.size = size;
    }

    String getName() {
      return name;
    }

    long getSize() {
      return size;
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().append(name).append(size).toHashCode();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }

      if (!(o instanceof SnapshotWithSize)) {
        return false;
      }

      SnapshotWithSize other = (SnapshotWithSize) o;
      return name.equals(other.name) && size == other.size;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(32);
      return sb.append("SnapshotWithSize:[").append(name).append(" ")
          .append(StringUtils.byteDesc(size)).append("]").toString();
    }
  }

  /**
   * A reference to a collection of files in the archive directory for a single region.
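   * <p>
   * A minimal usage sketch (region and file names are illustrative only):
   * <pre>{@code
   * StoreFileReference ref = new StoreFileReference("1588230740");  // encoded region name
   * ref.addFamilyStoreFile("cf1", "a1b2c3d4e5f6");                  // family -> store file
   * ref.getFamilyToFilesMapping();                                  // {cf1 -> [a1b2c3d4e5f6]}
   * }</pre>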
   */
  static class StoreFileReference {
    private final String regionName;
    private final Multimap<String,String> familyToFiles;

    StoreFileReference(String regionName) {
      this.regionName = Objects.requireNonNull(regionName);
      familyToFiles = HashMultimap.create();
    }

    String getRegionName() {
      return regionName;
    }

    Multimap<String,String> getFamilyToFilesMapping() {
      return familyToFiles;
    }

    void addFamilyStoreFile(String family, String storeFileName) {
      familyToFiles.put(family, storeFileName);
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof StoreFileReference)) {
        return false;
      }
      StoreFileReference other = (StoreFileReference) o;
      return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
          .append(familyToFiles).append("]").toString();
    }
  }

  /**
   * Extracts the period for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY,
        SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY,
        SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to
   * a {@link TimeUnit} value.
   *
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY,
        SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT));
  }
}