001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Map;
022import java.util.Map.Entry;
023import java.util.Objects;
024import java.util.concurrent.locks.ReentrantReadWriteLock;
025import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
026import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
027import java.util.stream.Collectors;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellScanner;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.ResultScanner;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
043
044import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
047
048/**
049 * {@link QuotaSnapshotStore} for tables.
050 */
051@InterfaceAudience.Private
052public class TableQuotaSnapshotStore implements QuotaSnapshotStore<TableName> {
053  private static final Logger LOG = LoggerFactory.getLogger(TableQuotaSnapshotStore.class);
054
055  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
056  private final ReadLock rlock = lock.readLock();
057  private final WriteLock wlock = lock.writeLock();
058
059  private final Connection conn;
060  private final QuotaObserverChore chore;
061  private Map<RegionInfo, Long> regionUsage;
062
063  public TableQuotaSnapshotStore(Connection conn, QuotaObserverChore chore,
064    Map<RegionInfo, Long> regionUsage) {
065    this.conn = Objects.requireNonNull(conn);
066    this.chore = Objects.requireNonNull(chore);
067    this.regionUsage = Objects.requireNonNull(regionUsage);
068  }
069
070  @Override
071  public SpaceQuota getSpaceQuota(TableName subject) throws IOException {
072    Quotas quotas = getQuotaForTable(subject);
073    if (quotas != null && quotas.hasSpace()) {
074      return quotas.getSpace();
075    }
076    return null;
077  }
078
079  /**
080   * Fetches the table quota. Visible for mocking/testing.
081   */
082  Quotas getQuotaForTable(TableName table) throws IOException {
083    return QuotaTableUtil.getTableQuota(conn, table);
084  }
085
086  @Override
087  public SpaceQuotaSnapshot getCurrentState(TableName table) {
088    // Defer the "current state" to the chore
089    return chore.getTableQuotaSnapshot(table);
090  }
091
092  @Override
093  public SpaceQuotaSnapshot getTargetState(TableName table, SpaceQuota spaceQuota)
094    throws IOException {
095    rlock.lock();
096    try {
097      final long sizeLimitInBytes = spaceQuota.getSoftLimit();
098      long sum = 0L;
099      for (Entry<RegionInfo, Long> entry : filterBySubject(table)) {
100        sum += entry.getValue();
101      }
102      // Add in the size for any snapshots against this table
103      sum += getSnapshotSizesForTable(table);
104      // Observance is defined as the size of the table being less than the limit
105      SpaceQuotaStatus status = sum <= sizeLimitInBytes
106        ? SpaceQuotaStatus.notInViolation()
107        : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
108      return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes);
109    } finally {
110      rlock.unlock();
111    }
112  }
113
114  /**
115   * Fetches any serialized snapshot sizes from the quota table for the {@code tn} provided. Any
116   * malformed records are skipped with a warning printed out.
117   */
118  long getSnapshotSizesForTable(TableName tn) throws IOException {
119    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
120      Scan s = QuotaTableUtil.createScanForSpaceSnapshotSizes(tn);
121      ResultScanner rs = quotaTable.getScanner(s);
122      try {
123        long size = 0L;
124        // Should just be a single row (for our table)
125        for (Result result : rs) {
126          // May have multiple columns, one for each snapshot
127          CellScanner cs = result.cellScanner();
128          while (cs.advance()) {
129            Cell current = cs.current();
130            try {
131              long snapshotSize = QuotaTableUtil.parseSnapshotSize(current);
132              if (LOG.isTraceEnabled()) {
133                LOG.trace("Saw snapshot size of " + snapshotSize + " for " + current);
134              }
135              size += snapshotSize;
136            } catch (InvalidProtocolBufferException e) {
137              LOG.warn("Failed to parse snapshot size from cell: " + current);
138            }
139          }
140        }
141        return size;
142      } finally {
143        if (null != rs) {
144          rs.close();
145        }
146      }
147    }
148  }
149
150  @Override
151  public Iterable<Entry<RegionInfo, Long>> filterBySubject(TableName table) {
152    rlock.lock();
153    try {
154      return regionUsage.entrySet().stream()
155        .filter(entry -> table.equals(entry.getKey().getTable())).collect(Collectors.toList());
156    } finally {
157      rlock.unlock();
158    }
159  }
160
161  @Override
162  public void setCurrentState(TableName table, SpaceQuotaSnapshot snapshot) {
163    // Defer the "current state" to the chore
164    this.chore.setTableQuotaSnapshot(table, snapshot);
165  }
166
167  @Override
168  public void setRegionUsage(Map<RegionInfo, Long> regionUsage) {
169    wlock.lock();
170    try {
171      this.regionUsage = Objects.requireNonNull(regionUsage);
172    } finally {
173      wlock.unlock();
174    }
175  }
176}