001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.Map; 023import java.util.Objects; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.atomic.AtomicReference; 026import java.util.Map.Entry; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 030import org.apache.hadoop.hbase.regionserver.RegionServerServices; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 036 037import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 040 041/** 042 * A manager for filesystem space quotas in the RegionServer. 043 * 044 * This class is the centralized point for what a RegionServer knows about space quotas 045 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot} 046 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not 047 * being violated). Both of these are sensitive on when they were last updated. The 048 * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates 049 * the state on <code>this</code>. 050 */ 051@InterfaceAudience.Private 052public class RegionServerSpaceQuotaManager { 053 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class); 054 055 private final RegionServerServices rsServices; 056 057 private SpaceQuotaRefresherChore spaceQuotaRefresher; 058 private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots; 059 private boolean started = false; 060 private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies; 061 private SpaceViolationPolicyEnforcementFactory factory; 062 private RegionSizeStore regionSizeStore; 063 private RegionSizeReportingChore regionSizeReporter; 064 065 public RegionServerSpaceQuotaManager(RegionServerServices rsServices) { 066 this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance()); 067 } 068 069 RegionServerSpaceQuotaManager( 070 RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) { 071 this.rsServices = Objects.requireNonNull(rsServices); 072 this.factory = factory; 073 this.enforcedPolicies = new ConcurrentHashMap<>(); 074 this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>()); 075 // Initialize the size store to not track anything -- create the real one if we're start()'ed 076 this.regionSizeStore = NoOpRegionSizeStore.getInstance(); 077 } 078 079 public synchronized void start() throws IOException { 080 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 081 LOG.info("Quota support disabled, not starting space quota manager."); 082 return; 083 } 084 085 if (started) { 086 LOG.warn("RegionServerSpaceQuotaManager has already been started!"); 087 return; 088 } 089 // Start the chores 090 this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection()); 091 rsServices.getChoreService().scheduleChore(spaceQuotaRefresher); 092 this.regionSizeReporter = new RegionSizeReportingChore(rsServices); 093 rsServices.getChoreService().scheduleChore(regionSizeReporter); 094 // Instantiate the real RegionSizeStore 095 this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore(); 096 started = true; 097 } 098 099 public synchronized void stop() { 100 if (spaceQuotaRefresher != null) { 101 spaceQuotaRefresher.cancel(); 102 spaceQuotaRefresher = null; 103 } 104 if (regionSizeReporter != null) { 105 regionSizeReporter.cancel(); 106 regionSizeReporter = null; 107 } 108 started = false; 109 } 110 111 /** 112 * @return if the {@code Chore} has been started. 113 */ 114 public boolean isStarted() { 115 return started; 116 } 117 118 /** 119 * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view 120 * of what the RegionServer thinks the table's utilization is. 121 */ 122 public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() { 123 return new HashMap<>(currentQuotaSnapshots.get()); 124 } 125 126 /** 127 * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer. 128 * 129 * @param newSnapshots The space quota snapshots. 130 */ 131 public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) { 132 currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots)); 133 } 134 135 /** 136 * Creates an object well-suited for the RegionServer to use in verifying active policies. 137 */ 138 public ActivePolicyEnforcement getActiveEnforcements() { 139 return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices); 140 } 141 142 /** 143 * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into 144 * {@link SpaceViolationPolicy}s. 145 */ 146 public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() { 147 final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = 148 copyActiveEnforcements(); 149 final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>(); 150 for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) { 151 final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot(); 152 if (snapshot != null) { 153 policies.put(entry.getKey(), snapshot); 154 } 155 } 156 return policies; 157 } 158 159 /** 160 * Enforces the given violationPolicy on the given table in this RegionServer. 161 */ 162 public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) { 163 SpaceQuotaStatus status = snapshot.getQuotaStatus(); 164 if (!status.isInViolation()) { 165 throw new IllegalStateException( 166 tableName + " is not in violation. Violation policy should not be enabled."); 167 } 168 if (LOG.isTraceEnabled()) { 169 LOG.trace( 170 "Enabling violation policy enforcement on " + tableName 171 + " with policy " + status.getPolicy()); 172 } 173 // Construct this outside of the lock 174 final SpaceViolationPolicyEnforcement enforcement = getFactory().create( 175 getRegionServerServices(), tableName, snapshot); 176 // "Enables" the policy 177 // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would 178 // allow policy enable/disable on different tables to happen concurrently. As written now, only 179 // one table will be allowed to transition at a time. This is probably OK, but not sure if 180 // it would become a bottleneck at large clusters/number of tables. 181 synchronized (enforcedPolicies) { 182 try { 183 enforcement.enable(); 184 } catch (IOException e) { 185 LOG.error("Failed to enable space violation policy for " + tableName 186 + ". This table will not enter violation.", e); 187 return; 188 } 189 enforcedPolicies.put(tableName, enforcement); 190 } 191 } 192 193 /** 194 * Disables enforcement on any violation policy on the given <code>tableName</code>. 195 */ 196 public void disableViolationPolicyEnforcement(TableName tableName) { 197 if (LOG.isTraceEnabled()) { 198 LOG.trace("Disabling violation policy enforcement on " + tableName); 199 } 200 // "Disables" the policy 201 synchronized (enforcedPolicies) { 202 SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName); 203 if (enforcement != null) { 204 try { 205 enforcement.disable(); 206 } catch (IOException e) { 207 LOG.error("Failed to disable space violation policy for " + tableName 208 + ". This table will remain in violation.", e); 209 enforcedPolicies.put(tableName, enforcement); 210 } 211 } 212 } 213 } 214 215 /** 216 * Returns whether or not compactions should be disabled for the given <code>tableName</code> per 217 * a space quota violation policy. A convenience method. 218 * 219 * @param tableName The table to check 220 * @return True if compactions should be disabled for the table, false otherwise. 221 */ 222 public boolean areCompactionsDisabled(TableName tableName) { 223 SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName)); 224 if (enforcement != null) { 225 return enforcement.areCompactionsDisabled(); 226 } 227 return false; 228 } 229 230 /** 231 * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region. 232 * 233 * @return A {@link RegionSizeStore} implementation. 234 */ 235 public RegionSizeStore getRegionSizeStore() { 236 return regionSizeStore; 237 } 238 239 /** 240 * Builds the protobuf message to inform the Master of files being archived. 241 * 242 * @param tn The table the files previously belonged to. 243 * @param archivedFiles The files and their size in bytes that were archived. 244 * @return The protobuf representation 245 */ 246 public RegionServerStatusProtos.FileArchiveNotificationRequest buildFileArchiveRequest( 247 TableName tn, Collection<Entry<String,Long>> archivedFiles) { 248 RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder = 249 RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder(); 250 HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn); 251 for (Entry<String,Long> archivedFile : archivedFiles) { 252 RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws = 253 RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder() 254 .setName(archivedFile.getKey()) 255 .setSize(archivedFile.getValue()) 256 .setTableName(protoTn) 257 .build(); 258 builder.addArchivedFiles(fws); 259 } 260 final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build(); 261 if (LOG.isTraceEnabled()) { 262 LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request)); 263 } 264 return request; 265 } 266 267 /** 268 * Returns the collection of tables which have quota violation policies enforced on 269 * this RegionServer. 270 */ 271 Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() { 272 // Allows reads to happen concurrently (or while the map is being updated) 273 return new HashMap<>(this.enforcedPolicies); 274 } 275 276 RegionServerServices getRegionServerServices() { 277 return rsServices; 278 } 279 280 Connection getConnection() { 281 return rsServices.getConnection(); 282 } 283 284 SpaceViolationPolicyEnforcementFactory getFactory() { 285 return factory; 286 } 287}