001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.Map; 022import java.util.Map.Entry; 023import java.util.concurrent.TimeUnit; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.ScheduledChore; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.Admin; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.Result; 030import org.apache.hadoop.hbase.client.ResultScanner; 031import org.apache.hadoop.hbase.client.Table; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager} 039 * with information from the hbase:quota. 040 */ 041@InterfaceAudience.Private 042public class SpaceQuotaRefresherChore extends ScheduledChore { 043 private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class); 044 045 static final String POLICY_REFRESHER_CHORE_PERIOD_KEY = 046 "hbase.regionserver.quotas.policy.refresher.chore.period"; 047 static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis 048 049 static final String POLICY_REFRESHER_CHORE_DELAY_KEY = 050 "hbase.regionserver.quotas.policy.refresher.chore.delay"; 051 static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis 052 053 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY = 054 "hbase.regionserver.quotas.policy.refresher.chore.timeunit"; 055 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); 056 057 static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY = 058 "hbase.regionserver.quotas.policy.refresher.report.percent"; 059 static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; 060 061 private final RegionServerSpaceQuotaManager manager; 062 private final Connection conn; 063 private boolean quotaTablePresent = false; 064 065 public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) { 066 super(SpaceQuotaRefresherChore.class.getSimpleName(), 067 manager.getRegionServerServices(), 068 getPeriod(manager.getRegionServerServices().getConfiguration()), 069 getInitialDelay(manager.getRegionServerServices().getConfiguration()), 070 getTimeUnit(manager.getRegionServerServices().getConfiguration())); 071 this.manager = manager; 072 this.conn = conn; 073 } 074 075 @Override 076 protected void chore() { 077 try { 078 // check whether quotaTable is present or not. 079 if (!quotaTablePresent && !checkQuotaTableExists()) { 080 LOG.info("Quota table not found, skipping quota manager cache refresh."); 081 return; 082 } 083 // since quotaTable is present so setting the flag as true. 084 quotaTablePresent = true; 085 if (LOG.isTraceEnabled()) { 086 LOG.trace("Reading current quota snapshots from hbase:quota."); 087 } 088 // Get the snapshots that the quota manager is currently aware of 089 final Map<TableName, SpaceQuotaSnapshot> currentSnapshots = 090 getManager().copyQuotaSnapshots(); 091 // Read the new snapshots from the quota table 092 final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable(); 093 if (LOG.isTraceEnabled()) { 094 LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, " 095 + "read " + newSnapshots.size() + " from the quota table."); 096 } 097 // Iterate over each new quota snapshot 098 for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) { 099 final TableName tableName = entry.getKey(); 100 final SpaceQuotaSnapshot newSnapshot = entry.getValue(); 101 // May be null! 102 final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName); 103 if (LOG.isTraceEnabled()) { 104 LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot); 105 } 106 if (!newSnapshot.equals(currentSnapshot)) { 107 // We have a new snapshot. 108 // We might need to enforce it or disable the enforcement or switch policy 109 boolean currInViolation = isInViolation(currentSnapshot); 110 boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation(); 111 if (!currInViolation && newInViolation) { 112 if (LOG.isTraceEnabled()) { 113 LOG.trace("Enabling " + newSnapshot + " on " + tableName); 114 } 115 getManager().enforceViolationPolicy(tableName, newSnapshot); 116 } else if (currInViolation && !newInViolation) { 117 if (LOG.isTraceEnabled()) { 118 LOG.trace("Removing quota violation policy on " + tableName); 119 } 120 getManager().disableViolationPolicyEnforcement(tableName); 121 } else if (currInViolation && newInViolation) { 122 if (LOG.isTraceEnabled()) { 123 LOG.trace("Switching quota violation policy on " + tableName + " from " 124 + currentSnapshot + " to " + newSnapshot); 125 } 126 getManager().enforceViolationPolicy(tableName, newSnapshot); 127 } 128 } 129 } 130 131 // Disable violation policy for all such tables which have been removed in new snapshot 132 for (TableName tableName : currentSnapshots.keySet()) { 133 // check whether table was removed in new snapshot 134 if (!newSnapshots.containsKey(tableName)) { 135 if (LOG.isTraceEnabled()) { 136 LOG.trace("Removing quota violation policy on " + tableName); 137 } 138 getManager().disableViolationPolicyEnforcement(tableName); 139 } 140 } 141 142 // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing 143 // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master, 144 // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets 145 // us avoid having to do anything special with currentSnapshots here. 146 147 // Update the snapshots in the manager 148 getManager().updateQuotaSnapshot(newSnapshots); 149 } catch (IOException e) { 150 LOG.warn( 151 "Caught exception while refreshing enforced quota violation policies, will retry.", e); 152 } 153 } 154 155 /** 156 * Checks if hbase:quota exists in hbase:meta 157 * 158 * @return true if hbase:quota table is in meta, else returns false. 159 * @throws IOException throws IOException 160 */ 161 boolean checkQuotaTableExists() throws IOException { 162 try (Admin admin = getConnection().getAdmin()) { 163 return admin.tableExists(QuotaUtil.QUOTA_TABLE_NAME); 164 } 165 } 166 167 /** 168 * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null. 169 * If the snapshot is null, this is interpreted as no snapshot which implies not in violation. 170 * 171 * @param snapshot The snapshot to operate on. 172 * @return true if the snapshot is in violation, false otherwise. 173 */ 174 boolean isInViolation(SpaceQuotaSnapshot snapshot) { 175 if (snapshot == null) { 176 return false; 177 } 178 return snapshot.getQuotaStatus().isInViolation(); 179 } 180 181 /** 182 * Reads all quota snapshots from the quota table. 183 * 184 * @return The current "view" of space use by each table. 185 */ 186 public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException { 187 try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME); 188 ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) { 189 Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>(); 190 for (Result result : scanner) { 191 try { 192 extractQuotaSnapshot(result, snapshots); 193 } catch (IllegalArgumentException e) { 194 final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow()); 195 LOG.error(msg, e); 196 throw new IOException(msg, e); 197 } 198 } 199 return snapshots; 200 } 201 } 202 203 /** 204 * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing. 205 */ 206 void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) { 207 QuotaTableUtil.extractQuotaSnapshot(result, snapshots); 208 } 209 210 Connection getConnection() { 211 return conn; 212 } 213 214 RegionServerSpaceQuotaManager getManager() { 215 return manager; 216 } 217 218 /** 219 * Extracts the period for the chore from the configuration. 220 * 221 * @param conf The configuration object. 222 * @return The configured chore period or the default value. 223 */ 224 static int getPeriod(Configuration conf) { 225 return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, 226 POLICY_REFRESHER_CHORE_PERIOD_DEFAULT); 227 } 228 229 /** 230 * Extracts the initial delay for the chore from the configuration. 231 * 232 * @param conf The configuration object. 233 * @return The configured chore initial delay or the default value. 234 */ 235 static long getInitialDelay(Configuration conf) { 236 return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, 237 POLICY_REFRESHER_CHORE_DELAY_DEFAULT); 238 } 239 240 /** 241 * Extracts the time unit for the chore period and initial delay from the configuration. The 242 * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to 243 * a {@link TimeUnit} value. 244 * 245 * @param conf The configuration object. 246 * @return The configured time unit for the chore period and initial delay or the default value. 247 */ 248 static TimeUnit getTimeUnit(Configuration conf) { 249 return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, 250 POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT)); 251 } 252 253 /** 254 * Extracts the percent of Regions for a table to have been reported to enable quota violation 255 * state change. 256 * 257 * @param conf The configuration object. 258 * @return The percent of regions reported to use. 259 */ 260 static Double getRegionReportPercent(Configuration conf) { 261 return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, 262 POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT); 263 } 264}