001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.PrivateCellUtil; 032import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 033import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; 034import org.apache.hadoop.hbase.io.hfile.BlockCache; 035import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 036import org.apache.hadoop.hbase.mob.MobFileCache; 037import org.apache.hadoop.hbase.regionserver.HRegion; 038import org.apache.hadoop.hbase.regionserver.RegionScanner; 039import org.apache.hadoop.hbase.util.CommonFSUtils; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * A client scanner for a region opened for read-only on the client side. Assumes region data is not 046 * changing. 047 */ 048@InterfaceAudience.Private 049public class ClientSideRegionScanner extends AbstractClientScanner { 050 051 private static final Logger LOG = LoggerFactory.getLogger(ClientSideRegionScanner.class); 052 053 private HRegion region; 054 private MobFileCache mobFileCache; 055 private BlockCache blockCache; 056 RegionScanner scanner; 057 List<Cell> values; 058 boolean hasMore = true; 059 private final Set<Path> filesRead; 060 061 public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, 062 TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { 063 // region is immutable, set isolation level 064 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 065 066 htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build(); 067 068 // open region from the snapshot directory 069 region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, 070 conf, hri, htd, null, null); 071 region.setRestoredRegion(true); 072 // non RS process does not have a block cache, and this a client side scanner, 073 // create one for MapReduce jobs to cache the INDEX block by setting to use 074 // IndexOnlyLruBlockCache and set a value to HBASE_CLIENT_SCANNER_BLOCK_CACHE_SIZE_KEY 075 conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "IndexOnlyLRU"); 076 conf.setIfUnset(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, 077 String.valueOf(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT)); 078 // don't allow L2 bucket cache for non RS process to avoid unexpected disk usage. 079 conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); 080 blockCache = BlockCacheFactory.createBlockCache(conf); 081 region.setBlockCache(blockCache); 082 // we won't initialize the MobFileCache when not running in RS process. so provided an 083 // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only 084 // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the 085 // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight. 086 mobFileCache = new MobFileCache(conf); 087 region.setMobFileCache(mobFileCache); 088 region.initialize(); 089 090 // create an internal region scanner 091 this.scanner = region.getScanner(scan); 092 this.filesRead = new HashSet<>(); 093 values = new ArrayList<>(); 094 095 if (scanMetrics == null) { 096 initScanMetrics(scan); 097 } else { 098 this.scanMetrics = scanMetrics; 099 setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled()); 100 } 101 if (isScanMetricsByRegionEnabled()) { 102 this.scanMetrics.moveToNextRegion(); 103 this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(), null); 104 // The server name will be null in scan metrics as this is a client side region scanner 105 } 106 region.startRegionOperation(); 107 } 108 109 @Override 110 public Result next() throws IOException { 111 do { 112 if (!hasMore) { 113 return null; 114 } 115 values.clear(); 116 this.hasMore = scanner.nextRaw(values); 117 } while (values.isEmpty()); 118 119 Result result = Result.create(values); 120 if (this.scanMetrics != null) { 121 long resultSize = 0; 122 for (Cell cell : values) { 123 resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell); 124 } 125 this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize); 126 this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1); 127 } 128 129 return result; 130 } 131 132 @Override 133 public void close() { 134 if (this.scanner != null) { 135 try { 136 this.scanner.close(); 137 this.filesRead.addAll(this.scanner.getFilesRead()); 138 this.scanner = null; 139 } catch (IOException ex) { 140 LOG.warn("Exception while closing scanner", ex); 141 } 142 } 143 if (this.region != null) { 144 try { 145 this.region.closeRegionOperation(); 146 this.region.close(true); 147 this.region = null; 148 } catch (IOException ex) { 149 LOG.warn("Exception while closing region", ex); 150 } 151 } 152 153 // In typical region operation, RegionServerServices would handle the lifecycle of 154 // the MobFileCache and BlockCache. In ClientSideRegionScanner, we need to handle 155 // the lifecycle of these components ourselves to avoid resource leaks. 156 if (mobFileCache != null) { 157 mobFileCache.shutdown(); 158 mobFileCache = null; 159 } 160 161 if (blockCache != null) { 162 blockCache.shutdown(); 163 blockCache = null; 164 } 165 } 166 167 HRegion getRegion() { 168 return region; 169 } 170 171 /** 172 * Returns the set of store file paths that were successfully read by the underlying region 173 * scanner. Populated when this scanner is closed. 174 */ 175 public Set<Path> getFilesRead() { 176 return Collections.unmodifiableSet(this.filesRead); 177 } 178 179 @Override 180 public boolean renewLease() { 181 throw new UnsupportedOperationException(); 182 } 183}