/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
 * with information from the hbase:quota table.
 * <p>
 * Each run reads every space quota snapshot from the quota table, compares it with the snapshot
 * the manager currently holds for the same table, and enables, switches or removes the violation
 * policy accordingly before handing the new snapshots to the manager.
 */
@InterfaceAudience.Private
public class SpaceQuotaRefresherChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class);

  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
    "hbase.regionserver.quotas.policy.refresher.chore.period";
  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis

  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
    "hbase.regionserver.quotas.policy.refresher.chore.delay";
  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis

  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
    "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
    "hbase.regionserver.quotas.policy.refresher.report.percent";
  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT = 0.95;

  private final RegionServerSpaceQuotaManager manager;
  private final Connection conn;
  // Set to true the first time the quota table is seen; the existence check is skipped afterwards.
  private boolean quotaTablePresent = false;

  /**
   * Creates the chore. Period, initial delay and time unit are read from the configuration of the
   * RegionServerServices exposed by the given manager.
   * @param manager The quota manager to keep up to date.
   * @param conn    The connection used to read the quota table.
   */
  public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
    super(SpaceQuotaRefresherChore.class.getSimpleName(), manager.getRegionServerServices(),
      getPeriod(manager.getRegionServerServices().getConfiguration()),
      getInitialDelay(manager.getRegionServerServices().getConfiguration()),
      getTimeUnit(manager.getRegionServerServices().getConfiguration()));
    this.manager = manager;
    this.conn = conn;
  }

  /**
   * Performs one refresh: reconciles the violation policies enforced by the manager with the
   * snapshots currently stored in the quota table. An {@link IOException} during the refresh is
   * logged and the refresh is attempted again on the next run.
   */
  @Override
  protected void chore() {
    try {
      // Only ask the cluster whether the quota table exists until it has been seen once.
      if (!quotaTablePresent && !checkQuotaTableExists()) {
        LOG.info("Quota table not found, skipping quota manager cache refresh.");
        return;
      }
      // The quota table is present, remember that so the check above is skipped next time.
      quotaTablePresent = true;

      LOG.trace("Reading current quota snapshots from hbase:quota.");
      final RegionServerSpaceQuotaManager quotaManager = getManager();
      // Get the snapshots that the quota manager is currently aware of
      final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
        quotaManager.copyQuotaSnapshots();
      // Read the new snapshots from the quota table
      final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
      LOG.trace("{} table quota snapshots are collected, read {} from the quota table.",
        currentSnapshots.size(), newSnapshots.size());

      // Iterate over each new quota snapshot
      for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
        final TableName tableName = entry.getKey();
        final SpaceQuotaSnapshot newSnapshot = entry.getValue();
        // May be null when the manager has no snapshot for this table yet.
        final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
        LOG.trace("{}: current={}, new={}", tableName, currentSnapshot, newSnapshot);

        if (newSnapshot.equals(currentSnapshot)) {
          // Nothing changed for this table.
          continue;
        }

        // We have a new snapshot.
        // We might need to enforce it or disable the enforcement or switch policy
        final boolean currInViolation = isInViolation(currentSnapshot);
        final boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
        if (newInViolation) {
          // Either a fresh violation or a change of an already enforced one; both (re-)enforce.
          if (currInViolation) {
            LOG.trace("Switching quota violation policy on {} from {} to {}", tableName,
              currentSnapshot, newSnapshot);
          } else {
            LOG.trace("Enabling {} on {}", newSnapshot, tableName);
          }
          quotaManager.enforceViolationPolicy(tableName, newSnapshot);
        } else if (currInViolation) {
          LOG.trace("Removing quota violation policy on {}", tableName);
          quotaManager.disableViolationPolicyEnforcement(tableName);
        }
      }

      // Disable violation policy for all such tables which have been removed in new snapshot
      for (TableName tableName : currentSnapshots.keySet()) {
        if (!newSnapshots.containsKey(tableName)) {
          LOG.trace("Removing quota violation policy on {}", tableName);
          quotaManager.disableViolationPolicyEnforcement(tableName);
        }
      }

      // Apart from the removals handled above, nothing special is done with currentSnapshots.
      // If the Master was missing information from the RegionServers to create an accurate
      // SpaceQuotaSnapshot, it will generate a new SpaceQuotaSnapshot which represents this
      // state, so the new snapshots are taken as-is.

      // Update the snapshots in the manager
      quotaManager.updateQuotaSnapshot(newSnapshots);
    } catch (IOException e) {
      LOG.warn("Caught exception while refreshing enforced quota violation policies, will retry.",
        e);
    }
  }

  /**
   * Checks whether the hbase:quota table exists, as reported by
   * {@link Admin#tableExists(TableName)}.
   * @return true if the quota table exists, false otherwise.
   * @throws IOException if the admin cannot be obtained or the existence check fails.
   */
  boolean checkQuotaTableExists() throws IOException {
    try (Admin admin = getConnection().getAdmin()) {
      return admin.tableExists(QuotaUtil.QUOTA_TABLE_NAME);
    }
  }

  /**
   * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null. If
   * the snapshot is null, this is interpreted as no snapshot which implies not in violation.
   * @param snapshot The snapshot to operate on, may be null.
   * @return true if the snapshot is in violation, false otherwise.
   */
  boolean isInViolation(SpaceQuotaSnapshot snapshot) {
    return snapshot != null && snapshot.getQuotaStatus().isInViolation();
  }

  /**
   * Reads all quota snapshots from the quota table.
   * @return The current "view" of space use by each table.
   * @throws IOException if the quota table cannot be scanned, or if a row cannot be parsed (the
   *                     underlying {@link IllegalArgumentException} is kept as the cause).
   */
  public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
      ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>();
      for (Result result : scanner) {
        try {
          extractQuotaSnapshot(result, snapshots);
        } catch (IllegalArgumentException e) {
          // Surface a malformed row as an IOException so that the caller's retry handling applies.
          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
          LOG.error(msg, e);
          throw new IOException(msg, e);
        }
      }
      return snapshots;
    }
  }

  /**
   * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
   * @param result    The row read from the quota table.
   * @param snapshots The map that receives the extracted snapshot.
   */
  void extractQuotaSnapshot(Result result, Map<TableName, SpaceQuotaSnapshot> snapshots) {
    QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
  }

  /** Returns the connection this chore uses to reach the quota table. */
  Connection getConnection() {
    return conn;
  }

  /** Returns the quota manager this chore keeps up to date. */
  RegionServerSpaceQuotaManager getManager() {
    return manager;
  }

  /**
   * Extracts the period for the chore from the configuration.
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to a
   * {@link TimeUnit} value. Surrounding whitespace is ignored and the name is matched
   * case-insensitively, so e.g. <code>seconds</code> resolves to {@link TimeUnit#SECONDS}.
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
   * @throws IllegalArgumentException if the configured value does not name a {@link TimeUnit}.
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    final String configured =
      conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT);
    // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
    return TimeUnit.valueOf(configured.trim().toUpperCase(Locale.ROOT));
  }

  /**
   * Extracts the percent of Regions for a table to have been reported to enable quota violation
   * state change.
   * @param conf The configuration object.
   * @return The percent of regions reported to use.
   */
  static Double getRegionReportPercent(Configuration conf) {
    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
      POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
  }
}