001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.IOException;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031
032import org.apache.commons.lang3.builder.HashCodeBuilder;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.MetaTableAccessor;
037import org.apache.hadoop.hbase.NamespaceDescriptor;
038import org.apache.hadoop.hbase.RegionStateListener;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.master.MasterServices;
043import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
044import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
045import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
046import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.apache.yetus.audience.InterfaceStability;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
054import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
055import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
056import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
057
058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
069
070/**
071 * Master Quota Manager.
072 * It is responsible for initialize the quota table on the first-run and
073 * provide the admin operations to interact with the quota table.
074 *
075 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
076 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
077 */
078@InterfaceAudience.Private
079@InterfaceStability.Evolving
080public class MasterQuotaManager implements RegionStateListener {
081  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
082  private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
083      new HashMap<>());
084
085  private final MasterServices masterServices;
086  private NamedLock<String> namespaceLocks;
087  private NamedLock<TableName> tableLocks;
088  private NamedLock<String> userLocks;
089  private NamedLock<String> regionServerLocks;
090  private boolean initialized = false;
091  private NamespaceAuditor namespaceQuotaManager;
092  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
093  // Storage for quota rpc throttle
094  private RpcThrottleStorage rpcThrottleStorage;
095
096  public MasterQuotaManager(final MasterServices masterServices) {
097    this.masterServices = masterServices;
098  }
099
100  public void start() throws IOException {
101    // If the user doesn't want the quota support skip all the initializations.
102    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
103      LOG.info("Quota support disabled");
104      return;
105    }
106
107    // Create the quota table if missing
108    if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
109          QuotaUtil.QUOTA_TABLE_NAME)) {
110      LOG.info("Quota table not found. Creating...");
111      createQuotaTable();
112    }
113
114    LOG.info("Initializing quota support");
115    namespaceLocks = new NamedLock<>();
116    tableLocks = new NamedLock<>();
117    userLocks = new NamedLock<>();
118    regionServerLocks = new NamedLock<>();
119    regionSizes = new ConcurrentHashMap<>();
120
121    namespaceQuotaManager = new NamespaceAuditor(masterServices);
122    namespaceQuotaManager.start();
123    initialized = true;
124
125    rpcThrottleStorage =
126        new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
127  }
128
129  public void stop() {
130  }
131
132  public boolean isQuotaInitialized() {
133    return initialized && namespaceQuotaManager.isInitialized();
134  }
135
136  /* ==========================================================================
137   *  Admin operations to manage the quota table
138   */
139  public SetQuotaResponse setQuota(final SetQuotaRequest req)
140      throws IOException, InterruptedException {
141    checkQuotaSupport();
142
143    if (req.hasUserName()) {
144      userLocks.lock(req.getUserName());
145      try {
146        if (req.hasTableName()) {
147          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
148        } else if (req.hasNamespace()) {
149          setUserQuota(req.getUserName(), req.getNamespace(), req);
150        } else {
151          setUserQuota(req.getUserName(), req);
152        }
153      } finally {
154        userLocks.unlock(req.getUserName());
155      }
156    } else if (req.hasTableName()) {
157      TableName table = ProtobufUtil.toTableName(req.getTableName());
158      tableLocks.lock(table);
159      try {
160        setTableQuota(table, req);
161      } finally {
162        tableLocks.unlock(table);
163      }
164    } else if (req.hasNamespace()) {
165      namespaceLocks.lock(req.getNamespace());
166      try {
167        setNamespaceQuota(req.getNamespace(), req);
168      } finally {
169        namespaceLocks.unlock(req.getNamespace());
170      }
171    } else if (req.hasRegionServer()) {
172      regionServerLocks.lock(req.getRegionServer());
173      try {
174        setRegionServerQuota(req.getRegionServer(), req);
175      } finally {
176        regionServerLocks.unlock(req.getRegionServer());
177      }
178    } else {
179      throw new DoNotRetryIOException(new UnsupportedOperationException(
180          "a user, a table, a namespace or region server must be specified"));
181    }
182    return SetQuotaResponse.newBuilder().build();
183  }
184
185  public void setUserQuota(final String userName, final SetQuotaRequest req)
186      throws IOException, InterruptedException {
187    setQuota(req, new SetQuotaOperations() {
188      @Override
189      public GlobalQuotaSettingsImpl fetch() throws IOException {
190        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
191            QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
192      }
193      @Override
194      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
195        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
196      }
197      @Override
198      public void delete() throws IOException {
199        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
200      }
201      @Override
202      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
203        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
204      }
205      @Override
206      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
207        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
208      }
209    });
210  }
211
212  public void setUserQuota(final String userName, final TableName table,
213      final SetQuotaRequest req) throws IOException, InterruptedException {
214    setQuota(req, new SetQuotaOperations() {
215      @Override
216      public GlobalQuotaSettingsImpl fetch() throws IOException {
217        return new GlobalQuotaSettingsImpl(userName, table, null, null,
218            QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
219      }
220      @Override
221      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
222        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
223            quotaPojo.toQuotas());
224      }
225      @Override
226      public void delete() throws IOException {
227        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
228      }
229      @Override
230      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
231        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
232      }
233      @Override
234      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
235        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
236      }
237    });
238  }
239
240  public void setUserQuota(final String userName, final String namespace,
241      final SetQuotaRequest req) throws IOException, InterruptedException {
242    setQuota(req, new SetQuotaOperations() {
243      @Override
244      public GlobalQuotaSettingsImpl fetch() throws IOException {
245        return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
246            QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
247      }
248      @Override
249      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
250        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
251            quotaPojo.toQuotas());
252      }
253      @Override
254      public void delete() throws IOException {
255        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
256      }
257      @Override
258      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
259        masterServices.getMasterCoprocessorHost().preSetUserQuota(
260            userName, namespace, quotaPojo);
261      }
262      @Override
263      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
264        masterServices.getMasterCoprocessorHost().postSetUserQuota(
265            userName, namespace, quotaPojo);
266      }
267    });
268  }
269
270  public void setTableQuota(final TableName table, final SetQuotaRequest req)
271      throws IOException, InterruptedException {
272    setQuota(req, new SetQuotaOperations() {
273      @Override
274      public GlobalQuotaSettingsImpl fetch() throws IOException {
275        return new GlobalQuotaSettingsImpl(null, table, null, null,
276            QuotaUtil.getTableQuota(masterServices.getConnection(), table));
277      }
278      @Override
279      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
280        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
281      }
282      @Override
283      public void delete() throws IOException {
284        SpaceQuotaSnapshot currSnapshotOfTable =
285            QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
286        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
287        if (currSnapshotOfTable != null) {
288          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
289          if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
290              && quotaStatus.isInViolation()) {
291            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
292          }
293        }
294      }
295      @Override
296      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
297        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
298      }
299      @Override
300      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
301        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
302      }
303    });
304  }
305
306  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
307      throws IOException, InterruptedException {
308    setQuota(req, new SetQuotaOperations() {
309      @Override
310      public GlobalQuotaSettingsImpl fetch() throws IOException {
311        return new GlobalQuotaSettingsImpl(null, null, namespace, null,
312            QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
313      }
314      @Override
315      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
316        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
317          quotaPojo.toQuotas());
318      }
319      @Override
320      public void delete() throws IOException {
321        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
322      }
323      @Override
324      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
325        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
326      }
327      @Override
328      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
329        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
330      }
331    });
332  }
333
334  public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
335      throws IOException, InterruptedException {
336    setQuota(req, new SetQuotaOperations() {
337      @Override
338      public GlobalQuotaSettingsImpl fetch() throws IOException {
339        return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
340            QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
341      }
342
343      @Override
344      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
345        QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
346          quotaPojo.toQuotas());
347      }
348
349      @Override
350      public void delete() throws IOException {
351        QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
352      }
353
354      @Override
355      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
356        masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
357      }
358
359      @Override
360      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
361        masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
362      }
363    });
364  }
365
366  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
367    if (initialized) {
368      this.namespaceQuotaManager.addNamespace(desc);
369    }
370  }
371
372  public void removeNamespaceQuota(String namespace) throws IOException {
373    if (initialized) {
374      this.namespaceQuotaManager.deleteNamespace(namespace);
375    }
376  }
377
378  public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
379      throws IOException {
380    boolean rpcThrottle = request.getRpcThrottleEnabled();
381    if (initialized) {
382      masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
383      boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
384      if (rpcThrottle != oldRpcThrottle) {
385        LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
386          oldRpcThrottle, rpcThrottle);
387        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
388        SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
389            rpcThrottle, masterServices.getServerName(), latch);
390        masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
391        latch.await();
392      } else {
393        LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
394          rpcThrottle);
395      }
396      SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
397          .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
398      masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
399      return response;
400    } else {
401      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
402      return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
403    }
404  }
405
406  public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
407      throws IOException {
408    if (initialized) {
409      masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
410      boolean enabled = isRpcThrottleEnabled();
411      IsRpcThrottleEnabledResponse response =
412          IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
413      masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
414      return response;
415    } else {
416      LOG.warn("Skip get rpc throttle because rpc quota is disabled");
417      return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
418    }
419  }
420
421  public boolean isRpcThrottleEnabled() throws IOException {
422    return initialized ? rpcThrottleStorage.isRpcThrottleEnabled() : false;
423  }
424
425  public SwitchExceedThrottleQuotaResponse
426      switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
427    boolean enabled = request.getExceedThrottleQuotaEnabled();
428    if (initialized) {
429      masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
430      boolean previousEnabled =
431          QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
432      if (previousEnabled == enabled) {
433        LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
434          enabled);
435      } else {
436        QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
437        LOG.info("{} switch exceed throttle quota from {} to {}",
438          masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
439      }
440      SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
441          .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
442      masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
443        enabled);
444      return response;
445    } else {
446      LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
447      return SwitchExceedThrottleQuotaResponse.newBuilder()
448          .setPreviousExceedThrottleQuotaEnabled(false).build();
449    }
450  }
451
452  public boolean isExceedThrottleQuotaEnabled() throws IOException {
453    return initialized ? QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection())
454        : false;
455  }
456
457  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
458      throws IOException, InterruptedException {
459    if (req.hasRemoveAll() && req.getRemoveAll() == true) {
460      quotaOps.preApply(null);
461      quotaOps.delete();
462      quotaOps.postApply(null);
463      return;
464    }
465
466    // Apply quota changes
467    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
468    if (LOG.isTraceEnabled()) {
469      LOG.trace(
470          "Current quota for request(" + TextFormat.shortDebugString(req)
471              + "): " + currentQuota);
472    }
473    // Call the appropriate "pre" CP hook with the current quota value (may be null)
474    quotaOps.preApply(currentQuota);
475    // Translate the protobuf request back into a POJO
476    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
477    if (LOG.isTraceEnabled()) {
478      LOG.trace("Deserialized quota from request: " + newQuota);
479    }
480
481    // Merge the current quota settings with the new quota settings the user provided.
482    //
483    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
484    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
485    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
486    if (LOG.isTraceEnabled()) {
487      LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
488    }
489
490    // Submit new changes
491    if (mergedQuota == null) {
492      quotaOps.delete();
493    } else {
494      quotaOps.update(mergedQuota);
495    }
496    // Advertise the final result via the "post" CP hook
497    quotaOps.postApply(mergedQuota);
498  }
499
500  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
501    if (initialized) {
502      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
503    }
504  }
505
506  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
507    if (initialized) {
508      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
509    }
510  }
511
512  /**
513   * @return cached region count, or -1 if quota manager is disabled or table status not found
514  */
515  public int getRegionCountOfTable(TableName tName) throws IOException {
516    if (initialized) {
517      return namespaceQuotaManager.getRegionCountOfTable(tName);
518    }
519    return -1;
520  }
521
522  @Override
523  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
524    if (initialized) {
525      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
526    }
527  }
528
529  @Override
530  public void onRegionSplit(RegionInfo hri) throws IOException {
531    if (initialized) {
532      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
533    }
534  }
535
536  /**
537   * Remove table from namespace quota.
538   *
539   * @param tName - The table name to update quota usage.
540   * @throws IOException Signals that an I/O exception has occurred.
541   */
542  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
543    if (initialized) {
544      namespaceQuotaManager.removeFromNamespaceUsage(tName);
545    }
546  }
547
548  public NamespaceAuditor getNamespaceQuotaManager() {
549    return this.namespaceQuotaManager;
550  }
551
552  /**
553   * Encapsulates CRUD quota operations for some subject.
554   */
555  private static interface SetQuotaOperations {
556    /**
557     * Fetches the current quota settings for the subject.
558     */
559    GlobalQuotaSettingsImpl fetch() throws IOException;
560    /**
561     * Deletes the quota for the subject.
562     */
563    void delete() throws IOException;
564    /**
565     * Persist the given quota for the subject.
566     */
567    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
568    /**
569     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
570     * quota for the subject.
571     */
572    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
573    /**
574     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
575     * quota from the request action for the subject.
576     */
577    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
578  }
579
580  /* ==========================================================================
581   *  Helpers
582   */
583
584  private void checkQuotaSupport() throws IOException {
585    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
586      throw new DoNotRetryIOException(
587        new UnsupportedOperationException("quota support disabled"));
588    }
589    if (!initialized) {
590      long maxWaitTime = masterServices.getConfiguration().getLong(
591        "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
592      long startTime = EnvironmentEdgeManager.currentTime();
593      do {
594        try {
595          Thread.sleep(100);
596        } catch (InterruptedException e) {
597          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
598          break;
599        }
600      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
601      if (!initialized) {
602        throw new IOException("Quota manager is uninitialized, please retry later.");
603      }
604    }
605  }
606
607  private void createQuotaTable() throws IOException {
608    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
609  }
610
611  private static class NamedLock<T> {
612    private final HashSet<T> locks = new HashSet<>();
613
614    public void lock(final T name) throws InterruptedException {
615      synchronized (locks) {
616        while (locks.contains(name)) {
617          locks.wait();
618        }
619        locks.add(name);
620      }
621    }
622
623    public void unlock(final T name) {
624      synchronized (locks) {
625        locks.remove(name);
626        locks.notifyAll();
627      }
628    }
629  }
630
631  @Override
632  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
633    if (initialized) {
634      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
635    }
636  }
637
638  /**
639   * Holds the size of a region at the given time, millis since the epoch.
640   */
641  private static class SizeSnapshotWithTimestamp {
642    private final long size;
643    private final long time;
644
645    public SizeSnapshotWithTimestamp(long size, long time) {
646      this.size = size;
647      this.time = time;
648    }
649
650    public long getSize() {
651      return size;
652    }
653
654    public long getTime() {
655      return time;
656    }
657
658    @Override
659    public boolean equals(Object o) {
660      if (o instanceof SizeSnapshotWithTimestamp) {
661        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
662        return size == other.size && time == other.time;
663      }
664      return false;
665    }
666
667    @Override
668    public int hashCode() {
669      HashCodeBuilder hcb = new HashCodeBuilder();
670      return hcb.append(size).append(time).toHashCode();
671    }
672
673    @Override
674    public String toString() {
675      StringBuilder sb = new StringBuilder(32);
676      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
677      sb.append("time=").append(time).append("}");
678      return sb.toString();
679    }
680  }
681
682  @VisibleForTesting
683  void initializeRegionSizes() {
684    assert regionSizes == null;
685    this.regionSizes = new ConcurrentHashMap<>();
686  }
687
688  public void addRegionSize(RegionInfo hri, long size, long time) {
689    if (regionSizes == null) {
690      return;
691    }
692    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
693  }
694
695  public Map<RegionInfo, Long> snapshotRegionSizes() {
696    if (regionSizes == null) {
697      return EMPTY_MAP;
698    }
699
700    Map<RegionInfo, Long> copy = new HashMap<>();
701    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
702      copy.put(entry.getKey(), entry.getValue().getSize());
703    }
704    return copy;
705  }
706
707  int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) {
708    if (regionSizes == null) {
709      return 0;
710    }
711    int numEntriesRemoved = 0;
712    Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator =
713        regionSizes.entrySet().iterator();
714    while (iterator.hasNext()) {
715      RegionInfo regionInfo = iterator.next().getKey();
716      long currentEntryTime = regionSizes.get(regionInfo).getTime();
717      // do not prune the entries if table is in violation and
718      // violation policy is disable to avoid cycle of enable/disable.
719      // Please refer HBASE-22012 for more details.
720      // prune entries older than time.
721      if (currentEntryTime < timeToPruneBefore && !isInViolationAndPolicyDisable(
722          regionInfo.getTable(), quotaObserverChore)) {
723        iterator.remove();
724        numEntriesRemoved++;
725      }
726    }
727    return numEntriesRemoved;
728  }
729
730  /**
731   * Method to check if a table is in violation and policy set on table is DISABLE.
732   *
733   * @param tableName          tableName to check.
734   * @param quotaObserverChore QuotaObserverChore instance
735   * @return returns true if table is in violation and policy is disable else false.
736   */
737  private boolean isInViolationAndPolicyDisable(TableName tableName,
738      QuotaObserverChore quotaObserverChore) {
739    boolean isInViolationAtTable = false;
740    boolean isInViolationAtNamespace = false;
741    SpaceViolationPolicy tablePolicy = null;
742    SpaceViolationPolicy namespacePolicy = null;
743    // Get Current Snapshot for the given table
744    SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName);
745    SpaceQuotaSnapshot namespaceQuotaSnapshot =
746        quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString());
747    if (tableQuotaSnapshot != null) {
748      // check if table in violation
749      isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation();
750      Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy();
751      if (policy.isPresent()) {
752        tablePolicy = policy.get();
753      }
754    }
755    if (namespaceQuotaSnapshot != null) {
756      // check namespace in violation
757      isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation();
758      Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy();
759      if (policy.isPresent()) {
760        namespacePolicy = policy.get();
761      }
762    }
763    return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable) || (
764        namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace);
765  }
766
767  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
768      Configuration conf, FileSystem fs) throws IOException {
769    final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create();
770    // Group the archived files by table
771    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
772      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
773      archivedFilesByTable.put(
774          tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
775    }
776    if (LOG.isTraceEnabled()) {
777      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
778    }
779    // Report each set of files to the appropriate object
780    for (TableName tn : archivedFilesByTable.keySet()) {
781      final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn);
782      final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get(
783          conn, conf, fs, tn);
784      notifier.addArchivedFiles(filesWithSize);
785    }
786  }
787
788  /**
789   * Removes each region size entry where the RegionInfo references the provided TableName.
790   *
791   * @param tableName tableName.
792   */
793  public void removeRegionSizesForTable(TableName tableName) {
794    regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
795  }
796}
797