001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import com.google.protobuf.Descriptors.MethodDescriptor;
023import com.google.protobuf.Message;
024import com.google.protobuf.RpcCallback;
025import com.google.protobuf.RpcChannel;
026import com.google.protobuf.RpcController;
027import io.opentelemetry.context.Context;
028import io.opentelemetry.context.Scope;
029import java.io.IOException;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
036import org.apache.hadoop.hbase.ipc.HBaseRpcController;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
042
043/**
044 * The implementation of a region based coprocessor rpc channel.
045 */
046@InterfaceAudience.Private
047class RegionCoprocessorRpcChannelImpl implements RpcChannel {
048
049  private final AsyncConnectionImpl conn;
050
051  private final TableName tableName;
052
053  private final RegionInfo region;
054
055  private final byte[] row;
056
057  private final long rpcTimeoutNs;
058
059  private final long operationTimeoutNs;
060
061  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
062    byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
063    this.conn = conn;
064    this.tableName = tableName;
065    this.region = region;
066    this.row = row;
067    this.rpcTimeoutNs = rpcTimeoutNs;
068    this.operationTimeoutNs = operationTimeoutNs;
069  }
070
071  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
072    Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
073    ClientService.Interface stub) {
074    final Context context = Context.current();
075    CompletableFuture<Message> future = new CompletableFuture<>();
076    if (
077      region != null && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())
078    ) {
079      future.completeExceptionally(new DoNotRetryIOException(
080        "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
081          + loc.getRegionInfo().getRegionNameAsString()));
082      return future;
083    }
084    CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
085      request, row, loc.getRegionInfo().getRegionName());
086    stub.execService(controller, csr, resp -> {
087      try (Scope ignored = context.makeCurrent()) {
088        if (controller.failed()) {
089          future.completeExceptionally(controller.getFailed());
090        } else {
091          try {
092            future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
093          } catch (IOException e) {
094            future.completeExceptionally(e);
095          }
096        }
097      }
098    });
099    return future;
100  }
101
102  @Override
103  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
104    Message responsePrototype, RpcCallback<Message> done) {
105    final Context context = Context.current();
106    addListener(conn.callerFactory.<Message> single().table(tableName).row(row)
107      .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
108      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS).action((c, l, s) -> {
109        try (Scope ignored = context.makeCurrent()) {
110          return rpcCall(method, request, responsePrototype, c, l, s);
111        }
112      }).call(), (r, e) -> {
113        try (Scope ignored = context.makeCurrent()) {
114          if (e != null) {
115            ((ClientCoprocessorRpcController) controller).setFailed(e);
116          }
117          done.run(r);
118        }
119      });
120  }
121}