001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift2.client; 019 020import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT; 021import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.lang.reflect.Constructor; 026import java.net.UnknownHostException; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.ExecutorService; 031import javax.net.ssl.SSLException; 032import org.apache.commons.lang3.NotImplementedException; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.BufferedMutator; 038import org.apache.hadoop.hbase.client.BufferedMutatorParams; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionUtils; 041import org.apache.hadoop.hbase.client.RegionLocator; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.client.TableBuilder; 044import org.apache.hadoop.hbase.security.User; 045import org.apache.hadoop.hbase.thrift.Constants; 046import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.http.HttpRequest; 049import org.apache.http.client.HttpClient; 050import org.apache.http.client.config.RequestConfig; 051import org.apache.http.client.utils.HttpClientUtils; 052import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; 053import org.apache.http.impl.client.HttpClientBuilder; 054import org.apache.http.protocol.HttpContext; 055import org.apache.thrift.TException; 056import org.apache.thrift.protocol.TBinaryProtocol; 057import org.apache.thrift.protocol.TCompactProtocol; 058import org.apache.thrift.protocol.TProtocol; 059import org.apache.thrift.transport.THttpClient; 060import org.apache.thrift.transport.TSocket; 061import org.apache.thrift.transport.TTransport; 062import org.apache.thrift.transport.TTransportException; 063import org.apache.thrift.transport.layered.TFramedTransport; 064import org.apache.yetus.audience.InterfaceAudience; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 069 070@InterfaceAudience.Private 071public class ThriftConnection implements Connection { 072 private static final Logger LOG = LoggerFactory.getLogger(ThriftConnection.class); 073 private Configuration conf; 074 private User user; 075 // For HTTP protocol 076 private HttpClient httpClient; 077 private boolean httpClientCreated = false; 078 private boolean isClosed = false; 079 080 private String host; 081 private int port; 082 private boolean isFramed = false; 083 private boolean isCompact = false; 084 085 // TODO: We can rip out the ThriftClient piece of it rather than creating a new client every time. 086 ThriftClientBuilder clientBuilder; 087 088 private int operationTimeout; 089 private int connectTimeout; 090 091 public ThriftConnection(Configuration conf, ExecutorService pool, final User user) 092 throws IOException { 093 this.conf = conf; 094 this.user = user; 095 this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME); 096 this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1); 097 Preconditions.checkArgument(port > 0); 098 Preconditions.checkArgument(host != null); 099 this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT); 100 this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT); 101 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 102 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 103 this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); 104 105 String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS, 106 DefaultThriftClientBuilder.class.getName()); 107 try { 108 Class<?> clazz = Class.forName(className); 109 Constructor<?> constructor = clazz.getDeclaredConstructor(ThriftConnection.class); 110 constructor.setAccessible(true); 111 clientBuilder = (ThriftClientBuilder) constructor.newInstance(this); 112 } catch (Exception e) { 113 throw new IOException(e); 114 } 115 } 116 117 public synchronized void setHttpClient(HttpClient httpClient) { 118 this.httpClient = httpClient; 119 } 120 121 @Override 122 public Configuration getConfiguration() { 123 return conf; 124 } 125 126 public String getHost() { 127 return host; 128 } 129 130 public int getPort() { 131 return port; 132 } 133 134 public boolean isFramed() { 135 return isFramed; 136 } 137 138 public boolean isCompact() { 139 return isCompact; 140 } 141 142 public int getOperationTimeout() { 143 return operationTimeout; 144 } 145 146 public int getConnectTimeout() { 147 return connectTimeout; 148 } 149 150 /** 151 * the default thrift client builder. One can extend the ThriftClientBuilder to builder custom 152 * client, implement features like authentication(hbase-examples/thrift/DemoClient) 153 */ 154 public static class DefaultThriftClientBuilder extends ThriftClientBuilder { 155 156 @Override 157 public Pair<THBaseService.Client, TTransport> getClient() throws IOException { 158 TTransport tTransport = null; 159 try { 160 TSocket sock = new TSocket(connection.getHost(), connection.getPort()); 161 sock.setSocketTimeout(connection.getOperationTimeout()); 162 sock.setConnectTimeout(connection.getConnectTimeout()); 163 tTransport = sock; 164 if (connection.isFramed()) { 165 tTransport = new TFramedTransport(tTransport); 166 } 167 168 sock.open(); 169 } catch (TTransportException e) { 170 throw new IOException(e); 171 } 172 TProtocol prot; 173 if (connection.isCompact()) { 174 prot = new TCompactProtocol(tTransport); 175 } else { 176 prot = new TBinaryProtocol(tTransport); 177 } 178 THBaseService.Client client = new THBaseService.Client(prot); 179 return new Pair<>(client, tTransport); 180 } 181 182 public DefaultThriftClientBuilder(ThriftConnection connection) { 183 super(connection); 184 } 185 } 186 187 /** 188 * the default thrift http client builder. One can extend the ThriftClientBuilder to builder 189 * custom http client, implement features like authentication or 190 * 'DoAs'(hbase-examples/thrift/HttpDoAsClient) 191 */ 192 public static class HTTPThriftClientBuilder extends ThriftClientBuilder { 193 Map<String, String> customHeader = new HashMap<>(); 194 195 public HTTPThriftClientBuilder(ThriftConnection connection) { 196 super(connection); 197 } 198 199 public void addCostumHeader(String key, String value) { 200 customHeader.put(key, value); 201 } 202 203 @Override 204 public Pair<THBaseService.Client, TTransport> getClient() throws IOException { 205 Preconditions.checkArgument(connection.getHost().startsWith("http"), 206 "http client host must start with http or https"); 207 String url = connection.getHost() + ":" + connection.getPort(); 208 try { 209 THttpClient httpClient = new THttpClient(url, connection.getHttpClient()); 210 for (Map.Entry<String, String> header : customHeader.entrySet()) { 211 httpClient.setCustomHeader(header.getKey(), header.getValue()); 212 } 213 httpClient.open(); 214 TProtocol prot = new TBinaryProtocol(httpClient); 215 THBaseService.Client client = new THBaseService.Client(prot); 216 return new Pair<>(client, httpClient); 217 } catch (TTransportException e) { 218 throw new IOException(e); 219 } 220 221 } 222 } 223 224 /** 225 * Get a ThriftAdmin, ThriftAdmin is NOT thread safe 226 * @return a ThriftAdmin 227 * @throws IOException IOException 228 */ 229 @Override 230 public Admin getAdmin() throws IOException { 231 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 232 return new ThriftAdmin(client.getFirst(), client.getSecond(), conf); 233 } 234 235 public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler { 236 private long pause; 237 238 public DelayRetryHandler(int retryCount, long pause) { 239 super(retryCount, true, Arrays.asList(InterruptedIOException.class, 240 UnknownHostException.class, SSLException.class)); 241 this.pause = pause; 242 } 243 244 @Override 245 public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { 246 // Don't sleep for retrying the first time 247 if (executionCount > 1 && pause > 0) { 248 try { 249 long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1); 250 Thread.sleep(sleepTime); 251 } catch (InterruptedException ie) { 252 // reset interrupt marker 253 Thread.currentThread().interrupt(); 254 } 255 } 256 return super.retryRequest(exception, executionCount, context); 257 } 258 259 @Override 260 protected boolean handleAsIdempotent(HttpRequest request) { 261 return true; 262 } 263 } 264 265 public synchronized HttpClient getHttpClient() { 266 if (httpClient != null) { 267 return httpClient; 268 } 269 int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 270 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 271 long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5); 272 HttpClientBuilder builder = HttpClientBuilder.create(); 273 RequestConfig.Builder requestBuilder = RequestConfig.custom(); 274 requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout()); 275 requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout()); 276 builder.setRetryHandler(new DelayRetryHandler(retry, pause)); 277 builder.setDefaultRequestConfig(requestBuilder.build()); 278 httpClient = builder.build(); 279 httpClientCreated = true; 280 return httpClient; 281 } 282 283 @Override 284 public synchronized void close() throws IOException { 285 if (httpClient != null && httpClientCreated) { 286 HttpClientUtils.closeQuietly(httpClient); 287 } 288 isClosed = true; 289 } 290 291 @Override 292 public boolean isClosed() { 293 return isClosed; 294 } 295 296 /** 297 * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe 298 * @return a TableBuilder 299 * @throws IOException IOException 300 */ 301 @Override 302 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 303 return new TableBuilder() { 304 @Override 305 public TableBuilder setOperationTimeout(int timeout) { 306 return this; 307 } 308 309 @Override 310 public TableBuilder setRpcTimeout(int timeout) { 311 return this; 312 } 313 314 @Override 315 public TableBuilder setReadRpcTimeout(int timeout) { 316 return this; 317 } 318 319 @Override 320 public TableBuilder setWriteRpcTimeout(int timeout) { 321 return this; 322 } 323 324 @Override 325 public Table build() { 326 try { 327 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 328 return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf); 329 } catch (IOException ioE) { 330 throw new RuntimeException(ioE); 331 } 332 } 333 }; 334 } 335 336 @Override 337 public void abort(String why, Throwable e) { 338 339 } 340 341 @Override 342 public boolean isAborted() { 343 return false; 344 } 345 346 @Override 347 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 348 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 349 } 350 351 @Override 352 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 353 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 354 } 355 356 @Override 357 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 358 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 359 } 360 361 @Override 362 public void clearRegionLocationCache() { 363 throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable"); 364 } 365 366 @Override 367 public String getClusterId() { 368 try { 369 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 370 return client.getFirst().getClusterId(); 371 } catch (TException | IOException e) { 372 LOG.error("Error fetching cluster ID: ", e); 373 } 374 return null; 375 } 376}