001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.util.Collections; 024import java.util.List; 025import java.util.Optional; 026import org.apache.hadoop.hbase.DoNotRetryIOException; 027import org.apache.hadoop.hbase.ExtendedCellScanner; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; 032import org.apache.hadoop.hbase.util.Pair; 033import org.apache.hadoop.hbase.wal.WAL.Entry; 034import org.apache.yetus.audience.InterfaceAudience; 035 036import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 040 041/** 042 * For replicating edits to secondary replicas. 043 */ 044@InterfaceAudience.Private 045public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> { 046 047 private final RegionInfo replica; 048 049 private final Entry[] entries; 050 051 private final Optional<TableName> tableName; 052 053 // whether to use replay instead of replicateToReplica, during rolling upgrading if the target 054 // region server has not been upgraded then it will not have the replicateToReplica method, so we 055 // could use replay method first, though it is not perfect. 056 private boolean useReplay; 057 058 public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, 059 AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs, 060 RegionInfo replica, List<Entry> entries) { 061 super(retryTimer, conn, HConstants.PRIORITY_UNSET, conn.connConf.getPauseNs(), 062 conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, operationTimeoutNs, rpcTimeoutNs, 063 conn.connConf.getStartLogErrorsCnt(), Collections.emptyMap()); 064 this.replica = replica; 065 this.tableName = Optional.of(replica.getTable()); 066 this.entries = entries.toArray(new Entry[0]); 067 } 068 069 @Override 070 protected Optional<TableName> getTableName() { 071 return tableName; 072 } 073 074 @Override 075 protected Throwable preProcessError(Throwable error) { 076 if ( 077 error instanceof DoNotRetryIOException 078 && error.getCause() instanceof UnsupportedOperationException 079 ) { 080 // fallback to use replay, and also return the cause to let the upper retry 081 useReplay = true; 082 return error.getCause(); 083 } 084 return error; 085 } 086 087 private void onComplete(HRegionLocation loc) { 088 if (controller.failed()) { 089 onError(controller.getFailed(), 090 () -> "Call to " + loc.getServerName() + " for " + replica + " failed", 091 err -> conn.getLocator().updateCachedLocationOnError(loc, err)); 092 } else { 093 future.complete(null); 094 } 095 } 096 097 private void call(HRegionLocation loc) { 098 AdminService.Interface stub; 099 try { 100 stub = conn.getAdminStub(loc.getServerName()); 101 } catch (IOException e) { 102 onError(e, 103 () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed", 104 err -> conn.getLocator().updateCachedLocationOnError(loc, err)); 105 return; 106 } 107 Pair<ReplicateWALEntryRequest, ExtendedCellScanner> pair = ReplicationProtobufUtil 108 .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null); 109 resetCallTimeout(); 110 controller.setCellScanner(pair.getSecond()); 111 if (useReplay) { 112 stub.replay(controller, pair.getFirst(), r -> onComplete(loc)); 113 } else { 114 stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc)); 115 } 116 } 117 118 @Override 119 protected void doCall() { 120 long locateTimeoutNs; 121 if (operationTimeoutNs > 0) { 122 locateTimeoutNs = remainingTimeNs(); 123 if (locateTimeoutNs <= 0) { 124 completeExceptionally(); 125 return; 126 } 127 } else { 128 locateTimeoutNs = -1L; 129 } 130 addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(), 131 replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> { 132 if (error != null) { 133 onError(error, () -> "Locate " + replica + " failed", err -> { 134 }); 135 return; 136 } 137 call(loc); 138 }); 139 } 140}