001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import org.apache.hadoop.hbase.DoNotRetryIOException; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.HRegionLocation; 029import org.apache.hadoop.hbase.RegionLocations; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.TableNotEnabledException; 032import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 033import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * A reversed ScannerCallable which supports backward scanning. 040 */ 041@InterfaceAudience.Private 042public class ReversedScannerCallable extends ScannerCallable { 043 044 private byte[] locationSearchKey; 045 046 /** 047 * @param connection which connection 048 * @param tableName table callable is on 049 * @param scan the scan to execute 050 * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect 051 * metrics 052 * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the 053 * regionserver 054 * @param replicaId the replica id 055 */ 056 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, 057 ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) { 058 super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); 059 } 060 061 @Override 062 public void throwable(Throwable t, boolean retrying) { 063 // for reverse scans, we need to update cache using the search key found for the reverse scan 064 // range in prepare. Otherwise, we will see weird behavior at the table boundaries, 065 // when trying to clear cache for an empty row. 066 if (location != null && locationSearchKey != null) { 067 getConnection().updateCachedLocations(getTableName(), 068 location.getRegionInfo().getRegionName(), locationSearchKey, t, location.getServerName()); 069 } 070 } 071 072 /** 073 * @param reload force reload of server location 074 */ 075 @Override 076 public void prepare(boolean reload) throws IOException { 077 if (Thread.interrupted()) { 078 throw new InterruptedIOException(); 079 } 080 081 if ( 082 reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) 083 && getConnection().isTableDisabled(getTableName()) 084 ) { 085 throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); 086 } 087 088 if (!instantiated || reload) { 089 // we should use range locate if 090 // 1. we do not want the start row 091 // 2. the start row is empty which means we need to locate to the last region. 092 if (scan.includeStartRow() && !isEmptyStartRow(getRow())) { 093 // Just locate the region with the row 094 RegionLocations rl = getRegionLocationsForPrepare(getRow()); 095 this.location = getLocationForReplica(rl); 096 this.locationSearchKey = getRow(); 097 } else { 098 // The locateStart row is an approximation. So we need to search between 099 // that and the actual row in order to really find the last region 100 byte[] locateStartRow = createCloseRowBefore(getRow()); 101 Pair<HRegionLocation, byte[]> lastRegionAndKey = 102 locateLastRegionInRange(locateStartRow, getRow()); 103 this.location = lastRegionAndKey.getFirst(); 104 this.locationSearchKey = lastRegionAndKey.getSecond(); 105 } 106 107 if (location == null || location.getServerName() == null) { 108 throw new IOException("Failed to find location, tableName=" + getTableName() + ", row=" 109 + Bytes.toStringBinary(getRow()) + ", reload=" + reload); 110 } 111 112 setStub(getConnection().getClient(getLocation().getServerName())); 113 checkIfRegionServerIsRemote(); 114 instantiated = true; 115 } 116 117 // check how often we retry. 118 if (reload) { 119 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 120 } 121 } 122 123 /** 124 * Get the last region before the endkey, which will be used to execute the reverse scan 125 * @param startKey Starting row in range, inclusive 126 * @param endKey Ending row in range, exclusive 127 * @return The last location, and the rowKey used to find it. May be null, if a region could not 128 * be found. 129 */ 130 private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey) 131 throws IOException { 132 final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); 133 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 134 throw new IllegalArgumentException( 135 "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey)); 136 } 137 138 HRegionLocation lastRegion = null; 139 byte[] lastFoundKey = null; 140 byte[] currentKey = startKey; 141 142 do { 143 RegionLocations rl = getRegionLocationsForPrepare(currentKey); 144 HRegionLocation regionLocation = getLocationForReplica(rl); 145 if (regionLocation.getRegionInfo().containsRow(currentKey)) { 146 lastFoundKey = currentKey; 147 lastRegion = regionLocation; 148 } else { 149 throw new DoNotRetryIOException( 150 "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) 151 + " returns incorrect region " + regionLocation.getRegionInfo()); 152 } 153 currentKey = regionLocation.getRegionInfo().getEndKey(); 154 } while ( 155 !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 156 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0) 157 ); 158 159 return new Pair<>(lastRegion, lastFoundKey); 160 } 161 162 @Override 163 public ScannerCallable getScannerCallableForReplica(int id) { 164 ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), 165 this.getScan(), this.scanMetrics, rpcControllerFactory, id); 166 r.setCaching(this.getCaching()); 167 return r; 168 } 169}