001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.Map; 022import java.util.Objects; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.atomic.AtomicReference; 025import java.util.Map.Entry; 026 027import org.apache.hadoop.hbase.TableName; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hadoop.hbase.client.Connection; 032import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 033import org.apache.hadoop.hbase.regionserver.RegionServerServices; 034 035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 036 037/** 038 * A manager for filesystem space quotas in the RegionServer. 039 * 040 * This class is the centralized point for what a RegionServer knows about space quotas 041 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot} 042 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not 043 * being violated). Both of these are sensitive on when they were last updated. The 044 * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates 045 * the state on <code>this</code>. 046 */ 047@InterfaceAudience.Private 048public class RegionServerSpaceQuotaManager { 049 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class); 050 051 private final RegionServerServices rsServices; 052 053 private SpaceQuotaRefresherChore spaceQuotaRefresher; 054 private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots; 055 private boolean started = false; 056 private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies; 057 private SpaceViolationPolicyEnforcementFactory factory; 058 059 public RegionServerSpaceQuotaManager(RegionServerServices rsServices) { 060 this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance()); 061 } 062 063 @VisibleForTesting 064 RegionServerSpaceQuotaManager( 065 RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) { 066 this.rsServices = Objects.requireNonNull(rsServices); 067 this.factory = factory; 068 this.enforcedPolicies = new ConcurrentHashMap<>(); 069 this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>()); 070 } 071 072 public synchronized void start() throws IOException { 073 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 074 LOG.info("Quota support disabled, not starting space quota manager."); 075 return; 076 } 077 078 if (started) { 079 LOG.warn("RegionServerSpaceQuotaManager has already been started!"); 080 return; 081 } 082 this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection()); 083 rsServices.getChoreService().scheduleChore(spaceQuotaRefresher); 084 started = true; 085 } 086 087 public synchronized void stop() { 088 if (spaceQuotaRefresher != null) { 089 spaceQuotaRefresher.cancel(); 090 spaceQuotaRefresher = null; 091 } 092 started = false; 093 } 094 095 /** 096 * @return if the {@code Chore} has been started. 097 */ 098 public boolean isStarted() { 099 return started; 100 } 101 102 /** 103 * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view 104 * of what the RegionServer thinks the table's utilization is. 105 */ 106 public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() { 107 return new HashMap<>(currentQuotaSnapshots.get()); 108 } 109 110 /** 111 * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer. 112 * 113 * @param newSnapshots The space quota snapshots. 114 */ 115 public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) { 116 currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots)); 117 } 118 119 /** 120 * Creates an object well-suited for the RegionServer to use in verifying active policies. 121 */ 122 public ActivePolicyEnforcement getActiveEnforcements() { 123 return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices); 124 } 125 126 /** 127 * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into 128 * {@link SpaceViolationPolicy}s. 129 */ 130 public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() { 131 final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = 132 copyActiveEnforcements(); 133 final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>(); 134 for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) { 135 final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot(); 136 if (snapshot != null) { 137 policies.put(entry.getKey(), snapshot); 138 } 139 } 140 return policies; 141 } 142 143 /** 144 * Enforces the given violationPolicy on the given table in this RegionServer. 145 */ 146 public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) { 147 SpaceQuotaStatus status = snapshot.getQuotaStatus(); 148 if (!status.isInViolation()) { 149 throw new IllegalStateException( 150 tableName + " is not in violation. Violation policy should not be enabled."); 151 } 152 if (LOG.isTraceEnabled()) { 153 LOG.trace( 154 "Enabling violation policy enforcement on " + tableName 155 + " with policy " + status.getPolicy()); 156 } 157 // Construct this outside of the lock 158 final SpaceViolationPolicyEnforcement enforcement = getFactory().create( 159 getRegionServerServices(), tableName, snapshot); 160 // "Enables" the policy 161 // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would 162 // allow policy enable/disable on different tables to happen concurrently. As written now, only 163 // one table will be allowed to transition at a time. This is probably OK, but not sure if 164 // it would become a bottleneck at large clusters/number of tables. 165 synchronized (enforcedPolicies) { 166 try { 167 enforcement.enable(); 168 } catch (IOException e) { 169 LOG.error("Failed to enable space violation policy for " + tableName 170 + ". This table will not enter violation.", e); 171 return; 172 } 173 enforcedPolicies.put(tableName, enforcement); 174 } 175 } 176 177 /** 178 * Disables enforcement on any violation policy on the given <code>tableName</code>. 179 */ 180 public void disableViolationPolicyEnforcement(TableName tableName) { 181 if (LOG.isTraceEnabled()) { 182 LOG.trace("Disabling violation policy enforcement on " + tableName); 183 } 184 // "Disables" the policy 185 synchronized (enforcedPolicies) { 186 SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName); 187 if (enforcement != null) { 188 try { 189 enforcement.disable(); 190 } catch (IOException e) { 191 LOG.error("Failed to disable space violation policy for " + tableName 192 + ". This table will remain in violation.", e); 193 enforcedPolicies.put(tableName, enforcement); 194 } 195 } 196 } 197 } 198 199 /** 200 * Returns whether or not compactions should be disabled for the given <code>tableName</code> per 201 * a space quota violation policy. A convenience method. 202 * 203 * @param tableName The table to check 204 * @return True if compactions should be disabled for the table, false otherwise. 205 */ 206 public boolean areCompactionsDisabled(TableName tableName) { 207 SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName)); 208 if (enforcement != null) { 209 return enforcement.areCompactionsDisabled(); 210 } 211 return false; 212 } 213 214 /** 215 * Returns the collection of tables which have quota violation policies enforced on 216 * this RegionServer. 217 */ 218 Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() { 219 // Allows reads to happen concurrently (or while the map is being updated) 220 return new HashMap<>(this.enforcedPolicies); 221 } 222 223 RegionServerServices getRegionServerServices() { 224 return rsServices; 225 } 226 227 Connection getConnection() { 228 return rsServices.getConnection(); 229 } 230 231 SpaceViolationPolicyEnforcementFactory getFactory() { 232 return factory; 233 } 234}