/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

/**
 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
 * a table which has a space quota.
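 * <p>
 * A minimal sketch of how this chore is expected to be wired up on the active master
 * (illustrative only; the real scheduling lives in {@code HMaster}):
 * <pre>{@code
 * SnapshotQuotaObserverChore chore = new SnapshotQuotaObserverChore(master, metricsMaster);
 * master.getChoreService().scheduleChore(chore);
 * }</pre>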
 */
@InterfaceAudience.Private
public class SnapshotQuotaObserverChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class);
  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
      "hbase.master.quotas.snapshot.chore.period";
  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis

  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
      "hbase.master.quotas.snapshot.chore.delay";
  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis

  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
      "hbase.master.quotas.snapshot.chore.timeunit";
  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
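
  // Illustrative only (example values, not defaults): the chore schedule can be tuned via these
  // keys in hbase-site.xml or on the master's Configuration before the chore is constructed, e.g.
  //   conf.setInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 10 * 60 * 1000);  // run every 10 minutes
  //   conf.setLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 2 * 60 * 1000L);  // first run after 2 minutes
  //   conf.set(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, TimeUnit.MILLISECONDS.name());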

  private final Connection conn;
  private final Configuration conf;
  private final MetricsMaster metrics;
  private final FileSystem fs;

  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
    this(
        master.getConnection(), master.getConfiguration(), master.getFileSystem(), master, metrics);
  }

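  /**
   * Package-private constructor that allows the {@link Connection}, {@link Configuration},
   * {@link FileSystem} and {@link Stoppable} to be supplied directly (e.g. from tests).
   */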
  SnapshotQuotaObserverChore(
      Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
      MetricsMaster metrics) {
    super(
        SnapshotQuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
        getInitialDelay(conf), getTimeUnit(conf));
    this.conn = conn;
    this.conf = conf;
    this.metrics = metrics;
    this.fs = fs;
  }

  @Override
  protected void chore() {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Computing sizes of snapshots for quota management.");
      }
      long start = System.nanoTime();
      _chore();
      if (null != metrics) {
        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
      }
    } catch (IOException e) {
      LOG.warn("Failed to compute the size of snapshots, will retry", e);
    }
  }

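  /**
   * Runs one pass of the snapshot size computation: fetches the snapshots for every table with a
   * space quota, prunes stale table and namespace snapshot size entries, computes the current
   * snapshot sizes, and persists the per-namespace totals to the quota table.
   */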
  void _chore() throws IOException {
    // Gets all tables with quotas that also have snapshots.
    // These are all of the snapshots that we need to compute the size of.
    long start = System.nanoTime();
    Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize();
    if (null != metrics) {
      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
    }

    // Remove old table snapshot size data
    pruneTableSnapshots(snapshotsToComputeSize);

    // Remove old namespace snapshot size data
    pruneNamespaceSnapshots(snapshotsToComputeSize);

    // For each table, compute the size of each snapshot
    Map<String,Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize);

    // Write the size data by namespace to the quota table.
    // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table.
    persistSnapshotSizesForNamespaces(namespaceSnapshotSizes);
  }

  /**
   * Removes the table snapshot entries that are present in the quota table but not in
   * {@code snapshotsToComputeSize}.
   *
   * @param snapshotsToComputeSize map of table to snapshot names whose sizes will be computed and
   *          persisted
   */
  void pruneTableSnapshots(Multimap<TableName, String> snapshotsToComputeSize) throws IOException {
    Multimap<TableName, String> existingSnapshotEntries = QuotaTableUtil.getTableSnapshots(conn);
    Multimap<TableName, String> snapshotEntriesToRemove = HashMultimap.create();
    for (Entry<TableName, Collection<String>> entry : existingSnapshotEntries.asMap().entrySet()) {
      TableName tn = entry.getKey();
      Set<String> setOfSnapshots = new HashSet<>(entry.getValue());
      for (String snapshot : snapshotsToComputeSize.get(tn)) {
        setOfSnapshots.remove(snapshot);
      }

      for (String snapshot : setOfSnapshots) {
        snapshotEntriesToRemove.put(tn, snapshot);
      }
    }
    removeExistingTableSnapshotSizes(snapshotEntriesToRemove);
  }

  /**
   * Removes the namespace snapshot entries that are present in the quota table but whose
   * namespaces no longer appear in {@code snapshotsToComputeSize}.
   *
   * @param snapshotsToComputeSize map of table to snapshot names whose sizes will be computed and
   *          persisted
   */
  void pruneNamespaceSnapshots(Multimap<TableName, String> snapshotsToComputeSize)
      throws IOException {
    Set<String> existingSnapshotEntries = QuotaTableUtil.getNamespaceSnapshots(conn);
    for (TableName tableName : snapshotsToComputeSize.keySet()) {
      existingSnapshotEntries.remove(tableName.getNamespaceAsString());
    }
    // existingSnapshotEntries is now left with only the entries to be removed
    removeExistingNamespaceSnapshotSizes(existingSnapshotEntries);
  }

  /**
   * Fetches each table with a quota (table or namespace quota), and then fetches the name of each
   * snapshot which was created from that table.
   *
   * @return A mapping of table to snapshots created from that table
   */
  Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException {
    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
    QuotaFilter filter = new QuotaFilter();
    filter.addTypeFilter(QuotaType.SPACE);
    try (Admin admin = conn.getAdmin()) {
      // Pull all of the tables that have quotas (direct, or from namespace)
      for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) {
        if (qs.getQuotaType() == QuotaType.SPACE) {
          String ns = qs.getNamespace();
          TableName tn = qs.getTableName();
          if ((null == ns && null == tn) || (null != ns && null != tn)) {
            throw new IllegalStateException(
                "Expected exactly one of namespace and table name to be non-null");
          }
          // Collect either the table name itself, or all of the tables in the namespace
          if (null != ns) {
            tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
          } else {
            tablesToFetchSnapshotsFrom.add(tn);
          }
        }
      }
      // Fetch all snapshots that were created from these tables
      return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
    }
  }

  /**
   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
   * exists in the provided {@code Set}.
   */
  Multimap<TableName,String> getSnapshotsFromTables(
      Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
    Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
    for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
      TableName tn = sd.getTableName();
      if (tablesToFetchSnapshotsFrom.contains(tn)) {
        snapshotsToCompute.put(tn, sd.getName());
      }
    }
    return snapshotsToCompute;
  }

  /**
   * Computes the size of each provided snapshot, given the current files referenced by the table.
   *
   * @param snapshotsToComputeSize The snapshots to compute the size of
   * @return A mapping of namespace to the total size of the snapshots created from tables in that
   *         namespace.
   */
  Map<String,Long> computeSnapshotSizes(
      Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
    final Map<String,Long> snapshotSizesByNamespace = new HashMap<>();
    final long start = System.nanoTime();
    for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
      final TableName tn = entry.getKey();
      final Collection<String> snapshotNames = entry.getValue();

      // Get our notifier instance, this is tracking archivals that happen out-of-band of this chore
      FileArchiverNotifier notifier = getNotifierForTable(tn);

      // The total size consumed by all snapshots against this table
      long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames);
      // Bucket that size into the appropriate namespace
      snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum);
    }

    // Update the amount of time it took to compute the size of the snapshots for all tables
    if (metrics != null) {
      metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
    }

    return snapshotSizesByNamespace;
  }

  /**
   * Returns the correct instance of {@link FileArchiverNotifier} for the given table name.
   *
   * @param tn The table name
   * @return A {@link FileArchiverNotifier} instance
   */
  FileArchiverNotifier getNotifierForTable(TableName tn) {
    return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
  }

  /**
   * Writes the size used by snapshots for each namespace to the quota table.
   */
  void persistSnapshotSizesForNamespaces(
      Map<String,Long> snapshotSizesByNamespace) throws IOException {
    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      quotaTable.put(snapshotSizesByNamespace.entrySet().stream()
          .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue()))
          .collect(Collectors.toList()));
    }
  }

  void removeExistingTableSnapshotSizes(Multimap<TableName, String> snapshotEntriesToRemove)
      throws IOException {
    removeExistingSnapshotSizes(
        QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(snapshotEntriesToRemove));
  }

  void removeExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove)
      throws IOException {
    removeExistingSnapshotSizes(
        QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(snapshotEntriesToRemove));
  }

  void removeExistingSnapshotSizes(List<Delete> deletes) throws IOException {
    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      quotaTable.delete(deletes);
    }
  }

  /**
   * Extracts the period for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY,
        SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY,
        SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to
   * a {@link TimeUnit} value.
   *
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY,
        SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT));
  }
}