001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; 024 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.util.ArrayList; 028import java.util.List; 029 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.RegionLocations; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.util.Bytes; 039 040 041/** 042 * A reversed ScannerCallable which supports backward scanning. 043 */ 044@InterfaceAudience.Private 045public class ReversedScannerCallable extends ScannerCallable { 046 047 /** 048 * @param connection which connection 049 * @param tableName table callable is on 050 * @param scan the scan to execute 051 * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect 052 * metrics 053 * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the 054 * regionserver 055 * @param replicaId the replica id 056 */ 057 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, 058 ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) { 059 super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); 060 } 061 062 /** 063 * @param reload force reload of server location 064 */ 065 @Override 066 public void prepare(boolean reload) throws IOException { 067 if (Thread.interrupted()) { 068 throw new InterruptedIOException(); 069 } 070 if (!instantiated || reload) { 071 // we should use range locate if 072 // 1. we do not want the start row 073 // 2. the start row is empty which means we need to locate to the last region. 074 if (scan.includeStartRow() && !isEmptyStartRow(getRow())) { 075 // Just locate the region with the row 076 RegionLocations rl = getRegionLocations(reload, getRow()); 077 this.location = getLocationForReplica(rl); 078 if (location == null || location.getServerName() == null) { 079 throw new IOException("Failed to find location, tableName=" 080 + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" 081 + reload); 082 } 083 } else { 084 // Need to locate the regions with the range, and the target location is 085 // the last one which is the previous region of last region scanner 086 byte[] locateStartRow = createCloseRowBefore(getRow()); 087 List<HRegionLocation> locatedRegions = locateRegionsInRange( 088 locateStartRow, getRow(), reload); 089 if (locatedRegions.isEmpty()) { 090 throw new DoNotRetryIOException( 091 "Does hbase:meta exist hole? Couldn't get regions for the range from " 092 + Bytes.toStringBinary(locateStartRow) + " to " 093 + Bytes.toStringBinary(getRow())); 094 } 095 this.location = locatedRegions.get(locatedRegions.size() - 1); 096 } 097 setStub(getConnection().getClient(getLocation().getServerName())); 098 checkIfRegionServerIsRemote(); 099 instantiated = true; 100 } 101 102 // check how often we retry. 103 if (reload) { 104 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 105 } 106 } 107 108 /** 109 * Get the corresponding regions for an arbitrary range of keys. 110 * @param startKey Starting row in range, inclusive 111 * @param endKey Ending row in range, exclusive 112 * @param reload force reload of server location 113 * @return A list of HRegionLocation corresponding to the regions that contain 114 * the specified range 115 */ 116 private List<HRegionLocation> locateRegionsInRange(byte[] startKey, 117 byte[] endKey, boolean reload) throws IOException { 118 final boolean endKeyIsEndOfTable = Bytes.equals(endKey, 119 HConstants.EMPTY_END_ROW); 120 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 121 throw new IllegalArgumentException("Invalid range: " 122 + Bytes.toStringBinary(startKey) + " > " 123 + Bytes.toStringBinary(endKey)); 124 } 125 List<HRegionLocation> regionList = new ArrayList<>(); 126 byte[] currentKey = startKey; 127 do { 128 RegionLocations rl = getRegionLocations(reload, currentKey); 129 HRegionLocation regionLocation = getLocationForReplica(rl); 130 if (regionLocation.getRegionInfo().containsRow(currentKey)) { 131 regionList.add(regionLocation); 132 } else { 133 throw new DoNotRetryIOException( 134 "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) + 135 " returns incorrect region " + regionLocation.getRegionInfo()); 136 } 137 currentKey = regionLocation.getRegionInfo().getEndKey(); 138 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 139 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)); 140 return regionList; 141 } 142 143 @Override 144 public ScannerCallable getScannerCallableForReplica(int id) { 145 ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), 146 this.getScan(), this.scanMetrics, rpcControllerFactory, id); 147 r.setCaching(this.getCaching()); 148 return r; 149 } 150}