001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
030import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
031import org.apache.hadoop.hbase.mob.MobFileCache;
032import org.apache.hadoop.hbase.regionserver.HRegion;
033import org.apache.hadoop.hbase.regionserver.RegionScanner;
034import org.apache.hadoop.hbase.util.CommonFSUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A client scanner for a region opened for read-only on the client side. Assumes region data is not
041 * changing.
042 */
043@InterfaceAudience.Private
044public class ClientSideRegionScanner extends AbstractClientScanner {
045
046  private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class);
047
048  private HRegion region;
049  RegionScanner scanner;
050  List<Cell> values;
051
052  public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
053    TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
054    // region is immutable, set isolation level
055    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
056
057    htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
058
059    // open region from the snapshot directory
060    region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
061      conf, hri, htd, null);
062    region.setRestoredRegion(true);
063    // non RS process does not have a block cache, and this a client side scanner,
064    // create one for MapReduce jobs to cache the INDEX block by setting to use
065    // IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY
066    conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
067    conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
068      String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT));
069    // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage.
070    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
071    region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
072    // we won't initialize the MobFileCache when not running in RS process. so provided an
073    // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
074    // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
075    // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
076    region.setMobFileCache(new MobFileCache(conf));
077    region.initialize();
078
079    // create an internal region scanner
080    this.scanner = region.getScanner(scan);
081    values = new ArrayList<>();
082
083    if (scanMetrics == null) {
084      initScanMetrics(scan);
085    } else {
086      this.scanMetrics = scanMetrics;
087    }
088    region.startRegionOperation();
089  }
090
091  @Override
092  public Result next() throws IOException {
093    values.clear();
094    scanner.nextRaw(values);
095    if (values.isEmpty()) {
096      // we are done
097      return null;
098    }
099
100    Result result = Result.create(values);
101    if (this.scanMetrics != null) {
102      long resultSize = 0;
103      for (Cell cell : values) {
104        resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
105      }
106      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
107      this.scanMetrics.countOfRowsScanned.incrementAndGet();
108    }
109
110    return result;
111  }
112
113  @Override
114  public void close() {
115    if (this.scanner != null) {
116      try {
117        this.scanner.close();
118        this.scanner = null;
119      } catch (IOException ex) {
120        LOG.warn("Exception while closing scanner", ex);
121      }
122    }
123    if (this.region != null) {
124      try {
125        this.region.closeRegionOperation();
126        this.region.close(true);
127        this.region = null;
128      } catch (IOException ex) {
129        LOG.warn("Exception while closing region", ex);
130      }
131    }
132  }
133
134  HRegion getRegion() {
135    return region;
136  }
137
138  @Override
139  public boolean renewLease() {
140    throw new UnsupportedOperationException();
141  }
142}