001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.Map; 022import java.util.Map.Entry; 023import java.util.Objects; 024import java.util.concurrent.locks.ReentrantReadWriteLock; 025import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 026import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 027import java.util.stream.Collectors; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 036 037/** 038 * {@link QuotaSnapshotStore} implementation for namespaces. 039 */ 040@InterfaceAudience.Private 041public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String> { 042 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 043 private final ReadLock rlock = lock.readLock(); 044 private final WriteLock wlock = lock.writeLock(); 045 046 private final Connection conn; 047 private final QuotaObserverChore chore; 048 private Map<RegionInfo, Long> regionUsage; 049 050 public NamespaceQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, 051 Map<RegionInfo, Long> regionUsage) { 052 this.conn = Objects.requireNonNull(conn); 053 this.chore = Objects.requireNonNull(chore); 054 this.regionUsage = Objects.requireNonNull(regionUsage); 055 } 056 057 @Override 058 public SpaceQuota getSpaceQuota(String namespace) throws IOException { 059 Quotas quotas = getQuotaForNamespace(namespace); 060 if (quotas != null && quotas.hasSpace()) { 061 return quotas.getSpace(); 062 } 063 return null; 064 } 065 066 /** 067 * Fetches the namespace quota. Visible for mocking/testing. 068 */ 069 Quotas getQuotaForNamespace(String namespace) throws IOException { 070 return QuotaTableUtil.getNamespaceQuota(conn, namespace); 071 } 072 073 @Override 074 public SpaceQuotaSnapshot getCurrentState(String namespace) { 075 // Defer the "current state" to the chore 076 return this.chore.getNamespaceQuotaSnapshot(namespace); 077 } 078 079 @Override 080 public SpaceQuotaSnapshot getTargetState(String subject, SpaceQuota spaceQuota) 081 throws IOException { 082 rlock.lock(); 083 try { 084 final long sizeLimitInBytes = spaceQuota.getSoftLimit(); 085 long sum = 0L; 086 for (Entry<RegionInfo, Long> entry : filterBySubject(subject)) { 087 sum += entry.getValue(); 088 } 089 // Add in the size for any snapshots against this table 090 sum += QuotaTableUtil.getNamespaceSnapshotSize(conn, subject); 091 // Observance is defined as the size of the table being less than the limit 092 SpaceQuotaStatus status = sum <= sizeLimitInBytes 093 ? SpaceQuotaStatus.notInViolation() 094 : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy())); 095 return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes); 096 } finally { 097 rlock.unlock(); 098 } 099 } 100 101 @Override 102 public Iterable<Entry<RegionInfo, Long>> filterBySubject(String namespace) { 103 rlock.lock(); 104 try { 105 return regionUsage.entrySet().stream() 106 .filter(entry -> namespace.equals(entry.getKey().getTable().getNamespaceAsString())) 107 .collect(Collectors.toList()); 108 } finally { 109 rlock.unlock(); 110 } 111 } 112 113 @Override 114 public void setCurrentState(String namespace, SpaceQuotaSnapshot snapshot) { 115 // Defer the "current state" to the chore 116 this.chore.setNamespaceQuotaSnapshot(namespace, snapshot); 117 } 118 119 @Override 120 public void setRegionUsage(Map<RegionInfo, Long> regionUsage) { 121 wlock.lock(); 122 try { 123 this.regionUsage = Objects.requireNonNull(regionUsage); 124 } finally { 125 wlock.unlock(); 126 } 127 } 128}