1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.DoNotRetryIOException;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.HRegionLocation;
31 import org.apache.hadoop.hbase.RegionLocations;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class ReversedScannerCallable extends ScannerCallable {
44
45
46
47
48
49 protected final byte[] locateStartRow;
50
51
52
53
54
55
56
57
58
59
60 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
61 ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) {
62 super(connection, tableName, scan, scanMetrics, rpcFactory);
63 this.locateStartRow = locateStartRow;
64 }
65
66
67
68
69
70
71
72
73
74
75
76 public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
77 ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, int replicaId) {
78 super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
79 this.locateStartRow = locateStartRow;
80 }
81
82
83
84
85
86 @Deprecated
87 public ReversedScannerCallable(ClusterConnection connection, TableName tableName,
88 Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
89 this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
90 .instantiate(connection.getConfiguration()));
91 }
92
93
94
95
96
97 @Override
98 public void prepare(boolean reload) throws IOException {
99 if (Thread.interrupted()) {
100 throw new InterruptedIOException();
101 }
102 if (!instantiated || reload) {
103 if (locateStartRow == null) {
104
105 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
106 getConnection(), tableName, row);
107 this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
108 if (this.location == null) {
109 throw new IOException("Failed to find location, tableName="
110 + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
111 + reload);
112 }
113 } else {
114
115
116 List<HRegionLocation> locatedRegions = locateRegionsInRange(
117 locateStartRow, row, reload);
118 if (locatedRegions.isEmpty()) {
119 throw new DoNotRetryIOException(
120 "Does hbase:meta exist hole? Couldn't get regions for the range from "
121 + Bytes.toStringBinary(locateStartRow) + " to "
122 + Bytes.toStringBinary(row));
123 }
124 this.location = locatedRegions.get(locatedRegions.size() - 1);
125 }
126 setStub(getConnection().getClient(getLocation().getServerName()));
127 checkIfRegionServerIsRemote();
128 instantiated = true;
129 }
130
131
132
133
134 if (reload && this.scanMetrics != null) {
135 this.scanMetrics.countOfRPCRetries.incrementAndGet();
136 if (isRegionServerRemote) {
137 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
138 }
139 }
140 }
141
142
143
144
145
146
147
148
149
150
151 private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
152 byte[] endKey, boolean reload) throws IOException {
153 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
154 HConstants.EMPTY_END_ROW);
155 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
156 throw new IllegalArgumentException("Invalid range: "
157 + Bytes.toStringBinary(startKey) + " > "
158 + Bytes.toStringBinary(endKey));
159 }
160 List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
161 byte[] currentKey = startKey;
162 do {
163 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
164 getConnection(), tableName, currentKey);
165 HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
166 if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
167 regionList.add(regionLocation);
168 } else {
169 throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
170 + Bytes.toStringBinary(currentKey) + " returns incorrect region "
171 + regionLocation.getRegionInfo());
172 }
173 currentKey = regionLocation.getRegionInfo().getEndKey();
174 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
175 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
176 return regionList;
177 }
178
179 @Override
180 public ScannerCallable getScannerCallableForReplica(int id) {
181 ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
182 this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
183 r.setCaching(this.getCaching());
184 return r;
185 }
186 }