/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

/**
 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
 * a table which has a space quota.
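 * <p>
 * The chore's period, initial delay, and time unit are read from the configuration keys
 * {@code hbase.master.quotas.snapshot.chore.period},
 * {@code hbase.master.quotas.snapshot.chore.delay}, and
 * {@code hbase.master.quotas.snapshot.chore.timeunit} (defaulting to five minutes, one minute,
 * and {@code MILLISECONDS}, respectively). For example, to run the chore every ten minutes, one
 * could set the following in hbase-site.xml:
 * <pre>
 *   &lt;property&gt;
 *     &lt;name&gt;hbase.master.quotas.snapshot.chore.period&lt;/name&gt;
 *     &lt;value&gt;600000&lt;/value&gt;
 *   &lt;/property&gt;
 * </pre>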
 */
@InterfaceAudience.Private
public class SnapshotQuotaObserverChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class);
  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
      "hbase.master.quotas.snapshot.chore.period";
  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis

  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
      "hbase.master.quotas.snapshot.chore.delay";
  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis

  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
      "hbase.master.quotas.snapshot.chore.timeunit";
  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  private final Connection conn;
  private final Configuration conf;
  private final MetricsMaster metrics;
  private final FileSystem fs;

  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
    this(
        master.getConnection(), master.getConfiguration(), master.getFileSystem(), master,
        metrics);
  }

  SnapshotQuotaObserverChore(
      Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
      MetricsMaster metrics) {
    super(
        SnapshotQuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
        getInitialDelay(conf), getTimeUnit(conf));
    this.conn = conn;
    this.conf = conf;
    this.metrics = metrics;
    this.fs = fs;
  }

  @Override
  protected void chore() {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Computing sizes of snapshots for quota management.");
      }
      long start = System.nanoTime();
      _chore();
      if (null != metrics) {
        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
      }
    } catch (IOException e) {
      LOG.warn("Failed to compute the size of snapshots, will retry", e);
    }
  }

  void _chore() throws IOException {
    // Get all tables with quotas that also have snapshots.
    // These are all of the snapshots whose sizes we need to compute.
    long start = System.nanoTime();
    Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize();
    if (null != metrics) {
      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
    }

    // Remove old table snapshot data
    pruneTableSnapshots(snapshotsToComputeSize);

    // Remove old namespace snapshot data
    pruneNamespaceSnapshots(snapshotsToComputeSize);

    // For each table, compute the size of each snapshot
    Map<String,Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize);

    // Write the size data by namespace to the quota table.
    // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table.
    persistSnapshotSizesForNamespaces(namespaceSnapshotSizes);
  }

  /**
   * Removes the table snapshot entries that are present in the quota table but not in
   * {@code snapshotsToComputeSize}.
   *
   * @param snapshotsToComputeSize the snapshots whose sizes will be recomputed and persisted
   */
  void pruneTableSnapshots(Multimap<TableName, String> snapshotsToComputeSize) throws IOException {
    Multimap<TableName, String> existingSnapshotEntries = QuotaTableUtil.getTableSnapshots(conn);
    Multimap<TableName, String> snapshotEntriesToRemove = HashMultimap.create();
    for (Entry<TableName, Collection<String>> entry : existingSnapshotEntries.asMap().entrySet()) {
      TableName tn = entry.getKey();
      Set<String> setOfSnapshots = new HashSet<>(entry.getValue());
      for (String snapshot : snapshotsToComputeSize.get(tn)) {
        setOfSnapshots.remove(snapshot);
      }

      for (String snapshot : setOfSnapshots) {
        snapshotEntriesToRemove.put(tn, snapshot);
      }
    }
    removeExistingTableSnapshotSizes(snapshotEntriesToRemove);
  }

  /**
   * Removes the namespace snapshot-size entries that are present in the quota table but whose
   * namespaces no longer appear in {@code snapshotsToComputeSize}.
   *
   * @param snapshotsToComputeSize the snapshots whose sizes will be recomputed and persisted
   */
  void pruneNamespaceSnapshots(Multimap<TableName, String> snapshotsToComputeSize)
      throws IOException {
    Set<String> existingSnapshotEntries = QuotaTableUtil.getNamespaceSnapshots(conn);
    for (TableName tableName : snapshotsToComputeSize.keySet()) {
      existingSnapshotEntries.remove(tableName.getNamespaceAsString());
    }
    // Here, existingSnapshotEntries is left with only the entries to be removed
    removeExistingNamespaceSnapshotSizes(existingSnapshotEntries);
  }

  /**
   * Fetches each table with a quota (table or namespace quota), and then fetches the name of each
   * snapshot which was created from that table.
   *
   * @return A mapping of table to snapshots created from that table
   */
  Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException {
    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
    QuotaFilter filter = new QuotaFilter();
    filter.addTypeFilter(QuotaType.SPACE);
    try (Admin admin = conn.getAdmin();
        QuotaRetriever quotas = QuotaRetriever.open(conf, filter)) {
      // Pull all of the tables that have quotas (direct, or from namespace)
      for (QuotaSettings qs : quotas) {
        if (qs.getQuotaType() == QuotaType.SPACE) {
          String ns = qs.getNamespace();
          TableName tn = qs.getTableName();
          if ((null == ns && null == tn) || (null != ns && null != tn)) {
            throw new IllegalStateException(
                "Expected exactly one of namespace and tablename to be non-null");
          }
          // Collect either the table name itself, or all of the tables in the namespace
          if (null != ns) {
            tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
          } else {
            tablesToFetchSnapshotsFrom.add(tn);
          }
        }
      }
      // Fetch all snapshots that were created from these tables
      return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom);
    }
  }

  /**
   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
   * exists in the provided {@code Set}.
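   *
   * @param admin The Admin instance used to list snapshots
   * @param tablesToFetchSnapshotsFrom The tables whose snapshots should be collected
   * @return A multimap of table name to the names of snapshots created from that table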
   */
  Multimap<TableName,String> getSnapshotsFromTables(
      Admin admin, Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
    Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
    for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
      TableName tn = sd.getTableName();
      if (tablesToFetchSnapshotsFrom.contains(tn)) {
        snapshotsToCompute.put(tn, sd.getName());
      }
    }
    return snapshotsToCompute;
  }

  /**
   * Computes the size of each snapshot provided, given the current files referenced by the table.
   *
   * @param snapshotsToComputeSize The snapshots to compute the size of
   * @return A mapping of namespace to the total size consumed by all snapshots against tables in
   *         that namespace.
   */
  Map<String,Long> computeSnapshotSizes(
      Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
    final Map<String,Long> snapshotSizesByNamespace = new HashMap<>();
    final long start = System.nanoTime();
    for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
      final TableName tn = entry.getKey();
      final Collection<String> snapshotNames = entry.getValue();

      // Get our notifier instance; it tracks archivals that happen out-of-band of this chore
      FileArchiverNotifier notifier = getNotifierForTable(tn);

      // The total size consumed by all snapshots against this table
      long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames);
      // Bucket that size into the appropriate namespace
      snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum);
    }

    // Record the amount of time it took to compute the snapshot sizes
    if (metrics != null) {
      metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
    }

    return snapshotSizesByNamespace;
  }

  /**
   * Returns the correct instance of {@link FileArchiverNotifier} for the given table name.
   *
   * @param tn The table name
   * @return A {@link FileArchiverNotifier} instance
   */
  FileArchiverNotifier getNotifierForTable(TableName tn) {
    return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
  }

  /**
   * Writes the size used by snapshots for each namespace to the quota table.
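   *
   * @param snapshotSizesByNamespace A mapping of namespace name to the total size, in bytes,
   *          consumed by that namespace's snapshots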
   */
  void persistSnapshotSizesForNamespaces(
      Map<String,Long> snapshotSizesByNamespace) throws IOException {
    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      quotaTable.put(snapshotSizesByNamespace.entrySet().stream()
          .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue()))
          .collect(Collectors.toList()));
    }
  }

  void removeExistingTableSnapshotSizes(Multimap<TableName, String> snapshotEntriesToRemove)
      throws IOException {
    removeExistingSnapshotSizes(
        QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(snapshotEntriesToRemove));
  }

  void removeExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove)
      throws IOException {
    removeExistingSnapshotSizes(
        QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(snapshotEntriesToRemove));
  }

  void removeExistingSnapshotSizes(List<Delete> deletes) throws IOException {
    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      quotaTable.delete(deletes);
    }
  }

  /**
   * Extracts the period for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY,
        SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY,
        SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to a
   * {@link TimeUnit} value.
   *
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY,
        SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT));
  }
}