001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to you under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.hadoop.hbase.quotas; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.Map.Entry; 022import java.util.Objects; 023import java.util.concurrent.locks.ReentrantReadWriteLock; 024import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 025import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 026 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellScanner; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.RegionInfo; 032import org.apache.hadoop.hbase.client.Result; 033import org.apache.hadoop.hbase.client.ResultScanner; 034import org.apache.hadoop.hbase.client.Scan; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hbase.thirdparty.com.google.common.base.Predicate; 041import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 042import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 046 047/** 048 * {@link QuotaSnapshotStore} for tables. 049 */ 050@InterfaceAudience.Private 051public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> { 052 private static final Logger LOG = LoggerFactory.getLogger(TableQuotaSnapshotStore.class); 053 054 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 055 private final ReadLock rlock = lock.readLock(); 056 private final WriteLock wlock = lock.writeLock(); 057 058 private final Connection conn; 059 private final QuotaObserverChore chore; 060 private Map<RegionInfo,Long> regionUsage; 061 062 public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<RegionInfo,Long> regionUsage) { 063 this.conn = Objects.requireNonNull(conn); 064 this.chore = Objects.requireNonNull(chore); 065 this.regionUsage = Objects.requireNonNull(regionUsage); 066 } 067 068 @Override 069 public SpaceQuota getSpaceQuota(TableName subject) throws IOException { 070 Quotas quotas = getQuotaForTable(subject); 071 if (quotas != null && quotas.hasSpace()) { 072 return quotas.getSpace(); 073 } 074 return null; 075 } 076 /** 077 * Fetches the table quota. Visible for mocking/testing. 078 */ 079 Quotas getQuotaForTable(TableName table) throws IOException { 080 return QuotaTableUtil.getTableQuota(conn, table); 081 } 082 083 @Override 084 public SpaceQuotaSnapshot getCurrentState(TableName table) { 085 // Defer the "current state" to the chore 086 return chore.getTableQuotaSnapshot(table); 087 } 088 089 @Override 090 public SpaceQuotaSnapshot getTargetState( 091 TableName table, SpaceQuota spaceQuota) throws IOException { 092 rlock.lock(); 093 try { 094 final long sizeLimitInBytes = spaceQuota.getSoftLimit(); 095 long sum = 0L; 096 for (Entry<RegionInfo,Long> entry : filterBySubject(table)) { 097 sum += entry.getValue(); 098 } 099 // Add in the size for any snapshots against this table 100 sum += getSnapshotSizesForTable(table); 101 // Observance is defined as the size of the table being less than the limit 102 SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation() 103 : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy())); 104 return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes); 105 } finally { 106 rlock.unlock(); 107 } 108 } 109 110 /** 111 * Fetches any serialized snapshot sizes from the quota table for the {@code tn} provided. Any 112 * malformed records are skipped with a warning printed out. 113 */ 114 long getSnapshotSizesForTable(TableName tn) throws IOException { 115 try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { 116 Scan s = QuotaTableUtil.createScanForSpaceSnapshotSizes(tn); 117 ResultScanner rs = quotaTable.getScanner(s); 118 try { 119 long size = 0L; 120 // Should just be a single row (for our table) 121 for (Result result : rs) { 122 // May have multiple columns, one for each snapshot 123 CellScanner cs = result.cellScanner(); 124 while (cs.advance()) { 125 Cell current = cs.current(); 126 try { 127 long snapshotSize = QuotaTableUtil.parseSnapshotSize(current); 128 if (LOG.isTraceEnabled()) { 129 LOG.trace("Saw snapshot size of " + snapshotSize + " for " + current); 130 } 131 size += snapshotSize; 132 } catch (InvalidProtocolBufferException e) { 133 LOG.warn("Failed to parse snapshot size from cell: " + current); 134 } 135 } 136 } 137 return size; 138 } finally { 139 if (null != rs) { 140 rs.close(); 141 } 142 } 143 } 144 } 145 146 @Override 147 public Iterable<Entry<RegionInfo,Long>> filterBySubject(TableName table) { 148 rlock.lock(); 149 try { 150 return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<RegionInfo,Long>>() { 151 @Override 152 public boolean apply(Entry<RegionInfo,Long> input) { 153 return table.equals(input.getKey().getTable()); 154 } 155 }); 156 } finally { 157 rlock.unlock(); 158 } 159 } 160 161 @Override 162 public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) { 163 // Defer the "current state" to the chore 164 this.chore.setTableQuotaSnapshot(table, snapshot); 165 } 166 167 @Override 168 public void setRegionUsage(Map<RegionInfo,Long> regionUsage) { 169 wlock.lock(); 170 try { 171 this.regionUsage = Objects.requireNonNull(regionUsage); 172 } finally { 173 wlock.unlock(); 174 } 175 } 176}