1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.DoNotRetryIOException;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.HRegionLocation;
31 import org.apache.hadoop.hbase.RegionLocations;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class ReversedScannerCallable extends ScannerCallable {
44
45
46
47
48
49 protected final byte[] locateStartRow;
50
51
52
53
54
55
56
57
58
59
60 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
61 ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) {
62 super(connection, tableName, scan, scanMetrics, rpcFactory);
63 this.locateStartRow = locateStartRow;
64 }
65
66
67
68
69
70
71
72
73
74
75
76 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
77 ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, int replicaId) {
78 super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
79 this.locateStartRow = locateStartRow;
80 }
81
82
83
84
85
86 @Deprecated
87 public ReversedScannerCallable(ClusterConnection connection, TableName tableName,
88 Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
89 this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
90 .instantiate(connection.getConfiguration()));
91 }
92
93
94
95
96
97 @Override
98 public void prepare(boolean reload) throws IOException {
99 if (Thread.interrupted()) {
100 throw new InterruptedIOException();
101 }
102 if (!instantiated || reload) {
103 if (locateStartRow == null) {
104
105 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
106 getConnection(), getTableName(), getRow());
107 this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
108 if (this.location == null) {
109 throw new IOException("Failed to find location, tableName="
110 + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
111 + reload);
112 }
113 } else {
114
115
116 List<HRegionLocation> locatedRegions = locateRegionsInRange(
117 locateStartRow, row, reload);
118 if (locatedRegions.isEmpty()) {
119 throw new DoNotRetryIOException(
120 "Does hbase:meta exist hole? Couldn't get regions for the range from "
121 + Bytes.toStringBinary(locateStartRow) + " to "
122 + Bytes.toStringBinary(row));
123 }
124 this.location = locatedRegions.get(locatedRegions.size() - 1);
125 }
126 setStub(getConnection().getClient(getLocation().getServerName()));
127 checkIfRegionServerIsRemote();
128 instantiated = true;
129 }
130
131
132
133
134 if (reload && this.scanMetrics != null) {
135 this.scanMetrics.countOfRPCRetries.incrementAndGet();
136 if (isRegionServerRemote) {
137 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
138 }
139 }
140 }
141
142
143
144
145
146
147
148
149
150
151 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
152 justification="I thought I'd fixed it but FB still complains; see below")
153 private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
154 byte[] endKey, boolean reload) throws IOException {
155 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
156 HConstants.EMPTY_END_ROW);
157 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
158 throw new IllegalArgumentException("Invalid range: "
159 + Bytes.toStringBinary(startKey) + " > "
160 + Bytes.toStringBinary(endKey));
161 }
162 List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
163 byte[] currentKey = startKey;
164 do {
165 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
166 getConnection(), getTableName(), currentKey);
167 HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
168 if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
169 regionList.add(regionLocation);
170 } else {
171
172 throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
173 + Bytes.toStringBinary(currentKey) + " returns incorrect region "
174 + (regionLocation != null? regionLocation.getRegionInfo(): null));
175 }
176 currentKey = regionLocation.getRegionInfo().getEndKey();
177 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
178 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
179 return regionList;
180 }
181
182 @Override
183 public ScannerCallable getScannerCallableForReplica(int id) {
184 ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
185 this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
186 r.setCaching(this.getCaching());
187 return r;
188 }
189 }