001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.Map.Entry;
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.ScheduledChore;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Admin;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.ResultScanner;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
039 * with information from the hbase:quota.
040 */
041@InterfaceAudience.Private
042public class SpaceQuotaRefresherChore extends ScheduledChore {
043  private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class);
044
045  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
046      "hbase.regionserver.quotas.policy.refresher.chore.period";
047  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis
048
049  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
050      "hbase.regionserver.quotas.policy.refresher.chore.delay";
051  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
052
053  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
054      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
055  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
056
057  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
058      "hbase.regionserver.quotas.policy.refresher.report.percent";
059  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
060
061  private final RegionServerSpaceQuotaManager manager;
062  private final Connection conn;
063  private boolean quotaTablePresent = false;
064
065  public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
066    super(SpaceQuotaRefresherChore.class.getSimpleName(),
067        manager.getRegionServerServices(),
068        getPeriod(manager.getRegionServerServices().getConfiguration()),
069        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
070        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
071    this.manager = manager;
072    this.conn = conn;
073  }
074
075  @Override
076  protected void chore() {
077    try {
078      // check whether quotaTable is present or not.
079      if (!quotaTablePresent && !checkQuotaTableExists()) {
080        LOG.info("Quota table not found, skipping quota manager cache refresh.");
081        return;
082      }
083      // since quotaTable is present so setting the flag as true.
084      quotaTablePresent = true;
085      if (LOG.isTraceEnabled()) {
086        LOG.trace("Reading current quota snapshots from hbase:quota.");
087      }
088      // Get the snapshots that the quota manager is currently aware of
089      final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
090          getManager().copyQuotaSnapshots();
091      // Read the new snapshots from the quota table
092      final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
093      if (LOG.isTraceEnabled()) {
094        LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
095            + "read " + newSnapshots.size() + " from the quota table.");
096      }
097      // Iterate over each new quota snapshot
098      for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
099        final TableName tableName = entry.getKey();
100        final SpaceQuotaSnapshot newSnapshot = entry.getValue();
101        // May be null!
102        final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
103        if (LOG.isTraceEnabled()) {
104          LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
105        }
106        if (!newSnapshot.equals(currentSnapshot)) {
107          // We have a new snapshot.
108          // We might need to enforce it or disable the enforcement or switch policy
109          boolean currInViolation = isInViolation(currentSnapshot);
110          boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
111          if (!currInViolation && newInViolation) {
112            if (LOG.isTraceEnabled()) {
113              LOG.trace("Enabling " + newSnapshot + " on " + tableName);
114            }
115            getManager().enforceViolationPolicy(tableName, newSnapshot);
116          } else if (currInViolation && !newInViolation) {
117            if (LOG.isTraceEnabled()) {
118              LOG.trace("Removing quota violation policy on " + tableName);
119            }
120            getManager().disableViolationPolicyEnforcement(tableName);
121          } else if (currInViolation && newInViolation) {
122            if (LOG.isTraceEnabled()) {
123              LOG.trace("Switching quota violation policy on " + tableName + " from "
124                  + currentSnapshot + " to " + newSnapshot);
125            }
126            getManager().enforceViolationPolicy(tableName, newSnapshot);
127          }
128        }
129      }
130
131      // Disable violation policy for all such tables which have been removed in new snapshot
132      for (TableName tableName : currentSnapshots.keySet()) {
133        // check whether table was removed in new snapshot
134        if (!newSnapshots.containsKey(tableName)) {
135          if (LOG.isTraceEnabled()) {
136            LOG.trace("Removing quota violation policy on " + tableName);
137          }
138          getManager().disableViolationPolicyEnforcement(tableName);
139        }
140      }
141
142      // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing
143      // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master,
144      // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets
145      // us avoid having to do anything special with currentSnapshots here.
146
147      // Update the snapshots in the manager
148      getManager().updateQuotaSnapshot(newSnapshots);
149    } catch (IOException e) {
150      LOG.warn(
151          "Caught exception while refreshing enforced quota violation policies, will retry.", e);
152    }
153  }
154
155  /**
156   * Checks if hbase:quota exists in hbase:meta
157   *
158   * @return true if hbase:quota table is in meta, else returns false.
159   * @throws IOException throws IOException
160   */
161  boolean checkQuotaTableExists() throws IOException {
162    try (Admin admin = getConnection().getAdmin()) {
163      return admin.tableExists(QuotaUtil.QUOTA_TABLE_NAME);
164    }
165  }
166
167  /**
168   * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null.
169   * If the snapshot is null, this is interpreted as no snapshot which implies not in violation.
170   *
171   * @param snapshot The snapshot to operate on.
172   * @return true if the snapshot is in violation, false otherwise.
173   */
174  boolean isInViolation(SpaceQuotaSnapshot snapshot) {
175    if (snapshot == null) {
176      return false;
177    }
178    return snapshot.getQuotaStatus().isInViolation();
179  }
180
181  /**
182   * Reads all quota snapshots from the quota table.
183   *
184   * @return The current "view" of space use by each table.
185   */
186  public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
187    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
188        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
189      Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
190      for (Result result : scanner) {
191        try {
192          extractQuotaSnapshot(result, snapshots);
193        } catch (IllegalArgumentException e) {
194          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
195          LOG.error(msg, e);
196          throw new IOException(msg, e);
197        }
198      }
199      return snapshots;
200    }
201  }
202
203  /**
204   * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
205   */
206  void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
207    QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
208  }
209
210  Connection getConnection() {
211    return conn;
212  }
213
214  RegionServerSpaceQuotaManager getManager() {
215    return manager;
216  }
217
218  /**
219   * Extracts the period for the chore from the configuration.
220   *
221   * @param conf The configuration object.
222   * @return The configured chore period or the default value.
223   */
224  static int getPeriod(Configuration conf) {
225    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
226        POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
227  }
228
229  /**
230   * Extracts the initial delay for the chore from the configuration.
231   *
232   * @param conf The configuration object.
233   * @return The configured chore initial delay or the default value.
234   */
235  static long getInitialDelay(Configuration conf) {
236    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
237        POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
238  }
239
240  /**
241   * Extracts the time unit for the chore period and initial delay from the configuration. The
242   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
243   * a {@link TimeUnit} value.
244   *
245   * @param conf The configuration object.
246   * @return The configured time unit for the chore period and initial delay or the default value.
247   */
248  static TimeUnit getTimeUnit(Configuration conf) {
249    return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
250        POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
251  }
252
253  /**
254   * Extracts the percent of Regions for a table to have been reported to enable quota violation
255   * state change.
256   *
257   * @param conf The configuration object.
258   * @return The percent of regions reported to use.
259   */
260  static Double getRegionReportPercent(Configuration conf) {
261    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
262        POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
263  }
264}