001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.thrift2.client;
020
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
022import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.lang.reflect.Constructor;
027import java.net.UnknownHostException;
028import java.util.Arrays;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.ExecutorService;
032import javax.net.ssl.SSLException;
033import org.apache.commons.lang3.NotImplementedException;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Admin;
038import org.apache.hadoop.hbase.client.AsyncConnection;
039import org.apache.hadoop.hbase.client.BufferedMutator;
040import org.apache.hadoop.hbase.client.BufferedMutatorParams;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionUtils;
043import org.apache.hadoop.hbase.client.RegionLocator;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableBuilder;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hadoop.hbase.thrift.Constants;
048import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.http.HttpRequest;
051import org.apache.http.client.HttpClient;
052import org.apache.http.client.config.RequestConfig;
053import org.apache.http.client.utils.HttpClientUtils;
054import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
055import org.apache.http.impl.client.HttpClientBuilder;
056import org.apache.http.protocol.HttpContext;
057import org.apache.thrift.TException;
058import org.apache.thrift.protocol.TBinaryProtocol;
059import org.apache.thrift.protocol.TCompactProtocol;
060import org.apache.thrift.protocol.TProtocol;
061import org.apache.thrift.transport.TFramedTransport;
062import org.apache.thrift.transport.THttpClient;
063import org.apache.thrift.transport.TSocket;
064import org.apache.thrift.transport.TTransport;
065import org.apache.thrift.transport.TTransportException;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
071
072@InterfaceAudience.Private
073public class ThriftConnection implements Connection {
074  private static final Logger LOG = LoggerFactory.getLogger(ThriftConnection.class);
075  private Configuration conf;
076  private User user;
077  // For HTTP protocol
078  private HttpClient httpClient;
079  private boolean httpClientCreated = false;
080  private boolean isClosed = false;
081
082  private String host;
083  private int port;
084  private boolean isFramed = false;
085  private boolean isCompact = false;
086
087  // TODO: We can rip out the ThriftClient piece of it rather than creating a new client every time.
088  ThriftClientBuilder clientBuilder;
089
090  private int operationTimeout;
091  private int connectTimeout;
092
093  public ThriftConnection(Configuration conf, ExecutorService pool, final User user)
094      throws IOException {
095    this.conf = conf;
096    this.user = user;
097    this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
098    this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
099    Preconditions.checkArgument(port > 0);
100    Preconditions.checkArgument(host != null);
101    this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
102    this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
103    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
104        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
105    this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
106
107    String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
108        DefaultThriftClientBuilder.class.getName());
109    try {
110      Class<?> clazz = Class.forName(className);
111      Constructor<?> constructor = clazz
112          .getDeclaredConstructor(ThriftConnection.class);
113      constructor.setAccessible(true);
114      clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
115    }catch (Exception e) {
116      throw new IOException(e);
117    }
118  }
119
120  public synchronized void setHttpClient(HttpClient httpClient) {
121    this.httpClient = httpClient;
122  }
123
124  @Override
125  public Configuration getConfiguration() {
126    return conf;
127  }
128
129  public String getHost() {
130    return host;
131  }
132
133  public int getPort() {
134    return port;
135  }
136
137  public boolean isFramed() {
138    return isFramed;
139  }
140
141  public boolean isCompact() {
142    return isCompact;
143  }
144
145  public int getOperationTimeout() {
146    return operationTimeout;
147  }
148
149  public int getConnectTimeout() {
150    return connectTimeout;
151  }
152
153  /**
154   * the default thrift client builder.
155   * One can extend the ThriftClientBuilder to builder custom client, implement
156   * features like authentication(hbase-examples/thrift/DemoClient)
157   *
158   */
159  public static class DefaultThriftClientBuilder extends ThriftClientBuilder  {
160
161    @Override
162    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
163      TSocket sock = new TSocket(connection.getHost(), connection.getPort());
164      sock.setSocketTimeout(connection.getOperationTimeout());
165      sock.setConnectTimeout(connection.getConnectTimeout());
166      TTransport tTransport = sock;
167      if (connection.isFramed()) {
168        tTransport = new TFramedTransport(tTransport);
169      }
170      try {
171        sock.open();
172      } catch (TTransportException e) {
173        throw new IOException(e);
174      }
175      TProtocol prot;
176      if (connection.isCompact()) {
177        prot = new TCompactProtocol(tTransport);
178      } else {
179        prot = new TBinaryProtocol(tTransport);
180      }
181      THBaseService.Client client = new THBaseService.Client(prot);
182      return new Pair<>(client, tTransport);
183    }
184
185    public DefaultThriftClientBuilder(ThriftConnection connection) {
186      super(connection);
187    }
188  }
189
190  /**
191   * the default thrift http client builder.
192   * One can extend the ThriftClientBuilder to builder custom http client, implement
193   * features like authentication or 'DoAs'(hbase-examples/thrift/HttpDoAsClient)
194   *
195   */
196  public static class HTTPThriftClientBuilder extends ThriftClientBuilder {
197    Map<String,String> customHeader = new HashMap<>();
198
199    public HTTPThriftClientBuilder(ThriftConnection connection) {
200      super(connection);
201    }
202
203    public void addCostumHeader(String key, String value) {
204      customHeader.put(key, value);
205    }
206
207    @Override
208    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
209      Preconditions.checkArgument(connection.getHost().startsWith("http"),
210          "http client host must start with http or https");
211      String url = connection.getHost() + ":" + connection.getPort();
212      try {
213        THttpClient httpClient = new THttpClient(url, connection.getHttpClient());
214        for (Map.Entry<String, String> header : customHeader.entrySet()) {
215          httpClient.setCustomHeader(header.getKey(), header.getValue());
216        }
217        httpClient.open();
218        TProtocol prot = new TBinaryProtocol(httpClient);
219        THBaseService.Client client = new THBaseService.Client(prot);
220        return new Pair<>(client, httpClient);
221      } catch (TTransportException e) {
222        throw new IOException(e);
223      }
224
225    }
226  }
227
228  /**
229   * Get a ThriftAdmin, ThriftAdmin is NOT thread safe
230   * @return a ThriftAdmin
231   * @throws IOException IOException
232   */
233  @Override
234  public Admin getAdmin() throws IOException {
235    Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
236    return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
237  }
238
239  public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
240    private long pause;
241
242    public DelayRetryHandler(int retryCount, long pause) {
243      super(retryCount, true, Arrays.asList(
244          InterruptedIOException.class,
245          UnknownHostException.class,
246          SSLException.class));
247      this.pause = pause;
248    }
249
250    @Override
251    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
252      // Don't sleep for retrying the first time
253      if (executionCount > 1 && pause > 0) {
254        try {
255          long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
256          Thread.sleep(sleepTime);
257        } catch (InterruptedException ie) {
258          //reset interrupt marker
259          Thread.currentThread().interrupt();
260        }
261      }
262      return super.retryRequest(exception, executionCount, context);
263    }
264
265    @Override
266    protected boolean handleAsIdempotent(HttpRequest request) {
267      return true;
268    }
269  }
270
271  public synchronized HttpClient getHttpClient() {
272    if (httpClient != null) {
273      return httpClient;
274    }
275    int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
276        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
277    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
278    HttpClientBuilder builder = HttpClientBuilder.create();
279    RequestConfig.Builder requestBuilder = RequestConfig.custom();
280    requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
281    requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
282    builder.setRetryHandler(new DelayRetryHandler(retry, pause));
283    builder.setDefaultRequestConfig(requestBuilder.build());
284    httpClient = builder.build();
285    httpClientCreated = true;
286    return httpClient;
287  }
288
289  @Override
290  public synchronized void close() throws IOException {
291    if (httpClient != null && httpClientCreated) {
292      HttpClientUtils.closeQuietly(httpClient);
293    }
294    isClosed = true;
295  }
296
297  @Override
298  public boolean isClosed() {
299    return isClosed;
300  }
301
302  /**
303   * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe
304   * @return a TableBuilder
305   * @throws IOException IOException
306   */
307  @Override
308  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
309    return new TableBuilder() {
310      @Override
311      public TableBuilder setOperationTimeout(int timeout) {
312        return this;
313      }
314
315      @Override
316      public TableBuilder setRpcTimeout(int timeout) {
317        return this;
318      }
319
320      @Override
321      public TableBuilder setReadRpcTimeout(int timeout) {
322        return this;
323      }
324
325      @Override
326      public TableBuilder setWriteRpcTimeout(int timeout) {
327        return this;
328      }
329
330      @Override
331      public Table build() {
332        try {
333          Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
334          return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf);
335        } catch (IOException ioE) {
336          throw new RuntimeException(ioE);
337        }
338      }
339    };
340  }
341
342  @Override
343  public void abort(String why, Throwable e) {
344
345  }
346
347  @Override
348  public boolean isAborted() {
349    return false;
350  }
351
352  @Override
353  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
354    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
355  }
356
357  @Override
358  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
359    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
360  }
361
362  @Override
363  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
364    throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable");
365  }
366
367  @Override
368  public void clearRegionLocationCache() {
369    throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable");
370  }
371
372  @Override
373  public AsyncConnection toAsyncConnection() {
374    throw new NotImplementedException("toAsyncConnection not supported in ThriftTable");
375  }
376
377  @Override
378  public String getClusterId() {
379    try {
380      Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
381      return client.getFirst().getClusterId();
382    } catch (TException | IOException e) {
383      LOG.error("Error fetching cluster ID: ", e);
384    }
385    return null;
386  }
387}