001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.Collection; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.Map.Entry; 025import java.util.Objects; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.atomic.AtomicReference; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 031import org.apache.hadoop.hbase.regionserver.RegionServerServices; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 037 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 041 042/** 043 * A manager for filesystem space quotas in the RegionServer. This class is the centralized point 044 * for what a RegionServer knows about space quotas on tables. For each table, it tracks two 045 * different things: the {@link SpaceQuotaSnapshot} and a {@link SpaceViolationPolicyEnforcement} 046 * (which may be null when a quota is not being violated). Both of these are sensitive on when they 047 * were last updated. The {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and 048 * updates the state on <code>this</code>. 049 */ 050@InterfaceAudience.Private 051public class RegionServerSpaceQuotaManager { 052 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class); 053 054 private final RegionServerServices rsServices; 055 056 private SpaceQuotaRefresherChore spaceQuotaRefresher; 057 private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots; 058 private boolean started = false; 059 private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies; 060 private SpaceViolationPolicyEnforcementFactory factory; 061 private RegionSizeStore regionSizeStore; 062 private RegionSizeReportingChore regionSizeReporter; 063 064 public RegionServerSpaceQuotaManager(RegionServerServices rsServices) { 065 this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance()); 066 } 067 068 RegionServerSpaceQuotaManager(RegionServerServices rsServices, 069 SpaceViolationPolicyEnforcementFactory factory) { 070 this.rsServices = Objects.requireNonNull(rsServices); 071 this.factory = factory; 072 this.enforcedPolicies = new ConcurrentHashMap<>(); 073 this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>()); 074 // Initialize the size store to not track anything -- create the real one if we're start()'ed 075 this.regionSizeStore = NoOpRegionSizeStore.getInstance(); 076 } 077 078 public synchronized void start() throws IOException { 079 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 080 LOG.info("Quota support disabled, not starting space quota manager."); 081 return; 082 } 083 084 if (started) { 085 LOG.warn("RegionServerSpaceQuotaManager has already been started!"); 086 return; 087 } 088 // Start the chores 089 this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection()); 090 rsServices.getChoreService().scheduleChore(spaceQuotaRefresher); 091 this.regionSizeReporter = new RegionSizeReportingChore(rsServices); 092 rsServices.getChoreService().scheduleChore(regionSizeReporter); 093 // Instantiate the real RegionSizeStore 094 this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore(); 095 started = true; 096 } 097 098 public synchronized void stop() { 099 if (spaceQuotaRefresher != null) { 100 spaceQuotaRefresher.shutdown(); 101 spaceQuotaRefresher = null; 102 } 103 if (regionSizeReporter != null) { 104 regionSizeReporter.shutdown(); 105 regionSizeReporter = null; 106 } 107 started = false; 108 } 109 110 /** 111 * @return if the {@code Chore} has been started. 112 */ 113 public boolean isStarted() { 114 return started; 115 } 116 117 /** 118 * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view of what the 119 * RegionServer thinks the table's utilization is. 120 */ 121 public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() { 122 return new HashMap<>(currentQuotaSnapshots.get()); 123 } 124 125 /** 126 * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer. 127 * @param newSnapshots The space quota snapshots. 128 */ 129 public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) { 130 currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots)); 131 } 132 133 /** 134 * Creates an object well-suited for the RegionServer to use in verifying active policies. 135 */ 136 public ActivePolicyEnforcement getActiveEnforcements() { 137 return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices); 138 } 139 140 /** 141 * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into 142 * {@link SpaceViolationPolicy}s. 143 */ 144 public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() { 145 final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = copyActiveEnforcements(); 146 final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>(); 147 for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) { 148 final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot(); 149 if (snapshot != null) { 150 policies.put(entry.getKey(), snapshot); 151 } 152 } 153 return policies; 154 } 155 156 /** 157 * Enforces the given violationPolicy on the given table in this RegionServer. 158 */ 159 public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) { 160 SpaceQuotaStatus status = snapshot.getQuotaStatus(); 161 if (!status.isInViolation()) { 162 throw new IllegalStateException( 163 tableName + " is not in violation. Violation policy should not be enabled."); 164 } 165 if (LOG.isTraceEnabled()) { 166 LOG.trace("Enabling violation policy enforcement on " + tableName + " with policy " 167 + status.getPolicy()); 168 } 169 // Construct this outside of the lock 170 final SpaceViolationPolicyEnforcement enforcement = 171 getFactory().create(getRegionServerServices(), tableName, snapshot); 172 // "Enables" the policy 173 // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would 174 // allow policy enable/disable on different tables to happen concurrently. As written now, only 175 // one table will be allowed to transition at a time. This is probably OK, but not sure if 176 // it would become a bottleneck at large clusters/number of tables. 177 synchronized (enforcedPolicies) { 178 try { 179 enforcement.enable(); 180 } catch (IOException e) { 181 LOG.error("Failed to enable space violation policy for " + tableName 182 + ". This table will not enter violation.", e); 183 return; 184 } 185 enforcedPolicies.put(tableName, enforcement); 186 } 187 } 188 189 /** 190 * Disables enforcement on any violation policy on the given <code>tableName</code>. 191 */ 192 public void disableViolationPolicyEnforcement(TableName tableName) { 193 if (LOG.isTraceEnabled()) { 194 LOG.trace("Disabling violation policy enforcement on " + tableName); 195 } 196 // "Disables" the policy 197 synchronized (enforcedPolicies) { 198 SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName); 199 if (enforcement != null) { 200 try { 201 enforcement.disable(); 202 } catch (IOException e) { 203 LOG.error("Failed to disable space violation policy for " + tableName 204 + ". This table will remain in violation.", e); 205 enforcedPolicies.put(tableName, enforcement); 206 } 207 } 208 } 209 } 210 211 /** 212 * Returns whether or not compactions should be disabled for the given <code>tableName</code> per 213 * a space quota violation policy. A convenience method. 214 * @param tableName The table to check 215 * @return True if compactions should be disabled for the table, false otherwise. 216 */ 217 public boolean areCompactionsDisabled(TableName tableName) { 218 SpaceViolationPolicyEnforcement enforcement = 219 this.enforcedPolicies.get(Objects.requireNonNull(tableName)); 220 if (enforcement != null) { 221 return enforcement.areCompactionsDisabled(); 222 } 223 return false; 224 } 225 226 /** 227 * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region. 228 * @return A {@link RegionSizeStore} implementation. 229 */ 230 public RegionSizeStore getRegionSizeStore() { 231 return regionSizeStore; 232 } 233 234 /** 235 * Builds the protobuf message to inform the Master of files being archived. 236 * @param tn The table the files previously belonged to. 237 * @param archivedFiles The files and their size in bytes that were archived. 238 * @return The protobuf representation 239 */ 240 public RegionServerStatusProtos.FileArchiveNotificationRequest 241 buildFileArchiveRequest(TableName tn, Collection<Entry<String, Long>> archivedFiles) { 242 RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder = 243 RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder(); 244 HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn); 245 for (Entry<String, Long> archivedFile : archivedFiles) { 246 RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws = 247 RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder() 248 .setName(archivedFile.getKey()).setSize(archivedFile.getValue()).setTableName(protoTn) 249 .build(); 250 builder.addArchivedFiles(fws); 251 } 252 final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build(); 253 if (LOG.isTraceEnabled()) { 254 LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request)); 255 } 256 return request; 257 } 258 259 /** 260 * Returns the collection of tables which have quota violation policies enforced on this 261 * RegionServer. 262 */ 263 Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() { 264 // Allows reads to happen concurrently (or while the map is being updated) 265 return new HashMap<>(this.enforcedPolicies); 266 } 267 268 RegionServerServices getRegionServerServices() { 269 return rsServices; 270 } 271 272 Connection getConnection() { 273 return rsServices.getConnection(); 274 } 275 276 SpaceViolationPolicyEnforcementFactory getFactory() { 277 return factory; 278 } 279}