001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 023 024import com.google.protobuf.Message; 025import com.google.protobuf.RpcChannel; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.Set; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentLinkedQueue; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicReference; 040import java.util.function.BiConsumer; 041import java.util.function.Function; 042import java.util.function.Supplier; 043import java.util.regex.Pattern; 044import java.util.stream.Collectors; 045import java.util.stream.Stream; 046import org.apache.commons.io.IOUtils; 047import 
org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.hbase.AsyncMetaTableAccessor; 049import org.apache.hadoop.hbase.CacheEvictionStats; 050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator; 051import org.apache.hadoop.hbase.ClusterMetrics; 052import org.apache.hadoop.hbase.ClusterMetrics.Option; 053import org.apache.hadoop.hbase.ClusterMetricsBuilder; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.HRegionLocation; 056import org.apache.hadoop.hbase.MetaTableAccessor; 057import org.apache.hadoop.hbase.MetaTableAccessor.QueryType; 058import org.apache.hadoop.hbase.NamespaceDescriptor; 059import org.apache.hadoop.hbase.RegionLocations; 060import org.apache.hadoop.hbase.RegionMetrics; 061import org.apache.hadoop.hbase.RegionMetricsBuilder; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableExistsException; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.TableNotDisabledException; 066import org.apache.hadoop.hbase.TableNotEnabledException; 067import org.apache.hadoop.hbase.TableNotFoundException; 068import org.apache.hadoop.hbase.UnknownRegionException; 069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder; 070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; 071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; 072import org.apache.hadoop.hbase.client.Scan.ReadType; 073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 074import org.apache.hadoop.hbase.client.replication.TableCFs; 075import org.apache.hadoop.hbase.client.security.SecurityCapability; 076import org.apache.hadoop.hbase.exceptions.DeserializationException; 077import org.apache.hadoop.hbase.ipc.HBaseRpcController; 078import org.apache.hadoop.hbase.quotas.QuotaFilter; 079import org.apache.hadoop.hbase.quotas.QuotaSettings; 080import 
org.apache.hadoop.hbase.quotas.QuotaTableUtil; 081import org.apache.hadoop.hbase.replication.ReplicationException; 082import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 083import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 084import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 085import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 086import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 089import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 090import org.apache.yetus.audience.InterfaceAudience; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 095import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 096import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 097import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 098import org.apache.hbase.thirdparty.io.netty.util.Timeout; 099import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 100 101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 102import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 109import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; 130import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; 131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; 150import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; 170import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; 
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; 209import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; 229import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; 249import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 263import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; 264 265/** 266 * The implementation of AsyncAdmin. 267 * <p> 268 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 269 * be finished inside the rpc framework thread, which means that the callbacks registered to the 270 * {@link CompletableFuture} will also be executed inside the rpc framework thread. 
So users who use 271 * this class should not try to do time consuming tasks in the callbacks. 272 * @since 2.0.0 273 * @see AsyncHBaseAdmin 274 * @see AsyncConnection#getAdmin() 275 * @see AsyncConnection#getAdminBuilder() 276 */ 277@InterfaceAudience.Private 278class RawAsyncHBaseAdmin implements AsyncAdmin { 279 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 280 281 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class); 282 283 private final AsyncConnectionImpl connection; 284 285 private final HashedWheelTimer retryTimer; 286 287 private final AsyncTable<AdvancedScanResultConsumer> metaTable; 288 289 private final long rpcTimeoutNs; 290 291 private final long operationTimeoutNs; 292 293 private final long pauseNs; 294 295 private final int maxAttempts; 296 297 private final int startLogErrorsCnt; 298 299 private final NonceGenerator ng; 300 301 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer, 302 AsyncAdminBuilderBase builder) { 303 this.connection = connection; 304 this.retryTimer = retryTimer; 305 this.metaTable = connection.getTable(META_TABLE_NAME); 306 this.rpcTimeoutNs = builder.rpcTimeoutNs; 307 this.operationTimeoutNs = builder.operationTimeoutNs; 308 this.pauseNs = builder.pauseNs; 309 this.maxAttempts = builder.maxAttempts; 310 this.startLogErrorsCnt = builder.startLogErrorsCnt; 311 this.ng = connection.getNonceGenerator(); 312 } 313 314 private <T> MasterRequestCallerBuilder<T> newMasterCaller() { 315 return this.connection.callerFactory.<T> masterRequest() 316 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 317 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 318 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 319 .startLogErrorsCnt(startLogErrorsCnt); 320 } 321 322 private <T> AdminRequestCallerBuilder<T> newAdminCaller() { 323 return this.connection.callerFactory.<T> adminRequest() 324 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 
325 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 326 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 327 .startLogErrorsCnt(startLogErrorsCnt); 328 } 329 330 @FunctionalInterface 331 private interface MasterRpcCall<RESP, REQ> { 332 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, 333 RpcCallback<RESP> done); 334 } 335 336 @FunctionalInterface 337 private interface AdminRpcCall<RESP, REQ> { 338 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req, 339 RpcCallback<RESP> done); 340 } 341 342 @FunctionalInterface 343 private interface Converter<D, S> { 344 D convert(S src) throws IOException; 345 } 346 347 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 348 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, 349 Converter<RESP, PRESP> respConverter) { 350 CompletableFuture<RESP> future = new CompletableFuture<>(); 351 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 352 353 @Override 354 public void run(PRESP resp) { 355 if (controller.failed()) { 356 future.completeExceptionally(controller.getFailed()); 357 } else { 358 try { 359 future.complete(respConverter.convert(resp)); 360 } catch (IOException e) { 361 future.completeExceptionally(e); 362 } 363 } 364 } 365 }); 366 return future; 367 } 368 369 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, 370 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, 371 Converter<RESP, PRESP> respConverter) { 372 373 CompletableFuture<RESP> future = new CompletableFuture<>(); 374 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() { 375 376 @Override 377 public void run(PRESP resp) { 378 if (controller.failed()) { 379 future.completeExceptionally(controller.getFailed()); 380 } else { 381 try { 382 future.complete(respConverter.convert(resp)); 383 } catch (IOException e) { 384 
future.completeExceptionally(e); 385 } 386 } 387 } 388 }); 389 return future; 390 } 391 392 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, 393 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, 394 ProcedureBiConsumer consumer) { 395 CompletableFuture<Long> procFuture = 396 this.<Long> newMasterCaller().action((controller, stub) -> this 397 .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call(); 398 CompletableFuture<Void> future = waitProcedureResult(procFuture); 399 addListener(future, consumer); 400 return future; 401 } 402 403 @FunctionalInterface 404 private interface TableOperator { 405 CompletableFuture<Void> operate(TableName table); 406 } 407 408 @Override 409 public CompletableFuture<Boolean> tableExists(TableName tableName) { 410 if (TableName.isMetaTableName(tableName)) { 411 return CompletableFuture.completedFuture(true); 412 } 413 return AsyncMetaTableAccessor.tableExists(metaTable, tableName); 414 } 415 416 @Override 417 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) { 418 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null, 419 includeSysTables)); 420 } 421 422 /** 423 * {@link #listTableDescriptors(boolean)} 424 */ 425 @Override 426 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern, 427 boolean includeSysTables) { 428 Preconditions.checkNotNull(pattern, 429 "pattern is null. 
If you don't specify a pattern, use listTables(boolean) instead"); 430 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern, 431 includeSysTables)); 432 } 433 434 private CompletableFuture<List<TableDescriptor>> 435 getTableDescriptors(GetTableDescriptorsRequest request) { 436 return this.<List<TableDescriptor>> newMasterCaller() 437 .action((controller, stub) -> this 438 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call( 439 controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done), 440 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 441 .call(); 442 } 443 444 @Override 445 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) { 446 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables)); 447 } 448 449 @Override 450 public CompletableFuture<List<TableName>> 451 listTableNames(Pattern pattern, boolean includeSysTables) { 452 Preconditions.checkNotNull(pattern, 453 "pattern is null. 
If you don't specify a pattern, use listTableNames(boolean) instead"); 454 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables)); 455 } 456 457 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) { 458 return this 459 .<List<TableName>> newMasterCaller() 460 .action( 461 (controller, stub) -> this 462 .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, 463 stub, request, (s, c, req, done) -> s.getTableNames(c, req, done), 464 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call(); 465 } 466 467 @Override 468 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) { 469 return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this 470 .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse, 471 List<TableDescriptor>> call( 472 controller, stub, 473 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 474 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done), 475 (resp) -> ProtobufUtil.toTableDescriptorList(resp))) 476 .call(); 477 } 478 479 @Override 480 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) { 481 return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this 482 .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse, 483 List<TableName>> call( 484 controller, stub, 485 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(), 486 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done), 487 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList()))) 488 .call(); 489 } 490 491 @Override 492 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { 493 CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); 494 addListener(this.<List<TableSchema>> newMasterCaller() 495 
.action((controller, stub) -> this 496 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( 497 controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), 498 (s, c, req, done) -> s.getTableDescriptors(c, req, done), 499 (resp) -> resp.getTableSchemaList())) 500 .call(), (tableSchemas, error) -> { 501 if (error != null) { 502 future.completeExceptionally(error); 503 return; 504 } 505 if (!tableSchemas.isEmpty()) { 506 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); 507 } else { 508 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); 509 } 510 }); 511 return future; 512 } 513 514 @Override 515 public CompletableFuture<Void> createTable(TableDescriptor desc) { 516 return createTable(desc.getTableName(), 517 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce())); 518 } 519 520 @Override 521 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, 522 int numRegions) { 523 try { 524 return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); 525 } catch (IllegalArgumentException e) { 526 return failedFuture(e); 527 } 528 } 529 530 @Override 531 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) { 532 Preconditions.checkNotNull(splitKeys, "splitKeys is null. 
If you don't specify splitKeys," 533 + " use createTable(TableDescriptor) instead"); 534 try { 535 verifySplitKeys(splitKeys); 536 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc, 537 splitKeys, ng.getNonceGroup(), ng.newNonce())); 538 } catch (IllegalArgumentException e) { 539 return failedFuture(e); 540 } 541 } 542 543 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) { 544 Preconditions.checkNotNull(tableName, "table name is null"); 545 return this.<CreateTableRequest, CreateTableResponse> procedureCall(request, 546 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), 547 new CreateTableProcedureBiConsumer(tableName)); 548 } 549 550 @Override 551 public CompletableFuture<Void> modifyTable(TableDescriptor desc) { 552 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall( 553 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), 554 ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), 555 (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); 556 } 557 558 @Override 559 public CompletableFuture<Void> deleteTable(TableName tableName) { 560 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter 561 .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 562 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), 563 new DeleteTableProcedureBiConsumer(tableName)); 564 } 565 566 @Override 567 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) { 568 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall( 569 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), 570 ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done), 571 (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName)); 
572 } 573 574 @Override 575 public CompletableFuture<Void> enableTable(TableName tableName) { 576 return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter 577 .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 578 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), 579 new EnableTableProcedureBiConsumer(tableName)); 580 } 581 582 @Override 583 public CompletableFuture<Void> disableTable(TableName tableName) { 584 return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter 585 .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), 586 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), 587 new DisableTableProcedureBiConsumer(tableName)); 588 } 589 590 @Override 591 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { 592 if (TableName.isMetaTableName(tableName)) { 593 return CompletableFuture.completedFuture(true); 594 } 595 CompletableFuture<Boolean> future = new CompletableFuture<>(); 596 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> { 597 if (error != null) { 598 future.completeExceptionally(error); 599 return; 600 } 601 if (state.isPresent()) { 602 future.complete(state.get().inStates(TableState.State.ENABLED)); 603 } else { 604 future.completeExceptionally(new TableNotFoundException(tableName)); 605 } 606 }); 607 return future; 608 } 609 610 @Override 611 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { 612 if (TableName.isMetaTableName(tableName)) { 613 return CompletableFuture.completedFuture(false); 614 } 615 CompletableFuture<Boolean> future = new CompletableFuture<>(); 616 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> { 617 if (error != null) { 618 future.completeExceptionally(error); 619 return; 620 } 621 if (state.isPresent()) { 622 
future.complete(state.get().inStates(TableState.State.DISABLED)); 623 } else { 624 future.completeExceptionally(new TableNotFoundException(tableName)); 625 } 626 }); 627 return future; 628 } 629 630 @Override 631 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) { 632 return isTableAvailable(tableName, Optional.empty()); 633 } 634 635 @Override 636 public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) { 637 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys," 638 + " use isTableAvailable(TableName) instead"); 639 return isTableAvailable(tableName, Optional.of(splitKeys)); 640 } 641 642 private CompletableFuture<Boolean> isTableAvailable(TableName tableName, 643 Optional<byte[][]> splitKeys) { 644 if (TableName.isMetaTableName(tableName)) { 645 return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream 646 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null)); 647 } 648 CompletableFuture<Boolean> future = new CompletableFuture<>(); 649 addListener(isTableEnabled(tableName), (enabled, error) -> { 650 if (error != null) { 651 if (error instanceof TableNotFoundException) { 652 future.complete(false); 653 } else { 654 future.completeExceptionally(error); 655 } 656 return; 657 } 658 if (!enabled) { 659 future.complete(false); 660 } else { 661 addListener( 662 AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName), 663 (locations, error1) -> { 664 if (error1 != null) { 665 future.completeExceptionally(error1); 666 return; 667 } 668 List<HRegionLocation> notDeployedRegions = locations.stream() 669 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); 670 if (notDeployedRegions.size() > 0) { 671 if (LOG.isDebugEnabled()) { 672 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); 673 } 674 future.complete(false); 675 return; 676 } 677 678 Optional<Boolean> 
available = 679 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); 680 future.complete(available.orElse(true)); 681 }); 682 } 683 }); 684 return future; 685 } 686 687 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) { 688 int regionCount = 0; 689 for (HRegionLocation location : locations) { 690 RegionInfo info = location.getRegion(); 691 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 692 regionCount++; 693 continue; 694 } 695 for (byte[] splitKey : splitKeys) { 696 // Just check if the splitkey is available 697 if (Bytes.equals(info.getStartKey(), splitKey)) { 698 regionCount++; 699 break; 700 } 701 } 702 } 703 return regionCount == splitKeys.length + 1; 704 } 705 706 @Override 707 public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { 708 return this.<AddColumnRequest, AddColumnResponse> procedureCall( 709 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 710 ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), 711 new AddColumnFamilyProcedureBiConsumer(tableName)); 712 } 713 714 @Override 715 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) { 716 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall( 717 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 718 ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done), 719 (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName)); 720 } 721 722 @Override 723 public CompletableFuture<Void> modifyColumnFamily(TableName tableName, 724 ColumnFamilyDescriptor columnFamily) { 725 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall( 726 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), 727 ng.newNonce()), (s, c, req, done) -> 
s.modifyColumn(c, req, done), 728 (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName)); 729 } 730 731 @Override 732 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) { 733 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall( 734 RequestConverter.buildCreateNamespaceRequest(descriptor), 735 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(), 736 new CreateNamespaceProcedureBiConsumer(descriptor.getName())); 737 } 738 739 @Override 740 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) { 741 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall( 742 RequestConverter.buildModifyNamespaceRequest(descriptor), 743 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(), 744 new ModifyNamespaceProcedureBiConsumer(descriptor.getName())); 745 } 746 747 @Override 748 public CompletableFuture<Void> deleteNamespace(String name) { 749 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall( 750 RequestConverter.buildDeleteNamespaceRequest(name), 751 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(), 752 new DeleteNamespaceProcedureBiConsumer(name)); 753 } 754 755 @Override 756 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) { 757 return this 758 .<NamespaceDescriptor> newMasterCaller() 759 .action( 760 (controller, stub) -> this 761 .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call( 762 controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c, 763 req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil 764 .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call(); 765 } 766 767 @Override 768 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { 769 return this 770 .<List<NamespaceDescriptor>> 
newMasterCaller() 771 .action( 772 (controller, stub) -> this 773 .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call( 774 controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, 775 done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil 776 .toNamespaceDescriptorList(resp))).call(); 777 } 778 779 @Override 780 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) { 781 return this.<List<RegionInfo>> newAdminCaller() 782 .action((controller, stub) -> this 783 .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall( 784 controller, stub, RequestConverter.buildGetOnlineRegionRequest(), 785 (s, c, req, done) -> s.getOnlineRegion(c, req, done), 786 resp -> ProtobufUtil.getRegionInfos(resp))) 787 .serverName(serverName).call(); 788 } 789 790 @Override 791 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) { 792 if (tableName.equals(META_TABLE_NAME)) { 793 return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs) 794 .thenApply(loc -> Collections.singletonList(loc.getRegion())); 795 } else { 796 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName) 797 .thenApply( 798 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); 799 } 800 } 801 802 @Override 803 public CompletableFuture<Void> flush(TableName tableName) { 804 CompletableFuture<Void> future = new CompletableFuture<>(); 805 addListener(tableExists(tableName), (exists, err) -> { 806 if (err != null) { 807 future.completeExceptionally(err); 808 } else if (!exists) { 809 future.completeExceptionally(new TableNotFoundException(tableName)); 810 } else { 811 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { 812 if (err2 != null) { 813 future.completeExceptionally(err2); 814 } else if (!tableEnabled) { 815 future.completeExceptionally(new 
TableNotEnabledException(tableName)); 816 } else { 817 addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), 818 new HashMap<>()), (ret, err3) -> { 819 if (err3 != null) { 820 future.completeExceptionally(err3); 821 } else { 822 future.complete(ret); 823 } 824 }); 825 } 826 }); 827 } 828 }); 829 return future; 830 } 831 832 @Override 833 public CompletableFuture<Void> flushRegion(byte[] regionName) { 834 CompletableFuture<Void> future = new CompletableFuture<>(); 835 addListener(getRegionLocation(regionName), (location, err) -> { 836 if (err != null) { 837 future.completeExceptionally(err); 838 return; 839 } 840 ServerName serverName = location.getServerName(); 841 if (serverName == null) { 842 future 843 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 844 return; 845 } 846 addListener(flush(serverName, location.getRegion()), (ret, err2) -> { 847 if (err2 != null) { 848 future.completeExceptionally(err2); 849 } else { 850 future.complete(ret); 851 } 852 }); 853 }); 854 return future; 855 } 856 857 private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) { 858 return this.<Void> newAdminCaller() 859 .serverName(serverName) 860 .action( 861 (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( 862 controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo 863 .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), 864 resp -> null)) 865 .call(); 866 } 867 868 @Override 869 public CompletableFuture<Void> flushRegionServer(ServerName sn) { 870 CompletableFuture<Void> future = new CompletableFuture<>(); 871 addListener(getRegions(sn), (hRegionInfos, err) -> { 872 if (err != null) { 873 future.completeExceptionally(err); 874 return; 875 } 876 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 877 if (hRegionInfos != null) { 878 hRegionInfos.forEach(region -> compactFutures.add(flush(sn, 
region))); 879 } 880 addListener(CompletableFuture.allOf( 881 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 882 if (err2 != null) { 883 future.completeExceptionally(err2); 884 } else { 885 future.complete(ret); 886 } 887 }); 888 }); 889 return future; 890 } 891 892 @Override 893 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) { 894 return compact(tableName, null, false, compactType); 895 } 896 897 @Override 898 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, 899 CompactType compactType) { 900 Preconditions.checkNotNull(columnFamily, "columnFamily is null. " 901 + "If you don't specify a columnFamily, use compact(TableName) instead"); 902 return compact(tableName, columnFamily, false, compactType); 903 } 904 905 @Override 906 public CompletableFuture<Void> compactRegion(byte[] regionName) { 907 return compactRegion(regionName, null, false); 908 } 909 910 @Override 911 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { 912 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 913 + " If you don't specify a columnFamily, use compactRegion(regionName) instead"); 914 return compactRegion(regionName, columnFamily, false); 915 } 916 917 @Override 918 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) { 919 return compact(tableName, null, true, compactType); 920 } 921 922 @Override 923 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily, 924 CompactType compactType) { 925 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 
926 + "If you don't specify a columnFamily, use compact(TableName) instead"); 927 return compact(tableName, columnFamily, true, compactType); 928 } 929 930 @Override 931 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { 932 return compactRegion(regionName, null, true); 933 } 934 935 @Override 936 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { 937 Preconditions.checkNotNull(columnFamily, "columnFamily is null." 938 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead"); 939 return compactRegion(regionName, columnFamily, true); 940 } 941 942 @Override 943 public CompletableFuture<Void> compactRegionServer(ServerName sn) { 944 return compactRegionServer(sn, false); 945 } 946 947 @Override 948 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) { 949 return compactRegionServer(sn, true); 950 } 951 952 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { 953 CompletableFuture<Void> future = new CompletableFuture<>(); 954 addListener(getRegions(sn), (hRegionInfos, err) -> { 955 if (err != null) { 956 future.completeExceptionally(err); 957 return; 958 } 959 List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); 960 if (hRegionInfos != null) { 961 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); 962 } 963 addListener(CompletableFuture.allOf( 964 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { 965 if (err2 != null) { 966 future.completeExceptionally(err2); 967 } else { 968 future.complete(ret); 969 } 970 }); 971 }); 972 return future; 973 } 974 975 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, 976 boolean major) { 977 CompletableFuture<Void> future = new CompletableFuture<>(); 978 addListener(getRegionLocation(regionName), (location, err) -> { 979 if (err != null) { 980 
future.completeExceptionally(err); 981 return; 982 } 983 ServerName serverName = location.getServerName(); 984 if (serverName == null) { 985 future 986 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 987 return; 988 } 989 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), 990 (ret, err2) -> { 991 if (err2 != null) { 992 future.completeExceptionally(err2); 993 } else { 994 future.complete(ret); 995 } 996 }); 997 }); 998 return future; 999 } 1000 1001 /** 1002 * List all region locations for the specific table. 1003 */ 1004 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { 1005 if (TableName.META_TABLE_NAME.equals(tableName)) { 1006 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 1007 // For meta table, we use zk to fetch all locations. 1008 AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); 1009 addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> { 1010 if (err != null) { 1011 future.completeExceptionally(err); 1012 } else if (metaRegions == null || metaRegions.isEmpty() || 1013 metaRegions.getDefaultRegionLocation() == null) { 1014 future.completeExceptionally(new IOException("meta region does not found")); 1015 } else { 1016 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); 1017 } 1018 // close the registry. 
1019 IOUtils.closeQuietly(registry); 1020 }); 1021 return future; 1022 } else { 1023 // For non-meta table, we fetch all locations by scanning hbase:meta table 1024 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName); 1025 } 1026 } 1027 1028 /** 1029 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() 1030 */ 1031 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, 1032 CompactType compactType) { 1033 CompletableFuture<Void> future = new CompletableFuture<>(); 1034 1035 switch (compactType) { 1036 case MOB: 1037 addListener(connection.registry.getMasterAddress(), (serverName, err) -> { 1038 if (err != null) { 1039 future.completeExceptionally(err); 1040 return; 1041 } 1042 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 1043 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { 1044 if (err2 != null) { 1045 future.completeExceptionally(err2); 1046 } else { 1047 future.complete(ret); 1048 } 1049 }); 1050 }); 1051 break; 1052 case NORMAL: 1053 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 1054 if (err != null) { 1055 future.completeExceptionally(err); 1056 return; 1057 } 1058 if (locations == null || locations.isEmpty()) { 1059 future.completeExceptionally(new TableNotFoundException(tableName)); 1060 } 1061 CompletableFuture<?>[] compactFutures = 1062 locations.stream().filter(l -> l.getRegion() != null) 1063 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 1064 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) 1065 .toArray(CompletableFuture<?>[]::new); 1066 // future complete unless all of the compact futures are completed. 
1067 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { 1068 if (err2 != null) { 1069 future.completeExceptionally(err2); 1070 } else { 1071 future.complete(ret); 1072 } 1073 }); 1074 }); 1075 break; 1076 default: 1077 throw new IllegalArgumentException("Unknown compactType: " + compactType); 1078 } 1079 return future; 1080 } 1081 1082 /** 1083 * Compact the region at specific region server. 1084 */ 1085 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri, 1086 final boolean major, byte[] columnFamily) { 1087 return this 1088 .<Void> newAdminCaller() 1089 .serverName(sn) 1090 .action( 1091 (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall( 1092 controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(), 1093 major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done), 1094 resp -> null)).call(); 1095 } 1096 1097 private byte[] toEncodeRegionName(byte[] regionName) { 1098 try { 1099 return RegionInfo.isEncodedRegionName(regionName) ? regionName 1100 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName)); 1101 } catch (IOException e) { 1102 return regionName; 1103 } 1104 } 1105 1106 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, 1107 CompletableFuture<TableName> result) { 1108 addListener(getRegionLocation(encodeRegionName), (location, err) -> { 1109 if (err != null) { 1110 result.completeExceptionally(err); 1111 return; 1112 } 1113 RegionInfo regionInfo = location.getRegion(); 1114 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1115 result.completeExceptionally( 1116 new IllegalArgumentException("Can't invoke merge on non-default regions directly")); 1117 return; 1118 } 1119 if (!tableName.compareAndSet(null, regionInfo.getTable())) { 1120 if (!tableName.get().equals(regionInfo.getTable())) { 1121 // tables of this two region should be same. 
1122 result.completeExceptionally( 1123 new IllegalArgumentException("Cannot merge regions from two different tables " + 1124 tableName.get() + " and " + regionInfo.getTable())); 1125 } else { 1126 result.complete(tableName.get()); 1127 } 1128 } 1129 }); 1130 } 1131 1132 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA, 1133 byte[] encodeRegionNameB) { 1134 AtomicReference<TableName> tableNameRef = new AtomicReference<>(); 1135 CompletableFuture<TableName> future = new CompletableFuture<>(); 1136 1137 checkAndGetTableName(encodeRegionNameA, tableNameRef, future); 1138 checkAndGetTableName(encodeRegionNameB, tableNameRef, future); 1139 return future; 1140 } 1141 1142 @Override 1143 public CompletableFuture<Boolean> mergeSwitch(boolean on) { 1144 return setSplitOrMergeOn(on, MasterSwitchType.MERGE); 1145 } 1146 1147 @Override 1148 public CompletableFuture<Boolean> isMergeEnabled() { 1149 return isSplitOrMergeOn(MasterSwitchType.MERGE); 1150 } 1151 1152 @Override 1153 public CompletableFuture<Boolean> splitSwitch(boolean on) { 1154 return setSplitOrMergeOn(on, MasterSwitchType.SPLIT); 1155 } 1156 1157 @Override 1158 public CompletableFuture<Boolean> isSplitEnabled() { 1159 return isSplitOrMergeOn(MasterSwitchType.SPLIT); 1160 } 1161 1162 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean on, MasterSwitchType switchType) { 1163 SetSplitOrMergeEnabledRequest request = 1164 RequestConverter.buildSetSplitOrMergeEnabledRequest(on, false, switchType); 1165 return this 1166 .<Boolean> newMasterCaller() 1167 .action( 1168 (controller, stub) -> this 1169 .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call( 1170 controller, stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, 1171 done), (resp) -> resp.getPrevValueList().get(0))).call(); 1172 } 1173 1174 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) { 1175 IsSplitOrMergeEnabledRequest request = 1176 
RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType); 1177 return this 1178 .<Boolean> newMasterCaller() 1179 .action( 1180 (controller, stub) -> this 1181 .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call( 1182 controller, stub, request, 1183 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), 1184 (resp) -> resp.getEnabled())).call(); 1185 } 1186 1187 @Override 1188 public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB, 1189 boolean forcible) { 1190 CompletableFuture<Void> future = new CompletableFuture<>(); 1191 final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA); 1192 final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB); 1193 1194 addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB), 1195 (tableName, err) -> { 1196 if (err != null) { 1197 future.completeExceptionally(err); 1198 return; 1199 } 1200 1201 MergeTableRegionsRequest request = null; 1202 try { 1203 request = RequestConverter.buildMergeTableRegionsRequest( 1204 new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(), 1205 ng.newNonce()); 1206 } catch (DeserializationException e) { 1207 future.completeExceptionally(e); 1208 return; 1209 } 1210 1211 addListener( 1212 this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request, 1213 (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), 1214 new MergeTableRegionProcedureBiConsumer(tableName)), 1215 (ret, err2) -> { 1216 if (err2 != null) { 1217 future.completeExceptionally(err2); 1218 } else { 1219 future.complete(ret); 1220 } 1221 }); 1222 }); 1223 return future; 1224 } 1225 1226 @Override 1227 public CompletableFuture<Void> split(TableName tableName) { 1228 CompletableFuture<Void> future = new CompletableFuture<>(); 1229 addListener(tableExists(tableName), (exist, error) -> { 1230 if (error != null) { 1231 future.completeExceptionally(error); 1232 
return; 1233 } 1234 if (!exist) { 1235 future.completeExceptionally(new TableNotFoundException(tableName)); 1236 return; 1237 } 1238 addListener( 1239 metaTable 1240 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) 1241 .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) 1242 .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))), 1243 (results, err2) -> { 1244 if (err2 != null) { 1245 future.completeExceptionally(err2); 1246 return; 1247 } 1248 if (results != null && !results.isEmpty()) { 1249 List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); 1250 for (Result r : results) { 1251 if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) { 1252 continue; 1253 } 1254 RegionLocations rl = MetaTableAccessor.getRegionLocations(r); 1255 if (rl != null) { 1256 for (HRegionLocation h : rl.getRegionLocations()) { 1257 if (h != null && h.getServerName() != null) { 1258 RegionInfo hri = h.getRegion(); 1259 if (hri == null || hri.isSplitParent() || 1260 hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1261 continue; 1262 } 1263 splitFutures.add(split(hri, null)); 1264 } 1265 } 1266 } 1267 } 1268 addListener( 1269 CompletableFuture 1270 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), 1271 (ret, exception) -> { 1272 if (exception != null) { 1273 future.completeExceptionally(exception); 1274 return; 1275 } 1276 future.complete(ret); 1277 }); 1278 } else { 1279 future.complete(null); 1280 } 1281 }); 1282 }); 1283 return future; 1284 } 1285 1286 @Override 1287 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { 1288 CompletableFuture<Void> result = new CompletableFuture<>(); 1289 if (splitPoint == null) { 1290 return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); 1291 } 1292 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true), 1293 (loc, err) 
-> { 1294 if (err != null) { 1295 result.completeExceptionally(err); 1296 } else if (loc == null || loc.getRegion() == null) { 1297 result.completeExceptionally(new IllegalArgumentException( 1298 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); 1299 } else { 1300 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { 1301 if (err2 != null) { 1302 result.completeExceptionally(err2); 1303 } else { 1304 result.complete(ret); 1305 } 1306 1307 }); 1308 } 1309 }); 1310 return result; 1311 } 1312 1313 @Override 1314 public CompletableFuture<Void> splitRegion(byte[] regionName) { 1315 CompletableFuture<Void> future = new CompletableFuture<>(); 1316 addListener(getRegionLocation(regionName), (location, err) -> { 1317 if (err != null) { 1318 future.completeExceptionally(err); 1319 return; 1320 } 1321 RegionInfo regionInfo = location.getRegion(); 1322 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1323 future 1324 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + 1325 "Replicas are auto-split when their primary is split.")); 1326 return; 1327 } 1328 ServerName serverName = location.getServerName(); 1329 if (serverName == null) { 1330 future 1331 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1332 return; 1333 } 1334 addListener(split(regionInfo, null), (ret, err2) -> { 1335 if (err2 != null) { 1336 future.completeExceptionally(err2); 1337 } else { 1338 future.complete(ret); 1339 } 1340 }); 1341 }); 1342 return future; 1343 } 1344 1345 @Override 1346 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { 1347 Preconditions.checkNotNull(splitPoint, 1348 "splitPoint is null. 
If you don't specify a splitPoint, use splitRegion(byte[]) instead"); 1349 CompletableFuture<Void> future = new CompletableFuture<>(); 1350 addListener(getRegionLocation(regionName), (location, err) -> { 1351 if (err != null) { 1352 future.completeExceptionally(err); 1353 return; 1354 } 1355 RegionInfo regionInfo = location.getRegion(); 1356 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { 1357 future 1358 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + 1359 "Replicas are auto-split when their primary is split.")); 1360 return; 1361 } 1362 ServerName serverName = location.getServerName(); 1363 if (serverName == null) { 1364 future 1365 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 1366 return; 1367 } 1368 if (regionInfo.getStartKey() != null && 1369 Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) { 1370 future.completeExceptionally( 1371 new IllegalArgumentException("should not give a splitkey which equals to startkey!")); 1372 return; 1373 } 1374 addListener(split(regionInfo, splitPoint), (ret, err2) -> { 1375 if (err2 != null) { 1376 future.completeExceptionally(err2); 1377 } else { 1378 future.complete(ret); 1379 } 1380 }); 1381 }); 1382 return future; 1383 } 1384 1385 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) { 1386 CompletableFuture<Void> future = new CompletableFuture<>(); 1387 TableName tableName = hri.getTable(); 1388 SplitTableRegionRequest request = null; 1389 try { 1390 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), 1391 ng.newNonce()); 1392 } catch (DeserializationException e) { 1393 future.completeExceptionally(e); 1394 return future; 1395 } 1396 1397 addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request, 1398 (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), 1399 new 
SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { 1400 if (err2 != null) { 1401 future.completeExceptionally(err2); 1402 } else { 1403 future.complete(ret); 1404 } 1405 }); 1406 return future; 1407 } 1408 1409 @Override 1410 public CompletableFuture<Void> assign(byte[] regionName) { 1411 CompletableFuture<Void> future = new CompletableFuture<>(); 1412 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1413 if (err != null) { 1414 future.completeExceptionally(err); 1415 return; 1416 } 1417 addListener(this.<Void> newMasterCaller() 1418 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( 1419 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), 1420 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) 1421 .call(), (ret, err2) -> { 1422 if (err2 != null) { 1423 future.completeExceptionally(err2); 1424 } else { 1425 future.complete(ret); 1426 } 1427 }); 1428 }); 1429 return future; 1430 } 1431 1432 @Override 1433 public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) { 1434 CompletableFuture<Void> future = new CompletableFuture<>(); 1435 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1436 if (err != null) { 1437 future.completeExceptionally(err); 1438 return; 1439 } 1440 addListener( 1441 this.<Void> newMasterCaller() 1442 .action(((controller, stub) -> this 1443 .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, 1444 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), 1445 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) 1446 .call(), 1447 (ret, err2) -> { 1448 if (err2 != null) { 1449 future.completeExceptionally(err2); 1450 } else { 1451 future.complete(ret); 1452 } 1453 }); 1454 }); 1455 return future; 1456 } 1457 1458 @Override 1459 public CompletableFuture<Void> offline(byte[] regionName) { 1460 CompletableFuture<Void> future = new 
CompletableFuture<>(); 1461 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1462 if (err != null) { 1463 future.completeExceptionally(err); 1464 return; 1465 } 1466 addListener( 1467 this.<Void> newMasterCaller() 1468 .action(((controller, stub) -> this 1469 .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub, 1470 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), 1471 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) 1472 .call(), 1473 (ret, err2) -> { 1474 if (err2 != null) { 1475 future.completeExceptionally(err2); 1476 } else { 1477 future.complete(ret); 1478 } 1479 }); 1480 }); 1481 return future; 1482 } 1483 1484 @Override 1485 public CompletableFuture<Void> move(byte[] regionName) { 1486 CompletableFuture<Void> future = new CompletableFuture<>(); 1487 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1488 if (err != null) { 1489 future.completeExceptionally(err); 1490 return; 1491 } 1492 addListener( 1493 moveRegion( 1494 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), 1495 (ret, err2) -> { 1496 if (err2 != null) { 1497 future.completeExceptionally(err2); 1498 } else { 1499 future.complete(ret); 1500 } 1501 }); 1502 }); 1503 return future; 1504 } 1505 1506 @Override 1507 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) { 1508 Preconditions.checkNotNull(destServerName, 1509 "destServerName is null. 
If you don't specify a destServerName, use move(byte[]) instead"); 1510 CompletableFuture<Void> future = new CompletableFuture<>(); 1511 addListener(getRegionInfo(regionName), (regionInfo, err) -> { 1512 if (err != null) { 1513 future.completeExceptionally(err); 1514 return; 1515 } 1516 addListener(moveRegion(RequestConverter 1517 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), 1518 (ret, err2) -> { 1519 if (err2 != null) { 1520 future.completeExceptionally(err2); 1521 } else { 1522 future.complete(ret); 1523 } 1524 }); 1525 }); 1526 return future; 1527 } 1528 1529 private CompletableFuture<Void> moveRegion(MoveRegionRequest request) { 1530 return this 1531 .<Void> newMasterCaller() 1532 .action( 1533 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller, 1534 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call(); 1535 } 1536 1537 @Override 1538 public CompletableFuture<Void> setQuota(QuotaSettings quota) { 1539 return this 1540 .<Void> newMasterCaller() 1541 .action( 1542 (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller, 1543 stub, QuotaSettings.buildSetQuotaRequestProto(quota), 1544 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call(); 1545 } 1546 1547 @Override 1548 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) { 1549 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>(); 1550 Scan scan = QuotaTableUtil.makeScan(filter); 1551 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build() 1552 .scan(scan, new AdvancedScanResultConsumer() { 1553 List<QuotaSettings> settings = new ArrayList<>(); 1554 1555 @Override 1556 public void onNext(Result[] results, ScanController controller) { 1557 for (Result result : results) { 1558 try { 1559 QuotaTableUtil.parseResultToCollection(result, settings); 1560 } catch (IOException e) { 1561 controller.terminate(); 1562 
future.completeExceptionally(e); 1563 } 1564 } 1565 } 1566 1567 @Override 1568 public void onError(Throwable error) { 1569 future.completeExceptionally(error); 1570 } 1571 1572 @Override 1573 public void onComplete() { 1574 future.complete(settings); 1575 } 1576 }); 1577 return future; 1578 } 1579 1580 @Override 1581 public CompletableFuture<Void> addReplicationPeer(String peerId, 1582 ReplicationPeerConfig peerConfig, boolean enabled) { 1583 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( 1584 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), 1585 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1586 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); 1587 } 1588 1589 @Override 1590 public CompletableFuture<Void> removeReplicationPeer(String peerId) { 1591 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( 1592 RequestConverter.buildRemoveReplicationPeerRequest(peerId), 1593 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1594 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); 1595 } 1596 1597 @Override 1598 public CompletableFuture<Void> enableReplicationPeer(String peerId) { 1599 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( 1600 RequestConverter.buildEnableReplicationPeerRequest(peerId), 1601 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), 1602 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); 1603 } 1604 1605 @Override 1606 public CompletableFuture<Void> disableReplicationPeer(String peerId) { 1607 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( 1608 RequestConverter.buildDisableReplicationPeerRequest(peerId), 1609 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> 
resp.getProcId(), 1610 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); 1611 } 1612 1613 @Override 1614 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) { 1615 return this 1616 .<ReplicationPeerConfig> newMasterCaller() 1617 .action( 1618 (controller, stub) -> this 1619 .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call( 1620 controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( 1621 s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), 1622 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); 1623 } 1624 1625 @Override 1626 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, 1627 ReplicationPeerConfig peerConfig) { 1628 return this 1629 .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall( 1630 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), 1631 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), 1632 (resp) -> resp.getProcId(), 1633 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); 1634 } 1635 1636 @Override 1637 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id, 1638 Map<TableName, List<String>> tableCfs) { 1639 if (tableCfs == null) { 1640 return failedFuture(new ReplicationException("tableCfs is null")); 1641 } 1642 1643 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1644 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1645 if (!completeExceptionally(future, error)) { 1646 ReplicationPeerConfig newPeerConfig = 1647 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); 1648 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1649 if (!completeExceptionally(future, error)) { 1650 future.complete(result); 1651 } 1652 }); 1653 } 1654 }); 1655 
return future; 1656 } 1657 1658 @Override 1659 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id, 1660 Map<TableName, List<String>> tableCfs) { 1661 if (tableCfs == null) { 1662 return failedFuture(new ReplicationException("tableCfs is null")); 1663 } 1664 1665 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1666 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { 1667 if (!completeExceptionally(future, error)) { 1668 ReplicationPeerConfig newPeerConfig = null; 1669 try { 1670 newPeerConfig = ReplicationPeerConfigUtil 1671 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); 1672 } catch (ReplicationException e) { 1673 future.completeExceptionally(e); 1674 return; 1675 } 1676 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { 1677 if (!completeExceptionally(future, error)) { 1678 future.complete(result); 1679 } 1680 }); 1681 } 1682 }); 1683 return future; 1684 } 1685 1686 @Override 1687 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { 1688 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null)); 1689 } 1690 1691 @Override 1692 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { 1693 Preconditions.checkNotNull(pattern, 1694 "pattern is null. 
If you don't specify a pattern, use listReplicationPeers() instead"); 1695 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern)); 1696 } 1697 1698 private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers( 1699 ListReplicationPeersRequest request) { 1700 return this 1701 .<List<ReplicationPeerDescription>> newMasterCaller() 1702 .action( 1703 (controller, stub) -> this 1704 .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call( 1705 controller, 1706 stub, 1707 request, 1708 (s, c, req, done) -> s.listReplicationPeers(c, req, done), 1709 (resp) -> resp.getPeerDescList().stream() 1710 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) 1711 .collect(Collectors.toList()))).call(); 1712 } 1713 1714 @Override 1715 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { 1716 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); 1717 addListener(listTableDescriptors(), (tables, error) -> { 1718 if (!completeExceptionally(future, error)) { 1719 List<TableCFs> replicatedTableCFs = new ArrayList<>(); 1720 tables.forEach(table -> { 1721 Map<String, Integer> cfs = new HashMap<>(); 1722 Stream.of(table.getColumnFamilies()) 1723 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) 1724 .forEach(column -> { 1725 cfs.put(column.getNameAsString(), column.getScope()); 1726 }); 1727 if (!cfs.isEmpty()) { 1728 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); 1729 } 1730 }); 1731 future.complete(replicatedTableCFs); 1732 } 1733 }); 1734 return future; 1735 } 1736 1737 @Override 1738 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { 1739 SnapshotProtos.SnapshotDescription snapshot = 1740 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); 1741 try { 1742 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1743 } catch (IllegalArgumentException e) 
{ 1744 return failedFuture(e); 1745 } 1746 CompletableFuture<Void> future = new CompletableFuture<>(); 1747 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); 1748 addListener(this.<Long> newMasterCaller() 1749 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, 1750 stub, request, (s, c, req, done) -> s.snapshot(c, req, done), 1751 resp -> resp.getExpectedTimeout())) 1752 .call(), (expectedTimeout, err) -> { 1753 if (err != null) { 1754 future.completeExceptionally(err); 1755 return; 1756 } 1757 TimerTask pollingTask = new TimerTask() { 1758 int tries = 0; 1759 long startTime = EnvironmentEdgeManager.currentTime(); 1760 long endTime = startTime + expectedTimeout; 1761 long maxPauseTime = expectedTimeout / maxAttempts; 1762 1763 @Override 1764 public void run(Timeout timeout) throws Exception { 1765 if (EnvironmentEdgeManager.currentTime() < endTime) { 1766 addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { 1767 if (err2 != null) { 1768 future.completeExceptionally(err2); 1769 } else if (done) { 1770 future.complete(null); 1771 } else { 1772 // retry again after pauseTime. 
1773 long pauseTime = 1774 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 1775 pauseTime = Math.min(pauseTime, maxPauseTime); 1776 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 1777 TimeUnit.MILLISECONDS); 1778 } 1779 }); 1780 } else { 1781 future.completeExceptionally( 1782 new SnapshotCreationException("Snapshot '" + snapshot.getName() + 1783 "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); 1784 } 1785 } 1786 }; 1787 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 1788 }); 1789 return future; 1790 } 1791 1792 @Override 1793 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { 1794 return this 1795 .<Boolean> newMasterCaller() 1796 .action( 1797 (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call( 1798 controller, 1799 stub, 1800 IsSnapshotDoneRequest.newBuilder() 1801 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 1802 req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call(); 1803 } 1804 1805 @Override 1806 public CompletableFuture<Void> restoreSnapshot(String snapshotName) { 1807 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean( 1808 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT, 1809 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT); 1810 return restoreSnapshot(snapshotName, takeFailSafeSnapshot); 1811 } 1812 1813 @Override 1814 public CompletableFuture<Void> restoreSnapshot(String snapshotName, 1815 boolean takeFailSafeSnapshot) { 1816 CompletableFuture<Void> future = new CompletableFuture<>(); 1817 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { 1818 if (err != null) { 1819 future.completeExceptionally(err); 1820 return; 1821 } 1822 TableName tableName = null; 1823 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { 1824 
for (SnapshotDescription snap : snapshotDescriptions) { 1825 if (snap.getName().equals(snapshotName)) { 1826 tableName = snap.getTableName(); 1827 break; 1828 } 1829 } 1830 } 1831 if (tableName == null) { 1832 future.completeExceptionally(new RestoreSnapshotException( 1833 "Unable to find the table name for snapshot=" + snapshotName)); 1834 return; 1835 } 1836 final TableName finalTableName = tableName; 1837 addListener(tableExists(finalTableName), (exists, err2) -> { 1838 if (err2 != null) { 1839 future.completeExceptionally(err2); 1840 } else if (!exists) { 1841 // if table does not exist, then just clone snapshot into new table. 1842 completeConditionalOnFuture(future, 1843 internalRestoreSnapshot(snapshotName, finalTableName)); 1844 } else { 1845 addListener(isTableDisabled(finalTableName), (disabled, err4) -> { 1846 if (err4 != null) { 1847 future.completeExceptionally(err4); 1848 } else if (!disabled) { 1849 future.completeExceptionally(new TableNotDisabledException(finalTableName)); 1850 } else { 1851 completeConditionalOnFuture(future, 1852 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot)); 1853 } 1854 }); 1855 } 1856 }); 1857 }); 1858 return future; 1859 } 1860 1861 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, 1862 boolean takeFailSafeSnapshot) { 1863 if (takeFailSafeSnapshot) { 1864 CompletableFuture<Void> future = new CompletableFuture<>(); 1865 // Step.1 Take a snapshot of the current state 1866 String failSafeSnapshotSnapshotNameFormat = 1867 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, 1868 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); 1869 final String failSafeSnapshotSnapshotName = 1870 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) 1871 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) 1872 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); 
1873 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1874 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { 1875 if (err != null) { 1876 future.completeExceptionally(err); 1877 } else { 1878 // Step.2 Restore snapshot 1879 addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> { 1880 if (err2 != null) { 1881 // Step.3.a Something went wrong during the restore and try to rollback. 1882 addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName), 1883 (void3, err3) -> { 1884 if (err3 != null) { 1885 future.completeExceptionally(err3); 1886 } else { 1887 String msg = 1888 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + 1889 failSafeSnapshotSnapshotName + " succeeded."; 1890 future.completeExceptionally(new RestoreSnapshotException(msg)); 1891 } 1892 }); 1893 } else { 1894 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. 1895 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); 1896 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { 1897 if (err3 != null) { 1898 LOG.error( 1899 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, 1900 err3); 1901 future.completeExceptionally(err3); 1902 } else { 1903 future.complete(ret3); 1904 } 1905 }); 1906 } 1907 }); 1908 } 1909 }); 1910 return future; 1911 } else { 1912 return internalRestoreSnapshot(snapshotName, tableName); 1913 } 1914 } 1915 1916 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, 1917 CompletableFuture<T> parentFuture) { 1918 addListener(parentFuture, (res, err) -> { 1919 if (err != null) { 1920 dependentFuture.completeExceptionally(err); 1921 } else { 1922 dependentFuture.complete(res); 1923 } 1924 }); 1925 } 1926 1927 @Override 1928 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) { 1929 CompletableFuture<Void> future = 
new CompletableFuture<>(); 1930 addListener(tableExists(tableName), (exists, err) -> { 1931 if (err != null) { 1932 future.completeExceptionally(err); 1933 } else if (exists) { 1934 future.completeExceptionally(new TableExistsException(tableName)); 1935 } else { 1936 completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName)); 1937 } 1938 }); 1939 return future; 1940 } 1941 1942 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) { 1943 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() 1944 .setName(snapshotName).setTable(tableName.getNameAsString()).build(); 1945 try { 1946 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); 1947 } catch (IllegalArgumentException e) { 1948 return failedFuture(e); 1949 } 1950 return waitProcedureResult(this 1951 .<Long> newMasterCaller() 1952 .action( 1953 (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call( 1954 controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot) 1955 .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req, 1956 done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call()); 1957 } 1958 1959 @Override 1960 public CompletableFuture<List<SnapshotDescription>> listSnapshots() { 1961 return getCompletedSnapshots(null); 1962 } 1963 1964 @Override 1965 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { 1966 Preconditions.checkNotNull(pattern, 1967 "pattern is null. 
If you don't specify a pattern, use listSnapshots() instead"); 1968 return getCompletedSnapshots(pattern); 1969 } 1970 1971 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) { 1972 return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this 1973 .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> 1974 call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), 1975 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), 1976 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern))) 1977 .call(); 1978 } 1979 1980 @Override 1981 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) { 1982 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 1983 + " If you don't specify a tableNamePattern, use listSnapshots() instead"); 1984 return getCompletedSnapshots(tableNamePattern, null); 1985 } 1986 1987 @Override 1988 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, 1989 Pattern snapshotNamePattern) { 1990 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 1991 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead"); 1992 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
1993 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead"); 1994 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 1995 } 1996 1997 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots( 1998 Pattern tableNamePattern, Pattern snapshotNamePattern) { 1999 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); 2000 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { 2001 if (err != null) { 2002 future.completeExceptionally(err); 2003 return; 2004 } 2005 if (tableNames == null || tableNames.size() <= 0) { 2006 future.complete(Collections.emptyList()); 2007 return; 2008 } 2009 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { 2010 if (err2 != null) { 2011 future.completeExceptionally(err2); 2012 return; 2013 } 2014 if (snapshotDescList == null || snapshotDescList.isEmpty()) { 2015 future.complete(Collections.emptyList()); 2016 return; 2017 } 2018 future.complete(snapshotDescList.stream() 2019 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) 2020 .collect(Collectors.toList())); 2021 }); 2022 }); 2023 return future; 2024 } 2025 2026 @Override 2027 public CompletableFuture<Void> deleteSnapshot(String snapshotName) { 2028 return internalDeleteSnapshot(new SnapshotDescription(snapshotName)); 2029 } 2030 2031 @Override 2032 public CompletableFuture<Void> deleteSnapshots() { 2033 return internalDeleteSnapshots(null, null); 2034 } 2035 2036 @Override 2037 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { 2038 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 
2039 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead"); 2040 return internalDeleteSnapshots(null, snapshotNamePattern); 2041 } 2042 2043 @Override 2044 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) { 2045 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2046 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead"); 2047 return internalDeleteSnapshots(tableNamePattern, null); 2048 } 2049 2050 @Override 2051 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, 2052 Pattern snapshotNamePattern) { 2053 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null." 2054 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead"); 2055 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null." 2056 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead"); 2057 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern); 2058 } 2059 2060 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern, 2061 Pattern snapshotNamePattern) { 2062 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture; 2063 if (tableNamePattern == null) { 2064 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern); 2065 } else { 2066 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); 2067 } 2068 CompletableFuture<Void> future = new CompletableFuture<>(); 2069 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> { 2070 if (err != null) { 2071 future.completeExceptionally(err); 2072 return; 2073 } 2074 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { 2075 future.complete(null); 2076 return; 2077 } 2078 addListener(CompletableFuture.allOf(snapshotDescriptions.stream() 2079 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { 2080 if (e != null) { 
2081 future.completeExceptionally(e); 2082 } else { 2083 future.complete(v); 2084 } 2085 }); 2086 })); 2087 return future; 2088 } 2089 2090 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { 2091 return this 2092 .<Void> newMasterCaller() 2093 .action( 2094 (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( 2095 controller, 2096 stub, 2097 DeleteSnapshotRequest.newBuilder() 2098 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, 2099 req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call(); 2100 } 2101 2102 @Override 2103 public CompletableFuture<Void> execProcedure(String signature, String instance, 2104 Map<String, String> props) { 2105 CompletableFuture<Void> future = new CompletableFuture<>(); 2106 ProcedureDescription procDesc = 2107 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2108 addListener(this.<Long> newMasterCaller() 2109 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( 2110 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), 2111 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) 2112 .call(), (expectedTimeout, err) -> { 2113 if (err != null) { 2114 future.completeExceptionally(err); 2115 return; 2116 } 2117 TimerTask pollingTask = new TimerTask() { 2118 int tries = 0; 2119 long startTime = EnvironmentEdgeManager.currentTime(); 2120 long endTime = startTime + expectedTimeout; 2121 long maxPauseTime = expectedTimeout / maxAttempts; 2122 2123 @Override 2124 public void run(Timeout timeout) throws Exception { 2125 if (EnvironmentEdgeManager.currentTime() < endTime) { 2126 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { 2127 if (err2 != null) { 2128 future.completeExceptionally(err2); 2129 return; 2130 } 2131 if (done) { 2132 future.complete(null); 2133 } else { 2134 // retry 
again after pauseTime. 2135 long pauseTime = 2136 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); 2137 pauseTime = Math.min(pauseTime, maxPauseTime); 2138 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, 2139 TimeUnit.MICROSECONDS); 2140 } 2141 }); 2142 } else { 2143 future.completeExceptionally(new IOException("Procedure '" + signature + " : " + 2144 instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); 2145 } 2146 } 2147 }; 2148 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. 2149 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); 2150 }); 2151 return future; 2152 } 2153 2154 @Override 2155 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance, 2156 Map<String, String> props) { 2157 ProcedureDescription proDesc = 2158 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2159 return this.<byte[]> newMasterCaller() 2160 .action( 2161 (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call( 2162 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(), 2163 (s, c, req, done) -> s.execProcedureWithRet(c, req, done), 2164 resp -> resp.hasReturnData() ? 
resp.getReturnData().toByteArray() : null)) 2165 .call(); 2166 } 2167 2168 @Override 2169 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance, 2170 Map<String, String> props) { 2171 ProcedureDescription proDesc = 2172 ProtobufUtil.buildProcedureDescription(signature, instance, props); 2173 return this.<Boolean> newMasterCaller() 2174 .action((controller, stub) -> this 2175 .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub, 2176 IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(), 2177 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone())) 2178 .call(); 2179 } 2180 2181 @Override 2182 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) { 2183 return this.<Boolean> newMasterCaller().action( 2184 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call( 2185 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(), 2186 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted())) 2187 .call(); 2188 } 2189 2190 @Override 2191 public CompletableFuture<String> getProcedures() { 2192 return this 2193 .<String> newMasterCaller() 2194 .action( 2195 (controller, stub) -> this 2196 .<GetProceduresRequest, GetProceduresResponse, String> call( 2197 controller, stub, GetProceduresRequest.newBuilder().build(), 2198 (s, c, req, done) -> s.getProcedures(c, req, done), 2199 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call(); 2200 } 2201 2202 @Override 2203 public CompletableFuture<String> getLocks() { 2204 return this 2205 .<String> newMasterCaller() 2206 .action( 2207 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call( 2208 controller, stub, GetLocksRequest.newBuilder().build(), 2209 (s, c, req, done) -> s.getLocks(c, req, done), 2210 resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call(); 2211 } 2212 
2213 @Override 2214 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) { 2215 return this.<Void> newMasterCaller() 2216 .action((controller, stub) -> this 2217 .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call( 2218 controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload), 2219 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null)) 2220 .call(); 2221 } 2222 2223 @Override 2224 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() { 2225 return this.<List<ServerName>> newMasterCaller() 2226 .action((controller, stub) -> this 2227 .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse, 2228 List<ServerName>> call( 2229 controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(), 2230 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done), 2231 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 2232 .collect(Collectors.toList()))) 2233 .call(); 2234 } 2235 2236 @Override 2237 public CompletableFuture<Void> recommissionRegionServer(ServerName server, 2238 List<byte[]> encodedRegionNames) { 2239 return this.<Void> newMasterCaller() 2240 .action((controller, stub) -> this 2241 .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller, 2242 stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames), 2243 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null)) 2244 .call(); 2245 } 2246 2247 /** 2248 * Get the region location for the passed region name. The region name may be a full region name 2249 * or encoded region name. 
If the region does not found, then it'll throw an 2250 * UnknownRegionException wrapped by a {@link CompletableFuture} 2251 * @param regionNameOrEncodedRegionName 2252 * @return region location, wrapped by a {@link CompletableFuture} 2253 */ 2254 @VisibleForTesting 2255 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { 2256 if (regionNameOrEncodedRegionName == null) { 2257 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2258 } 2259 try { 2260 CompletableFuture<Optional<HRegionLocation>> future; 2261 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { 2262 future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, 2263 regionNameOrEncodedRegionName); 2264 } else { 2265 future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); 2266 } 2267 2268 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); 2269 addListener(future, (location, err) -> { 2270 if (err != null) { 2271 returnedFuture.completeExceptionally(err); 2272 return; 2273 } 2274 if (!location.isPresent() || location.get().getRegion() == null) { 2275 returnedFuture.completeExceptionally( 2276 new UnknownRegionException("Invalid region name or encoded region name: " + 2277 Bytes.toStringBinary(regionNameOrEncodedRegionName))); 2278 } else { 2279 returnedFuture.complete(location.get()); 2280 } 2281 }); 2282 return returnedFuture; 2283 } catch (IOException e) { 2284 return failedFuture(e); 2285 } 2286 } 2287 2288 /** 2289 * Get the region info for the passed region name. The region name may be a full region name or 2290 * encoded region name. 
If the region does not found, then it'll throw an UnknownRegionException 2291 * wrapped by a {@link CompletableFuture} 2292 * @param regionNameOrEncodedRegionName 2293 * @return region info, wrapped by a {@link CompletableFuture} 2294 */ 2295 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { 2296 if (regionNameOrEncodedRegionName == null) { 2297 return failedFuture(new IllegalArgumentException("Passed region name can't be null")); 2298 } 2299 2300 if (Bytes.equals(regionNameOrEncodedRegionName, 2301 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) || 2302 Bytes.equals(regionNameOrEncodedRegionName, 2303 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { 2304 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); 2305 } 2306 2307 CompletableFuture<RegionInfo> future = new CompletableFuture<>(); 2308 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { 2309 if (err != null) { 2310 future.completeExceptionally(err); 2311 } else { 2312 future.complete(location.getRegion()); 2313 } 2314 }); 2315 return future; 2316 } 2317 2318 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { 2319 if (numRegions < 3) { 2320 throw new IllegalArgumentException("Must create at least three regions"); 2321 } else if (Bytes.compareTo(startKey, endKey) >= 0) { 2322 throw new IllegalArgumentException("Start key must be smaller than end key"); 2323 } 2324 if (numRegions == 3) { 2325 return new byte[][] { startKey, endKey }; 2326 } 2327 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 2328 if (splitKeys == null || splitKeys.length != numRegions - 1) { 2329 throw new IllegalArgumentException("Unable to split key range into enough regions"); 2330 } 2331 return splitKeys; 2332 } 2333 2334 private void verifySplitKeys(byte[][] splitKeys) { 2335 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); 2336 // Verify there are no duplicate 
split keys 2337 byte[] lastKey = null; 2338 for (byte[] splitKey : splitKeys) { 2339 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { 2340 throw new IllegalArgumentException("Empty split key must not be passed in the split keys."); 2341 } 2342 if (lastKey != null && Bytes.equals(splitKey, lastKey)) { 2343 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " 2344 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); 2345 } 2346 lastKey = splitKey; 2347 } 2348 } 2349 2350 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { 2351 2352 abstract void onFinished(); 2353 2354 abstract void onError(Throwable error); 2355 2356 @Override 2357 public void accept(Void v, Throwable error) { 2358 if (error != null) { 2359 onError(error); 2360 return; 2361 } 2362 onFinished(); 2363 } 2364 } 2365 2366 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { 2367 protected final TableName tableName; 2368 2369 TableProcedureBiConsumer(TableName tableName) { 2370 this.tableName = tableName; 2371 } 2372 2373 abstract String getOperationType(); 2374 2375 String getDescription() { 2376 return "Operation: " + getOperationType() + ", " + "Table Name: " 2377 + tableName.getNameWithNamespaceInclAsString(); 2378 } 2379 2380 @Override 2381 void onFinished() { 2382 LOG.info(getDescription() + " completed"); 2383 } 2384 2385 @Override 2386 void onError(Throwable error) { 2387 LOG.info(getDescription() + " failed with " + error.getMessage()); 2388 } 2389 } 2390 2391 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { 2392 protected final String namespaceName; 2393 2394 NamespaceProcedureBiConsumer(String namespaceName) { 2395 this.namespaceName = namespaceName; 2396 } 2397 2398 abstract String getOperationType(); 2399 2400 String getDescription() { 2401 return "Operation: " + getOperationType() + ", Namespace: " + 
namespaceName; 2402 } 2403 2404 @Override 2405 void onFinished() { 2406 LOG.info(getDescription() + " completed"); 2407 } 2408 2409 @Override 2410 void onError(Throwable error) { 2411 LOG.info(getDescription() + " failed with " + error.getMessage()); 2412 } 2413 } 2414 2415 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2416 2417 CreateTableProcedureBiConsumer(TableName tableName) { 2418 super(tableName); 2419 } 2420 2421 @Override 2422 String getOperationType() { 2423 return "CREATE"; 2424 } 2425 } 2426 2427 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { 2428 2429 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { 2430 super(tableName); 2431 } 2432 2433 @Override 2434 String getOperationType() { 2435 return "ENABLE"; 2436 } 2437 } 2438 2439 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { 2440 2441 DeleteTableProcedureBiConsumer(TableName tableName) { 2442 super(tableName); 2443 } 2444 2445 @Override 2446 String getOperationType() { 2447 return "DELETE"; 2448 } 2449 2450 @Override 2451 void onFinished() { 2452 connection.getLocator().clearCache(this.tableName); 2453 super.onFinished(); 2454 } 2455 } 2456 2457 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { 2458 2459 TruncateTableProcedureBiConsumer(TableName tableName) { 2460 super(tableName); 2461 } 2462 2463 @Override 2464 String getOperationType() { 2465 return "TRUNCATE"; 2466 } 2467 } 2468 2469 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2470 2471 EnableTableProcedureBiConsumer(TableName tableName) { 2472 super(tableName); 2473 } 2474 2475 @Override 2476 String getOperationType() { 2477 return "ENABLE"; 2478 } 2479 } 2480 2481 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { 2482 2483 DisableTableProcedureBiConsumer(TableName tableName) { 2484 
super(tableName); 2485 } 2486 2487 @Override 2488 String getOperationType() { 2489 return "DISABLE"; 2490 } 2491 } 2492 2493 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2494 2495 AddColumnFamilyProcedureBiConsumer(TableName tableName) { 2496 super(tableName); 2497 } 2498 2499 @Override 2500 String getOperationType() { 2501 return "ADD_COLUMN_FAMILY"; 2502 } 2503 } 2504 2505 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2506 2507 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { 2508 super(tableName); 2509 } 2510 2511 @Override 2512 String getOperationType() { 2513 return "DELETE_COLUMN_FAMILY"; 2514 } 2515 } 2516 2517 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { 2518 2519 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { 2520 super(tableName); 2521 } 2522 2523 @Override 2524 String getOperationType() { 2525 return "MODIFY_COLUMN_FAMILY"; 2526 } 2527 } 2528 2529 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2530 2531 CreateNamespaceProcedureBiConsumer(String namespaceName) { 2532 super(namespaceName); 2533 } 2534 2535 @Override 2536 String getOperationType() { 2537 return "CREATE_NAMESPACE"; 2538 } 2539 } 2540 2541 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2542 2543 DeleteNamespaceProcedureBiConsumer(String namespaceName) { 2544 super(namespaceName); 2545 } 2546 2547 @Override 2548 String getOperationType() { 2549 return "DELETE_NAMESPACE"; 2550 } 2551 } 2552 2553 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { 2554 2555 ModifyNamespaceProcedureBiConsumer(String namespaceName) { 2556 super(namespaceName); 2557 } 2558 2559 @Override 2560 String getOperationType() { 2561 return "MODIFY_NAMESPACE"; 2562 } 2563 } 2564 2565 private static class 
MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2566 2567 MergeTableRegionProcedureBiConsumer(TableName tableName) { 2568 super(tableName); 2569 } 2570 2571 @Override 2572 String getOperationType() { 2573 return "MERGE_REGIONS"; 2574 } 2575 } 2576 2577 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { 2578 2579 SplitTableRegionProcedureBiConsumer(TableName tableName) { 2580 super(tableName); 2581 } 2582 2583 @Override 2584 String getOperationType() { 2585 return "SPLIT_REGION"; 2586 } 2587 } 2588 2589 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { 2590 private final String peerId; 2591 private final Supplier<String> getOperation; 2592 2593 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { 2594 this.peerId = peerId; 2595 this.getOperation = getOperation; 2596 } 2597 2598 String getDescription() { 2599 return "Operation: " + getOperation.get() + ", peerId: " + peerId; 2600 } 2601 2602 @Override 2603 void onFinished() { 2604 LOG.info(getDescription() + " completed"); 2605 } 2606 2607 @Override 2608 void onError(Throwable error) { 2609 LOG.info(getDescription() + " failed with " + error.getMessage()); 2610 } 2611 } 2612 2613 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { 2614 CompletableFuture<Void> future = new CompletableFuture<>(); 2615 addListener(procFuture, (procId, error) -> { 2616 if (error != null) { 2617 future.completeExceptionally(error); 2618 return; 2619 } 2620 getProcedureResult(procId, future, 0); 2621 }); 2622 return future; 2623 } 2624 2625 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { 2626 addListener( 2627 this.<GetProcedureResultResponse> newMasterCaller() 2628 .action((controller, stub) -> this 2629 .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( 2630 controller, stub, 
GetProcedureResultRequest.newBuilder().setProcId(procId).build(), 2631 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) 2632 .call(), 2633 (response, error) -> { 2634 if (error != null) { 2635 LOG.warn("failed to get the procedure result procId={}", procId, 2636 ConnectionUtils.translateException(error)); 2637 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2638 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2639 return; 2640 } 2641 if (response.getState() == GetProcedureResultResponse.State.RUNNING) { 2642 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), 2643 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); 2644 return; 2645 } 2646 if (response.hasException()) { 2647 IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); 2648 future.completeExceptionally(ioe); 2649 } else { 2650 future.complete(null); 2651 } 2652 }); 2653 } 2654 2655 private <T> CompletableFuture<T> failedFuture(Throwable error) { 2656 CompletableFuture<T> future = new CompletableFuture<>(); 2657 future.completeExceptionally(error); 2658 return future; 2659 } 2660 2661 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) { 2662 if (error != null) { 2663 future.completeExceptionally(error); 2664 return true; 2665 } 2666 return false; 2667 } 2668 2669 @Override 2670 public CompletableFuture<ClusterMetrics> getClusterMetrics() { 2671 return getClusterMetrics(EnumSet.allOf(Option.class)); 2672 } 2673 2674 @Override 2675 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) { 2676 return this 2677 .<ClusterMetrics> newMasterCaller() 2678 .action( 2679 (controller, stub) -> this 2680 .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller, 2681 stub, RequestConverter.buildGetClusterStatusRequest(options), 2682 (s, c, req, done) -> s.getClusterStatus(c, req, done), 
2683 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call(); 2684 } 2685 2686 @Override 2687 public CompletableFuture<Void> shutdown() { 2688 return this 2689 .<Void> newMasterCaller() 2690 .action( 2691 (controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller, 2692 stub, ShutdownRequest.newBuilder().build(), 2693 (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call(); 2694 } 2695 2696 @Override 2697 public CompletableFuture<Void> stopMaster() { 2698 return this 2699 .<Void> newMasterCaller() 2700 .action( 2701 (controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(controller, 2702 stub, StopMasterRequest.newBuilder().build(), 2703 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call(); 2704 } 2705 2706 @Override 2707 public CompletableFuture<Void> stopRegionServer(ServerName serverName) { 2708 StopServerRequest request = 2709 RequestConverter.buildStopServerRequest("Called by admin client " 2710 + this.connection.toString()); 2711 return this 2712 .<Void> newAdminCaller() 2713 .action( 2714 (controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall( 2715 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), 2716 resp -> null)).serverName(serverName).call(); 2717 } 2718 2719 @Override 2720 public CompletableFuture<Void> updateConfiguration(ServerName serverName) { 2721 return this 2722 .<Void> newAdminCaller() 2723 .action( 2724 (controller, stub) -> this 2725 .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall( 2726 controller, stub, UpdateConfigurationRequest.getDefaultInstance(), 2727 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null)) 2728 .serverName(serverName).call(); 2729 } 2730 2731 @Override 2732 public CompletableFuture<Void> updateConfiguration() { 2733 CompletableFuture<Void> future = new CompletableFuture<Void>(); 2734 addListener( 2735 
getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)), 2736 (status, err) -> { 2737 if (err != null) { 2738 future.completeExceptionally(err); 2739 } else { 2740 List<CompletableFuture<Void>> futures = new ArrayList<>(); 2741 status.getLiveServerMetrics().keySet() 2742 .forEach(server -> futures.add(updateConfiguration(server))); 2743 futures.add(updateConfiguration(status.getMasterName())); 2744 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); 2745 addListener( 2746 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2747 (result, err2) -> { 2748 if (err2 != null) { 2749 future.completeExceptionally(err2); 2750 } else { 2751 future.complete(result); 2752 } 2753 }); 2754 } 2755 }); 2756 return future; 2757 } 2758 2759 @Override 2760 public CompletableFuture<Void> rollWALWriter(ServerName serverName) { 2761 return this 2762 .<Void> newAdminCaller() 2763 .action( 2764 (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall( 2765 controller, stub, RequestConverter.buildRollWALWriterRequest(), 2766 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null)) 2767 .serverName(serverName).call(); 2768 } 2769 2770 @Override 2771 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { 2772 return this 2773 .<Void> newAdminCaller() 2774 .action( 2775 (controller, stub) -> this 2776 .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall( 2777 controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s, 2778 c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null)) 2779 .serverName(serverName).call(); 2780 } 2781 2782 @Override 2783 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() { 2784 return this 2785 .<List<SecurityCapability>> newMasterCaller() 2786 .action( 2787 (controller, stub) 
-> this 2788 .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call( 2789 controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req, 2790 done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil 2791 .toSecurityCapabilityList(resp.getCapabilitiesList()))).call(); 2792 } 2793 2794 @Override 2795 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) { 2796 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName); 2797 } 2798 2799 @Override 2800 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName, 2801 TableName tableName) { 2802 Preconditions.checkNotNull(tableName, 2803 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead"); 2804 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName); 2805 } 2806 2807 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request, 2808 ServerName serverName) { 2809 return this.<List<RegionMetrics>> newAdminCaller() 2810 .action((controller, stub) -> this 2811 .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>> 2812 adminCall(controller, stub, request, (s, c, req, done) -> 2813 s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics)) 2814 .serverName(serverName).call(); 2815 } 2816 2817 @Override 2818 public CompletableFuture<Boolean> isMasterInMaintenanceMode() { 2819 return this 2820 .<Boolean> newMasterCaller() 2821 .action( 2822 (controller, stub) -> this 2823 .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller, 2824 stub, IsInMaintenanceModeRequest.newBuilder().build(), 2825 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done), 2826 resp -> resp.getInMaintenanceMode())).call(); 2827 } 2828 2829 @Override 2830 public CompletableFuture<CompactionState> getCompactionState(TableName tableName, 2831 
CompactType compactType) { 2832 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 2833 2834 switch (compactType) { 2835 case MOB: 2836 addListener(connection.registry.getMasterAddress(), (serverName, err) -> { 2837 if (err != null) { 2838 future.completeExceptionally(err); 2839 return; 2840 } 2841 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); 2842 2843 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) 2844 .action((controller, stub) -> this 2845 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 2846 controller, stub, 2847 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), 2848 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 2849 .call(), (resp2, err2) -> { 2850 if (err2 != null) { 2851 future.completeExceptionally(err2); 2852 } else { 2853 if (resp2.hasCompactionState()) { 2854 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 2855 } else { 2856 future.complete(CompactionState.NONE); 2857 } 2858 } 2859 }); 2860 }); 2861 break; 2862 case NORMAL: 2863 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 2864 if (err != null) { 2865 future.completeExceptionally(err); 2866 return; 2867 } 2868 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>(); 2869 List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); 2870 locations.stream().filter(loc -> loc.getServerName() != null) 2871 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) 2872 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { 2873 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { 2874 // If any region compaction state is MAJOR_AND_MINOR 2875 // the table compaction state is MAJOR_AND_MINOR, too. 
2876 if (err2 != null) { 2877 future.completeExceptionally(unwrapCompletionException(err2)); 2878 } else if (regionState == CompactionState.MAJOR_AND_MINOR) { 2879 future.complete(regionState); 2880 } else { 2881 regionStates.add(regionState); 2882 } 2883 })); 2884 }); 2885 addListener( 2886 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 2887 (ret, err3) -> { 2888 // If future not completed, check all regions's compaction state 2889 if (!future.isCompletedExceptionally() && !future.isDone()) { 2890 CompactionState state = CompactionState.NONE; 2891 for (CompactionState regionState : regionStates) { 2892 switch (regionState) { 2893 case MAJOR: 2894 if (state == CompactionState.MINOR) { 2895 future.complete(CompactionState.MAJOR_AND_MINOR); 2896 } else { 2897 state = CompactionState.MAJOR; 2898 } 2899 break; 2900 case MINOR: 2901 if (state == CompactionState.MAJOR) { 2902 future.complete(CompactionState.MAJOR_AND_MINOR); 2903 } else { 2904 state = CompactionState.MINOR; 2905 } 2906 break; 2907 case NONE: 2908 default: 2909 } 2910 } 2911 if (!future.isDone()) { 2912 future.complete(state); 2913 } 2914 } 2915 }); 2916 }); 2917 break; 2918 default: 2919 throw new IllegalArgumentException("Unknown compactType: " + compactType); 2920 } 2921 2922 return future; 2923 } 2924 2925 @Override 2926 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { 2927 CompletableFuture<CompactionState> future = new CompletableFuture<>(); 2928 addListener(getRegionLocation(regionName), (location, err) -> { 2929 if (err != null) { 2930 future.completeExceptionally(err); 2931 return; 2932 } 2933 ServerName serverName = location.getServerName(); 2934 if (serverName == null) { 2935 future 2936 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); 2937 return; 2938 } 2939 addListener( 2940 this.<GetRegionInfoResponse> newAdminCaller() 2941 .action((controller, stub) -> this 2942 
.<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( 2943 controller, stub, 2944 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(), 2945 true), 2946 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) 2947 .serverName(serverName).call(), 2948 (resp2, err2) -> { 2949 if (err2 != null) { 2950 future.completeExceptionally(err2); 2951 } else { 2952 if (resp2.hasCompactionState()) { 2953 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); 2954 } else { 2955 future.complete(CompactionState.NONE); 2956 } 2957 } 2958 }); 2959 }); 2960 return future; 2961 } 2962 2963 @Override 2964 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) { 2965 MajorCompactionTimestampRequest request = 2966 MajorCompactionTimestampRequest.newBuilder() 2967 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); 2968 return this 2969 .<Optional<Long>> newMasterCaller() 2970 .action( 2971 (controller, stub) -> this 2972 .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call( 2973 controller, stub, request, 2974 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done), 2975 ProtobufUtil::toOptionalTimestamp)).call(); 2976 } 2977 2978 @Override 2979 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion( 2980 byte[] regionName) { 2981 CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); 2982 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first 2983 addListener(getRegionInfo(regionName), (region, err) -> { 2984 if (err != null) { 2985 future.completeExceptionally(err); 2986 return; 2987 } 2988 MajorCompactionTimestampForRegionRequest.Builder builder = 2989 MajorCompactionTimestampForRegionRequest.newBuilder(); 2990 builder.setRegion( 2991 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName)); 2992 addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this 2993 .<MajorCompactionTimestampForRegionRequest, 2994 MajorCompactionTimestampResponse, Optional<Long>> call( 2995 controller, stub, builder.build(), 2996 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done), 2997 ProtobufUtil::toOptionalTimestamp)) 2998 .call(), (timestamp, err2) -> { 2999 if (err2 != null) { 3000 future.completeExceptionally(err2); 3001 } else { 3002 future.complete(timestamp); 3003 } 3004 }); 3005 }); 3006 return future; 3007 } 3008 3009 @Override 3010 public CompletableFuture<Boolean> balancerSwitch(final boolean on) { 3011 return this 3012 .<Boolean> newMasterCaller() 3013 .action( 3014 (controller, stub) -> this 3015 .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, 3016 stub, RequestConverter.buildSetBalancerRunningRequest(on, true), 3017 (s, c, req, done) -> s.setBalancerRunning(c, req, done), 3018 (resp) -> resp.getPrevBalanceValue())).call(); 3019 } 3020 3021 @Override 3022 public CompletableFuture<Boolean> balance(boolean forcible) { 3023 return this 3024 .<Boolean> newMasterCaller() 3025 .action( 3026 (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller, 3027 stub, RequestConverter.buildBalanceRequest(forcible), 3028 (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call(); 3029 } 3030 3031 @Override 3032 public CompletableFuture<Boolean> isBalancerEnabled() { 3033 return this 3034 .<Boolean> newMasterCaller() 3035 .action( 3036 (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call( 3037 controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), 3038 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) 3039 .call(); 3040 } 3041 3042 @Override 3043 public CompletableFuture<Boolean> normalizerSwitch(boolean on) { 3044 return this 3045 
.<Boolean> newMasterCaller() 3046 .action( 3047 (controller, stub) -> this 3048 .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call( 3049 controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c, 3050 req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp 3051 .getPrevNormalizerValue())).call(); 3052 } 3053 3054 @Override 3055 public CompletableFuture<Boolean> isNormalizerEnabled() { 3056 return this 3057 .<Boolean> newMasterCaller() 3058 .action( 3059 (controller, stub) -> this 3060 .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller, 3061 stub, RequestConverter.buildIsNormalizerEnabledRequest(), 3062 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), 3063 (resp) -> resp.getEnabled())).call(); 3064 } 3065 3066 @Override 3067 public CompletableFuture<Boolean> normalize() { 3068 return this 3069 .<Boolean> newMasterCaller() 3070 .action( 3071 (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call( 3072 controller, stub, RequestConverter.buildNormalizeRequest(), 3073 (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan())) 3074 .call(); 3075 } 3076 3077 @Override 3078 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) { 3079 return this 3080 .<Boolean> newMasterCaller() 3081 .action( 3082 (controller, stub) -> this 3083 .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call( 3084 controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s, 3085 c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp 3086 .getPrevValue())).call(); 3087 } 3088 3089 @Override 3090 public CompletableFuture<Boolean> isCleanerChoreEnabled() { 3091 return this 3092 .<Boolean> newMasterCaller() 3093 .action( 3094 (controller, stub) -> this 3095 .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call( 3096 controller, stub, 
RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req, 3097 done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue())) 3098 .call(); 3099 } 3100 3101 @Override 3102 public CompletableFuture<Boolean> runCleanerChore() { 3103 return this 3104 .<Boolean> newMasterCaller() 3105 .action( 3106 (controller, stub) -> this 3107 .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub, 3108 RequestConverter.buildRunCleanerChoreRequest(), 3109 (s, c, req, done) -> s.runCleanerChore(c, req, done), 3110 (resp) -> resp.getCleanerChoreRan())).call(); 3111 } 3112 3113 @Override 3114 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) { 3115 return this 3116 .<Boolean> newMasterCaller() 3117 .action( 3118 (controller, stub) -> this 3119 .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call( 3120 controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s, 3121 c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp 3122 .getPrevValue())).call(); 3123 } 3124 3125 @Override 3126 public CompletableFuture<Boolean> isCatalogJanitorEnabled() { 3127 return this 3128 .<Boolean> newMasterCaller() 3129 .action( 3130 (controller, stub) -> this 3131 .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call( 3132 controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c, 3133 req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp 3134 .getValue())).call(); 3135 } 3136 3137 @Override 3138 public CompletableFuture<Integer> runCatalogJanitor() { 3139 return this 3140 .<Integer> newMasterCaller() 3141 .action( 3142 (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call( 3143 controller, stub, RequestConverter.buildCatalogScanRequest(), 3144 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult())) 3145 .call(); 3146 } 3147 3148 @Override 3149 public 
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3150 ServiceCaller<S, R> callable) { 3151 MasterCoprocessorRpcChannelImpl channel = 3152 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller()); 3153 S stub = stubMaker.apply(channel); 3154 CompletableFuture<R> future = new CompletableFuture<>(); 3155 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3156 callable.call(stub, controller, resp -> { 3157 if (controller.failed()) { 3158 future.completeExceptionally(controller.getFailed()); 3159 } else { 3160 future.complete(resp); 3161 } 3162 }); 3163 return future; 3164 } 3165 3166 @Override 3167 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 3168 ServiceCaller<S, R> callable, ServerName serverName) { 3169 RegionServerCoprocessorRpcChannelImpl channel = 3170 new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName( 3171 serverName)); 3172 S stub = stubMaker.apply(channel); 3173 CompletableFuture<R> future = new CompletableFuture<>(); 3174 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 3175 callable.call(stub, controller, resp -> { 3176 if (controller.failed()) { 3177 future.completeExceptionally(controller.getFailed()); 3178 } else { 3179 future.complete(resp); 3180 } 3181 }); 3182 return future; 3183 } 3184 3185 @Override 3186 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) { 3187 return this.<List<ServerName>> newMasterCaller() 3188 .action((controller, stub) -> this 3189 .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call( 3190 controller, stub, RequestConverter.buildClearDeadServersRequest(servers), 3191 (s, c, req, done) -> s.clearDeadServers(c, req, done), 3192 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList()))) 3193 .call(); 3194 } 3195 3196 private <T> ServerRequestCallerBuilder<T> newServerCaller() { 
3197 return this.connection.callerFactory.<T> serverRequest() 3198 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 3199 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 3200 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 3201 .startLogErrorsCnt(startLogErrorsCnt); 3202 } 3203 3204 @Override 3205 public CompletableFuture<Void> enableTableReplication(TableName tableName) { 3206 if (tableName == null) { 3207 return failedFuture(new IllegalArgumentException("Table name is null")); 3208 } 3209 CompletableFuture<Void> future = new CompletableFuture<>(); 3210 addListener(tableExists(tableName), (exist, err) -> { 3211 if (err != null) { 3212 future.completeExceptionally(err); 3213 return; 3214 } 3215 if (!exist) { 3216 future.completeExceptionally(new TableNotFoundException( 3217 "Table '" + tableName.getNameAsString() + "' does not exists.")); 3218 return; 3219 } 3220 addListener(getTableSplits(tableName), (splits, err1) -> { 3221 if (err1 != null) { 3222 future.completeExceptionally(err1); 3223 } else { 3224 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> { 3225 if (err2 != null) { 3226 future.completeExceptionally(err2); 3227 } else { 3228 addListener(setTableReplication(tableName, true), (result3, err3) -> { 3229 if (err3 != null) { 3230 future.completeExceptionally(err3); 3231 } else { 3232 future.complete(result3); 3233 } 3234 }); 3235 } 3236 }); 3237 } 3238 }); 3239 }); 3240 return future; 3241 } 3242 3243 @Override 3244 public CompletableFuture<Void> disableTableReplication(TableName tableName) { 3245 if (tableName == null) { 3246 return failedFuture(new IllegalArgumentException("Table name is null")); 3247 } 3248 CompletableFuture<Void> future = new CompletableFuture<>(); 3249 addListener(tableExists(tableName), (exist, err) -> { 3250 if (err != null) { 3251 future.completeExceptionally(err); 3252 return; 3253 } 3254 if (!exist) { 3255 future.completeExceptionally(new TableNotFoundException( 3256 "Table 
'" + tableName.getNameAsString() + "' does not exists.")); 3257 return; 3258 } 3259 addListener(setTableReplication(tableName, false), (result, err2) -> { 3260 if (err2 != null) { 3261 future.completeExceptionally(err2); 3262 } else { 3263 future.complete(result); 3264 } 3265 }); 3266 }); 3267 return future; 3268 } 3269 3270 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) { 3271 CompletableFuture<byte[][]> future = new CompletableFuture<>(); 3272 addListener(getRegions(tableName), (regions, err2) -> { 3273 if (err2 != null) { 3274 future.completeExceptionally(err2); 3275 return; 3276 } 3277 if (regions.size() == 1) { 3278 future.complete(null); 3279 } else { 3280 byte[][] splits = new byte[regions.size() - 1][]; 3281 for (int i = 1; i < regions.size(); i++) { 3282 splits[i - 1] = regions.get(i).getStartKey(); 3283 } 3284 future.complete(splits); 3285 } 3286 }); 3287 return future; 3288 } 3289 3290 /** 3291 * Connect to peer and check the table descriptor on peer: 3292 * <ol> 3293 * <li>Create the same table on peer when not exist.</li> 3294 * <li>Throw an exception if the table already has replication enabled on any of the column 3295 * families.</li> 3296 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li> 3297 * </ol> 3298 * @param tableName name of the table to sync to the peer 3299 * @param splits table split keys 3300 */ 3301 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName, 3302 byte[][] splits) { 3303 CompletableFuture<Void> future = new CompletableFuture<>(); 3304 addListener(listReplicationPeers(), (peers, err) -> { 3305 if (err != null) { 3306 future.completeExceptionally(err); 3307 return; 3308 } 3309 if (peers == null || peers.size() <= 0) { 3310 future.completeExceptionally( 3311 new IllegalArgumentException("Found no peer cluster for replication.")); 3312 return; 3313 } 3314 List<CompletableFuture<Void>> futures = new ArrayList<>(); 3315 
peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) 3316 .forEach(peer -> { 3317 futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); 3318 }); 3319 addListener( 3320 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), 3321 (result, err2) -> { 3322 if (err2 != null) { 3323 future.completeExceptionally(err2); 3324 } else { 3325 future.complete(result); 3326 } 3327 }); 3328 }); 3329 return future; 3330 } 3331 3332 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits, 3333 ReplicationPeerDescription peer) { 3334 Configuration peerConf = null; 3335 try { 3336 peerConf = 3337 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer); 3338 } catch (IOException e) { 3339 return failedFuture(e); 3340 } 3341 CompletableFuture<Void> future = new CompletableFuture<>(); 3342 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> { 3343 if (err != null) { 3344 future.completeExceptionally(err); 3345 return; 3346 } 3347 addListener(getDescriptor(tableName), (tableDesc, err1) -> { 3348 if (err1 != null) { 3349 future.completeExceptionally(err1); 3350 return; 3351 } 3352 AsyncAdmin peerAdmin = conn.getAdmin(); 3353 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> { 3354 if (err2 != null) { 3355 future.completeExceptionally(err2); 3356 return; 3357 } 3358 if (!exist) { 3359 CompletableFuture<Void> createTableFuture = null; 3360 if (splits == null) { 3361 createTableFuture = peerAdmin.createTable(tableDesc); 3362 } else { 3363 createTableFuture = peerAdmin.createTable(tableDesc, splits); 3364 } 3365 addListener(createTableFuture, (result, err3) -> { 3366 if (err3 != null) { 3367 future.completeExceptionally(err3); 3368 } else { 3369 future.complete(result); 3370 } 3371 }); 3372 } else { 3373 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin), 3374 (result, err4) -> { 3375 if 
(err4 != null) { 3376 future.completeExceptionally(err4); 3377 } else { 3378 future.complete(result); 3379 } 3380 }); 3381 } 3382 }); 3383 }); 3384 }); 3385 return future; 3386 } 3387 3388 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName, 3389 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { 3390 CompletableFuture<Void> future = new CompletableFuture<>(); 3391 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> { 3392 if (err != null) { 3393 future.completeExceptionally(err); 3394 return; 3395 } 3396 if (peerTableDesc == null) { 3397 future.completeExceptionally( 3398 new IllegalArgumentException("Failed to get table descriptor for table " + 3399 tableName.getNameAsString() + " from peer cluster " + peer.getPeerId())); 3400 return; 3401 } 3402 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { 3403 future.completeExceptionally(new IllegalArgumentException( 3404 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() + 3405 ", but the table descriptors are not same when compared with source cluster." + 3406 " Thus can not enable the table's replication switch.")); 3407 return; 3408 } 3409 future.complete(null); 3410 }); 3411 return future; 3412 } 3413 3414 /** 3415 * Set the table's replication switch if the table's replication switch is already not set. 3416 * @param tableName name of the table 3417 * @param enableRep is replication switch enable or disable 3418 */ 3419 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) { 3420 CompletableFuture<Void> future = new CompletableFuture<>(); 3421 addListener(getDescriptor(tableName), (tableDesc, err) -> { 3422 if (err != null) { 3423 future.completeExceptionally(err); 3424 return; 3425 } 3426 if (!tableDesc.matchReplicationScope(enableRep)) { 3427 int scope = 3428 enableRep ? 
HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; 3429 TableDescriptor newTableDesc = 3430 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); 3431 addListener(modifyTable(newTableDesc), (result, err2) -> { 3432 if (err2 != null) { 3433 future.completeExceptionally(err2); 3434 } else { 3435 future.complete(result); 3436 } 3437 }); 3438 } else { 3439 future.complete(null); 3440 } 3441 }); 3442 return future; 3443 } 3444 3445 @Override 3446 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) { 3447 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>(); 3448 addListener(getTableHRegionLocations(tableName), (locations, err) -> { 3449 if (err != null) { 3450 future.completeExceptionally(err); 3451 return; 3452 } 3453 Map<ServerName, List<RegionInfo>> regionInfoByServerName = 3454 locations.stream().filter(l -> l.getRegion() != null) 3455 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) 3456 .collect(Collectors.groupingBy(l -> l.getServerName(), 3457 Collectors.mapping(l -> l.getRegion(), Collectors.toList()))); 3458 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>(); 3459 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator(); 3460 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) { 3461 futures 3462 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> { 3463 if (err2 != null) { 3464 future.completeExceptionally(unwrapCompletionException(err2)); 3465 } else { 3466 aggregator.append(stats); 3467 } 3468 })); 3469 } 3470 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 3471 (ret, err3) -> { 3472 if (err3 != null) { 3473 future.completeExceptionally(unwrapCompletionException(err3)); 3474 } else { 3475 future.complete(aggregator.sum()); 3476 } 3477 }); 3478 }); 3479 return future; 3480 } 
@Override
  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
      boolean preserveSplits) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    // Preconditions are checked in order: the source table must exist and the target table must
    // not. Only then is the source descriptor copied under the new name and the table created.
    addListener(tableExists(tableName), (exist, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      if (!exist) {
        future.completeExceptionally(new TableNotFoundException(tableName));
        return;
      }
      addListener(tableExists(newTableName), (exist1, err1) -> {
        if (err1 != null) {
          future.completeExceptionally(err1);
          return;
        }
        if (exist1) {
          future.completeExceptionally(new TableExistsException(newTableName));
          return;
        }
        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
          if (err2 != null) {
            future.completeExceptionally(err2);
            return;
          }
          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
          if (preserveSplits) {
            addListener(getTableSplits(tableName), (splits, err3) -> {
              if (err3 != null) {
                future.completeExceptionally(err3);
                return;
              }
              // getTableSplits completes with null when the source table has a single region.
              // There are no split keys to preserve then, so create the table without any
              // instead of handing a null array to the split-key overload.
              CompletableFuture<Void> createFuture =
                splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc);
              addListener(createFuture, (result, err4) -> {
                if (err4 != null) {
                  future.completeExceptionally(err4);
                } else {
                  future.complete(result);
                }
              });
            });
          } else {
            addListener(createTable(newTableDesc), (result, err5) -> {
              if (err5 != null) {
                future.completeExceptionally(err5);
              } else {
                future.complete(result);
              }
            });
          }
        });
      });
    });
    return future;
  }

  /**
   * Send a single clear-block-cache request for the given regions to one region server.
   * @param serverName the server the request is sent to
   * @param hris the regions named in the request
   * @return a future completed with the eviction stats converted from the server's response
   */
  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
      List<RegionInfo> hris) {
    return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
      .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
        controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
        (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
        resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
      .serverName(serverName).call();
  }
}