001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Admin;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.client.ResultScanner;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
040 * with information from the hbase:quota.
041 */
042@InterfaceAudience.Private
043public class SpaceQuotaRefresherChore extends ScheduledChore {
044  private static final Logger LOG = LoggerFactory.getLogger(SpaceQuotaRefresherChore.class);
045
046  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
047    "hbase.regionserver.quotas.policy.refresher.chore.period";
048  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis
049
050  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
051    "hbase.regionserver.quotas.policy.refresher.chore.delay";
052  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
053
054  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
055    "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
056  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
057
058  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
059    "hbase.regionserver.quotas.policy.refresher.report.percent";
060  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT = 0.95;
061
062  private final RegionServerSpaceQuotaManager manager;
063  private final Connection conn;
064  private boolean quotaTablePresent = false;
065
066  public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
067    super(SpaceQuotaRefresherChore.class.getSimpleName(), manager.getRegionServerServices(),
068      getPeriod(manager.getRegionServerServices().getConfiguration()),
069      getInitialDelay(manager.getRegionServerServices().getConfiguration()),
070      getTimeUnit(manager.getRegionServerServices().getConfiguration()));
071    this.manager = manager;
072    this.conn = conn;
073  }
074
075  @Override
076  protected void chore() {
077    try {
078      // check whether quotaTable is present or not.
079      if (!quotaTablePresent && !checkQuotaTableExists()) {
080        LOG.info("Quota table not found, skipping quota manager cache refresh.");
081        return;
082      }
083      // since quotaTable is present so setting the flag as true.
084      quotaTablePresent = true;
085      if (LOG.isTraceEnabled()) {
086        LOG.trace("Reading current quota snapshots from hbase:quota.");
087      }
088      // Get the snapshots that the quota manager is currently aware of
089      final Map<TableName, SpaceQuotaSnapshot> currentSnapshots = getManager().copyQuotaSnapshots();
090      // Read the new snapshots from the quota table
091      final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
092      if (LOG.isTraceEnabled()) {
093        LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, " + "read "
094          + newSnapshots.size() + " from the quota table.");
095      }
096      // Iterate over each new quota snapshot
097      for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
098        final TableName tableName = entry.getKey();
099        final SpaceQuotaSnapshot newSnapshot = entry.getValue();
100        // May be null!
101        final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
102        if (LOG.isTraceEnabled()) {
103          LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
104        }
105        if (!newSnapshot.equals(currentSnapshot)) {
106          // We have a new snapshot.
107          // We might need to enforce it or disable the enforcement or switch policy
108          boolean currInViolation = isInViolation(currentSnapshot);
109          boolean newInViolation = newSnapshot.getQuotaStatus().isInViolation();
110          if (!currInViolation && newInViolation) {
111            if (LOG.isTraceEnabled()) {
112              LOG.trace("Enabling " + newSnapshot + " on " + tableName);
113            }
114            getManager().enforceViolationPolicy(tableName, newSnapshot);
115          } else if (currInViolation && !newInViolation) {
116            if (LOG.isTraceEnabled()) {
117              LOG.trace("Removing quota violation policy on " + tableName);
118            }
119            getManager().disableViolationPolicyEnforcement(tableName);
120          } else if (currInViolation && newInViolation) {
121            if (LOG.isTraceEnabled()) {
122              LOG.trace("Switching quota violation policy on " + tableName + " from "
123                + currentSnapshot + " to " + newSnapshot);
124            }
125            getManager().enforceViolationPolicy(tableName, newSnapshot);
126          }
127        }
128      }
129
130      // Disable violation policy for all such tables which have been removed in new snapshot
131      for (TableName tableName : currentSnapshots.keySet()) {
132        // check whether table was removed in new snapshot
133        if (!newSnapshots.containsKey(tableName)) {
134          if (LOG.isTraceEnabled()) {
135            LOG.trace("Removing quota violation policy on " + tableName);
136          }
137          getManager().disableViolationPolicyEnforcement(tableName);
138        }
139      }
140
141      // We're intentionally ignoring anything extra with the currentSnapshots. If we were missing
142      // information from the RegionServers to create an accurate SpaceQuotaSnapshot in the Master,
143      // the Master will generate a new SpaceQuotaSnapshot which represents this state. This lets
144      // us avoid having to do anything special with currentSnapshots here.
145
146      // Update the snapshots in the manager
147      getManager().updateQuotaSnapshot(newSnapshots);
148    } catch (IOException e) {
149      LOG.warn("Caught exception while refreshing enforced quota violation policies, will retry.",
150        e);
151    }
152  }
153
154  /**
155   * Checks if hbase:quota exists in hbase:meta
156   * @return true if hbase:quota table is in meta, else returns false.
157   * @throws IOException throws IOException
158   */
159  boolean checkQuotaTableExists() throws IOException {
160    try (Admin admin = getConnection().getAdmin()) {
161      return admin.tableExists(QuotaUtil.QUOTA_TABLE_NAME);
162    }
163  }
164
165  /**
166   * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null. If
167   * the snapshot is null, this is interpreted as no snapshot which implies not in violation.
168   * @param snapshot The snapshot to operate on.
169   * @return true if the snapshot is in violation, false otherwise.
170   */
171  boolean isInViolation(SpaceQuotaSnapshot snapshot) {
172    if (snapshot == null) {
173      return false;
174    }
175    return snapshot.getQuotaStatus().isInViolation();
176  }
177
178  /**
179   * Reads all quota snapshots from the quota table.
180   * @return The current "view" of space use by each table.
181   */
182  public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
183    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
184      ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
185      Map<TableName, SpaceQuotaSnapshot> snapshots = new HashMap<>();
186      for (Result result : scanner) {
187        try {
188          extractQuotaSnapshot(result, snapshots);
189        } catch (IllegalArgumentException e) {
190          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
191          LOG.error(msg, e);
192          throw new IOException(msg, e);
193        }
194      }
195      return snapshots;
196    }
197  }
198
199  /**
200   * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
201   */
202  void extractQuotaSnapshot(Result result, Map<TableName, SpaceQuotaSnapshot> snapshots) {
203    QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
204  }
205
206  Connection getConnection() {
207    return conn;
208  }
209
210  RegionServerSpaceQuotaManager getManager() {
211    return manager;
212  }
213
214  /**
215   * Extracts the period for the chore from the configuration.
216   * @param conf The configuration object.
217   * @return The configured chore period or the default value.
218   */
219  static int getPeriod(Configuration conf) {
220    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY, POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
221  }
222
223  /**
224   * Extracts the initial delay for the chore from the configuration.
225   * @param conf The configuration object.
226   * @return The configured chore initial delay or the default value.
227   */
228  static long getInitialDelay(Configuration conf) {
229    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY, POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
230  }
231
232  /**
233   * Extracts the time unit for the chore period and initial delay from the configuration. The
234   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to a
235   * {@link TimeUnit} value.
236   * @param conf The configuration object.
237   * @return The configured time unit for the chore period and initial delay or the default value.
238   */
239  static TimeUnit getTimeUnit(Configuration conf) {
240    return TimeUnit.valueOf(
241      conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY, POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
242  }
243
244  /**
245   * Extracts the percent of Regions for a table to have been reported to enable quota violation
246   * state change.
247   * @param conf The configuration object.
248   * @return The percent of regions reported to use.
249   */
250  static Double getRegionReportPercent(Configuration conf) {
251    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
252      POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
253  }
254}