001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.List;
025import org.apache.hadoop.hbase.DoNotRetryIOException;
026import org.apache.hadoop.hbase.ExtendedCellScanner;
027import org.apache.hadoop.hbase.HRegionLocation;
028import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
029import org.apache.hadoop.hbase.util.Pair;
030import org.apache.hadoop.hbase.wal.WAL.Entry;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
037
038/**
039 * For replicating edits to secondary replicas.
040 */
041@InterfaceAudience.Private
042public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {
043
044  private final RegionInfo replica;
045
046  private final Entry[] entries;
047
048  // whether to use replay instead of replicateToReplica, during rolling upgrading if the target
049  // region server has not been upgraded then it will not have the replicateToReplica method, so we
050  // could use replay method first, though it is not perfect.
051  private boolean useReplay;
052
053  public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
054    AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
055    RegionInfo replica, List<Entry> entries) {
056    super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
057      conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts,
058      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt(),
059      Collections.emptyMap());
060    this.replica = replica;
061    this.entries = entries.toArray(new Entry[0]);
062  }
063
064  @Override
065  protected Throwable preProcessError(Throwable error) {
066    if (
067      error instanceof DoNotRetryIOException
068        && error.getCause() instanceof UnsupportedOperationException
069    ) {
070      // fallback to use replay, and also return the cause to let the upper retry
071      useReplay = true;
072      return error.getCause();
073    }
074    return error;
075  }
076
077  private void onComplete(HRegionLocation loc) {
078    if (controller.failed()) {
079      onError(controller.getFailed(),
080        () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
081        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
082    } else {
083      future.complete(null);
084    }
085  }
086
087  private void call(HRegionLocation loc) {
088    AdminService.Interface stub;
089    try {
090      stub = conn.getAdminStub(loc.getServerName());
091    } catch (IOException e) {
092      onError(e,
093        () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
094        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
095      return;
096    }
097    Pair<ReplicateWALEntryRequest, ExtendedCellScanner> pair = ReplicationProtobufUtil
098      .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
099    resetCallTimeout();
100    controller.setCellScanner(pair.getSecond());
101    if (useReplay) {
102      stub.replay(controller, pair.getFirst(), r -> onComplete(loc));
103    } else {
104      stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc));
105    }
106  }
107
108  @Override
109  protected void doCall() {
110    long locateTimeoutNs;
111    if (operationTimeoutNs > 0) {
112      locateTimeoutNs = remainingTimeNs();
113      if (locateTimeoutNs <= 0) {
114        completeExceptionally();
115        return;
116      }
117    } else {
118      locateTimeoutNs = -1L;
119    }
120    addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
121      replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
122        if (error != null) {
123          onError(error, () -> "Locate " + replica + " failed", err -> {
124          });
125          return;
126        }
127        call(loc);
128      });
129  }
130}