001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift2.client; 019 020import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT; 021import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.lang.reflect.Constructor; 026import java.net.UnknownHostException; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.ExecutorService; 031import javax.net.ssl.SSLException; 032import org.apache.commons.lang3.NotImplementedException; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.AsyncConnection; 038import org.apache.hadoop.hbase.client.BufferedMutator; 039import org.apache.hadoop.hbase.client.BufferedMutatorParams; 040import org.apache.hadoop.hbase.client.Connection; 041import org.apache.hadoop.hbase.client.ConnectionUtils; 042import org.apache.hadoop.hbase.client.RegionLocator; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.client.TableBuilder; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hadoop.hbase.thrift.Constants; 047import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 048import org.apache.hadoop.hbase.util.Pair; 049import org.apache.http.HttpRequest; 050import org.apache.http.client.HttpClient; 051import org.apache.http.client.config.RequestConfig; 052import org.apache.http.client.utils.HttpClientUtils; 053import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; 054import org.apache.http.impl.client.HttpClientBuilder; 055import org.apache.http.protocol.HttpContext; 056import org.apache.thrift.TException; 057import org.apache.thrift.protocol.TBinaryProtocol; 058import org.apache.thrift.protocol.TCompactProtocol; 059import org.apache.thrift.protocol.TProtocol; 060import org.apache.thrift.transport.THttpClient; 061import org.apache.thrift.transport.TSocket; 062import org.apache.thrift.transport.TTransport; 063import org.apache.thrift.transport.TTransportException; 064import org.apache.thrift.transport.layered.TFramedTransport; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 070 071@InterfaceAudience.Private 072public class ThriftConnection implements Connection { 073 private static final Logger LOG = LoggerFactory.getLogger(ThriftConnection.class); 074 private Configuration conf; 075 private User user; 076 // For HTTP protocol 077 private HttpClient httpClient; 078 private boolean httpClientCreated = false; 079 private boolean isClosed = false; 080 081 private String host; 082 private int port; 083 private boolean isFramed = false; 084 private boolean isCompact = false; 085 086 // TODO: We can rip out the ThriftClient piece of it rather than creating a new client every time. 087 ThriftClientBuilder clientBuilder; 088 089 private int operationTimeout; 090 private int connectTimeout; 091 092 public ThriftConnection(Configuration conf, ExecutorService pool, final User user, 093 Map<String, byte[]> connectionAttributes) throws IOException { 094 this.conf = conf; 095 this.user = user; 096 this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME); 097 this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1); 098 Preconditions.checkArgument(port > 0); 099 Preconditions.checkArgument(host != null); 100 this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT); 101 this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT); 102 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 103 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 104 this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); 105 106 String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS, 107 DefaultThriftClientBuilder.class.getName()); 108 try { 109 Class<?> clazz = Class.forName(className); 110 Constructor<?> constructor = clazz.getDeclaredConstructor(ThriftConnection.class); 111 constructor.setAccessible(true); 112 clientBuilder = (ThriftClientBuilder) constructor.newInstance(this); 113 } catch (Exception e) { 114 throw new IOException(e); 115 } 116 } 117 118 public synchronized void setHttpClient(HttpClient httpClient) { 119 this.httpClient = httpClient; 120 } 121 122 @Override 123 public Configuration getConfiguration() { 124 return conf; 125 } 126 127 public String getHost() { 128 return host; 129 } 130 131 public int getPort() { 132 return port; 133 } 134 135 public boolean isFramed() { 136 return isFramed; 137 } 138 139 public boolean isCompact() { 140 return isCompact; 141 } 142 143 public int getOperationTimeout() { 144 return operationTimeout; 145 } 146 147 public int getConnectTimeout() { 148 return connectTimeout; 149 } 150 151 /** 152 * the default thrift client builder. One can extend the ThriftClientBuilder to builder custom 153 * client, implement features like authentication(hbase-examples/thrift/DemoClient) 154 */ 155 public static class DefaultThriftClientBuilder extends ThriftClientBuilder { 156 157 @Override 158 public Pair<THBaseService.Client, TTransport> getClient() throws IOException { 159 TTransport tTransport = null; 160 try { 161 TSocket sock = new TSocket(connection.getHost(), connection.getPort()); 162 sock.setSocketTimeout(connection.getOperationTimeout()); 163 sock.setConnectTimeout(connection.getConnectTimeout()); 164 tTransport = sock; 165 if (connection.isFramed()) { 166 tTransport = new TFramedTransport(tTransport); 167 } 168 169 sock.open(); 170 } catch (TTransportException e) { 171 throw new IOException(e); 172 } 173 TProtocol prot; 174 if (connection.isCompact()) { 175 prot = new TCompactProtocol(tTransport); 176 } else { 177 prot = new TBinaryProtocol(tTransport); 178 } 179 THBaseService.Client client = new THBaseService.Client(prot); 180 return new Pair<>(client, tTransport); 181 } 182 183 public DefaultThriftClientBuilder(ThriftConnection connection) { 184 super(connection); 185 } 186 } 187 188 /** 189 * the default thrift http client builder. One can extend the ThriftClientBuilder to builder 190 * custom http client, implement features like authentication or 191 * 'DoAs'(hbase-examples/thrift/HttpDoAsClient) 192 */ 193 public static class HTTPThriftClientBuilder extends ThriftClientBuilder { 194 Map<String, String> customHeader = new HashMap<>(); 195 196 public HTTPThriftClientBuilder(ThriftConnection connection) { 197 super(connection); 198 } 199 200 public void addCostumHeader(String key, String value) { 201 customHeader.put(key, value); 202 } 203 204 @Override 205 public Pair<THBaseService.Client, TTransport> getClient() throws IOException { 206 Preconditions.checkArgument(connection.getHost().startsWith("http"), 207 "http client host must start with http or https"); 208 String url = connection.getHost() + ":" + connection.getPort(); 209 try { 210 THttpClient httpClient = new THttpClient(url, connection.getHttpClient()); 211 for (Map.Entry<String, String> header : customHeader.entrySet()) { 212 httpClient.setCustomHeader(header.getKey(), header.getValue()); 213 } 214 httpClient.open(); 215 TProtocol prot = new TBinaryProtocol(httpClient); 216 THBaseService.Client client = new THBaseService.Client(prot); 217 return new Pair<>(client, httpClient); 218 } catch (TTransportException e) { 219 throw new IOException(e); 220 } 221 222 } 223 } 224 225 /** 226 * Get a ThriftAdmin, ThriftAdmin is NOT thread safe 227 * @return a ThriftAdmin 228 * @throws IOException IOException 229 */ 230 @Override 231 public Admin getAdmin() throws IOException { 232 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 233 return new ThriftAdmin(client.getFirst(), client.getSecond(), conf); 234 } 235 236 public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler { 237 private long pause; 238 239 public DelayRetryHandler(int retryCount, long pause) { 240 super(retryCount, true, Arrays.asList(InterruptedIOException.class, 241 UnknownHostException.class, SSLException.class)); 242 this.pause = pause; 243 } 244 245 @Override 246 public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { 247 // Don't sleep for retrying the first time 248 if (executionCount > 1 && pause > 0) { 249 try { 250 long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1); 251 Thread.sleep(sleepTime); 252 } catch (InterruptedException ie) { 253 // reset interrupt marker 254 Thread.currentThread().interrupt(); 255 } 256 } 257 return super.retryRequest(exception, executionCount, context); 258 } 259 260 @Override 261 protected boolean handleAsIdempotent(HttpRequest request) { 262 return true; 263 } 264 } 265 266 public synchronized HttpClient getHttpClient() { 267 if (httpClient != null) { 268 return httpClient; 269 } 270 int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 271 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 272 long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5); 273 HttpClientBuilder builder = HttpClientBuilder.create(); 274 RequestConfig.Builder requestBuilder = RequestConfig.custom(); 275 requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout()); 276 requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout()); 277 builder.setRetryHandler(new DelayRetryHandler(retry, pause)); 278 builder.setDefaultRequestConfig(requestBuilder.build()); 279 httpClient = builder.build(); 280 httpClientCreated = true; 281 return httpClient; 282 } 283 284 @Override 285 public synchronized void close() throws IOException { 286 if (httpClient != null && httpClientCreated) { 287 HttpClientUtils.closeQuietly(httpClient); 288 } 289 isClosed = true; 290 } 291 292 @Override 293 public boolean isClosed() { 294 return isClosed; 295 } 296 297 /** 298 * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe 299 * @return a TableBuilder 300 * @throws IOException IOException 301 */ 302 @Override 303 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 304 return new TableBuilder() { 305 @Override 306 public TableBuilder setOperationTimeout(int timeout) { 307 return this; 308 } 309 310 @Override 311 public TableBuilder setRpcTimeout(int timeout) { 312 return this; 313 } 314 315 @Override 316 public TableBuilder setReadRpcTimeout(int timeout) { 317 return this; 318 } 319 320 @Override 321 public TableBuilder setWriteRpcTimeout(int timeout) { 322 return this; 323 } 324 325 @Override 326 public TableBuilder setRequestAttribute(String key, byte[] value) { 327 return this; 328 } 329 330 @Override 331 public Table build() { 332 try { 333 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 334 return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf); 335 } catch (IOException ioE) { 336 throw new RuntimeException(ioE); 337 } 338 } 339 }; 340 } 341 342 @Override 343 public void abort(String why, Throwable e) { 344 345 } 346 347 @Override 348 public boolean isAborted() { 349 return false; 350 } 351 352 @Override 353 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 354 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 355 } 356 357 @Override 358 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 359 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 360 } 361 362 @Override 363 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 364 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 365 } 366 367 @Override 368 public void clearRegionLocationCache() { 369 throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable"); 370 } 371 372 @Override 373 public AsyncConnection toAsyncConnection() { 374 throw new NotImplementedException("toAsyncConnection not supported in ThriftTable"); 375 } 376 377 @Override 378 public String getClusterId() { 379 try { 380 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 381 return client.getFirst().getClusterId(); 382 } catch (TException | IOException e) { 383 LOG.error("Error fetching cluster ID: ", e); 384 } 385 return null; 386 } 387}