001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
022import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.lang.reflect.Constructor;
027import java.net.UnknownHostException;
028import java.util.Arrays;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.ExecutorService;
032import javax.net.ssl.SSLException;
033import org.apache.commons.lang3.NotImplementedException;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.AsyncConnection;
039import org.apache.hadoop.hbase.client.BufferedMutator;
040import org.apache.hadoop.hbase.client.BufferedMutatorParams;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionUtils;
043import org.apache.hadoop.hbase.client.RegionLocator;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableBuilder;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hadoop.hbase.thrift.Constants;
048import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.http.HttpRequest;
051import org.apache.http.client.HttpClient;
052import org.apache.http.client.config.RequestConfig;
053import org.apache.http.client.utils.HttpClientUtils;
054import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
055import org.apache.http.impl.client.HttpClientBuilder;
056import org.apache.http.protocol.HttpContext;
057import org.apache.thrift.protocol.TBinaryProtocol;
058import org.apache.thrift.protocol.TCompactProtocol;
059import org.apache.thrift.protocol.TProtocol;
060import org.apache.thrift.transport.TFramedTransport;
061import org.apache.thrift.transport.THttpClient;
062import org.apache.thrift.transport.TSocket;
063import org.apache.thrift.transport.TTransport;
064import org.apache.thrift.transport.TTransportException;
065import org.apache.yetus.audience.InterfaceAudience;
066
067import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
068
069@InterfaceAudience.Private
070public class ThriftConnection implements Connection {
071  private Configuration conf;
072  private User user;
073  // For HTTP protocol
074  private HttpClient httpClient;
075  private boolean httpClientCreated = false;
076  private boolean isClosed = false;
077
078  private String host;
079  private int port;
080  private boolean isFramed = false;
081  private boolean isCompact = false;
082
083  private ThriftClientBuilder clientBuilder;
084
085  private int operationTimeout;
086  private int connectTimeout;
087
088  public ThriftConnection(Configuration conf, ExecutorService pool, final User user)
089      throws IOException {
090    this.conf = conf;
091    this.user = user;
092    this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
093    this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
094    Preconditions.checkArgument(port > 0);
095    Preconditions.checkArgument(host != null);
096    this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
097    this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
098    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
099        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
100    this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
101
102    String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
103        DefaultThriftClientBuilder.class.getName());
104    try {
105      Class<?> clazz = Class.forName(className);
106      Constructor<?> constructor = clazz
107          .getDeclaredConstructor(ThriftConnection.class);
108      constructor.setAccessible(true);
109      clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
110    }catch (Exception e) {
111      throw new IOException(e);
112    }
113  }
114
115  public synchronized void setHttpClient(HttpClient httpClient) {
116    this.httpClient = httpClient;
117  }
118
119  @Override
120  public Configuration getConfiguration() {
121    return conf;
122  }
123
124  public String getHost() {
125    return host;
126  }
127
128  public int getPort() {
129    return port;
130  }
131
132  public boolean isFramed() {
133    return isFramed;
134  }
135
136  public boolean isCompact() {
137    return isCompact;
138  }
139
140  public int getOperationTimeout() {
141    return operationTimeout;
142  }
143
144  public int getConnectTimeout() {
145    return connectTimeout;
146  }
147
148  public ThriftClientBuilder getClientBuilder() {
149    return clientBuilder;
150  }
151
152  /**
153   * the default thrift client builder.
154   * One can extend the ThriftClientBuilder to builder custom client, implement
155   * features like authentication(hbase-examples/thrift/DemoClient)
156   *
157   */
158  public static class DefaultThriftClientBuilder extends ThriftClientBuilder  {
159
160    @Override
161    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
162      TSocket sock = new TSocket(connection.getHost(), connection.getPort());
163      sock.setSocketTimeout(connection.getOperationTimeout());
164      sock.setConnectTimeout(connection.getConnectTimeout());
165      TTransport tTransport = sock;
166      if (connection.isFramed()) {
167        tTransport = new TFramedTransport(tTransport);
168      }
169      try {
170        sock.open();
171      } catch (TTransportException e) {
172        throw new IOException(e);
173      }
174      TProtocol prot;
175      if (connection.isCompact()) {
176        prot = new TCompactProtocol(tTransport);
177      } else {
178        prot = new TBinaryProtocol(tTransport);
179      }
180      THBaseService.Client client = new THBaseService.Client(prot);
181      return new Pair<>(client, tTransport);
182    }
183
184    public DefaultThriftClientBuilder(ThriftConnection connection) {
185      super(connection);
186    }
187  }
188
189  /**
190   * the default thrift http client builder.
191   * One can extend the ThriftClientBuilder to builder custom http client, implement
192   * features like authentication or 'DoAs'(hbase-examples/thrift/HttpDoAsClient)
193   *
194   */
195  public static class HTTPThriftClientBuilder extends ThriftClientBuilder {
196    Map<String,String> customHeader = new HashMap<>();
197
198    public HTTPThriftClientBuilder(ThriftConnection connection) {
199      super(connection);
200    }
201
202    public void addCostumHeader(String key, String value) {
203      customHeader.put(key, value);
204    }
205
206    @Override
207    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
208      Preconditions.checkArgument(connection.getHost().startsWith("http"),
209          "http client host must start with http or https");
210      String url = connection.getHost() + ":" + connection.getPort();
211      try {
212        THttpClient httpClient = new THttpClient(url, connection.getHttpClient());
213        for (Map.Entry<String, String> header : customHeader.entrySet()) {
214          httpClient.setCustomHeader(header.getKey(), header.getValue());
215        }
216        httpClient.open();
217        TProtocol prot = new TBinaryProtocol(httpClient);
218        THBaseService.Client client = new THBaseService.Client(prot);
219        return new Pair<>(client, httpClient);
220      } catch (TTransportException e) {
221        throw new IOException(e);
222      }
223
224    }
225  }
226
227  /**
228   * Get a ThriftAdmin, ThriftAdmin is NOT thread safe
229   * @return a ThriftAdmin
230   * @throws IOException IOException
231   */
232  @Override
233  public Admin getAdmin() throws IOException {
234    Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
235    return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
236  }
237
238  public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
239    private long pause;
240
241    public DelayRetryHandler(int retryCount, long pause) {
242      super(retryCount, true, Arrays.asList(
243          InterruptedIOException.class,
244          UnknownHostException.class,
245          SSLException.class));
246      this.pause = pause;
247    }
248
249    @Override
250    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
251      // Don't sleep for retrying the first time
252      if (executionCount > 1 && pause > 0) {
253        try {
254          long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
255          Thread.sleep(sleepTime);
256        } catch (InterruptedException ie) {
257          //reset interrupt marker
258          Thread.currentThread().interrupt();
259        }
260      }
261      return super.retryRequest(exception, executionCount, context);
262    }
263
264    @Override
265    protected boolean handleAsIdempotent(HttpRequest request) {
266      return true;
267    }
268  }
269
270  public synchronized HttpClient getHttpClient() {
271    if (httpClient != null) {
272      return httpClient;
273    }
274    int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
275        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
276    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
277    HttpClientBuilder builder = HttpClientBuilder.create();
278    RequestConfig.Builder requestBuilder = RequestConfig.custom();
279    requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
280    requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
281    builder.setRetryHandler(new DelayRetryHandler(retry, pause));
282    builder.setDefaultRequestConfig(requestBuilder.build());
283    httpClient = builder.build();
284    httpClientCreated = true;
285    return httpClient;
286  }
287
288  @Override
289  public synchronized void close() throws IOException {
290    if (httpClient != null && httpClientCreated) {
291      HttpClientUtils.closeQuietly(httpClient);
292    }
293    isClosed = true;
294  }
295
296  @Override
297  public boolean isClosed() {
298    return isClosed;
299  }
300
301  /**
302   * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe
303   * @return a TableBuilder
304   * @throws IOException IOException
305   */
306  @Override
307  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
308    return new TableBuilder() {
309      @Override
310      public TableBuilder setOperationTimeout(int timeout) {
311        return this;
312      }
313
314      @Override
315      public TableBuilder setRpcTimeout(int timeout) {
316        return this;
317      }
318
319      @Override
320      public TableBuilder setReadRpcTimeout(int timeout) {
321        return this;
322      }
323
324      @Override
325      public TableBuilder setWriteRpcTimeout(int timeout) {
326        return this;
327      }
328
329      @Override
330      public Table build() {
331        try {
332          Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
333          return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf);
334        } catch (IOException ioE) {
335          throw new RuntimeException(ioE);
336        }
337
338      }
339    };
340  }
341
342  @Override
343  public void abort(String why, Throwable e) {
344
345  }
346
347  @Override
348  public boolean isAborted() {
349    return false;
350  }
351
352  @Override
353  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
354    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
355  }
356
357  @Override
358  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
359    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
360  }
361
362  @Override
363  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
364    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
365  }
366
367  @Override
368  public void clearRegionLocationCache() {
369    throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable");
370  }
371
372  @Override
373  public AsyncConnection toAsyncConnection() {
374    throw new NotImplementedException("toAsyncConnection not supported in ThriftTable");
375  }
376}