001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.IOException;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.concurrent.ConcurrentHashMap;
029
030import org.apache.commons.lang3.builder.HashCodeBuilder;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.MetaTableAccessor;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.RegionStateListener;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.master.MasterServices;
038import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
039import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
046import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
050
051/**
 * Master Quota Manager.
 * It is responsible for initializing the quota table on the first run and
 * for providing the admin operations to interact with the quota table.
055 *
056 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
057 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
058 */
059@InterfaceAudience.Private
060@InterfaceStability.Evolving
061public class MasterQuotaManager implements RegionStateListener {
062  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
063  private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
064      new HashMap<>());
065
066  private final MasterServices masterServices;
067  private NamedLock<String> namespaceLocks;
068  private NamedLock<TableName> tableLocks;
069  private NamedLock<String> userLocks;
070  private boolean initialized = false;
071  private NamespaceAuditor namespaceQuotaManager;
072  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
073
074  public MasterQuotaManager(final MasterServices masterServices) {
075    this.masterServices = masterServices;
076  }
077
078  public void start() throws IOException {
079    // If the user doesn't want the quota support skip all the initializations.
080    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
081      LOG.info("Quota support disabled");
082      return;
083    }
084
085    // Create the quota table if missing
086    if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
087          QuotaUtil.QUOTA_TABLE_NAME)) {
088      LOG.info("Quota table not found. Creating...");
089      createQuotaTable();
090    }
091
092    LOG.info("Initializing quota support");
093    namespaceLocks = new NamedLock<>();
094    tableLocks = new NamedLock<>();
095    userLocks = new NamedLock<>();
096    regionSizes = new ConcurrentHashMap<>();
097
098    namespaceQuotaManager = new NamespaceAuditor(masterServices);
099    namespaceQuotaManager.start();
100    initialized = true;
101  }
102
103  public void stop() {
104  }
105
106  public boolean isQuotaInitialized() {
107    return initialized && namespaceQuotaManager.isInitialized();
108  }
109
110  /* ==========================================================================
111   *  Admin operations to manage the quota table
112   */
113  public SetQuotaResponse setQuota(final SetQuotaRequest req)
114      throws IOException, InterruptedException {
115    checkQuotaSupport();
116
117    if (req.hasUserName()) {
118      userLocks.lock(req.getUserName());
119      try {
120        if (req.hasTableName()) {
121          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
122        } else if (req.hasNamespace()) {
123          setUserQuota(req.getUserName(), req.getNamespace(), req);
124        } else {
125          setUserQuota(req.getUserName(), req);
126        }
127      } finally {
128        userLocks.unlock(req.getUserName());
129      }
130    } else if (req.hasTableName()) {
131      TableName table = ProtobufUtil.toTableName(req.getTableName());
132      tableLocks.lock(table);
133      try {
134        setTableQuota(table, req);
135      } finally {
136        tableLocks.unlock(table);
137      }
138    } else if (req.hasNamespace()) {
139      namespaceLocks.lock(req.getNamespace());
140      try {
141        setNamespaceQuota(req.getNamespace(), req);
142      } finally {
143        namespaceLocks.unlock(req.getNamespace());
144      }
145    } else {
146      throw new DoNotRetryIOException(
147        new UnsupportedOperationException("a user, a table or a namespace must be specified"));
148    }
149    return SetQuotaResponse.newBuilder().build();
150  }
151
152  public void setUserQuota(final String userName, final SetQuotaRequest req)
153      throws IOException, InterruptedException {
154    setQuota(req, new SetQuotaOperations() {
155      @Override
156      public GlobalQuotaSettingsImpl fetch() throws IOException {
157        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, QuotaUtil.getUserQuota(
158            masterServices.getConnection(), userName));
159      }
160      @Override
161      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
162        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
163      }
164      @Override
165      public void delete() throws IOException {
166        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
167      }
168      @Override
169      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
170        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
171      }
172      @Override
173      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
174        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
175      }
176    });
177  }
178
179  public void setUserQuota(final String userName, final TableName table,
180      final SetQuotaRequest req) throws IOException, InterruptedException {
181    setQuota(req, new SetQuotaOperations() {
182      @Override
183      public GlobalQuotaSettingsImpl fetch() throws IOException {
184        return new GlobalQuotaSettingsImpl(userName, table, null, QuotaUtil.getUserQuota(
185            masterServices.getConnection(), userName, table));
186      }
187      @Override
188      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
189        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
190            quotaPojo.toQuotas());
191      }
192      @Override
193      public void delete() throws IOException {
194        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
195      }
196      @Override
197      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
198        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
199      }
200      @Override
201      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
202        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
203      }
204    });
205  }
206
207  public void setUserQuota(final String userName, final String namespace,
208      final SetQuotaRequest req) throws IOException, InterruptedException {
209    setQuota(req, new SetQuotaOperations() {
210      @Override
211      public GlobalQuotaSettingsImpl fetch() throws IOException {
212        return new GlobalQuotaSettingsImpl(userName, null, namespace, QuotaUtil.getUserQuota(
213            masterServices.getConnection(), userName, namespace));
214      }
215      @Override
216      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
217        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
218            quotaPojo.toQuotas());
219      }
220      @Override
221      public void delete() throws IOException {
222        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
223      }
224      @Override
225      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
226        masterServices.getMasterCoprocessorHost().preSetUserQuota(
227            userName, namespace, quotaPojo);
228      }
229      @Override
230      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
231        masterServices.getMasterCoprocessorHost().postSetUserQuota(
232            userName, namespace, quotaPojo);
233      }
234    });
235  }
236
237  public void setTableQuota(final TableName table, final SetQuotaRequest req)
238      throws IOException, InterruptedException {
239    setQuota(req, new SetQuotaOperations() {
240      @Override
241      public GlobalQuotaSettingsImpl fetch() throws IOException {
242        return new GlobalQuotaSettingsImpl(null, table, null, QuotaUtil.getTableQuota(
243            masterServices.getConnection(), table));
244      }
245      @Override
246      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
247        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
248      }
249      @Override
250      public void delete() throws IOException {
251        SpaceQuotaSnapshot currSnapshotOfTable =
252            QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
253        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
254        if (currSnapshotOfTable != null) {
255          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
256          if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy()
257              && quotaStatus.isInViolation()) {
258            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
259          }
260        }
261      }
262      @Override
263      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
264        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
265      }
266      @Override
267      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
268        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
269      }
270    });
271  }
272
273  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
274      throws IOException, InterruptedException {
275    setQuota(req, new SetQuotaOperations() {
276      @Override
277      public GlobalQuotaSettingsImpl fetch() throws IOException {
278        return new GlobalQuotaSettingsImpl(null, null, namespace, QuotaUtil.getNamespaceQuota(
279                masterServices.getConnection(), namespace));
280      }
281      @Override
282      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
283        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
284            ((GlobalQuotaSettingsImpl) quotaPojo).toQuotas());
285      }
286      @Override
287      public void delete() throws IOException {
288        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
289      }
290      @Override
291      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
292        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
293      }
294      @Override
295      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
296        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
297      }
298    });
299  }
300
301  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
302    if (initialized) {
303      this.namespaceQuotaManager.addNamespace(desc);
304    }
305  }
306
307  public void removeNamespaceQuota(String namespace) throws IOException {
308    if (initialized) {
309      this.namespaceQuotaManager.deleteNamespace(namespace);
310    }
311  }
312
313  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
314      throws IOException, InterruptedException {
315    if (req.hasRemoveAll() && req.getRemoveAll() == true) {
316      quotaOps.preApply(null);
317      quotaOps.delete();
318      quotaOps.postApply(null);
319      return;
320    }
321
322    // Apply quota changes
323    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
324    if (LOG.isTraceEnabled()) {
325      LOG.trace(
326          "Current quota for request(" + TextFormat.shortDebugString(req)
327              + "): " + currentQuota);
328    }
329    // Call the appropriate "pre" CP hook with the current quota value (may be null)
330    quotaOps.preApply(currentQuota);
331    // Translate the protobuf request back into a POJO
332    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
333    if (LOG.isTraceEnabled()) {
334      LOG.trace("Deserialized quota from request: " + newQuota);
335    }
336
337    // Merge the current quota settings with the new quota settings the user provided.
338    //
339    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
340    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
341    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
342    if (LOG.isTraceEnabled()) {
343      LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
344    }
345
346    // Submit new changes
347    if (mergedQuota == null) {
348      quotaOps.delete();
349    } else {
350      quotaOps.update(mergedQuota);
351    }
352    // Advertise the final result via the "post" CP hook
353    quotaOps.postApply(mergedQuota);
354  }
355
356  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
357    if (initialized) {
358      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
359    }
360  }
361
362  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
363    if (initialized) {
364      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
365    }
366  }
367
368  /**
369   * @return cached region count, or -1 if quota manager is disabled or table status not found
370  */
371  public int getRegionCountOfTable(TableName tName) throws IOException {
372    if (initialized) {
373      return namespaceQuotaManager.getRegionCountOfTable(tName);
374    }
375    return -1;
376  }
377
378  @Override
379  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
380    if (initialized) {
381      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
382    }
383  }
384
385  @Override
386  public void onRegionSplit(RegionInfo hri) throws IOException {
387    if (initialized) {
388      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
389    }
390  }
391
392  /**
393   * Remove table from namespace quota.
394   *
395   * @param tName - The table name to update quota usage.
396   * @throws IOException Signals that an I/O exception has occurred.
397   */
398  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
399    if (initialized) {
400      namespaceQuotaManager.removeFromNamespaceUsage(tName);
401    }
402  }
403
404  public NamespaceAuditor getNamespaceQuotaManager() {
405    return this.namespaceQuotaManager;
406  }
407
408  /**
409   * Encapsulates CRUD quota operations for some subject.
410   */
411  private static interface SetQuotaOperations {
412    /**
413     * Fetches the current quota settings for the subject.
414     */
415    GlobalQuotaSettingsImpl fetch() throws IOException;
416    /**
417     * Deletes the quota for the subject.
418     */
419    void delete() throws IOException;
420    /**
421     * Persist the given quota for the subject.
422     */
423    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
424    /**
425     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
426     * quota for the subject.
427     */
428    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
429    /**
430     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
431     * quota from the request action for the subject.
432     */
433    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
434  }
435
436  /* ==========================================================================
437   *  Helpers
438   */
439
440  private void checkQuotaSupport() throws IOException {
441    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
442      throw new DoNotRetryIOException(
443        new UnsupportedOperationException("quota support disabled"));
444    }
445    if (!initialized) {
446      long maxWaitTime = masterServices.getConfiguration().getLong(
447        "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
448      long startTime = EnvironmentEdgeManager.currentTime();
449      do {
450        try {
451          Thread.sleep(100);
452        } catch (InterruptedException e) {
453          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
454          break;
455        }
456      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
457      if (!initialized) {
458        throw new IOException("Quota manager is uninitialized, please retry later.");
459      }
460    }
461  }
462
463  private void createQuotaTable() throws IOException {
464    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
465  }
466
467  private static class NamedLock<T> {
468    private final HashSet<T> locks = new HashSet<>();
469
470    public void lock(final T name) throws InterruptedException {
471      synchronized (locks) {
472        while (locks.contains(name)) {
473          locks.wait();
474        }
475        locks.add(name);
476      }
477    }
478
479    public void unlock(final T name) {
480      synchronized (locks) {
481        locks.remove(name);
482        locks.notifyAll();
483      }
484    }
485  }
486
487  @Override
488  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
489    if (initialized) {
490      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
491    }
492  }
493
494  /**
495   * Holds the size of a region at the given time, millis since the epoch.
496   */
497  private static class SizeSnapshotWithTimestamp {
498    private final long size;
499    private final long time;
500
501    public SizeSnapshotWithTimestamp(long size, long time) {
502      this.size = size;
503      this.time = time;
504    }
505
506    public long getSize() {
507      return size;
508    }
509
510    public long getTime() {
511      return time;
512    }
513
514    @Override
515    public boolean equals(Object o) {
516      if (o instanceof SizeSnapshotWithTimestamp) {
517        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
518        return size == other.size && time == other.time;
519      }
520      return false;
521    }
522
523    @Override
524    public int hashCode() {
525      HashCodeBuilder hcb = new HashCodeBuilder();
526      return hcb.append(size).append(time).toHashCode();
527    }
528
529    @Override
530    public String toString() {
531      StringBuilder sb = new StringBuilder(32);
532      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
533      sb.append("time=").append(time).append("}");
534      return sb.toString();
535    }
536  }
537
538  @VisibleForTesting
539  void initializeRegionSizes() {
540    assert regionSizes == null;
541    this.regionSizes = new ConcurrentHashMap<>();
542  }
543
544  public void addRegionSize(RegionInfo hri, long size, long time) {
545    if (regionSizes == null) {
546      return;
547    }
548    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
549  }
550
551  public Map<RegionInfo, Long> snapshotRegionSizes() {
552    if (regionSizes == null) {
553      return EMPTY_MAP;
554    }
555
556    Map<RegionInfo, Long> copy = new HashMap<>();
557    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
558      copy.put(entry.getKey(), entry.getValue().getSize());
559    }
560    return copy;
561  }
562
563  int pruneEntriesOlderThan(long timeToPruneBefore) {
564    if (regionSizes == null) {
565      return 0;
566    }
567    int numEntriesRemoved = 0;
568    Iterator<Entry<RegionInfo,SizeSnapshotWithTimestamp>> iterator =
569        regionSizes.entrySet().iterator();
570    while (iterator.hasNext()) {
571      long currentEntryTime = iterator.next().getValue().getTime();
572      if (currentEntryTime < timeToPruneBefore) {
573        iterator.remove();
574        numEntriesRemoved++;
575      }
576    }
577    return numEntriesRemoved;
578  }
579}
580