001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.PrivateCellUtil;
032import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
033import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
034import org.apache.hadoop.hbase.io.hfile.BlockCache;
035import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
036import org.apache.hadoop.hbase.mob.MobFileCache;
037import org.apache.hadoop.hbase.regionserver.HRegion;
038import org.apache.hadoop.hbase.regionserver.RegionScanner;
039import org.apache.hadoop.hbase.util.CommonFSUtils;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * A client scanner for a region opened for read-only on the client side. Assumes region data is not
046 * changing.
047 */
048@InterfaceAudience.Private
049public class ClientSideRegionScanner extends AbstractClientScanner {
050
051  private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class);
052
053  private HRegion region;
054  private MobFileCache mobFileCache;
055  private BlockCache blockCache;
056  RegionScanner scanner;
057  List<Cell> values;
058  boolean hasMore = true;
059  private final Set<Path> filesRead;
060
061  public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
062    TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
063    // region is immutable, set isolation level
064    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
065
066    htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
067
068    // open region from the snapshot directory
069    region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
070      conf, hri, htd, null, null);
071    region.setRestoredRegion(true);
072    // non RS process does not have a block cache, and this a client side scanner,
073    // create one for MapReduce jobs to cache the INDEX block by setting to use
074    // IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY
075    conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
076    conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
077      String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT));
078    // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage.
079    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
080    blockCache = BlockCacheFactory.createBlockCache(conf);
081    region.setBlockCache(blockCache);
082    // we won't initialize the MobFileCache when not running in RS process. so provided an
083    // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
084    // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
085    // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
086    mobFileCache = new MobFileCache(conf);
087    region.setMobFileCache(mobFileCache);
088    region.initialize();
089
090    // create an internal region scanner
091    this.scanner = region.getScanner(scan);
092    this.filesRead = new HashSet<>();
093    values = new ArrayList<>();
094
095    if (scanMetrics == null) {
096      initScanMetrics(scan);
097    } else {
098      this.scanMetrics = scanMetrics;
099      setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled());
100    }
101    if (isScanMetricsByRegionEnabled()) {
102      this.scanMetrics.moveToNextRegion();
103      this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(), null);
104      // The server name will be null in scan metrics as this is a client side region scanner
105    }
106    region.startRegionOperation();
107  }
108
109  @Override
110  public Result next() throws IOException {
111    do {
112      if (!hasMore) {
113        return null;
114      }
115      values.clear();
116      this.hasMore = scanner.nextRaw(values);
117    } while (values.isEmpty());
118
119    Result result = Result.create(values);
120    if (this.scanMetrics != null) {
121      long resultSize = 0;
122      for (Cell cell : values) {
123        resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
124      }
125      this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize);
126      this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
127    }
128
129    return result;
130  }
131
132  @Override
133  public void close() {
134    if (this.scanner != null) {
135      try {
136        this.scanner.close();
137        this.filesRead.addAll(this.scanner.getFilesRead());
138        this.scanner = null;
139      } catch (IOException ex) {
140        LOG.warn("Exception while closing scanner", ex);
141      }
142    }
143    if (this.region != null) {
144      try {
145        this.region.closeRegionOperation();
146        this.region.close(true);
147        this.region = null;
148      } catch (IOException ex) {
149        LOG.warn("Exception while closing region", ex);
150      }
151    }
152
153    // In typical region operation, RegionServerServices would handle the lifecycle of
154    // the MobFileCache and BlockCache. In ClientSideRegionScanner, we need to handle
155    // the lifecycle of these components ourselves to avoid resource leaks.
156    if (mobFileCache != null) {
157      mobFileCache.shutdown();
158      mobFileCache = null;
159    }
160
161    if (blockCache != null) {
162      blockCache.shutdown();
163      blockCache = null;
164    }
165  }
166
167  HRegion getRegion() {
168    return region;
169  }
170
171  /**
172   * Returns the set of store file paths that were successfully read by the underlying region
173   * scanner. Populated when this scanner is closed.
174   */
175  public Set<Path> getFilesRead() {
176    return Collections.unmodifiableSet(this.filesRead);
177  }
178
179  @Override
180  public boolean renewLease() {
181    throw new UnsupportedOperationException();
182  }
183}