001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.Map; 022import java.util.Map.Entry; 023import java.util.concurrent.TimeUnit; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.MetaTableAccessor; 027import org.apache.hadoop.hbase.ScheduledChore; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.Result; 034import org.apache.hadoop.hbase.client.ResultScanner; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.util.Bytes; 037 038/** 039 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager} 040 * with information from the hbase:quota. 041 */ 042@InterfaceAudience.Private 043public class SpaceQuotaRefresherChore extends ScheduledChore { 044 private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class); 045 046 static final String POLICY_REFRESHER_CHORE_PERIOD_KEY = 047 "hbase.regionserver.quotas.policy.refresher.chore.period"; 048 static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis 049 050 static final String POLICY_REFRESHER_CHORE_DELAY_KEY = 051 "hbase.regionserver.quotas.policy.refresher.chore.delay"; 052 static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis 053 054 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY = 055 "hbase.regionserver.quotas.policy.refresher.chore.timeunit"; 056 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); 057 058 static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY = 059 "hbase.regionserver.quotas.policy.refresher.report.percent"; 060 static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; 061 062 private final RegionServerSpaceQuotaManager manager; 063 private final Connection conn; 064 private boolean quotaTablePresent = false; 065 066 public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) { 067 super(SpaceQuotaRefresherChore.class.getSimpleName(), 068 manager.getRegionServerServices(), 069 getPeriod(manager.getRegionServerServices().getConfiguration()), 070 getInitialDelay(manager.getRegionServerServices().getConfiguration()), 071 getTimeUnit(manager.getRegionServerServices().getConfiguration())); 072 this.manager = manager; 073 this.conn = conn; 074 } 075 076 @Override 077 protected void chore() { 078 try { 079 // check whether quotaTable is present or not. 080 if (!quotaTablePresent && !checkQuotaTableExists()) { 081 LOG.info("Quota table not found, skipping quota manager cache refresh."); 082 return; 083 } 084 // since quotaTable is present so setting the flag as true. 085 quotaTablePresent = true; 086 if (LOG.isTraceEnabled()) { 087 LOG.trace("Reading current quota snapshots from hbase:quota."); 088 } 089 // Get the snapshots that the quota manager is currently aware of 090 final Map<TableName, SpaceQuotaSnapshot> currentSnapshots = 091 getManager().copyQuotaSnapshots(); 092 // Read the new snapshots from the quota table 093 final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable(); 094 if (LOG.isTraceEnabled()) { 095 LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, " 096 + "read " + newSnapshots.size() + " from the quota table."); 097 } 098 // Iterate over each new quota snapshot 099 for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) { 100 final TableName tableName = entry.getKey(); 101 final SpaceQuotaSnapshot newSnapshot = entry.getValue(); 102 // May be null! 103 final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName); 104 if (LOG.isTraceEnabled()) { 105 LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot); 106 } 107 if (!newSnapshot.equals(currentSnapshot)) { 108 // We have a new snapshot. 109 // We might need to enforce it or disable the enforcement or switch policy 110 boolean currInViolation = isInViolation(currentSnapshot); 111 boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation(); 112 if (!currInViolation && newInViolation) { 113 if (LOG.isTraceEnabled()) { 114 LOG.trace("Enabling " + newSnapshot + " on " + tableName); 115 } 116 getManager().enforceViolationPolicy(tableName, newSnapshot); 117 } else if (currInViolation && !newInViolation) { 118 if (LOG.isTraceEnabled()) { 119 LOG.trace("Removing quota violation policy on " + tableName); 120 } 121 getManager().disableViolationPolicyEnforcement(tableName); 122 } else if (currInViolation && newInViolation) { 123 if (LOG.isTraceEnabled()) { 124 LOG.trace("Switching quota violation policy on " + tableName + " from " 125 + currentSnapshot + " to " + newSnapshot); 126 } 127 getManager().enforceViolationPolicy(tableName, newSnapshot); 128 } 129 } 130 } 131 132 // Disable violation policy for all such tables which have been removed in new snapshot 133 for (TableName tableName : currentSnapshots.keySet()) { 134 // check whether table was removed in new snapshot 135 if (!newSnapshots.containsKey(tableName)) { 136 if (LOG.isTraceEnabled()) { 137 LOG.trace("Removing quota violation policy on " + tableName); 138 } 139 getManager().disableViolationPolicyEnforcement(tableName); 140 } 141 } 142 143 // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing 144 // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master, 145 // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets 146 // us avoid having to do anything special with currentSnapshots here. 147 148 // Update the snapshots in the manager 149 getManager().updateQuotaSnapshot(newSnapshots); 150 } catch (IOException e) { 151 LOG.warn( 152 "Caught exception while refreshing enforced quota violation policies, will retry.", e); 153 } 154 } 155 156 /** 157 * Checks if hbase:quota exists in hbase:meta 158 * 159 * @return true if hbase:quota table is in meta, else returns false. 160 * @throws IOException throws IOException 161 */ 162 boolean checkQuotaTableExists() throws IOException { 163 return MetaTableAccessor.tableExists(getConnection(), QuotaUtil.QUOTA_TABLE_NAME); 164 } 165 166 /** 167 * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null. 168 * If the snapshot is null, this is interpreted as no snapshot which implies not in violation. 169 * 170 * @param snapshot The snapshot to operate on. 171 * @return true if the snapshot is in violation, false otherwise. 172 */ 173 boolean isInViolation(SpaceQuotaSnapshot snapshot) { 174 if (snapshot == null) { 175 return false; 176 } 177 return snapshot.getQuotaStatus().isInViolation(); 178 } 179 180 /** 181 * Reads all quota snapshots from the quota table. 182 * 183 * @return The current "view" of space use by each table. 184 */ 185 public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException { 186 try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME); 187 ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) { 188 Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>(); 189 for (Result result : scanner) { 190 try { 191 extractQuotaSnapshot(result, snapshots); 192 } catch (IllegalArgumentException e) { 193 final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow()); 194 LOG.error(msg, e); 195 throw new IOException(msg, e); 196 } 197 } 198 return snapshots; 199 } 200 } 201 202 /** 203 * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing. 204 */ 205 void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) { 206 QuotaTableUtil.extractQuotaSnapshot(result, snapshots); 207 } 208 209 Connection getConnection() { 210 return conn; 211 } 212 213 RegionServerSpaceQuotaManager getManager() { 214 return manager; 215 } 216 217 /** 218 * Extracts the period for the chore from the configuration. 219 * 220 * @param conf The configuration object. 221 * @return The configured chore period or the default value. 222 */ 223 static int getPeriod(Configuration conf) { 224 return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, 225 POLICY_REFRESHER_CHORE_PERIOD_DEFAULT); 226 } 227 228 /** 229 * Extracts the initial delay for the chore from the configuration. 230 * 231 * @param conf The configuration object. 232 * @return The configured chore initial delay or the default value. 233 */ 234 static long getInitialDelay(Configuration conf) { 235 return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, 236 POLICY_REFRESHER_CHORE_DELAY_DEFAULT); 237 } 238 239 /** 240 * Extracts the time unit for the chore period and initial delay from the configuration. The 241 * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to 242 * a {@link TimeUnit} value. 243 * 244 * @param conf The configuration object. 245 * @return The configured time unit for the chore period and initial delay or the default value. 246 */ 247 static TimeUnit getTimeUnit(Configuration conf) { 248 return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, 249 POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT)); 250 } 251 252 /** 253 * Extracts the percent of Regions for a table to have been reported to enable quota violation 254 * state change. 255 * 256 * @param conf The configuration object. 257 * @return The percent of regions reported to use. 258 */ 259 static Double getRegionReportPercent(Configuration conf) { 260 return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, 261 POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT); 262 } 263}