001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Set;
029import java.util.concurrent.TimeUnit;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.hbase.ScheduledChore;
034import org.apache.hadoop.hbase.Stoppable;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.Delete;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.master.HMaster;
041import org.apache.hadoop.hbase.master.MetricsMaster;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
047import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
048
049/**
050 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from a
051 * table which has a space quota.
052 */
053@InterfaceAudience.Private
054public class SnapshotQuotaObserverChore extends ScheduledChore {
055  private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class);
056  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY = "hbase.master.quotas.snapshot.chore.period";
057  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
058
059  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY = "hbase.master.quotas.snapshot.chore.delay";
060  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis
061
062  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
063    "hbase.master.quotas.snapshot.chore.timeunit";
064  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
065
066  private final Connection conn;
067  private final Configuration conf;
068  private final MetricsMaster metrics;
069  private final FileSystem fs;
070
071  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
072    this(master.getConnection(), master.getConfiguration(), master.getFileSystem(), master,
073      metrics);
074  }
075
076  SnapshotQuotaObserverChore(Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
077    MetricsMaster metrics) {
078    super(QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf), getInitialDelay(conf),
079      getTimeUnit(conf));
080    this.conn = conn;
081    this.conf = conf;
082    this.metrics = metrics;
083    this.fs = fs;
084  }
085
086  @Override
087  protected void chore() {
088    try {
089      if (LOG.isTraceEnabled()) {
090        LOG.trace("Computing sizes of snapshots for quota management.");
091      }
092      long start = System.nanoTime();
093      _chore();
094      if (null != metrics) {
095        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
096      }
097    } catch (IOException e) {
098      LOG.warn("Failed to compute the size of snapshots, will retry", e);
099    }
100  }
101
102  void _chore() throws IOException {
103    // Gets all tables with quotas that also have snapshots.
104    // This values are all of the snapshots that we need to compute the size of.
105    long start = System.nanoTime();
106    Multimap<TableName, String> snapshotsToComputeSize = getSnapshotsToComputeSize();
107    if (null != metrics) {
108      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
109    }
110
111    // Remove old table snapshots data
112    pruneTableSnapshots(snapshotsToComputeSize);
113
114    // Remove old namespace snapshots data
115    pruneNamespaceSnapshots(snapshotsToComputeSize);
116
117    // For each table, compute the size of each snapshot
118    Map<String, Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize);
119
120    // Write the size data by namespaces to the quota table.
121    // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table.
122    persistSnapshotSizesForNamespaces(namespaceSnapshotSizes);
123  }
124
125  /**
126   * Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize
127   * @param snapshotsToComputeSize list of snapshots to be persisted
128   */
129  void pruneTableSnapshots(Multimap<TableName, String> snapshotsToComputeSize) throws IOException {
130    Multimap<TableName, String> existingSnapshotEntries = QuotaTableUtil.getTableSnapshots(conn);
131    Multimap<TableName, String> snapshotEntriesToRemove = HashMultimap.create();
132    for (Entry<TableName, Collection<String>> entry : existingSnapshotEntries.asMap().entrySet()) {
133      TableName tn = entry.getKey();
134      Set<String> setOfSnapshots = new HashSet<>(entry.getValue());
135      for (String snapshot : snapshotsToComputeSize.get(tn)) {
136        setOfSnapshots.remove(snapshot);
137      }
138
139      for (String snapshot : setOfSnapshots) {
140        snapshotEntriesToRemove.put(tn, snapshot);
141      }
142    }
143    removeExistingTableSnapshotSizes(snapshotEntriesToRemove);
144  }
145
146  /**
147   * Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize
148   * @param snapshotsToComputeSize list of snapshots to be persisted
149   */
150  void pruneNamespaceSnapshots(Multimap<TableName, String> snapshotsToComputeSize)
151    throws IOException {
152    Set<String> existingSnapshotEntries = QuotaTableUtil.getNamespaceSnapshots(conn);
153    for (TableName tableName : snapshotsToComputeSize.keySet()) {
154      existingSnapshotEntries.remove(tableName.getNamespaceAsString());
155    }
156    // here existingSnapshotEntries is left with the entries to be removed
157    removeExistingNamespaceSnapshotSizes(existingSnapshotEntries);
158  }
159
160  /**
161   * Fetches each table with a quota (table or namespace quota), and then fetch the name of each
162   * snapshot which was created from that table.
163   * @return A mapping of table to snapshots created from that table
164   */
165  Multimap<TableName, String> getSnapshotsToComputeSize() throws IOException {
166    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
167    QuotaFilter filter = new QuotaFilter();
168    filter.addTypeFilter(QuotaType.SPACE);
169    try (Admin admin = conn.getAdmin()) {
170      // Pull all of the tables that have quotas (direct, or from namespace)
171      for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) {
172        if (qs.getQuotaType() == QuotaType.SPACE) {
173          String ns = qs.getNamespace();
174          TableName tn = qs.getTableName();
175          if ((null == ns && null == tn) || (null != ns && null != tn)) {
176            throw new IllegalStateException(
177              "Expected either one of namespace and tablename to be null but not both");
178          }
179          // Collect either the table name itself, or all of the tables in the namespace
180          if (null != ns) {
181            tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
182          } else {
183            tablesToFetchSnapshotsFrom.add(tn);
184          }
185        }
186      }
187      // Fetch all snapshots that were created from these tables
188      return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
189    }
190  }
191
192  /**
193   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
194   * exists in the provided {@code Set}.
195   */
196  Multimap<TableName, String> getSnapshotsFromTables(Admin admin,
197    Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
198    Multimap<TableName, String> snapshotsToCompute = HashMultimap.create();
199    for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
200      TableName tn = sd.getTableName();
201      if (tablesToFetchSnapshotsFrom.contains(tn)) {
202        snapshotsToCompute.put(tn, sd.getName());
203      }
204    }
205    return snapshotsToCompute;
206  }
207
208  /**
209   * Computes the size of each snapshot provided given the current files referenced by the table.
210   * @param snapshotsToComputeSize The snapshots to compute the size of
211   * @return A mapping of table to snapshot created from that table and the snapshot's size.
212   */
213  Map<String, Long> computeSnapshotSizes(Multimap<TableName, String> snapshotsToComputeSize)
214    throws IOException {
215    final Map<String, Long> snapshotSizesByNamespace = new HashMap<>();
216    final long start = System.nanoTime();
217    for (Entry<TableName, Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
218      final TableName tn = entry.getKey();
219      final Collection<String> snapshotNames = entry.getValue();
220
221      // Get our notifier instance, this is tracking archivals that happen out-of-band of this chore
222      FileArchiverNotifier notifier = getNotifierForTable(tn);
223
224      // The total size consumed by all snapshots against this table
225      long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames);
226      // Bucket that size into the appropriate namespace
227      snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum);
228    }
229
230    // Update the amount of time it took to compute the size of the snapshots for a table
231    if (metrics != null) {
232      metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
233    }
234
235    return snapshotSizesByNamespace;
236  }
237
238  /**
239   * Returns the correct instance of {@link FileArchiverNotifier} for the given table name.
240   * @param tn The table name
241   * @return A {@link FileArchiverNotifier} instance
242   */
243  FileArchiverNotifier getNotifierForTable(TableName tn) {
244    return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
245  }
246
247  /**
248   * Writes the size used by snapshots for each namespace to the quota table.
249   */
250  void persistSnapshotSizesForNamespaces(Map<String, Long> snapshotSizesByNamespace)
251    throws IOException {
252    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
253      quotaTable.put(snapshotSizesByNamespace.entrySet().stream()
254        .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue()))
255        .collect(Collectors.toList()));
256    }
257  }
258
259  void removeExistingTableSnapshotSizes(Multimap<TableName, String> snapshotEntriesToRemove)
260    throws IOException {
261    removeExistingSnapshotSizes(
262      QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(snapshotEntriesToRemove));
263  }
264
265  void removeExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove)
266    throws IOException {
267    removeExistingSnapshotSizes(
268      QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(snapshotEntriesToRemove));
269  }
270
271  void removeExistingSnapshotSizes(List<Delete> deletes) throws IOException {
272    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
273      quotaTable.delete(deletes);
274    }
275  }
276
277  /**
278   * Extracts the period for the chore from the configuration.
279   * @param conf The configuration object.
280   * @return The configured chore period or the default value.
281   */
282  static int getPeriod(Configuration conf) {
283    return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT);
284  }
285
286  /**
287   * Extracts the initial delay for the chore from the configuration.
288   * @param conf The configuration object.
289   * @return The configured chore initial delay or the default value.
290   */
291  static long getInitialDelay(Configuration conf) {
292    return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT);
293  }
294
295  /**
296   * Extracts the time unit for the chore period and initial delay from the configuration. The
297   * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to a
298   * {@link TimeUnit} value.
299   * @param conf The configuration object.
300   * @return The configured time unit for the chore period and initial delay or the default value.
301   */
302  static TimeUnit getTimeUnit(Configuration conf) {
303    return TimeUnit
304      .valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT));
305  }
306}