1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.rest.client;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.net.URL;
25 import java.util.Collections;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentHashMap;
28
29 import org.apache.commons.httpclient.Header;
30 import org.apache.commons.httpclient.HttpClient;
31 import org.apache.commons.httpclient.HttpMethod;
32 import org.apache.commons.httpclient.HttpStatus;
33 import org.apache.commons.httpclient.HttpVersion;
34 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
35 import org.apache.commons.httpclient.URI;
36 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
37 import org.apache.commons.httpclient.methods.DeleteMethod;
38 import org.apache.commons.httpclient.methods.GetMethod;
39 import org.apache.commons.httpclient.methods.HeadMethod;
40 import org.apache.commons.httpclient.methods.PostMethod;
41 import org.apache.commons.httpclient.methods.PutMethod;
42 import org.apache.commons.httpclient.params.HttpClientParams;
43 import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.hbase.classification.InterfaceAudience;
47 import org.apache.hadoop.hbase.classification.InterfaceStability;
48 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
49 import org.apache.hadoop.security.authentication.client.AuthenticationException;
50 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
51
52
53
54
55
56 @InterfaceAudience.Public
57 @InterfaceStability.Stable
58 public class Client {
59 public static final Header[] EMPTY_HEADER_ARRAY = new Header[0];
60
61 private static final Log LOG = LogFactory.getLog(Client.class);
62
63 private HttpClient httpClient;
64 private Cluster cluster;
65 private boolean sslEnabled;
66
67 private Map<String, String> extraHeaders;
68
69 private static final String AUTH_COOKIE = "hadoop.auth";
70 private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
71 private static final String COOKIE = "Cookie";
72
73
74
75
76 public Client() {
77 this(null);
78 }
79
80 private void initialize(Cluster cluster, boolean sslEnabled) {
81 this.cluster = cluster;
82 this.sslEnabled = sslEnabled;
83 MultiThreadedHttpConnectionManager manager =
84 new MultiThreadedHttpConnectionManager();
85 HttpConnectionManagerParams managerParams = manager.getParams();
86 managerParams.setConnectionTimeout(2000);
87 managerParams.setDefaultMaxConnectionsPerHost(10);
88 managerParams.setMaxTotalConnections(100);
89 extraHeaders = new ConcurrentHashMap<String, String>();
90 this.httpClient = new HttpClient(manager);
91 HttpClientParams clientParams = httpClient.getParams();
92 clientParams.setVersion(HttpVersion.HTTP_1_1);
93
94 }
95
96
97
98
99 public Client(Cluster cluster) {
100 initialize(cluster, false);
101 }
102
103
104
105
106
107
108 public Client(Cluster cluster, boolean sslEnabled) {
109 initialize(cluster, sslEnabled);
110 }
111
112
113
114
115 public void shutdown() {
116 MultiThreadedHttpConnectionManager manager =
117 (MultiThreadedHttpConnectionManager) httpClient.getHttpConnectionManager();
118 manager.shutdown();
119 }
120
121
122
123
124 public HttpClient getHttpClient() {
125 return httpClient;
126 }
127
128
129
130
131
132
133 public void addExtraHeader(final String name, final String value) {
134 extraHeaders.put(name, value);
135 }
136
137
138
139
140 public String getExtraHeader(final String name) {
141 return extraHeaders.get(name);
142 }
143
144
145
146
147 public Map<String, String> getExtraHeaders() {
148 return Collections.unmodifiableMap(extraHeaders);
149 }
150
151
152
153
154 public void removeExtraHeader(final String name) {
155 extraHeaders.remove(name);
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 public int executePathOnly(Cluster cluster, HttpMethod method,
172 Header[] headers, String path) throws IOException {
173 IOException lastException;
174 if (cluster.nodes.size() < 1) {
175 throw new IOException("Cluster is empty");
176 }
177 int start = (int)Math.round((cluster.nodes.size() - 1) * Math.random());
178 int i = start;
179 do {
180 cluster.lastHost = cluster.nodes.get(i);
181 try {
182 StringBuilder sb = new StringBuilder();
183 if (sslEnabled) {
184 sb.append("https://");
185 } else {
186 sb.append("http://");
187 }
188 sb.append(cluster.lastHost);
189 sb.append(path);
190 URI uri = new URI(sb.toString(), true);
191 return executeURI(method, headers, uri.toString());
192 } catch (IOException e) {
193 lastException = e;
194 }
195 } while (++i != start && i < cluster.nodes.size());
196 throw lastException;
197 }
198
199
200
201
202
203
204
205
206
207 public int executeURI(HttpMethod method, Header[] headers, String uri)
208 throws IOException {
209 method.setURI(new URI(uri, true));
210 for (Map.Entry<String, String> e: extraHeaders.entrySet()) {
211 method.addRequestHeader(e.getKey(), e.getValue());
212 }
213 if (headers != null) {
214 for (Header header: headers) {
215 method.addRequestHeader(header);
216 }
217 }
218 long startTime = System.currentTimeMillis();
219 int code = httpClient.executeMethod(method);
220 if (code == HttpStatus.SC_UNAUTHORIZED) {
221 LOG.debug("Performing negotiation with the server.");
222 negotiate(method, uri);
223 code = httpClient.executeMethod(method);
224 }
225 long endTime = System.currentTimeMillis();
226 if (LOG.isTraceEnabled()) {
227 LOG.trace(method.getName() + " " + uri + " " + code + " " +
228 method.getStatusText() + " in " + (endTime - startTime) + " ms");
229 }
230 return code;
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244 public int execute(Cluster cluster, HttpMethod method, Header[] headers,
245 String path) throws IOException {
246 if (path.startsWith("/")) {
247 return executePathOnly(cluster, method, headers, path);
248 }
249 return executeURI(method, headers, path);
250 }
251
252
253
254
255
256
257
258 private void negotiate(HttpMethod method, String uri) throws IOException {
259 try {
260 AuthenticatedURL.Token token = new AuthenticatedURL.Token();
261 KerberosAuthenticator authenticator = new KerberosAuthenticator();
262 authenticator.authenticate(new URL(uri), token);
263
264 injectToken(method, token);
265 } catch (AuthenticationException e) {
266 LOG.error("Failed to negotiate with the server.", e);
267 throw new IOException(e);
268 }
269 }
270
271
272
273
274
275
276 private void injectToken(HttpMethod method, AuthenticatedURL.Token token) {
277 String t = token.toString();
278 if (t != null) {
279 if (!t.startsWith("\"")) {
280 t = "\"" + t + "\"";
281 }
282 method.addRequestHeader(COOKIE, AUTH_COOKIE_EQ + t);
283 }
284 }
285
286
287
288
289 public Cluster getCluster() {
290 return cluster;
291 }
292
293
294
295
296 public void setCluster(Cluster cluster) {
297 this.cluster = cluster;
298 }
299
300
301
302
303
304
305
306 public Response head(String path) throws IOException {
307 return head(cluster, path, null);
308 }
309
310
311
312
313
314
315
316
317
318 public Response head(Cluster cluster, String path, Header[] headers)
319 throws IOException {
320 HeadMethod method = new HeadMethod();
321 try {
322 int code = execute(cluster, method, null, path);
323 headers = method.getResponseHeaders();
324 return new Response(code, headers, null);
325 } finally {
326 method.releaseConnection();
327 }
328 }
329
330
331
332
333
334
335
336 public Response get(String path) throws IOException {
337 return get(cluster, path);
338 }
339
340
341
342
343
344
345
346
347 public Response get(Cluster cluster, String path) throws IOException {
348 return get(cluster, path, EMPTY_HEADER_ARRAY);
349 }
350
351
352
353
354
355
356
357
358 public Response get(String path, String accept) throws IOException {
359 return get(cluster, path, accept);
360 }
361
362
363
364
365
366
367
368
369
370 public Response get(Cluster cluster, String path, String accept)
371 throws IOException {
372 Header[] headers = new Header[1];
373 headers[0] = new Header("Accept", accept);
374 return get(cluster, path, headers);
375 }
376
377
378
379
380
381
382
383
384
385 public Response get(String path, Header[] headers) throws IOException {
386 return get(cluster, path, headers);
387 }
388
389
390
391
392
393
394
395
396
397 public Response get(Cluster c, String path, Header[] headers)
398 throws IOException {
399 GetMethod method = new GetMethod();
400 try {
401 int code = execute(c, method, headers, path);
402 headers = method.getResponseHeaders();
403 byte[] body = method.getResponseBody();
404 InputStream in = method.getResponseBodyAsStream();
405 return new Response(code, headers, body, in);
406 } finally {
407 method.releaseConnection();
408 }
409 }
410
411
412
413
414
415
416
417
418
419 public Response put(String path, String contentType, byte[] content)
420 throws IOException {
421 return put(cluster, path, contentType, content);
422 }
423
424
425
426
427
428
429
430
431
432
433 public Response put(Cluster cluster, String path, String contentType,
434 byte[] content) throws IOException {
435 Header[] headers = new Header[1];
436 headers[0] = new Header("Content-Type", contentType);
437 return put(cluster, path, headers, content);
438 }
439
440
441
442
443
444
445
446
447
448
449 public Response put(String path, Header[] headers, byte[] content)
450 throws IOException {
451 return put(cluster, path, headers, content);
452 }
453
454
455
456
457
458
459
460
461
462
463
464 public Response put(Cluster cluster, String path, Header[] headers,
465 byte[] content) throws IOException {
466 PutMethod method = new PutMethod();
467 try {
468 method.setRequestEntity(new ByteArrayRequestEntity(content));
469 int code = execute(cluster, method, headers, path);
470 headers = method.getResponseHeaders();
471 content = method.getResponseBody();
472 return new Response(code, headers, content);
473 } finally {
474 method.releaseConnection();
475 }
476 }
477
478
479
480
481
482
483
484
485
486 public Response post(String path, String contentType, byte[] content)
487 throws IOException {
488 return post(cluster, path, contentType, content);
489 }
490
491
492
493
494
495
496
497
498
499
500 public Response post(Cluster cluster, String path, String contentType,
501 byte[] content) throws IOException {
502 Header[] headers = new Header[1];
503 headers[0] = new Header("Content-Type", contentType);
504 return post(cluster, path, headers, content);
505 }
506
507
508
509
510
511
512
513
514
515
516 public Response post(String path, Header[] headers, byte[] content)
517 throws IOException {
518 return post(cluster, path, headers, content);
519 }
520
521
522
523
524
525
526
527
528
529
530
531 public Response post(Cluster cluster, String path, Header[] headers,
532 byte[] content) throws IOException {
533 PostMethod method = new PostMethod();
534 try {
535 method.setRequestEntity(new ByteArrayRequestEntity(content));
536 int code = execute(cluster, method, headers, path);
537 headers = method.getResponseHeaders();
538 content = method.getResponseBody();
539 return new Response(code, headers, content);
540 } finally {
541 method.releaseConnection();
542 }
543 }
544
545
546
547
548
549
550
551 public Response delete(String path) throws IOException {
552 return delete(cluster, path);
553 }
554
555
556
557
558
559
560
561
562 public Response delete(Cluster cluster, String path) throws IOException {
563 DeleteMethod method = new DeleteMethod();
564 try {
565 int code = execute(cluster, method, null, path);
566 Header[] headers = method.getResponseHeaders();
567 byte[] content = method.getResponseBody();
568 return new Response(code, headers, content);
569 } finally {
570 method.releaseConnection();
571 }
572 }
573 }