/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
 * A manager for filesystem space quotas in the RegionServer.
 *
 * <p>This class is the centralized point for what a RegionServer knows about space quotas
 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
 * being violated). Both of these are sensitive on when they were last updated. The
 * {@link SpaceQuotaRefresherChore} periodically runs and updates the state on
 * <code>this</code>.
 *
 * <p>Concurrency: {@link #start()} and {@link #stop()} are serialized on the instance monitor.
 * Policy transitions ({@link #enforceViolationPolicy(TableName, SpaceQuotaSnapshot)} and
 * {@link #disableViolationPolicyEnforcement(TableName)}) are serialized on the map of enforced
 * policies. Every other accessor reads without taking a lock.
 */
@InterfaceAudience.Private
public class RegionServerSpaceQuotaManager {
  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);

  private final RegionServerServices rsServices;
  private final SpaceViolationPolicyEnforcementFactory factory;
  private final AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
  private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies;

  // Only accessed from the synchronized start()/stop() methods.
  private SpaceQuotaRefresherChore spaceQuotaRefresher;
  private RegionSizeReportingChore regionSizeReporter;

  // Written while holding the instance monitor (start()/stop()) but read by the unsynchronized
  // isStarted()/getRegionSizeStore(); volatile makes those writes visible to the lock-free readers.
  private volatile boolean started = false;
  private volatile RegionSizeStore regionSizeStore;

  /**
   * Creates a manager which uses the default {@link SpaceViolationPolicyEnforcementFactory}.
   *
   * @param rsServices The services of the hosting RegionServer; must not be null.
   */
  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
  }

  /**
   * Creates a manager with an explicit enforcement factory.
   *
   * @param rsServices The services of the hosting RegionServer; must not be null.
   * @param factory The factory used to create policy enforcements.
   * @throws NullPointerException if {@code rsServices} is null
   */
  @VisibleForTesting
  RegionServerSpaceQuotaManager(
      RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
    this.rsServices = Objects.requireNonNull(rsServices);
    this.factory = factory;
    this.enforcedPolicies = new ConcurrentHashMap<>();
    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
    // Initialize the size store to not track anything -- create the real one if we're start()'ed
    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
  }

  /**
   * Schedules the space quota chores and switches to a real {@link RegionSizeStore}.
   *
   * <p>Does nothing when quota support is disabled in the configuration, or when this manager
   * has already been started.
   *
   * @throws IOException declared for callers; no statement in this method's own body throws it
   */
  public synchronized void start() throws IOException {
    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
      LOG.info("Quota support disabled, not starting space quota manager.");
      return;
    }

    if (started) {
      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
      return;
    }
    // Start the chores
    this.spaceQuotaRefresher =
        new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
    rsServices.getChoreService().scheduleChore(regionSizeReporter);
    // Instantiate the real RegionSizeStore
    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
    started = true;
  }

  /**
   * Cancels any chores scheduled by {@link #start()} and marks this manager as not started.
   * Safe to call when the manager was never started.
   */
  public synchronized void stop() {
    if (spaceQuotaRefresher != null) {
      spaceQuotaRefresher.cancel();
      spaceQuotaRefresher = null;
    }
    if (regionSizeReporter != null) {
      regionSizeReporter.cancel();
      regionSizeReporter = null;
    }
    started = false;
  }

  /**
   * @return true if {@link #start()} ran to completion and {@link #stop()} has not been called
   *         since; false otherwise (including when quota support is disabled).
   */
  public boolean isStarted() {
    return started;
  }

  /**
   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
   * of what the RegionServer thinks the table's utilization is.
   *
   * @return A new map which the caller is free to modify.
   */
  public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() {
    return new HashMap<>(currentQuotaSnapshots.get());
  }

  /**
   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
   *
   * <p>The given map is stored as-is (it is not copied).
   *
   * @param newSnapshots The space quota snapshots.
   * @throws NullPointerException if {@code newSnapshots} is null
   */
  public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) {
    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
  }

  /**
   * Creates an object well-suited for the RegionServer to use in verifying active policies.
   *
   * @return An {@link ActivePolicyEnforcement} built from copies of the currently enforced
   *         policies and the current quota snapshots.
   */
  public ActivePolicyEnforcement getActiveEnforcements() {
    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
  }

  /**
   * Converts the map of table to {@link SpaceViolationPolicyEnforcement} into a map of table to
   * the {@link SpaceQuotaSnapshot} each enforcement reports. Enforcements which report a null
   * snapshot are left out of the result.
   *
   * @return A new map of table to the snapshot behind its active enforcement.
   */
  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
        copyActiveEnforcements();
    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
      if (snapshot != null) {
        policies.put(entry.getKey(), snapshot);
      }
    }
    return policies;
  }

  /**
   * Enforces the violation policy described by the given snapshot on the given table in this
   * RegionServer.
   *
   * <p>If enabling the enforcement fails with an {@link IOException}, the failure is logged and
   * the table is not recorded as being in violation.
   *
   * @param tableName The table to enforce the policy on.
   * @param snapshot The snapshot whose status must be in violation.
   * @throws IllegalStateException if the snapshot's status is not in violation
   */
  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
    SpaceQuotaStatus status = snapshot.getQuotaStatus();
    if (!status.isInViolation()) {
      throw new IllegalStateException(
          tableName + " is not in violation. Violation policy should not be enabled.");
    }
    if (LOG.isTraceEnabled()) {
      // Guard kept so that status.getPolicy() is only evaluated when the message is emitted.
      LOG.trace("Enabling violation policy enforcement on {} with policy {}",
          tableName, status.getPolicy());
    }
    // Construct this outside of the lock
    final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
        getRegionServerServices(), tableName, snapshot);
    // "Enables" the policy
    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
    // allow policy enable/disable on different tables to happen concurrently. As written now, only
    // one table will be allowed to transition at a time. This is probably OK, but not sure if
    // it would become a bottleneck at large clusters/number of tables.
    synchronized (enforcedPolicies) {
      try {
        enforcement.enable();
      } catch (IOException e) {
        LOG.error("Failed to enable space violation policy for {}."
            + " This table will not enter violation.", tableName, e);
        return;
      }
      enforcedPolicies.put(tableName, enforcement);
    }
  }

  /**
   * Disables enforcement on any violation policy on the given <code>tableName</code>.
   *
   * <p>If disabling fails with an {@link IOException}, the failure is logged and the enforcement
   * is put back so that the table remains in violation.
   *
   * @param tableName The table to stop enforcing a policy on.
   */
  public void disableViolationPolicyEnforcement(TableName tableName) {
    LOG.trace("Disabling violation policy enforcement on {}", tableName);
    // "Disables" the policy
    synchronized (enforcedPolicies) {
      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
      if (enforcement != null) {
        try {
          enforcement.disable();
        } catch (IOException e) {
          LOG.error("Failed to disable space violation policy for {}."
              + " This table will remain in violation.", tableName, e);
          enforcedPolicies.put(tableName, enforcement);
        }
      }
    }
  }

  /**
   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
   * a space quota violation policy. A convenience method.
   *
   * @param tableName The table to check
   * @return True if compactions should be disabled for the table, false otherwise.
   * @throws NullPointerException if {@code tableName} is null
   */
  public boolean areCompactionsDisabled(TableName tableName) {
    SpaceViolationPolicyEnforcement enforcement =
        this.enforcedPolicies.get(Objects.requireNonNull(tableName));
    if (enforcement != null) {
      return enforcement.areCompactionsDisabled();
    }
    return false;
  }

  /**
   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
   *
   * @return A {@link RegionSizeStore} implementation: the no-op store until {@link #start()}
   *         has created the real one.
   */
  public RegionSizeStore getRegionSizeStore() {
    return regionSizeStore;
  }

  /**
   * Builds the protobuf message to inform the Master of files being archived.
   *
   * @param tn The table the files previously belonged to.
   * @param archivedFiles The files and their size in bytes that were archived.
   * @return The protobuf representation
   */
  public RegionServerStatusProtos.FileArchiveNotificationRequest buildFileArchiveRequest(
      TableName tn, Collection<Entry<String, Long>> archivedFiles) {
    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
        RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
    for (Entry<String, Long> archivedFile : archivedFiles) {
      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
          RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
              .setName(archivedFile.getKey())
              .setSize(archivedFile.getValue())
              .setTableName(protoTn)
              .build();
      builder.addArchivedFiles(fws);
    }
    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
    if (LOG.isTraceEnabled()) {
      // Guard kept: rendering the request to text is only worth doing when it will be logged.
      LOG.trace("Reporting file archival to Master: {}", TextFormat.shortDebugString(request));
    }
    return request;
  }

  /**
   * Returns a copy of the tables which have quota violation policies enforced on
   * this RegionServer, mapped to their enforcement.
   */
  Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
    // Allows reads to happen concurrently (or while the map is being updated)
    return new HashMap<>(this.enforcedPolicies);
  }

  RegionServerServices getRegionServerServices() {
    return rsServices;
  }

  Connection getConnection() {
    return rsServices.getConnection();
  }

  SpaceViolationPolicyEnforcementFactory getFactory() {
    return factory;
  }
}