/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

/**
 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
 * a table which has a space quota.
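 * <p>
 * The chore's schedule is configurable. A minimal sketch of tuning it, using the keys defined
 * on this class (the values below are illustrative, not recommendations):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Run the chore every ten minutes instead of the default five.
 * conf.setInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 10 * 60 * 1000);
 * // Wait two minutes after Master startup before the first run.
 * conf.setLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 2 * 60 * 1000);
 * // Interpret the period and delay as milliseconds (the default).
 * conf.set(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, TimeUnit.MILLISECONDS.name());
 * </pre>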
 */
@InterfaceAudience.Private
public class SnapshotQuotaObserverChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class);
  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
      "hbase.master.quotas.snapshot.chore.period";
  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis

  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
      "hbase.master.quotas.snapshot.chore.delay";
  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis

  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
      "hbase.master.quotas.snapshot.chore.timeunit";
  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  private final Connection conn;
  private final Configuration conf;
  private final MetricsMaster metrics;
  private final FileSystem fs;

  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
    this(
        master.getConnection(), master.getConfiguration(), master.getFileSystem(), master, metrics);
  }

  SnapshotQuotaObserverChore(
      Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
      MetricsMaster metrics) {
    super(
        SnapshotQuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
        getInitialDelay(conf), getTimeUnit(conf));
    this.conn = conn;
    this.conf = conf;
    this.metrics = metrics;
    this.fs = fs;
  }

  @Override
  protected void chore() {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Computing sizes of snapshots for quota management.");
      }
      long start = System.nanoTime();
      _chore();
      if (null != metrics) {
        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
      }
    } catch (IOException e) {
      LOG.warn("Failed to compute the size of snapshots, will retry", e);
    }
  }

  void _chore() throws IOException {
    // Gets all tables with quotas that also have snapshots.
    // These are all of the snapshots whose size we need to compute.
    long start = System.nanoTime();
    Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize();
    if (null != metrics) {
      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
    }

    // For each table, compute the size of each snapshot
    Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
        snapshotsToComputeSize);

    // Write the size data to the quota table.
    persistSnapshotSizes(snapshotsWithSize);
  }

  /**
   * Fetches each table with a quota (table or namespace quota), and then fetches the name of
   * each snapshot which was created from that table.
   *
   * @return A mapping of table to snapshots created from that table
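   *         For example (an illustrative shape only, with hypothetical names):
   *         {@code {ns1:table1 => [snap1, snap2], ns2:table2 => [snap3]}}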
   */
  Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException {
    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
    QuotaFilter filter = new QuotaFilter();
    filter.addTypeFilter(QuotaType.SPACE);
    try (Admin admin = conn.getAdmin();
        QuotaRetriever quotas = QuotaRetriever.open(conf, filter)) {
      // Pull all of the tables that have quotas (direct, or from namespace)
      for (QuotaSettings qs : quotas) {
        String ns = qs.getNamespace();
        TableName tn = qs.getTableName();
        if ((null == ns && null == tn) || (null != ns && null != tn)) {
          throw new IllegalStateException(
              "Expected exactly one of namespace and tablename to be non-null");
        }
        // Collect either the table name itself, or all of the tables in the namespace
        if (null != ns) {
          tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
        } else {
          tablesToFetchSnapshotsFrom.add(tn);
        }
      }
      // Fetch all snapshots that were created from these tables
      return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
    }
  }

  /**
   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
   * exists in the provided {@code Set}.
   */
  Multimap<TableName,String> getSnapshotsFromTables(
      Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
    Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
    for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
      TableName tn = sd.getTableName();
      if (tablesToFetchSnapshotsFrom.contains(tn)) {
        snapshotsToCompute.put(tn, sd.getName());
      }
    }
    return snapshotsToCompute;
  }

  /**
   * Computes the size of each provided snapshot, given the current files referenced by the
   * originating table.
   *
   * @param snapshotsToComputeSize The snapshots to compute the size of
   * @return A mapping of table to snapshot created from that table and the snapshot's size.
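   *         <p>
   *         As a sketch of the attribution rule (hypothetical snapshot and file names): if
   *         {@code snapshotA} and {@code snapshotB} both reference a store file {@code f} which
   *         the table no longer references, the snapshots are processed in lexicographic order,
   *         so {@code snapshotA} "owns" the size of {@code f} and {@code snapshotB} counts it
   *         as zero, preventing the shared file from being double-counted.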
   */
  Multimap<TableName,SnapshotWithSize> computeSnapshotSizes(
      Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
    Multimap<TableName,SnapshotWithSize> snapshotSizes = HashMultimap.create();
    for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
      final TableName tn = entry.getKey();
      final List<String> snapshotNames = new ArrayList<>(entry.getValue());
      // Sort the snapshots so we process them in lexicographic order. This ensures that multiple
      // invocations of this Chore do not move the size ownership of some files between snapshots
      // that reference the same file.
      Collections.sort(snapshotNames);
      final Path rootDir = FSUtils.getRootDir(conf);
      // Get the map of store file names to store file path for this table
      // TODO is the store-file name unique enough? Does this need to be region+family+storefile?
      final Set<String> tableReferencedStoreFiles;
      try {
        tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException(
            "Interrupted while fetching store files for " + tn).initCause(e);
      }

      if (LOG.isTraceEnabled()) {
        LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
      }

      // For each snapshot on this table, get the files which the snapshot references which
      // the table does not.
      Set<String> snapshotReferencedFiles = new HashSet<>();
      for (String snapshotName : snapshotNames) {
        final long start = System.nanoTime();
        Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
        SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
        SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);

        if (LOG.isTraceEnabled()) {
          LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
        }

        // Get the set of files from the manifest that this snapshot references which are not also
        // referenced by the originating table.
        Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
            manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
                && !snapshotReferencedFiles.contains(sfn));

        if (LOG.isTraceEnabled()) {
          LOG.trace("Snapshot " + snapshotName + " solely references the files: "
              + unreferencedStoreFileNames);
        }

        // Compute the size of the store files for this snapshot
        long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Computed size of " + snapshotName + " to be " + size);
        }

        // Persist this snapshot's size into the map
        snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));

        // Make sure that we don't double-count the same file
        for (StoreFileReference ref : unreferencedStoreFileNames) {
          for (String fileName : ref.getFamilyToFilesMapping().values()) {
            snapshotReferencedFiles.add(fileName);
          }
        }
        // Update the amount of time it took to compute the snapshot's size
        if (null != metrics) {
          metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
        }
      }
    }
    return snapshotSizes;
  }

  /**
   * Extracts the names of the store files referenced by this snapshot which satisfy the given
   * predicate (i.e. the files for which the predicate returns {@code true}).
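   * <p>
   * A minimal sketch of the predicate, mirroring the caller in {@code computeSnapshotSizes}
   * (the set names here are hypothetical):
   * <pre>
   * Set&lt;String&gt; liveFiles = ...;    // store files still referenced by the table
   * Set&lt;String&gt; alreadyOwned = ...; // files already attributed to an earlier snapshot
   * getStoreFilesFromSnapshot(
   *     manifest, (name) -&gt; !liveFiles.contains(name) &amp;&amp; !alreadyOwned.contains(name));
   * </pre>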
   */
  Set<StoreFileReference> getStoreFilesFromSnapshot(
      SnapshotManifest manifest, Predicate<String> filter) {
    Set<StoreFileReference> references = new HashSet<>();
    // For each region referenced by the snapshot
    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
      StoreFileReference regionReference = new StoreFileReference(
          HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());

      // For each column family in this region
      for (FamilyFiles ff : rm.getFamilyFilesList()) {
        final String familyName = ff.getFamilyName().toStringUtf8();
        // And each store file in that family
        for (StoreFile sf : ff.getStoreFilesList()) {
          String storeFileName = sf.getName();
          // A snapshot only "inherits" a file's size if it uniquely refers to it (neither the
          // table nor any other snapshot references it).
          if (filter.test(storeFileName)) {
            regionReference.addFamilyStoreFile(familyName, storeFileName);
          }
        }
      }
      // Only add this Region reference if we retained any files.
      if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
        references.add(regionReference);
      }
    }
    return references;
  }

  /**
   * Calculates the directory in HDFS for a table based on the configuration.
   */
  Path getTableDir(TableName tn) throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    return FSUtils.getTableDir(rootDir, tn);
  }

  /**
   * Computes the total size of the store files in {@code storeFileNames}.
   */
  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
    return storeFileNames.stream()
        .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
  }

  /**
   * Computes the size of the store files for a single region.
   */
  long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileRef) {
    String regionName = storeFileRef.getRegionName();
    return storeFileRef.getFamilyToFilesMapping()
        .entries().stream()
        .collect(Collectors.summingLong((e) ->
            getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
  }

  /**
   * Computes the size of a store file in the archive directory, given its name and its region
   * and family names.
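   * <p>
   * As an illustration (the concrete layout is owned by {@code HFileArchiveUtil}), the file is
   * looked up under a path of the shape:
   * <pre>
   * &lt;hbase.rootdir&gt;/archive/data/&lt;namespace&gt;/&lt;table&gt;/&lt;region&gt;/&lt;family&gt;/&lt;storeFile&gt;
   * </pre>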
   */
  long getSizeOfStoreFile(
      TableName tn, String regionName, String family, String storeFile) {
    Path familyArchivePath;
    try {
      familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
    } catch (IOException e) {
      LOG.warn("Could not compute path for the archive directory for the region", e);
      return 0L;
    }
    Path fileArchivePath = new Path(familyArchivePath, storeFile);
    try {
      if (fs.exists(fileArchivePath)) {
        FileStatus[] status = fs.listStatus(fileArchivePath);
        if (1 != status.length) {
          LOG.warn("Expected " + fileArchivePath +
              " to be a file but was a directory, ignoring reference");
          return 0L;
        }
        return status[0].getLen();
      }
    } catch (IOException e) {
      LOG.warn("Could not obtain the status of " + fileArchivePath, e);
      return 0L;
    }
    LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
    return 0L;
  }

  /**
   * Writes the snapshot sizes to the {@code hbase:quota} table.
   *
   * @param snapshotsWithSize The snapshot sizes to write.
   */
  void persistSnapshotSizes(
      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
      // Write each snapshot size for the table
      persistSnapshotSizes(quotaTable, snapshotsWithSize);
      // Write a size entry for all snapshots in a namespace
      persistSnapshotSizesByNS(quotaTable, snapshotsWithSize);
    }
  }

  /**
   * Writes the snapshot sizes to the provided {@code table}.
   */
  void persistSnapshotSizes(
      Table table, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
    // Convert each entry in the map to a Put and write them to the quota table
    table.put(snapshotsWithSize.entries()
        .stream()
        .map(e -> QuotaTableUtil.createPutForSnapshotSize(
            e.getKey(), e.getValue().getName(), e.getValue().getSize()))
        .collect(Collectors.toList()));
  }

  /**
   * Rolls up the snapshot sizes by namespace and writes a single record for each namespace
   * which is the size of all snapshots in that namespace.
   */
  void persistSnapshotSizesByNS(
      Table quotaTable, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
    Map<String,Long> namespaceSnapshotSizes = groupSnapshotSizesByNamespace(snapshotsWithSize);
    quotaTable.put(namespaceSnapshotSizes.entrySet().stream()
        .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(
            e.getKey(), e.getValue()))
        .collect(Collectors.toList()));
  }

  /**
   * Sums the snapshot sizes for each namespace.
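   * <p>
   * For example (hypothetical values): snapshots of {@code ns1:t1} totaling 1024 bytes and
   * snapshots of {@code ns1:t2} totaling 2048 bytes yield a mapping of {@code "ns1"} to
   * {@code 3072}.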
   */
  Map<String,Long> groupSnapshotSizesByNamespace(
      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) {
    return snapshotsWithSize.entries().stream()
        .collect(Collectors.groupingBy(
            // Convert TableName into the namespace string
            (e) -> e.getKey().getNamespaceAsString(),
            // Sum the values for namespace
            Collectors.mapping(
                Map.Entry::getValue, Collectors.summingLong((sws) -> sws.getSize()))));
  }

  /**
   * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
   * defined as the amount of filesystem space taken by the files the snapshot refers to which
   * the originating table no longer refers to.
   */
  static class SnapshotWithSize {
    private final String name;
    private final long size;

    SnapshotWithSize(String name, long size) {
      this.name = Objects.requireNonNull(name);
      this.size = size;
    }

    String getName() {
      return name;
    }

    long getSize() {
      return size;
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().append(name).append(size).toHashCode();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }

      if (!(o instanceof SnapshotWithSize)) {
        return false;
      }

      SnapshotWithSize other = (SnapshotWithSize) o;
      return name.equals(other.name) && size == other.size;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(32);
      return sb.append("SnapshotWithSize:[").append(name).append(" ")
          .append(StringUtils.byteDesc(size)).append("]").toString();
    }
  }

  /**
   * A reference to a collection of files in the archive directory for a single region.
   */
  static class StoreFileReference {
    private final String regionName;
    private final Multimap<String,String> familyToFiles;

    StoreFileReference(String regionName) {
      this.regionName = Objects.requireNonNull(regionName);
      familyToFiles = HashMultimap.create();
    }

    String getRegionName() {
      return regionName;
    }

    Multimap<String,String> getFamilyToFilesMapping() {
      return familyToFiles;
    }

    void addFamilyStoreFile(String family, String storeFileName) {
      familyToFiles.put(family, storeFileName);
    }

    @Override
    public int hashCode() {
      return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof StoreFileReference)) {
        return false;
      }
      StoreFileReference other = (StoreFileReference) o;
      return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
          .append(familyToFiles).append("]").toString();
    }
  }

  /**
   * Extracts the period for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY,
        SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY,
        SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to
   * a {@link TimeUnit} value.
   *
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
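   *         <p>
   *         For example, setting the key to {@code "SECONDS"} causes the configured period and
   *         delay to be interpreted as seconds. A value which does not name a {@link TimeUnit}
   *         constant will cause {@link TimeUnit#valueOf(String)} to throw an
   *         {@link IllegalArgumentException}.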
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY,
        SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT));
  }
}