/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift2.client;

import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.thrift.Constants;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

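/**
 * A Thrift-backed {@link Connection} implementation: instead of the regular HBase RPC client, it
 * talks to a Thrift2 server through {@link THBaseService}. The server host and port are read from
 * {@link Constants#HBASE_THRIFT_SERVER_NAME} and {@link Constants#HBASE_THRIFT_SERVER_PORT}.
 * <p>
 * A minimal usage sketch (the host and port below are placeholders; in practice this connection is
 * typically created reflectively by ConnectionFactory and is constructed directly here only for
 * illustration):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(Constants.HBASE_THRIFT_SERVER_NAME, "thrift-server.example.com");
 * conf.setInt(Constants.HBASE_THRIFT_SERVER_PORT, 9090);
 * try (Connection connection = new ThriftConnection(conf, null, null, Collections.emptyMap());
 *   Admin admin = connection.getAdmin()) {
 *   // Admin and Table instances obtained from this connection are NOT thread safe.
 * }
 * }</pre>
 */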
@InterfaceAudience.Private
public class ThriftConnection implements Connection {
  private static final Logger LOG = LoggerFactory.getLogger(ThriftConnection.class);
  private Configuration conf;
  private User user;
  // For HTTP protocol
  private HttpClient httpClient;
  private boolean httpClientCreated = false;
  private boolean isClosed = false;

  private String host;
  private int port;
  private boolean isFramed = false;
  private boolean isCompact = false;

  // TODO: Reuse the underlying Thrift client rather than creating a new one for every request.
  ThriftClientBuilder clientBuilder;

  private int operationTimeout;
  private int connectTimeout;

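  /**
   * Builds a connection to a Thrift2 server. The server host and port are read from
   * {@link Constants#HBASE_THRIFT_SERVER_NAME} and {@link Constants#HBASE_THRIFT_SERVER_PORT},
   * and the {@link ThriftClientBuilder} named by
   * {@link Constants#HBASE_THRIFT_CLIENT_BUIDLER_CLASS} is instantiated reflectively. The
   * {@code pool} and {@code connectionAttributes} arguments are part of the standard Connection
   * constructor signature but are not used by this implementation.
   */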
  public ThriftConnection(Configuration conf, ExecutorService pool, final User user,
    Map<String, byte[]> connectionAttributes) throws IOException {
    this.conf = conf;
    this.user = user;
    this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
    this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
    Preconditions.checkArgument(port > 0, "%s must be set to a positive port",
      Constants.HBASE_THRIFT_SERVER_PORT);
    Preconditions.checkArgument(host != null, "%s must be set",
      Constants.HBASE_THRIFT_SERVER_NAME);
    this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
    this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
    this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);

    String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
      DefaultThriftClientBuilder.class.getName());
    try {
      Class<?> clazz = Class.forName(className);
      Constructor<?> constructor = clazz.getDeclaredConstructor(ThriftConnection.class);
      constructor.setAccessible(true);
      clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
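  /**
   * Supply a pre-configured {@link HttpClient} for the HTTP transport. A client set here is used
   * as-is and is not closed by {@link #close()}; only a client created lazily by
   * {@link #getHttpClient()} is shut down on close.
   */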
  public synchronized void setHttpClient(HttpClient httpClient) {
    this.httpClient = httpClient;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  public String getHost() {
    return host;
  }

  public int getPort() {
    return port;
  }

  public boolean isFramed() {
    return isFramed;
  }

  public boolean isCompact() {
    return isCompact;
  }

  public int getOperationTimeout() {
    return operationTimeout;
  }

  public int getConnectTimeout() {
    return connectTimeout;
  }

  /**
   * The default Thrift client builder. Extend {@link ThriftClientBuilder} to build a custom
   * client, e.g. to add authentication (see hbase-examples/thrift/DemoClient).
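   * <p>
   * A brief sketch of how a custom builder is plugged in ({@code MyThriftClientBuilder} is a
   * hypothetical subclass; it must expose a constructor taking a {@link ThriftConnection}, since
   * the builder is instantiated reflectively):
   *
   * <pre>{@code
   * conf.set(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS, MyThriftClientBuilder.class.getName());
   * Connection connection = new ThriftConnection(conf, null, null, Collections.emptyMap());
   * }</pre>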
   */
  public static class DefaultThriftClientBuilder extends ThriftClientBuilder {

    @Override
    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
      TTransport tTransport = null;
      try {
        TSocket sock = new TSocket(connection.getHost(), connection.getPort());
        sock.setSocketTimeout(connection.getOperationTimeout());
        sock.setConnectTimeout(connection.getConnectTimeout());
        tTransport = sock;
        if (connection.isFramed()) {
          tTransport = new TFramedTransport(tTransport);
        }

        sock.open();
      } catch (TTransportException e) {
        throw new IOException(e);
      }
      TProtocol prot;
      if (connection.isCompact()) {
        prot = new TCompactProtocol(tTransport);
      } else {
        prot = new TBinaryProtocol(tTransport);
      }
      THBaseService.Client client = new THBaseService.Client(prot);
      return new Pair<>(client, tTransport);
    }

    public DefaultThriftClientBuilder(ThriftConnection connection) {
      super(connection);
    }
  }

  /**
   * The default Thrift HTTP client builder. Extend {@link ThriftClientBuilder} to build a custom
   * HTTP client, e.g. to add authentication or 'doAs' support (see
   * hbase-examples/thrift/HttpDoAsClient).
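   * <p>
   * A brief sketch of direct use ({@code thriftConnection} is an existing instance, and the header
   * name/value are illustrative only; note the configured host must start with "http" or "https"):
   *
   * <pre>{@code
   * HTTPThriftClientBuilder builder = new HTTPThriftClientBuilder(thriftConnection);
   * builder.addCostumHeader("doAs", "someUser");
   * Pair<THBaseService.Client, TTransport> client = builder.getClient();
   * }</pre>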
   */
  public static class HTTPThriftClientBuilder extends ThriftClientBuilder {
    Map<String, String> customHeader = new HashMap<>();

    public HTTPThriftClientBuilder(ThriftConnection connection) {
      super(connection);
    }

    public void addCostumHeader(String key, String value) {
      customHeader.put(key, value);
    }

    @Override
    public Pair<THBaseService.Client, TTransport> getClient() throws IOException {
      Preconditions.checkArgument(connection.getHost().startsWith("http"),
        "http client host must start with http or https");
      String url = connection.getHost() + ":" + connection.getPort();
      try {
        THttpClient httpClient = new THttpClient(url, connection.getHttpClient());
        for (Map.Entry<String, String> header : customHeader.entrySet()) {
          httpClient.setCustomHeader(header.getKey(), header.getValue());
        }
        httpClient.open();
        TProtocol prot = new TBinaryProtocol(httpClient);
        THBaseService.Client client = new THBaseService.Client(prot);
        return new Pair<>(client, httpClient);
      } catch (TTransportException e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Get a ThriftAdmin. ThriftAdmin is NOT thread safe.
   * @return a ThriftAdmin
   * @throws IOException if the Thrift client cannot be created
   */
  @Override
  public Admin getAdmin() throws IOException {
    Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
    return new ThriftAdmin(client.getFirst(), client.getSecond(), conf);
  }
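  /**
   * An HTTP retry handler that sleeps between retries, with an exponentially growing pause
   * computed via {@link ConnectionUtils#getPauseTime(long, int)}, and that treats every request
   * as idempotent.
   */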
  public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler {
    private final long pause;

    public DelayRetryHandler(int retryCount, long pause) {
      super(retryCount, true, Arrays.asList(InterruptedIOException.class,
        UnknownHostException.class, SSLException.class));
      this.pause = pause;
    }

    @Override
    public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
      // Don't sleep before the first retry
      if (executionCount > 1 && pause > 0) {
        try {
          long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1);
          Thread.sleep(sleepTime);
        } catch (InterruptedException ie) {
          // reset interrupt marker
          Thread.currentThread().interrupt();
        }
      }
      return super.retryRequest(exception, executionCount, context);
    }

    @Override
    protected boolean handleAsIdempotent(HttpRequest request) {
      return true;
    }
  }
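  /**
   * Returns the {@link HttpClient} for the HTTP transport. If none was supplied through
   * {@link #setHttpClient(HttpClient)}, one is built lazily here with the configured connect and
   * operation timeouts and a {@link DelayRetryHandler}; only such an internally created client is
   * closed by {@link #close()}.
   */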
  public synchronized HttpClient getHttpClient() {
    if (httpClient != null) {
      return httpClient;
    }
    int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5);
    HttpClientBuilder builder = HttpClientBuilder.create();
    RequestConfig.Builder requestBuilder = RequestConfig.custom();
    requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout());
    requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout());
    builder.setRetryHandler(new DelayRetryHandler(retry, pause));
    builder.setDefaultRequestConfig(requestBuilder.build());
    httpClient = builder.build();
    httpClientCreated = true;
    return httpClient;
  }

  @Override
  public synchronized void close() throws IOException {
    if (httpClient != null && httpClientCreated) {
      HttpClientUtils.closeQuietly(httpClient);
    }
    isClosed = true;
  }

  @Override
  public boolean isClosed() {
    return isClosed;
  }

  /**
   * Get a TableBuilder for building a ThriftTable. ThriftTable is NOT thread safe.
   * @return a TableBuilder
   */
  @Override
  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
    return new TableBuilder() {
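      // The setters below are intentional no-ops: the ThriftTable built here takes its timeouts
      // from the Configuration rather than from this builder.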
      @Override
      public TableBuilder setOperationTimeout(int timeout) {
        return this;
      }

      @Override
      public TableBuilder setRpcTimeout(int timeout) {
        return this;
      }

      @Override
      public TableBuilder setReadRpcTimeout(int timeout) {
        return this;
      }

      @Override
      public TableBuilder setWriteRpcTimeout(int timeout) {
        return this;
      }

      @Override
      public TableBuilder setRequestAttribute(String key, byte[] value) {
        return this;
      }

      @Override
      public Table build() {
        try {
          Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
          return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf);
        } catch (IOException ioE) {
          throw new RuntimeException(ioE);
        }
      }
    };
  }

  @Override
  public void abort(String why, Throwable e) {
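    // Intentionally a no-op; this connection does not track an aborted state (see isAborted()).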
  }

  @Override
  public boolean isAborted() {
    return false;
  }

  @Override
  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
    throw new NotImplementedException("getBufferedMutator not supported in ThriftConnection");
  }

  @Override
  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
    throw new NotImplementedException("getBufferedMutator not supported in ThriftConnection");
  }

  @Override
  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
    throw new NotImplementedException("getRegionLocator not supported in ThriftConnection");
  }

  @Override
  public void clearRegionLocationCache() {
    throw new NotImplementedException(
      "clearRegionLocationCache not supported in ThriftConnection");
  }

  @Override
  public AsyncConnection toAsyncConnection() {
    throw new NotImplementedException("toAsyncConnection not supported in ThriftConnection");
  }

  @Override
  public String getClusterId() {
    try {
      Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient();
      try {
        return client.getFirst().getClusterId();
      } finally {
        // The client is created only for this call, so close its transport when done.
        client.getSecond().close();
      }
    } catch (TException | IOException e) {
      LOG.error("Error fetching cluster ID: ", e);
    }
    return null;
  }
}