/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The asynchronous region locator.
 * <p>
 * Every request is dispatched to an {@link AsyncMetaRegionLocator} when it targets the meta table
 * and to an {@link AsyncNonMetaRegionLocator} otherwise. Lookups can optionally be bounded by a
 * timeout that is scheduled on the {@link HashedWheelTimer} passed to the constructor.
 */
@InterfaceAudience.Private
class AsyncRegionLocator {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocator.class);

  private final HashedWheelTimer retryTimer;

  private final AsyncConnectionImpl conn;

  private final AsyncMetaRegionLocator metaRegionLocator;

  private final AsyncNonMetaRegionLocator nonMetaRegionLocator;

  /**
   * Creates a locator bound to the given connection.
   * @param conn the owning connection; its registry backs the meta locator and the connection
   *          itself backs the non-meta locator
   * @param retryTimer the timer on which lookup timeouts are scheduled
   */
  AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
    this.conn = conn;
    this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
    this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
    this.retryTimer = retryTimer;
  }

  /**
   * Arms a timeout on {@code future}: if it is not done within {@code timeoutNs} nanoseconds it is
   * completed exceptionally with a {@link TimeoutIOException}.
   * @param future the future to guard; the very same instance is returned
   * @param timeoutNs the timeout in nanoseconds; a value {@code <= 0} disables the timeout
   * @param timeoutMsg supplies the exception message; only evaluated if the timeout fires
   * @return {@code future}
   */
  private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
      Supplier<String> timeoutMsg) {
    if (future.isDone() || timeoutNs <= 0) {
      return future;
    }
    Timeout timeoutTask = retryTimer.newTimeout(t -> {
      if (future.isDone()) {
        return;
      }
      future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
    }, timeoutNs, TimeUnit.NANOSECONDS);
    FutureUtils.addListener(future, (loc, error) -> {
      // Cancel the timeout task whenever we are not completed by it. This must include the
      // success case (error == null); otherwise the task, together with the future and the
      // message supplier it references, stays queued in the timer until the timeout elapses.
      if (error == null || error.getClass() != TimeoutIOException.class) {
        timeoutTask.cancel();
      }
    });
    return future;
  }

  /** Returns whether {@code tableName} is the meta table. */
  private boolean isMeta(TableName tableName) {
    return TableName.isMetaTableName(tableName);
  }

  /**
   * Looks up the locations of the region containing {@code row}, using the default replica id for
   * the lookup.
   * @param tableName the table to locate in; the meta table is served by the meta locator
   * @param row the row used for the lookup (not passed to the meta locator)
   * @param type the locate type (not passed to the meta locator)
   * @param reload forwarded to the underlying locator
   * @param timeoutNs the timeout in nanoseconds; a value {@code <= 0} disables the timeout
   * @return a future holding the region locations, completed exceptionally with a
   *         {@link TimeoutIOException} if the timeout fires first
   */
  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
      RegionLocateType type, boolean reload, long timeoutNs) {
    CompletableFuture<RegionLocations> future = isMeta(tableName)
      ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
      : nonMetaRegionLocator.getRegionLocations(tableName, row,
        RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
    return withTimeout(future, timeoutNs,
      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
        "ms) waiting for region locations for " + tableName + ", row='" +
        Bytes.toStringBinary(row) + "'");
  }

  /**
   * Looks up the location of a single replica of the region containing {@code row}.
   * <p>
   * The returned future is completed exceptionally with a {@link RegionOfflineException} when the
   * lookup yields no location for {@code replicaId} or a location without a server name, and with
   * a {@link TimeoutIOException} if the timeout fires first.
   * @param tableName the table to locate in; the meta table is served by the meta locator
   * @param row the row used for the lookup (not passed to the meta locator)
   * @param replicaId the replica whose location is wanted
   * @param type the locate type (not passed to the meta locator)
   * @param reload forwarded to the underlying locator
   * @param timeoutNs the timeout in nanoseconds; a value {@code <= 0} disables the timeout
   * @return a future holding the location of the requested replica
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
    // meta region can not be split right now so we always call the same method.
    // Change it later if the meta table can have more than one regions.
    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
    CompletableFuture<RegionLocations> locsFuture =
      isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
        : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
    addListener(locsFuture, (locs, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      HRegionLocation loc = locs.getRegionLocation(replicaId);
      if (loc == null) {
        future.completeExceptionally(
          new RegionOfflineException("No location for " + tableName + ", row='" +
            Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
      } else if (loc.getServerName() == null) {
        // Note the closing quote after the region name, so the quoting in the message is balanced.
        future.completeExceptionally(
          new RegionOfflineException("No server address listed for region '" +
            loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) +
            "', locateType=" + type + ", replicaId=" + replicaId));
      } else {
        future.complete(loc);
      }
    });
    return withTimeout(future, timeoutNs,
      () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
        "ms) waiting for region location for " + tableName + ", row='" +
        Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
  }

  /**
   * Same as
   * {@link #getRegionLocation(TableName, byte[], int, RegionLocateType, boolean, long)} with
   * {@code reload} set to {@code false}.
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      int replicaId, RegionLocateType type, long timeoutNs) {
    return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
  }

  /**
   * Same as
   * {@link #getRegionLocation(TableName, byte[], int, RegionLocateType, boolean, long)} using the
   * default replica id.
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      RegionLocateType type, boolean reload, long timeoutNs) {
    return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
      timeoutNs);
  }

  /**
   * Same as {@link #getRegionLocation(TableName, byte[], RegionLocateType, boolean, long)} with
   * {@code reload} set to {@code false}.
   */
  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      RegionLocateType type, long timeoutNs) {
    return getRegionLocation(tableName, row, type, false, timeoutNs);
  }

  /**
   * Forwards an error observed for {@code loc} to the locator that owns it: the meta locator when
   * the location's region is a meta region, the non-meta locator otherwise.
   * @param loc the location the failed request was sent to
   * @param exception the error that was observed
   */
  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
    if (loc.getRegion().isMetaRegion()) {
      metaRegionLocator.updateCachedLocationOnError(loc, exception);
    } else {
      nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
    }
  }

  /**
   * Clears the cached locations for the given table, using the meta locator when the table is the
   * meta table and the non-meta locator otherwise.
   */
  void clearCache(TableName tableName) {
    LOG.debug("Clear meta cache for {}", tableName);
    if (tableName.equals(META_TABLE_NAME)) {
      metaRegionLocator.clearCache();
    } else {
      nonMetaRegionLocator.clearCache(tableName);
    }
  }

  /**
   * Clears the cached locations for the given server in both locators and, if connection metrics
   * are present, increments the corresponding cache-clear counter.
   */
  void clearCache(ServerName serverName) {
    LOG.debug("Clear meta cache for {}", serverName);
    metaRegionLocator.clearCache(serverName);
    nonMetaRegionLocator.clearCache(serverName);
    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
  }

  /** Clears the caches of both the meta and the non-meta locator. */
  void clearCache() {
    metaRegionLocator.clearCache();
    nonMetaRegionLocator.clearCache();
  }

  /** Returns the underlying locator used for non-meta tables. */
  AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
    return nonMetaRegionLocator;
  }
}