001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; 022import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import io.opentelemetry.api.trace.Span; 026import io.opentelemetry.api.trace.StatusCode; 027import io.opentelemetry.context.Scope; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Objects; 032import java.util.Optional; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.TimeUnit; 035import java.util.function.Function; 036import java.util.function.Supplier; 037import java.util.stream.Collectors; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.RegionLocations; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder; 043import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; 044import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 045import org.apache.hadoop.hbase.trace.TraceUtil; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.FutureUtils; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 053import org.apache.hbase.thirdparty.io.netty.util.Timeout; 054 055/** 056 * The asynchronous region locator. 057 */ 058@InterfaceAudience.Private 059class AsyncRegionLocator { 060 061 private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class); 062 063 private final HashedWheelTimer retryTimer; 064 065 private final AsyncConnectionImpl conn; 066 067 private final AsyncMetaRegionLocator metaRegionLocator; 068 069 private final AsyncNonMetaRegionLocator nonMetaRegionLocator; 070 071 AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { 072 this.conn = conn; 073 this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry); 074 this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn); 075 this.retryTimer = retryTimer; 076 } 077 078 private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs, 079 Supplier<String> timeoutMsg) { 080 if (future.isDone() || timeoutNs <= 0) { 081 return future; 082 } 083 Timeout timeoutTask = retryTimer.newTimeout(t -> { 084 if (future.isDone()) { 085 return; 086 } 087 future.completeExceptionally(new TimeoutIOException(timeoutMsg.get())); 088 }, timeoutNs, TimeUnit.NANOSECONDS); 089 FutureUtils.addListener(future, (loc, error) -> { 090 if (error != null && error.getClass() != TimeoutIOException.class) { 091 // cancel timeout task if we are not completed by it. 092 timeoutTask.cancel(); 093 } 094 }); 095 return future; 096 } 097 098 private boolean isMeta(TableName tableName) { 099 return TableName.isMetaTableName(tableName); 100 } 101 102 private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action, 103 Function<T, List<String>> getRegionNames, Supplier<Span> spanSupplier) { 104 final Span span = spanSupplier.get(); 105 try (Scope scope = span.makeCurrent()) { 106 CompletableFuture<T> future = action.get(); 107 FutureUtils.addListener(future, (resp, error) -> { 108 if (error != null) { 109 TraceUtil.setError(span, error); 110 } else { 111 List<String> regionNames = getRegionNames.apply(resp); 112 if (!regionNames.isEmpty()) { 113 span.setAttribute(REGION_NAMES_KEY, regionNames); 114 } 115 span.setStatus(StatusCode.OK); 116 } 117 span.end(); 118 }); 119 return future; 120 } 121 } 122 123 private static List<String> getRegionNames(RegionLocations locs) { 124 if (locs == null || locs.getRegionLocations() == null) { 125 return Collections.emptyList(); 126 } 127 return Arrays.stream(locs.getRegionLocations()).filter(Objects::nonNull) 128 .map(HRegionLocation::getRegion).map(RegionInfo::getRegionNameAsString) 129 .collect(Collectors.toList()); 130 } 131 132 private static List<String> getRegionNames(HRegionLocation location) { 133 return Optional.ofNullable(location).map(HRegionLocation::getRegion) 134 .map(RegionInfo::getRegionNameAsString).map(Collections::singletonList) 135 .orElseGet(Collections::emptyList); 136 } 137 138 CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, 139 RegionLocateType type, boolean reload, long timeoutNs) { 140 final Supplier<Span> supplier = new TableSpanBuilder(conn) 141 .setName("AsyncRegionLocator.getRegionLocations").setTableName(tableName); 142 return tracedLocationFuture(() -> { 143 CompletableFuture<RegionLocations> future = isMeta(tableName) 144 ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) 145 : nonMetaRegionLocator.getRegionLocations(tableName, row, 146 RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); 147 return withTimeout(future, timeoutNs, 148 () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) 149 + "ms) waiting for region locations for " + tableName + ", row='" 150 + Bytes.toStringBinary(row) + "'"); 151 }, AsyncRegionLocator::getRegionNames, supplier); 152 } 153 154 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, 155 int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { 156 final Supplier<Span> supplier = new TableSpanBuilder(conn) 157 .setName("AsyncRegionLocator.getRegionLocation").setTableName(tableName); 158 return tracedLocationFuture(() -> { 159 // meta region can not be split right now so we always call the same method. 160 // Change it later if the meta table can have more than one regions. 161 CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); 162 CompletableFuture<RegionLocations> locsFuture = isMeta(tableName) 163 ? metaRegionLocator.getRegionLocations(replicaId, reload) 164 : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); 165 addListener(locsFuture, (locs, error) -> { 166 if (error != null) { 167 future.completeExceptionally(error); 168 return; 169 } 170 HRegionLocation loc = locs.getRegionLocation(replicaId); 171 if (loc == null) { 172 future.completeExceptionally( 173 new RegionOfflineException("No location for " + tableName + ", row='" 174 + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); 175 } else if (loc.getServerName() == null) { 176 future.completeExceptionally( 177 new RegionOfflineException("No server address listed for region '" 178 + loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) 179 + "', locateType=" + type + ", replicaId=" + replicaId)); 180 } else { 181 future.complete(loc); 182 } 183 }); 184 return withTimeout(future, timeoutNs, 185 () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) 186 + "ms) waiting for region location for " + tableName + ", row='" 187 + Bytes.toStringBinary(row) + "', replicaId=" + replicaId); 188 }, AsyncRegionLocator::getRegionNames, supplier); 189 } 190 191 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, 192 int replicaId, RegionLocateType type, long timeoutNs) { 193 return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs); 194 } 195 196 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, 197 RegionLocateType type, boolean reload, long timeoutNs) { 198 return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload, 199 timeoutNs); 200 } 201 202 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, 203 RegionLocateType type, long timeoutNs) { 204 return getRegionLocation(tableName, row, type, false, timeoutNs); 205 } 206 207 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { 208 if (loc.getRegion().isMetaRegion()) { 209 metaRegionLocator.updateCachedLocationOnError(loc, exception); 210 } else { 211 nonMetaRegionLocator.updateCachedLocationOnError(loc, exception); 212 } 213 } 214 215 void clearCache(TableName tableName) { 216 Supplier<Span> supplier = 217 new TableSpanBuilder(conn).setName("AsyncRegionLocator.clearCache").setTableName(tableName); 218 TraceUtil.trace(() -> { 219 LOG.debug("Clear meta cache for {}", tableName); 220 if (tableName.equals(META_TABLE_NAME)) { 221 metaRegionLocator.clearCache(); 222 } else { 223 nonMetaRegionLocator.clearCache(tableName); 224 } 225 }, supplier); 226 } 227 228 void clearCache(ServerName serverName) { 229 Supplier<Span> supplier = 230 new ConnectionSpanBuilder(conn).setName("AsyncRegionLocator.clearCache") 231 .addAttribute(SERVER_NAME_KEY, serverName.getServerName()); 232 TraceUtil.trace(() -> { 233 LOG.debug("Clear meta cache for {}", serverName); 234 metaRegionLocator.clearCache(serverName); 235 nonMetaRegionLocator.clearCache(serverName); 236 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); 237 }, supplier); 238 } 239 240 void clearCache() { 241 Supplier<Span> supplier = 242 new ConnectionSpanBuilder(conn).setName("AsyncRegionLocator.clearCache"); 243 TraceUtil.trace(() -> { 244 metaRegionLocator.clearCache(); 245 nonMetaRegionLocator.clearCache(); 246 }, supplier); 247 } 248 249 AsyncNonMetaRegionLocator getNonMetaRegionLocator() { 250 return nonMetaRegionLocator; 251 } 252 253 // only used for testing whether we have cached the location for a region. 254 RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { 255 if (TableName.isMetaTableName(tableName)) { 256 return metaRegionLocator.getRegionLocationInCache(); 257 } else { 258 return nonMetaRegionLocator.getRegionLocationInCache(tableName, row); 259 } 260 } 261 262 // only used for testing whether we have cached the location for a table. 263 int getNumberOfCachedRegionLocations(TableName tableName) { 264 if (TableName.isMetaTableName(tableName)) { 265 return metaRegionLocator.getNumberOfCachedRegionLocations(); 266 } else { 267 return nonMetaRegionLocator.getNumberOfCachedRegionLocations(tableName); 268 } 269 } 270}