001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.Map.Entry;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.MetaTableAccessor;
027import org.apache.hadoop.hbase.ScheduledChore;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.ResultScanner;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.util.Bytes;
037
038/**
039 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
040 * with information from the hbase:quota.
041 */
042@InterfaceAudience.Private
043public class SpaceQuotaRefresherChore extends ScheduledChore {
044  private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class);
045
046  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
047      "hbase.regionserver.quotas.policy.refresher.chore.period";
048  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis
049
050  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
051      "hbase.regionserver.quotas.policy.refresher.chore.delay";
052  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
053
054  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
055      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
056  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
057
058  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
059      "hbase.regionserver.quotas.policy.refresher.report.percent";
060  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
061
062  private final RegionServerSpaceQuotaManager manager;
063  private final Connection conn;
064  private boolean quotaTablePresent = false;
065
066  public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
067    super(SpaceQuotaRefresherChore.class.getSimpleName(),
068        manager.getRegionServerServices(),
069        getPeriod(manager.getRegionServerServices().getConfiguration()),
070        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
071        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
072    this.manager = manager;
073    this.conn = conn;
074  }
075
076  @Override
077  protected void chore() {
078    try {
079      // check whether quotaTable is present or not.
080      if (!quotaTablePresent && !checkQuotaTableExists()) {
081        LOG.info("Quota table not found, skipping quota manager cache refresh.");
082        return;
083      }
084      // since quotaTable is present so setting the flag as true.
085      quotaTablePresent = true;
086      if (LOG.isTraceEnabled()) {
087        LOG.trace("Reading current quota snapshots from hbase:quota.");
088      }
089      // Get the snapshots that the quota manager is currently aware of
090      final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
091          getManager().copyQuotaSnapshots();
092      // Read the new snapshots from the quota table
093      final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
094      if (LOG.isTraceEnabled()) {
095        LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
096            + "read " + newSnapshots.size() + " from the quota table.");
097      }
098      // Iterate over each new quota snapshot
099      for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
100        final TableName tableName = entry.getKey();
101        final SpaceQuotaSnapshot newSnapshot = entry.getValue();
102        // May be null!
103        final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
104        if (LOG.isTraceEnabled()) {
105          LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
106        }
107        if (!newSnapshot.equals(currentSnapshot)) {
108          // We have a new snapshot.
109          // We might need to enforce it or disable the enforcement or switch policy
110          boolean currInViolation = isInViolation(currentSnapshot);
111          boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
112          if (!currInViolation && newInViolation) {
113            if (LOG.isTraceEnabled()) {
114              LOG.trace("Enabling " + newSnapshot + " on " + tableName);
115            }
116            getManager().enforceViolationPolicy(tableName, newSnapshot);
117          } else if (currInViolation && !newInViolation) {
118            if (LOG.isTraceEnabled()) {
119              LOG.trace("Removing quota violation policy on " + tableName);
120            }
121            getManager().disableViolationPolicyEnforcement(tableName);
122          } else if (currInViolation && newInViolation) {
123            if (LOG.isTraceEnabled()) {
124              LOG.trace("Switching quota violation policy on " + tableName + " from "
125                  + currentSnapshot + " to " + newSnapshot);
126            }
127            getManager().enforceViolationPolicy(tableName, newSnapshot);
128          }
129        }
130      }
131
132      // Disable violation policy for all such tables which have been removed in new snapshot
133      for (TableName tableName : currentSnapshots.keySet()) {
134        // check whether table was removed in new snapshot
135        if (!newSnapshots.containsKey(tableName)) {
136          if (LOG.isTraceEnabled()) {
137            LOG.trace("Removing quota violation policy on " + tableName);
138          }
139          getManager().disableViolationPolicyEnforcement(tableName);
140        }
141      }
142
143      // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing
144      // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master,
145      // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets
146      // us avoid having to do anything special with currentSnapshots here.
147
148      // Update the snapshots in the manager
149      getManager().updateQuotaSnapshot(newSnapshots);
150    } catch (IOException e) {
151      LOG.warn(
152          "Caught exception while refreshing enforced quota violation policies, will retry.", e);
153    }
154  }
155
156  /**
157   * Checks if hbase:quota exists in hbase:meta
158   *
159   * @return true if hbase:quota table is in meta, else returns false.
160   * @throws IOException throws IOException
161   */
162  boolean checkQuotaTableExists() throws IOException {
163    return MetaTableAccessor.tableExists(getConnection(), QuotaUtil.QUOTA_TABLE_NAME);
164  }
165
166  /**
167   * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null.
168   * If the snapshot is null, this is interpreted as no snapshot which implies not in violation.
169   *
170   * @param snapshot The snapshot to operate on.
171   * @return true if the snapshot is in violation, false otherwise.
172   */
173  boolean isInViolation(SpaceQuotaSnapshot snapshot) {
174    if (snapshot == null) {
175      return false;
176    }
177    return snapshot.getQuotaStatus().isInViolation();
178  }
179
180  /**
181   * Reads all quota snapshots from the quota table.
182   *
183   * @return The current "view" of space use by each table.
184   */
185  public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
186    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
187        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
188      Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
189      for (Result result : scanner) {
190        try {
191          extractQuotaSnapshot(result, snapshots);
192        } catch (IllegalArgumentException e) {
193          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
194          LOG.error(msg, e);
195          throw new IOException(msg, e);
196        }
197      }
198      return snapshots;
199    }
200  }
201
202  /**
203   * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
204   */
205  void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
206    QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
207  }
208
209  Connection getConnection() {
210    return conn;
211  }
212
213  RegionServerSpaceQuotaManager getManager() {
214    return manager;
215  }
216
217  /**
218   * Extracts the period for the chore from the configuration.
219   *
220   * @param conf The configuration object.
221   * @return The configured chore period or the default value.
222   */
223  static int getPeriod(Configuration conf) {
224    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
225        POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
226  }
227
228  /**
229   * Extracts the initial delay for the chore from the configuration.
230   *
231   * @param conf The configuration object.
232   * @return The configured chore initial delay or the default value.
233   */
234  static long getInitialDelay(Configuration conf) {
235    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
236        POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
237  }
238
239  /**
240   * Extracts the time unit for the chore period and initial delay from the configuration. The
241   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
242   * a {@link TimeUnit} value.
243   *
244   * @param conf The configuration object.
245   * @return The configured time unit for the chore period and initial delay or the default value.
246   */
247  static TimeUnit getTimeUnit(Configuration conf) {
248    return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
249        POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
250  }
251
252  /**
253   * Extracts the percent of Regions for a table to have been reported to enable quota violation
254   * state change.
255   *
256   * @param conf The configuration object.
257   * @return The percent of regions reported to use.
258   */
259  static Double getRegionReportPercent(Configuration conf) {
260    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
261        POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
262  }
263}