001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.quotas;
018
019import java.io.IOException;
020import java.util.Map;
021import java.util.Map.Entry;
022import java.util.Objects;
023import java.util.concurrent.locks.ReentrantReadWriteLock;
024import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
025import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
026
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellScanner;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.ResultScanner;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
041import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
042import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
046
047/**
048 * {@link QuotaSnapshotStore} for tables.
049 */
050@InterfaceAudience.Private
051public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
052  private static final Logger LOG = LoggerFactory.getLogger(TableQuotaSnapshotStore.class);
053
054  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
055  private final ReadLock rlock = lock.readLock();
056  private final WriteLock wlock = lock.writeLock();
057
058  private final Connection conn;
059  private final QuotaObserverChore chore;
060  private Map<RegionInfo,Long> regionUsage;
061
062  public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<RegionInfo,Long> regionUsage) {
063    this.conn = Objects.requireNonNull(conn);
064    this.chore = Objects.requireNonNull(chore);
065    this.regionUsage = Objects.requireNonNull(regionUsage);
066  }
067
068  @Override
069  public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
070    Quotas quotas = getQuotaForTable(subject);
071    if (quotas != null && quotas.hasSpace()) {
072      return quotas.getSpace();
073    }
074    return null;
075  }
076  /**
077   * Fetches the table quota. Visible for mocking/testing.
078   */
079  Quotas getQuotaForTable(TableName table) throws IOException {
080    return QuotaTableUtil.getTableQuota(conn, table);
081  }
082
083  @Override
084  public SpaceQuotaSnapshot getCurrentState(TableName table) {
085    // Defer the "current state" to the chore
086    return chore.getTableQuotaSnapshot(table);
087  }
088
089  @Override
090  public SpaceQuotaSnapshot getTargetState(
091      TableName table, SpaceQuota spaceQuota) throws IOException {
092    rlock.lock();
093    try {
094      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
095      long sum = 0L;
096      for (Entry<RegionInfo,Long> entry : filterBySubject(table)) {
097        sum += entry.getValue();
098      }
099      // Add in the size for any snapshots against this table
100      sum += getSnapshotSizesForTable(table);
101      // Observance is defined as the size of the table being less than the limit
102      SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
103          : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
104      return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes);
105    } finally {
106      rlock.unlock();
107    }
108  }
109
110  /**
111   * Fetches any serialized snapshot sizes from the quota table for the {@code tn} provided. Any
112   * malformed records are skipped with a warning printed out.
113   */
114  long getSnapshotSizesForTable(TableName tn) throws IOException {
115    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
116      Scan s = QuotaTableUtil.createScanForSpaceSnapshotSizes(tn);
117      ResultScanner rs = quotaTable.getScanner(s);
118      try {
119        long size = 0L;
120        // Should just be a single row (for our table)
121        for (Result result : rs) {
122          // May have multiple columns, one for each snapshot
123          CellScanner cs = result.cellScanner();
124          while (cs.advance()) {
125            Cell current = cs.current();
126            try {
127              long snapshotSize = QuotaTableUtil.parseSnapshotSize(current);
128              if (LOG.isTraceEnabled()) {
129                LOG.trace("Saw snapshot size of " + snapshotSize + " for " + current);
130              }
131              size += snapshotSize;
132            } catch (InvalidProtocolBufferException e) {
133              LOG.warn("Failed to parse snapshot size from cell: " + current);
134            }
135          }
136        }
137        return size;
138      } finally {
139        if (null != rs) {
140          rs.close();
141        }
142      }
143    }
144  }
145
146  @Override
147  public Iterable<Entry<RegionInfo,Long>> filterBySubject(TableName table) {
148    rlock.lock();
149    try {
150      return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<RegionInfo,Long>>() {
151        @Override
152        public boolean apply(Entry<RegionInfo,Long> input) {
153          return table.equals(input.getKey().getTable());
154        }
155      });
156    } finally {
157      rlock.unlock();
158    }
159  }
160
161  @Override
162  public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) {
163    // Defer the "current state" to the chore
164    this.chore.setTableQuotaSnapshot(table, snapshot);
165  }
166
167  @Override
168  public void setRegionUsage(Map<RegionInfo,Long> regionUsage) {
169    wlock.lock();
170    try {
171      this.regionUsage = Objects.requireNonNull(regionUsage);
172    } finally {
173      wlock.unlock();
174    }
175  }
176}