001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 023 024import com.google.protobuf.Message; 025import com.google.protobuf.RpcChannel; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicReference; 039import java.util.function.BiConsumer; 040import java.util.function.Function; 041import java.util.regex.Pattern; 042import java.util.stream.Collectors; 043import java.util.stream.Stream; 044import org.apache.commons.io.IOUtils; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.hbase.AsyncMetaTableAccessor; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 049import org.apache.hadoop.hbase.ClusterMetrics; 050import org.apache.hadoop.hbase.ClusterMetrics.Option; 051import org.apache.hadoop.hbase.ClusterMetricsBuilder; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.HRegionLocation; 054import org.apache.hadoop.hbase.MetaTableAccessor; 055import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; 056import org.apache.hadoop.hbase.NamespaceDescriptor; 057import org.apache.hadoop.hbase.RegionLocations; 058import org.apache.hadoop.hbase.RegionMetrics; 059import org.apache.hadoop.hbase.RegionMetricsBuilder; 060import org.apache.hadoop.hbase.ServerName; 061import org.apache.hadoop.hbase.TableExistsException; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.TableNotDisabledException; 064import org.apache.hadoop.hbase.TableNotEnabledException; 065import org.apache.hadoop.hbase.TableNotFoundException; 066import org.apache.hadoop.hbase.UnknownRegionException; 067import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 068import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 070import org.apache.hadoop.hbase.client.Scan.ReadType; 071import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 072import 
org.apache.hadoop.hbase.client.replication.TableCFs; 073import org.apache.hadoop.hbase.client.security.SecurityCapability; 074import org.apache.hadoop.hbase.exceptions.DeserializationException; 075import org.apache.hadoop.hbase.ipc.HBaseRpcController; 076import org.apache.hadoop.hbase.quotas.QuotaFilter; 077import org.apache.hadoop.hbase.quotas.QuotaSettings; 078import org.apache.hadoop.hbase.quotas.QuotaTableUtil; 079import org.apache.hadoop.hbase.replication.ReplicationException; 080import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 081import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 082import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 083import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 084import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 085import org.apache.hadoop.hbase.util.Bytes; 086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 087import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 088import org.apache.yetus.audience.InterfaceAudience; 089import org.slf4j.Logger; 090import org.slf4j.LoggerFactory; 091 092import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 093import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 094import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 095import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 096import org.apache.hbase.thirdparty.io.netty.util.Timeout; 097import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 098 099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 100import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 120import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 192import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 227import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 262 263/** 264 * The 
implementation of AsyncAdmin.
 * <p>
 * The word 'Raw' means that this is a low-level class. The returned {@link CompletableFuture} will
 * be completed inside the rpc framework thread, which means that the callbacks registered to the
 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users of this
 * class should not do time-consuming tasks in the callbacks.
 * @since 2.0.0
 * @see AsyncHBaseAdmin
 * @see AsyncConnection#getAdmin()
 * @see AsyncConnection#getAdminBuilder()
 */
@InterfaceAudience.Private
class RawAsyncHBaseAdmin implements AsyncAdmin {
  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";

  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);

  private final AsyncConnectionImpl connection;

  private final HashedWheelTimer retryTimer;

  private final AsyncTable<AdvancedScanResultConsumer> metaTable;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private final long pauseNs;

  private final int maxAttempts;

  private final int startLogErrorsCnt;

  private final NonceGenerator ng;

  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
      AsyncAdminBuilderBase builder) {
    this.connection = connection;
    this.retryTimer = retryTimer;
    this.metaTable = connection.getTable(META_TABLE_NAME);
    this.rpcTimeoutNs = builder.rpcTimeoutNs;
    this.operationTimeoutNs = builder.operationTimeoutNs;
    this.pauseNs = builder.pauseNs;
    this.maxAttempts = builder.maxAttempts;
    this.startLogErrorsCnt = builder.startLogErrorsCnt;
    this.ng = connection.getNonceGenerator();
  }

  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
    return this.connection.callerFactory.<T> masterRequest()
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
      .startLogErrorsCnt(startLogErrorsCnt);
  }

  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
    return this.connection.callerFactory.<T> adminRequest()
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
      .startLogErrorsCnt(startLogErrorsCnt);
  }

  @FunctionalInterface
  private interface MasterRpcCall<RESP, REQ> {
    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
        RpcCallback<RESP> done);
  }

  @FunctionalInterface
  private interface AdminRpcCall<RESP, REQ> {
    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
        RpcCallback<RESP> done);
  }

  @FunctionalInterface
  private interface Converter<D, S> {
    D convert(S src) throws IOException;
  }

  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
      Converter<RESP, PRESP> respConverter) {
    CompletableFuture<RESP> future = new CompletableFuture<>();
    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {

      @Override
      public void run(PRESP resp) {
        if (controller.failed()) {
          future.completeExceptionally(controller.getFailed());
        } else {
          try {
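            // The protobuf response is converted to the client-facing type right here; if the
            // converter throws an IOException, the future is completed exceptionally below rather
            // than letting the error escape the rpc callback thread. A typical converter is as
            // small as, for example (as in getTableNames below):
            //   (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())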
future.complete(respConverter.convert(resp)); 358 } catch (IOException e) { 359 future.completeExceptionally(e); 360 } 361 } 362 } 363 }); 364 return future; 365 } 366 367 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 368 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 369 Converter<RESP, PRESP> respConverter) { 370 371 CompletableFuture<RESP> future = new CompletableFuture<>(); 372 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 373 374 @Override 375 public void run(PRESP resp) { 376 if (controller.failed()) { 377 future.completeExceptionally(new IOException(controller.errorText())); 378 } else { 379 try { 380 future.complete(respConverter.convert(resp)); 381 } catch (IOException e) { 382 future.completeExceptionally(e); 383 } 384 } 385 } 386 }); 387 return future; 388 } 389 390 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 391 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 392 ProcedureBiConsumer consumer) { 393 CompletableFuture<Long> procFuture = 394 this.<Long> newMasterCaller().action((controller, stub) -> this 395 .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call(); 396 CompletableFuture<Void> future = waitProcedureResult(procFuture); 397 addListener(future, consumer); 398 return future; 399 } 400 401 @FunctionalInterface 402 private interface TableOperator { 403 CompletableFuture<Void> operate(TableName table); 404 } 405 406 @Override 407 public CompletableFuture<Boolean> tableExists(TableName tableName) { 408 if (TableName.isMetaTableName(tableName)) { 409 return CompletableFuture.completedFuture(true); 410 } 411 return AsyncMetaTableAccessor.tableExists(metaTable, tableName); 412 } 413 414 @Override 415 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 416 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null, 417 includeSysTables)); 418 } 419 420 /** 421 * {@link #listTables(boolean)} 422 */ 423 @Override 424 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 425 boolean includeSysTables) { 426 Preconditions.checkNotNull(pattern, 427 "pattern is null. If you don't specify a pattern, use listTables(boolean) instead"); 428 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern, 429 includeSysTables)); 430 } 431 432 private CompletableFuture<List<TableDescriptor>> 433 getTableDescriptors(GetTableDescriptorsRequest request) { 434 return this.<List<TableDescriptor>> newMasterCaller() 435 .action((controller, stub) -> this 436 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call( 437 controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done), 438 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 439 .call(); 440 } 441 442 @Override 443 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 444 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 445 } 446 447 @Override 448 public CompletableFuture<List<TableName>> 449 listTableNames(Pattern pattern, boolean includeSysTables) { 450 Preconditions.checkNotNull(pattern, 451 "pattern is null. 
If you don't specify a pattern, use listTableNames(boolean) instead"); 452 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 453 } 454 455 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 456 return this 457 .<List<TableName>> newMasterCaller() 458 .action( 459 (controller, stub) -> this 460 .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, 461 stub, request, (s, c, req, done) -> s.getTableNames(c, req, done), 462 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call(); 463 } 464 465 @Override 466 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 467 return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this 468 .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse, 469 List<TableDescriptor>> call( 470 controller, stub, 471 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 472 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 473 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 474 .call(); 475 } 476 477 @Override 478 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 479 return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this 480 .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse, 481 List<TableName>> call( 482 controller, stub, 483 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 484 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 485 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 486 .call(); 487 } 488 489 @Override 490 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 491 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 492 addListener(this.<List<TableSchema>> newMasterCaller() 493 .action((controller, stub) -> this 494 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( 495 controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), 496 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 497 (resp) -> resp.getTableSchemaList())) 498 .call(), (tableSchemas, error) -> { 499 if (error != null) { 500 future.completeExceptionally(error); 501 return; 502 } 503 if (!tableSchemas.isEmpty()) { 504 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 505 } else { 506 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 507 } 508 }); 509 return future; 510 } 511 512 @Override 513 public CompletableFuture<Void> createTable(TableDescriptor desc) { 514 return createTable(desc.getTableName(), 515 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 516 } 517 518 @Override 519 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 520 int numRegions) { 521 try { 522 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 523 } catch (IllegalArgumentException e) { 524 return failedFuture(e); 525 } 526 } 527 528 @Override 529 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 530 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 531 + " use createTable(TableDescriptor) instead"); 532 try { 533 verifySplitKeys(splitKeys); 534 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 535 splitKeys, ng.getNonceGroup(), ng.newNonce())); 536 } catch (IllegalArgumentException e) { 537 return failedFuture(e); 538 } 539 } 540 541 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 542 Preconditions.checkNotNull(tableName, "table name is null"); 543 return this.<CreateTableRequest, CreateTableResponse> procedureCall(request, 544 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 545 new CreateTableProcedureBiConsumer(tableName)); 546 } 547 548 @Override 549 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 550 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall( 551 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 552 ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), 553 (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 554 } 555 556 @Override 557 public CompletableFuture<Void> deleteTable(TableName tableName) { 558 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter 559 .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 560 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 561 new DeleteTableProcedureBiConsumer(tableName)); 562 } 563 564 @Override 565 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 566 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall( 567 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 568 ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done), 569 (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName)); 570 } 571 572 @Override 573 public CompletableFuture<Void> enableTable(TableName tableName) { 574 return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter 575 .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 576 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 577 new EnableTableProcedureBiConsumer(tableName)); 578 } 579 580 @Override 581 public CompletableFuture<Void> disableTable(TableName tableName) { 582 return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter 583 .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 584 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 585 new DisableTableProcedureBiConsumer(tableName)); 586 } 587 588 @Override 589 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 590 if (TableName.isMetaTableName(tableName)) { 591 return CompletableFuture.completedFuture(true); 592 } 593 CompletableFuture<Boolean> future = new CompletableFuture<>(); 594 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> { 595 if (error != null) { 596 future.completeExceptionally(error); 597 return; 598 } 599 if (state.isPresent()) { 600 future.complete(state.get().inStates(TableState.State.ENABLED)); 601 } else { 602 future.completeExceptionally(new TableNotFoundException(tableName)); 603 } 604 }); 605 return future; 606 } 607 608 @Override 609 public CompletableFuture<Boolean> isTableDisabled(TableName 
tableName) { 610 if (TableName.isMetaTableName(tableName)) { 611 return CompletableFuture.completedFuture(false); 612 } 613 CompletableFuture<Boolean> future = new CompletableFuture<>(); 614 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> { 615 if (error != null) { 616 future.completeExceptionally(error); 617 return; 618 } 619 if (state.isPresent()) { 620 future.complete(state.get().inStates(TableState.State.DISABLED)); 621 } else { 622 future.completeExceptionally(new TableNotFoundException(tableName)); 623 } 624 }); 625 return future; 626 } 627 628 @Override 629 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 630 return isTableAvailable(tableName, Optional.empty()); 631 } 632 633 @Override 634 public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { 635 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 636 + " use isTableAvailable(TableName) instead"); 637 return isTableAvailable(tableName, Optional.of(splitKeys)); 638 } 639 640 private CompletableFuture<Boolean> isTableAvailable(TableName tableName, 641 Optional<byte[][]> splitKeys) { 642 if (TableName.isMetaTableName(tableName)) { 643 return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream 644 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 645 } 646 CompletableFuture<Boolean> future = new CompletableFuture<>(); 647 addListener(isTableEnabled(tableName), (enabled, error) -> { 648 if (error != null) { 649 future.completeExceptionally(error); 650 return; 651 } 652 if (!enabled) { 653 future.complete(false); 654 } else { 655 addListener( 656 AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)), 657 (locations, error1) -> { 658 if (error1 != null) { 659 future.completeExceptionally(error1); 660 return; 661 } 662 List<HRegionLocation> notDeployedRegions = locations.stream() 663 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 664 if (notDeployedRegions.size() > 0) { 665 if (LOG.isDebugEnabled()) { 666 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 667 } 668 future.complete(false); 669 return; 670 } 671 672 Optional<Boolean> available = 673 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); 674 future.complete(available.orElse(true)); 675 }); 676 } 677 }); 678 return future; 679 } 680 681 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) { 682 int regionCount = 0; 683 for (HRegionLocation location : locations) { 684 RegionInfo info = location.getRegion(); 685 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 686 regionCount++; 687 continue; 688 } 689 for (byte[] splitKey : splitKeys) { 690 // Just check if the splitkey is available 691 if (Bytes.equals(info.getStartKey(), splitKey)) { 692 regionCount++; 693 break; 694 } 695 } 696 } 697 return regionCount == splitKeys.length + 1; 698 } 699 700 @Override 701 public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { 702 return this.<AddColumnRequest, AddColumnResponse> procedureCall( 703 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 704 ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 705 new AddColumnFamilyProcedureBiConsumer(tableName)); 706 } 707 708 @Override 709 public 
CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 710 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall( 711 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 712 ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done), 713 (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName)); 714 } 715 716 @Override 717 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 718 ColumnFamilyDescriptor columnFamily) { 719 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall( 720 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 721 ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done), 722 (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName)); 723 } 724 725 @Override 726 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 727 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 728 RequestConverter.buildCreateNamespaceRequest(descriptor), 729 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 730 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 731 } 732 733 @Override 734 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 735 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 736 RequestConverter.buildModifyNamespaceRequest(descriptor), 737 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 738 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 739 } 740 741 @Override 742 public CompletableFuture<Void> deleteNamespace(String name) { 743 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 744 RequestConverter.buildDeleteNamespaceRequest(name), 745 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 746 new DeleteNamespaceProcedureBiConsumer(name)); 747 } 748 749 @Override 750 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 751 return this 752 .<NamespaceDescriptor> newMasterCaller() 753 .action( 754 (controller, stub) -> this 755 .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call( 756 controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c, 757 req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil 758 .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); 759 } 760 761 @Override 762 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 763 return this 764 .<List<NamespaceDescriptor>> newMasterCaller() 765 .action( 766 (controller, stub) -> this 767 .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call( 768 controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, 769 done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil 770 .toNamespaceDescriptorList(resp))).call(); 771 } 772 773 @Override 774 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 775 return this.<List<RegionInfo>> newAdminCaller() 776 .action((controller, stub) -> this 777 .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall( 778 controller, stub, RequestConverter.buildGetOnlineRegionRequest(), 779 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 780 
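            // Response converter: ProtobufUtil.getRegionInfos unwraps the protobuf
            // GetOnlineRegionResponse into the list of RegionInfo instances returned to the caller.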
resp -> ProtobufUtil.getRegionInfos(resp))) 781 .serverName(serverName).call(); 782 } 783 784 @Override 785 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 786 if (tableName.equals(META_TABLE_NAME)) { 787 return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs) 788 .thenApply(loc -> Collections.singletonList(loc.getRegion())); 789 } else { 790 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)) 791 .thenApply( 792 locs -> locs.stream().map(loc -> loc.getRegion()).collect(Collectors.toList())); 793 } 794 } 795 796 @Override 797 public CompletableFuture<Void> flush(TableName tableName) { 798 CompletableFuture<Void> future = new CompletableFuture<>(); 799 addListener(tableExists(tableName), (exists, err) -> { 800 if (err != null) { 801 future.completeExceptionally(err); 802 } else if (!exists) { 803 future.completeExceptionally(new TableNotFoundException(tableName)); 804 } else { 805 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 806 if (err2 != null) { 807 future.completeExceptionally(err2); 808 } else if (!tableEnabled) { 809 future.completeExceptionally(new TableNotEnabledException(tableName)); 810 } else { 811 addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), 812 new HashMap<>()), (ret, err3) -> { 813 if (err3 != null) { 814 future.completeExceptionally(err3); 815 } else { 816 future.complete(ret); 817 } 818 }); 819 } 820 }); 821 } 822 }); 823 return future; 824 } 825 826 @Override 827 public CompletableFuture<Void> flushRegion(byte[] regionName) { 828 CompletableFuture<Void> future = new CompletableFuture<>(); 829 addListener(getRegionLocation(regionName), (location, err) -> { 830 if (err != null) { 831 future.completeExceptionally(err); 832 return; 833 } 834 ServerName serverName = location.getServerName(); 835 if (serverName == null) { 836 future 837 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 838 return; 839 } 840 addListener(flush(serverName, location.getRegion()), (ret, err2) -> { 841 if (err2 != null) { 842 future.completeExceptionally(err2); 843 } else { 844 future.complete(ret); 845 } 846 }); 847 }); 848 return future; 849 } 850 851 private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) { 852 return this.<Void> newAdminCaller() 853 .serverName(serverName) 854 .action( 855 (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( 856 controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo 857 .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), 858 resp -> null)) 859 .call(); 860 } 861 862 @Override 863 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 864 CompletableFuture<Void> future = new CompletableFuture<>(); 865 addListener(getRegions(sn), (hRegionInfos, err) -> { 866 if (err != null) { 867 future.completeExceptionally(err); 868 return; 869 } 870 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 871 if (hRegionInfos != null) { 872 hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); 873 } 874 addListener(CompletableFuture.allOf( 875 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 876 if (err2 != null) { 877 future.completeExceptionally(err2); 878 } else { 879 future.complete(ret); 880 } 881 }); 882 }); 883 return future; 884 } 885 886 @Override 887 public CompletableFuture<Void> 
compact(TableName tableName, CompactType compactType) {
    return compact(tableName, null, false, compactType);
  }

  @Override
  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
      CompactType compactType) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
      + "If you don't specify a columnFamily, use compact(TableName) instead");
    return compact(tableName, columnFamily, false, compactType);
  }

  @Override
  public CompletableFuture<Void> compactRegion(byte[] regionName) {
    return compactRegion(regionName, null, false);
  }

  @Override
  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
    return compactRegion(regionName, columnFamily, false);
  }

  @Override
  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
    return compact(tableName, null, true, compactType);
  }

  @Override
  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
      CompactType compactType) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
      + " If you don't specify a columnFamily, use majorCompact(TableName, CompactType) instead");
    return compact(tableName, columnFamily, true, compactType);
  }

  @Override
  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
    return compactRegion(regionName, null, true);
  }

  @Override
  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
932 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 933 return compactRegion(regionName, columnFamily, true); 934 } 935 936 @Override 937 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 938 return compactRegionServer(sn, false); 939 } 940 941 @Override 942 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 943 return compactRegionServer(sn, true); 944 } 945 946 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 947 CompletableFuture<Void> future = new CompletableFuture<>(); 948 addListener(getRegions(sn), (hRegionInfos, err) -> { 949 if (err != null) { 950 future.completeExceptionally(err); 951 return; 952 } 953 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 954 if (hRegionInfos != null) { 955 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 956 } 957 addListener(CompletableFuture.allOf( 958 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 959 if (err2 != null) { 960 future.completeExceptionally(err2); 961 } else { 962 future.complete(ret); 963 } 964 }); 965 }); 966 return future; 967 } 968 969 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 970 boolean major) { 971 CompletableFuture<Void> future = new CompletableFuture<>(); 972 addListener(getRegionLocation(regionName), (location, err) -> { 973 if (err != null) { 974 future.completeExceptionally(err); 975 return; 976 } 977 ServerName serverName = location.getServerName(); 978 if (serverName == null) { 979 future 980 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 981 return; 982 } 983 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 984 (ret, err2) -> { 985 if (err2 != null) { 986 future.completeExceptionally(err2); 987 } else { 988 future.complete(ret); 989 } 990 }); 991 }); 992 return future; 993 } 994 995 /** 996 * List all region locations for the specific table. 997 */ 998 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 999 if (TableName.META_TABLE_NAME.equals(tableName)) { 1000 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1001 // For meta table, we use zk to fetch all locations. 1002 AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); 1003 addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> { 1004 if (err != null) { 1005 future.completeExceptionally(err); 1006 } else if (metaRegions == null || metaRegions.isEmpty() || 1007 metaRegions.getDefaultRegionLocation() == null) { 1008 future.completeExceptionally(new IOException("meta region does not found")); 1009 } else { 1010 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1011 } 1012 // close the registry. 
        IOUtils.closeQuietly(registry);
      });
      return future;
    } else {
      // For non-meta table, we fetch all locations by scanning hbase:meta table
      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
    }
  }

  /**
   * Compact a column family of a table. This is an asynchronous operation even if you wait on the
   * returned CompletableFuture: the future completes once the compaction requests have been sent,
   * not when the compactions themselves finish.
   */
  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
      CompactType compactType) {
    CompletableFuture<Void> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              future.complete(ret);
            }
          });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          CompletableFuture<?>[] compactFutures =
            locations.stream().filter(l -> l.getRegion() != null)
              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
              .toArray(CompletableFuture<?>[]::new);
          // The outer future completes only after all of the per-region compact futures complete.
          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              future.complete(ret);
            }
          });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }
    return future;
  }

  /**
   * Compact the region at the specified region server.
   */
  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
      final boolean major, byte[] columnFamily) {
    return this
      .<Void> newAdminCaller()
      .serverName(sn)
      .action(
        (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
          controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
            major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
          resp -> null)).call();
  }

  private byte[] toEncodeRegionName(byte[] regionName) {
    try {
      return RegionInfo.isEncodedRegionName(regionName) ?
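        // An already-encoded region name passes through unchanged; a full region name is reduced
        // to its encoded form so both merge arguments are handled in the same representation.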
regionName 1091 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1092 } catch (IOException e) { 1093 return regionName; 1094 } 1095 } 1096 1097 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1098 CompletableFuture<TableName> result) { 1099 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1100 if (err != null) { 1101 result.completeExceptionally(err); 1102 return; 1103 } 1104 RegionInfo regionInfo = location.getRegion(); 1105 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1106 result.completeExceptionally( 1107 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1108 return; 1109 } 1110 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1111 if (!tableName.get().equals(regionInfo.getTable())) { 1112 // tables of this two region should be same. 1113 result.completeExceptionally( 1114 new IllegalArgumentException("Cannot merge regions from two different tables " + 1115 tableName.get() + " and " + regionInfo.getTable())); 1116 } else { 1117 result.complete(tableName.get()); 1118 } 1119 } 1120 }); 1121 } 1122 1123 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA, 1124 byte[] encodeRegionNameB) { 1125 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1126 CompletableFuture<TableName> future = new CompletableFuture<>(); 1127 1128 checkAndGetTableName(encodeRegionNameA, tableNameRef, future); 1129 checkAndGetTableName(encodeRegionNameB, tableNameRef, future); 1130 return future; 1131 } 1132 1133 @Override 1134 public CompletableFuture<Boolean> mergeSwitch(boolean on) { 1135 return setSplitOrMergeOn(on, MasterSwitchType.MERGE); 1136 } 1137 1138 @Override 1139 public CompletableFuture<Boolean> isMergeEnabled() { 1140 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1141 } 1142 1143 @Override 1144 public CompletableFuture<Boolean> splitSwitch(boolean on) { 1145 return setSplitOrMergeOn(on, MasterSwitchType.SPLIT); 1146 } 1147 1148 @Override 1149 public CompletableFuture<Boolean> isSplitEnabled() { 1150 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1151 } 1152 1153 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean on, MasterSwitchType switchType) { 1154 SetSplitOrMergeEnabledRequest request = 1155 RequestConverter.buildSetSplitOrMergeEnabledRequest(on, false, switchType); 1156 return this 1157 .<Boolean> newMasterCaller() 1158 .action( 1159 (controller, stub) -> this 1160 .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call( 1161 controller, stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, 1162 done), (resp) -> resp.getPrevValueList().get(0))).call(); 1163 } 1164 1165 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1166 IsSplitOrMergeEnabledRequest request = 1167 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1168 return this 1169 .<Boolean> newMasterCaller() 1170 .action( 1171 (controller, stub) -> this 1172 .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call( 1173 controller, stub, request, 1174 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), 1175 (resp) -> resp.getEnabled())).call(); 1176 } 1177 1178 @Override 1179 public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB, 1180 boolean forcible) { 1181 CompletableFuture<Void> future = new CompletableFuture<>(); 1182 final byte[] encodeRegionNameA = 
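      // Both region names are normalized to their encoded form before checking that they belong
      // to the same table and building the merge request.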
toEncodeRegionName(nameOfRegionA); 1183 final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB); 1184 1185 addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB), 1186 (tableName, err) -> { 1187 if (err != null) { 1188 future.completeExceptionally(err); 1189 return; 1190 } 1191 1192 MergeTableRegionsRequest request = null; 1193 try { 1194 request = RequestConverter.buildMergeTableRegionsRequest( 1195 new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(), 1196 ng.newNonce()); 1197 } catch (DeserializationException e) { 1198 future.completeExceptionally(e); 1199 return; 1200 } 1201 1202 addListener( 1203 this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request, 1204 (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), 1205 new MergeTableRegionProcedureBiConsumer(tableName)), 1206 (ret, err2) -> { 1207 if (err2 != null) { 1208 future.completeExceptionally(err2); 1209 } else { 1210 future.complete(ret); 1211 } 1212 }); 1213 }); 1214 return future; 1215 } 1216 1217 @Override 1218 public CompletableFuture<Void> split(TableName tableName) { 1219 CompletableFuture<Void> future = new CompletableFuture<>(); 1220 addListener(tableExists(tableName), (exist, error) -> { 1221 if (error != null) { 1222 future.completeExceptionally(error); 1223 return; 1224 } 1225 if (!exist) { 1226 future.completeExceptionally(new TableNotFoundException(tableName)); 1227 return; 1228 } 1229 addListener( 1230 metaTable 1231 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1232 .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) 1233 .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))), 1234 (results, err2) -> { 1235 if (err2 != null) { 1236 future.completeExceptionally(err2); 1237 return; 1238 } 1239 if (results != null && !results.isEmpty()) { 1240 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1241 for (Result r : results) { 1242 if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) { 1243 continue; 1244 } 1245 RegionLocations rl = MetaTableAccessor.getRegionLocations(r); 1246 if (rl != null) { 1247 for (HRegionLocation h : rl.getRegionLocations()) { 1248 if (h != null && h.getServerName() != null) { 1249 RegionInfo hri = h.getRegion(); 1250 if (hri == null || hri.isSplitParent() || 1251 hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1252 continue; 1253 } 1254 splitFutures.add(split(hri, null)); 1255 } 1256 } 1257 } 1258 } 1259 addListener( 1260 CompletableFuture 1261 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1262 (ret, exception) -> { 1263 if (exception != null) { 1264 future.completeExceptionally(exception); 1265 return; 1266 } 1267 future.complete(ret); 1268 }); 1269 } else { 1270 future.complete(null); 1271 } 1272 }); 1273 }); 1274 return future; 1275 } 1276 1277 @Override 1278 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1279 CompletableFuture<Void> result = new CompletableFuture<>(); 1280 if (splitPoint == null) { 1281 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1282 } 1283 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint), 1284 (loc, err) -> { 1285 if (err != null) { 1286 result.completeExceptionally(err); 1287 } else if (loc == null || loc.getRegion() == null) { 1288 result.completeExceptionally(new IllegalArgumentException( 
1289 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1290 } else { 1291 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1292 if (err2 != null) { 1293 result.completeExceptionally(err2); 1294 } else { 1295 result.complete(ret); 1296 } 1297 1298 }); 1299 } 1300 }); 1301 return result; 1302 } 1303 1304 @Override 1305 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1306 CompletableFuture<Void> future = new CompletableFuture<>(); 1307 addListener(getRegionLocation(regionName), (location, err) -> { 1308 RegionInfo regionInfo = location.getRegion(); 1309 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1310 future 1311 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + 1312 "Replicas are auto-split when their primary is split.")); 1313 return; 1314 } 1315 ServerName serverName = location.getServerName(); 1316 if (serverName == null) { 1317 future 1318 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1319 return; 1320 } 1321 addListener(split(regionInfo, null), (ret, err2) -> { 1322 if (err2 != null) { 1323 future.completeExceptionally(err2); 1324 } else { 1325 future.complete(ret); 1326 } 1327 }); 1328 }); 1329 return future; 1330 } 1331 1332 @Override 1333 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1334 Preconditions.checkNotNull(splitPoint, 1335 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1336 CompletableFuture<Void> future = new CompletableFuture<>(); 1337 addListener(getRegionLocation(regionName), (location, err) -> { 1338 RegionInfo regionInfo = location.getRegion(); 1339 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1340 future 1341 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. 
" + 1342 "Replicas are auto-split when their primary is split.")); 1343 return; 1344 } 1345 ServerName serverName = location.getServerName(); 1346 if (serverName == null) { 1347 future 1348 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1349 return; 1350 } 1351 if (regionInfo.getStartKey() != null && 1352 Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) { 1353 future.completeExceptionally( 1354 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1355 return; 1356 } 1357 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1358 if (err2 != null) { 1359 future.completeExceptionally(err2); 1360 } else { 1361 future.complete(ret); 1362 } 1363 }); 1364 }); 1365 return future; 1366 } 1367 1368 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1369 CompletableFuture<Void> future = new CompletableFuture<>(); 1370 TableName tableName = hri.getTable(); 1371 SplitTableRegionRequest request = null; 1372 try { 1373 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1374 ng.newNonce()); 1375 } catch (DeserializationException e) { 1376 future.completeExceptionally(e); 1377 return future; 1378 } 1379 1380 addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request, 1381 (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), 1382 new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1383 if (err2 != null) { 1384 future.completeExceptionally(err2); 1385 } else { 1386 future.complete(ret); 1387 } 1388 }); 1389 return future; 1390 } 1391 1392 @Override 1393 public CompletableFuture<Void> assign(byte[] regionName) { 1394 CompletableFuture<Void> future = new CompletableFuture<>(); 1395 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1396 if (err != null) { 1397 future.completeExceptionally(err); 1398 return; 1399 } 1400 addListener(this.<Void> newMasterCaller() 1401 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1402 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1403 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) 1404 .call(), (ret, err2) -> { 1405 if (err2 != null) { 1406 future.completeExceptionally(err2); 1407 } else { 1408 future.complete(ret); 1409 } 1410 }); 1411 }); 1412 return future; 1413 } 1414 1415 @Override 1416 public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) { 1417 CompletableFuture<Void> future = new CompletableFuture<>(); 1418 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1419 if (err != null) { 1420 future.completeExceptionally(err); 1421 return; 1422 } 1423 addListener( 1424 this.<Void> newMasterCaller() 1425 .action(((controller, stub) -> this 1426 .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, 1427 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), 1428 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) 1429 .call(), 1430 (ret, err2) -> { 1431 if (err2 != null) { 1432 future.completeExceptionally(err2); 1433 } else { 1434 future.complete(ret); 1435 } 1436 }); 1437 }); 1438 return future; 1439 } 1440 1441 @Override 1442 public CompletableFuture<Void> offline(byte[] regionName) { 1443 CompletableFuture<Void> future = new CompletableFuture<>(); 1444 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1445 if 
(err != null) { 1446 future.completeExceptionally(err); 1447 return; 1448 } 1449 addListener( 1450 this.<Void> newMasterCaller() 1451 .action(((controller, stub) -> this 1452 .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub, 1453 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1454 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) 1455 .call(), 1456 (ret, err2) -> { 1457 if (err2 != null) { 1458 future.completeExceptionally(err2); 1459 } else { 1460 future.complete(ret); 1461 } 1462 }); 1463 }); 1464 return future; 1465 } 1466 1467 @Override 1468 public CompletableFuture<Void> move(byte[] regionName) { 1469 CompletableFuture<Void> future = new CompletableFuture<>(); 1470 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1471 if (err != null) { 1472 future.completeExceptionally(err); 1473 return; 1474 } 1475 addListener( 1476 moveRegion( 1477 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1478 (ret, err2) -> { 1479 if (err2 != null) { 1480 future.completeExceptionally(err2); 1481 } else { 1482 future.complete(ret); 1483 } 1484 }); 1485 }); 1486 return future; 1487 } 1488 1489 @Override 1490 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1491 Preconditions.checkNotNull(destServerName, 1492 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); 1493 CompletableFuture<Void> future = new CompletableFuture<>(); 1494 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1495 if (err != null) { 1496 future.completeExceptionally(err); 1497 return; 1498 } 1499 addListener(moveRegion(RequestConverter 1500 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1501 (ret, err2) -> { 1502 if (err2 != null) { 1503 future.completeExceptionally(err2); 1504 } else { 1505 future.complete(ret); 1506 } 1507 }); 1508 }); 1509 return future; 1510 } 1511 1512 private CompletableFuture<Void> moveRegion(MoveRegionRequest request) { 1513 return this 1514 .<Void> newMasterCaller() 1515 .action( 1516 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1517 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call(); 1518 } 1519 1520 @Override 1521 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1522 return this 1523 .<Void> newMasterCaller() 1524 .action( 1525 (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1526 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1527 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call(); 1528 } 1529 1530 @Override 1531 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1532 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1533 Scan scan = QuotaTableUtil.makeScan(filter); 1534 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() 1535 .scan(scan, new AdvancedScanResultConsumer() { 1536 List<QuotaSettings> settings = new ArrayList<>(); 1537 1538 @Override 1539 public void onNext(Result[] results, ScanController controller) { 1540 for (Result result : results) { 1541 try { 1542 QuotaTableUtil.parseResultToCollection(result, settings); 1543 } catch (IOException e) { 1544 controller.terminate(); 1545 future.completeExceptionally(e); 1546 } 1547 } 1548 } 1549 1550 @Override 1551 public void onError(Throwable error) { 1552 future.completeExceptionally(error); 1553 } 
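// Illustrative usage sketch (not part of the upstream class; the configuration object and the
// "my_table" filter are placeholders, exception handling omitted): the consumer above bridges
// the streaming quota-table scan into a single CompletableFuture, so a caller can simply wait
// on the returned list of QuotaSettings.
//
//   AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
//   AsyncAdmin admin = conn.getAdmin();
//   admin.getQuota(new QuotaFilter().setTableFilter("my_table"))
//     .thenAccept(settings -> settings.forEach(System.out::println))
//     .join();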
1554 1555 @Override 1556 public void onComplete() { 1557 future.complete(settings); 1558 } 1559 }); 1560 return future; 1561 } 1562 1563 @Override 1564 public CompletableFuture<Void> addReplicationPeer(String peerId, 1565 ReplicationPeerConfig peerConfig, boolean enabled) { 1566 return this 1567 .<Void> newMasterCaller() 1568 .action( 1569 (controller, stub) -> this 1570 .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub, 1571 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s, 1572 c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); 1573 } 1574 1575 @Override 1576 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1577 return this 1578 .<Void> newMasterCaller() 1579 .action( 1580 (controller, stub) -> this 1581 .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller, 1582 stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1583 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call(); 1584 } 1585 1586 @Override 1587 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1588 return this 1589 .<Void> newMasterCaller() 1590 .action( 1591 (controller, stub) -> this 1592 .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller, 1593 stub, RequestConverter.buildEnableReplicationPeerRequest(peerId), 1594 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call(); 1595 } 1596 1597 @Override 1598 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1599 return this 1600 .<Void> newMasterCaller() 1601 .action( 1602 (controller, stub) -> this 1603 .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call( 1604 controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s, 1605 c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null)) 1606 .call(); 1607 } 1608 1609 @Override 1610 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1611 return this 1612 .<ReplicationPeerConfig> newMasterCaller() 1613 .action( 1614 (controller, stub) -> this 1615 .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( 1616 controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( 1617 s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1618 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); 1619 } 1620 1621 @Override 1622 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1623 ReplicationPeerConfig peerConfig) { 1624 return this 1625 .<Void> newMasterCaller() 1626 .action( 1627 (controller, stub) -> this 1628 .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call( 1629 controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, 1630 peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), ( 1631 resp) -> null)).call(); 1632 } 1633 1634 @Override 1635 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1636 Map<TableName, List<String>> tableCfs) { 1637 if (tableCfs == null) { 1638 return failedFuture(new ReplicationException("tableCfs is null")); 1639 } 1640 1641 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1642 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1643 if (!completeExceptionally(future, error)) { 1644 
ReplicationPeerConfig newPeerConfig = 1645 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1646 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1647 if (!completeExceptionally(future, err)) { 1648 future.complete(result); 1649 } 1650 }); 1651 } 1652 }); 1653 return future; 1654 } 1655 1656 @Override 1657 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1658 Map<TableName, List<String>> tableCfs) { 1659 if (tableCfs == null) { 1660 return failedFuture(new ReplicationException("tableCfs is null")); 1661 } 1662 1663 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1664 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1665 if (!completeExceptionally(future, error)) { 1666 ReplicationPeerConfig newPeerConfig = null; 1667 try { 1668 newPeerConfig = ReplicationPeerConfigUtil 1669 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1670 } catch (ReplicationException e) { 1671 future.completeExceptionally(e); 1672 return; 1673 } 1674 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1675 if (!completeExceptionally(future, err)) { 1676 future.complete(result); 1677 } 1678 }); 1679 } 1680 }); 1681 return future; 1682 } 1683 1684 @Override 1685 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1686 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1687 } 1688 1689 @Override 1690 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1691 Preconditions.checkNotNull(pattern, 1692 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead"); 1693 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1694 } 1695 1696 private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers( 1697 ListReplicationPeersRequest request) { 1698 return this 1699 .<List<ReplicationPeerDescription>> newMasterCaller() 1700 .action( 1701 (controller, stub) -> this 1702 .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call( 1703 controller, 1704 stub, 1705 request, 1706 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1707 (resp) -> resp.getPeerDescList().stream() 1708 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1709 .collect(Collectors.toList()))).call(); 1710 } 1711 1712 @Override 1713 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1714 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1715 addListener(listTableDescriptors(), (tables, error) -> { 1716 if (!completeExceptionally(future, error)) { 1717 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1718 tables.forEach(table -> { 1719 Map<String, Integer> cfs = new HashMap<>(); 1720 Stream.of(table.getColumnFamilies()) 1721 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1722 .forEach(column -> { 1723 cfs.put(column.getNameAsString(), column.getScope()); 1724 }); 1725 if (!cfs.isEmpty()) { 1726 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1727 } 1728 }); 1729 future.complete(replicatedTableCFs); 1730 } 1731 }); 1732 return future; 1733 } 1734 1735 @Override 1736 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1737 SnapshotProtos.SnapshotDescription snapshot = 1738
ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1739 try { 1740 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1741 } catch (IllegalArgumentException e) { 1742 return failedFuture(e); 1743 } 1744 CompletableFuture<Void> future = new CompletableFuture<>(); 1745 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); 1746 addListener(this.<Long> newMasterCaller() 1747 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, 1748 stub, request, (s, c, req, done) -> s.snapshot(c, req, done), 1749 resp -> resp.getExpectedTimeout())) 1750 .call(), (expectedTimeout, err) -> { 1751 if (err != null) { 1752 future.completeExceptionally(err); 1753 return; 1754 } 1755 TimerTask pollingTask = new TimerTask() { 1756 int tries = 0; 1757 long startTime = EnvironmentEdgeManager.currentTime(); 1758 long endTime = startTime + expectedTimeout; 1759 long maxPauseTime = expectedTimeout / maxAttempts; 1760 1761 @Override 1762 public void run(Timeout timeout) throws Exception { 1763 if (EnvironmentEdgeManager.currentTime() < endTime) { 1764 addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { 1765 if (err2 != null) { 1766 future.completeExceptionally(err2); 1767 } else if (done) { 1768 future.complete(null); 1769 } else { 1770 // retry again after pauseTime. 1771 long pauseTime = 1772 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1773 pauseTime = Math.min(pauseTime, maxPauseTime); 1774 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 1775 TimeUnit.MILLISECONDS); 1776 } 1777 }); 1778 } else { 1779 future.completeExceptionally( 1780 new SnapshotCreationException("Snapshot '" + snapshot.getName() + 1781 "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); 1782 } 1783 } 1784 }; 1785 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1786 }); 1787 return future; 1788 } 1789 1790 @Override 1791 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1792 return this 1793 .<Boolean> newMasterCaller() 1794 .action( 1795 (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call( 1796 controller, 1797 stub, 1798 IsSnapshotDoneRequest.newBuilder() 1799 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 1800 req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call(); 1801 } 1802 1803 @Override 1804 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1805 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 1806 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 1807 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 1808 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 1809 } 1810 1811 @Override 1812 public CompletableFuture<Void> restoreSnapshot(String snapshotName, 1813 boolean takeFailSafeSnapshot) { 1814 CompletableFuture<Void> future = new CompletableFuture<>(); 1815 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 1816 if (err != null) { 1817 future.completeExceptionally(err); 1818 return; 1819 } 1820 TableName tableName = null; 1821 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 1822 for (SnapshotDescription snap : snapshotDescriptions) { 1823 if (snap.getName().equals(snapshotName)) { 1824 tableName = snap.getTableName(); 1825 break; 1826 } 1827 } 1828 } 1829 if 
(tableName == null) { 1830 future.completeExceptionally(new RestoreSnapshotException( 1831 "Unable to find the table name for snapshot=" + snapshotName)); 1832 return; 1833 } 1834 final TableName finalTableName = tableName; 1835 addListener(tableExists(finalTableName), (exists, err2) -> { 1836 if (err2 != null) { 1837 future.completeExceptionally(err2); 1838 } else if (!exists) { 1839 // if table does not exist, then just clone snapshot into new table. 1840 completeConditionalOnFuture(future, 1841 internalRestoreSnapshot(snapshotName, finalTableName)); 1842 } else { 1843 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 1844 if (err4 != null) { 1845 future.completeExceptionally(err4); 1846 } else if (!disabled) { 1847 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 1848 } else { 1849 completeConditionalOnFuture(future, 1850 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot)); 1851 } 1852 }); 1853 } 1854 }); 1855 }); 1856 return future; 1857 } 1858 1859 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 1860 boolean takeFailSafeSnapshot) { 1861 if (takeFailSafeSnapshot) { 1862 CompletableFuture<Void> future = new CompletableFuture<>(); 1863 // Step.1 Take a snapshot of the current state 1864 String failSafeSnapshotSnapshotNameFormat = 1865 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 1866 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 1867 final String failSafeSnapshotSnapshotName = 1868 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 1869 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 1870 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 1871 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1872 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 1873 if (err != null) { 1874 future.completeExceptionally(err); 1875 } else { 1876 // Step.2 Restore snapshot 1877 addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> { 1878 if (err2 != null) { 1879 // Step.3.a Something went wrong during the restore and try to rollback. 1880 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName), 1881 (void3, err3) -> { 1882 if (err3 != null) { 1883 future.completeExceptionally(err3); 1884 } else { 1885 String msg = 1886 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + 1887 failSafeSnapshotSnapshotName + " succeeded."; 1888 future.completeExceptionally(new RestoreSnapshotException(msg)); 1889 } 1890 }); 1891 } else { 1892 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 
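// Summary of the failsafe flow around this point: (1) snapshot the current table state,
// (2) restore the requested snapshot, (3a) on restore failure roll back to the failsafe
// snapshot, (3b) on success delete it. Illustrative caller-side sketch (assumes an already
// obtained AsyncAdmin named admin; the snapshot name is a placeholder):
//
//   admin.restoreSnapshot("my_snapshot", true) // true => take a failsafe snapshot first
//     .exceptionally(t -> {
//       System.err.println("Restore failed; a failsafe snapshot may remain: " + t);
//       return null;
//     })
//     .join();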
1893 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1894 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 1895 if (err3 != null) { 1896 LOG.error( 1897 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 1898 err3); 1899 future.completeExceptionally(err3); 1900 } else { 1901 future.complete(ret3); 1902 } 1903 }); 1904 } 1905 }); 1906 } 1907 }); 1908 return future; 1909 } else { 1910 return internalRestoreSnapshot(snapshotName, tableName); 1911 } 1912 } 1913 1914 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 1915 CompletableFuture<T> parentFuture) { 1916 addListener(parentFuture, (res, err) -> { 1917 if (err != null) { 1918 dependentFuture.completeExceptionally(err); 1919 } else { 1920 dependentFuture.complete(res); 1921 } 1922 }); 1923 } 1924 1925 @Override 1926 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) { 1927 CompletableFuture<Void> future = new CompletableFuture<>(); 1928 addListener(tableExists(tableName), (exists, err) -> { 1929 if (err != null) { 1930 future.completeExceptionally(err); 1931 } else if (exists) { 1932 future.completeExceptionally(new TableExistsException(tableName)); 1933 } else { 1934 completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName)); 1935 } 1936 }); 1937 return future; 1938 } 1939 1940 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) { 1941 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 1942 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 1943 try { 1944 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1945 } catch (IllegalArgumentException e) { 1946 return failedFuture(e); 1947 } 1948 return waitProcedureResult(this 1949 .<Long> newMasterCaller() 1950 .action( 1951 (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call( 1952 controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot) 1953 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req, 1954 done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call()); 1955 } 1956 1957 @Override 1958 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 1959 return getCompletedSnapshots(null); 1960 } 1961 1962 @Override 1963 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 1964 Preconditions.checkNotNull(pattern, 1965 "pattern is null. If you don't specify a pattern, use listSnapshots() instead"); 1966 return getCompletedSnapshots(pattern); 1967 } 1968 1969 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 1970 return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this 1971 .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> 1972 call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), 1973 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 1974 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 1975 .call(); 1976 } 1977 1978 @Override 1979 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 1980 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 
1981 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 1982 return getCompletedSnapshots(tableNamePattern, null); 1983 } 1984 1985 @Override 1986 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 1987 Pattern snapshotNamePattern) { 1988 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 1989 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 1990 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 1991 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 1992 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 1993 } 1994 1995 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots( 1996 Pattern tableNamePattern, Pattern snapshotNamePattern) { 1997 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 1998 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 1999 if (err != null) { 2000 future.completeExceptionally(err); 2001 return; 2002 } 2003 if (tableNames == null || tableNames.size() <= 0) { 2004 future.complete(Collections.emptyList()); 2005 return; 2006 } 2007 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2008 if (err2 != null) { 2009 future.completeExceptionally(err2); 2010 return; 2011 } 2012 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2013 future.complete(Collections.emptyList()); 2014 return; 2015 } 2016 future.complete(snapshotDescList.stream() 2017 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2018 .collect(Collectors.toList())); 2019 }); 2020 }); 2021 return future; 2022 } 2023 2024 @Override 2025 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2026 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2027 } 2028 2029 @Override 2030 public CompletableFuture<Void> deleteSnapshots() { 2031 return internalDeleteSnapshots(null, null); 2032 } 2033 2034 @Override 2035 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2036 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2037 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2038 return internalDeleteSnapshots(null, snapshotNamePattern); 2039 } 2040 2041 @Override 2042 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2043 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2044 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2045 return internalDeleteSnapshots(tableNamePattern, null); 2046 } 2047 2048 @Override 2049 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2050 Pattern snapshotNamePattern) { 2051 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2052 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2053 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2054 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2055 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2056 } 2057 2058 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2059 Pattern snapshotNamePattern) { 2060 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2061 if (tableNamePattern == null) { 2062 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2063 } else { 2064 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2065 } 2066 CompletableFuture<Void> future = new CompletableFuture<>(); 2067 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> { 2068 if (err != null) { 2069 future.completeExceptionally(err); 2070 return; 2071 } 2072 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2073 future.complete(null); 2074 return; 2075 } 2076 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2077 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2078 if (e != null) { 2079 future.completeExceptionally(e); 2080 } else { 2081 future.complete(v); 2082 } 2083 }); 2084 })); 2085 return future; 2086 } 2087 2088 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2089 return this 2090 .<Void> newMasterCaller() 2091 .action( 2092 (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2093 controller, 2094 stub, 2095 DeleteSnapshotRequest.newBuilder() 2096 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 2097 req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call(); 2098 } 2099 2100 @Override 2101 public CompletableFuture<Void> execProcedure(String signature, String instance, 2102 Map<String, String> props) { 2103 CompletableFuture<Void> future = new CompletableFuture<>(); 2104 ProcedureDescription procDesc = 2105 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2106 addListener(this.<Long> newMasterCaller() 2107 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2108 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2109 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2110 .call(), (expectedTimeout, err) -> { 2111 if (err != null) { 2112 future.completeExceptionally(err); 2113 return; 2114 } 2115 TimerTask pollingTask = new TimerTask() { 2116 int tries = 0; 2117 long startTime = EnvironmentEdgeManager.currentTime(); 2118 long endTime = startTime + expectedTimeout; 2119 long maxPauseTime = expectedTimeout / maxAttempts; 2120 2121 @Override 2122 public void run(Timeout timeout) throws Exception { 2123 if (EnvironmentEdgeManager.currentTime() < endTime) { 2124 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2125 if (err2 != null) { 2126 future.completeExceptionally(err2); 2127 return; 2128 } 2129 if (done) { 2130 future.complete(null); 2131 } else { 2132 // retry again after pauseTime. 
2133 long pauseTime = 2134 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2135 pauseTime = Math.min(pauseTime, maxPauseTime); 2136 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2137 TimeUnit.MILLISECONDS); 2138 } 2139 }); 2140 } else { 2141 future.completeExceptionally(new IOException("Procedure '" + signature + " : " + 2142 instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2143 } 2144 } 2145 }; 2146 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2147 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2148 }); 2149 return future; 2150 } 2151 2152 @Override 2153 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2154 Map<String, String> props) { 2155 ProcedureDescription proDesc = 2156 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2157 return this.<byte[]> newMasterCaller() 2158 .action( 2159 (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2160 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2161 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2162 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null)) 2163 .call(); 2164 } 2165 2166 @Override 2167 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2168 Map<String, String> props) { 2169 ProcedureDescription proDesc = 2170 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2171 return this.<Boolean> newMasterCaller() 2172 .action((controller, stub) -> this 2173 .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub, 2174 IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2175 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2176 .call(); 2177 } 2178 2179 @Override 2180 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2181 return this.<Boolean> newMasterCaller().action( 2182 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2183 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2184 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2185 .call(); 2186 } 2187 2188 @Override 2189 public CompletableFuture<String> getProcedures() { 2190 return this 2191 .<String> newMasterCaller() 2192 .action( 2193 (controller, stub) -> this 2194 .<GetProceduresRequest, GetProceduresResponse, String> call( 2195 controller, stub, GetProceduresRequest.newBuilder().build(), 2196 (s, c, req, done) -> s.getProcedures(c, req, done), 2197 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call(); 2198 } 2199 2200 @Override 2201 public CompletableFuture<String> getLocks() { 2202 return this 2203 .<String> newMasterCaller() 2204 .action( 2205 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call( 2206 controller, stub, GetLocksRequest.newBuilder().build(), 2207 (s, c, req, done) -> s.getLocks(c, req, done), 2208 resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call(); 2209 } 2210 2211 @Override 2212 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) { 2213 return this.<Void> newMasterCaller() 2214 .action((controller, stub) -> this 2215 .<DecommissionRegionServersRequest,
DecommissionRegionServersResponse, Void> call( 2216 controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2217 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2218 .call(); 2219 } 2220 2221 @Override 2222 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2223 return this.<List<ServerName>> newMasterCaller() 2224 .action((controller, stub) -> this 2225 .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse, 2226 List<ServerName>> call( 2227 controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(), 2228 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2229 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2230 .collect(Collectors.toList()))) 2231 .call(); 2232 } 2233 2234 @Override 2235 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2236 List<byte[]> encodedRegionNames) { 2237 return this.<Void> newMasterCaller() 2238 .action((controller, stub) -> this 2239 .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller, 2240 stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2241 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2242 .call(); 2243 } 2244 2245 /** 2246 * Get the region location for the passed region name. The region name may be a full region name 2247 * or encoded region name. If the region does not found, then it'll throw an 2248 * UnknownRegionException wrapped by a {@link CompletableFuture} 2249 * @param regionNameOrEncodedRegionName 2250 * @return region location, wrapped by a {@link CompletableFuture} 2251 */ 2252 @VisibleForTesting 2253 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2254 if (regionNameOrEncodedRegionName == null) { 2255 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2256 } 2257 try { 2258 CompletableFuture<Optional<HRegionLocation>> future; 2259 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2260 future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2261 regionNameOrEncodedRegionName); 2262 } else { 2263 future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2264 } 2265 2266 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2267 addListener(future, (location, err) -> { 2268 if (err != null) { 2269 returnedFuture.completeExceptionally(err); 2270 return; 2271 } 2272 if (!location.isPresent() || location.get().getRegion() == null) { 2273 returnedFuture.completeExceptionally( 2274 new UnknownRegionException("Invalid region name or encoded region name: " + 2275 Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2276 } else { 2277 returnedFuture.complete(location.get()); 2278 } 2279 }); 2280 return returnedFuture; 2281 } catch (IOException e) { 2282 return failedFuture(e); 2283 } 2284 } 2285 2286 /** 2287 * Get the region info for the passed region name. The region name may be a full region name or 2288 * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException 2289 * wrapped by a {@link CompletableFuture} 2290 * @param regionNameOrEncodedRegionName 2291 * @return region info, wrapped by a {@link CompletableFuture} 2292 */ 2293 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2294 if (regionNameOrEncodedRegionName == null) { 2295 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2296 } 2297 2298 if (Bytes.equals(regionNameOrEncodedRegionName, 2299 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) || 2300 Bytes.equals(regionNameOrEncodedRegionName, 2301 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { 2302 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2303 } 2304 2305 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2306 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2307 if (err != null) { 2308 future.completeExceptionally(err); 2309 } else { 2310 future.complete(location.getRegion()); 2311 } 2312 }); 2313 return future; 2314 } 2315 2316 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2317 if (numRegions < 3) { 2318 throw new IllegalArgumentException("Must create at least three regions"); 2319 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2320 throw new IllegalArgumentException("Start key must be smaller than end key"); 2321 } 2322 if (numRegions == 3) { 2323 return new byte[][] { startKey, endKey }; 2324 } 2325 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2326 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2327 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2328 } 2329 return splitKeys; 2330 } 2331 2332 private void verifySplitKeys(byte[][] splitKeys) { 2333 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2334 // Verify there are no duplicate split keys 2335 byte[] lastKey = null; 2336 for (byte[] splitKey : splitKeys) { 2337 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2338 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2339 } 2340 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2341 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2342 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2343 } 2344 lastKey = splitKey; 2345 } 2346 } 2347 2348 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2349 2350 abstract void onFinished(); 2351 2352 abstract void onError(Throwable error); 2353 2354 @Override 2355 public void accept(Void v, Throwable error) { 2356 if (error != null) { 2357 onError(error); 2358 return; 2359 } 2360 onFinished(); 2361 } 2362 } 2363 2364 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2365 protected final TableName tableName; 2366 2367 TableProcedureBiConsumer(TableName tableName) { 2368 this.tableName = tableName; 2369 } 2370 2371 abstract String getOperationType(); 2372 2373 String getDescription() { 2374 return "Operation: " + getOperationType() + ", " + "Table Name: " 2375 + tableName.getNameWithNamespaceInclAsString(); 2376 } 2377 2378 @Override 2379 void onFinished() { 2380 LOG.info(getDescription() + " completed"); 2381 } 2382 2383 @Override 2384 void onError(Throwable error) { 2385 LOG.info(getDescription() + " failed with " 
+ error.getMessage()); 2386 } 2387 } 2388 2389 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2390 protected final String namespaceName; 2391 2392 NamespaceProcedureBiConsumer(String namespaceName) { 2393 this.namespaceName = namespaceName; 2394 } 2395 2396 abstract String getOperationType(); 2397 2398 String getDescription() { 2399 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName; 2400 } 2401 2402 @Override 2403 void onFinished() { 2404 LOG.info(getDescription() + " completed"); 2405 } 2406 2407 @Override 2408 void onError(Throwable error) { 2409 LOG.info(getDescription() + " failed with " + error.getMessage()); 2410 } 2411 } 2412 2413 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2414 2415 CreateTableProcedureBiConsumer(TableName tableName) { 2416 super(tableName); 2417 } 2418 2419 @Override 2420 String getOperationType() { 2421 return "CREATE"; 2422 } 2423 } 2424 2425 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2426 2427 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2428 super(tableName); 2429 } 2430 2431 @Override 2432 String getOperationType() { 2433 return "MODIFY"; 2434 } 2435 } 2436 2437 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2438 2439 DeleteTableProcedureBiConsumer(TableName tableName) { 2440 super(tableName); 2441 } 2442 2443 @Override 2444 String getOperationType() { 2445 return "DELETE"; 2446 } 2447 2448 @Override 2449 void onFinished() { 2450 connection.getLocator().clearCache(this.tableName); 2451 super.onFinished(); 2452 } 2453 } 2454 2455 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2456 2457 TruncateTableProcedureBiConsumer(TableName tableName) { 2458 super(tableName); 2459 } 2460 2461 @Override 2462 String getOperationType() { 2463 return "TRUNCATE"; 2464 } 2465 } 2466 2467 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2468 2469 EnableTableProcedureBiConsumer(TableName tableName) { 2470 super(tableName); 2471 } 2472 2473 @Override 2474 String getOperationType() { 2475 return "ENABLE"; 2476 } 2477 } 2478 2479 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2480 2481 DisableTableProcedureBiConsumer(TableName tableName) { 2482 super(tableName); 2483 } 2484 2485 @Override 2486 String getOperationType() { 2487 return "DISABLE"; 2488 } 2489 } 2490 2491 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2492 2493 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2494 super(tableName); 2495 } 2496 2497 @Override 2498 String getOperationType() { 2499 return "ADD_COLUMN_FAMILY"; 2500 } 2501 } 2502 2503 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2504 2505 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2506 super(tableName); 2507 } 2508 2509 @Override 2510 String getOperationType() { 2511 return "DELETE_COLUMN_FAMILY"; 2512 } 2513 } 2514 2515 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2516 2517 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2518 super(tableName); 2519 } 2520 2521 @Override 2522 String getOperationType() { 2523 return "MODIFY_COLUMN_FAMILY"; 2524 } 2525 } 2526 2527 private static class CreateNamespaceProcedureBiConsumer extends
NamespaceProcedureBiConsumer { 2528 2529 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2530 super(namespaceName); 2531 } 2532 2533 @Override 2534 String getOperationType() { 2535 return "CREATE_NAMESPACE"; 2536 } 2537 } 2538 2539 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2540 2541 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2542 super(namespaceName); 2543 } 2544 2545 @Override 2546 String getOperationType() { 2547 return "DELETE_NAMESPACE"; 2548 } 2549 } 2550 2551 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2552 2553 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2554 super(namespaceName); 2555 } 2556 2557 @Override 2558 String getOperationType() { 2559 return "MODIFY_NAMESPACE"; 2560 } 2561 } 2562 2563 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2564 2565 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2566 super(tableName); 2567 } 2568 2569 @Override 2570 String getOperationType() { 2571 return "MERGE_REGIONS"; 2572 } 2573 } 2574 2575 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2576 2577 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2578 super(tableName); 2579 } 2580 2581 @Override 2582 String getOperationType() { 2583 return "SPLIT_REGION"; 2584 } 2585 } 2586 2587 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2588 CompletableFuture<Void> future = new CompletableFuture<>(); 2589 addListener(procFuture, (procId, error) -> { 2590 if (error != null) { 2591 future.completeExceptionally(error); 2592 return; 2593 } 2594 getProcedureResult(procId, future, 0); 2595 }); 2596 return future; 2597 } 2598 2599 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2600 addListener( 2601 this.<GetProcedureResultResponse> newMasterCaller() 2602 .action((controller, stub) -> this 2603 .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( 2604 controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2605 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2606 .call(), 2607 (response, error) -> { 2608 if (error != null) { 2609 LOG.warn("failed to get the procedure result procId={}", procId, 2610 ConnectionUtils.translateException(error)); 2611 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2612 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2613 return; 2614 } 2615 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2616 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2617 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2618 return; 2619 } 2620 if (response.hasException()) { 2621 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2622 future.completeExceptionally(ioe); 2623 } else { 2624 future.complete(null); 2625 } 2626 }); 2627 } 2628 2629 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2630 CompletableFuture<T> future = new CompletableFuture<>(); 2631 future.completeExceptionally(error); 2632 return future; 2633 } 2634 2635 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2636 if (error != null) { 2637 future.completeExceptionally(error); 2638 return true; 2639 } 2640 
return false; 2641 } 2642 2643 @Override 2644 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2645 return getClusterMetrics(EnumSet.allOf(Option.class)); 2646 } 2647 2648 @Override 2649 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2650 return this 2651 .<ClusterMetrics> newMasterCaller() 2652 .action( 2653 (controller, stub) -> this 2654 .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller, 2655 stub, RequestConverter.buildGetClusterStatusRequest(options), 2656 (s, c, req, done) -> s.getClusterStatus(c, req, done), 2657 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call(); 2658 } 2659 2660 @Override 2661 public CompletableFuture<Void> shutdown() { 2662 return this 2663 .<Void> newMasterCaller() 2664 .action( 2665 (controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2666 stub, ShutdownRequest.newBuilder().build(), 2667 (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call(); 2668 } 2669 2670 @Override 2671 public CompletableFuture<Void> stopMaster() { 2672 return this 2673 .<Void> newMasterCaller() 2674 .action( 2675 (controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller, 2676 stub, StopMasterRequest.newBuilder().build(), 2677 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call(); 2678 } 2679 2680 @Override 2681 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2682 StopServerRequest request = 2683 RequestConverter.buildStopServerRequest("Called by admin client " 2684 + this.connection.toString()); 2685 return this 2686 .<Void> newAdminCaller() 2687 .action( 2688 (controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2689 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2690 resp -> null)).serverName(serverName).call(); 2691 } 2692 2693 @Override 2694 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2695 return this 2696 .<Void> newAdminCaller() 2697 .action( 2698 (controller, stub) -> this 2699 .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall( 2700 controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2701 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2702 .serverName(serverName).call(); 2703 } 2704 2705 @Override 2706 public CompletableFuture<Void> updateConfiguration() { 2707 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2708 addListener( 2709 getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)), 2710 (status, err) -> { 2711 if (err != null) { 2712 future.completeExceptionally(err); 2713 } else { 2714 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2715 status.getLiveServerMetrics().keySet() 2716 .forEach(server -> futures.add(updateConfiguration(server))); 2717 futures.add(updateConfiguration(status.getMasterName())); 2718 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2719 addListener( 2720 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2721 (result, err2) -> { 2722 if (err2 != null) { 2723 future.completeExceptionally(err2); 2724 } else { 2725 future.complete(result); 2726 } 2727 }); 2728 } 2729 }); 2730 return future; 2731 } 2732 2733 @Override 2734 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 2735 return this 2736 
.<Void> newAdminCaller() 2737 .action( 2738 (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall( 2739 controller, stub, RequestConverter.buildRollWALWriterRequest(), 2740 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 2741 .serverName(serverName).call(); 2742 } 2743 2744 @Override 2745 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 2746 return this 2747 .<Void> newAdminCaller() 2748 .action( 2749 (controller, stub) -> this 2750 .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall( 2751 controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s, 2752 c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 2753 .serverName(serverName).call(); 2754 } 2755 2756 @Override 2757 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 2758 return this 2759 .<List<SecurityCapability>> newMasterCaller() 2760 .action( 2761 (controller, stub) -> this 2762 .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call( 2763 controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req, 2764 done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil 2765 .toSecurityCapabilityList(resp.getCapabilitiesList()))).call(); 2766 } 2767 2768 @Override 2769 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 2770 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 2771 } 2772 2773 @Override 2774 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 2775 TableName tableName) { 2776 Preconditions.checkNotNull(tableName, 2777 "tableName is null. 
If you don't specify a tableName, use getRegionLoads() instead"); 2778 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 2779 } 2780 2781 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 2782 ServerName serverName) { 2783 return this.<List<RegionMetrics>> newAdminCaller() 2784 .action((controller, stub) -> this 2785 .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>> 2786 adminCall(controller, stub, request, (s, c, req, done) -> 2787 s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics)) 2788 .serverName(serverName).call(); 2789 } 2790 2791 @Override 2792 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 2793 return this 2794 .<Boolean> newMasterCaller() 2795 .action( 2796 (controller, stub) -> this 2797 .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller, 2798 stub, IsInMaintenanceModeRequest.newBuilder().build(), 2799 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 2800 resp -> resp.getInMaintenanceMode())).call(); 2801 } 2802 2803 @Override 2804 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 2805 CompactType compactType) { 2806 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 2807 2808 switch (compactType) { 2809 case MOB: 2810 addListener(connection.registry.getMasterAddress(), (serverName, err) -> { 2811 if (err != null) { 2812 future.completeExceptionally(err); 2813 return; 2814 } 2815 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 2816 2817 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 2818 .action((controller, stub) -> this 2819 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 2820 controller, stub, 2821 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 2822 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 2823 .call(), (resp2, err2) -> { 2824 if (err2 != null) { 2825 future.completeExceptionally(err2); 2826 } else { 2827 if (resp2.hasCompactionState()) { 2828 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 2829 } else { 2830 future.complete(CompactionState.NONE); 2831 } 2832 } 2833 }); 2834 }); 2835 break; 2836 case NORMAL: 2837 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 2838 if (err != null) { 2839 future.completeExceptionally(err); 2840 return; 2841 } 2842 List<CompactionState> regionStates = new ArrayList<>(); 2843 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 2844 locations.stream().filter(loc -> loc.getServerName() != null) 2845 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 2846 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 2847 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 2848 // If any region compaction state is MAJOR_AND_MINOR 2849 // the table compaction state is MAJOR_AND_MINOR, too. 
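// How the table-level state is derived (summary): each region reports its own compaction state;
// any region already in MAJOR_AND_MINOR completes the table-level future right away, otherwise
// the per-region states collected below are combined once all region calls finish, and a mix of
// MAJOR and MINOR across different regions also yields MAJOR_AND_MINOR. Illustrative caller
// sketch (assumes an already obtained AsyncAdmin named admin; the table name is a placeholder):
//
//   admin.getCompactionState(TableName.valueOf("my_table"), CompactType.NORMAL)
//     .thenAccept(state -> System.out.println("compaction state: " + state))
//     .join();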
2850 if (err2 != null) { 2851 future.completeExceptionally(unwrapCompletionException(err2)); 2852 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 2853 future.complete(regionState); 2854 } else { 2855 regionStates.add(regionState); 2856 } 2857 })); 2858 }); 2859 addListener( 2860 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2861 (ret, err3) -> { 2862 // If future not completed, check all regions's compaction state 2863 if (!future.isCompletedExceptionally() && !future.isDone()) { 2864 CompactionState state = CompactionState.NONE; 2865 for (CompactionState regionState : regionStates) { 2866 switch (regionState) { 2867 case MAJOR: 2868 if (state == CompactionState.MINOR) { 2869 future.complete(CompactionState.MAJOR_AND_MINOR); 2870 } else { 2871 state = CompactionState.MAJOR; 2872 } 2873 break; 2874 case MINOR: 2875 if (state == CompactionState.MAJOR) { 2876 future.complete(CompactionState.MAJOR_AND_MINOR); 2877 } else { 2878 state = CompactionState.MINOR; 2879 } 2880 break; 2881 case NONE: 2882 default: 2883 } 2884 if (!future.isDone()) { 2885 future.complete(state); 2886 } 2887 } 2888 } 2889 }); 2890 }); 2891 break; 2892 default: 2893 throw new IllegalArgumentException("Unknown compactType: " + compactType); 2894 } 2895 2896 return future; 2897 } 2898 2899 @Override 2900 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 2901 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 2902 addListener(getRegionLocation(regionName), (location, err) -> { 2903 if (err != null) { 2904 future.completeExceptionally(err); 2905 return; 2906 } 2907 ServerName serverName = location.getServerName(); 2908 if (serverName == null) { 2909 future 2910 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 2911 return; 2912 } 2913 addListener( 2914 this.<GetRegionInfoResponse> newAdminCaller() 2915 .action((controller, stub) -> this 2916 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 2917 controller, stub, 2918 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 2919 true), 2920 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 2921 .serverName(serverName).call(), 2922 (resp2, err2) -> { 2923 if (err2 != null) { 2924 future.completeExceptionally(err2); 2925 } else { 2926 if (resp2.hasCompactionState()) { 2927 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 2928 } else { 2929 future.complete(CompactionState.NONE); 2930 } 2931 } 2932 }); 2933 }); 2934 return future; 2935 } 2936 2937 @Override 2938 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 2939 MajorCompactionTimestampRequest request = 2940 MajorCompactionTimestampRequest.newBuilder() 2941 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 2942 return this 2943 .<Optional<Long>> newMasterCaller() 2944 .action( 2945 (controller, stub) -> this 2946 .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call( 2947 controller, stub, request, 2948 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 2949 ProtobufUtil::toOptionalTimestamp)).call(); 2950 } 2951 2952 @Override 2953 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion( 2954 byte[] regionName) { 2955 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 2956 // regionName may be a full 
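
  // Usage sketch (illustrative only): checking the table-level compaction state from caller
  // code, given an AsyncAdmin obtained as in the earlier sketch. The table name and the
  // blocking join() are assumptions made for the example; a real caller would normally chain
  // the returned CompletableFuture instead of blocking.
  //
  //   admin.getCompactionState(TableName.valueOf("usertable"), CompactType.NORMAL)
  //     .thenAccept(state -> {
  //       if (state == CompactionState.NONE) {
  //         // no compaction is currently running on any region of the table
  //       }
  //     })
  //     .join();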

  @Override
  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
      byte[] regionName) {
    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
    // regionName may be a full region name or an encoded region name, so resolve it with
    // getRegionInfo(byte[]) first.
    addListener(getRegionInfo(regionName), (region, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      MajorCompactionTimestampForRegionRequest.Builder builder =
        MajorCompactionTimestampForRegionRequest.newBuilder();
      builder.setRegion(
        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
      addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
        .<MajorCompactionTimestampForRegionRequest, MajorCompactionTimestampResponse,
          Optional<Long>> call(
            controller, stub, builder.build(),
            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
            ProtobufUtil::toOptionalTimestamp))
        .call(), (timestamp, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(timestamp);
          }
        });
    });
    return future;
  }

  @Override
  public CompletableFuture<Boolean> balancerSwitch(final boolean on) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
            stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
            (s, c, req, done) -> s.setBalancerRunning(c, req, done),
            (resp) -> resp.getPrevBalanceValue())).call();
  }

  @Override
  public CompletableFuture<Boolean> balance(boolean forcible) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
          stub, RequestConverter.buildBalanceRequest(forcible),
          (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
  }

  @Override
  public CompletableFuture<Boolean> isBalancerEnabled() {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller, stub,
            RequestConverter.buildIsBalancerEnabledRequest(),
            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
      .call();
  }

  @Override
  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
            controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
            (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
            (resp) -> resp.getPrevNormalizerValue())).call();
  }

  @Override
  public CompletableFuture<Boolean> isNormalizerEnabled() {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
            stub, RequestConverter.buildIsNormalizerEnabledRequest(),
            (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
            (resp) -> resp.getEnabled())).call();
  }
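
  // Usage sketch (illustrative only): enabling the balancer and triggering a balancing round
  // from caller code, given an AsyncAdmin obtained as in the earlier sketch. The blocking get()
  // calls are an assumption made to keep the example short.
  //
  //   boolean previouslyEnabled = admin.balancerSwitch(true).get();
  //   boolean balancerRan = admin.balance(false).get();  // false = do not force a run while
  //                                                      // regions are in transition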

  @Override
  public CompletableFuture<Boolean> normalize() {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
          controller, stub, RequestConverter.buildNormalizeRequest(),
          (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
      .call();
  }

  @Override
  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
            controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
            (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
            (resp) -> resp.getPrevValue())).call();
  }

  @Override
  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
            controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
      .call();
  }

  @Override
  public CompletableFuture<Boolean> runCleanerChore() {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
            RequestConverter.buildRunCleanerChoreRequest(),
            (s, c, req, done) -> s.runCleanerChore(c, req, done),
            (resp) -> resp.getCleanerChoreRan())).call();
  }

  @Override
  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
            controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
            (s, c, req, done) -> s.enableCatalogJanitor(c, req, done),
            (resp) -> resp.getPrevValue())).call();
  }

  @Override
  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
    return this
      .<Boolean> newMasterCaller()
      .action(
        (controller, stub) -> this
          .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
            controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(),
            (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done),
            (resp) -> resp.getValue())).call();
  }

  @Override
  public CompletableFuture<Integer> runCatalogJanitor() {
    return this
      .<Integer> newMasterCaller()
      .action(
        (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
          controller, stub, RequestConverter.buildCatalogScanRequest(),
          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
      .call();
  }

  @Override
  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable) {
    MasterCoprocessorRpcChannelImpl channel =
      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
    S stub = stubMaker.apply(channel);
    CompletableFuture<R> future = new CompletableFuture<>();
    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
    callable.call(stub, controller, resp -> {
      if (controller.failed()) {
        future.completeExceptionally(controller.getFailed());
      } else {
        future.complete(resp);
      }
    });
    return future;
  }
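
  // Usage sketch (illustrative only): invoking a coprocessor endpoint on the master through
  // coprocessorService(). "MyMasterService", "MyRequest" and "MyResponse" are hypothetical
  // protobuf-generated classes used purely for illustration; substitute your own endpoint.
  //
  //   CompletableFuture<MyResponse> resp = admin.coprocessorService(
  //     MyMasterService::newStub,
  //     (stub, controller, rpcCallback) ->
  //       stub.myCall(controller, MyRequest.getDefaultInstance(), rpcCallback));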

  @Override
  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, ServerName serverName) {
    RegionServerCoprocessorRpcChannelImpl channel =
      new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
        serverName));
    S stub = stubMaker.apply(channel);
    CompletableFuture<R> future = new CompletableFuture<>();
    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
    callable.call(stub, controller, resp -> {
      if (controller.failed()) {
        future.completeExceptionally(controller.getFailed());
      } else {
        future.complete(resp);
      }
    });
    return future;
  }

  @Override
  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
    return this.<List<ServerName>> newMasterCaller()
      .action((controller, stub) -> this
        .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
          controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
          (s, c, req, done) -> s.clearDeadServers(c, req, done),
          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
      .call();
  }

  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
    return this.connection.callerFactory.<T> serverRequest()
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
      .startLogErrorsCnt(startLogErrorsCnt);
  }

  @Override
  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
    if (tableName == null) {
      return failedFuture(new IllegalArgumentException("Table name is null"));
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(tableExists(tableName), (exist, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(
          "Table '" + tableName.getNameAsString() + "' does not exist."));
        return;
      }
      addListener(getTableSplits(tableName), (splits, err1) -> {
        if (err1 != null) {
          future.completeExceptionally(err1);
        } else {
          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(err2);
            } else {
              addListener(setTableReplication(tableName, true), (result3, err3) -> {
                if (err3 != null) {
                  future.completeExceptionally(err3);
                } else {
                  future.complete(result3);
                }
              });
            }
          });
        }
      });
    });
    return future;
  }
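
  // Usage sketch (illustrative only): turning on replication for a table once a replication
  // peer has been added, given an AsyncAdmin obtained as in the earlier sketch. The peer id,
  // cluster key and table name below are assumptions made for the example.
  //
  //   admin.addReplicationPeer("1",
  //       ReplicationPeerConfig.newBuilder()
  //         .setClusterKey("peer-zk1,peer-zk2,peer-zk3:2181:/hbase").build())
  //     .thenCompose(v -> admin.enableTableReplication(TableName.valueOf("usertable")))
  //     .join();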

  @Override
  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
    if (tableName == null) {
      return failedFuture(new IllegalArgumentException("Table name is null"));
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(tableExists(tableName), (exist, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(
          "Table '" + tableName.getNameAsString() + "' does not exist."));
        return;
      }
      addListener(setTableReplication(tableName, false), (result, err2) -> {
        if (err2 != null) {
          future.completeExceptionally(err2);
        } else {
          future.complete(result);
        }
      });
    });
    return future;
  }

  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
    CompletableFuture<byte[][]> future = new CompletableFuture<>();
    addListener(getRegions(tableName), (regions, err2) -> {
      if (err2 != null) {
        future.completeExceptionally(err2);
        return;
      }
      if (regions.size() == 1) {
        future.complete(null);
      } else {
        byte[][] splits = new byte[regions.size() - 1][];
        for (int i = 1; i < regions.size(); i++) {
          splits[i - 1] = regions.get(i).getStartKey();
        }
        future.complete(splits);
      }
    });
    return future;
  }

  /**
   * Connect to the peer cluster and check the table descriptor on the peer:
   * <ol>
   * <li>Create the same table on the peer when it does not exist.</li>
   * <li>Throw an exception if the table already has replication enabled on any of the column
   * families.</li>
   * <li>Throw an exception if the table exists on the peer cluster but the descriptors are not
   * the same.</li>
   * </ol>
   * @param tableName name of the table to sync to the peer
   * @param splits table split keys
   */
  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
      byte[][] splits) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(listReplicationPeers(), (peers, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (peers == null || peers.isEmpty()) {
        future.completeExceptionally(
          new IllegalArgumentException("Found no peer cluster for replication."));
        return;
      }
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
        .forEach(peer -> {
          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
        });
      addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
        (result, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(result);
          }
        });
    });
    return future;
  }

  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
      ReplicationPeerDescription peer) {
    Configuration peerConf = null;
    try {
      peerConf =
        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
    } catch (IOException e) {
      return failedFuture(e);
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      // Close the short-lived peer connection once this future completes, whatever the outcome.
      future.whenComplete((result, error) -> IOUtils.closeQuietly(conn));
      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
        if (err1 != null) {
          future.completeExceptionally(err1);
          return;
        }
        AsyncAdmin peerAdmin = conn.getAdmin();
        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
            return;
          }
          if (!exist) {
            CompletableFuture<Void> createTableFuture = null;
            if (splits == null) {
              createTableFuture = peerAdmin.createTable(tableDesc);
            } else {
              createTableFuture = peerAdmin.createTable(tableDesc, splits);
            }
            addListener(createTableFuture, (result, err3) -> {
              if (err3 != null) {
                future.completeExceptionally(err3);
              } else {
                future.complete(result);
              }
            });
          } else {
            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
              (result, err4) -> {
                if (err4 != null) {
                  future.completeExceptionally(err4);
                } else {
                  future.complete(result);
                }
              });
          }
        });
      });
    });
    return future;
  }

  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
      TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (peerTableDesc == null) {
        future.completeExceptionally(
          new IllegalArgumentException("Failed to get table descriptor for table " +
            tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
        return;
      }
      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
        future.completeExceptionally(new IllegalArgumentException(
          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
            ", but the table descriptor is not the same as in the source cluster, so the" +
            " table's replication switch cannot be enabled."));
        return;
      }
      future.complete(null);
    });
    return future;
  }

  /**
   * Set the table's replication switch if it is not already set to the requested value.
   * @param tableName name of the table
   * @param enableRep whether to enable or disable the replication switch
   */
  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(getDescriptor(tableName), (tableDesc, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!tableDesc.matchReplicationScope(enableRep)) {
        int scope =
          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
        TableDescriptor newTableDesc =
          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
        addListener(modifyTable(newTableDesc), (result, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
          } else {
            future.complete(result);
          }
        });
      } else {
        future.complete(null);
      }
    });
    return future;
  }
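
  // For reference (illustrative only): passing enableRep=true above amounts to giving every
  // column family of the table a GLOBAL replication scope. A caller could set the scope for a
  // single column family directly; the table and family names below are assumptions made for
  // the example.
  //
  //   admin.modifyColumnFamily(TableName.valueOf("usertable"),
  //     ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
  //       .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).join();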

  @Override
  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
        locations.stream().filter(l -> l.getRegion() != null)
          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
          .collect(Collectors.groupingBy(l -> l.getServerName(),
            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
        futures
          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
            if (err2 != null) {
              future.completeExceptionally(unwrapCompletionException(err2));
            } else {
              aggregator.append(stats);
            }
          }));
      }
      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (ret, err3) -> {
          if (err3 != null) {
            future.completeExceptionally(unwrapCompletionException(err3));
          } else {
            future.complete(aggregator.sum());
          }
        });
    });
    return future;
  }

  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
      List<RegionInfo> hris) {
    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
      .serverName(serverName).call();
  }
}