001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.Map; 022import java.util.Map.Entry; 023import java.util.concurrent.TimeUnit; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ScheduledChore; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hadoop.hbase.client.Connection; 032import org.apache.hadoop.hbase.client.Result; 033import org.apache.hadoop.hbase.client.ResultScanner; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.util.Bytes; 036 037/** 038 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager} 039 * with information from the hbase:quota. 040 */ 041@InterfaceAudience.Private 042public class SpaceQuotaRefresherChore extends ScheduledChore { 043 private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class); 044 045 static final String POLICY_REFRESHER_CHORE_PERIOD_KEY = 046 "hbase.regionserver.quotas.policy.refresher.chore.period"; 047 static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis 048 049 static final String POLICY_REFRESHER_CHORE_DELAY_KEY = 050 "hbase.regionserver.quotas.policy.refresher.chore.delay"; 051 static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis 052 053 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY = 054 "hbase.regionserver.quotas.policy.refresher.chore.timeunit"; 055 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); 056 057 static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY = 058 "hbase.regionserver.quotas.policy.refresher.report.percent"; 059 static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; 060 061 private final RegionServerSpaceQuotaManager manager; 062 private final Connection conn; 063 064 public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) { 065 super(SpaceQuotaRefresherChore.class.getSimpleName(), 066 manager.getRegionServerServices(), 067 getPeriod(manager.getRegionServerServices().getConfiguration()), 068 getInitialDelay(manager.getRegionServerServices().getConfiguration()), 069 getTimeUnit(manager.getRegionServerServices().getConfiguration())); 070 this.manager = manager; 071 this.conn = conn; 072 } 073 074 @Override 075 protected void chore() { 076 try { 077 if (LOG.isTraceEnabled()) { 078 LOG.trace("Reading current quota snapshots from hbase:quota."); 079 } 080 // Get the snapshots that the quota manager is currently aware of 081 final Map<TableName, SpaceQuotaSnapshot> currentSnapshots = 082 getManager().copyQuotaSnapshots(); 083 // Read the new snapshots from the quota table 084 final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable(); 085 if (LOG.isTraceEnabled()) { 086 LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, " 087 + "read " + newSnapshots.size() + " from the quota table."); 088 } 089 // Iterate over each new quota snapshot 090 for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) { 091 final TableName tableName = entry.getKey(); 092 final SpaceQuotaSnapshot newSnapshot = entry.getValue(); 093 // May be null! 094 final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName); 095 if (LOG.isTraceEnabled()) { 096 LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot); 097 } 098 if (!newSnapshot.equals(currentSnapshot)) { 099 // We have a new snapshot. 100 // We might need to enforce it or disable the enforcement or switch policy 101 boolean currInViolation = isInViolation(currentSnapshot); 102 boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation(); 103 if (!currInViolation && newInViolation) { 104 if (LOG.isTraceEnabled()) { 105 LOG.trace("Enabling " + newSnapshot + " on " + tableName); 106 } 107 getManager().enforceViolationPolicy(tableName, newSnapshot); 108 } else if (currInViolation && !newInViolation) { 109 if (LOG.isTraceEnabled()) { 110 LOG.trace("Removing quota violation policy on " + tableName); 111 } 112 getManager().disableViolationPolicyEnforcement(tableName); 113 } else if (currInViolation && newInViolation) { 114 if (LOG.isTraceEnabled()) { 115 LOG.trace("Switching quota violation policy on " + tableName + " from " 116 + currentSnapshot + " to " + newSnapshot); 117 } 118 getManager().enforceViolationPolicy(tableName, newSnapshot); 119 } 120 } 121 } 122 123 // Disable violation policy for all such tables which have been removed in new snapshot 124 for (TableName tableName : currentSnapshots.keySet()) { 125 // check whether table was removed in new snapshot 126 if (!newSnapshots.containsKey(tableName)) { 127 if (LOG.isTraceEnabled()) { 128 LOG.trace("Removing quota violation policy on " + tableName); 129 } 130 getManager().disableViolationPolicyEnforcement(tableName); 131 } 132 } 133 134 // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing 135 // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master, 136 // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets 137 // us avoid having to do anything special with currentSnapshots here. 138 139 // Update the snapshots in the manager 140 getManager().updateQuotaSnapshot(newSnapshots); 141 } catch (IOException e) { 142 LOG.warn( 143 "Caught exception while refreshing enforced quota violation policies, will retry.", e); 144 } 145 } 146 147 /** 148 * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null. 149 * If the snapshot is null, this is interpreted as no snapshot which implies not in violation. 150 * 151 * @param snapshot The snapshot to operate on. 152 * @return true if the snapshot is in violation, false otherwise. 153 */ 154 boolean isInViolation(SpaceQuotaSnapshot snapshot) { 155 if (snapshot == null) { 156 return false; 157 } 158 return snapshot.getQuotaStatus().isInViolation(); 159 } 160 161 /** 162 * Reads all quota snapshots from the quota table. 163 * 164 * @return The current "view" of space use by each table. 165 */ 166 public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException { 167 try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME); 168 ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) { 169 Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>(); 170 for (Result result : scanner) { 171 try { 172 extractQuotaSnapshot(result, snapshots); 173 } catch (IllegalArgumentException e) { 174 final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow()); 175 LOG.error(msg, e); 176 throw new IOException(msg, e); 177 } 178 } 179 return snapshots; 180 } 181 } 182 183 /** 184 * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing. 185 */ 186 void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) { 187 QuotaTableUtil.extractQuotaSnapshot(result, snapshots); 188 } 189 190 Connection getConnection() { 191 return conn; 192 } 193 194 RegionServerSpaceQuotaManager getManager() { 195 return manager; 196 } 197 198 /** 199 * Extracts the period for the chore from the configuration. 200 * 201 * @param conf The configuration object. 202 * @return The configured chore period or the default value. 203 */ 204 static int getPeriod(Configuration conf) { 205 return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, 206 POLICY_REFRESHER_CHORE_PERIOD_DEFAULT); 207 } 208 209 /** 210 * Extracts the initial delay for the chore from the configuration. 211 * 212 * @param conf The configuration object. 213 * @return The configured chore initial delay or the default value. 214 */ 215 static long getInitialDelay(Configuration conf) { 216 return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, 217 POLICY_REFRESHER_CHORE_DELAY_DEFAULT); 218 } 219 220 /** 221 * Extracts the time unit for the chore period and initial delay from the configuration. The 222 * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to 223 * a {@link TimeUnit} value. 224 * 225 * @param conf The configuration object. 226 * @return The configured time unit for the chore period and initial delay or the default value. 227 */ 228 static TimeUnit getTimeUnit(Configuration conf) { 229 return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, 230 POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT)); 231 } 232 233 /** 234 * Extracts the percent of Regions for a table to have been reported to enable quota violation 235 * state change. 236 * 237 * @param conf The configuration object. 238 * @return The percent of regions reported to use. 239 */ 240 static Double getRegionReportPercent(Configuration conf) { 241 return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY, 242 POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT); 243 } 244}