001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.thrift2.client; 020 021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT; 022import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.lang.reflect.Constructor; 027import java.net.UnknownHostException; 028import java.util.Arrays; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.concurrent.ExecutorService; 032 033import javax.net.ssl.SSLException; 034 035import org.apache.commons.lang3.NotImplementedException; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.BufferedMutator; 041import org.apache.hadoop.hbase.client.BufferedMutatorParams; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionUtils; 044import org.apache.hadoop.hbase.client.RegionLocator; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableBuilder; 047import org.apache.hadoop.hbase.security.User; 048import org.apache.hadoop.hbase.thrift.Constants; 049import org.apache.hadoop.hbase.thrift2.generated.THBaseService; 050import org.apache.hadoop.hbase.util.Pair; 051import org.apache.http.HttpRequest; 052import org.apache.http.client.HttpClient; 053import org.apache.http.client.config.RequestConfig; 054import org.apache.http.client.utils.HttpClientUtils; 055import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; 056import org.apache.http.impl.client.HttpClientBuilder; 057import org.apache.http.protocol.HttpContext; 058import org.apache.thrift.protocol.TBinaryProtocol; 059import org.apache.thrift.protocol.TCompactProtocol; 060import org.apache.thrift.protocol.TProtocol; 061import org.apache.thrift.transport.TFramedTransport; 062import org.apache.thrift.transport.THttpClient; 063import org.apache.thrift.transport.TSocket; 064import org.apache.thrift.transport.TTransport; 065import org.apache.thrift.transport.TTransportException; 066import org.apache.yetus.audience.InterfaceAudience; 067 068import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 069 070@InterfaceAudience.Private 071public class ThriftConnection implements Connection { 072 private Configuration conf; 073 private User user; 074 // For HTTP protocol 075 private HttpClient httpClient; 076 private boolean httpClientCreated = false; 077 private boolean isClosed = false; 078 079 private String host; 080 private int port; 081 private boolean isFramed = false; 082 private boolean isCompact = false; 083 084 private ThriftClientBuilder clientBuilder; 085 086 private int operationTimeout; 087 private int connectTimeout; 088 089 public ThriftConnection(Configuration conf, ExecutorService pool, final User user) 090 throws IOException { 091 this.conf = conf; 092 this.user = user; 093 this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME); 094 this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1); 095 Preconditions.checkArgument(port > 0); 096 Preconditions.checkArgument(host != null); 097 this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT); 098 this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT); 099 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 101 this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); 102 103 String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS, 104 DefaultThriftClientBuilder.class.getName()); 105 try { 106 Class<?> clazz = Class.forName(className); 107 Constructor<?> constructor = clazz 108 .getDeclaredConstructor(ThriftConnection.class); 109 constructor.setAccessible(true); 110 clientBuilder = (ThriftClientBuilder) constructor.newInstance(this); 111 }catch (Exception e) { 112 throw new IOException(e); 113 } 114 } 115 116 public synchronized void setHttpClient(HttpClient httpClient) { 117 this.httpClient = httpClient; 118 } 119 120 @Override 121 public Configuration getConfiguration() { 122 return conf; 123 } 124 125 public String getHost() { 126 return host; 127 } 128 129 public int getPort() { 130 return port; 131 } 132 133 public boolean isFramed() { 134 return isFramed; 135 } 136 137 public boolean isCompact() { 138 return isCompact; 139 } 140 141 public int getOperationTimeout() { 142 return operationTimeout; 143 } 144 145 public int getConnectTimeout() { 146 return connectTimeout; 147 } 148 149 public ThriftClientBuilder getClientBuilder() { 150 return clientBuilder; 151 } 152 153 /** 154 * the default thrift client builder. 155 * One can extend the ThriftClientBuilder to builder custom client, implement 156 * features like authentication(hbase-examples/thrift/DemoClient) 157 * 158 */ 159 public static class DefaultThriftClientBuilder extends ThriftClientBuilder { 160 161 @Override 162 public Pair<THBaseService.Client, TTransport> getClient() throws IOException { 163 TSocket sock = new TSocket(connection.getHost(), connection.getPort()); 164 sock.setSocketTimeout(connection.getOperationTimeout()); 165 sock.setConnectTimeout(connection.getConnectTimeout()); 166 TTransport tTransport = sock; 167 if (connection.isFramed()) { 168 tTransport = new TFramedTransport(tTransport); 169 } 170 try { 171 sock.open(); 172 } catch (TTransportException e) { 173 throw new IOException(e); 174 } 175 TProtocol prot; 176 if (connection.isCompact()) { 177 prot = new TCompactProtocol(tTransport); 178 } else { 179 prot = new TBinaryProtocol(tTransport); 180 } 181 THBaseService.Client client = new THBaseService.Client(prot); 182 return new Pair<>(client, tTransport); 183 } 184 185 public DefaultThriftClientBuilder(ThriftConnection connection) { 186 super(connection); 187 } 188 } 189 190 /** 191 * the default thrift http client builder. 192 * One can extend the ThriftClientBuilder to builder custom http client, implement 193 * features like authentication or 'DoAs'(hbase-examples/thrift/HttpDoAsClient) 194 * 195 */ 196 public static class HTTPThriftClientBuilder extends ThriftClientBuilder { 197 Map<String,String> customHeader = new HashMap<>(); 198 199 public HTTPThriftClientBuilder(ThriftConnection connection) { 200 super(connection); 201 } 202 203 public void addCostumHeader(String key, String value) { 204 customHeader.put(key, value); 205 } 206 207 @Override 208 public Pair<THBaseService.Client, TTransport> getClient() throws IOException { 209 Preconditions.checkArgument(connection.getHost().startsWith("http"), 210 "http client host must start with http or https"); 211 String url = connection.getHost() + ":" + connection.getPort(); 212 try { 213 THttpClient httpClient = new THttpClient(url, connection.getHttpClient()); 214 for (Map.Entry<String, String> header : customHeader.entrySet()) { 215 httpClient.setCustomHeader(header.getKey(), header.getValue()); 216 } 217 httpClient.open(); 218 TProtocol prot = new TBinaryProtocol(httpClient); 219 THBaseService.Client client = new THBaseService.Client(prot); 220 return new Pair<>(client, httpClient); 221 } catch (TTransportException e) { 222 throw new IOException(e); 223 } 224 225 } 226 } 227 228 /** 229 * Get a ThriftAdmin, ThriftAdmin is NOT thread safe 230 * @return a ThriftAdmin 231 * @throws IOException IOException 232 */ 233 @Override 234 public Admin getAdmin() throws IOException { 235 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 236 return new ThriftAdmin(client.getFirst(), client.getSecond(), conf); 237 } 238 239 public static class DelayRetryHandler extends DefaultHttpRequestRetryHandler { 240 private long pause; 241 242 public DelayRetryHandler(int retryCount, long pause) { 243 super(retryCount, true, Arrays.asList( 244 InterruptedIOException.class, 245 UnknownHostException.class, 246 SSLException.class)); 247 this.pause = pause; 248 } 249 250 @Override 251 public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { 252 // Don't sleep for retrying the first time 253 if (executionCount > 1 && pause > 0) { 254 try { 255 long sleepTime = ConnectionUtils.getPauseTime(pause, executionCount - 1); 256 Thread.sleep(sleepTime); 257 } catch (InterruptedException ie) { 258 //reset interrupt marker 259 Thread.currentThread().interrupt(); 260 } 261 } 262 return super.retryRequest(exception, executionCount, context); 263 } 264 265 @Override 266 protected boolean handleAsIdempotent(HttpRequest request) { 267 return true; 268 } 269 } 270 271 public synchronized HttpClient getHttpClient() { 272 if (httpClient != null) { 273 return httpClient; 274 } 275 int retry = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 276 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 277 long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 5); 278 HttpClientBuilder builder = HttpClientBuilder.create(); 279 RequestConfig.Builder requestBuilder = RequestConfig.custom(); 280 requestBuilder = requestBuilder.setConnectTimeout(getConnectTimeout()); 281 requestBuilder = requestBuilder.setSocketTimeout(getOperationTimeout()); 282 builder.setRetryHandler(new DelayRetryHandler(retry, pause)); 283 builder.setDefaultRequestConfig(requestBuilder.build()); 284 httpClient = builder.build(); 285 httpClientCreated = true; 286 return httpClient; 287 } 288 289 @Override 290 public synchronized void close() throws IOException { 291 if (httpClient != null && httpClientCreated) { 292 HttpClientUtils.closeQuietly(httpClient); 293 } 294 isClosed = true; 295 } 296 297 @Override 298 public boolean isClosed() { 299 return isClosed; 300 } 301 302 /** 303 * Get a TableBuider to build ThriftTable, ThriftTable is NOT thread safe 304 * @return a TableBuilder 305 * @throws IOException IOException 306 */ 307 @Override 308 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 309 return new TableBuilder() { 310 @Override 311 public TableBuilder setOperationTimeout(int timeout) { 312 return this; 313 } 314 315 @Override 316 public TableBuilder setRpcTimeout(int timeout) { 317 return this; 318 } 319 320 @Override 321 public TableBuilder setReadRpcTimeout(int timeout) { 322 return this; 323 } 324 325 @Override 326 public TableBuilder setWriteRpcTimeout(int timeout) { 327 return this; 328 } 329 330 @Override 331 public Table build() { 332 try { 333 Pair<THBaseService.Client, TTransport> client = clientBuilder.getClient(); 334 return new ThriftTable(tableName, client.getFirst(), client.getSecond(), conf); 335 } catch (IOException ioE) { 336 throw new RuntimeException(ioE); 337 } 338 339 } 340 }; 341 } 342 343 @Override 344 public void abort(String why, Throwable e) { 345 346 } 347 348 @Override 349 public boolean isAborted() { 350 return false; 351 } 352 353 @Override 354 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 355 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 356 } 357 358 @Override 359 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 360 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 361 } 362 363 @Override 364 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 365 throw new NotImplementedException("batchCoprocessorService not supported in ThriftTable"); 366 } 367 368 @Override 369 public void clearRegionLocationCache() { 370 throw new NotImplementedException("clearRegionLocationCache not supported in ThriftTable"); 371 } 372}