/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A client scanner for a region opened for read-only on the client side. Assumes region data is not
 * changing.
 * <p>
 * The scanner creates its own {@link BlockCache} and {@link MobFileCache} and hands them to the
 * region it opens; both are shut down again by {@link #close()}.
 */
@InterfaceAudience.Private
public class ClientSideRegionScanner extends AbstractClientScanner {

  private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class);

  private HRegion region;
  private MobFileCache mobFileCache;
  private BlockCache blockCache;
  /**
   * True only after {@code region.startRegionOperation()} has returned normally and until the
   * matching {@code closeRegionOperation()} has been attempted.
   */
  private boolean regionOperationStarted;
  RegionScanner scanner;
  List<Cell> values;
  boolean hasMore = true;

  /**
   * Opens the given region read-only and starts a region scanner over it.
   * @param conf        configuration used to open the region and build the caches. It is modified
   *                    in place: the block cache policy is set to {@code IndexOnlyLRU}, the on-heap
   *                    block cache fixed size gets a default if unset, and the bucket cache IO
   *                    engine setting is removed.
   * @param fs          file system the region is opened on
   * @param rootDir     directory under which the table directory is resolved
   * @param htd         descriptor of the table; a read-only copy of it is used for the region
   * @param hri         the region to open
   * @param scan        the scan to run; its isolation level is overwritten with
   *                    {@link IsolationLevel#READ_UNCOMMITTED}
   * @param scanMetrics metrics to update while scanning, or {@code null} to let
   *                    {@code initScanMetrics(scan)} decide
   * @throws IOException if the region cannot be opened or the scanner cannot be created; anything
   *                     this constructor had already created is released before the exception
   *                     propagates
   */
  public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
    TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
    // region is immutable, set isolation level
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

    htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();

    boolean regionInitialized = false;
    try {
      // open region from the snapshot directory
      region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
        conf, hri, htd, null);
      region.setRestoredRegion(true);
      // A non-RS process does not have a block cache, and this is a client side scanner, so
      // create one for MapReduce jobs to cache the INDEX blocks: use the IndexOnlyLRU policy and
      // give HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY a default value when it is not set.
      conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU");
      conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY,
        String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT));
      // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage.
      conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
      blockCache = BlockCacheFactory.createBlockCache(conf);
      region.setBlockCache(blockCache);
      // The MobFileCache is not initialized when not running in an RS process, so provide an
      // initialized cache. Consider the case where a CF was changed from mob to non-mob: if we
      // only initialized the cache for MOB regions, an NPE from HMobStore would still happen. So
      // initialize the cache for every region even though it may not have any mob CF; the cache
      // is very light-weight.
      mobFileCache = new MobFileCache(conf);
      region.setMobFileCache(mobFileCache);
      region.initialize();
      regionInitialized = true;

      // create an internal region scanner
      this.scanner = region.getScanner(scan);
      values = new ArrayList<>();

      if (scanMetrics == null) {
        initScanMetrics(scan);
      } else {
        this.scanMetrics = scanMetrics;
      }
      region.startRegionOperation();
      regionOperationStarted = true;
    } catch (IOException | RuntimeException e) {
      // The caller never receives this object when the constructor throws, so nobody else can
      // call close(); release whatever was created so far before propagating the failure.
      if (!regionInitialized) {
        // NOTE(review): assumes a region whose initialize() failed or never ran has nothing left
        // for close() to release -- confirm against HRegion.
        region = null;
      }
      try {
        releaseResources();
      } catch (RuntimeException cleanupFailure) {
        e.addSuppressed(cleanupFailure);
      }
      throw e;
    }
  }

  /**
   * Returns the next non-empty row produced by the region scanner and updates the scan metrics, if
   * any, with the estimated serialized size of the row's cells.
   * @return the next row, or {@code null} once the scanner is exhausted or has been closed
   * @throws IOException if the underlying region scanner fails
   */
  @Override
  public Result next() throws IOException {
    do {
      if (!hasMore) {
        return null;
      }
      values.clear();
      this.hasMore = scanner.nextRaw(values);
    } while (values.isEmpty());

    Result result = Result.create(values);
    if (this.scanMetrics != null) {
      long resultSize = 0;
      for (Cell cell : values) {
        resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
      this.scanMetrics.countOfRowsScanned.incrementAndGet();
    }

    return result;
  }

  /**
   * Closes the scanner and the region and shuts down the caches created by the constructor.
   * {@link IOException}s raised along the way are logged and do not prevent the remaining resources
   * from being released. Calling this method more than once is harmless.
   */
  @Override
  public void close() {
    releaseResources();
  }

  /**
   * Releases, in order, the region scanner, the region operation and region, the mob file cache
   * and the block cache. Each reference is cleared after its release has been attempted, so a
   * repeated call skips what was already handled.
   */
  private void releaseResources() {
    // Make a later next() return null instead of dereferencing the cleared scanner.
    this.hasMore = false;

    if (this.scanner != null) {
      try {
        this.scanner.close();
      } catch (IOException ex) {
        LOG.warn("Exception while closing scanner", ex);
      }
      this.scanner = null;
    }

    if (this.region != null) {
      // Only end the region operation if the constructor actually started it.
      if (regionOperationStarted) {
        regionOperationStarted = false;
        try {
          this.region.closeRegionOperation();
        } catch (IOException ex) {
          LOG.warn("Exception while closing region", ex);
        }
      }
      // Attempted even when ending the region operation failed, so the region is not left open.
      try {
        this.region.close(true);
      } catch (IOException ex) {
        LOG.warn("Exception while closing region", ex);
      }
      this.region = null;
    }

    // In typical region operation, RegionServerServices would handle the lifecycle of
    // the MobFileCache and BlockCache. In ClientSideRegionScanner, we need to handle
    // the lifecycle of these components ourselves to avoid resource leaks.
    if (mobFileCache != null) {
      mobFileCache.shutdown();
      mobFileCache = null;
    }

    if (blockCache != null) {
      blockCache.shutdown();
      blockCache = null;
    }
  }

  /**
   * Returns the region opened by this scanner, or {@code null} once the scanner has been closed.
   */
  HRegion getRegion() {
    return region;
  }

  /**
   * Not supported by this scanner.
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean renewLease() {
    throw new UnsupportedOperationException();
  }
}