001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.Map.Entry;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.ResultScanner;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.util.Bytes;
036
037/**
038 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
039 * with information from the hbase:quota.
040 */
041@InterfaceAudience.Private
042public class SpaceQuotaRefresherChore extends ScheduledChore {
043  private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class);
044
045  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
046      "hbase.regionserver.quotas.policy.refresher.chore.period";
047  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis
048
049  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
050      "hbase.regionserver.quotas.policy.refresher.chore.delay";
051  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
052
053  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
054      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
055  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
056
057  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
058      "hbase.regionserver.quotas.policy.refresher.report.percent";
059  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
060
061  private final RegionServerSpaceQuotaManager manager;
062  private final Connection conn;
063
064  public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
065    super(SpaceQuotaRefresherChore.class.getSimpleName(),
066        manager.getRegionServerServices(),
067        getPeriod(manager.getRegionServerServices().getConfiguration()),
068        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
069        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
070    this.manager = manager;
071    this.conn = conn;
072  }
073
074  @Override
075  protected void chore() {
076    try {
077      if (LOG.isTraceEnabled()) {
078        LOG.trace("Reading current quota snapshots from hbase:quota.");
079      }
080      // Get the snapshots that the quota manager is currently aware of
081      final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
082          getManager().copyQuotaSnapshots();
083      // Read the new snapshots from the quota table
084      final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
085      if (LOG.isTraceEnabled()) {
086        LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
087            + "read " + newSnapshots.size() + " from the quota table.");
088      }
089      // Iterate over each new quota snapshot
090      for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
091        final TableName tableName = entry.getKey();
092        final SpaceQuotaSnapshot newSnapshot = entry.getValue();
093        // May be null!
094        final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
095        if (LOG.isTraceEnabled()) {
096          LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
097        }
098        if (!newSnapshot.equals(currentSnapshot)) {
099          // We have a new snapshot.
100          // We might need to enforce it or disable the enforcement or switch policy
101          boolean currInViolation = isInViolation(currentSnapshot);
102          boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
103          if (!currInViolation && newInViolation) {
104            if (LOG.isTraceEnabled()) {
105              LOG.trace("Enabling " + newSnapshot + " on " + tableName);
106            }
107            getManager().enforceViolationPolicy(tableName, newSnapshot);
108          } else if (currInViolation && !newInViolation) {
109            if (LOG.isTraceEnabled()) {
110              LOG.trace("Removing quota violation policy on " + tableName);
111            }
112            getManager().disableViolationPolicyEnforcement(tableName);
113          } else if (currInViolation && newInViolation) {
114            if (LOG.isTraceEnabled()) {
115              LOG.trace("Switching quota violation policy on " + tableName + " from "
116                  + currentSnapshot + " to " + newSnapshot);
117            }
118            getManager().enforceViolationPolicy(tableName, newSnapshot);
119          }
120        }
121      }
122
123      // Disable violation policy for all such tables which have been removed in new snapshot
124      for (TableName tableName : currentSnapshots.keySet()) {
125        // check whether table was removed in new snapshot
126        if (!newSnapshots.containsKey(tableName)) {
127          if (LOG.isTraceEnabled()) {
128            LOG.trace("Removing quota violation policy on " + tableName);
129          }
130          getManager().disableViolationPolicyEnforcement(tableName);
131        }
132      }
133
134      // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing
135      // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master,
136      // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets
137      // us avoid having to do anything special with currentSnapshots here.
138
139      // Update the snapshots in the manager
140      getManager().updateQuotaSnapshot(newSnapshots);
141    } catch (IOException e) {
142      LOG.warn(
143          "Caught exception while refreshing enforced quota violation policies, will retry.", e);
144    }
145  }
146
147  /**
148   * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null.
149   * If the snapshot is null, this is interpreted as no snapshot which implies not in violation.
150   *
151   * @param snapshot The snapshot to operate on.
152   * @return true if the snapshot is in violation, false otherwise.
153   */
154  boolean isInViolation(SpaceQuotaSnapshot snapshot) {
155    if (snapshot == null) {
156      return false;
157    }
158    return snapshot.getQuotaStatus().isInViolation();
159  }
160
161  /**
162   * Reads all quota snapshots from the quota table.
163   *
164   * @return The current "view" of space use by each table.
165   */
166  public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
167    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
168        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
169      Map<TableName,SpaceQuotaSnapshot> snapshots = new HashMap<>();
170      for (Result result : scanner) {
171        try {
172          extractQuotaSnapshot(result, snapshots);
173        } catch (IllegalArgumentException e) {
174          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
175          LOG.error(msg, e);
176          throw new IOException(msg, e);
177        }
178      }
179      return snapshots;
180    }
181  }
182
183  /**
184   * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
185   */
186  void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
187    QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
188  }
189
190  Connection getConnection() {
191    return conn;
192  }
193
194  RegionServerSpaceQuotaManager getManager() {
195    return manager;
196  }
197
198  /**
199   * Extracts the period for the chore from the configuration.
200   *
201   * @param conf The configuration object.
202   * @return The configured chore period or the default value.
203   */
204  static int getPeriod(Configuration conf) {
205    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
206        POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
207  }
208
209  /**
210   * Extracts the initial delay for the chore from the configuration.
211   *
212   * @param conf The configuration object.
213   * @return The configured chore initial delay or the default value.
214   */
215  static long getInitialDelay(Configuration conf) {
216    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
217        POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
218  }
219
220  /**
221   * Extracts the time unit for the chore period and initial delay from the configuration. The
222   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
223   * a {@link TimeUnit} value.
224   *
225   * @param conf The configuration object.
226   * @return The configured time unit for the chore period and initial delay or the default value.
227   */
228  static TimeUnit getTimeUnit(Configuration conf) {
229    return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
230        POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
231  }
232
233  /**
234   * Extracts the percent of Regions for a table to have been reported to enable quota violation
235   * state change.
236   *
237   * @param conf The configuration object.
238   * @return The percent of regions reported to use.
239   */
240  static Double getRegionReportPercent(Configuration conf) {
241    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
242        POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
243  }
244}