/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
 * A manager for filesystem space quotas in the RegionServer. This class is the centralized point
 * for what a RegionServer knows about space quotas on tables. For each table, it tracks two
 * different things: the {@link SpaceQuotaSnapshot} and a {@link SpaceViolationPolicyEnforcement}
 * (which may be null when a quota is not being violated). Both of these are sensitive to when they
 * were last updated. The {@link SpaceQuotaRefresherChore} periodically runs and updates the state
 * on <code>this</code>.
 */
@InterfaceAudience.Private
public class RegionServerSpaceQuotaManager {
  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);

  private final RegionServerServices rsServices;

  // Only touched from the synchronized start()/stop() methods.
  private SpaceQuotaRefresherChore spaceQuotaRefresher;
  private final AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
  // Written under the lock in start()/stop() but read without locking by isStarted(); volatile
  // guarantees that readers observe the latest value.
  private volatile boolean started = false;
  private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies;
  private final SpaceViolationPolicyEnforcementFactory factory;
  // Replaced under the lock in start() but read without locking by getRegionSizeStore(); volatile
  // guarantees that readers observe the real store once it has been installed.
  private volatile RegionSizeStore regionSizeStore;
  // Only touched from the synchronized start()/stop() methods.
  private RegionSizeReportingChore regionSizeReporter;

  /**
   * Creates a manager which uses the shared {@link SpaceViolationPolicyEnforcementFactory}
   * instance.
   * @param rsServices The services of the hosting RegionServer, must not be null.
   */
  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
  }

  /**
   * Creates a manager with an explicit enforcement factory.
   * @param rsServices The services of the hosting RegionServer, must not be null.
   * @param factory    The factory used to create {@link SpaceViolationPolicyEnforcement}s.
   */
  RegionServerSpaceQuotaManager(RegionServerServices rsServices,
    SpaceViolationPolicyEnforcementFactory factory) {
    this.rsServices = Objects.requireNonNull(rsServices);
    this.factory = factory;
    this.enforcedPolicies = new ConcurrentHashMap<>();
    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
    // Initialize the size store to not track anything -- create the real one if we're start()'ed
    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
  }

  /**
   * Schedules the quota refresher and region size reporting chores and installs the real
   * {@link RegionSizeStore}. Does nothing when quota support is disabled in the configuration or
   * when this manager has already been started.
   * @throws IOException declared for callers; propagated from the calls made while starting.
   */
  public synchronized void start() throws IOException {
    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
      LOG.info("Quota support disabled, not starting space quota manager.");
      return;
    }

    if (started) {
      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
      return;
    }
    // Start the chores
    this.spaceQuotaRefresher =
      new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
    rsServices.getChoreService().scheduleChore(regionSizeReporter);
    // Instantiate the real RegionSizeStore
    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
    started = true;
  }

  /**
   * Shuts down whichever chores were created by {@link #start()} and marks this manager as not
   * started. Safe to call when the manager was never started.
   */
  public synchronized void stop() {
    if (spaceQuotaRefresher != null) {
      spaceQuotaRefresher.shutdown();
      spaceQuotaRefresher = null;
    }
    if (regionSizeReporter != null) {
      regionSizeReporter.shutdown();
      regionSizeReporter = null;
    }
    started = false;
  }

  /**
   * Returns whether this manager is currently started, i.e. {@link #start()} ran to completion and
   * {@link #stop()} has not been called since.
   */
  public boolean isStarted() {
    return started;
  }

  /**
   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view of what the
   * RegionServer thinks the table's utilization is.
   * @return A new map which the caller is free to modify.
   */
  public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() {
    return new HashMap<>(currentQuotaSnapshots.get());
  }

  /**
   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer. The given map is stored
   * as-is (it is not copied), so the caller should not modify it afterwards.
   * @param newSnapshots The space quota snapshots, must not be null.
   */
  public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) {
    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
  }

  /**
   * Creates an object well-suited for the RegionServer to use in verifying active policies. It is
   * built from copies of the currently enforced policies and the current quota snapshots.
   */
  public ActivePolicyEnforcement getActiveEnforcements() {
    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
  }

  /**
   * Converts the map of table to {@link SpaceViolationPolicyEnforcement} into a map of table to
   * the {@link SpaceQuotaSnapshot} held by that enforcement. Enforcements which report a null
   * snapshot are left out of the result.
   * @return A new map of table to the snapshot of its active enforcement.
   */
  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = copyActiveEnforcements();
    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
      if (snapshot != null) {
        policies.put(entry.getKey(), snapshot);
      }
    }
    return policies;
  }

  /**
   * Enforces the violation policy of the given snapshot on the given table in this RegionServer.
   * If enabling the policy fails with an {@link IOException}, the failure is logged and the table
   * is not recorded as being in violation.
   * @param tableName The table to enforce the policy on.
   * @param snapshot  The snapshot describing the violation, its status must be in violation.
   * @throws IllegalStateException if the snapshot's status is not in violation.
   */
  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
    SpaceQuotaStatus status = snapshot.getQuotaStatus();
    if (!status.isInViolation()) {
      throw new IllegalStateException(
        tableName + " is not in violation. Violation policy should not be enabled.");
    }
    LOG.trace("Enabling violation policy enforcement on {} with policy {}", tableName,
      status.getPolicy());
    // Construct this outside of the lock
    final SpaceViolationPolicyEnforcement enforcement =
      getFactory().create(getRegionServerServices(), tableName, snapshot);
    // "Enables" the policy
    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
    // allow policy enable/disable on different tables to happen concurrently. As written now, only
    // one table will be allowed to transition at a time. This is probably OK, but not sure if
    // it would become a bottleneck at large clusters/number of tables.
    synchronized (enforcedPolicies) {
      try {
        enforcement.enable();
      } catch (IOException e) {
        LOG.error("Failed to enable space violation policy for {}. This table will not enter"
          + " violation.", tableName, e);
        return;
      }
      enforcedPolicies.put(tableName, enforcement);
    }
  }

  /**
   * Disables enforcement on any violation policy on the given <code>tableName</code>. If disabling
   * the policy fails with an {@link IOException}, the failure is logged and the enforcement is
   * put back so that the table stays recorded as being in violation.
   * @param tableName The table to stop enforcing a policy on.
   */
  public void disableViolationPolicyEnforcement(TableName tableName) {
    LOG.trace("Disabling violation policy enforcement on {}", tableName);
    // "Disables" the policy
    synchronized (enforcedPolicies) {
      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
      if (enforcement != null) {
        try {
          enforcement.disable();
        } catch (IOException e) {
          LOG.error("Failed to disable space violation policy for {}. This table will remain in"
            + " violation.", tableName, e);
          // Restore the entry removed above: the policy is still in effect.
          enforcedPolicies.put(tableName, enforcement);
        }
      }
    }
  }

  /**
   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
   * a space quota violation policy. A convenience method.
   * @param tableName The table to check, must not be null.
   * @return True if compactions should be disabled for the table, false otherwise (including when
   *         no policy is being enforced on the table).
   */
  public boolean areCompactionsDisabled(TableName tableName) {
    SpaceViolationPolicyEnforcement enforcement =
      this.enforcedPolicies.get(Objects.requireNonNull(tableName));
    if (enforcement != null) {
      return enforcement.areCompactionsDisabled();
    }
    return false;
  }

  /**
   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region. This is
   * the no-op store until {@link #start()} has installed the real one.
   * @return A {@link RegionSizeStore} implementation.
   */
  public RegionSizeStore getRegionSizeStore() {
    return regionSizeStore;
  }

  /**
   * Builds the protobuf message to inform the Master of files being archived.
   * @param tn            The table the files previously belonged to.
   * @param archivedFiles The files and their size in bytes that were archived.
   * @return The protobuf representation
   */
  public RegionServerStatusProtos.FileArchiveNotificationRequest
    buildFileArchiveRequest(TableName tn, Collection<Entry<String, Long>> archivedFiles) {
    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
      RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
    // Convert the table name once; every archived file entry carries the same table.
    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
    for (Entry<String, Long> archivedFile : archivedFiles) {
      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
        RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
          .setName(archivedFile.getKey()).setSize(archivedFile.getValue()).setTableName(protoTn)
          .build();
      builder.addArchivedFiles(fws);
    }
    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
    // Keep the guard: rendering the request as text is only worth doing when it will be logged.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Reporting file archival to Master: {}", TextFormat.shortDebugString(request));
    }
    return request;
  }

  /**
   * Returns a copy of the tables which have quota violation policies enforced on this
   * RegionServer, mapped to the enforcement of each table.
   */
  Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
    // Allows reads to happen concurrently (or while the map is being updated)
    return new HashMap<>(this.enforcedPolicies);
  }

  /** Returns the RegionServer services this manager was created with. */
  RegionServerServices getRegionServerServices() {
    return rsServices;
  }

  /** Returns the {@link Connection} of the RegionServer services. */
  Connection getConnection() {
    return rsServices.getConnection();
  }

  /** Returns the factory used to create {@link SpaceViolationPolicyEnforcement}s. */
  SpaceViolationPolicyEnforcementFactory getFactory() {
    return factory;
  }
}