001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.Arrays; 021import java.util.Collection; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.Set; 028import java.util.concurrent.TimeUnit; 029import java.util.stream.Collectors; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.hbase.ScheduledChore; 034import org.apache.hadoop.hbase.Stoppable; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.Delete; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.master.HMaster; 044import org.apache.hadoop.hbase.master.MetricsMaster; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 047import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 048 049/** 050 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from 051 * a table which has a space quota. 052 */ 053@InterfaceAudience.Private 054public class SnapshotQuotaObserverChore extends ScheduledChore { 055 private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class); 056 static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY = 057 "hbase.master.quotas.snapshot.chore.period"; 058 static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis 059 060 static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY = 061 "hbase.master.quotas.snapshot.chore.delay"; 062 static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis 063 064 static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY = 065 "hbase.master.quotas.snapshot.chore.timeunit"; 066 static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); 067 068 private final Connection conn; 069 private final Configuration conf; 070 private final MetricsMaster metrics; 071 private final FileSystem fs; 072 073 public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) { 074 this( 075 master.getConnection(), master.getConfiguration(), master.getFileSystem(), master, metrics); 076 } 077 078 SnapshotQuotaObserverChore( 079 Connection conn, Configuration conf, FileSystem fs, Stoppable stopper, 080 MetricsMaster metrics) { 081 super( 082 QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf), 083 getInitialDelay(conf), getTimeUnit(conf)); 084 this.conn = conn; 085 this.conf = conf; 086 this.metrics = metrics; 087 this.fs = fs; 088 } 089 090 @Override 091 protected void chore() { 092 try { 093 if (LOG.isTraceEnabled()) { 094 LOG.trace("Computing sizes of snapshots for quota management."); 095 } 096 long start = System.nanoTime(); 097 _chore(); 098 if (null != metrics) { 099 metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000); 100 } 101 } catch (IOException e) { 102 LOG.warn("Failed to compute the size of snapshots, will retry", e); 103 } 104 } 105 106 void _chore() throws IOException { 107 // Gets all tables with quotas that also have snapshots. 108 // This values are all of the snapshots that we need to compute the size of. 109 long start = System.nanoTime(); 110 Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize(); 111 if (null != metrics) { 112 metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000); 113 } 114 115 // Remove old table snapshots data 116 pruneTableSnapshots(snapshotsToComputeSize); 117 118 // Remove old namespace snapshots data 119 pruneNamespaceSnapshots(snapshotsToComputeSize); 120 121 // For each table, compute the size of each snapshot 122 Map<String,Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize); 123 124 // Write the size data by namespaces to the quota table. 125 // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table. 126 persistSnapshotSizesForNamespaces(namespaceSnapshotSizes); 127 } 128 129 /** 130 * Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize 131 * 132 * @param snapshotsToComputeSize list of snapshots to be persisted 133 */ 134 void pruneTableSnapshots(Multimap<TableName, String> snapshotsToComputeSize) throws IOException { 135 Multimap<TableName, String> existingSnapshotEntries = QuotaTableUtil.getTableSnapshots(conn); 136 Multimap<TableName, String> snapshotEntriesToRemove = HashMultimap.create(); 137 for (Entry<TableName, Collection<String>> entry : existingSnapshotEntries.asMap().entrySet()) { 138 TableName tn = entry.getKey(); 139 Set<String> setOfSnapshots = new HashSet<>(entry.getValue()); 140 for (String snapshot : snapshotsToComputeSize.get(tn)) { 141 setOfSnapshots.remove(snapshot); 142 } 143 144 for (String snapshot : setOfSnapshots) { 145 snapshotEntriesToRemove.put(tn, snapshot); 146 } 147 } 148 removeExistingTableSnapshotSizes(snapshotEntriesToRemove); 149 } 150 151 /** 152 * Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize 153 * 154 * @param snapshotsToComputeSize list of snapshots to be persisted 155 */ 156 void pruneNamespaceSnapshots(Multimap<TableName, String> snapshotsToComputeSize) 157 throws IOException { 158 Set<String> existingSnapshotEntries = QuotaTableUtil.getNamespaceSnapshots(conn); 159 for (TableName tableName : snapshotsToComputeSize.keySet()) { 160 existingSnapshotEntries.remove(tableName.getNamespaceAsString()); 161 } 162 // here existingSnapshotEntries is left with the entries to be removed 163 removeExistingNamespaceSnapshotSizes(existingSnapshotEntries); 164 } 165 166 /** 167 * Fetches each table with a quota (table or namespace quota), and then fetch the name of each 168 * snapshot which was created from that table. 169 * 170 * @return A mapping of table to snapshots created from that table 171 */ 172 Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException { 173 Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>(); 174 QuotaFilter filter = new QuotaFilter(); 175 filter.addTypeFilter(QuotaType.SPACE); 176 try (Admin admin = conn.getAdmin()) { 177 // Pull all of the tables that have quotas (direct, or from namespace) 178 for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) { 179 String ns = qs.getNamespace(); 180 TableName tn = qs.getTableName(); 181 if ((null == ns && null == tn) || (null != ns && null != tn)) { 182 throw new IllegalStateException( 183 "Expected only one of namespace and tablename to be null"); 184 } 185 // Collect either the table name itself, or all of the tables in the namespace 186 if (null != ns) { 187 tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns))); 188 } else { 189 tablesToFetchSnapshotsFrom.add(tn); 190 } 191 } 192 // Fetch all snapshots that were created from these tables 193 return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom); 194 } 195 } 196 197 /** 198 * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName} 199 * exists in the provided {@code Set}. 200 */ 201 Multimap<TableName,String> getSnapshotsFromTables( 202 Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException { 203 Multimap<TableName,String> snapshotsToCompute = HashMultimap.create(); 204 for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) { 205 TableName tn = sd.getTableName(); 206 if (tablesToFetchSnapshotsFrom.contains(tn)) { 207 snapshotsToCompute.put(tn, sd.getName()); 208 } 209 } 210 return snapshotsToCompute; 211 } 212 213 /** 214 * Computes the size of each snapshot provided given the current files referenced by the table. 215 * 216 * @param snapshotsToComputeSize The snapshots to compute the size of 217 * @return A mapping of table to snapshot created from that table and the snapshot's size. 218 */ 219 Map<String,Long> computeSnapshotSizes( 220 Multimap<TableName,String> snapshotsToComputeSize) throws IOException { 221 final Map<String,Long> snapshotSizesByNamespace = new HashMap<>(); 222 final long start = System.nanoTime(); 223 for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) { 224 final TableName tn = entry.getKey(); 225 final Collection<String> snapshotNames = entry.getValue(); 226 227 // Get our notifier instance, this is tracking archivals that happen out-of-band of this chore 228 FileArchiverNotifier notifier = getNotifierForTable(tn); 229 230 // The total size consumed by all snapshots against this table 231 long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames); 232 // Bucket that size into the appropriate namespace 233 snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum); 234 } 235 236 // Update the amount of time it took to compute the size of the snapshots for a table 237 if (metrics != null) { 238 metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000); 239 } 240 241 return snapshotSizesByNamespace; 242 } 243 244 /** 245 * Returns the correct instance of {@link FileArchiverNotifier} for the given table name. 246 * 247 * @param tn The table name 248 * @return A {@link FileArchiverNotifier} instance 249 */ 250 FileArchiverNotifier getNotifierForTable(TableName tn) { 251 return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn); 252 } 253 254 /** 255 * Writes the size used by snapshots for each namespace to the quota table. 256 */ 257 void persistSnapshotSizesForNamespaces( 258 Map<String,Long> snapshotSizesByNamespace) throws IOException { 259 try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 260 quotaTable.put(snapshotSizesByNamespace.entrySet().stream() 261 .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue())) 262 .collect(Collectors.toList())); 263 } 264 } 265 266 void removeExistingTableSnapshotSizes(Multimap<TableName, String> snapshotEntriesToRemove) 267 throws IOException { 268 removeExistingSnapshotSizes( 269 QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(snapshotEntriesToRemove)); 270 } 271 272 void removeExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove) 273 throws IOException { 274 removeExistingSnapshotSizes( 275 QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(snapshotEntriesToRemove)); 276 } 277 278 void removeExistingSnapshotSizes(List<Delete> deletes) throws IOException { 279 try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 280 quotaTable.delete(deletes); 281 } 282 } 283 284 /** 285 * Extracts the period for the chore from the configuration. 286 * 287 * @param conf The configuration object. 288 * @return The configured chore period or the default value. 289 */ 290 static int getPeriod(Configuration conf) { 291 return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 292 SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT); 293 } 294 295 /** 296 * Extracts the initial delay for the chore from the configuration. 297 * 298 * @param conf The configuration object. 299 * @return The configured chore initial delay or the default value. 300 */ 301 static long getInitialDelay(Configuration conf) { 302 return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 303 SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT); 304 } 305 306 /** 307 * Extracts the time unit for the chore period and initial delay from the configuration. The 308 * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to 309 * a {@link TimeUnit} value. 310 * 311 * @param conf The configuration object. 312 * @return The configured time unit for the chore period and initial delay or the default value. 313 */ 314 static TimeUnit getTimeUnit(Configuration conf) { 315 return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, 316 SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT)); 317 } 318}