001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import com.google.protobuf.Descriptors.MethodDescriptor;
023import com.google.protobuf.Message;
024import com.google.protobuf.RpcCallback;
025import com.google.protobuf.RpcChannel;
026import com.google.protobuf.RpcController;
027import java.io.IOException;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
034import org.apache.hadoop.hbase.ipc.HBaseRpcController;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.yetus.audience.InterfaceAudience;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
041
042/**
043 * The implementation of a region based coprocessor rpc channel.
044 */
045@InterfaceAudience.Private
046class RegionCoprocessorRpcChannelImpl implements RpcChannel {
047
048  private final AsyncConnectionImpl conn;
049
050  private final TableName tableName;
051
052  private final RegionInfo region;
053
054  private final byte[] row;
055
056  private final long rpcTimeoutNs;
057
058  private final long operationTimeoutNs;
059
060  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
061      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
062    this.conn = conn;
063    this.tableName = tableName;
064    this.region = region;
065    this.row = row;
066    this.rpcTimeoutNs = rpcTimeoutNs;
067    this.operationTimeoutNs = operationTimeoutNs;
068  }
069
070  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
071      Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
072      ClientService.Interface stub) {
073    CompletableFuture<Message> future = new CompletableFuture<>();
074    if (region != null
075        && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) {
076      future.completeExceptionally(new DoNotRetryIOException(
077          "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
078              + loc.getRegionInfo().getRegionNameAsString()));
079      return future;
080    }
081    CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
082      request, row, loc.getRegionInfo().getRegionName());
083    stub.execService(controller, csr,
084      new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
085
086        @Override
087        public void run(CoprocessorServiceResponse resp) {
088          if (controller.failed()) {
089            future.completeExceptionally(controller.getFailed());
090          } else {
091            try {
092              future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
093            } catch (IOException e) {
094              future.completeExceptionally(e);
095            }
096          }
097        }
098      });
099    return future;
100  }
101
102  @Override
103  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
104      Message responsePrototype, RpcCallback<Message> done) {
105    addListener(
106      conn.callerFactory.<Message> single().table(tableName).row(row)
107        .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
108        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
109        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
110      (r, e) -> {
111        if (e != null) {
112          ((ClientCoprocessorRpcController) controller).setFailed(e);
113        }
114        done.run(r);
115      });
116  }
117}