/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;

/**
 * The asynchronous region locator.
 * <p>
 * Dispatches every request either to the {@link AsyncMetaRegionLocator} (for the meta table / meta
 * regions) or to the {@link AsyncNonMetaRegionLocator} (for everything else), and optionally bounds
 * the returned future with a timeout scheduled on the shared retry timer.
 */
@InterfaceAudience.Private
class AsyncRegionLocator {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class);

  /** Timer used to schedule the timeout tasks created by {@link #withTimeout}. */
  private final HashedWheelTimer retryTimer;

  private final AsyncConnectionImpl conn;

  private final AsyncMetaRegionLocator metaRegionLocator;

  private final AsyncNonMetaRegionLocator nonMetaRegionLocator;

  /**
   * @param conn the owning connection; its registry backs the meta locator and the connection
   *          itself backs the non-meta locator
   * @param retryTimer the timer on which locate timeouts are scheduled
   */
  AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
    this.conn = conn;
    this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
    this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
    this.retryTimer = retryTimer;
  }

  /**
   * Arms a timeout on the given future.
   * <p>
   * If the future is already done, or {@code timeoutNs} is not positive, the future is returned
   * untouched and no timer task is created. Otherwise a task is scheduled that completes the
   * future exceptionally with a {@link TimeoutIOException} once {@code timeoutNs} has elapsed,
   * unless the future has completed in the meantime.
   * @param future the future to guard; the very same instance is returned
   * @param timeoutNs the timeout in nanoseconds, values {@code <= 0} disable the timeout
   * @param timeoutMsg lazily builds the exception message, only evaluated if the timeout fires
   * @return {@code future}
   */
  private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
      Supplier<String> timeoutMsg) {
    if (future.isDone() || timeoutNs <= 0) {
      return future;
    }
    Timeout timeoutTask = retryTimer.newTimeout(t -> {
      if (future.isDone()) {
        return;
      }
      future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
    }, timeoutNs, TimeUnit.NANOSECONDS);
    FutureUtils.addListener(future, (loc, error) -> {
      // Cancel the timeout task whenever we were not completed by it. This must include the
      // successful case (error == null); otherwise the task, and the future it captures, would
      // stay registered in the timer until the full timeout has elapsed.
      if (error == null || error.getClass() != TimeoutIOException.class) {
        timeoutTask.cancel();
      }
    });
    return future;
  }

  private boolean isMeta(TableName tableName) {
    return TableName.isMetaTableName(tableName);
  }

  /**
   * Locates the region locations for the given row, always asking for the default replica id.
   * <p>
   * For the meta table the request goes to the meta locator, which is not given {@code row} or
   * {@code type}; for any other table it goes to the non-meta locator.
   * @param tableName the table to locate in
   * @param row the row to locate
   * @param type the locate type, only passed on for non-meta tables
   * @param reload passed through to the underlying locator
   * @param timeoutNs the timeout in nanoseconds, values {@code <= 0} disable the timeout
   * @return a future for the locations, failed with {@link TimeoutIOException} if the timeout
   *         fires first
   */
  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
      RegionLocateType type, boolean reload, long timeoutNs) {
    CompletableFuture<RegionLocations> future = isMeta(tableName)
      ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
      : nonMetaRegionLocator.getRegionLocations(tableName, row,
        RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
    return withTimeout(future, timeoutNs,
      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
        "ms) waiting for region locations for " + tableName + ", row='" +
        Bytes.toStringBinary(row) + "'");
  }

  /**
   * Locates the region location of a specific replica for the given row.
   * <p>
   * The returned future is failed with a {@link RegionOfflineException} if the located
   * {@link RegionLocations} has no entry for {@code replicaId}, or if that entry has no server
   * name. Errors from the underlying locator are propagated as is.
   * @param tableName the table to locate in
   * @param row the row to locate
   * @param replicaId the replica whose location is wanted
   * @param type the locate type, only passed on for non-meta tables
   * @param reload passed through to the underlying locator
   * @param timeoutNs the timeout in nanoseconds, values {@code <= 0} disable the timeout
   * @return a future for the location, failed with {@link TimeoutIOException} if the timeout fires
   *         first
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
    // meta region can not be split right now so we always call the same method.
    // Change it later if the meta table can have more than one regions.
    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
    CompletableFuture<RegionLocations> locsFuture =
      isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
        : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
    addListener(locsFuture, (locs, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      HRegionLocation loc = locs.getRegionLocation(replicaId);
      if (loc == null) {
        future.completeExceptionally(
          new RegionOfflineException("No location for " + tableName + ", row='" +
            Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
      } else if (loc.getServerName() == null) {
        future.completeExceptionally(
          new RegionOfflineException("No server address listed for region '" +
            loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) +
            "', locateType=" + type + ", replicaId=" + replicaId));
      } else {
        future.complete(loc);
      }
    });
    // The timeout guards the derived future, not locsFuture, so a timeout here does not complete
    // the underlying locate future.
    return withTimeout(future, timeoutNs,
      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
        "ms) waiting for region location for " + tableName + ", row='" +
        Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
  }

  /**
   * Same as
   * {@link #getRegionLocation(TableName, byte[], int, RegionLocateType, boolean, long)} with
   * {@code reload} set to {@code false}.
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      int replicaId, RegionLocateType type, long timeoutNs) {
    return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
  }

  /**
   * Same as
   * {@link #getRegionLocation(TableName, byte[], int, RegionLocateType, boolean, long)} using the
   * default replica id.
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      RegionLocateType type, boolean reload, long timeoutNs) {
    return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
      timeoutNs);
  }

  /**
   * Same as
   * {@link #getRegionLocation(TableName, byte[], int, RegionLocateType, boolean, long)} using the
   * default replica id and {@code reload} set to {@code false}.
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      RegionLocateType type, long timeoutNs) {
    return getRegionLocation(tableName, row, type, false, timeoutNs);
  }

  /**
   * Forwards the error for the given location to the meta locator if the location's region is a
   * meta region, otherwise to the non-meta locator.
   */
  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
    if (loc.getRegion().isMetaRegion()) {
      metaRegionLocator.updateCachedLocationOnError(loc, exception);
    } else {
      nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
    }
  }

  /**
   * Clears the cached locations for the given table. For the meta table the whole meta locator
   * cache is cleared.
   */
  void clearCache(TableName tableName) {
    LOG.debug("Clear meta cache for {}", tableName);
    if (tableName.equals(META_TABLE_NAME)) {
      metaRegionLocator.clearCache();
    } else {
      nonMetaRegionLocator.clearCache(tableName);
    }
  }

  /**
   * Clears the cached locations for the given server in both locators and, if connection metrics
   * are present, increments the corresponding metric.
   */
  void clearCache(ServerName serverName) {
    LOG.debug("Clear meta cache for {}", serverName);
    metaRegionLocator.clearCache(serverName);
    nonMetaRegionLocator.clearCache(serverName);
    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
  }

  /**
   * Clears the caches of both the meta and the non-meta locator.
   */
  void clearCache() {
    metaRegionLocator.clearCache();
    nonMetaRegionLocator.clearCache();
  }

  @VisibleForTesting
  AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
    return nonMetaRegionLocator;
  }
}