001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; 024 025import java.io.IOException; 026import java.io.InterruptedIOException; 027import java.util.ArrayList; 028import java.util.List; 029 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.RegionLocations; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.util.Bytes; 039 040 041/** 042 * A reversed ScannerCallable which supports backward scanning. 043 */ 044@InterfaceAudience.Private 045public class ReversedScannerCallable extends ScannerCallable { 046 047 /** 048 * @param connection 049 * @param tableName 050 * @param scan 051 * @param scanMetrics 052 * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the 053 * regionserver 054 */ 055 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, 056 ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) { 057 super(connection, tableName, scan, scanMetrics, rpcFactory); 058 } 059 060 /** 061 * @param connection 062 * @param tableName 063 * @param scan 064 * @param scanMetrics 065 * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the 066 * regionserver 067 * @param replicaId the replica id 068 */ 069 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, 070 ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) { 071 super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); 072 } 073 074 /** 075 * @param reload force reload of server location 076 * @throws IOException 077 */ 078 @Override 079 public void prepare(boolean reload) throws IOException { 080 if (Thread.interrupted()) { 081 throw new InterruptedIOException(); 082 } 083 if (!instantiated || reload) { 084 // we should use range locate if 085 // 1. we do not want the start row 086 // 2. the start row is empty which means we need to locate to the last region. 087 if (scan.includeStartRow() && !isEmptyStartRow(getRow())) { 088 // Just locate the region with the row 089 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id, 090 getConnection(), getTableName(), getRow()); 091 this.location = id < rl.size() ? rl.getRegionLocation(id) : null; 092 if (location == null || location.getServerName() == null) { 093 throw new IOException("Failed to find location, tableName=" 094 + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" 095 + reload); 096 } 097 } else { 098 // Need to locate the regions with the range, and the target location is 099 // the last one which is the previous region of last region scanner 100 byte[] locateStartRow = createCloseRowBefore(getRow()); 101 List<HRegionLocation> locatedRegions = locateRegionsInRange( 102 locateStartRow, getRow(), reload); 103 if (locatedRegions.isEmpty()) { 104 throw new DoNotRetryIOException( 105 "Does hbase:meta exist hole? Couldn't get regions for the range from " 106 + Bytes.toStringBinary(locateStartRow) + " to " 107 + Bytes.toStringBinary(getRow())); 108 } 109 this.location = locatedRegions.get(locatedRegions.size() - 1); 110 } 111 setStub(getConnection().getClient(getLocation().getServerName())); 112 checkIfRegionServerIsRemote(); 113 instantiated = true; 114 } 115 116 // check how often we retry. 117 if (reload) { 118 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); 119 } 120 } 121 122 /** 123 * Get the corresponding regions for an arbitrary range of keys. 124 * @param startKey Starting row in range, inclusive 125 * @param endKey Ending row in range, exclusive 126 * @param reload force reload of server location 127 * @return A list of HRegionLocation corresponding to the regions that contain 128 * the specified range 129 * @throws IOException 130 */ 131 private List<HRegionLocation> locateRegionsInRange(byte[] startKey, 132 byte[] endKey, boolean reload) throws IOException { 133 final boolean endKeyIsEndOfTable = Bytes.equals(endKey, 134 HConstants.EMPTY_END_ROW); 135 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 136 throw new IllegalArgumentException("Invalid range: " 137 + Bytes.toStringBinary(startKey) + " > " 138 + Bytes.toStringBinary(endKey)); 139 } 140 List<HRegionLocation> regionList = new ArrayList<>(); 141 byte[] currentKey = startKey; 142 do { 143 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id, 144 getConnection(), getTableName(), currentKey); 145 HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null; 146 if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) { 147 regionList.add(regionLocation); 148 } else { 149 throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row " 150 + Bytes.toStringBinary(currentKey) + " returns incorrect region " 151 + (regionLocation == null ? null : regionLocation.getRegionInfo())); 152 } 153 currentKey = regionLocation.getRegionInfo().getEndKey(); 154 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 155 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)); 156 return regionList; 157 } 158 159 @Override 160 public ScannerCallable getScannerCallableForReplica(int id) { 161 ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), 162 this.getScan(), this.scanMetrics, rpcControllerFactory, id); 163 r.setCaching(this.getCaching()); 164 return r; 165 } 166}