001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.DoNotRetryIOException;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.log.HBaseMarkers;
033import org.apache.hadoop.hbase.util.FutureUtils;
034import org.apache.hadoop.hbase.util.IOExceptionSupplier;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
041
042/**
043 * The connection implementation based on {@link AsyncConnection}.
044 */
045@InterfaceAudience.Private
046class ConnectionOverAsyncConnection implements Connection {
047
048  private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class);
049
050  private volatile boolean aborted = false;
051
052  // only used for executing coprocessor calls, as users may reference the methods in the
053  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
054  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
055  // interface.
056  private volatile ExecutorService batchPool = null;
057
058  private final AsyncConnectionImpl conn;
059
060  private final ConnectionConfiguration connConf;
061
062  ConnectionOverAsyncConnection(AsyncConnectionImpl conn) {
063    this.conn = conn;
064    this.connConf = new ConnectionConfiguration(conn.getConfiguration());
065  }
066
067  @RestrictedApi(explanation = "Should only be called in tests", link = "",
068      allowedOnPath = ".*/src/test/.*")
069  AsyncConnectionImpl getAsyncConnection() {
070    return conn;
071  }
072
073  @Override
074  public void abort(String why, Throwable error) {
075    if (error != null) {
076      LOG.error(HBaseMarkers.FATAL, why, error);
077    } else {
078      LOG.error(HBaseMarkers.FATAL, why);
079    }
080    aborted = true;
081    try {
082      Closeables.close(this, true);
083    } catch (IOException e) {
084      throw new AssertionError(e);
085    }
086  }
087
088  @Override
089  public boolean isAborted() {
090    return aborted;
091  }
092
093  @Override
094  public Configuration getConfiguration() {
095    return conn.getConfiguration();
096  }
097
098  @Override
099  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
100    AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
101    if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
102      builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
103    }
104    if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
105      builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
106    }
107    if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
108      builder.setWriteBufferSize(params.getWriteBufferSize());
109    }
110    if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
111      builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
112        TimeUnit.MILLISECONDS);
113    }
114    if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
115      builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
116    }
117    if (params.getMaxMutations() != BufferedMutatorParams.UNSET) {
118      builder.setMaxMutations(params.getMaxMutations());
119    }
120    if (!params.getRequestAttributes().isEmpty()) {
121
122      builder.setRequestAttributes(params.getRequestAttributes());
123    }
124    return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
125  }
126
127  @Override
128  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
129    return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName));
130  }
131
132  @Override
133  public void clearRegionLocationCache() {
134    conn.clearRegionLocationCache();
135  }
136
137  @Override
138  public Admin getAdmin() throws IOException {
139    return new AdminOverAsyncAdmin(this, (RawAsyncHBaseAdmin) conn.getAdmin());
140  }
141
142  @Override
143  public void close() throws IOException {
144    conn.close();
145  }
146
147  // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
148  // AsyncConnection.close.
149  synchronized void closePool() {
150    ExecutorService batchPool = this.batchPool;
151    if (batchPool != null) {
152      ConnectionUtils.shutdownPool(batchPool);
153      this.batchPool = null;
154    }
155  }
156
157  @Override
158  public boolean isClosed() {
159    return conn.isClosed();
160  }
161
162  // only used for executing coprocessor calls, as users may reference the methods in the
163  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
164  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
165  // interface.
166  private ThreadPoolExecutor createThreadPool() {
167    Configuration conf = conn.getConfiguration();
168    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
169    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
170    BlockingQueue<Runnable> workQueue =
171      new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
172        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
173    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
174      TimeUnit.SECONDS, workQueue,
175      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
176    tpe.allowCoreThreadTimeOut(true);
177    return tpe;
178  }
179
180  // only used for executing coprocessor calls, as users may reference the methods in the
181  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
182  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
183  // interface.
184  private ExecutorService getBatchPool() throws IOException {
185    if (batchPool == null) {
186      synchronized (this) {
187        if (isClosed()) {
188          throw new DoNotRetryIOException("Connection is closed");
189        }
190        if (batchPool == null) {
191          this.batchPool = createThreadPool();
192        }
193      }
194    }
195    return this.batchPool;
196  }
197
198  @Override
199  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
200    return new TableBuilderBase(tableName, connConf) {
201
202      @Override
203      public Table build() {
204        IOExceptionSupplier<ExecutorService> poolSupplier =
205          pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
206        AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder =
207          conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
208            .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
209            .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
210            .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS);
211        requestAttributes.forEach(tableBuilder::setRequestAttribute);
212        return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier);
213      }
214    };
215  }
216
217  @Override
218  public AsyncConnection toAsyncConnection() {
219    return conn;
220  }
221
222  @Override
223  public String getClusterId() {
224    return conn.getClusterId();
225  }
226
227  @Override
228  public Hbck getHbck() throws IOException {
229    return FutureUtils.get(conn.getHbck());
230  }
231
232  @Override
233  public Hbck getHbck(ServerName masterServer) throws IOException {
234    return conn.getHbck(masterServer);
235  }
236
237  /**
238   * An identifier that will remain the same for a given connection.
239   */
240  @Override
241  public String toString() {
242    return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
243  }
244}