001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.Objects;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
031import org.apache.hadoop.hbase.regionserver.RegionServerServices;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
037
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
041
042/**
043 * A manager for filesystem space quotas in the RegionServer. This class is the centralized point
044 * for what a RegionServer knows about space quotas on tables. For each table, it tracks two
045 * different things: the {@link SpaceQuotaSnapshot} and a {@link SpaceViolationPolicyEnforcement}
046 * (which may be null when a quota is not being violated). Both of these are sensitive on when they
047 * were last updated. The {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and
048 * updates the state on <code>this</code>.
049 */
050@InterfaceAudience.Private
051public class RegionServerSpaceQuotaManager {
052  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);
053
054  private final RegionServerServices rsServices;
055
056  private SpaceQuotaRefresherChore spaceQuotaRefresher;
057  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
058  private boolean started = false;
059  private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies;
060  private SpaceViolationPolicyEnforcementFactory factory;
061  private RegionSizeStore regionSizeStore;
062  private RegionSizeReportingChore regionSizeReporter;
063
064  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
065    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
066  }
067
068  RegionServerSpaceQuotaManager(RegionServerServices rsServices,
069    SpaceViolationPolicyEnforcementFactory factory) {
070    this.rsServices = Objects.requireNonNull(rsServices);
071    this.factory = factory;
072    this.enforcedPolicies = new ConcurrentHashMap<>();
073    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
074    // Initialize the size store to not track anything -- create the real one if we're start()'ed
075    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
076  }
077
078  public synchronized void start() throws IOException {
079    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
080      LOG.info("Quota support disabled, not starting space quota manager.");
081      return;
082    }
083
084    if (started) {
085      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
086      return;
087    }
088    // Start the chores
089    this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection());
090    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
091    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
092    rsServices.getChoreService().scheduleChore(regionSizeReporter);
093    // Instantiate the real RegionSizeStore
094    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
095    started = true;
096  }
097
098  public synchronized void stop() {
099    if (spaceQuotaRefresher != null) {
100      spaceQuotaRefresher.shutdown();
101      spaceQuotaRefresher = null;
102    }
103    if (regionSizeReporter != null) {
104      regionSizeReporter.shutdown();
105      regionSizeReporter = null;
106    }
107    started = false;
108  }
109
110  /**
111   * @return if the {@code Chore} has been started.
112   */
113  public boolean isStarted() {
114    return started;
115  }
116
117  /**
118   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view of what the
119   * RegionServer thinks the table's utilization is.
120   */
121  public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() {
122    return new HashMap<>(currentQuotaSnapshots.get());
123  }
124
125  /**
126   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
127   * @param newSnapshots The space quota snapshots.
128   */
129  public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) {
130    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
131  }
132
133  /**
134   * Creates an object well-suited for the RegionServer to use in verifying active policies.
135   */
136  public ActivePolicyEnforcement getActiveEnforcements() {
137    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
138  }
139
140  /**
141   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
142   * {@link SpaceViolationPolicy}s.
143   */
144  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
145    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = copyActiveEnforcements();
146    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
147    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
148      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
149      if (snapshot != null) {
150        policies.put(entry.getKey(), snapshot);
151      }
152    }
153    return policies;
154  }
155
156  /**
157   * Enforces the given violationPolicy on the given table in this RegionServer.
158   */
159  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
160    SpaceQuotaStatus status = snapshot.getQuotaStatus();
161    if (!status.isInViolation()) {
162      throw new IllegalStateException(
163        tableName + " is not in violation. Violation policy should not be enabled.");
164    }
165    if (LOG.isTraceEnabled()) {
166      LOG.trace("Enabling violation policy enforcement on " + tableName + " with policy "
167        + status.getPolicy());
168    }
169    // Construct this outside of the lock
170    final SpaceViolationPolicyEnforcement enforcement =
171      getFactory().create(getRegionServerServices(), tableName, snapshot);
172    // "Enables" the policy
173    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
174    // allow policy enable/disable on different tables to happen concurrently. As written now, only
175    // one table will be allowed to transition at a time. This is probably OK, but not sure if
176    // it would become a bottleneck at large clusters/number of tables.
177    synchronized (enforcedPolicies) {
178      try {
179        enforcement.enable();
180      } catch (IOException e) {
181        LOG.error("Failed to enable space violation policy for " + tableName
182          + ". This table will not enter violation.", e);
183        return;
184      }
185      enforcedPolicies.put(tableName, enforcement);
186    }
187  }
188
189  /**
190   * Disables enforcement on any violation policy on the given <code>tableName</code>.
191   */
192  public void disableViolationPolicyEnforcement(TableName tableName) {
193    if (LOG.isTraceEnabled()) {
194      LOG.trace("Disabling violation policy enforcement on " + tableName);
195    }
196    // "Disables" the policy
197    synchronized (enforcedPolicies) {
198      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
199      if (enforcement != null) {
200        try {
201          enforcement.disable();
202        } catch (IOException e) {
203          LOG.error("Failed to disable space violation policy for " + tableName
204            + ". This table will remain in violation.", e);
205          enforcedPolicies.put(tableName, enforcement);
206        }
207      }
208    }
209  }
210
211  /**
212   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
213   * a space quota violation policy. A convenience method.
214   * @param tableName The table to check
215   * @return True if compactions should be disabled for the table, false otherwise.
216   */
217  public boolean areCompactionsDisabled(TableName tableName) {
218    SpaceViolationPolicyEnforcement enforcement =
219      this.enforcedPolicies.get(Objects.requireNonNull(tableName));
220    if (enforcement != null) {
221      return enforcement.areCompactionsDisabled();
222    }
223    return false;
224  }
225
226  /**
227   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
228   * @return A {@link RegionSizeStore} implementation.
229   */
230  public RegionSizeStore getRegionSizeStore() {
231    return regionSizeStore;
232  }
233
234  /**
235   * Builds the protobuf message to inform the Master of files being archived.
236   * @param tn            The table the files previously belonged to.
237   * @param archivedFiles The files and their size in bytes that were archived.
238   * @return The protobuf representation
239   */
240  public RegionServerStatusProtos.FileArchiveNotificationRequest
241    buildFileArchiveRequest(TableName tn, Collection<Entry<String, Long>> archivedFiles) {
242    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
243      RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
244    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
245    for (Entry<String, Long> archivedFile : archivedFiles) {
246      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
247        RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
248          .setName(archivedFile.getKey()).setSize(archivedFile.getValue()).setTableName(protoTn)
249          .build();
250      builder.addArchivedFiles(fws);
251    }
252    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
253    if (LOG.isTraceEnabled()) {
254      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
255    }
256    return request;
257  }
258
259  /**
260   * Returns the collection of tables which have quota violation policies enforced on this
261   * RegionServer.
262   */
263  Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
264    // Allows reads to happen concurrently (or while the map is being updated)
265    return new HashMap<>(this.enforcedPolicies);
266  }
267
268  RegionServerServices getRegionServerServices() {
269    return rsServices;
270  }
271
272  Connection getConnection() {
273    return rsServices.getConnection();
274  }
275
276  SpaceViolationPolicyEnforcementFactory getFactory() {
277    return factory;
278  }
279}