001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.quotas;
020
021import java.io.IOException;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import org.apache.commons.lang3.builder.HashCodeBuilder;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.NamespaceDescriptor;
036import org.apache.hadoop.hbase.RegionStateListener;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
042import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
043import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
044import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.apache.yetus.audience.InterfaceStability;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
052import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
053import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
054
055import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
066
067/**
068 * Master Quota Manager.
069 * It is responsible for initialize the quota table on the first-run and
070 * provide the admin operations to interact with the quota table.
071 *
072 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
073 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
074 */
075@InterfaceAudience.Private
076@InterfaceStability.Evolving
077public class MasterQuotaManager implements RegionStateListener {
078  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
079  private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
080      new HashMap<>());
081
082  private final MasterServices masterServices;
083  private NamedLock<String> namespaceLocks;
084  private NamedLock<TableName> tableLocks;
085  private NamedLock<String> userLocks;
086  private NamedLock<String> regionServerLocks;
087  private boolean initialized = false;
088  private NamespaceAuditor namespaceQuotaManager;
089  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
090  // Storage for quota rpc throttle
091  private RpcThrottleStorage rpcThrottleStorage;
092
093  public MasterQuotaManager(final MasterServices masterServices) {
094    this.masterServices = masterServices;
095  }
096
097  public void start() throws IOException {
098    // If the user doesn't want the quota support skip all the initializations.
099    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
100      LOG.info("Quota support disabled");
101      return;
102    }
103
104    // Create the quota table if missing
105    if (!masterServices.getTableDescriptors().exists(QuotaUtil.QUOTA_TABLE_NAME)) {
106      LOG.info("Quota table not found. Creating...");
107      createQuotaTable();
108    }
109
110    LOG.info("Initializing quota support");
111    namespaceLocks = new NamedLock<>();
112    tableLocks = new NamedLock<>();
113    userLocks = new NamedLock<>();
114    regionServerLocks = new NamedLock<>();
115    regionSizes = new ConcurrentHashMap<>();
116
117    namespaceQuotaManager = new NamespaceAuditor(masterServices);
118    namespaceQuotaManager.start();
119    initialized = true;
120
121    rpcThrottleStorage =
122        new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
123  }
124
125  public void stop() {
126  }
127
128  public boolean isQuotaInitialized() {
129    return initialized && namespaceQuotaManager.isInitialized();
130  }
131
132  /* ==========================================================================
133   *  Admin operations to manage the quota table
134   */
135  public SetQuotaResponse setQuota(final SetQuotaRequest req)
136      throws IOException, InterruptedException {
137    checkQuotaSupport();
138
139    if (req.hasUserName()) {
140      userLocks.lock(req.getUserName());
141      try {
142        if (req.hasTableName()) {
143          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
144        } else if (req.hasNamespace()) {
145          setUserQuota(req.getUserName(), req.getNamespace(), req);
146        } else {
147          setUserQuota(req.getUserName(), req);
148        }
149      } finally {
150        userLocks.unlock(req.getUserName());
151      }
152    } else if (req.hasTableName()) {
153      TableName table = ProtobufUtil.toTableName(req.getTableName());
154      tableLocks.lock(table);
155      try {
156        setTableQuota(table, req);
157      } finally {
158        tableLocks.unlock(table);
159      }
160    } else if (req.hasNamespace()) {
161      namespaceLocks.lock(req.getNamespace());
162      try {
163        setNamespaceQuota(req.getNamespace(), req);
164      } finally {
165        namespaceLocks.unlock(req.getNamespace());
166      }
167    } else if (req.hasRegionServer()) {
168      regionServerLocks.lock(req.getRegionServer());
169      try {
170        setRegionServerQuota(req.getRegionServer(), req);
171      } finally {
172        regionServerLocks.unlock(req.getRegionServer());
173      }
174    } else {
175      throw new DoNotRetryIOException(new UnsupportedOperationException(
176          "a user, a table, a namespace or region server must be specified"));
177    }
178    return SetQuotaResponse.newBuilder().build();
179  }
180
181  public void setUserQuota(final String userName, final SetQuotaRequest req)
182      throws IOException, InterruptedException {
183    setQuota(req, new SetQuotaOperations() {
184      @Override
185      public GlobalQuotaSettingsImpl fetch() throws IOException {
186        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
187            QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
188      }
189      @Override
190      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
191        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
192      }
193      @Override
194      public void delete() throws IOException {
195        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
196      }
197      @Override
198      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
199        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
200      }
201      @Override
202      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
203        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
204      }
205    });
206  }
207
208  public void setUserQuota(final String userName, final TableName table,
209      final SetQuotaRequest req) throws IOException, InterruptedException {
210    setQuota(req, new SetQuotaOperations() {
211      @Override
212      public GlobalQuotaSettingsImpl fetch() throws IOException {
213        return new GlobalQuotaSettingsImpl(userName, table, null, null,
214            QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
215      }
216      @Override
217      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
218        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
219            quotaPojo.toQuotas());
220      }
221      @Override
222      public void delete() throws IOException {
223        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
224      }
225      @Override
226      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
227        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
228      }
229      @Override
230      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
231        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
232      }
233    });
234  }
235
236  public void setUserQuota(final String userName, final String namespace,
237      final SetQuotaRequest req) throws IOException, InterruptedException {
238    setQuota(req, new SetQuotaOperations() {
239      @Override
240      public GlobalQuotaSettingsImpl fetch() throws IOException {
241        return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
242            QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
243      }
244      @Override
245      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
246        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
247            quotaPojo.toQuotas());
248      }
249      @Override
250      public void delete() throws IOException {
251        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
252      }
253      @Override
254      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
255        masterServices.getMasterCoprocessorHost().preSetUserQuota(
256            userName, namespace, quotaPojo);
257      }
258      @Override
259      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
260        masterServices.getMasterCoprocessorHost().postSetUserQuota(
261            userName, namespace, quotaPojo);
262      }
263    });
264  }
265
266  public void setTableQuota(final TableName table, final SetQuotaRequest req)
267      throws IOException, InterruptedException {
268    setQuota(req, new SetQuotaOperations() {
269      @Override
270      public GlobalQuotaSettingsImpl fetch() throws IOException {
271        return new GlobalQuotaSettingsImpl(null, table, null, null,
272            QuotaUtil.getTableQuota(masterServices.getConnection(), table));
273      }
274      @Override
275      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
276        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
277      }
278      @Override
279      public void delete() throws IOException {
280        SpaceQuotaSnapshot currSnapshotOfTable =
281            QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
282        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
283        if (currSnapshotOfTable != null) {
284          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
285          if (SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
286              && quotaStatus.isInViolation()) {
287            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
288          }
289        }
290      }
291      @Override
292      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
293        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
294      }
295      @Override
296      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
297        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
298      }
299    });
300  }
301
302  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
303      throws IOException, InterruptedException {
304    setQuota(req, new SetQuotaOperations() {
305      @Override
306      public GlobalQuotaSettingsImpl fetch() throws IOException {
307        return new GlobalQuotaSettingsImpl(null, null, namespace, null,
308            QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
309      }
310      @Override
311      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
312        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
313          quotaPojo.toQuotas());
314      }
315      @Override
316      public void delete() throws IOException {
317        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
318      }
319      @Override
320      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
321        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
322      }
323      @Override
324      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
325        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
326      }
327    });
328  }
329
330  public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
331      throws IOException, InterruptedException {
332    setQuota(req, new SetQuotaOperations() {
333      @Override
334      public GlobalQuotaSettingsImpl fetch() throws IOException {
335        return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
336            QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
337      }
338
339      @Override
340      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
341        QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
342          quotaPojo.toQuotas());
343      }
344
345      @Override
346      public void delete() throws IOException {
347        QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
348      }
349
350      @Override
351      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
352        masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
353      }
354
355      @Override
356      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
357        masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
358      }
359    });
360  }
361
362  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
363    if (initialized) {
364      this.namespaceQuotaManager.addNamespace(desc);
365    }
366  }
367
368  public void removeNamespaceQuota(String namespace) throws IOException {
369    if (initialized) {
370      this.namespaceQuotaManager.deleteNamespace(namespace);
371    }
372  }
373
374  public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
375      throws IOException {
376    boolean rpcThrottle = request.getRpcThrottleEnabled();
377    if (initialized) {
378      masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
379      boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
380      if (rpcThrottle != oldRpcThrottle) {
381        LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
382          oldRpcThrottle, rpcThrottle);
383        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
384        SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
385            rpcThrottle, masterServices.getServerName(), latch);
386        masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
387        latch.await();
388      } else {
389        LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
390          rpcThrottle);
391      }
392      SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
393          .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
394      masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
395      return response;
396    } else {
397      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
398      return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
399    }
400  }
401
402  public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
403      throws IOException {
404    if (initialized) {
405      masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
406      boolean enabled = isRpcThrottleEnabled();
407      IsRpcThrottleEnabledResponse response =
408          IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
409      masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
410      return response;
411    } else {
412      LOG.warn("Skip get rpc throttle because rpc quota is disabled");
413      return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
414    }
415  }
416
417  public boolean isRpcThrottleEnabled() throws IOException {
418    return initialized ? rpcThrottleStorage.isRpcThrottleEnabled() : false;
419  }
420
421  public SwitchExceedThrottleQuotaResponse
422      switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
423    boolean enabled = request.getExceedThrottleQuotaEnabled();
424    if (initialized) {
425      masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
426      boolean previousEnabled =
427          QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
428      if (previousEnabled == enabled) {
429        LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
430          enabled);
431      } else {
432        QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
433        LOG.info("{} switch exceed throttle quota from {} to {}",
434          masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
435      }
436      SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
437          .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
438      masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
439        enabled);
440      return response;
441    } else {
442      LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
443      return SwitchExceedThrottleQuotaResponse.newBuilder()
444          .setPreviousExceedThrottleQuotaEnabled(false).build();
445    }
446  }
447
448  public boolean isExceedThrottleQuotaEnabled() throws IOException {
449    return initialized ? QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection())
450        : false;
451  }
452
453  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
454      throws IOException, InterruptedException {
455    if (req.hasRemoveAll() && req.getRemoveAll() == true) {
456      quotaOps.preApply(null);
457      quotaOps.delete();
458      quotaOps.postApply(null);
459      return;
460    }
461
462    // Apply quota changes
463    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
464    if (LOG.isTraceEnabled()) {
465      LOG.trace(
466          "Current quota for request(" + TextFormat.shortDebugString(req)
467              + "): " + currentQuota);
468    }
469    // Call the appropriate "pre" CP hook with the current quota value (may be null)
470    quotaOps.preApply(currentQuota);
471    // Translate the protobuf request back into a POJO
472    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
473    if (LOG.isTraceEnabled()) {
474      LOG.trace("Deserialized quota from request: " + newQuota);
475    }
476
477    // Merge the current quota settings with the new quota settings the user provided.
478    //
479    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
480    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
481    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
482    if (LOG.isTraceEnabled()) {
483      LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
484    }
485
486    // Submit new changes
487    if (mergedQuota == null) {
488      quotaOps.delete();
489    } else {
490      quotaOps.update(mergedQuota);
491    }
492    // Advertise the final result via the "post" CP hook
493    quotaOps.postApply(mergedQuota);
494  }
495
496  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
497    if (initialized) {
498      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
499    }
500  }
501
502  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
503    if (initialized) {
504      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
505    }
506  }
507
508  /**
509   * @return cached region count, or -1 if quota manager is disabled or table status not found
510  */
511  public int getRegionCountOfTable(TableName tName) throws IOException {
512    if (initialized) {
513      return namespaceQuotaManager.getRegionCountOfTable(tName);
514    }
515    return -1;
516  }
517
518  @Override
519  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
520    if (initialized) {
521      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
522    }
523  }
524
525  @Override
526  public void onRegionSplit(RegionInfo hri) throws IOException {
527    if (initialized) {
528      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
529    }
530  }
531
532  /**
533   * Remove table from namespace quota.
534   *
535   * @param tName - The table name to update quota usage.
536   * @throws IOException Signals that an I/O exception has occurred.
537   */
538  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
539    if (initialized) {
540      namespaceQuotaManager.removeFromNamespaceUsage(tName);
541    }
542  }
543
544  public NamespaceAuditor getNamespaceQuotaManager() {
545    return this.namespaceQuotaManager;
546  }
547
548  /**
549   * Encapsulates CRUD quota operations for some subject.
550   */
551  private static interface SetQuotaOperations {
552    /**
553     * Fetches the current quota settings for the subject.
554     */
555    GlobalQuotaSettingsImpl fetch() throws IOException;
556    /**
557     * Deletes the quota for the subject.
558     */
559    void delete() throws IOException;
560    /**
561     * Persist the given quota for the subject.
562     */
563    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
564    /**
565     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
566     * quota for the subject.
567     */
568    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
569    /**
570     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
571     * quota from the request action for the subject.
572     */
573    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
574  }
575
576  /* ==========================================================================
577   *  Helpers
578   */
579
580  private void checkQuotaSupport() throws IOException {
581    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
582      throw new DoNotRetryIOException(
583        new UnsupportedOperationException("quota support disabled"));
584    }
585    if (!initialized) {
586      long maxWaitTime = masterServices.getConfiguration().getLong(
587        "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
588      long startTime = EnvironmentEdgeManager.currentTime();
589      do {
590        try {
591          Thread.sleep(100);
592        } catch (InterruptedException e) {
593          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
594          break;
595        }
596      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
597      if (!initialized) {
598        throw new IOException("Quota manager is uninitialized, please retry later.");
599      }
600    }
601  }
602
603  private void createQuotaTable() throws IOException {
604    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
605  }
606
607  private static class NamedLock<T> {
608    private final HashSet<T> locks = new HashSet<>();
609
610    public void lock(final T name) throws InterruptedException {
611      synchronized (locks) {
612        while (locks.contains(name)) {
613          locks.wait();
614        }
615        locks.add(name);
616      }
617    }
618
619    public void unlock(final T name) {
620      synchronized (locks) {
621        locks.remove(name);
622        locks.notifyAll();
623      }
624    }
625  }
626
627  @Override
628  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
629    if (initialized) {
630      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
631    }
632  }
633
634  /**
635   * Holds the size of a region at the given time, millis since the epoch.
636   */
637  private static class SizeSnapshotWithTimestamp {
638    private final long size;
639    private final long time;
640
641    public SizeSnapshotWithTimestamp(long size, long time) {
642      this.size = size;
643      this.time = time;
644    }
645
646    public long getSize() {
647      return size;
648    }
649
650    public long getTime() {
651      return time;
652    }
653
654    @Override
655    public boolean equals(Object o) {
656      if (o instanceof SizeSnapshotWithTimestamp) {
657        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
658        return size == other.size && time == other.time;
659      }
660      return false;
661    }
662
663    @Override
664    public int hashCode() {
665      HashCodeBuilder hcb = new HashCodeBuilder();
666      return hcb.append(size).append(time).toHashCode();
667    }
668
669    @Override
670    public String toString() {
671      StringBuilder sb = new StringBuilder(32);
672      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
673      sb.append("time=").append(time).append("}");
674      return sb.toString();
675    }
676  }
677
678  void initializeRegionSizes() {
679    assert regionSizes == null;
680    this.regionSizes = new ConcurrentHashMap<>();
681  }
682
683  public void addRegionSize(RegionInfo hri, long size, long time) {
684    if (regionSizes == null) {
685      return;
686    }
687    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
688  }
689
690  public Map<RegionInfo, Long> snapshotRegionSizes() {
691    if (regionSizes == null) {
692      return EMPTY_MAP;
693    }
694
695    Map<RegionInfo, Long> copy = new HashMap<>();
696    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
697      copy.put(entry.getKey(), entry.getValue().getSize());
698    }
699    return copy;
700  }
701
702  int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) {
703    if (regionSizes == null) {
704      return 0;
705    }
706    int numEntriesRemoved = 0;
707    Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator =
708        regionSizes.entrySet().iterator();
709    while (iterator.hasNext()) {
710      RegionInfo regionInfo = iterator.next().getKey();
711      long currentEntryTime = regionSizes.get(regionInfo).getTime();
712      // do not prune the entries if table is in violation and
713      // violation policy is disable to avoid cycle of enable/disable.
714      // Please refer HBASE-22012 for more details.
715      // prune entries older than time.
716      if (currentEntryTime < timeToPruneBefore && !isInViolationAndPolicyDisable(
717          regionInfo.getTable(), quotaObserverChore)) {
718        iterator.remove();
719        numEntriesRemoved++;
720      }
721    }
722    return numEntriesRemoved;
723  }
724
725  /**
726   * Method to check if a table is in violation and policy set on table is DISABLE.
727   *
728   * @param tableName          tableName to check.
729   * @param quotaObserverChore QuotaObserverChore instance
730   * @return returns true if table is in violation and policy is disable else false.
731   */
732  private boolean isInViolationAndPolicyDisable(TableName tableName,
733      QuotaObserverChore quotaObserverChore) {
734    boolean isInViolationAtTable = false;
735    boolean isInViolationAtNamespace = false;
736    SpaceViolationPolicy tablePolicy = null;
737    SpaceViolationPolicy namespacePolicy = null;
738    // Get Current Snapshot for the given table
739    SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName);
740    SpaceQuotaSnapshot namespaceQuotaSnapshot =
741        quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString());
742    if (tableQuotaSnapshot != null) {
743      // check if table in violation
744      isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation();
745      Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy();
746      if (policy.isPresent()) {
747        tablePolicy = policy.get();
748      }
749    }
750    if (namespaceQuotaSnapshot != null) {
751      // check namespace in violation
752      isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation();
753      Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy();
754      if (policy.isPresent()) {
755        namespacePolicy = policy.get();
756      }
757    }
758    return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable) || (
759        namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace);
760  }
761
762  /**
763   * Removes each region size entry where the RegionInfo references the provided TableName.
764   *
765   * @param tableName tableName.
766   */
767  public void removeRegionSizesForTable(TableName tableName) {
768    regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
769  }
770
771  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
772      Configuration conf, FileSystem fs) throws IOException {
773    final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create();
774    // Group the archived files by table
775    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
776      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
777      archivedFilesByTable.put(
778          tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
779    }
780    if (LOG.isTraceEnabled()) {
781      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
782    }
783    // Report each set of files to the appropriate object
784    for (TableName tn : archivedFilesByTable.keySet()) {
785      final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn);
786      final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get(
787          conn, conf, fs, tn);
788      notifier.addArchivedFiles(filesWithSize);
789    }
790  }
791}
792