/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;

/**
 * For replicating edits to secondary replicas.
 * <p/>
 * Each attempt locates the target replica, obtains an admin stub for the server hosting it and
 * sends the WAL entries with {@code replicateToReplica}. Once a server answers with a
 * {@link DoNotRetryIOException} caused by an {@link UnsupportedOperationException}, subsequent
 * attempts use the {@code replay} method instead.
 */
@InterfaceAudience.Private
public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {

  /** The replica region the entries are sent to. */
  private final RegionInfo replica;

  /** The WAL entries to send, copied from the list given to the constructor. */
  private final Entry[] entries;

  // whether to use replay instead of replicateToReplica, during rolling upgrading if the target
  // region server has not been upgraded then it will not have the replicateToReplica method, so we
  // could use replay method first, though it is not perfect.
  private boolean useReplay;

  /**
   * Creates a caller that sends {@code entries} to {@code replica}.
   * @param retryTimer         timer handed to the superclass
   * @param conn               connection handed to the superclass; its {@code connConf} also
   *                           supplies the pause and start-log-errors settings
   * @param maxAttempts        forwarded to the superclass
   * @param rpcTimeoutNs       forwarded to the superclass
   * @param operationTimeoutNs forwarded to the superclass
   * @param replica            the target replica; the request priority is derived from its table
   * @param entries            the WAL entries to send; copied into an array here
   */
  public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
    AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
    RegionInfo replica, List<Entry> entries) {
    // NOTE(review): the two timeouts are passed to the superclass as
    // (operationTimeoutNs, rpcTimeoutNs), i.e. swapped relative to this constructor's own
    // parameter order; this is assumed to match the superclass signature.
    super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
      conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts,
      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt(),
      Collections.emptyMap());
    this.replica = replica;
    this.entries = entries.toArray(new Entry[0]);
  }

  /**
   * Detects the "method not supported" failure and switches this caller to the {@code replay}
   * method.
   * @param error the error reported for the current attempt
   * @return the cause of {@code error} when it is a {@link DoNotRetryIOException} caused by an
   *         {@link UnsupportedOperationException}, otherwise {@code error} itself
   */
  @Override
  protected Throwable preProcessError(Throwable error) {
    if (
      error instanceof DoNotRetryIOException
        && error.getCause() instanceof UnsupportedOperationException
    ) {
      // fallback to use replay, and also return the cause to let the upper retry
      useReplay = true;
      return error.getCause();
    }
    return error;
  }

  /**
   * Callback invoked when the RPC to {@code loc} returns. A failed controller is reported through
   * {@code onError} together with a cached-location update; otherwise the future is completed
   * with {@code null}.
   * @param loc the location the request was sent to
   */
  private void onComplete(HRegionLocation loc) {
    if (controller.failed()) {
      onError(controller.getFailed(),
        () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
    } else {
      future.complete(null);
    }
  }

  /**
   * Sends the entries to the server at {@code loc}, using {@code replay} when the fallback flag
   * is set and {@code replicateToReplica} otherwise.
   * @param loc the located position of the target replica
   */
  private void call(HRegionLocation loc) {
    AdminService.Interface stub;
    try {
      stub = conn.getAdminStub(loc.getServerName());
    } catch (IOException e) {
      // could not even obtain a stub; report it like a failed call and stop this attempt
      onError(e,
        () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
      return;
    }
    Pair<ReplicateWALEntryRequest, ExtendedCellScanner> pair = ReplicationProtobufUtil
      .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
    resetCallTimeout();
    // the second element of the pair is attached to the controller rather than to the request
    controller.setCellScanner(pair.getSecond());
    if (useReplay) {
      stub.replay(controller, pair.getFirst(), r -> onComplete(loc));
    } else {
      stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc));
    }
  }

  /**
   * Performs one attempt: works out the timeout for locating the replica, locates it, and then
   * issues the RPC through {@link #call(HRegionLocation)}.
   */
  @Override
  protected void doCall() {
    long locateTimeoutNs;
    if (operationTimeoutNs > 0) {
      locateTimeoutNs = remainingTimeNs();
      if (locateTimeoutNs <= 0) {
        // nothing left of the operation timeout, give up without locating
        completeExceptionally();
        return;
      }
    } else {
      // presumably interpreted by the locator as "no timeout" -- confirm against the locator
      locateTimeoutNs = -1L;
    }
    addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
      replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
        if (error != null) {
          // no location was obtained, so there is no cached location to update
          onError(error, () -> "Locate " + replica + " failed", err -> {
          });
          return;
        }
        call(loc);
      });
  }
}