001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.rest.client;
021
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicHeader;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.util.EntityUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
054
055/**
056 * A wrapper around HttpClient which provides some useful function and
057 * semantics for interacting with the REST gateway.
058 */
059@InterfaceAudience.Public
060public class Client {
061  public static final Header[] EMPTY_HEADER_ARRAY = new Header[0];
062
063  private static final Logger LOG = LoggerFactory.getLogger(Client.class);
064
065  private HttpClient httpClient;
066  private Cluster cluster;
067  private boolean sslEnabled;
068  private HttpResponse resp;
069  private HttpGet httpGet = null;
070
071  private Map<String, String> extraHeaders;
072
073  private static final String AUTH_COOKIE = "hadoop.auth";
074  private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
075  private static final String COOKIE = "Cookie";
076
077  /**
078   * Default Constructor
079   */
080  public Client() {
081    this(null);
082  }
083
084  private void initialize(Cluster cluster, boolean sslEnabled) {
085    this.cluster = cluster;
086    this.sslEnabled = sslEnabled;
087    extraHeaders = new ConcurrentHashMap<>();
088    String clspath = System.getProperty("java.class.path");
089    LOG.debug("classpath " + clspath);
090    this.httpClient = new DefaultHttpClient();
091    this.httpClient.getParams().setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 2000);
092  }
093
094  /**
095   * Constructor
096   * @param cluster the cluster definition
097   */
098  public Client(Cluster cluster) {
099    initialize(cluster, false);
100  }
101
102  /**
103   * Constructor
104   * @param cluster the cluster definition
105   * @param sslEnabled enable SSL or not
106   */
107  public Client(Cluster cluster, boolean sslEnabled) {
108    initialize(cluster, sslEnabled);
109  }
110
111  /**
112   * Shut down the client. Close any open persistent connections.
113   */
114  public void shutdown() {
115  }
116
117  /**
118   * @return the wrapped HttpClient
119   */
120  public HttpClient getHttpClient() {
121    return httpClient;
122  }
123
124  /**
125   * Add extra headers.  These extra headers will be applied to all http
126   * methods before they are removed. If any header is not used any more,
127   * client needs to remove it explicitly.
128   */
129  public void addExtraHeader(final String name, final String value) {
130    extraHeaders.put(name, value);
131  }
132
133  /**
134   * Get an extra header value.
135   */
136  public String getExtraHeader(final String name) {
137    return extraHeaders.get(name);
138  }
139
140  /**
141   * Get all extra headers (read-only).
142   */
143  public Map<String, String> getExtraHeaders() {
144    return Collections.unmodifiableMap(extraHeaders);
145  }
146
147  /**
148   * Remove an extra header.
149   */
150  public void removeExtraHeader(final String name) {
151    extraHeaders.remove(name);
152  }
153
154  /**
155   * Execute a transaction method given only the path. Will select at random
156   * one of the members of the supplied cluster definition and iterate through
157   * the list until a transaction can be successfully completed. The
158   * definition of success here is a complete HTTP transaction, irrespective
159   * of result code.
160   * @param cluster the cluster definition
161   * @param method the transaction method
162   * @param headers HTTP header values to send
163   * @param path the properly urlencoded path
164   * @return the HTTP response code
165   * @throws IOException
166   */
167  public HttpResponse executePathOnly(Cluster cluster, HttpUriRequest method,
168      Header[] headers, String path) throws IOException {
169    IOException lastException;
170    if (cluster.nodes.size() < 1) {
171      throw new IOException("Cluster is empty");
172    }
173    int start = (int)Math.round((cluster.nodes.size() - 1) * Math.random());
174    int i = start;
175    do {
176      cluster.lastHost = cluster.nodes.get(i);
177      try {
178        StringBuilder sb = new StringBuilder();
179        if (sslEnabled) {
180          sb.append("https://");
181        } else {
182          sb.append("http://");
183        }
184        sb.append(cluster.lastHost);
185        sb.append(path);
186        URI uri = new URI(sb.toString());
187        if (method instanceof HttpPut) {
188          HttpPut put = new HttpPut(uri);
189          put.setEntity(((HttpPut) method).getEntity());
190          put.setHeaders(method.getAllHeaders());
191          method = put;
192        } else if (method instanceof HttpGet) {
193          method = new HttpGet(uri);
194        } else if (method instanceof HttpHead) {
195          method = new HttpHead(uri);
196        } else if (method instanceof HttpDelete) {
197          method = new HttpDelete(uri);
198        } else if (method instanceof HttpPost) {
199          HttpPost post = new HttpPost(uri);
200          post.setEntity(((HttpPost) method).getEntity());
201          post.setHeaders(method.getAllHeaders());
202          method = post;
203        }
204        return executeURI(method, headers, uri.toString());
205      } catch (IOException e) {
206        lastException = e;
207      } catch (URISyntaxException use) {
208        lastException = new IOException(use);
209      }
210    } while (++i != start && i < cluster.nodes.size());
211    throw lastException;
212  }
213
214  /**
215   * Execute a transaction method given a complete URI.
216   * @param method the transaction method
217   * @param headers HTTP header values to send
218   * @param uri a properly urlencoded URI
219   * @return the HTTP response code
220   * @throws IOException
221   */
222  public HttpResponse executeURI(HttpUriRequest method, Header[] headers, String uri)
223      throws IOException {
224    // method.setURI(new URI(uri, true));
225    for (Map.Entry<String, String> e: extraHeaders.entrySet()) {
226      method.addHeader(e.getKey(), e.getValue());
227    }
228    if (headers != null) {
229      for (Header header: headers) {
230        method.addHeader(header);
231      }
232    }
233    long startTime = System.currentTimeMillis();
234    if (resp != null) EntityUtils.consumeQuietly(resp.getEntity());
235    resp = httpClient.execute(method);
236    if (resp.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
237      // Authentication error
238      LOG.debug("Performing negotiation with the server.");
239      negotiate(method, uri);
240      resp = httpClient.execute(method);
241    }
242
243    long endTime = System.currentTimeMillis();
244    if (LOG.isTraceEnabled()) {
245      LOG.trace(method.getMethod() + " " + uri + " " + resp.getStatusLine().getStatusCode() + " " +
246          resp.getStatusLine().getReasonPhrase() + " in " + (endTime - startTime) + " ms");
247    }
248    return resp;
249  }
250
251  /**
252   * Execute a transaction method. Will call either <tt>executePathOnly</tt>
253   * or <tt>executeURI</tt> depending on whether a path only is supplied in
254   * 'path', or if a complete URI is passed instead, respectively.
255   * @param cluster the cluster definition
256   * @param method the HTTP method
257   * @param headers HTTP header values to send
258   * @param path the properly urlencoded path or URI
259   * @return the HTTP response code
260   * @throws IOException
261   */
262  public HttpResponse execute(Cluster cluster, HttpUriRequest method, Header[] headers,
263      String path) throws IOException {
264    if (path.startsWith("/")) {
265      return executePathOnly(cluster, method, headers, path);
266    }
267    return executeURI(method, headers, path);
268  }
269
270  /**
271   * Initiate client side Kerberos negotiation with the server.
272   * @param method method to inject the authentication token into.
273   * @param uri the String to parse as a URL.
274   * @throws IOException if unknown protocol is found.
275   */
276  private void negotiate(HttpUriRequest method, String uri) throws IOException {
277    try {
278      AuthenticatedURL.Token token = new AuthenticatedURL.Token();
279      KerberosAuthenticator authenticator = new KerberosAuthenticator();
280      authenticator.authenticate(new URL(uri), token);
281      // Inject the obtained negotiated token in the method cookie
282      injectToken(method, token);
283    } catch (AuthenticationException e) {
284      LOG.error("Failed to negotiate with the server.", e);
285      throw new IOException(e);
286    }
287  }
288
289  /**
290   * Helper method that injects an authentication token to send with the method.
291   * @param method method to inject the authentication token into.
292   * @param token authentication token to inject.
293   */
294  private void injectToken(HttpUriRequest method, AuthenticatedURL.Token token) {
295    String t = token.toString();
296    if (t != null) {
297      if (!t.startsWith("\"")) {
298        t = "\"" + t + "\"";
299      }
300      method.addHeader(COOKIE, AUTH_COOKIE_EQ + t);
301    }
302  }
303
304  /**
305   * @return the cluster definition
306   */
307  public Cluster getCluster() {
308    return cluster;
309  }
310
311  /**
312   * @param cluster the cluster definition
313   */
314  public void setCluster(Cluster cluster) {
315    this.cluster = cluster;
316  }
317
318  /**
319   * Send a HEAD request
320   * @param path the path or URI
321   * @return a Response object with response detail
322   * @throws IOException
323   */
324  public Response head(String path) throws IOException {
325    return head(cluster, path, null);
326  }
327
328  /**
329   * Send a HEAD request
330   * @param cluster the cluster definition
331   * @param path the path or URI
332   * @param headers the HTTP headers to include in the request
333   * @return a Response object with response detail
334   * @throws IOException
335   */
336  public Response head(Cluster cluster, String path, Header[] headers)
337      throws IOException {
338    HttpHead method = new HttpHead(path);
339    try {
340      HttpResponse resp = execute(cluster, method, null, path);
341      return new Response(resp.getStatusLine().getStatusCode(), resp.getAllHeaders(), null);
342    } finally {
343      method.releaseConnection();
344    }
345  }
346
347  /**
348   * Send a GET request
349   * @param path the path or URI
350   * @return a Response object with response detail
351   * @throws IOException
352   */
353  public Response get(String path) throws IOException {
354    return get(cluster, path);
355  }
356
357  /**
358   * Send a GET request
359   * @param cluster the cluster definition
360   * @param path the path or URI
361   * @return a Response object with response detail
362   * @throws IOException
363   */
364  public Response get(Cluster cluster, String path) throws IOException {
365    return get(cluster, path, EMPTY_HEADER_ARRAY);
366  }
367
368  /**
369   * Send a GET request
370   * @param path the path or URI
371   * @param accept Accept header value
372   * @return a Response object with response detail
373   * @throws IOException
374   */
375  public Response get(String path, String accept) throws IOException {
376    return get(cluster, path, accept);
377  }
378
379  /**
380   * Send a GET request
381   * @param cluster the cluster definition
382   * @param path the path or URI
383   * @param accept Accept header value
384   * @return a Response object with response detail
385   * @throws IOException
386   */
387  public Response get(Cluster cluster, String path, String accept)
388      throws IOException {
389    Header[] headers = new Header[1];
390    headers[0] = new BasicHeader("Accept", accept);
391    return get(cluster, path, headers);
392  }
393
394  /**
395   * Send a GET request
396   * @param path the path or URI
397   * @param headers the HTTP headers to include in the request,
398   * <tt>Accept</tt> must be supplied
399   * @return a Response object with response detail
400   * @throws IOException
401   */
402  public Response get(String path, Header[] headers) throws IOException {
403    return get(cluster, path, headers);
404  }
405
406  /**
407   * Returns the response body of the HTTPResponse, if any, as an array of bytes.
408   * If response body is not available or cannot be read, returns <tt>null</tt>
409   *
410   * Note: This will cause the entire response body to be buffered in memory. A
411   * malicious server may easily exhaust all the VM memory. It is strongly
412   * recommended, to use getResponseAsStream if the content length of the response
413   * is unknown or reasonably large.
414   *
415   * @param resp HttpResponse
416   * @return The response body, null if body is empty
417   * @throws IOException If an I/O (transport) problem occurs while obtaining the
418   * response body.
419   */
420  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
421      "NP_LOAD_OF_KNOWN_NULL_VALUE", justification = "null is possible return value")
422  public static byte[] getResponseBody(HttpResponse resp) throws IOException {
423    if (resp.getEntity() == null) return null;
424    try (InputStream instream = resp.getEntity().getContent()) {
425      if (instream != null) {
426        long contentLength = resp.getEntity().getContentLength();
427        if (contentLength > Integer.MAX_VALUE) {
428          //guard integer cast from overflow
429          throw new IOException("Content too large to be buffered: " + contentLength +" bytes");
430        }
431        ByteArrayOutputStream outstream = new ByteArrayOutputStream(
432            contentLength > 0 ? (int) contentLength : 4*1024);
433        byte[] buffer = new byte[4096];
434        int len;
435        while ((len = instream.read(buffer)) > 0) {
436          outstream.write(buffer, 0, len);
437        }
438        outstream.close();
439        return outstream.toByteArray();
440      }
441      return null;
442    }
443  }
444
445  /**
446   * Send a GET request
447   * @param c the cluster definition
448   * @param path the path or URI
449   * @param headers the HTTP headers to include in the request
450   * @return a Response object with response detail
451   * @throws IOException
452   */
453  public Response get(Cluster c, String path, Header[] headers)
454      throws IOException {
455    if (httpGet != null) {
456      httpGet.releaseConnection();
457    }
458    httpGet = new HttpGet(path);
459    HttpResponse resp = execute(c, httpGet, headers, path);
460    return new Response(resp.getStatusLine().getStatusCode(), resp.getAllHeaders(),
461        resp, resp.getEntity() == null ? null : resp.getEntity().getContent());
462  }
463
464  /**
465   * Send a PUT request
466   * @param path the path or URI
467   * @param contentType the content MIME type
468   * @param content the content bytes
469   * @return a Response object with response detail
470   * @throws IOException
471   */
472  public Response put(String path, String contentType, byte[] content)
473      throws IOException {
474    return put(cluster, path, contentType, content);
475  }
476
477  /**
478   * Send a PUT request
479   * @param path the path or URI
480   * @param contentType the content MIME type
481   * @param content the content bytes
482   * @param extraHdr extra Header to send
483   * @return a Response object with response detail
484   * @throws IOException
485   */
486  public Response put(String path, String contentType, byte[] content, Header extraHdr)
487      throws IOException {
488    return put(cluster, path, contentType, content, extraHdr);
489  }
490
491  /**
492   * Send a PUT request
493   * @param cluster the cluster definition
494   * @param path the path or URI
495   * @param contentType the content MIME type
496   * @param content the content bytes
497   * @return a Response object with response detail
498   * @throws IOException for error
499   */
500  public Response put(Cluster cluster, String path, String contentType,
501      byte[] content) throws IOException {
502    Header[] headers = new Header[1];
503    headers[0] = new BasicHeader("Content-Type", contentType);
504    return put(cluster, path, headers, content);
505  }
506
507  /**
508   * Send a PUT request
509   * @param cluster the cluster definition
510   * @param path the path or URI
511   * @param contentType the content MIME type
512   * @param content the content bytes
513   * @param extraHdr additional Header to send
514   * @return a Response object with response detail
515   * @throws IOException for error
516   */
517  public Response put(Cluster cluster, String path, String contentType,
518      byte[] content, Header extraHdr) throws IOException {
519    int cnt = extraHdr == null ? 1 : 2;
520    Header[] headers = new Header[cnt];
521    headers[0] = new BasicHeader("Content-Type", contentType);
522    if (extraHdr != null) {
523      headers[1] = extraHdr;
524    }
525    return put(cluster, path, headers, content);
526  }
527
528  /**
529   * Send a PUT request
530   * @param path the path or URI
531   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
532   * supplied
533   * @param content the content bytes
534   * @return a Response object with response detail
535   * @throws IOException
536   */
537  public Response put(String path, Header[] headers, byte[] content)
538      throws IOException {
539    return put(cluster, path, headers, content);
540  }
541
542  /**
543   * Send a PUT request
544   * @param cluster the cluster definition
545   * @param path the path or URI
546   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
547   * supplied
548   * @param content the content bytes
549   * @return a Response object with response detail
550   * @throws IOException
551   */
552  public Response put(Cluster cluster, String path, Header[] headers,
553      byte[] content) throws IOException {
554    HttpPut method = new HttpPut(path);
555    try {
556      method.setEntity(new InputStreamEntity(new ByteArrayInputStream(content), content.length));
557      HttpResponse resp = execute(cluster, method, headers, path);
558      headers = resp.getAllHeaders();
559      content = getResponseBody(resp);
560      return new Response(resp.getStatusLine().getStatusCode(), headers, content);
561    } finally {
562      method.releaseConnection();
563    }
564  }
565
566  /**
567   * Send a POST request
568   * @param path the path or URI
569   * @param contentType the content MIME type
570   * @param content the content bytes
571   * @return a Response object with response detail
572   * @throws IOException
573   */
574  public Response post(String path, String contentType, byte[] content)
575      throws IOException {
576    return post(cluster, path, contentType, content);
577  }
578
579  /**
580   * Send a POST request
581   * @param path the path or URI
582   * @param contentType the content MIME type
583   * @param content the content bytes
584   * @param extraHdr additional Header to send
585   * @return a Response object with response detail
586   * @throws IOException
587   */
588  public Response post(String path, String contentType, byte[] content, Header extraHdr)
589      throws IOException {
590    return post(cluster, path, contentType, content, extraHdr);
591  }
592
593  /**
594   * Send a POST request
595   * @param cluster the cluster definition
596   * @param path the path or URI
597   * @param contentType the content MIME type
598   * @param content the content bytes
599   * @return a Response object with response detail
600   * @throws IOException for error
601   */
602  public Response post(Cluster cluster, String path, String contentType,
603      byte[] content) throws IOException {
604    Header[] headers = new Header[1];
605    headers[0] = new BasicHeader("Content-Type", contentType);
606    return post(cluster, path, headers, content);
607  }
608
609  /**
610   * Send a POST request
611   * @param cluster the cluster definition
612   * @param path the path or URI
613   * @param contentType the content MIME type
614   * @param content the content bytes
615   * @param extraHdr additional Header to send
616   * @return a Response object with response detail
617   * @throws IOException for error
618   */
619  public Response post(Cluster cluster, String path, String contentType,
620      byte[] content, Header extraHdr) throws IOException {
621    int cnt = extraHdr == null ? 1 : 2;
622    Header[] headers = new Header[cnt];
623    headers[0] = new BasicHeader("Content-Type", contentType);
624    if (extraHdr != null) {
625      headers[1] = extraHdr;
626    }
627    return post(cluster, path, headers, content);
628  }
629
630  /**
631   * Send a POST request
632   * @param path the path or URI
633   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
634   * supplied
635   * @param content the content bytes
636   * @return a Response object with response detail
637   * @throws IOException
638   */
639  public Response post(String path, Header[] headers, byte[] content)
640      throws IOException {
641    return post(cluster, path, headers, content);
642  }
643
644  /**
645   * Send a POST request
646   * @param cluster the cluster definition
647   * @param path the path or URI
648   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
649   * supplied
650   * @param content the content bytes
651   * @return a Response object with response detail
652   * @throws IOException
653   */
654  public Response post(Cluster cluster, String path, Header[] headers,
655      byte[] content) throws IOException {
656    HttpPost method = new HttpPost(path);
657    try {
658      method.setEntity(new InputStreamEntity(new ByteArrayInputStream(content), content.length));
659      HttpResponse resp = execute(cluster, method, headers, path);
660      headers = resp.getAllHeaders();
661      content = getResponseBody(resp);
662      return new Response(resp.getStatusLine().getStatusCode(), headers, content);
663    } finally {
664      method.releaseConnection();
665    }
666  }
667
668  /**
669   * Send a DELETE request
670   * @param path the path or URI
671   * @return a Response object with response detail
672   * @throws IOException
673   */
674  public Response delete(String path) throws IOException {
675    return delete(cluster, path);
676  }
677
678  /**
679   * Send a DELETE request
680   * @param path the path or URI
681   * @param extraHdr additional Header to send
682   * @return a Response object with response detail
683   * @throws IOException
684   */
685  public Response delete(String path, Header extraHdr) throws IOException {
686    return delete(cluster, path, extraHdr);
687  }
688
689  /**
690   * Send a DELETE request
691   * @param cluster the cluster definition
692   * @param path the path or URI
693   * @return a Response object with response detail
694   * @throws IOException for error
695   */
696  public Response delete(Cluster cluster, String path) throws IOException {
697    HttpDelete method = new HttpDelete(path);
698    try {
699      HttpResponse resp = execute(cluster, method, null, path);
700      Header[] headers = resp.getAllHeaders();
701      byte[] content = getResponseBody(resp);
702      return new Response(resp.getStatusLine().getStatusCode(), headers, content);
703    } finally {
704      method.releaseConnection();
705    }
706  }
707
708  /**
709   * Send a DELETE request
710   * @param cluster the cluster definition
711   * @param path the path or URI
712   * @return a Response object with response detail
713   * @throws IOException for error
714   */
715  public Response delete(Cluster cluster, String path, Header extraHdr) throws IOException {
716    HttpDelete method = new HttpDelete(path);
717    try {
718      Header[] headers = { extraHdr };
719      HttpResponse resp = execute(cluster, method, headers, path);
720      headers = resp.getAllHeaders();
721      byte[] content = getResponseBody(resp);
722      return new Response(resp.getStatusLine().getStatusCode(), headers, content);
723    } finally {
724      method.releaseConnection();
725    }
726  }
727}