001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.Map.Entry; 022import java.util.Objects; 023import java.util.concurrent.locks.ReentrantReadWriteLock; 024import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 025import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 026import java.util.stream.Collectors; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 035 036/** 037 * {@link QuotaSnapshotStore} implementation for namespaces. 038 */ 039@InterfaceAudience.Private 040public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String> { 041 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 042 private final ReadLock rlock = lock.readLock(); 043 private final WriteLock wlock = lock.writeLock(); 044 045 private final Connection conn; 046 private final QuotaObserverChore chore; 047 private Map<RegionInfo,Long> regionUsage; 048 049 public NamespaceQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<RegionInfo,Long> regionUsage) { 050 this.conn = Objects.requireNonNull(conn); 051 this.chore = Objects.requireNonNull(chore); 052 this.regionUsage = Objects.requireNonNull(regionUsage); 053 } 054 055 @Override 056 public SpaceQuota getSpaceQuota(String namespace) throws IOException { 057 Quotas quotas = getQuotaForNamespace(namespace); 058 if (quotas != null && quotas.hasSpace()) { 059 return quotas.getSpace(); 060 } 061 return null; 062 } 063 064 /** 065 * Fetches the namespace quota. Visible for mocking/testing. 066 */ 067 Quotas getQuotaForNamespace(String namespace) throws IOException { 068 return QuotaTableUtil.getNamespaceQuota(conn, namespace); 069 } 070 071 @Override 072 public SpaceQuotaSnapshot getCurrentState(String namespace) { 073 // Defer the "current state" to the chore 074 return this.chore.getNamespaceQuotaSnapshot(namespace); 075 } 076 077 @Override 078 public SpaceQuotaSnapshot getTargetState( 079 String subject, SpaceQuota spaceQuota) throws IOException { 080 rlock.lock(); 081 try { 082 final long sizeLimitInBytes = spaceQuota.getSoftLimit(); 083 long sum = 0L; 084 for (Entry<RegionInfo,Long> entry : filterBySubject(subject)) { 085 sum += entry.getValue(); 086 } 087 // Add in the size for any snapshots against this table 088 sum += QuotaTableUtil.getNamespaceSnapshotSize(conn, subject); 089 // Observance is defined as the size of the table being less than the limit 090 SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation() 091 : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy())); 092 return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes); 093 } finally { 094 rlock.unlock(); 095 } 096 } 097 098 @Override 099 public Iterable<Entry<RegionInfo, Long>> filterBySubject(String namespace) { 100 rlock.lock(); 101 try { 102 return regionUsage.entrySet().stream() 103 .filter(entry -> namespace.equals(entry.getKey().getTable().getNamespaceAsString())) 104 .collect(Collectors.toList()); 105 } finally { 106 rlock.unlock(); 107 } 108 } 109 110 @Override 111 public void setCurrentState(String namespace, SpaceQuotaSnapshot snapshot) { 112 // Defer the "current state" to the chore 113 this.chore.setNamespaceQuotaSnapshot(namespace, snapshot); 114 } 115 116 @Override 117 public void setRegionUsage(Map<RegionInfo,Long> regionUsage) { 118 wlock.lock(); 119 try { 120 this.regionUsage = Objects.requireNonNull(regionUsage); 121 } finally { 122 wlock.unlock(); 123 } 124 } 125}