001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.Objects;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
031import org.apache.hadoop.hbase.regionserver.RegionServerServices;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
037
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
041
042/**
043 * A manager for filesystem space quotas in the RegionServer. This class is the centralized point
044 * for what a RegionServer knows about space quotas on tables. For each table, it tracks two
045 * different things: the {@link SpaceQuotaSnapshot} and a {@link SpaceViolationPolicyEnforcement}
046 * (which may be null when a quota is not being violated). Both of these are sensitive on when they
047 * were last updated. The {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and
048 * updates the state on <code>this</code>.
049 */
050@InterfaceAudience.Private
051public class RegionServerSpaceQuotaManager {
052  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSpaceQuotaManager.class);
053
054  private final RegionServerServices rsServices;
055
056  private SpaceQuotaRefresherChore spaceQuotaRefresher;
057  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
058  private boolean started = false;
059  private final ConcurrentHashMap<TableName, SpaceViolationPolicyEnforcement> enforcedPolicies;
060  private SpaceViolationPolicyEnforcementFactory factory;
061  private RegionSizeStore regionSizeStore;
062  private RegionSizeReportingChore regionSizeReporter;
063
064  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
065    this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
066  }
067
068  RegionServerSpaceQuotaManager(RegionServerServices rsServices,
069    SpaceViolationPolicyEnforcementFactory factory) {
070    this.rsServices = Objects.requireNonNull(rsServices);
071    this.factory = factory;
072    this.enforcedPolicies = new ConcurrentHashMap<>();
073    this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
074    // Initialize the size store to not track anything -- create the real one if we're start()'ed
075    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
076  }
077
078  public synchronized void start() throws IOException {
079    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
080      LOG.info("Quota support disabled, not starting space quota manager.");
081      return;
082    }
083
084    if (started) {
085      LOG.warn("RegionServerSpaceQuotaManager has already been started!");
086      return;
087    }
088    // Start the chores
089    this.spaceQuotaRefresher =
090      new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
091    rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
092    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
093    rsServices.getChoreService().scheduleChore(regionSizeReporter);
094    // Instantiate the real RegionSizeStore
095    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
096    started = true;
097  }
098
099  public synchronized void stop() {
100    if (spaceQuotaRefresher != null) {
101      spaceQuotaRefresher.shutdown();
102      spaceQuotaRefresher = null;
103    }
104    if (regionSizeReporter != null) {
105      regionSizeReporter.shutdown();
106      regionSizeReporter = null;
107    }
108    started = false;
109  }
110
111  /** Returns if the {@code Chore} has been started. */
112  public boolean isStarted() {
113    return started;
114  }
115
116  /**
117   * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view of what the
118   * RegionServer thinks the table's utilization is.
119   */
120  public Map<TableName, SpaceQuotaSnapshot> copyQuotaSnapshots() {
121    return new HashMap<>(currentQuotaSnapshots.get());
122  }
123
124  /**
125   * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
126   * @param newSnapshots The space quota snapshots.
127   */
128  public void updateQuotaSnapshot(Map<TableName, SpaceQuotaSnapshot> newSnapshots) {
129    currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
130  }
131
132  /**
133   * Creates an object well-suited for the RegionServer to use in verifying active policies.
134   */
135  public ActivePolicyEnforcement getActiveEnforcements() {
136    return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
137  }
138
139  /**
140   * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
141   * {@link SpaceViolationPolicy}s.
142   */
143  public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
144    final Map<TableName, SpaceViolationPolicyEnforcement> enforcements = copyActiveEnforcements();
145    final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
146    for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
147      final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
148      if (snapshot != null) {
149        policies.put(entry.getKey(), snapshot);
150      }
151    }
152    return policies;
153  }
154
155  /**
156   * Enforces the given violationPolicy on the given table in this RegionServer.
157   */
158  public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
159    SpaceQuotaStatus status = snapshot.getQuotaStatus();
160    if (!status.isInViolation()) {
161      throw new IllegalStateException(
162        tableName + " is not in violation. Violation policy should not be enabled.");
163    }
164    if (LOG.isTraceEnabled()) {
165      LOG.trace("Enabling violation policy enforcement on " + tableName + " with policy "
166        + status.getPolicy());
167    }
168    // Construct this outside of the lock
169    final SpaceViolationPolicyEnforcement enforcement =
170      getFactory().create(getRegionServerServices(), tableName, snapshot);
171    // "Enables" the policy
172    // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
173    // allow policy enable/disable on different tables to happen concurrently. As written now, only
174    // one table will be allowed to transition at a time. This is probably OK, but not sure if
175    // it would become a bottleneck at large clusters/number of tables.
176    synchronized (enforcedPolicies) {
177      try {
178        enforcement.enable();
179      } catch (IOException e) {
180        LOG.error("Failed to enable space violation policy for " + tableName
181          + ". This table will not enter violation.", e);
182        return;
183      }
184      enforcedPolicies.put(tableName, enforcement);
185    }
186  }
187
188  /**
189   * Disables enforcement on any violation policy on the given <code>tableName</code>.
190   */
191  public void disableViolationPolicyEnforcement(TableName tableName) {
192    if (LOG.isTraceEnabled()) {
193      LOG.trace("Disabling violation policy enforcement on " + tableName);
194    }
195    // "Disables" the policy
196    synchronized (enforcedPolicies) {
197      SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
198      if (enforcement != null) {
199        try {
200          enforcement.disable();
201        } catch (IOException e) {
202          LOG.error("Failed to disable space violation policy for " + tableName
203            + ". This table will remain in violation.", e);
204          enforcedPolicies.put(tableName, enforcement);
205        }
206      }
207    }
208  }
209
210  /**
211   * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
212   * a space quota violation policy. A convenience method.
213   * @param tableName The table to check
214   * @return True if compactions should be disabled for the table, false otherwise.
215   */
216  public boolean areCompactionsDisabled(TableName tableName) {
217    SpaceViolationPolicyEnforcement enforcement =
218      this.enforcedPolicies.get(Objects.requireNonNull(tableName));
219    if (enforcement != null) {
220      return enforcement.areCompactionsDisabled();
221    }
222    return false;
223  }
224
225  /**
226   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
227   * @return A {@link RegionSizeStore} implementation.
228   */
229  public RegionSizeStore getRegionSizeStore() {
230    return regionSizeStore;
231  }
232
233  /**
234   * Builds the protobuf message to inform the Master of files being archived.
235   * @param tn            The table the files previously belonged to.
236   * @param archivedFiles The files and their size in bytes that were archived.
237   * @return The protobuf representation
238   */
239  public RegionServerStatusProtos.FileArchiveNotificationRequest
240    buildFileArchiveRequest(TableName tn, Collection<Entry<String, Long>> archivedFiles) {
241    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
242      RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
243    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
244    for (Entry<String, Long> archivedFile : archivedFiles) {
245      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
246        RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
247          .setName(archivedFile.getKey()).setSize(archivedFile.getValue()).setTableName(protoTn)
248          .build();
249      builder.addArchivedFiles(fws);
250    }
251    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
252    if (LOG.isTraceEnabled()) {
253      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
254    }
255    return request;
256  }
257
258  /**
259   * Returns the collection of tables which have quota violation policies enforced on this
260   * RegionServer.
261   */
262  Map<TableName, SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
263    // Allows reads to happen concurrently (or while the map is being updated)
264    return new HashMap<>(this.enforcedPolicies);
265  }
266
267  RegionServerServices getRegionServerServices() {
268    return rsServices;
269  }
270
271  Connection getConnection() {
272    return rsServices.getConnection();
273  }
274
275  SpaceViolationPolicyEnforcementFactory getFactory() {
276    return factory;
277  }
278}