001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.util.List; 024import org.apache.hadoop.hbase.CellScanner; 025import org.apache.hadoop.hbase.DoNotRetryIOException; 026import org.apache.hadoop.hbase.HRegionLocation; 027import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; 028import org.apache.hadoop.hbase.util.Pair; 029import org.apache.hadoop.hbase.wal.WAL.Entry; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 033 034import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 036 037/** 038 * For replicating edits to secondary replicas. 039 */ 040@InterfaceAudience.Private 041public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> { 042 043 private final RegionInfo replica; 044 045 private final Entry[] entries; 046 047 // whether to use replay instead of replicateToReplica, during rolling upgrading if the target 048 // region server has not been upgraded then it will not have the replicateToReplica method, so we 049 // could use replay method first, though it is not perfect. 050 private boolean useReplay; 051 052 public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, 053 AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs, 054 RegionInfo replica, List<Entry> entries) { 055 super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()), 056 conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, 057 operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); 058 this.replica = replica; 059 this.entries = entries.toArray(new Entry[0]); 060 } 061 062 @Override 063 protected Throwable preProcessError(Throwable error) { 064 if ( 065 error instanceof DoNotRetryIOException 066 && error.getCause() instanceof UnsupportedOperationException 067 ) { 068 // fallback to use replay, and also return the cause to let the upper retry 069 useReplay = true; 070 return error.getCause(); 071 } 072 return error; 073 } 074 075 private void onComplete(HRegionLocation loc) { 076 if (controller.failed()) { 077 onError(controller.getFailed(), 078 () -> "Call to " + loc.getServerName() + " for " + replica + " failed", 079 err -> conn.getLocator().updateCachedLocationOnError(loc, err)); 080 } else { 081 future.complete(null); 082 } 083 } 084 085 private void call(HRegionLocation loc) { 086 AdminService.Interface stub; 087 try { 088 stub = conn.getAdminStub(loc.getServerName()); 089 } catch (IOException e) { 090 onError(e, 091 () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed", 092 err -> conn.getLocator().updateCachedLocationOnError(loc, err)); 093 return; 094 } 095 Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil 096 .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null); 097 resetCallTimeout(); 098 controller.setCellScanner(pair.getSecond()); 099 if (useReplay) { 100 stub.replay(controller, pair.getFirst(), r -> onComplete(loc)); 101 } else { 102 stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc)); 103 } 104 } 105 106 @Override 107 protected void doCall() { 108 long locateTimeoutNs; 109 if (operationTimeoutNs > 0) { 110 locateTimeoutNs = remainingTimeNs(); 111 if (locateTimeoutNs <= 0) { 112 completeExceptionally(); 113 return; 114 } 115 } else { 116 locateTimeoutNs = -1L; 117 } 118 addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(), 119 replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> { 120 if (error != null) { 121 onError(error, () -> "Locate " + replica + " failed", err -> { 122 }); 123 return; 124 } 125 call(loc); 126 }); 127 } 128}