001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.Optional;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import org.apache.commons.lang3.builder.HashCodeBuilder;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.RegionStateListener;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
041import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
042import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
043import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.apache.yetus.audience.InterfaceStability;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
052import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
053
054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
065
066/**
067 * Master Quota Manager. It is responsible for initialize the quota table on the first-run and
068 * provide the admin operations to interact with the quota table. TODO: FUTURE: The master will be
069 * responsible to notify each RS of quota changes and it will do the "quota aggregation" when the
070 * QuotaScope is CLUSTER.
071 */
072@InterfaceAudience.Private
073@InterfaceStability.Evolving
074public class MasterQuotaManager implements RegionStateListener {
075  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
076  private static final Map<RegionInfo, Long> EMPTY_MAP =
077    Collections.unmodifiableMap(new HashMap<>());
078
079  private final MasterServices masterServices;
080  private NamedLock<String> namespaceLocks;
081  private NamedLock<TableName> tableLocks;
082  private NamedLock<String> userLocks;
083  private NamedLock<String> regionServerLocks;
084  private boolean initialized = false;
085  private NamespaceAuditor namespaceQuotaManager;
086  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
087  // Storage for quota rpc throttle
088  private RpcThrottleStorage rpcThrottleStorage;
089
090  public MasterQuotaManager(final MasterServices masterServices) {
091    this.masterServices = masterServices;
092  }
093
094  public void start() throws IOException {
095    // If the user doesn't want the quota support skip all the initializations.
096    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
097      LOG.info("Quota support disabled");
098      return;
099    }
100
101    // Create the quota table if missing
102    if (!masterServices.getTableDescriptors().exists(QuotaUtil.QUOTA_TABLE_NAME)) {
103      LOG.info("Quota table not found. Creating...");
104      createQuotaTable();
105    }
106
107    LOG.info("Initializing quota support");
108    namespaceLocks = new NamedLock<>();
109    tableLocks = new NamedLock<>();
110    userLocks = new NamedLock<>();
111    regionServerLocks = new NamedLock<>();
112    regionSizes = new ConcurrentHashMap<>();
113
114    namespaceQuotaManager = new NamespaceAuditor(masterServices);
115    namespaceQuotaManager.start();
116    initialized = true;
117
118    rpcThrottleStorage =
119      new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
120  }
121
122  public void stop() {
123  }
124
125  public boolean isQuotaInitialized() {
126    return initialized && namespaceQuotaManager.isInitialized();
127  }
128
129  /*
130   * ========================================================================== Admin operations to
131   * manage the quota table
132   */
133  public SetQuotaResponse setQuota(final SetQuotaRequest req)
134    throws IOException, InterruptedException {
135    checkQuotaSupport();
136
137    if (req.hasUserName()) {
138      userLocks.lock(req.getUserName());
139      try {
140        if (req.hasTableName()) {
141          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
142        } else if (req.hasNamespace()) {
143          setUserQuota(req.getUserName(), req.getNamespace(), req);
144        } else {
145          setUserQuota(req.getUserName(), req);
146        }
147      } finally {
148        userLocks.unlock(req.getUserName());
149      }
150    } else if (req.hasTableName()) {
151      TableName table = ProtobufUtil.toTableName(req.getTableName());
152      tableLocks.lock(table);
153      try {
154        setTableQuota(table, req);
155      } finally {
156        tableLocks.unlock(table);
157      }
158    } else if (req.hasNamespace()) {
159      namespaceLocks.lock(req.getNamespace());
160      try {
161        setNamespaceQuota(req.getNamespace(), req);
162      } finally {
163        namespaceLocks.unlock(req.getNamespace());
164      }
165    } else if (req.hasRegionServer()) {
166      regionServerLocks.lock(req.getRegionServer());
167      try {
168        setRegionServerQuota(req.getRegionServer(), req);
169      } finally {
170        regionServerLocks.unlock(req.getRegionServer());
171      }
172    } else {
173      throw new DoNotRetryIOException(new UnsupportedOperationException(
174        "a user, a table, a namespace or region server must be specified"));
175    }
176    return SetQuotaResponse.newBuilder().build();
177  }
178
179  public void setUserQuota(final String userName, final SetQuotaRequest req)
180    throws IOException, InterruptedException {
181    setQuota(req, new SetQuotaOperations() {
182      @Override
183      public GlobalQuotaSettingsImpl fetch() throws IOException {
184        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
185          QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
186      }
187
188      @Override
189      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
190        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
191      }
192
193      @Override
194      public void delete() throws IOException {
195        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
196      }
197
198      @Override
199      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
200        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
201      }
202
203      @Override
204      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
205        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
206      }
207    });
208  }
209
210  public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
211    throws IOException, InterruptedException {
212    setQuota(req, new SetQuotaOperations() {
213      @Override
214      public GlobalQuotaSettingsImpl fetch() throws IOException {
215        return new GlobalQuotaSettingsImpl(userName, table, null, null,
216          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
217      }
218
219      @Override
220      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
221        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
222          quotaPojo.toQuotas());
223      }
224
225      @Override
226      public void delete() throws IOException {
227        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
228      }
229
230      @Override
231      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
232        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
233      }
234
235      @Override
236      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
237        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
238      }
239    });
240  }
241
242  public void setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
243    throws IOException, InterruptedException {
244    setQuota(req, new SetQuotaOperations() {
245      @Override
246      public GlobalQuotaSettingsImpl fetch() throws IOException {
247        return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
248          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
249      }
250
251      @Override
252      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
253        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
254          quotaPojo.toQuotas());
255      }
256
257      @Override
258      public void delete() throws IOException {
259        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
260      }
261
262      @Override
263      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
264        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotaPojo);
265      }
266
267      @Override
268      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
269        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotaPojo);
270      }
271    });
272  }
273
274  public void setTableQuota(final TableName table, final SetQuotaRequest req)
275    throws IOException, InterruptedException {
276    setQuota(req, new SetQuotaOperations() {
277      @Override
278      public GlobalQuotaSettingsImpl fetch() throws IOException {
279        return new GlobalQuotaSettingsImpl(null, table, null, null,
280          QuotaUtil.getTableQuota(masterServices.getConnection(), table));
281      }
282
283      @Override
284      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
285        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
286      }
287
288      @Override
289      public void delete() throws IOException {
290        SpaceQuotaSnapshot currSnapshotOfTable =
291          QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
292        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
293        if (currSnapshotOfTable != null) {
294          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
295          if (
296            SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
297              && quotaStatus.isInViolation()
298          ) {
299            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
300          }
301        }
302      }
303
304      @Override
305      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
306        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
307      }
308
309      @Override
310      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
311        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
312      }
313    });
314  }
315
316  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
317    throws IOException, InterruptedException {
318    setQuota(req, new SetQuotaOperations() {
319      @Override
320      public GlobalQuotaSettingsImpl fetch() throws IOException {
321        return new GlobalQuotaSettingsImpl(null, null, namespace, null,
322          QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
323      }
324
325      @Override
326      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
327        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
328          quotaPojo.toQuotas());
329      }
330
331      @Override
332      public void delete() throws IOException {
333        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
334      }
335
336      @Override
337      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
338        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
339      }
340
341      @Override
342      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
343        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
344      }
345    });
346  }
347
348  public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
349    throws IOException, InterruptedException {
350    setQuota(req, new SetQuotaOperations() {
351      @Override
352      public GlobalQuotaSettingsImpl fetch() throws IOException {
353        return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
354          QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
355      }
356
357      @Override
358      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
359        QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
360          quotaPojo.toQuotas());
361      }
362
363      @Override
364      public void delete() throws IOException {
365        QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
366      }
367
368      @Override
369      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
370        masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
371      }
372
373      @Override
374      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
375        masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
376      }
377    });
378  }
379
380  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
381    if (initialized) {
382      this.namespaceQuotaManager.addNamespace(desc);
383    }
384  }
385
386  public void removeNamespaceQuota(String namespace) throws IOException {
387    if (initialized) {
388      this.namespaceQuotaManager.deleteNamespace(namespace);
389    }
390  }
391
392  public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
393    throws IOException {
394    boolean rpcThrottle = request.getRpcThrottleEnabled();
395    if (initialized) {
396      masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
397      boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
398      if (rpcThrottle != oldRpcThrottle) {
399        LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
400          oldRpcThrottle, rpcThrottle);
401        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
402        SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
403          rpcThrottle, masterServices.getServerName(), latch);
404        masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
405        latch.await();
406      } else {
407        LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
408          rpcThrottle);
409      }
410      SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
411        .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
412      masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
413      return response;
414    } else {
415      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
416      return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
417    }
418  }
419
420  public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
421    throws IOException {
422    if (initialized) {
423      masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
424      boolean enabled = isRpcThrottleEnabled();
425      IsRpcThrottleEnabledResponse response =
426        IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
427      masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
428      return response;
429    } else {
430      LOG.warn("Skip get rpc throttle because rpc quota is disabled");
431      return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
432    }
433  }
434
435  public boolean isRpcThrottleEnabled() throws IOException {
436    return initialized ? rpcThrottleStorage.isRpcThrottleEnabled() : false;
437  }
438
439  public SwitchExceedThrottleQuotaResponse
440    switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
441    boolean enabled = request.getExceedThrottleQuotaEnabled();
442    if (initialized) {
443      masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
444      boolean previousEnabled =
445        QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
446      if (previousEnabled == enabled) {
447        LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
448          enabled);
449      } else {
450        QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
451        LOG.info("{} switch exceed throttle quota from {} to {}",
452          masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
453      }
454      SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
455        .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
456      masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
457        enabled);
458      return response;
459    } else {
460      LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
461      return SwitchExceedThrottleQuotaResponse.newBuilder()
462        .setPreviousExceedThrottleQuotaEnabled(false).build();
463    }
464  }
465
466  public boolean isExceedThrottleQuotaEnabled() throws IOException {
467    return initialized
468      ? QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection())
469      : false;
470  }
471
472  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
473    throws IOException, InterruptedException {
474    if (req.hasRemoveAll() && req.getRemoveAll() == true) {
475      quotaOps.preApply(null);
476      quotaOps.delete();
477      quotaOps.postApply(null);
478      return;
479    }
480
481    // Apply quota changes
482    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
483    if (LOG.isTraceEnabled()) {
484      LOG.trace(
485        "Current quota for request(" + TextFormat.shortDebugString(req) + "): " + currentQuota);
486    }
487    // Call the appropriate "pre" CP hook with the current quota value (may be null)
488    quotaOps.preApply(currentQuota);
489    // Translate the protobuf request back into a POJO
490    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
491    if (LOG.isTraceEnabled()) {
492      LOG.trace("Deserialized quota from request: " + newQuota);
493    }
494
495    // Merge the current quota settings with the new quota settings the user provided.
496    //
497    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
498    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
499    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
500    if (LOG.isTraceEnabled()) {
501      LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
502    }
503
504    // Submit new changes
505    if (mergedQuota == null) {
506      quotaOps.delete();
507    } else {
508      quotaOps.update(mergedQuota);
509    }
510    // Advertise the final result via the "post" CP hook
511    quotaOps.postApply(mergedQuota);
512  }
513
514  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
515    if (initialized) {
516      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
517    }
518  }
519
520  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
521    if (initialized) {
522      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
523    }
524  }
525
526  /**
527   * @return cached region count, or -1 if quota manager is disabled or table status not found
528   */
529  public int getRegionCountOfTable(TableName tName) throws IOException {
530    if (initialized) {
531      return namespaceQuotaManager.getRegionCountOfTable(tName);
532    }
533    return -1;
534  }
535
536  @Override
537  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
538    if (initialized) {
539      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
540    }
541  }
542
543  @Override
544  public void onRegionSplit(RegionInfo hri) throws IOException {
545    if (initialized) {
546      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
547    }
548  }
549
550  /**
551   * Remove table from namespace quota.
552   * @param tName - The table name to update quota usage.
553   * @throws IOException Signals that an I/O exception has occurred.
554   */
555  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
556    if (initialized) {
557      namespaceQuotaManager.removeFromNamespaceUsage(tName);
558    }
559  }
560
561  public NamespaceAuditor getNamespaceQuotaManager() {
562    return this.namespaceQuotaManager;
563  }
564
565  /**
566   * Encapsulates CRUD quota operations for some subject.
567   */
568  private static interface SetQuotaOperations {
569    /**
570     * Fetches the current quota settings for the subject.
571     */
572    GlobalQuotaSettingsImpl fetch() throws IOException;
573
574    /**
575     * Deletes the quota for the subject.
576     */
577    void delete() throws IOException;
578
579    /**
580     * Persist the given quota for the subject.
581     */
582    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
583
584    /**
585     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current quota
586     * for the subject.
587     */
588    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
589
590    /**
591     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting quota
592     * from the request action for the subject.
593     */
594    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
595  }
596
597  /*
598   * ========================================================================== Helpers
599   */
600
601  private void checkQuotaSupport() throws IOException {
602    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
603      throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled"));
604    }
605    if (!initialized) {
606      long maxWaitTime = masterServices.getConfiguration()
607        .getLong("hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
608      long startTime = EnvironmentEdgeManager.currentTime();
609      do {
610        try {
611          Thread.sleep(100);
612        } catch (InterruptedException e) {
613          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
614          break;
615        }
616      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
617      if (!initialized) {
618        throw new IOException("Quota manager is uninitialized, please retry later.");
619      }
620    }
621  }
622
623  private void createQuotaTable() throws IOException {
624    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
625  }
626
627  private static class NamedLock<T> {
628    private final HashSet<T> locks = new HashSet<>();
629
630    public void lock(final T name) throws InterruptedException {
631      synchronized (locks) {
632        while (locks.contains(name)) {
633          locks.wait();
634        }
635        locks.add(name);
636      }
637    }
638
639    public void unlock(final T name) {
640      synchronized (locks) {
641        locks.remove(name);
642        locks.notifyAll();
643      }
644    }
645  }
646
647  @Override
648  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
649    if (initialized) {
650      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
651    }
652  }
653
654  /**
655   * Holds the size of a region at the given time, millis since the epoch.
656   */
657  private static class SizeSnapshotWithTimestamp {
658    private final long size;
659    private final long time;
660
661    public SizeSnapshotWithTimestamp(long size, long time) {
662      this.size = size;
663      this.time = time;
664    }
665
666    public long getSize() {
667      return size;
668    }
669
670    public long getTime() {
671      return time;
672    }
673
674    @Override
675    public boolean equals(Object o) {
676      if (o instanceof SizeSnapshotWithTimestamp) {
677        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
678        return size == other.size && time == other.time;
679      }
680      return false;
681    }
682
683    @Override
684    public int hashCode() {
685      HashCodeBuilder hcb = new HashCodeBuilder();
686      return hcb.append(size).append(time).toHashCode();
687    }
688
689    @Override
690    public String toString() {
691      StringBuilder sb = new StringBuilder(32);
692      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
693      sb.append("time=").append(time).append("}");
694      return sb.toString();
695    }
696  }
697
698  void initializeRegionSizes() {
699    assert regionSizes == null;
700    this.regionSizes = new ConcurrentHashMap<>();
701  }
702
703  public void addRegionSize(RegionInfo hri, long size, long time) {
704    if (regionSizes == null) {
705      return;
706    }
707    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
708  }
709
710  public Map<RegionInfo, Long> snapshotRegionSizes() {
711    if (regionSizes == null) {
712      return EMPTY_MAP;
713    }
714
715    Map<RegionInfo, Long> copy = new HashMap<>();
716    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
717      copy.put(entry.getKey(), entry.getValue().getSize());
718    }
719    return copy;
720  }
721
722  int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) {
723    if (regionSizes == null) {
724      return 0;
725    }
726    int numEntriesRemoved = 0;
727    Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator =
728      regionSizes.entrySet().iterator();
729    while (iterator.hasNext()) {
730      RegionInfo regionInfo = iterator.next().getKey();
731      long currentEntryTime = regionSizes.get(regionInfo).getTime();
732      // do not prune the entries if table is in violation and
733      // violation policy is disable to avoid cycle of enable/disable.
734      // Please refer HBASE-22012 for more details.
735      // prune entries older than time.
736      if (
737        currentEntryTime < timeToPruneBefore
738          && !isInViolationAndPolicyDisable(regionInfo.getTable(), quotaObserverChore)
739      ) {
740        iterator.remove();
741        numEntriesRemoved++;
742      }
743    }
744    return numEntriesRemoved;
745  }
746
747  /**
748   * Method to check if a table is in violation and policy set on table is DISABLE.
749   * @param tableName          tableName to check.
750   * @param quotaObserverChore QuotaObserverChore instance
751   * @return returns true if table is in violation and policy is disable else false.
752   */
753  private boolean isInViolationAndPolicyDisable(TableName tableName,
754    QuotaObserverChore quotaObserverChore) {
755    boolean isInViolationAtTable = false;
756    boolean isInViolationAtNamespace = false;
757    SpaceViolationPolicy tablePolicy = null;
758    SpaceViolationPolicy namespacePolicy = null;
759    // Get Current Snapshot for the given table
760    SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName);
761    SpaceQuotaSnapshot namespaceQuotaSnapshot =
762      quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString());
763    if (tableQuotaSnapshot != null) {
764      // check if table in violation
765      isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation();
766      Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy();
767      if (policy.isPresent()) {
768        tablePolicy = policy.get();
769      }
770    }
771    if (namespaceQuotaSnapshot != null) {
772      // check namespace in violation
773      isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation();
774      Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy();
775      if (policy.isPresent()) {
776        namespacePolicy = policy.get();
777      }
778    }
779    return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable)
780      || (namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace);
781  }
782
783  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
784    Configuration conf, FileSystem fs) throws IOException {
785    final HashMultimap<TableName, Entry<String, Long>> archivedFilesByTable = HashMultimap.create();
786    // Group the archived files by table
787    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
788      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
789      archivedFilesByTable.put(tn,
790        Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
791    }
792    if (LOG.isTraceEnabled()) {
793      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
794    }
795    // Report each set of files to the appropriate object
796    for (TableName tn : archivedFilesByTable.keySet()) {
797      final Set<Entry<String, Long>> filesWithSize = archivedFilesByTable.get(tn);
798      final FileArchiverNotifier notifier =
799        FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
800      notifier.addArchivedFiles(filesWithSize);
801    }
802  }
803
804  /**
805   * Removes each region size entry where the RegionInfo references the provided TableName.
806   * @param tableName tableName.
807   */
808  public void removeRegionSizesForTable(TableName tableName) {
809    regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
810  }
811}