001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.Collection; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.Map.Entry; 025import java.util.Objects; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.atomic.AtomicReference; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 031import org.apache.hadoop.hbase.regionserver.RegionServerServices; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 037 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 041 042/** 043 * A manager for filesystem space quotas in the RegionServer. This class is the centralized point 044 * for what a RegionServer knows about space quotas on tables. For each table, it tracks two 045 * different things: the {@link SpaceQuotaSnapshot} and a {@link SpaceViolationPolicyEnforcement} 046 * (which may be null when a quota is not being violated). Both of these are sensitive on when they 047 * were last updated. The {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and 048 * updates the state on <code>this</code>. 049 */ 050@InterfaceAudience.Private 051public class RegionServerSpaceQuotaManager { 052 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class); 053 054 private final RegionServerServices rsServices; 055 056 private SpaceQuotaRefresherChore spaceQuotaRefresher; 057 private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots; 058 private boolean started = false; 059 private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies; 060 private SpaceViolationPolicyEnforcementFactory factory; 061 private RegionSizeStore regionSizeStore; 062 private RegionSizeReportingChore regionSizeReporter; 063 064 public RegionServerSpaceQuotaManager(RegionServerServices rsServices) { 065 this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance()); 066 } 067 068 RegionServerSpaceQuotaManager(RegionServerServices rsServices, 069 SpaceViolationPolicyEnforcementFactory factory) { 070 this.rsServices = Objects.requireNonNull(rsServices); 071 this.factory = factory; 072 this.enforcedPolicies = new ConcurrentHashMap<>(); 073 this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>()); 074 // Initialize the size store to not track anything -- create the real one if we're start()'ed 075 this.regionSizeStore = NoOpRegionSizeStore.getInstance(); 076 } 077 078 public synchronized void start() throws IOException { 079 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 080 LOG.info("Quota support disabled, not starting space quota manager."); 081 return; 082 } 083 084 if (started) { 085 LOG.warn("RegionServerSpaceQuotaManager has already been started!"); 086 return; 087 } 088 // Start the chores 089 this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection()); 090 rsServices.getChoreService().scheduleChore(spaceQuotaRefresher); 091 this.regionSizeReporter = new RegionSizeReportingChore(rsServices); 092 rsServices.getChoreService().scheduleChore(regionSizeReporter); 093 // Instantiate the real RegionSizeStore 094 this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore(); 095 started = true; 096 } 097 098 public synchronized void stop() { 099 if (spaceQuotaRefresher != null) { 100 spaceQuotaRefresher.shutdown(); 101 spaceQuotaRefresher = null; 102 } 103 if (regionSizeReporter != null) { 104 regionSizeReporter.shutdown(); 105 regionSizeReporter = null; 106 } 107 started = false; 108 } 109 110 /** Returns if the {@code Chore} has been started. */ 111 public boolean isStarted() { 112 return started; 113 } 114 115 /** 116 * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view of what the 117 * RegionServer thinks the table's utilization is. 118 */ 119 public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() { 120 return new HashMap<>(currentQuotaSnapshots.get()); 121 } 122 123 /** 124 * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer. 125 * @param newSnapshots The space quota snapshots. 126 */ 127 public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) { 128 currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots)); 129 } 130 131 /** 132 * Creates an object well-suited for the RegionServer to use in verifying active policies. 133 */ 134 public ActivePolicyEnforcement getActiveEnforcements() { 135 return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices); 136 } 137 138 /** 139 * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into 140 * {@link SpaceViolationPolicy}s. 141 */ 142 public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() { 143 final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = copyActiveEnforcements(); 144 final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>(); 145 for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) { 146 final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot(); 147 if (snapshot != null) { 148 policies.put(entry.getKey(), snapshot); 149 } 150 } 151 return policies; 152 } 153 154 /** 155 * Enforces the given violationPolicy on the given table in this RegionServer. 156 */ 157 public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) { 158 SpaceQuotaStatus status = snapshot.getQuotaStatus(); 159 if (!status.isInViolation()) { 160 throw new IllegalStateException( 161 tableName + " is not in violation. Violation policy should not be enabled."); 162 } 163 if (LOG.isTraceEnabled()) { 164 LOG.trace("Enabling violation policy enforcement on " + tableName + " with policy " 165 + status.getPolicy()); 166 } 167 // Construct this outside of the lock 168 final SpaceViolationPolicyEnforcement enforcement = 169 getFactory().create(getRegionServerServices(), tableName, snapshot); 170 // "Enables" the policy 171 // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would 172 // allow policy enable/disable on different tables to happen concurrently. As written now, only 173 // one table will be allowed to transition at a time. This is probably OK, but not sure if 174 // it would become a bottleneck at large clusters/number of tables. 175 synchronized (enforcedPolicies) { 176 try { 177 enforcement.enable(); 178 } catch (IOException e) { 179 LOG.error("Failed to enable space violation policy for " + tableName 180 + ". This table will not enter violation.", e); 181 return; 182 } 183 enforcedPolicies.put(tableName, enforcement); 184 } 185 } 186 187 /** 188 * Disables enforcement on any violation policy on the given <code>tableName</code>. 189 */ 190 public void disableViolationPolicyEnforcement(TableName tableName) { 191 if (LOG.isTraceEnabled()) { 192 LOG.trace("Disabling violation policy enforcement on " + tableName); 193 } 194 // "Disables" the policy 195 synchronized (enforcedPolicies) { 196 SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName); 197 if (enforcement != null) { 198 try { 199 enforcement.disable(); 200 } catch (IOException e) { 201 LOG.error("Failed to disable space violation policy for " + tableName 202 + ". This table will remain in violation.", e); 203 enforcedPolicies.put(tableName, enforcement); 204 } 205 } 206 } 207 } 208 209 /** 210 * Returns whether or not compactions should be disabled for the given <code>tableName</code> per 211 * a space quota violation policy. A convenience method. 212 * @param tableName The table to check 213 * @return True if compactions should be disabled for the table, false otherwise. 214 */ 215 public boolean areCompactionsDisabled(TableName tableName) { 216 SpaceViolationPolicyEnforcement enforcement = 217 this.enforcedPolicies.get(Objects.requireNonNull(tableName)); 218 if (enforcement != null) { 219 return enforcement.areCompactionsDisabled(); 220 } 221 return false; 222 } 223 224 /** 225 * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region. 226 * @return A {@link RegionSizeStore} implementation. 227 */ 228 public RegionSizeStore getRegionSizeStore() { 229 return regionSizeStore; 230 } 231 232 /** 233 * Builds the protobuf message to inform the Master of files being archived. 234 * @param tn The table the files previously belonged to. 235 * @param archivedFiles The files and their size in bytes that were archived. 236 * @return The protobuf representation 237 */ 238 public RegionServerStatusProtos.FileArchiveNotificationRequest 239 buildFileArchiveRequest(TableName tn, Collection<Entry<String, Long>> archivedFiles) { 240 RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder = 241 RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder(); 242 HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn); 243 for (Entry<String, Long> archivedFile : archivedFiles) { 244 RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws = 245 RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder() 246 .setName(archivedFile.getKey()).setSize(archivedFile.getValue()).setTableName(protoTn) 247 .build(); 248 builder.addArchivedFiles(fws); 249 } 250 final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build(); 251 if (LOG.isTraceEnabled()) { 252 LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request)); 253 } 254 return request; 255 } 256 257 /** 258 * Returns the collection of tables which have quota violation policies enforced on this 259 * RegionServer. 260 */ 261 Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() { 262 // Allows reads to happen concurrently (or while the map is being updated) 263 return new HashMap<>(this.enforcedPolicies); 264 } 265 266 RegionServerServices getRegionServerServices() { 267 return rsServices; 268 } 269 270 Connection getConnection() { 271 return rsServices.getConnection(); 272 } 273 274 SpaceViolationPolicyEnforcementFactory getFactory() { 275 return factory; 276 } 277}