/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;

/**
 * {@link QuotaSnapshotStore} for tables.
 * <p>
 * Access to the region-usage map is guarded by a {@link ReentrantReadWriteLock}: readers take the
 * read lock, and {@link #setRegionUsage(Map)} takes the write lock while swapping the map
 * reference. The "current state" of a table is not kept here; it is delegated to the
 * {@link QuotaObserverChore} supplied at construction.
 */
@InterfaceAudience.Private
public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
  private static final Logger LOG = LoggerFactory.getLogger(TableQuotaSnapshotStore.class);

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReadLock rlock = lock.readLock();
  private final WriteLock wlock = lock.writeLock();

  private final Connection conn;
  private final QuotaObserverChore chore;
  // Replaced wholesale by setRegionUsage(Map); only touched while holding rlock/wlock.
  private Map<RegionInfo, Long> regionUsage;

  /**
   * Creates a store backed by the given connection, chore and initial region usage.
   * @param conn        connection used to read the quota table; must not be null
   * @param chore       chore that holds the current per-table snapshots; must not be null
   * @param regionUsage initial mapping of region to its size; must not be null
   * @throws NullPointerException if any argument is null
   */
  public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore,
    Map<RegionInfo, Long> regionUsage) {
    this.conn = Objects.requireNonNull(conn);
    this.chore = Objects.requireNonNull(chore);
    this.regionUsage = Objects.requireNonNull(regionUsage);
  }

  /**
   * Returns the space quota defined for the table.
   * @param subject the table to look up
   * @return the table's {@link SpaceQuota}, or {@code null} when no quota record exists for the
   *         table or the record carries no space quota
   * @throws IOException if the quota lookup fails
   */
  @Override
  public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
    Quotas quotas = getQuotaForTable(subject);
    if (quotas != null && quotas.hasSpace()) {
      return quotas.getSpace();
    }
    return null;
  }

  /**
   * Fetches the table quota. Visible for mocking/testing.
   * @param table the table to look up
   * @return whatever {@link QuotaTableUtil#getTableQuota(Connection, TableName)} returns for the
   *         table
   * @throws IOException if the lookup fails
   */
  Quotas getQuotaForTable(TableName table) throws IOException {
    return QuotaTableUtil.getTableQuota(conn, table);
  }

  /**
   * Returns the snapshot the chore currently holds for the table.
   * @param table the table to look up
   * @return the value of {@link QuotaObserverChore#getTableQuotaSnapshot(TableName)}
   */
  @Override
  public SpaceQuotaSnapshot getCurrentState(TableName table) {
    // Defer the "current state" to the chore
    return chore.getTableQuotaSnapshot(table);
  }

  /**
   * Computes the snapshot the table should be in, given its quota.
   * <p>
   * The usage is the sum of the sizes of every region of the table in the current region-usage
   * map plus the snapshot sizes recorded for the table in the quota table. The table is in
   * observance when that usage is less than or equal to the quota's soft limit; otherwise the
   * status carries the quota's violation policy.
   * @param table      the table to evaluate
   * @param spaceQuota the quota to evaluate the table against
   * @return a snapshot holding the computed status, the usage and the limit
   * @throws IOException if reading snapshot sizes from the quota table fails
   */
  @Override
  public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota)
    throws IOException {
    // NOTE(review): the read lock is held across the quota-table scan below, so a concurrent
    // setRegionUsage() waits for that I/O. Kept as-is to preserve the existing locking behavior.
    rlock.lock();
    try {
      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
      long sum = 0L;
      for (Entry<RegionInfo, Long> entry : filterBySubject(table)) {
        sum += entry.getValue();
      }
      // Add in the size for any snapshots against this table
      sum += getSnapshotSizesForTable(table);
      // Observance is defined as the size of the table being less than or equal to the limit
      SpaceQuotaStatus status = sum <= sizeLimitInBytes
        ? SpaceQuotaStatus.notInViolation()
        : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
      return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes);
    } finally {
      rlock.unlock();
    }
  }

  /**
   * Fetches any serialized snapshot sizes from the quota table for the {@code tn} provided. Any
   * malformed records are skipped with a warning (including the parse failure) printed out.
   * @param tn the table whose snapshot sizes should be summed
   * @return the sum of all snapshot sizes that could be parsed
   * @throws IOException if opening or scanning the quota table fails
   */
  long getSnapshotSizesForTable(TableName tn) throws IOException {
    Scan s = QuotaTableUtil.createScanForSpaceSnapshotSizes(tn);
    // Both the table and the scanner are released by try-with-resources, scanner first.
    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME);
      ResultScanner rs = quotaTable.getScanner(s)) {
      long size = 0L;
      // Should just be a single row (for our table)
      for (Result result : rs) {
        // May have multiple columns, one for each snapshot
        CellScanner cs = result.cellScanner();
        while (cs.advance()) {
          Cell current = cs.current();
          try {
            long snapshotSize = QuotaTableUtil.parseSnapshotSize(current);
            LOG.trace("Saw snapshot size of {} for {}", snapshotSize, current);
            size += snapshotSize;
          } catch (InvalidProtocolBufferException e) {
            // Best effort: skip the bad cell, but keep the cause in the log for diagnosis.
            LOG.warn("Failed to parse snapshot size from cell: {}", current, e);
          }
        }
      }
      return size;
    }
  }

  /**
   * Selects the region-usage entries that belong to the given table.
   * @param table the table to filter on
   * @return a newly collected list of the entries whose region reports {@code table} as its
   *         table, taken from the region-usage map while holding the read lock
   */
  @Override
  public Iterable<Entry<RegionInfo, Long>> filterBySubject(TableName table) {
    rlock.lock();
    try {
      return regionUsage.entrySet().stream()
        .filter(entry -> table.equals(entry.getKey().getTable())).collect(Collectors.toList());
    } finally {
      rlock.unlock();
    }
  }

  /**
   * Records the given snapshot as the table's current state by handing it to the chore.
   * @param table    the table being updated
   * @param snapshot the snapshot to record
   */
  @Override
  public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) {
    // Defer the "current state" to the chore
    this.chore.setTableQuotaSnapshot(table, snapshot);
  }

  /**
   * Replaces the region-usage map used by this store, under the write lock.
   * @param regionUsage the new mapping of region to its size; must not be null
   * @throws NullPointerException if {@code regionUsage} is null
   */
  @Override
  public void setRegionUsage(Map<RegionInfo, Long> regionUsage) {
    wlock.lock();
    try {
      this.regionUsage = Objects.requireNonNull(regionUsage);
    } finally {
      wlock.unlock();
    }
  }
}