001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.log.HBaseMarkers;
032import org.apache.hadoop.hbase.util.FutureUtils;
033import org.apache.hadoop.hbase.util.IOExceptionSupplier;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * The connection implementation based on {@link AsyncConnection}.
043 */
044@InterfaceAudience.Private
045class ConnectionOverAsyncConnection implements Connection {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class);
048
049  private volatile boolean aborted = false;
050
051  // only used for executing coprocessor calls, as users may reference the methods in the
052  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
053  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
054  // interface.
055  private volatile ExecutorService batchPool = null;
056
057  private final AsyncConnectionImpl conn;
058
059  private final ConnectionConfiguration connConf;
060
061  ConnectionOverAsyncConnection(AsyncConnectionImpl conn) {
062    this.conn = conn;
063    this.connConf = new ConnectionConfiguration(conn.getConfiguration());
064  }
065
066  @Override
067  public void abort(String why, Throwable error) {
068    if (error != null) {
069      LOG.error(HBaseMarkers.FATAL, why, error);
070    } else {
071      LOG.error(HBaseMarkers.FATAL, why);
072    }
073    aborted = true;
074    try {
075      Closeables.close(this, true);
076    } catch (IOException e) {
077      throw new AssertionError(e);
078    }
079  }
080
081  @Override
082  public boolean isAborted() {
083    return aborted;
084  }
085
086  @Override
087  public Configuration getConfiguration() {
088    return conn.getConfiguration();
089  }
090
091  @Override
092  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
093    AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
094    if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
095      builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
096    }
097    if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
098      builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
099    }
100    if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
101      builder.setWriteBufferSize(params.getWriteBufferSize());
102    }
103    if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
104      builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
105        TimeUnit.MILLISECONDS);
106    }
107    if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
108      builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
109    }
110    if (params.getMaxMutations() != BufferedMutatorParams.UNSET) {
111      builder.setMaxMutations(params.getMaxMutations());
112    }
113    if (!params.getRequestAttributes().isEmpty()) {
114
115      builder.setRequestAttributes(params.getRequestAttributes());
116    }
117    return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
118  }
119
120  @Override
121  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
122    return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName));
123  }
124
125  @Override
126  public void clearRegionLocationCache() {
127    conn.clearRegionLocationCache();
128  }
129
130  @Override
131  public Admin getAdmin() throws IOException {
132    return new AdminOverAsyncAdmin(this, (RawAsyncHBaseAdmin) conn.getAdmin());
133  }
134
135  @Override
136  public void close() throws IOException {
137    conn.close();
138  }
139
140  // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
141  // AsyncConnection.close.
142  synchronized void closePool() {
143    ExecutorService batchPool = this.batchPool;
144    if (batchPool != null) {
145      ConnectionUtils.shutdownPool(batchPool);
146      this.batchPool = null;
147    }
148  }
149
150  @Override
151  public boolean isClosed() {
152    return conn.isClosed();
153  }
154
155  // only used for executing coprocessor calls, as users may reference the methods in the
156  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
157  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
158  // interface.
159  private ThreadPoolExecutor createThreadPool() {
160    Configuration conf = conn.getConfiguration();
161    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
162    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
163    BlockingQueue<Runnable> workQueue =
164      new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
165        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
166    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
167      TimeUnit.SECONDS, workQueue,
168      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
169    tpe.allowCoreThreadTimeOut(true);
170    return tpe;
171  }
172
173  // only used for executing coprocessor calls, as users may reference the methods in the
174  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
175  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
176  // interface.
177  private ExecutorService getBatchPool() throws IOException {
178    if (batchPool == null) {
179      synchronized (this) {
180        if (isClosed()) {
181          throw new DoNotRetryIOException("Connection is closed");
182        }
183        if (batchPool == null) {
184          this.batchPool = createThreadPool();
185        }
186      }
187    }
188    return this.batchPool;
189  }
190
191  @Override
192  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
193    return new TableBuilderBase(tableName, connConf) {
194
195      @Override
196      public Table build() {
197        IOExceptionSupplier<ExecutorService> poolSupplier =
198          pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
199        AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder =
200          conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
201            .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
202            .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
203            .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS);
204        requestAttributes.forEach(tableBuilder::setRequestAttribute);
205        return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier);
206      }
207    };
208  }
209
210  @Override
211  public AsyncConnection toAsyncConnection() {
212    return conn;
213  }
214
215  @Override
216  public String getClusterId() {
217    return conn.getClusterId();
218  }
219
220  @Override
221  public Hbck getHbck() throws IOException {
222    return FutureUtils.get(conn.getHbck());
223  }
224
225  @Override
226  public Hbck getHbck(ServerName masterServer) throws IOException {
227    return conn.getHbck(masterServer);
228  }
229
230  /**
231   * An identifier that will remain the same for a given connection.
232   */
233  @Override
234  public String toString() {
235    return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
236  }
237}