001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
022import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.lang.reflect.Constructor;
027import java.net.UnknownHostException;
028import java.util.Arrays;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.ExecutorService;
032
033import javax.net.ssl.SSLException;
034
035import org.apache.commons.lang3.NotImplementedException;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.BufferedMutator;
041import org.apache.hadoop.hbase.client.BufferedMutatorParams;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionUtils;
044import org.apache.hadoop.hbase.client.RegionLocator;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableBuilder;
047import org.apache.hadoop.hbase.security.User;
048import org.apache.hadoop.hbase.thrift.Constants;
049import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
050import org.apache.hadoop.hbase.util.Pair;
051import org.apache.http.HttpRequest;
052import org.apache.http.client.HttpClient;
053import org.apache.http.client.config.RequestConfig;
054import org.apache.http.client.utils.HttpClientUtils;
055import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
056import org.apache.http.impl.client.HttpClientBuilder;
057import org.apache.http.protocol.HttpContext;
058import org.apache.thrift.protocol.TBinaryProtocol;
059import org.apache.thrift.protocol.TCompactProtocol;
060import org.apache.thrift.protocol.TProtocol;
061import org.apache.thrift.transport.TFramedTransport;
062import org.apache.thrift.transport.THttpClient;
063import org.apache.thrift.transport.TSocket;
064import org.apache.thrift.transport.TTransport;
065import org.apache.thrift.transport.TTransportException;
066import org.apache.yetus.audience.InterfaceAudience;
067
068import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
069
070@InterfaceAudience.Private
071public class ThriftConnection implements Connection {
072  private Configuration conf;
073  private User user;
074  // For HTTP protocol
075  private HttpClient httpClient;
076  private boolean httpClientCreated = false;
077  private boolean isClosed = false;
078
079  private String host;
080  private int port;
081  private boolean isFramed = false;
082  private boolean isCompact = false;
083
084  private ThriftClientBuilder clientBuilder;
085
086  private int operationTimeout;
087  private int connectTimeout;
088
089  public ThriftConnection(Configuration conf, ExecutorService pool, final User user)
090      throws IOException {
091    this.conf = conf;
092    this.user = user;
093    this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
094    this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
095    Preconditions.checkArgument(port > 0);
096    Preconditions.checkArgument(host != null);
097    this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
098    this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
099    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
100        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
101    this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
102
103    String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
104        DefaultThriftClientBuilder.class.getName());
105    try {
106      Class<?> clazz = Class.forName(className);
107      Constructor<?> constructor = clazz
108          .getDeclaredConstructor(ThriftConnection.class);
109      constructor.setAccessible(true);
110      clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
111    }catch (Exception e) {
112      throw new IOException(e);
113    }
114  }
115
116  public synchronized void setHttpClient(HttpClient httpClient) {
117    this.httpClient = httpClient;
118  }
119
120  @Override
121  public Configuration getConfiguration() {
122    return conf;
123  }
124
125  public String getHost() {
126    return host;
127  }
128
129  public int getPort() {
130    return port;
131  }
132
133  public boolean isFramed() {
134    return isFramed;
135  }
136
137  public boolean isCompact() {
138    return isCompact;
139  }
140
141  public int getOperationTimeout() {
142    return operationTimeout;
143  }
144
145  public int getConnectTimeout() {
146    return connectTimeout;
147  }
148
149  public ThriftClientBuilder getClientBuilder() {
150    return clientBuilder;
151  }
152
153  /**
154   * the default thrift client builder.
155   * One can extend the ThriftClientBuilder to builder custom client, implement
156   * features like authentication(hbase-examples/thrift/DemoClient)
157   *
158   */
159  public static class DefaultThriftClientBuilder extends ThriftClientBuilder  {
160
161    @Override
162    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
163      TSocket sock = new TSocket(connection.getHost(), connection.getPort());
164      sock.setSocketTimeout(connection.getOperationTimeout());
165      sock.setConnectTimeout(connection.getConnectTimeout());
166      TTransport tTransport = sock;
167      if (connection.isFramed()) {
168        tTransport = new TFramedTransport(tTransport);
169      }
170      try {
171        sock.open();
172      } catch (TTransportException e) {
173        throw new IOException(e);
174      }
175      TProtocol prot;
176      if (connection.isCompact()) {
177        prot = new TCompactProtocol(tTransport);
178      } else {
179        prot = new TBinaryProtocol(tTransport);
180      }
181      THBaseService.Client client = new THBaseService.Client(prot);
182      return new Pair<>(client, tTransport);
183    }
184
185    public DefaultThriftClientBuilder(ThriftConnection connection) {
186      super(connection);
187    }
188  }
189
190  /**
191   * the default thrift http client builder.
192   * One can extend the ThriftClientBuilder to builder custom http client, implement
193   * features like authentication or 'DoAs'(hbase-examples/thrift/HttpDoAsClient)
194   *
195   */
196  public static class HTTPThriftClientBuilder extends ThriftClientBuilder {
197    Map<String,String> customHeader = new HashMap<>();
198
199    public HTTPThriftClientBuilder(ThriftConnection connection) {
200      super(connection);
201    }
202
203    public void addCostumHeader(String key, String value) {
204      customHeader.put(key, value);
205    }
206
207    @Override
208    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
209      Preconditions.checkArgument(connection.getHost().startsWith("http"),
210          "http client host must start with http or https");
211      String url = connection.getHost() + ":" + connection.getPort();
212      try {
213        THttpClient httpClient = new THttpClient(url, connection.getHttpClient());
214        for (Map.Entry<String, String> header : customHeader.entrySet()) {
215          httpClient.setCustomHeader(header.getKey(), header.getValue());
216        }
217        httpClient.open();
218        TProtocol prot = new TBinaryProtocol(httpClient);
219        THBaseService.Client client = new THBaseService.Client(prot);
220        return new Pair<>(client, httpClient);
221      } catch (TTransportException e) {
222        throw new IOException(e);
223      }
224
225    }
226  }
227
228  /**
229   * Get a ThriftAdmin, ThriftAdmin is NOT thread safe
230   * @return a ThriftAdmin
231   * @throws IOException IOException
232   */
233  @Override
234  public Admin getAdmin() throws IOException {
235    Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
236    return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
237  }
238
239  public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
240    private long pause;
241
242    public DelayRetryHandler(int retryCount, long pause) {
243      super(retryCount, true, Arrays.asList(
244          InterruptedIOException.class,
245          UnknownHostException.class,
246          SSLException.class));
247      this.pause = pause;
248    }
249
250    @Override
251    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
252      // Don't sleep for retrying the first time
253      if (executionCount > 1 && pause > 0) {
254        try {
255          long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
256          Thread.sleep(sleepTime);
257        } catch (InterruptedException ie) {
258          //reset interrupt marker
259          Thread.currentThread().interrupt();
260        }
261      }
262      return super.retryRequest(exception, executionCount, context);
263    }
264
265    @Override
266    protected boolean handleAsIdempotent(HttpRequest request) {
267      return true;
268    }
269  }
270
271  public synchronized HttpClient getHttpClient() {
272    if (httpClient != null) {
273      return httpClient;
274    }
275    int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
276        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
277    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
278    HttpClientBuilder builder = HttpClientBuilder.create();
279    RequestConfig.Builder requestBuilder = RequestConfig.custom();
280    requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
281    requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
282    builder.setRetryHandler(new DelayRetryHandler(retry, pause));
283    builder.setDefaultRequestConfig(requestBuilder.build());
284    httpClient = builder.build();
285    httpClientCreated = true;
286    return httpClient;
287  }
288
289  @Override
290  public synchronized void close() throws IOException {
291    if (httpClient != null && httpClientCreated) {
292      HttpClientUtils.closeQuietly(httpClient);
293    }
294    isClosed = true;
295  }
296
297  @Override
298  public boolean isClosed() {
299    return isClosed;
300  }
301
302  /**
303   * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe
304   * @return a TableBuilder
305   * @throws IOException IOException
306   */
307  @Override
308  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
309    return new TableBuilder() {
310      @Override
311      public TableBuilder setOperationTimeout(int timeout) {
312        return this;
313      }
314
315      @Override
316      public TableBuilder setRpcTimeout(int timeout) {
317        return this;
318      }
319
320      @Override
321      public TableBuilder setReadRpcTimeout(int timeout) {
322        return this;
323      }
324
325      @Override
326      public TableBuilder setWriteRpcTimeout(int timeout) {
327        return this;
328      }
329
330      @Override
331      public Table build() {
332        try {
333          Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
334          return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf);
335        } catch (IOException ioE) {
336          throw new RuntimeException(ioE);
337        }
338
339      }
340    };
341  }
342
343  @Override
344  public void abort(String why, Throwable e) {
345
346  }
347
348  @Override
349  public boolean isAborted() {
350    return false;
351  }
352
353  @Override
354  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
355    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
356  }
357
358  @Override
359  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
360    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
361  }
362
363  @Override
364  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
365    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
366  }
367
368  @Override
369  public void clearRegionLocationCache() {
370    throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable");
371  }
372}