001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
030import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
031import org.apache.hadoop.hbase.io.hfile.BlockCache;
032import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
033import org.apache.hadoop.hbase.mob.MobFileCache;
034import org.apache.hadoop.hbase.regionserver.HRegion;
035import org.apache.hadoop.hbase.regionserver.RegionScanner;
036import org.apache.hadoop.hbase.util.CommonFSUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A client scanner for a region opened for read-only on the client side. Assumes region data is not
043 * changing.
044 */
045@InterfaceAudience.Private
046public class ClientSideRegionScanner extends AbstractClientScanner {
047
048  private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class);
049
050  private HRegion region;
051  private MobFileCache mobFileCache;
052  private BlockCache blockCache;
053  RegionScanner scanner;
054  List<Cell> values;
055  boolean hasMore = true;
056
057  public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
058    TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
059    // region is immutable, set isolation level
060    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
061
062    htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
063
064    // open region from the snapshot directory
065    region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
066      conf, hri, htd, null);
067    region.setRestoredRegion(true);
068    // non RS process does not have a block cache, and this a client side scanner,
069    // create one for MapReduce jobs to cache the INDEX block by setting to use
070    // IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY
071    conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
072    conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
073      String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT));
074    // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage.
075    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
076    blockCache = BlockCacheFactory.createBlockCache(conf);
077    region.setBlockCache(blockCache);
078    // we won't initialize the MobFileCache when not running in RS process. so provided an
079    // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
080    // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
081    // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
082    mobFileCache = new MobFileCache(conf);
083    region.setMobFileCache(mobFileCache);
084    region.initialize();
085
086    // create an internal region scanner
087    this.scanner = region.getScanner(scan);
088    values = new ArrayList<>();
089
090    if (scanMetrics == null) {
091      initScanMetrics(scan);
092    } else {
093      this.scanMetrics = scanMetrics;
094      setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled());
095    }
096    if (isScanMetricsByRegionEnabled()) {
097      this.scanMetrics.moveToNextRegion();
098      this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(), null);
099      // The server name will be null in scan metrics as this is a client side region scanner
100    }
101    region.startRegionOperation();
102  }
103
104  @Override
105  public Result next() throws IOException {
106    do {
107      if (!hasMore) {
108        return null;
109      }
110      values.clear();
111      this.hasMore = scanner.nextRaw(values);
112    } while (values.isEmpty());
113
114    Result result = Result.create(values);
115    if (this.scanMetrics != null) {
116      long resultSize = 0;
117      for (Cell cell : values) {
118        resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
119      }
120      this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize);
121      this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
122    }
123
124    return result;
125  }
126
127  @Override
128  public void close() {
129    if (this.scanner != null) {
130      try {
131        this.scanner.close();
132        this.scanner = null;
133      } catch (IOException ex) {
134        LOG.warn("Exception while closing scanner", ex);
135      }
136    }
137    if (this.region != null) {
138      try {
139        this.region.closeRegionOperation();
140        this.region.close(true);
141        this.region = null;
142      } catch (IOException ex) {
143        LOG.warn("Exception while closing region", ex);
144      }
145    }
146
147    // In typical region operation, RegionServerServices would handle the lifecycle of
148    // the MobFileCache and BlockCache. In ClientSideRegionScanner, we need to handle
149    // the lifecycle of these components ourselves to avoid resource leaks.
150    if (mobFileCache != null) {
151      mobFileCache.shutdown();
152      mobFileCache = null;
153    }
154
155    if (blockCache != null) {
156      blockCache.shutdown();
157      blockCache = null;
158    }
159  }
160
161  HRegion getRegion() {
162    return region;
163  }
164
165  @Override
166  public boolean renewLease() {
167    throw new UnsupportedOperationException();
168  }
169}