001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.util.concurrent.CompletableFuture;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.HRegionLocation;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
030import org.apache.hadoop.hbase.ipc.HBaseRpcController;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.yetus.audience.InterfaceAudience;
033
034import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
035import org.apache.hbase.thirdparty.com.google.protobuf.Message;
036import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
037import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
038import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
043
044/**
045 * The implementation of a region based coprocessor rpc channel.
046 */
047@InterfaceAudience.Private
048class RegionCoprocessorRpcChannelImpl implements RpcChannel {
049
050  private final AsyncConnectionImpl conn;
051
052  private final TableName tableName;
053
054  private final RegionInfo region;
055
056  private final byte[] row;
057
058  private final long rpcTimeoutNs;
059
060  private final long operationTimeoutNs;
061
062  private byte[] lastRegion;
063
064  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
065      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
066    this.conn = conn;
067    this.tableName = tableName;
068    this.region = region;
069    this.row = row;
070    this.rpcTimeoutNs = rpcTimeoutNs;
071    this.operationTimeoutNs = operationTimeoutNs;
072  }
073
074  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
075      Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
076      ClientService.Interface stub) {
077    CompletableFuture<Message> future = new CompletableFuture<>();
078    if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
079      future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " +
080        region.getRegionNameAsString() + ", actual " + loc.getRegion().getRegionNameAsString()));
081      return future;
082    }
083    CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
084      request, row, loc.getRegion().getRegionName());
085    stub.execService(controller, csr,
086      new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
087
088        @Override
089        public void run(CoprocessorServiceResponse resp) {
090          if (controller.failed()) {
091            future.completeExceptionally(controller.getFailed());
092          } else {
093            lastRegion = resp.getRegion().getValue().toByteArray();
094            try {
095              future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
096            } catch (IOException e) {
097              future.completeExceptionally(e);
098            }
099          }
100        }
101      });
102    return future;
103  }
104
105  @Override
106  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
107      Message responsePrototype, RpcCallback<Message> done) {
108    addListener(
109      conn.callerFactory.<Message> single().table(tableName).row(row)
110        .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
111        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
112        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
113      (r, e) -> {
114        if (e != null) {
115          setCoprocessorError(controller, e);
116        }
117        done.run(r);
118      });
119  }
120
121  public byte[] getLastRegion() {
122    return lastRegion;
123  }
124}