001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.PrivateCellUtil; 029import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 030import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 031import org.apache.hadoop.hbase.mob.MobFileCache; 032import org.apache.hadoop.hbase.regionserver.HRegion; 033import org.apache.hadoop.hbase.regionserver.RegionScanner; 034import org.apache.hadoop.hbase.util.CommonFSUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * A client scanner for a region opened for read-only on the client side. Assumes region data is not 041 * changing. 042 */ 043@InterfaceAudience.Private 044public class ClientSideRegionScanner extends AbstractClientScanner { 045 046 private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class); 047 048 private HRegion region; 049 RegionScanner scanner; 050 List<Cell> values; 051 052 public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, 053 TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { 054 // region is immutable, set isolation level 055 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 056 057 htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build(); 058 059 // open region from the snapshot directory 060 region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, 061 conf, hri, htd, null); 062 region.setRestoredRegion(true); 063 // non RS process does not have a block cache, and this a client side scanner, 064 // create one for MapReduce jobs to cache the INDEX block by setting to use 065 // IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY 066 conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU"); 067 conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, 068 String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT)); 069 // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage. 070 conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); 071 region.setBlockCache(BlockCacheFactory.createBlockCache(conf)); 072 // we won't initialize the MobFileCache when not running in RS process. so provided an 073 // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only 074 // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the 075 // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight. 076 region.setMobFileCache(new MobFileCache(conf)); 077 region.initialize(); 078 079 // create an internal region scanner 080 this.scanner = region.getScanner(scan); 081 values = new ArrayList<>(); 082 083 if (scanMetrics == null) { 084 initScanMetrics(scan); 085 } else { 086 this.scanMetrics = scanMetrics; 087 } 088 region.startRegionOperation(); 089 } 090 091 @Override 092 public Result next() throws IOException { 093 values.clear(); 094 scanner.nextRaw(values); 095 if (values.isEmpty()) { 096 // we are done 097 return null; 098 } 099 100 Result result = Result.create(values); 101 if (this.scanMetrics != null) { 102 long resultSize = 0; 103 for (Cell cell : values) { 104 resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell); 105 } 106 this.scanMetrics.countOfBytesInResults.addAndGet(resultSize); 107 this.scanMetrics.countOfRowsScanned.incrementAndGet(); 108 } 109 110 return result; 111 } 112 113 @Override 114 public void close() { 115 if (this.scanner != null) { 116 try { 117 this.scanner.close(); 118 this.scanner = null; 119 } catch (IOException ex) { 120 LOG.warn("Exception while closing scanner", ex); 121 } 122 } 123 if (this.region != null) { 124 try { 125 this.region.closeRegionOperation(); 126 this.region.close(true); 127 this.region = null; 128 } catch (IOException ex) { 129 LOG.warn("Exception while closing region", ex); 130 } 131 } 132 } 133 134 HRegion getRegion() { 135 return region; 136 } 137 138 @Override 139 public boolean renewLease() { 140 throw new UnsupportedOperationException(); 141 } 142}