001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.DoNotRetryIOException;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.log.HBaseMarkers;
032import org.apache.hadoop.hbase.util.FutureUtils;
033import org.apache.hadoop.hbase.util.IOExceptionSupplier;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * The connection implementation based on {@link AsyncConnection}.
043 */
044@InterfaceAudience.Private
045class ConnectionOverAsyncConnection implements Connection {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class);
048
049  private volatile boolean aborted = false;
050
051  // only used for executing coprocessor calls, as users may reference the methods in the
052  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
053  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
054  // interface.
055  private volatile ExecutorService batchPool = null;
056
057  private final AsyncConnectionImpl conn;
058
059  private final ConnectionConfiguration connConf;
060
061  ConnectionOverAsyncConnection(AsyncConnectionImpl conn) {
062    this.conn = conn;
063    this.connConf = new ConnectionConfiguration(conn.getConfiguration());
064  }
065
066  @Override
067  public void abort(String why, Throwable error) {
068    if (error != null) {
069      LOG.error(HBaseMarkers.FATAL, why, error);
070    } else {
071      LOG.error(HBaseMarkers.FATAL, why);
072    }
073    aborted = true;
074    try {
075      Closeables.close(this, true);
076    } catch (IOException e) {
077      throw new AssertionError(e);
078    }
079  }
080
081  @Override
082  public boolean isAborted() {
083    return aborted;
084  }
085
086  @Override
087  public Configuration getConfiguration() {
088    return conn.getConfiguration();
089  }
090
091  @Override
092  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
093    AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
094    if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
095      builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
096    }
097    if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
098      builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
099    }
100    if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
101      builder.setWriteBufferSize(params.getWriteBufferSize());
102    }
103    if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
104      builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
105        TimeUnit.MILLISECONDS);
106    }
107    if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
108      builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
109    }
110    if (!params.getRequestAttributes().isEmpty()) {
111
112      builder.setRequestAttributes(params.getRequestAttributes());
113    }
114    return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
115  }
116
117  @Override
118  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
119    return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName));
120  }
121
122  @Override
123  public void clearRegionLocationCache() {
124    conn.clearRegionLocationCache();
125  }
126
127  @Override
128  public Admin getAdmin() throws IOException {
129    return new AdminOverAsyncAdmin(this, (RawAsyncHBaseAdmin) conn.getAdmin());
130  }
131
132  @Override
133  public void close() throws IOException {
134    conn.close();
135  }
136
137  // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
138  // AsyncConnection.close.
139  synchronized void closePool() {
140    ExecutorService batchPool = this.batchPool;
141    if (batchPool != null) {
142      ConnectionUtils.shutdownPool(batchPool);
143      this.batchPool = null;
144    }
145  }
146
147  @Override
148  public boolean isClosed() {
149    return conn.isClosed();
150  }
151
152  // only used for executing coprocessor calls, as users may reference the methods in the
153  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
154  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
155  // interface.
156  private ThreadPoolExecutor createThreadPool() {
157    Configuration conf = conn.getConfiguration();
158    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
159    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
160    BlockingQueue<Runnable> workQueue =
161      new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
162        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
163    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
164      TimeUnit.SECONDS, workQueue,
165      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
166    tpe.allowCoreThreadTimeOut(true);
167    return tpe;
168  }
169
170  // only used for executing coprocessor calls, as users may reference the methods in the
171  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
172  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
173  // interface.
174  private ExecutorService getBatchPool() throws IOException {
175    if (batchPool == null) {
176      synchronized (this) {
177        if (isClosed()) {
178          throw new DoNotRetryIOException("Connection is closed");
179        }
180        if (batchPool == null) {
181          this.batchPool = createThreadPool();
182        }
183      }
184    }
185    return this.batchPool;
186  }
187
188  @Override
189  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
190    return new TableBuilderBase(tableName, connConf) {
191
192      @Override
193      public Table build() {
194        IOExceptionSupplier<ExecutorService> poolSupplier =
195          pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
196        AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder =
197          conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
198            .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
199            .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
200            .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS);
201        requestAttributes.forEach(tableBuilder::setRequestAttribute);
202        return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier);
203      }
204    };
205  }
206
207  @Override
208  public AsyncConnection toAsyncConnection() {
209    return conn;
210  }
211
212  @Override
213  public String getClusterId() {
214    return conn.getClusterId();
215  }
216
217  @Override
218  public Hbck getHbck() throws IOException {
219    return FutureUtils.get(conn.getHbck());
220  }
221
222  @Override
223  public Hbck getHbck(ServerName masterServer) throws IOException {
224    return conn.getHbck(masterServer);
225  }
226
227  /**
228   * An identifier that will remain the same for a given connection.
229   */
230  @Override
231  public String toString() {
232    return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
233  }
234}