1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.ConnectException;
24 import java.net.SocketTimeoutException;
25
26 import org.apache.hadoop.hbase.DoNotRetryIOException;
27 import org.apache.hadoop.hbase.HBaseIOException;
28 import org.apache.hadoop.hbase.HRegionLocation;
29 import org.apache.hadoop.hbase.NotServingRegionException;
30 import org.apache.hadoop.hbase.RegionLocations;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
33 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38
39
40
41
42 @InterfaceAudience.Private
43 public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
44
45 protected final ClusterConnection connection;
46
47 protected final RpcControllerFactory rpcControllerFactory;
48
49 protected AdminService.BlockingInterface stub;
50
51 protected HRegionLocation location;
52
53 protected final TableName tableName;
54 protected final byte[] row;
55 protected final int replicaId;
56
57 protected final static int MIN_WAIT_DEAD_SERVER = 10000;
58
59 public RegionAdminServiceCallable(ClusterConnection connection,
60 RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) {
61 this(connection, rpcControllerFactory, null, tableName, row);
62 }
63
64 public RegionAdminServiceCallable(ClusterConnection connection,
65 RpcControllerFactory rpcControllerFactory, HRegionLocation location,
66 TableName tableName, byte[] row) {
67 this(connection, rpcControllerFactory, location,
68 tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
69 }
70
71 public RegionAdminServiceCallable(ClusterConnection connection,
72 RpcControllerFactory rpcControllerFactory, HRegionLocation location,
73 TableName tableName, byte[] row, int replicaId) {
74 this.connection = connection;
75 this.rpcControllerFactory = rpcControllerFactory;
76 this.location = location;
77 this.tableName = tableName;
78 this.row = row;
79 this.replicaId = replicaId;
80 }
81
82 @Override
83 public void prepare(boolean reload) throws IOException {
84 if (Thread.interrupted()) {
85 throw new InterruptedIOException();
86 }
87
88 if (reload || location == null) {
89 location = getLocation(!reload);
90 }
91
92 if (location == null) {
93
94 throw new HBaseIOException(getExceptionMessage());
95 }
96
97 this.setStub(connection.getAdmin(location.getServerName()));
98 }
99
100 protected void setStub(AdminService.BlockingInterface stub) {
101 this.stub = stub;
102 }
103
104 public HRegionLocation getLocation(boolean useCache) throws IOException {
105 RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
106 if (rl == null) {
107 throw new HBaseIOException(getExceptionMessage());
108 }
109 HRegionLocation location = rl.getRegionLocation(replicaId);
110 if (location == null) {
111 throw new HBaseIOException(getExceptionMessage());
112 }
113
114 return location;
115 }
116
117 @Override
118 public void throwable(Throwable t, boolean retrying) {
119 if (t instanceof SocketTimeoutException ||
120 t instanceof ConnectException ||
121 t instanceof RetriesExhaustedException ||
122 (location != null && getConnection().isDeadServer(location.getServerName()))) {
123
124
125
126 if (this.location != null) getConnection().clearCaches(location.getServerName());
127 } else if (t instanceof RegionMovedException) {
128 getConnection().updateCachedLocations(tableName, row, t, location);
129 } else if (t instanceof NotServingRegionException) {
130
131
132 getConnection().deleteCachedRegionLocation(location);
133 }
134 }
135
136
137
138
139 HConnection getConnection() {
140 return this.connection;
141 }
142
143
144 protected String getExceptionMessage() {
145 return "There is no location" + " table=" + tableName
146 + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
147 }
148
149 @Override
150 public String getExceptionMessageAdditionalDetail() {
151 return null;
152 }
153
154 @Override
155 public long sleep(long pause, int tries) {
156 long sleep = ConnectionUtils.getPauseTime(pause, tries);
157 if (sleep < MIN_WAIT_DEAD_SERVER
158 && (location == null || connection.isDeadServer(location.getServerName()))) {
159 sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f);
160 }
161 return sleep;
162 }
163
164 public static RegionLocations getRegionLocations(
165 ClusterConnection connection, TableName tableName, byte[] row,
166 boolean useCache, int replicaId)
167 throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
168 RegionLocations rl;
169 try {
170 rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
171 } catch (DoNotRetryIOException e) {
172 throw e;
173 } catch (RetriesExhaustedException e) {
174 throw e;
175 } catch (InterruptedIOException e) {
176 throw e;
177 } catch (IOException e) {
178 throw new RetriesExhaustedException("Can't get the location", e);
179 }
180 if (rl == null) {
181 throw new RetriesExhaustedException("Can't get the locations");
182 }
183
184 return rl;
185 }
186 }