001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
030import org.apache.hadoop.hbase.io.hfile.BlockCache;
031import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
032import org.apache.hadoop.hbase.mob.MobFileCache;
033import org.apache.hadoop.hbase.regionserver.HRegion;
034import org.apache.hadoop.hbase.regionserver.RegionScanner;
035import org.apache.hadoop.hbase.util.CommonFSUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A client scanner for a region opened for read-only on the client side. Assumes region data is not
042 * changing.
043 */
044@InterfaceAudience.Private
045public class ClientSideRegionScanner extends AbstractClientScanner {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class);
048
049  private HRegion region;
050  private MobFileCache mobFileCache;
051  private BlockCache blockCache;
052  RegionScanner scanner;
053  List<Cell> values;
054  boolean hasMore = true;
055
056  public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
057    TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
058    // region is immutable, set isolation level
059    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
060
061    htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
062
063    // open region from the snapshot directory
064    region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
065      conf, hri, htd, null);
066    region.setRestoredRegion(true);
067    // non RS process does not have a block cache, and this a client side scanner,
068    // create one for MapReduce jobs to cache the INDEX block by setting to use
069    // IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY
070    conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
071    conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
072      String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT));
073    // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage.
074    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
075    blockCache = BlockCacheFactory.createBlockCache(conf);
076    region.setBlockCache(blockCache);
077    // we won't initialize the MobFileCache when not running in RS process. so provided an
078    // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
079    // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
080    // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
081    mobFileCache = new MobFileCache(conf);
082    region.setMobFileCache(mobFileCache);
083    region.initialize();
084
085    // create an internal region scanner
086    this.scanner = region.getScanner(scan);
087    values = new ArrayList<>();
088
089    if (scanMetrics == null) {
090      initScanMetrics(scan);
091    } else {
092      this.scanMetrics = scanMetrics;
093    }
094    region.startRegionOperation();
095  }
096
097  @Override
098  public Result next() throws IOException {
099    do {
100      if (!hasMore) {
101        return null;
102      }
103      values.clear();
104      this.hasMore = scanner.nextRaw(values);
105    } while (values.isEmpty());
106
107    Result result = Result.create(values);
108    if (this.scanMetrics != null) {
109      long resultSize = 0;
110      for (Cell cell : values) {
111        resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
112      }
113      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
114      this.scanMetrics.countOfRowsScanned.incrementAndGet();
115    }
116
117    return result;
118  }
119
120  @Override
121  public void close() {
122    if (this.scanner != null) {
123      try {
124        this.scanner.close();
125        this.scanner = null;
126      } catch (IOException ex) {
127        LOG.warn("Exception while closing scanner", ex);
128      }
129    }
130    if (this.region != null) {
131      try {
132        this.region.closeRegionOperation();
133        this.region.close(true);
134        this.region = null;
135      } catch (IOException ex) {
136        LOG.warn("Exception while closing region", ex);
137      }
138    }
139
140    // In typical region operation, RegionServerServices would handle the lifecycle of
141    // the MobFileCache and BlockCache. In ClientSideRegionScanner, we need to handle
142    // the lifecycle of these components ourselves to avoid resource leaks.
143    if (mobFileCache != null) {
144      mobFileCache.shutdown();
145      mobFileCache = null;
146    }
147
148    if (blockCache != null) {
149      blockCache.shutdown();
150      blockCache = null;
151    }
152  }
153
154  HRegion getRegion() {
155    return region;
156  }
157
158  @Override
159  public boolean renewLease() {
160    throw new UnsupportedOperationException();
161  }
162}