001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.log.HBaseMarkers;
032import org.apache.hadoop.hbase.util.FutureUtils;
033import org.apache.hadoop.hbase.util.IOExceptionSupplier;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * The connection implementation based on {@link AsyncConnection}.
043 */
044@InterfaceAudience.Private
045class ConnectionOverAsyncConnection implements Connection {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class);
048
049  private volatile boolean aborted = false;
050
051  // only used for executing coprocessor calls, as users may reference the methods in the
052  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
053  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
054  // interface.
055  private volatile ExecutorService batchPool = null;
056
057  private final AsyncConnectionImpl conn;
058
059  private final ConnectionConfiguration connConf;
060
061  ConnectionOverAsyncConnection(AsyncConnectionImpl conn) {
062    this.conn = conn;
063    this.connConf = new ConnectionConfiguration(conn.getConfiguration());
064  }
065
066  @Override
067  public void abort(String why, Throwable error) {
068    if (error != null) {
069      LOG.error(HBaseMarkers.FATAL, why, error);
070    } else {
071      LOG.error(HBaseMarkers.FATAL, why);
072    }
073    aborted = true;
074    try {
075      Closeables.close(this, true);
076    } catch (IOException e) {
077      throw new AssertionError(e);
078    }
079  }
080
081  @Override
082  public boolean isAborted() {
083    return aborted;
084  }
085
086  @Override
087  public Configuration getConfiguration() {
088    return conn.getConfiguration();
089  }
090
091  @Override
092  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
093    AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
094    if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
095      builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
096    }
097    if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
098      builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
099    }
100    if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
101      builder.setWriteBufferSize(params.getWriteBufferSize());
102    }
103    if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
104      builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
105        TimeUnit.MILLISECONDS);
106    }
107    if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
108      builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
109    }
110    return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
111  }
112
113  @Override
114  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
115    return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName));
116  }
117
118  @Override
119  public void clearRegionLocationCache() {
120    conn.clearRegionLocationCache();
121  }
122
123  @Override
124  public Admin getAdmin() throws IOException {
125    return new AdminOverAsyncAdmin(this, (RawAsyncHBaseAdmin) conn.getAdmin());
126  }
127
128  @Override
129  public void close() throws IOException {
130    conn.close();
131  }
132
133  // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
134  // AsyncConnection.close.
135  synchronized void closePool() {
136    ExecutorService batchPool = this.batchPool;
137    if (batchPool != null) {
138      ConnectionUtils.shutdownPool(batchPool);
139      this.batchPool = null;
140    }
141  }
142
143  @Override
144  public boolean isClosed() {
145    return conn.isClosed();
146  }
147
148  // only used for executing coprocessor calls, as users may reference the methods in the
149  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
150  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
151  // interface.
152  private ThreadPoolExecutor createThreadPool() {
153    Configuration conf = conn.getConfiguration();
154    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
155    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
156    BlockingQueue<Runnable> workQueue =
157      new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
158        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
159    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
160      TimeUnit.SECONDS, workQueue,
161      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
162    tpe.allowCoreThreadTimeOut(true);
163    return tpe;
164  }
165
166  // only used for executing coprocessor calls, as users may reference the methods in the
167  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
168  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
169  // interface.
170  private ExecutorService getBatchPool() throws IOException {
171    if (batchPool == null) {
172      synchronized (this) {
173        if (isClosed()) {
174          throw new DoNotRetryIOException("Connection is closed");
175        }
176        if (batchPool == null) {
177          this.batchPool = createThreadPool();
178        }
179      }
180    }
181    return this.batchPool;
182  }
183
184  @Override
185  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
186    return new TableBuilderBase(tableName, connConf) {
187
188      @Override
189      public Table build() {
190        IOExceptionSupplier<ExecutorService> poolSupplier =
191          pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
192        AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder =
193          conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
194            .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
195            .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
196            .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS);
197        requestAttributes.forEach(tableBuilder::setRequestAttribute);
198        return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier);
199      }
200    };
201  }
202
203  @Override
204  public AsyncConnection toAsyncConnection() {
205    return conn;
206  }
207
208  @Override
209  public String getClusterId() {
210    return conn.getClusterId();
211  }
212
213  @Override
214  public Hbck getHbck() throws IOException {
215    return FutureUtils.get(conn.getHbck());
216  }
217
218  @Override
219  public Hbck getHbck(ServerName masterServer) throws IOException {
220    return conn.getHbck(masterServer);
221  }
222
223  /**
224   * An identifier that will remain the same for a given connection.
225   */
226  @Override
227  public String toString() {
228    return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
229  }
230}